from datetime import datetime
from contextlib import contextmanager
import logging
import hashlib

import pandas as pd

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.schema import tsschema
from tshistory.util import (
    inject_in_index,
    num2float,
    subset,
    SeriesServices,
    tzaware_serie
)
from tshistory.snapshot import Snapshot

L = logging.getLogger('tshistory.tsio')
TABLES = {}


class TimeSerie(SeriesServices):
    namespace = 'tsh'
    schema = None
    metadatacache = None
    registry_map = None
    serie_tablename = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}

    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata

        """
        assert isinstance(newts, pd.Series)
        assert isinstance(seriename, str)
        assert isinstance(author, str)
        assert metadata is None or isinstance(metadata, dict)
        assert _insertion_date is None or isinstance(_insertion_date, datetime)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)

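    # Usage sketch for `insert` (illustration only: `engine` is a hypothetical
    # sqlalchemy engine bound to a database carrying the tshistory schema, and
    # the series name is made up):
    #
    #   tsh = TimeSerie()
    #   ts = pd.Series([1., 2., 3.],
    #                  index=pd.date_range('2018-1-1', periods=3, freq='D'))
    #   with engine.begin() as cn:
    #       tsh.insert(cn, ts, 'my.series', 'author@example.com')
    #
    # A later call with changed or additional values stores only the
    # difference as a new changeset (see `_update` below).
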
    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        csetfilter = []
        if revision_date:
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            current = current[~current.isnull()]
        return current

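    # Usage sketch for `get` (same hypothetical `engine` and series name as
    # above):
    #
    #   with engine.begin() as cn:
    #       current = tsh.get(cn, 'my.series')
    #       # the series as it was known at some past moment
    #       asof = tsh.get(cn, 'my.series',
    #                      revision_date=pd.Timestamp('2018-1-2', tz='UTC'))
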
    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.seriename == seriename
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[seriename] = meta
        return meta

    def update_metadata(self, cn, seriename, metadata, internal=False):
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, seriename)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=meta)
        cn.execute(sql)
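
    # Metadata sketch: `metadata` returns the per-series dict seeded at
    # registration time (tzaware, index_type, index_names, value_type) and
    # `update_metadata` merges new, non-conflicting user keys into it
    # (illustration only, hypothetical `engine`):
    #
    #   with engine.begin() as cn:
    #       tsh.update_metadata(cn, 'my.series', {'unit': 'MW'})
    #       assert tsh.metadata(cn, 'my.series')['unit'] == 'MW'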

    def changeset_metadata(self, cn, csid):
        cset = self.schema.changeset
        sql = 'select metadata from "{ns}".changeset where id = {id}'.format(
            ns=self.namespace,
            id=csid
        )
        return cn.execute(sql).scalar()

    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        if deltabefore is not None or deltaafter is not None:
            assert from_value_date is None
            assert to_value_date is None

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        revs = cn.execute(revsql).fetchall()
        if not revs:
            return

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        for csid, idate in revs:
            if (deltabefore, deltaafter) != (None, None):
                from_value_date = None
                to_value_date = None
                if deltabefore is not None:
                    from_value_date = idate - deltabefore
                if deltaafter is not None:
                    to_value_date = idate + deltaafter
            series.append((
                idate,
                snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                              from_value_date=from_value_date,
                              to_value_date=to_value_date)[1]
            ))

        if diffmode:
            diffs = []
            for (revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs

        for revdate, serie in series:
            inject_in_index(serie, revdate)

        serie = pd.concat([serie for revdate_, serie in series])
        serie.name = seriename
        return serie

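    # Usage sketch for `get_history` (hypothetical `engine`): the result is a
    # single series whose index couples insertion_date and value_date, one
    # block per changeset; with diffmode=True each block only carries what
    # changed in that changeset.
    #
    #   with engine.begin() as cn:
    #       histo = tsh.get_history(cn, 'my.series')
    #       diffs = tsh.get_history(cn, 'my.series', diffmode=True)
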
    def _previous_cset(self, cn, seriename, csid):
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select cset from "{}.timeserie"."{}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1').format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()

    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        histo = self.get_history(
            cn, seriename, deltabefore=-delta
        )

        df = histo.reset_index()

        # df_date is a dataframe with two columns: value_date and insertion_date
        df_date = df.loc[:, ['insertion_date', 'value_date']]

        # in selected_dates each value_date appears only once,
        # associated with its latest insertion_date
        selected_dates = df_date.groupby('value_date').max().reset_index()

        ts = df[seriename]
        # ts is built from the df returned by get_history;
        # its index is now a plain index of (insertion_date, value_date) tuples
        ts.index = ((row.insertion_date, row.value_date)
                    for row in df.itertuples())
        # ts still holds every (insertion_date, value_date) couple;
        # we use selected_dates to keep, for each value_date, only the
        # couple carrying the latest insertion_date
        ts_select = ts[[(row[2], row[1])
                        for row in selected_dates.itertuples()]]

        # ts_select still has a plain index of (insertion_date, value_date) tuples
        new_index = (elt[1] for elt in ts_select.index)

        # we only keep the value_date part of the index
        ts_select.index = new_index
        return subset(ts_select, from_value_date, to_value_date)

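    # Sketch for `get_delta` (hypothetical `engine`): for each value_date it
    # keeps the value carried by the latest changeset inserted at least
    # `delta` before that value_date, e.g. the series "as it was known one
    # hour ahead of each point":
    #
    #   with engine.begin() as cn:
    #       lagged = tsh.get_delta(cn, 'my.series', pd.Timedelta(hours=1))
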
    def exists(self, cn, seriename):
        return self._get_ts_table(cn, seriename) is not None

    def latest_insertion_date(self, cn, seriename):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()

    def last_id(self, cn, seriename):
        snapshot = Snapshot(cn, self, seriename)
        return snapshot.last_id()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()

    def strip(self, cn, seriename, csid):
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete the changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))

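    # Sketch for `strip` (hypothetical `engine` and `somedate`): drop every
    # revision of the series from changeset `csid` onwards, leaving a trace in
    # the changeset metadata:
    #
    #   with engine.begin() as cn:
    #       csid = tsh.changeset_at(cn, 'my.series', somedate, mode='after')
    #       tsh.strip(cn, 'my.series', csid)
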
    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct seriename from {}.registry order by seriename'.format(self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats
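
    # `info` returns a plain dict, e.g. (made-up values):
    #   {'series count': 1, 'changeset count': 3, 'serie names': ['my.series']}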

    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log

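    # Each `log` entry is a dict shaped like (made-up values):
    #   {'rev': 1, 'author': 'author@example.com',
    #    'date': pd.Timestamp('2018-01-01', tz='utc'),
    #    'meta': {}, 'names': ['my.series']}
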
    # /API
    # Helpers

    # creation / update

    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        # initial insertion
        if len(newts) == 0:
            return None
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        value = {
            'cset': csid,
            'snapshot': head
        }
        table = self._make_ts_table(cn, seriename, newts)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

    # serie table handling

    def _serie_to_tablename(self, cn, seriename):
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename

        reg = self.schema.registry
        sql = select([reg.c.table_name]).where(reg.c.seriename == seriename)
        tablename = cn.execute(sql).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       index=True, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename)),
                       index=True),
                schema='{}.timeserie'.format(self.namespace),
                extend_existing=True
            )
        return table

    def _make_ts_table(self, cn, seriename, ts):
        table = self._table_definition_for(cn, seriename)
        table.create(cn)
        return table

    def _register_serie(self, cn, seriename, ts):
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'value_type': ts.dtypes.name
            },
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid

    def _get_ts_table(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
            return self._table_definition_for(cn, seriename)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        sql = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)

        return [
            row.seriename
            for row in cn.execute(sql).fetchall()
        ]

    # insertion handling

    def _validate(self, cn, ts, seriename):
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _name_to_regid(self, cn, seriename):
        regid = self.registry_map.get(seriename)
        if regid is not None:
            return regid

        registry = self.schema.registry
        sql = select([registry.c.id]).where(registry.c.seriename == seriename)
        regid = self.registry_map[seriename] = cn.execute(sql).scalar()
        return regid

    def _finalize_insertion(self, cn, csid, seriename):
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=self._name_to_regid(cn, seriename)
        )
        cn.execute(sql)

    # don't use this

    def resetcaches(self):
        self.metadatacache.clear()
        self.registry_map.clear()
        self.serie_tablename.clear()