from datetime import datetime
import logging

import pandas as pd

from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
from sqlalchemy.engine.base import Engine
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import TIMESTAMP

from tshistory.schema import tsschema
from tshistory.util import (
    closed_overlaps,
    num2float,
    subset,
    SeriesServices,
    start_end,
    tzaware_serie
)
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES

L = logging.getLogger('tshistory.tsio')
TABLES = {}

class TimeSerie(SeriesServices):
    namespace = 'tsh'
    schema = None
    metadatacache = None
    registry_map = None
    serie_tablename = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}

    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata

        """
        assert isinstance(newts, pd.Series)
        assert isinstance(seriename, str)
        assert isinstance(author, str)
        assert metadata is None or isinstance(metadata, dict)
        assert _insertion_date is None or isinstance(_insertion_date, datetime)
        assert not newts.index.duplicated().any()
        assert newts.index.is_monotonic_increasing

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)

    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        csetfilter = []
        if revision_date:
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            current = current[~current.isnull()]
        return current
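
    # Illustrative usage sketch, not part of the original module: it assumes
    # `cn` is an open connection/transaction on a provisioned tshistory
    # database, and that the series name and author are arbitrary.
    #
    #   tsh = TimeSerie()
    #   ts = pd.Series([1., 2., 3.],
    #                  index=pd.date_range('2020-1-1', periods=3, freq='D'))
    #   tsh.insert(cn, ts, 'my.serie', 'babar')
    #   tsh.get(cn, 'my.serie')                      # latest version
    #   tsh.get(cn, 'my.serie',
    #           revision_date=datetime(2020, 1, 2))  # version as of that date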

    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.seriename == seriename
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[seriename] = meta
        return meta

    def update_metadata(self, cn, seriename, metadata, internal=False):
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, seriename)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=meta)
        self.metadatacache.pop(seriename)
        cn.execute(sql)

    def changeset_metadata(self, cn, csid):
        sql = 'select metadata from "{ns}".changeset where id = {id}'.format(
            ns=self.namespace,
            id=csid
        )
        return cn.execute(sql).scalar()

    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        """Return a {insertion_date: serie} mapping of the revisions of
        `seriename`, optionally bounded by insertion dates and value dates;
        with `diffmode`, each entry is the diff from the previous revision.
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        if from_value_date or to_value_date:
            revsql = revsql.where(
                closed_overlaps(from_value_date, to_value_date)
            )

        revs = cn.execute(
            revsql,
            {'fromdate': from_value_date, 'todate': to_value_date}
        ).fetchall()
        if not revs:
            return

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history there exists
                    # no "previous" serie, so we consider the first serie as
                    # a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs

        return {
            idate: serie
            for idate, serie in series
        }
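
    # Hedged example (series name and dates are assumptions): get_history
    # returns a {insertion_date: series} mapping; with diffmode=True each
    # entry is the diff against the previous revision rather than the full
    # series.
    #
    #   history = tsh.get_history(cn, 'my.serie',
    #                             from_insertion_date=datetime(2020, 1, 1),
    #                             diffmode=True)
    #   for idate, revision in (history or {}).items():
    #       ...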

    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """Compute a series whose value dates are bounded to be at least
        `delta` after their insertion dates, keeping the most recent
        insertion for each value date.
        """

        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )

        vimap = {}
        vvmap = {}
        for idate, series in histo.items():
            for vdate, value in series.iteritems():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        return subset(pd.Series(vvmap), from_value_date, to_value_date)
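
    # Hedged usage sketch (the series name is an assumption): with a one-day
    # delta, each value date keeps the value coming from the most recent
    # insertion made at least one day before it.
    #
    #   tsh.get_delta(cn, 'my.serie', delta=pd.Timedelta(days=1))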

    def exists(self, cn, seriename):
        return self._get_ts_table(cn, seriename) is not None

    def latest_insertion_date(self, cn, seriename):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()

    def last_id(self, cn, seriename):
        snapshot = Snapshot(cn, self, seriename)
        return snapshot.last_id()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()

    def delete(self, cn, seriename):
        assert not isinstance(cn, Engine), 'use a transaction object'
        assert self.exists(cn, seriename)
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        # cn *must* be a transaction scope
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()

    def strip(self, cn, seriename, csid):
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete the changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
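
    # Hedged sketch (not in the original source): changeset_at and strip can
    # be combined to roll a series back to the state it had before a given
    # insertion date.
    #
    #   csid = tsh.changeset_at(cn, 'my.serie', somedate, mode='after')
    #   if csid is not None:
    #       tsh.strip(cn, 'my.serie', csid)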

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from "{}".registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from "{}".changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct seriename from "{}".registry order by seriename'.format(
            self.namespace
        )
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log
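
    # Illustrative call (the series name is an assumption): entries come
    # back as dicts sorted by ascending changeset id.
    #
    #   for rev in tsh.log(cn, limit=10, names=['my.serie']):
    #       print(rev['rev'], rev['author'], rev['date'], rev['names'])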

    def interval(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        sql = (f'select start, "end" '
               f'from "{self.namespace}.timeserie"."{tablename}" '
               f'order by cset desc limit 1')
        res = cn.execute(sql).fetchone()
        start, end = res.start, res.end
        if self.metadata(cn, seriename).get('tzaware'):
            start, end = pd.Timestamp(start, tz='UTC'), pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')
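
    # Hedged usage note: interval() reads the start/"end" columns of the most
    # recent row of the series table, so it reflects the currently known
    # value-date span.
    #
    #   span = tsh.interval(cn, 'my.serie')
    #   span.left, span.right  # pd.Timestamp bounds (tz-aware if the serie is)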

    # /API
    # Helpers

    # creation / update

    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        # initial insertion
        if len(newts) == 0:
            return None
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        table = self._make_ts_table(cn, seriename)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename)
        start = min(tsstart, ival.left.replace(tzinfo=None))
        end = max(tsend, ival.right.replace(tzinfo=None))
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

    # serie table handling

    def _serie_to_tablename(self, cn, seriename):
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename

        reg = self.schema.registry
        sql = select([reg.c.table_name]).where(reg.c.seriename == seriename)
        tablename = cn.execute(sql).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                Column('start', TIMESTAMP, nullable=False),
                Column('end', TIMESTAMP, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                Index(NONE_NAME, 'start'),
                Index(NONE_NAME, 'end'),
                schema='{}.timeserie'.format(self.namespace),
                keep_existing=True
            )
        return table

    def _make_ts_table(self, cn, seriename):
        table = self._table_definition_for(cn, seriename)
        table.create(cn)
        return table

    def _register_serie(self, cn, seriename, ts):
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'index_dtype': index.dtype.str,
                'value_dtype': ts.dtypes.str,
                'value_type': ts.dtypes.name
            }
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid

    def _get_ts_table(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
            return self._table_definition_for(cn, tablename)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        sql = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)

        return [
            row.seriename
            for row in cn.execute(sql).fetchall()
        ]

    def _previous_cset(self, cn, seriename, csid):
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select cset from "{}.timeserie"."{}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1').format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()

    # insertion handling

    def _validate(self, cn, ts, seriename):
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _name_to_regid(self, cn, seriename):
        regid = self.registry_map.get(seriename)
        if regid is not None:
            return regid

        registry = self.schema.registry
        sql = select([registry.c.id]).where(registry.c.seriename == seriename)
        regid = self.registry_map[seriename] = cn.execute(sql).scalar()
        return regid

    def _finalize_insertion(self, cn, csid, seriename):
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=self._name_to_regid(cn, seriename)
        )
        cn.execute(sql)

    def _resetcaches(self):
        TABLES.clear()
        SNAPTABLES.clear()
        self.metadatacache.clear()
        self.registry_map.clear()
        self.serie_tablename.clear()
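

if __name__ == '__main__':
    # Minimal end-to-end sketch, not part of the original module.  It assumes
    # a reachable postgres database whose DSN is provided through the
    # TSHISTORY_DSN environment variable and whose tshistory schemas (for the
    # default 'tsh' namespace) have already been created.
    import os
    from sqlalchemy import create_engine

    engine = create_engine(os.environ['TSHISTORY_DSN'])
    tsh = TimeSerie()
    demo = pd.Series(
        [1., 2., 3.],
        index=pd.date_range('2020-1-1', periods=3, freq='D')
    )
    with engine.begin() as cn:
        tsh.insert(cn, demo, 'demo.serie', 'demo-author')
        print(tsh.get(cn, 'demo.serie'))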