tsio.py 24 KB
Newer Older
1
from datetime import datetime
2
import logging
3
4
5

import pandas as pd

6
7
from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
8
from sqlalchemy.engine.base import Engine
Aurélien Campéas's avatar
Aurélien Campéas committed
9
from sqlalchemy.sql.expression import select, func, desc
Aurélien Campéas's avatar
Aurélien Campéas committed
10
from sqlalchemy.dialects.postgresql import TIMESTAMP
11

12
from tshistory.schema import tsschema
13
from tshistory.util import (
14
    closed_overlaps,
15
    num2float,
16
    subset,
17
    SeriesServices,
18
    start_end,
19
20
    tzaware_serie
)
21
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES
22

23
L = logging.getLogger('tshistory.tsio')
24
TABLES = {}
25
26


27
class TimeSerie(SeriesServices):
    """Versioned time-series store backed by PostgreSQL.

    Each serie lives in its own table under the `<namespace>.timeserie`
    schema; every insertion creates a changeset row recording author,
    insertion date and optional metadata.
    """
    # class-level defaults; the real per-instance values are set in __init__
    namespace = 'tsh'
    schema = None
    metadatacache = None
    # metadata keys managed by the system itself -- rejected by
    # update_metadata (see the assertion there)
    metakeys = {
        'tzaware',
        'index_type',
        'index_names',
        'index_dtype',
        'value_dtype',
        'value_type'
    }
    registry_map = None
    serie_tablename = None
    create_lock_id = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        # per-instance caches: serie name -> metadata / registry id / table name
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
        # advisory-lock identifier derived from the namespace name
        # (used in _create to serialize serie creations)
        self.create_lock_id = sum(ord(c) for c in namespace)
51

52
    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series.

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        _insertion_date: optional tz-aware datetime overriding "now"
           (mostly for tests/imports)

        Returns the inserted serie (first revision), the diff against
        the previous state, or None when there is nothing to insert.
        """
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert (_insertion_date is None or
                isinstance(_insertion_date, datetime)), 'Bad format for insertion date'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'
        assert newts.index.notna().all(), 'The index contains NaT entries'

        # normalize the input: sorted index, numeric values coerced to float
        if not newts.index.is_monotonic_increasing:
            newts = newts.sort_index()

        newts = num2float(newts)

        if not len(newts):
            # empty serie: nothing to record
            return

        # only datetime-like, non-multi indexes are supported
        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            # no table yet: this is the very first revision
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
92

93
    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Return the serie of the given name as a pandas Series.

        revision_date: optional datetime; when given, the serie is
        reconstructed as it was known at that point in time.

        Returns None for an unknown serie.
        """
        if self._get_ts_table(cn, seriename) is None:
            # unknown serie
            return None

        filters = []
        if revision_date:
            filters.append(lambda cset: cset.c.insertion_date <= revision_date)

        snapshot = Snapshot(cn, self, seriename)
        _csid, ts = snapshot.find(
            csetfilter=filters,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )

        if ts is None or _keep_nans:
            return ts

        # by default the nan values (erasure markers) are hidden from callers
        ts.name = seriename
        return ts[~ts.isnull()]
118

119
    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie (cached after the first lookup)."""
        try:
            return self.metadatacache[seriename]
        except KeyError:
            pass

        reg = self.schema.registry
        meta = cn.execute(
            select([reg.c.metadata]).where(
                reg.c.seriename == seriename
            )
        ).scalar()
        self.metadatacache[seriename] = meta
        return meta

131
    def update_metadata(self, cn, seriename, metadata):
        """Merge `metadata` into the stored metadata dict of the serie.

        The system-managed keys (self.metakeys) are read-only and must
        not appear in `metadata`.
        """
        assert isinstance(metadata, dict)
        # reject attempts to override the system-managed description keys
        assert not set(metadata.keys()) & self.metakeys
        meta = self.metadata(cn, seriename)
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=meta)
        # drop the (stale) cache entry; the self.metadata call above
        # guarantees the entry exists, hence the bare .pop
        self.metadatacache.pop(seriename)
        cn.execute(sql)

143
144
145
146
147
148
149
    def changeset_metadata(self, cn, csid):
        """Return the metadata dict of changeset `csid` (or None).

        The changeset id is now passed as a bound parameter rather than
        interpolated into the SQL string (consistent with _previous_cset).
        """
        sql = 'select metadata from "{ns}".changeset where id = %(csid)s'.format(
            ns=self.namespace
        )
        return cn.execute(sql, csid=csid).scalar()

150
    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        """Return the history of a serie as a dict mapping insertion
        dates to the corresponding revision of the serie.

        from/to_insertion_date: bound the considered revisions
        from/to_value_date: bound the value span of each revision
        deltabefore/deltaafter: per revision, restrict values to
          [idate - deltabefore, idate + deltaafter]
        diffmode: return, for each revision, the diff against the
          previous one instead of the full serie

        Returns None for an unknown serie, {} when no revision matches.
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        if from_value_date or to_value_date:
            # only keep revisions whose [start, end] overlaps the wanted span
            revsql = revsql.where(
                closed_overlaps(from_value_date, to_value_date)
            )

        revs = cn.execute(
            revsql,
            # bound parameters consumed by the closed_overlaps clause
            {'fromdate': from_value_date, 'todate': to_value_date}
        ).fetchall()
        if not revs:
            return {}

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            # per-revision value window: one snapshot lookup per revision
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            # fast path: fetch all revisions in one go
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs
        else:
            # full-serie mode: the stored nans (erasures) are dropped
            series = [
                (idate, ts.dropna())
                 for idate, ts in series
            ]

        return {
            idate: serie
            for idate, serie in series
        }
236

237
238
239
    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones

        Returns None for an unknown serie.
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )
        if histo is None:
            return None

        # for each value date, keep the value coming from the most
        # recent insertion date
        vimap = {}   # value date -> insertion date of the retained value
        vvmap = {}   # value date -> retained value
        for idate, series in histo.items():
            # fix: Series.iteritems() was removed in pandas 2.0;
            # Series.items() is the long-standing equivalent
            for vdate, value in series.items():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        ts = subset(pd.Series(vvmap).sort_index(), from_value_date, to_value_date)
        ts.name = seriename
        return ts
264

265
266
    def exists(self, cn, seriename):
        """Tell whether `seriename` denotes a known serie."""
        table = self._get_ts_table(cn, seriename)
        return table is not None
267

268
    def latest_insertion_date(self, cn, seriename):
        """Return the insertion date of the most recent revision of the serie."""
        changeset = self.schema.changeset
        table = self._get_ts_table(cn, seriename)
        query = select(
            [func.max(changeset.c.insertion_date)]
        ).where(table.c.cset == changeset.c.id)
        return cn.execute(query).scalar()
274

275
276
277
278
    def last_id(self, cn, seriename):
        """Return the id of the last snapshot chunk of the serie."""
        return Snapshot(cn, self, seriename).last_id()

279
280
281
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return a changeset id of the serie relative to `revdate`.

        mode: 'strict' (insertion date equals revdate), 'before'
        (at or before) or 'after' (at or after).
        """
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        base = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        # one comparison criterion per mode
        criteria = {
            'strict': cset.c.insertion_date == revdate,
            'before': cset.c.insertion_date <= revdate,
            'after': cset.c.insertion_date >= revdate
        }
        return cn.execute(base.where(criteria[mode])).scalar()

294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
    def delete(self, cn, seriename):
        """Irreversibly delete a serie and its whole history.

        cn *must* be a transaction object so the multi-statement
        cleanup happens atomically.
        """
        assert not isinstance(cn, Engine), 'use a transaction object'
        assert self.exists(cn, seriename)
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        # cn *must* be a transaction scope
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()

328
329
330
331
332
333
334
335
336
    def strip(self, cn, seriename, csid):
        """Remove all revisions >= `csid` from the serie.

        The affected changesets are kept but annotated (in their
        metadata) as stripped, and unlinked from the serie.
        """
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changset_serie item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
353

354
    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        ns = self.namespace
        stats = {}
        stats['series count'] = cn.execute(
            'select count(*) from "{}".registry'.format(ns)
        ).scalar()
        stats['changeset count'] = cn.execute(
            'select max(id) from "{}".changeset'.format(ns)
        ).scalar()
        stats['serie names'] = [
            name for name, in cn.execute(
                'select distinct seriename from "{}".registry order by seriename'.format(ns)
            ).fetchall()
        ]
        return stats

367
    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        limit: keep only the `limit` most recent changesets
        names/authors: filter on serie names / authors
        fromrev/torev: filter on changeset ids
        fromdate/todate: filter on insertion dates
        stripped: also show changesets no longer linked to any serie
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        # the query is built most-recent-first so that `limit`
        # keeps the latest changesets
        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        # final output is chronological (oldest first)
        log.sort(key=lambda rev: rev['rev'])
        return log

415
    def interval(self, cn, seriename, notz=False):
        """Return the value-date span of the serie as a closed pd.Interval.

        Raises ValueError if the serie does not exist.  For tzaware
        series the bounds are returned as UTC timestamps, unless `notz`
        is set.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        # the latest revision row carries the current overall start/end
        sql = (f'select start, "end" '
               f'from "{self.namespace}.timeserie"."{tablename}" '
               f'order by cset desc limit 1')
        res = cn.execute(sql).fetchone()
        start, end = res.start, res.end
        if self.metadata(cn, seriename).get('tzaware') and not notz:
            start, end = pd.Timestamp(start, tz='UTC'), pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')

428
429
    # /API
    # Helpers
430

431
432
    # creation / update

433
    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        """First insertion of a serie: register it, create its tables
        and write the initial revision.

        Returns the (nan-chopped) inserted serie, or None if nothing
        remains to insert.
        """
        start, end = start_end(newts, notz=False)
        if start is None:
            assert end is None
            # this is just full of nans
            return None
        # chop off unwanted nans
        newts = newts.loc[start:end]
        if len(newts) == 0:
            return None

        # at creation time we take an exclusive lock to avoid
        # a deadlock on created tables against the changeset-series fk
        cn.execute(
            'select pg_advisory_xact_lock({})'.format(self.create_lock_id)
        )
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        table = self._make_ts_table(cn, seriename)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

468
    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Subsequent insertion of a serie: store only the diff against
        the current state.

        Returns the diff, or None when the new serie brings nothing new.
        """
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        # diff against the current state restricted to the new serie span
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        # compute series start/end stamps
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename, notz=True)
        start = min(tsstart or ival.left, ival.left)
        end = max(tsend or ival.right, ival.right)

        if pd.isnull(diff[0]) or pd.isnull(diff[-1]):
            # we *might* be shrinking, let's look at the full series
            # and yes, shrinkers have a slow path
            last = snapshot.last()
            patched = self.patch(last, diff).dropna()
            if not len(patched):
                raise ValueError('complete erasure of a series is forbidden')
            if pd.isnull(diff[0]):
                start = patched.index[0]
            if pd.isnull(diff[-1]):
                end = patched.index[-1]

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

513
    # serie table handling
514

515
516
517
518
    def _serie_to_tablename(self, cn, seriename):
        """Map a serie name to its backing table name (cached).

        Returns None when the serie is not registered yet.
        """
        cached = self.serie_tablename.get(seriename)
        if cached is not None:
            return cached

        reg = self.schema.registry
        found = cn.execute(
            select([reg.c.table_name]).where(reg.c.seriename == seriename)
        ).scalar()
        if found is None:
            # creation time
            return None
        self.serie_tablename[seriename] = found
        return found

    def _table_definition_for(self, cn, seriename):
        """Return the sqlalchemy Table definition of the serie's table,
        building and caching it (module-level TABLES cache) on first use.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                # changeset that produced this revision
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                # value-date span of the serie as of this revision
                Column('start', TIMESTAMP, nullable=False),
                Column('end', TIMESTAMP, nullable=False),
                # head chunk of the snapshot for this revision
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                Index(NONE_NAME, 'start'),
                Index(NONE_NAME, 'end'),
                schema='{}.timeserie'.format(self.namespace),
                keep_existing=True
            )
        return table
557

Aurélien Campéas's avatar
Aurélien Campéas committed
558
    def _make_ts_table(self, cn, seriename):
        """Create the database table backing the serie; return its definition."""
        definition = self._table_definition_for(cn, seriename)
        definition.create(cn)
        return definition

    def _register_serie(self, cn, seriename, ts):
        """Insert the serie into the registry, recording its
        system-managed metadata (index/value description), and cache
        its registry id.
        """
        index = ts.index
        # unnamed index levels are skipped
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'index_dtype': index.dtype.str,
                'value_dtype': ts.dtypes.str,
                'value_type': ts.dtypes.name
            }
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid
580

581
    def _get_ts_table(self, cn, seriename):
        """Return the sqlalchemy table definition of the serie,
        or None if the serie is unknown.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
            # bug fix: _table_definition_for expects a *serie* name, not
            # a table name -- the two differ when the serie name gets
            # mangled into a distinct table name, in which case passing
            # the table name built a wrong definition
            return self._table_definition_for(cn, seriename)
585

586
587
    # changeset handling

588
    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        """Insert a new changeset row and return its id."""
        if insertion_date is not None:
            # caller-provided insertion dates must be timezone-aware
            assert insertion_date.tzinfo is not None
            stamp = insertion_date
        else:
            stamp = datetime.utcnow()
        idate = pd.Timestamp(stamp, tz='UTC')
        row = self.schema.changeset.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(row).inserted_primary_key[0]
598

599
    def _changeset_series(self, cn, csid):
        """Return the names of the series touched by changeset `csid`."""
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        query = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)

        return [row.seriename for row in cn.execute(query).fetchall()]
611

612
613
614
615
616
617
618
    def _previous_cset(self, cn, seriename, csid):
        """Return the changeset id just before `csid` for the serie
        (None when `csid` is the first revision).
        """
        tablename = self._serie_to_tablename(cn, seriename)
        query = (
            'select cset from "{}.timeserie"."{}" '
            'where cset < %(csid)s '
            'order by cset desc limit 1'
        ).format(self.namespace, tablename)
        return cn.execute(query, csid=csid).scalar()

619
620
    # insertion handling

621
    def _validate(self, cn, ts, seriename):
        """Check an incoming serie is type-compatible with the stored one."""
        if ts.isnull().all():
            # ts erasure
            return
        meta = self.metadata(cn, seriename)
        newtype = ts.dtype
        if newtype != meta['value_type']:
            raise Exception(
                'Type error when inserting {}, new type is {}, type in base is {}'.format(
                    seriename, newtype, meta['value_type'])
            )
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')
633

634
635
    def _name_to_regid(self, cn, seriename):
        """Return the registry id of the serie (cached after first lookup)."""
        regid = self.registry_map.get(seriename)
        if regid is None:
            registry = self.schema.registry
            regid = cn.execute(
                select([registry.c.id]).where(registry.c.seriename == seriename)
            ).scalar()
            self.registry_map[seriename] = regid
        return regid

644
    def _finalize_insertion(self, cn, csid, seriename):
        """Link changeset `csid` to the serie in the changeset_series table."""
        cn.execute(
            self.schema.changeset_series.insert().values(
                cset=csid,
                serie=self._name_to_regid(cn, seriename)
            )
        )
651

652
653
654
    def _resetcaches(self):
        """Empty the module-level table caches and the per-instance maps."""
        for cache in (TABLES,
                      SNAPTABLES,
                      self.metadatacache,
                      self.registry_map,
                      self.serie_tablename):
            cache.clear()