tsio.py 21.7 KB
Newer Older
1
from datetime import datetime
2
from contextlib import contextmanager
3
import logging
4
import hashlib
5
6
7

import pandas as pd

8
9
from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
10
from sqlalchemy.engine.base import Engine
Aurélien Campéas's avatar
Aurélien Campéas committed
11
from sqlalchemy.sql.expression import select, func, desc
12
from sqlalchemy.dialects.postgresql import BYTEA
13

14
from tshistory.schema import tsschema
15
16
from tshistory.util import (
    inject_in_index,
17
    num2float,
18
    subset,
19
    SeriesServices,
20
21
    tzaware_serie
)
22
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES
23

24
L = logging.getLogger('tshistory.tsio')
25
TABLES = {}
26
27


28
class TimeSerie(SeriesServices):
    """Postgres-backed time-series store with full versioning.

    Each insertion creates a changeset; the serie state is kept as
    snapshot chunks (see tshistory.snapshot.Snapshot).
    """
    # default postgres schema namespace; overridden per-instance in __init__
    namespace = 'tsh'
    # tsschema object describing the registry/changeset tables
    schema = None
    # per-instance cache: seriename -> metadata dict
    metadatacache = None
    # per-instance cache: seriename -> registry row id
    registry_map = None
    # per-instance cache: seriename -> physical table name
    serie_tablename = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
42

43
    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        _insertion_date: optional tz-aware datetime (mostly for tests)

        Returns the inserted serie on first insertion, the diff on
        update, or None when there is nothing to do.
        """
        assert isinstance(newts, pd.Series)
        assert isinstance(seriename, str)
        assert isinstance(author, str)
        assert metadata is None or isinstance(metadata, dict)
        assert _insertion_date is None or isinstance(_insertion_date, datetime)
        # the incoming index must be deduplicated and sorted
        assert not newts.index.duplicated().any()
        assert newts.index.is_monotonic_increasing

        # coerce integral dtypes to float (nans are used to encode erasure)
        newts = num2float(newts)

        if not len(newts):
            return

        # NOTE(review): operator precedence makes this read as
        # A or (B and C) -- the MultiIndex check is only applied on the
        # 'datetime'-ish branch, not for a plain '<M8[ns]' index
        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            # first insertion of that serie
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
79

80
    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie
        """
        if self._get_ts_table(cn, seriename) is None:
            # unknown serie
            return

        filters = []
        if revision_date:
            filters.append(
                lambda cset: cset.c.insertion_date <= revision_date
            )
        snapshot = Snapshot(cn, self, seriename)
        _, ts = snapshot.find(
            csetfilter=filters,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )

        if ts is None or _keep_nans:
            return ts
        # nans encode point erasures: strip them from the final output
        ts.name = seriename
        return ts[~ts.isnull()]
105

106
    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie (cached per instance)."""
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
        regtable = self.schema.registry
        query = select(
            [regtable.c.metadata]
        ).where(
            regtable.c.seriename == seriename
        )
        meta = self.metadatacache[seriename] = cn.execute(query).scalar()
        return meta

118
    def update_metadata(self, cn, seriename, metadata, internal=False):
        """Merge `metadata` into the stored metadata of a serie.

        Unless `internal` is true, the caller may only add new keys,
        never override existing ones (asserted).
        """
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, seriename)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        # bug fix: persist the *merged* dict; writing `metadata` dropped
        # every pre-existing key from the registry row
        ).values(metadata=meta)
        cn.execute(sql)

130
131
132
133
134
135
136
137
    def changeset_metadata(self, cn, csid):
        """Return the metadata dict of changeset `csid` (None if absent)."""
        # parameterize csid (consistent with delete/_previous_cset) rather
        # than interpolating it into the statement; also drops an unused
        # `cset = self.schema.changeset` local
        sql = 'select metadata from "{ns}".changeset where id = %(csid)s'.format(
            ns=self.namespace
        )
        return cn.execute(sql, csid=csid).scalar()

138
    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        """Return a dict mapping insertion dates to the serie state (or,
        in diffmode, to the diff against the previous revision) for every
        changeset of the serie, filtered by the given date bounds.

        Returns None for an unknown serie or when no revision matches.
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        # delta windows are computed around each insertion date and are
        # mutually exclusive with explicit value-date bounds
        if deltabefore is not None or deltaafter is not None:
            assert from_value_date is None
            assert to_value_date is None

        cset = self.schema.changeset
        # all (changeset id, insertion date) pairs for this serie,
        # in changeset order
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        revs = cn.execute(revsql).fetchall()
        if not revs:
            return

        if diffmode:
            # compute the previous serie value so the first returned diff
            # is computed against something
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            # one snapshot lookup per revision, windowed around its
            # insertion date
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                series.append((
                    idate,
                    # the lambda is consumed immediately by .find, so the
                    # late-binding-closure pitfall does not apply here
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            diffs = []
            for (revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs

        return {
            idate: serie
            for idate, serie in series
        }
215

216
217
218
219
    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """Rebuild a serie keeping, for each value date, the value given
        by the latest insertion (as selected through get_history with a
        deltabefore window of -delta).
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta
        )

        # tag each revision serie with its insertion date, then stack
        # everything into one long serie
        for revdate, serie in histo.items():
            inject_in_index(serie, revdate)
        histo = pd.concat([serie for serie in histo.values()])

        df = histo.reset_index()

        # df_date is a dataframe with two columns: value_date and insertion_date
        df_date = df.loc[:, ['insertion_date', 'value_date']]

        # now in selected_dates each value_date has only one occurrence
        # which is the last inserted
        selected_dates = df_date.groupby('value_date').max().reset_index()

        ts = df[seriename]
        # ts is built from the df returned from get_history
        # ts index is now a simple index of tuples (insert_date, value_date)
        ts.index = ((row.insertion_date, row.value_date)
                    for row in df.itertuples())
        # in ts, there are still all the couples value_date * insertion_date
        # We now use the selected_dates to select in ts only
        # the couple (value_date, insertion_date)
        # which corresponds to the last insertion_date
        # (itertuples yields (Index, value_date, insertion_date), hence
        # row[2]=insertion_date, row[1]=value_date)
        ts_select = ts[[(row[2], row[1])
                        for row in selected_dates.itertuples()]]

        # ts_select has still a simple index of tuples (value_date, insertion_date)
        new_index = (elt[1] for elt in ts_select.index)

        # we only keep the value_date information from the index
        ts_select.index = new_index
        return subset(ts_select, from_value_date, to_value_date)
254
255


256
257
    def exists(self, cn, seriename):
        """Predicate: true when the serie is already registered."""
        table = self._get_ts_table(cn, seriename)
        return table is not None
258

259
    def latest_insertion_date(self, cn, seriename):
        """Return the insertion date of the most recent revision."""
        cset = self.schema.changeset
        table = self._get_ts_table(cn, seriename)
        query = select(
            [func.max(cset.c.insertion_date)]
        ).where(
            table.c.cset == cset.c.id
        )
        return cn.execute(query).scalar()
265

266
267
268
269
    def last_id(self, cn, seriename):
        """Return the id of the serie's latest snapshot chunk."""
        return Snapshot(cn, self, seriename).last_id()

270
271
272
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return the changeset id at (or around) `revdate`.

        mode: 'strict' (exact match), 'before' (<=) or 'after' (>=).
        """
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        # dispatch table instead of an if/elif chain
        datefilter = {
            'strict': cset.c.insertion_date == revdate,
            'before': cset.c.insertion_date <= revdate,
            'after': cset.c.insertion_date >= revdate
        }[mode]
        query = select(
            [table.c.cset]
        ).where(
            table.c.cset == cset.c.id
        ).where(datefilter)
        return cn.execute(query).scalar()

285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
    def delete(self, cn, seriename):
        """Fully remove a serie: its data/snapshot tables, its changesets
        and its registry entry. `cn` must be a transaction scope.
        """
        assert not isinstance(cn, Engine), 'use a transaction object'
        assert self.exists(cn, seriename)
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        # cn *must* be a transaction scope
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()

319
320
321
322
323
324
325
326
327
    def strip(self, cn, seriename, csid):
        """Remove all revisions of a serie from changeset `csid` onward,
        leaving a trace in the metadata of the affected changesets.
        """
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changset_serie item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
344

345
    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        # bug fix: the registry column is `seriename` (see reg.c.seriename
        # everywhere else in this module), not `name`
        sql = ('select distinct seriename from {}.registry '
               'order by seriename').format(self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

356
    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        Each entry is a dict with keys: rev, author, date, meta, names.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        # newest first here; the final list is re-sorted ascending below
        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        # chronological (ascending changeset id) order
        log.sort(key=lambda rev: rev['rev'])
        return log

404
405
    # /API
    # Helpers
406

407
408
    # creation / update

409
    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Handle the very first insertion of a serie."""
        # initial insertion: nothing to do on an empty serie
        if not len(newts):
            return None
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        table = self._make_ts_table(cn, seriename, newts)
        cn.execute(
            table.insert().values(
                cset=csid,
                snapshot=head
            )
        )
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

429
    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Insert a diff-revision of an already known serie."""
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        # only store what actually changed wrt the current state
        diff = self.diff(
            snapshot.last(newts.index.min(), newts.index.max()),
            newts
        )
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        cn.execute(
            table.insert().values(
                cset=csid,
                snapshot=head
            )
        )
        self._finalize_insertion(cn, csid, seriename)
        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

454
    # serie table handling
455

456
457
458
459
    def _serie_to_tablename(self, cn, seriename):
        """Map a serie name to its physical table name (cached)."""
        cached = self.serie_tablename.get(seriename)
        if cached is not None:
            return cached

        reg = self.schema.registry
        tablename = cn.execute(
            select([reg.c.table_name]).where(reg.c.seriename == seriename)
        ).scalar()
        if tablename is None:
            # creation time: the serie is not registered yet
            return None
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        """Build (or fetch from the module-level TABLES cache) the
        sqlalchemy Table mapping changesets to snapshot chunks for a serie.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                # each row ties a changeset ...
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                # ... to the snapshot chunk holding the serie state
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                # NONE_NAME lets sqlalchemy pick the index names
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                schema='{}.timeserie'.format(self.namespace),
                keep_existing=True
            )
        return table
494

495
    def _make_ts_table(self, cn, seriename, ts):
        """Create the postgres table backing a brand new serie."""
        tdef = self._table_definition_for(cn, seriename)
        tdef.create(cn)
        return tdef

    def _register_serie(self, cn, seriename, ts):
        """Insert a registry row describing a new serie and cache its id."""
        idx = ts.index
        meta = {
            'tzaware': tzaware_serie(ts),
            'index_type': idx.dtype.name,
            'index_names': [name for name in idx.names if name],
            'value_type': ts.dtypes.name
        }
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(seriename),
            metadata=meta,
        )
        self.registry_map[seriename] = cn.execute(sql).inserted_primary_key[0]
515

516
    def _get_ts_table(self, cn, seriename):
        """Return the sqlalchemy table of a serie, or None if unknown."""
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
            # bug fix: _table_definition_for expects the *serie* name (it
            # resolves the table name itself); passing the physical table
            # name only worked when the two happened to coincide
            return self._table_definition_for(cn, seriename)
520

521
522
    # changeset handling

523
    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        """Insert a changeset row and return its id."""
        if insertion_date is not None:
            # only tz-aware insertion dates are accepted
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        cset = self.schema.changeset
        return cn.execute(
            cset.insert().values(
                author=author,
                metadata=metadata,
                insertion_date=idate
            )
        ).inserted_primary_key[0]
533

534
    def _changeset_series(self, cn, csid):
        """List the names of the series touched by changeset `csid`."""
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        query = select(
            [reg.c.seriename]
        ).where(
            cset_serie.c.cset == csid
        ).where(
            cset_serie.c.serie == reg.c.id
        )
        return [row.seriename for row in cn.execute(query).fetchall()]
546

547
548
549
550
551
552
553
    def _previous_cset(self, cn, seriename, csid):
        """Return the changeset id just before `csid` for a serie, or None."""
        tablename = self._serie_to_tablename(cn, seriename)
        sql = (
            'select cset from "{}.timeserie"."{}" '
            'where cset < %(csid)s '
            'order by cset desc limit 1'
        ).format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()

554
555
    # insertion handling

556
    def _validate(self, cn, ts, seriename):
        """Check an incoming serie against the stored type metadata."""
        if ts.isnull().all():
            # ts erasure
            return
        meta = self.metadata(cn, seriename)
        tstype = ts.dtype
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')
568

569
570
    def _name_to_regid(self, cn, seriename):
        """Map a serie name to its registry row id (cached)."""
        cached = self.registry_map.get(seriename)
        if cached is not None:
            return cached

        reg = self.schema.registry
        query = select([reg.c.id]).where(reg.c.seriename == seriename)
        regid = self.registry_map[seriename] = cn.execute(query).scalar()
        return regid

579
    def _finalize_insertion(self, cn, csid, seriename):
        """Record the changeset <-> serie association."""
        cset_serie = self.schema.changeset_series
        cn.execute(
            cset_serie.insert().values(
                cset=csid,
                serie=self._name_to_regid(cn, seriename)
            )
        )
586

587
588
589
    def _resetcaches(self):
        """Flush the module-level table caches and all instance caches."""
        for cache in (TABLES, SNAPTABLES,
                      self.metadatacache,
                      self.registry_map,
                      self.serie_tablename):
            cache.clear()