tsio.py 17 KB
Newer Older
1
from datetime import datetime
2
from contextlib import contextmanager
3
import logging
4
5
6

import pandas as pd

7
from sqlalchemy import Table, Column, Integer, ForeignKey
Aurélien Campéas's avatar
Aurélien Campéas committed
8
from sqlalchemy.sql.expression import select, func, desc
9
from sqlalchemy.dialects.postgresql import BYTEA
10

11
from tshistory.schema import tsschema
12
13
from tshistory.util import (
    inject_in_index,
14
    num2float,
15
    subset,
16
    SeriesServices,
17
18
    tzaware_serie
)
19
from tshistory.snapshot import Snapshot
20

21
# Module-level logger; the insertion helpers below report through it.
L = logging.getLogger('tshistory.tsio')
22
# Process-wide cache of per-serie table definitions, keyed by the
# qualified table name; filled lazily by
# TimeSerie._table_definition_for and shared by every instance.
TABLES = {}
23
24


25
class TimeSerie(SeriesServices):
26
    namespace = 'tsh'
27
    schema = None
28
29
30

    def __init__(self, namespace='tsh'):
        """Bind the instance to a namespace and prepare its schema.

        namespace: str handed to `tsschema` and used to qualify the
        table names built by this object

        Also starts with an empty per-instance metadata cache.
        """
        self.namespace = namespace
        self.metadatacache = {}
        self.schema = tsschema(namespace)
        self.schema.define()
34

35
    def insert(self, cn, newts, name, author, _insertion_date=None):
36
        """Create a new revision of a given time series
37

38
        newts: pandas.Series with date index
39
        name: str unique identifier of the serie
40
        author: str free-form author name
41
42
        """
        assert isinstance(newts, pd.Series)
Aurélien Campéas's avatar
Aurélien Campéas committed
43
44
45
        assert isinstance(name, str)
        assert isinstance(author, str)
        assert _insertion_date is None or isinstance(_insertion_date, datetime)
46
        assert not newts.index.duplicated().any()
47

48
        newts = num2float(newts)
49

50
        if not len(newts):
51
            return
52

53
        assert ('<M8[ns]' == newts.index.dtype or
54
                'datetime' in str(newts.index.dtype) and not
55
56
                isinstance(newts.index, pd.MultiIndex))

57
        newts.name = name
58
        table = self._get_ts_table(cn, name)
59

60
        if table is None:
61
            return self._create(cn, newts, name, author, _insertion_date)
62

63
        return self._update(cn, table, newts, name, author, _insertion_date)
64

65
    def get(self, cn, name, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie
        from_value_date / to_value_date: optional bounds forwarded to
        the snapshot lookup
        _keep_nans: when false (the default) the result is renamed to
        `name` and its null points are dropped

        Returns None when the name is not registered.
        """
        if self._get_ts_table(cn, name) is None:
            return None

        qfilter = []
        if revision_date:
            qfilter.append(
                lambda cset, _: cset.c.insertion_date <= revision_date
            )

        _, current = Snapshot(cn, self, name).find(
            qfilter,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )

        if current is None or _keep_nans:
            return current

        current.name = name
        return current[~current.isnull()]
90

91
    def metadata(self, cn, tsname):
92
93
94
95
96
97
98
99
100
101
102
        """Return metadata dict of timeserie."""
        if tsname in self.metadatacache:
            return self.metadatacache[tsname]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.name == tsname
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[tsname] = meta
        return meta

103
104
    def get_history(self, cn, name,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        """Return the revisions of a serie concatenated into one series

        from_insertion_date / to_insertion_date: inclusive bounds on
        the insertion date of the changesets to consider
        from_value_date / to_value_date: bounds handed to `subset`
        (diff mode) or to the snapshot lookup (default mode)
        deltabefore / deltaafter: offsets around each insertion date;
        when at least one is given, each revision is looked up between
        `insertion date - deltabefore` and `insertion date +
        deltaafter` (a missing side defaults to the insertion date
        itself). They cannot be combined with `diffmode` nor with
        explicit value date bounds.
        diffmode: when true, each revision contributes its stored diff
        (the initial snapshot for a revision without diff) rather than
        what the snapshot lookup yields for that changeset

        Every revision goes through `inject_in_index` with its
        insertion date before the concatenation.

        Returns None when the name is not registered or when no
        changeset matches the insertion date range.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        # explicit None checks: a zero offset is falsy but is still a
        # legitimate request for a window
        use_deltas = deltabefore is not None or deltaafter is not None
        if use_deltas:
            assert diffmode is False
            assert from_value_date is None
            assert to_value_date is None

        cset = self.schema.changeset

        def in_insertion_range(sql):
            # restrict a query to the requested insertion dates
            if from_insertion_date:
                sql = sql.where(cset.c.insertion_date >= from_insertion_date)
            if to_insertion_date:
                sql = sql.where(cset.c.insertion_date <= to_insertion_date)
            return sql

        if diffmode:
            # compute diffs above the snapshot
            diffsql = in_insertion_range(
                select([cset.c.id, cset.c.insertion_date, table.c.diff]
                ).order_by(cset.c.id
                ).where(table.c.cset == cset.c.id)
            )

            diffs = cn.execute(diffsql).fetchall()
            if not diffs:
                # it's fine to ask for an insertion date range
                # where nothing did happen, but you get nothing
                return

            snapshot = Snapshot(cn, self, name)
            series = []
            for csid, revdate, diff in diffs:
                if diff is None:  # we must fetch the initial snapshot
                    serie = subset(snapshot.first, from_value_date, to_value_date)
                else:
                    serie = subset(self._deserialize(diff, name), from_value_date, to_value_date)
                    serie = self._ensure_tz_consistency(cn, serie)
                inject_in_index(serie, revdate)
                series.append(serie)
            series = pd.concat(series)
            series.name = name
            return series

        revsql = in_insertion_range(
            select(
                [cset.c.id, cset.c.insertion_date]
            ).order_by(
                cset.c.id
            ).where(
                table.c.cset == cset.c.id
            )
        )

        revs = cn.execute(revsql).fetchall()
        if not revs:
            return

        snapshot = Snapshot(cn, self, name)
        series = []
        for csid, idate in revs:
            if use_deltas:
                from_value_date = (
                    idate if deltabefore is None else idate - deltabefore
                )
                to_value_date = (
                    idate if deltaafter is None else idate + deltaafter
                )
            # the filter closes over the loop variable, which is fine
            # since it is handed to `find` within the same iteration
            series.append((
                idate,
                snapshot.find([lambda cset, _: cset.c.id == csid],
                              from_value_date=from_value_date,
                              to_value_date=to_value_date)[1]
            ))

        for revdate, serie in series:
            inject_in_index(serie, revdate)

        serie = pd.concat([serie for revdate_, serie in series])
        serie.name = name
        return serie
193

194
195
196
    def exists(self, cn, name):
        """Tell whether `_get_ts_table` finds a table for `name`."""
        table = self._get_ts_table(cn, name)
        return table is not None

197
    def latest_insertion_date(self, cn, name):
        """Return the greatest insertion date among the changesets
        referenced by the table of serie `name`.

        Returns None when the name is not registered, like `get` and
        `get_history` do, rather than failing on the missing table.
        """
        tstable = self._get_ts_table(cn, name)
        if tstable is None:
            return None

        cset = self.schema.changeset
        sql = select(
            [func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()
203

204
205
206
207
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(seriename)
208
209
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()

    def strip(self, cn, seriename, csid):
        """Drop the revisions of serie `seriename` from changeset
        `csid` onwards.

        For each changeset reported by `log` from `csid` for that
        serie: its metadata gets a 'tshistory.info' note and its link
        to the serie is removed. Then the rows of the serie table
        whose changeset id is >= `csid` are deleted.

        Fails on assertion when `log` reports nothing.
        """
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for entry in logs:
            rev = entry['rev']

            # put stripping info in the changeset metadata
            meta = cn.execute(
                select([cset.c.metadata]).where(cset.c.id == rev)
            ).scalar() or {}
            meta['tshistory.info'] = 'got stripped from {}'.format(csid)
            cn.execute(
                cset.update().where(cset.c.id == rev).values(metadata=meta)
            )

            # delete the changeset_series item
            cn.execute(
                cset_serie.delete().where(
                    cset_serie.c.cset == rev
                ).where(
                    cset_serie.c.serie == seriename
                )
            )

        # wipe the diffs
        table = self._table_definition_for(seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
246

247
    def info(self, cn):
        """Gather global statistics on the current tshistory repository

        Returns a dict with 'series count' (rows of the registry),
        'changeset count' (the greatest changeset id) and 'serie
        names' (sorted distinct names of the registry).
        """
        ns = self.namespace
        nbseries = cn.execute(
            'select count(*) from {}.registry'.format(ns)
        ).scalar()
        lastcset = cn.execute(
            'select max(id) from {}.changeset'.format(ns)
        ).scalar()
        rows = cn.execute(
            'select distinct name from {}.registry order by name'.format(ns)
        ).fetchall()
        return {
            'series count': nbseries,
            'changeset count': lastcset,
            'serie names': [seriename for seriename, in rows]
        }

258
    def log(self, cn, limit=0, diff=False, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        limit: maximum number of changesets (most recent first)
        diff: when true, add a 'diff' entry mapping each serie name of
        the changeset to `diff_at` for it
        names / authors: restrict to these serie names / authors
        stripped: when true, outer-join the changesets with their
        series links rather than requiring a link
        fromrev / torev: inclusive bounds on the changeset id
        fromdate / todate: inclusive bounds on the insertion date

        Returns a list of dicts ('rev', 'author', 'date', 'meta',
        'names', and 'diff' if asked) sorted by ascending 'rev'.
        """
        cset = self.schema.changeset
        cset_series = self.schema.changeset_series
        reg = self.schema.registry

        sql = select(
            [cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)

        if names:
            sql = sql.where(reg.c.name.in_(names))

        if authors:
            sql = sql.where(cset.c.author.in_(authors))

        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)

        if torev:
            sql = sql.where(cset.c.id <= torev)

        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)

        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)

        if not stripped:
            sql = sql.where(
                cset.c.id == cset_series.c.cset
            ).where(
                cset_series.c.serie == reg.c.name
            )
        else:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))

        rows = cn.execute(sql).fetchall()
        entries = [
            {'rev': csetid, 'author': author,
             'date': pd.Timestamp(revdate, tz='utc'),
             'meta': meta or {},
             'names': self._changeset_series(cn, csetid)}
            for csetid, author, revdate, meta in rows
        ]

        if diff:
            for entry in entries:
                entry['diff'] = {
                    name: self.diff_at(cn, entry['rev'], name)
                    for name in entry['names']
                }

        # the query is most recent first; hand back oldest first
        entries.sort(key=lambda entry: entry['rev'])
        return entries

311
312
    # /API
    # Helpers
313

314
315
316
317
318
319
320
    # creation / update

    def _create(self, cn, newts, name, author, insertion_date=None):
        # initial insertion
        if len(newts) == 0:
            return None
        snapshot = Snapshot(cn, self, name)
321
        csid = self._newchangeset(cn, author, insertion_date)
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
        head = snapshot.create(newts)
        value = {
            'cset': csid,
            'snapshot': head
        }
        table = self._make_ts_table(cn, name, newts)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)
        L.info('first insertion of %s (size=%s) by %s',
               name, len(newts), author or self._author)
        return newts

    def _update(self, cn, table, newts, name, author, insertion_date=None):
        """Insert a new revision of the existing serie `name`.

        The new series is validated, then diffed against what the
        snapshot holds between its first and last index values. When
        the diff is empty nothing is written and None is returned.
        Otherwise a changeset is opened, the snapshot is updated, the
        serialized diff is stored and the diff is returned.
        """
        self._validate(cn, newts, name)
        snapshot = Snapshot(cn, self, name)

        index = newts.index
        base = snapshot.last(index.min(), index.max())
        diff = self.diff(base, newts)

        if len(diff) == 0:
            L.info('no difference in %s by %s (for ts of size %s)',
                   name, author or self._author, len(newts))
            return None

        csid = self._newchangeset(cn, author, insertion_date)
        head = snapshot.update(diff)
        row = {
            'cset': csid,
            'diff': self._serialize(diff),
            'snapshot': head
        }
        cn.execute(table.insert().values(row))
        self._finalize_insertion(cn, csid, name)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), name, author or self._author)
        return diff

359
360
    # ts serialisation

361
362
363
364
365
    def _ensure_tz_consistency(self, cn, ts):
        """Return timeserie with tz aware index or not depending on metadata
        tzaware.
        """
        assert ts.name is not None
366
        metadata = self.metadata(cn, ts.name)
367
368
369
370
        if metadata and metadata.get('tzaware', False):
            return ts.tz_localize('UTC')
        return ts

371
    # serie table handling
372

373
374
    def _ts_table_name(self, seriename):
        # namespace.seriename
375
        return '{}.timeserie.{}'.format(self.namespace, seriename)
376

377
    def _table_definition_for(self, seriename):
        """Return the table definition of a serie, building it on
        first use.

        Definitions are kept in the module-level `TABLES` dict under
        the qualified table name. A definition has an `id` primary
        key, a mandatory `cset` reference to the changeset table, a
        binary `diff` and a `snapshot` reference to the snapshot
        table of the serie.
        """
        tablename = self._ts_table_name(seriename)
        table = TABLES.get(tablename)
        if table is not None:
            return table

        ns = self.namespace
        cset_fk = ForeignKey('{}.changeset.id'.format(ns))
        snapshot_fk = ForeignKey('{}.snapshot.{}.id'.format(ns, seriename))
        table = Table(
            seriename, self.schema.meta,
            Column('id', Integer, primary_key=True),
            Column('cset', Integer, cset_fk, index=True, nullable=False),
            Column('diff', BYTEA),
            Column('snapshot', Integer, snapshot_fk, index=True),
            schema='{}.timeserie'.format(ns),
            extend_existing=True
        )
        TABLES[tablename] = table
        return table
397

398
    def _make_ts_table(self, cn, name, ts):
        """Create the table of serie `name` and register the serie.

        The registry row holds the name, the qualified table name and
        a metadata dict derived from `ts`: 'tzaware', 'index_type',
        'index_names' and 'value_type'.

        Returns the table.
        """
        table = self._table_definition_for(name)
        table.create(cn)

        index = ts.index
        meta = {
            'tzaware': tzaware_serie(ts),
            'index_type': index.dtype.name,
            # unnamed index levels are left out
            'index_names': [iname for iname in index.names if iname],
            'value_type': ts.dtypes.name
        }
        registration = self.schema.registry.insert().values(
            name=name,
            table_name=self._ts_table_name(name),
            metadata=meta,
        )
        cn.execute(registration)
        return table

417
    def _get_ts_table(self, cn, name):
        """Return the table definition of serie `name` when the
        registry knows its table name, None otherwise.
        """
        reg = self.schema.registry
        lookup = reg.select().where(
            reg.c.table_name == self._ts_table_name(name)
        )
        if not cn.execute(lookup).scalar():
            return None
        return self._table_definition_for(name)
424

425
426
    # changeset handling

427
    def _newchangeset(self, cn, author, _insertion_date=None):
        """Record a new changeset and return its id.

        author: stored as the changeset author
        _insertion_date: optional timezone-aware datetime (enforced by
        an assertion) stored, converted to UTC, as the insertion date;
        the current UTC time is used when it is not given
        """
        table = self.schema.changeset

        if _insertion_date is None:
            idate = pd.Timestamp.now(tz='UTC')
        else:
            assert _insertion_date.tzinfo is not None
            # an aware datetime cannot be given along with `tz=` to
            # the Timestamp constructor (ValueError with current
            # pandas): build it first, convert it afterwards
            idate = pd.Timestamp(_insertion_date).tz_convert('UTC')

        sql = table.insert().values(
            author=author,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]
436

437
    def _changeset_series(self, cn, csid):
        """Return the list of serie names linked to changeset `csid`."""
        links = self.schema.changeset_series
        sql = select([links.c.serie]).where(links.c.cset == csid)
        rows = cn.execute(sql).fetchall()
        return [name for name, in rows]
443
444
445

    # insertion handling

446
    def _validate(self, cn, ts, name):
447
448
        if ts.isnull().all():
            # ts erasure
449
            return
450
        tstype = ts.dtype
451
        meta = self.metadata(cn, name)
452
        if tstype != meta['value_type']:
453
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
454
                name, tstype, meta['value_type'])
455
            raise Exception(m)
456
        if ts.index.dtype.name != meta['index_type']:
457
            raise Exception('Incompatible index types')
458

459
460
461
462
463
464
465
    def _finalize_insertion(self, cn, csid, name):
        """Link changeset `csid` to serie `name` in the
        changeset_series table.
        """
        link = self.schema.changeset_series.insert().values(
            cset=csid,
            serie=name
        )
        cn.execute(link)
466

467
    def diff_at(self, cn, csetid, name):
        """Return what changeset `csetid` brought to serie `name`.

        When the row of the serie table for that changeset has id 1,
        this is the first snapshot; otherwise it is the deserialized
        diff of the row, passed through `_ensure_tz_consistency`.
        """
        table = self._get_ts_table(cn, name)
        cset = self.schema.changeset

        def at_changeset(column):
            # select one column of the serie row bound to the changeset
            return select([column]).where(
                table.c.cset == cset.c.id
            ).where(
                cset.c.id == csetid
            )

        rowid = cn.execute(at_changeset(table.c.id)).scalar()
        if rowid == 1:
            return Snapshot(cn, self, name).first

        rawdiff = cn.execute(at_changeset(table.c.diff)).scalar()
        return self._ensure_tz_consistency(
            cn, self._deserialize(rawdiff, name)
        )