tsio.py 22.4 KB
Newer Older
1
from datetime import datetime
2
from contextlib import contextmanager
3
import logging
4
import hashlib
5
6
7

import pandas as pd

8
9
from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
10
from sqlalchemy.engine.base import Engine
Aurélien Campéas's avatar
Aurélien Campéas committed
11
from sqlalchemy.sql.expression import select, func, desc
12
from sqlalchemy.dialects.postgresql import BYTEA, TIMESTAMP
13

14
from tshistory.schema import tsschema
15
from tshistory.util import (
16
    closed_overlaps,
17
    inject_in_index,
18
    num2float,
19
    subset,
20
    SeriesServices,
21
    start_end,
22
23
    tzaware_serie
)
24
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES
25

26
L = logging.getLogger('tshistory.tsio')
27
TABLES = {}
28
29


30
class TimeSerie(SeriesServices):
    """Postgres-backed store of versioned time series.

    Each serie has a registry row, a version-log table (one row per
    changeset) and snapshot chunks handled by the Snapshot helper.
    """
    namespace = 'tsh'        # postgres schema prefix
    schema = None            # tsschema object, set in __init__
    metadatacache = None     # seriename -> registry metadata dict
    registry_map = None      # seriename -> registry id
    serie_tablename = None   # seriename -> version-log table name
36
37
38

    def __init__(self, namespace='tsh'):
        """Bind this handler to a namespace and prepare its schema + caches."""
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        # per-instance caches; see the class-level attribute comments
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
44

45
    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata

        Returns the inserted diff (or the full serie on first insertion),
        or None when there is nothing to insert.
        """
        assert isinstance(newts, pd.Series)
        assert isinstance(seriename, str)
        assert isinstance(author, str)
        assert metadata is None or isinstance(metadata, dict)
        assert _insertion_date is None or isinstance(_insertion_date, datetime)
        # the index must be strictly increasing: no duplicates, sorted
        assert not newts.index.duplicated().any()
        assert newts.index.is_monotonic_increasing

        # normalize integer-ish values to floats for storage uniformity
        newts = num2float(newts)

        if not len(newts):
            return

        # only plain datetime indexes are supported (no MultiIndex)
        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            # unknown serie: first insertion
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
81

82
    def get(self, cn, seriename, revision_date=None,
83
84
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
85
86
87
88
89
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

90
        """
91
        table = self._get_ts_table(cn, seriename)
92
93
        if table is None:
            return
94

95
        csetfilter = []
96
        if revision_date:
97
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
98
        snap = Snapshot(cn, self, seriename)
99
        _, current = snap.find(csetfilter=csetfilter,
100
101
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)
102

103
        if current is not None and not _keep_nans:
104
            current.name = seriename
105
            current = current[~current.isnull()]
106
        return current
107

108
    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        try:
            # cache hit: avoid a registry round-trip
            return self.metadatacache[seriename]
        except KeyError:
            pass
        reg = self.schema.registry
        query = select([reg.c.metadata]).where(
            reg.c.seriename == seriename
        )
        found = cn.execute(query).scalar()
        self.metadatacache[seriename] = found
        return found

120
    def update_metadata(self, cn, seriename, metadata, internal=False):
        """Merge `metadata` into the registry metadata of `seriename`.

        internal: when False, refuse to overwrite existing keys (the
        internal keys written at registration time are protected).
        """
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, seriename)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        # BUG FIX: write the *merged* dict, not just the new keys --
        # .values(metadata=metadata) silently dropped all pre-existing
        # metadata (tzaware, index_type, ...) from the registry row
        ).values(metadata=meta)
        # keep the cache in sync with what we just wrote
        self.metadatacache[seriename] = meta
        cn.execute(sql)

132
133
134
135
136
137
138
139
    def changeset_metadata(self, cn, csid):
        """Return the metadata dict of changeset `csid`, or None."""
        # pass csid as a bound parameter (consistent with _previous_cset)
        # rather than interpolating it into the SQL text; also drop the
        # unused `cset = self.schema.changeset` local the original had
        sql = ('select metadata from "{ns}".changeset '
               'where id = %(csid)s').format(ns=self.namespace)
        return cn.execute(sql, csid=csid).scalar()

140
    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        """Return {insertion_date: serie} for the revisions of `seriename`.

        from/to_insertion_date: bound the revisions considered
        from/to_value_date: bound the value dates of the returned series
        deltabefore/deltaafter: when given, each revision is only read on
        a window around its own insertion date
        diffmode: return per-revision diffs instead of full series

        Returns None for an unknown serie or when no revision matches.
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        cset = self.schema.changeset
        # candidate changesets for this serie, oldest first
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        if from_value_date or to_value_date:
            # closed_overlaps consumes the bound :fromdate/:todate
            # parameters supplied at execute time below
            revsql = revsql.where(
                closed_overlaps(from_value_date, to_value_date)
            )

        revs = cn.execute(
            revsql,
            {'fromdate': from_value_date, 'todate': to_value_date}
        ).fetchall()
        if not revs:
            return

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            # windowed mode: one snapshot.find per revision, each bounded
            # around that revision's insertion date
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            # bulk mode: read all revisions in one go
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            diffs = []
            for (revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs

        return {
            idate: serie
            for idate, serie in series
        }
221

222
223
224
    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )
        # BUG FIX: get_history returns None for an unknown serie or when
        # no revision matches; the original crashed on `None.items()`
        if histo is None:
            return

        # for each value date, keep the value from the most recent
        # insertion date seen so far
        vimap = {}  # value date -> latest insertion date
        vvmap = {}  # value date -> value at that insertion date
        for idate, series in histo.items():
            # .items() instead of the deprecated (pandas>=2.0: removed)
            # Series.iteritems -- identical behavior on all versions
            for vdate, value in series.items():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        return subset(pd.Series(vvmap), from_value_date, to_value_date)
245

246
247
    def exists(self, cn, seriename):
        """Tell whether `seriename` is a known serie."""
        table = self._get_ts_table(cn, seriename)
        return table is not None
248

249
    def latest_insertion_date(self, cn, seriename):
        """Return the insertion date of the most recent revision of a serie."""
        tstable = self._get_ts_table(cn, seriename)
        cset = self.schema.changeset
        query = select(
            [func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(query).scalar()
255

256
257
258
259
    def last_id(self, cn, seriename):
        """Delegate to Snapshot.last_id for the given serie."""
        return Snapshot(cn, self, seriename).last_id()

260
261
262
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return a changeset id of `seriename` relative to `revdate`.

        mode: 'strict' (exactly at revdate), 'before' (at or before),
        'after' (at or after).
        """
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        base = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        idate = cset.c.insertion_date
        if mode == 'strict':
            query = base.where(idate == revdate)
        elif mode == 'before':
            query = base.where(idate <= revdate)
        else:
            query = base.where(idate >= revdate)
        return cn.execute(query).scalar()

275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
    def delete(self, cn, seriename):
        """Fully remove a serie: its tables, its changesets, its registry row.

        cn must be a transaction scope (not a bare Engine) so the whole
        multi-statement cleanup happens atomically.
        """
        assert not isinstance(cn, Engine), 'use a transaction object'
        assert self.exists(cn, seriename)
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        # cn *must* be a transaction scope
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()

309
310
311
312
313
314
315
316
317
    def strip(self, cn, seriename, csid):
        """Remove all revisions of `seriename` from changeset `csid` onward.

        The stripped changesets themselves are kept (as ghosts) but
        tagged in their metadata; their changeset_series links and the
        version-log rows are deleted.
        """
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changset_serie item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
334

335
    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        ns = self.namespace
        seriescount = cn.execute(
            'select count(*) from "{}".registry'.format(ns)
        ).scalar()
        csetcount = cn.execute(
            'select max(id) from "{}".changeset'.format(ns)
        ).scalar()
        names = [
            name for name, in cn.execute(
                'select distinct seriename from "{}".registry order by seriename'.format(
                    ns
                )
            ).fetchall()
        ]
        return {
            'series count': seriescount,
            'changeset count': csetcount,
            'serie names': names
        }

348
    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        limit: keep only the `limit` most recent changesets
        names/authors: restrict to these series / authors
        fromrev/torev, fromdate/todate: bound by changeset id / date
        stripped: also show changesets whose series links were stripped
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        # newest first so that `limit` keeps the most recent entries;
        # the result is re-sorted chronologically at the end
        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        # chronological order for the caller
        log.sort(key=lambda rev: rev['rev'])
        return log

396
397
398
399
400
401
402
403
404
405
406
407
408
    def interval(self, cn, seriename):
        """Return the value-date interval covered by the last revision.

        Raises ValueError for an unknown serie.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        sql = (f'select start, "end" '
               f'from "{self.namespace}.timeserie"."{tablename}" '
               f'order by cset desc limit 1')
        row = cn.execute(sql).fetchone()
        left, right = row.start, row.end
        if self.metadata(cn, seriename).get('tzaware'):
            left = pd.Timestamp(left, tz='UTC')
            right = pd.Timestamp(right, tz='UTC')
        return pd.Interval(left=left, right=right, closed='both')

409
410
    # /API
    # Helpers
411

412
413
    # creation / update

414
    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        """First insertion of a serie: register it, snapshot it, log it.

        Returns `newts` (the whole serie is the first "diff").
        """
        # initial insertion
        if len(newts) == 0:
            return None
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        # one version-log row pointing at the changeset and snapshot head
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        table = self._make_ts_table(cn, seriename, newts)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

437
    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        """New revision of an existing serie: store only the diff.

        Returns the diff serie, or None when newts brings nothing new.
        """
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        # diff against the current state over newts' own date range
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        # grow the stored [start, end] interval to cover the new values;
        # interval() may return tz-aware bounds, hence the tzinfo strip
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename)
        start = min(tsstart, ival.left.replace(tzinfo=None))
        end = max(tsend, ival.right.replace(tzinfo=None))
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

468
    # serie table handling
469

470
471
472
473
    def _serie_to_tablename(self, cn, seriename):
        """Map a serie name to its version-log table name (cached).

        Returns None when the serie is not registered yet.
        """
        cached = self.serie_tablename.get(seriename)
        if cached is not None:
            return cached

        reg = self.schema.registry
        found = cn.execute(
            select([reg.c.table_name]).where(reg.c.seriename == seriename)
        ).scalar()
        if found is None:
            # creation time
            return
        self.serie_tablename[seriename] = found
        return found

    def _table_definition_for(self, cn, seriename):
        """Return the sqlalchemy Table of the serie's version log.

        The Table object is built once and cached in the module-level
        TABLES dict (keyed by fully-qualified name).
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                # changeset that produced this revision
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                # value-date interval covered at this revision
                Column('start', TIMESTAMP, nullable=False),
                Column('end', TIMESTAMP, nullable=False),
                # head chunk of the snapshot for this revision
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                Index(NONE_NAME, 'start'),
                Index(NONE_NAME, 'end'),
                schema='{}.timeserie'.format(self.namespace),
                keep_existing=True
            )
        return table
512

513
    def _make_ts_table(self, cn, seriename, ts):
        """Create the version-log table for `seriename` and return it."""
        definition = self._table_definition_for(cn, seriename)
        definition.create(cn)
        return definition

    def _register_serie(self, cn, seriename, ts):
        """Insert the registry row for a new serie and cache its id.

        The metadata captures index/value dtype information used later
        by _validate and by tz-aware handling.
        """
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'index_dtype': index.dtype.str,
                # on a Series, .dtypes is an alias of .dtype
                'value_dtype': ts.dtypes.str,
                'value_type': ts.dtypes.name
            }
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid
535

536
    def _get_ts_table(self, cn, seriename):
        """Return the version-log Table of `seriename`, or None if unknown."""
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
            # BUG FIX: _table_definition_for expects a *serie* name (it
            # resolves the table name itself); passing `tablename` broke
            # whenever tablename != seriename (e.g. hashed long names)
            return self._table_definition_for(cn, seriename)
540

541
542
    # changeset handling

543
    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        """Insert a changeset row and return its primary key.

        An explicit insertion_date must be timezone-aware; otherwise
        "now" (UTC) is used.
        """
        cset = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        ins = cset.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(ins).inserted_primary_key[0]
553

554
    def _changeset_series(self, cn, csid):
        """List the names of the series touched by changeset `csid`."""
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        query = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)
        rows = cn.execute(query).fetchall()
        return [row.seriename for row in rows]
566

567
568
569
570
571
572
573
    def _previous_cset(self, cn, seriename, csid):
        """Return the changeset id just before `csid` for this serie (or None)."""
        tablename = self._serie_to_tablename(cn, seriename)
        sql = (f'select cset from "{self.namespace}.timeserie"."{tablename}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1')
        return cn.execute(sql, csid=csid).scalar()

574
575
    # insertion handling

576
    def _validate(self, cn, ts, seriename):
        """Reject an insert whose dtypes conflict with the registered serie."""
        if ts.isnull().all():
            # ts erasure
            return
        meta = self.metadata(cn, seriename)
        tstype = ts.dtype
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')
588

589
590
    def _name_to_regid(self, cn, seriename):
        """Map a serie name to its registry id, caching the answer."""
        cached = self.registry_map.get(seriename)
        if cached is not None:
            return cached

        registry = self.schema.registry
        query = select([registry.c.id]).where(registry.c.seriename == seriename)
        regid = self.registry_map[seriename] = cn.execute(query).scalar()
        return regid

599
    def _finalize_insertion(self, cn, csid, seriename):
        """Link changeset `csid` to `seriename` in changeset_series."""
        cset_series = self.schema.changeset_series
        cn.execute(
            cset_series.insert().values(
                cset=csid,
                serie=self._name_to_regid(cn, seriename)
            )
        )
606

607
608
609
    def _resetcaches(self):
        """Drop the module-level and per-instance caches."""
        for cache in (TABLES,
                      SNAPTABLES,
                      self.metadatacache,
                      self.registry_map,
                      self.serie_tablename):
            cache.clear()