from datetime import datetime
import logging

import pandas as pd

from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
from sqlalchemy.engine.base import Engine
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import TIMESTAMP

from tshistory.schema import tsschema
from tshistory.util import (
    closed_overlaps,
    num2float,
    subset,
    SeriesServices,
    start_end,
    tzaware_serie
)
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES

L = logging.getLogger('tshistory.tsio')
TABLES = {}
25
26


27
class TimeSerie(SeriesServices):
28
    namespace = 'tsh'
29
    schema = None
30
    metadatacache = None
31
    registry_map = None
32
    serie_tablename = None
33
34
35

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
36
37
        self.schema = tsschema(namespace)
        self.schema.define()
38
        self.metadatacache = {}
39
        self.registry_map = {}
40
        self.serie_tablename = {}
41

42

    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata

        """
        assert isinstance(newts, pd.Series)
        assert isinstance(seriename, str)
        assert isinstance(author, str)
        assert metadata is None or isinstance(metadata, dict)
        assert _insertion_date is None or isinstance(_insertion_date, datetime)
        assert not newts.index.duplicated().any()
        assert newts.index.is_monotonic_increasing

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
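
    # Usage sketch (added for illustration, not part of the original module).
    # It assumes `engine` is an sqlalchemy engine bound to a database carrying
    # the tshistory schema; the series name and author below are made up.
    #
    #   import pandas as pd
    #   tsh = TimeSerie(namespace='tsh')
    #   series = pd.Series(
    #       [1.0, 2.0, 3.0],
    #       index=pd.date_range('2018-1-1', periods=3, freq='D')
    #   )
    #   with engine.begin() as cn:
    #       tsh.insert(cn, series, 'banana.spot.price', 'babar')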

    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        csetfilter = []
        if revision_date:
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            current = current[~current.isnull()]
        return current
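
    # Illustrative sketch (not in the original source): reading back the
    # current version, then a past version, of a series. `cn` and the series
    # name are the same assumptions as in the insert example above.
    #
    #   current = tsh.get(cn, 'banana.spot.price')
    #   as_of = tsh.get(cn, 'banana.spot.price',
    #                   revision_date=pd.Timestamp('2018-1-2', tz='UTC'))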

    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.seriename == seriename
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[seriename] = meta
        return meta

    def update_metadata(self, cn, seriename, metadata, internal=False):
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, seriename)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=meta)
        cn.execute(sql)

    def changeset_metadata(self, cn, csid):
        sql = 'select metadata from "{ns}".changeset where id = {id}'.format(
            ns=self.namespace,
            id=csid
        )
        return cn.execute(sql).scalar()

    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        if from_value_date or to_value_date:
            revsql = revsql.where(
                closed_overlaps(from_value_date, to_value_date)
            )

        revs = cn.execute(
            revsql,
            {'fromdate': from_value_date, 'todate': to_value_date}
        ).fetchall()
        if not revs:
            return

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs

        return {
            idate: serie
            for idate, serie in series
        }
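
    # Illustrative sketch (not in the original source): get_history returns a
    # dict mapping each insertion date to the series as known at that date
    # (or to the diff introduced at that date when diffmode=True).
    #
    #   histo = tsh.get_history(cn, 'banana.spot.price', diffmode=True)
    #   if histo:
    #       for idate, diff in histo.items():
    #           print(idate, len(diff))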

    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )

        vimap = {}
        vvmap = {}
        for idate, series in histo.items():
            for vdate, value in series.iteritems():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        return subset(pd.Series(vvmap), from_value_date, to_value_date)
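
    # Illustrative sketch (not in the original source): only values whose
    # value date is at least `delta` after their insertion date are kept, and
    # for each value date the most recently inserted value wins.
    #
    #   import datetime as dt
    #   staircase = tsh.get_delta(cn, 'banana.spot.price',
    #                             delta=dt.timedelta(hours=1))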

    def exists(self, cn, seriename):
        return self._get_ts_table(cn, seriename) is not None

    def latest_insertion_date(self, cn, seriename):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()

    def last_id(self, cn, seriename):
        snapshot = Snapshot(cn, self, seriename)
        return snapshot.last_id()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()

    def delete(self, cn, seriename):
        assert not isinstance(cn, Engine), 'use a transaction object'
        assert self.exists(cn, seriename)
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        # cn *must* be a transaction scope
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()
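
    # Illustrative sketch (not in the original source): delete requires a
    # transaction object rather than a bare engine.
    #
    #   with engine.begin() as cn:
    #       if tsh.exists(cn, 'banana.spot.price'):
    #           tsh.delete(cn, 'banana.spot.price')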

    def strip(self, cn, seriename, csid):
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete the changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
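
    # Illustrative sketch (not in the original source): dropping every
    # revision starting from a given changeset id, e.g. one found through
    # changeset_at.
    #
    #   csid = tsh.changeset_at(cn, 'banana.spot.price',
    #                           pd.Timestamp('2018-1-2', tz='UTC'),
    #                           mode='after')
    #   if csid is not None:
    #       tsh.strip(cn, 'banana.spot.price', csid)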

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from "{}".registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from "{}".changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct seriename from "{}".registry order by seriename'.format(
            self.namespace
        )
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log
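
    # Illustrative sketch (not in the original source): each log entry is a
    # dict with 'rev', 'author', 'date', 'meta' and 'names' keys, sorted by
    # ascending changeset id.
    #
    #   for entry in tsh.log(cn, limit=10, names=['banana.spot.price']):
    #       print(entry['rev'], entry['date'], entry['author'], entry['names'])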

    def interval(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        sql = (f'select start, "end" '
               f'from "{self.namespace}.timeserie"."{tablename}" '
               f'order by cset desc limit 1')
        res = cn.execute(sql).fetchone()
        start, end = res.start, res.end
        if self.metadata(cn, seriename).get('tzaware'):
            start, end = pd.Timestamp(start, tz='UTC'), pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')
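
    # Illustrative sketch (not in the original source): interval yields a
    # closed pandas Interval spanning the first and last value dates
    # currently known for the series.
    #
    #   ival = tsh.interval(cn, 'banana.spot.price')
    #   print(ival.left, ival.right)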

    # /API
    # Helpers

    # creation / update

    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        # initial insertion
        if len(newts) == 0:
            return None
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        table = self._make_ts_table(cn, seriename)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename)
        start = min(tsstart, ival.left.replace(tzinfo=None))
        end = max(tsend, ival.right.replace(tzinfo=None))
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

    # serie table handling

    def _serie_to_tablename(self, cn, seriename):
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename

        reg = self.schema.registry
        sql = select([reg.c.table_name]).where(reg.c.seriename == seriename)
        tablename = cn.execute(sql).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                Column('start', TIMESTAMP, nullable=False),
                Column('end', TIMESTAMP, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                Index(NONE_NAME, 'start'),
                Index(NONE_NAME, 'end'),
                schema='{}.timeserie'.format(self.namespace),
                keep_existing=True
            )
        return table

    def _make_ts_table(self, cn, seriename):
        table = self._table_definition_for(cn, seriename)
        table.create(cn)
        return table

    def _register_serie(self, cn, seriename, ts):
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'index_dtype': index.dtype.str,
                'value_dtype': ts.dtypes.str,
                'value_type': ts.dtypes.name
            }
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid

    def _get_ts_table(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
            return self._table_definition_for(cn, tablename)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        sql = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)

        return [
            row.seriename
            for row in cn.execute(sql).fetchall()
        ]

    def _previous_cset(self, cn, seriename, csid):
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select cset from "{}.timeserie"."{}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1').format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()

    # insertion handling

    def _validate(self, cn, ts, seriename):
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _name_to_regid(self, cn, seriename):
        regid = self.registry_map.get(seriename)
        if regid is not None:
            return regid

        registry = self.schema.registry
        sql = select([registry.c.id]).where(registry.c.seriename == seriename)
        regid = self.registry_map[seriename] = cn.execute(sql).scalar()
        return regid

    def _finalize_insertion(self, cn, csid, seriename):
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=self._name_to_regid(cn, seriename)
        )
        cn.execute(sql)

    def _resetcaches(self):
        TABLES.clear()
        SNAPTABLES.clear()
        self.metadatacache.clear()
        self.registry_map.clear()
        self.serie_tablename.clear()