from datetime import datetime
import logging

import pandas as pd

from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
from sqlalchemy.engine.base import Engine
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import TIMESTAMP

from tshistory.schema import tsschema
from tshistory.util import (
    closed_overlaps,
    num2float,
    subset,
    SeriesServices,
    start_end,
    tzaware_serie
)
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES

L = logging.getLogger('tshistory.tsio')
TABLES = {}


class TimeSerie(SeriesServices):
    namespace = 'tsh'
    schema = None
    metadatacache = None
    registry_map = None
    serie_tablename = None
    create_lock_id = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
        self.create_lock_id = sum(ord(c) for c in namespace)
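
    # The lock id is a stable, namespace-derived integer handed to
    # pg_advisory_xact_lock at serie creation time (see `_create` below);
    # for the default namespace: sum(ord(c) for c in 'tsh') == 335.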

    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        """
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert (_insertion_date is None or
                isinstance(_insertion_date, datetime)), 'Bad format for insertion date'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'

        assert newts.index.notna().all(), 'The index contains NaT entries'
        if not newts.index.is_monotonic_increasing:
            newts = newts.sort_index()

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
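
    # Usage sketch for `insert` (hypothetical engine and data):
    #
    #   from sqlalchemy import create_engine
    #   engine = create_engine('postgresql://localhost/tsdb')
    #   tsh = TimeSerie()
    #   ts = pd.Series([1., 2., 3.],
    #                  index=pd.date_range('2020-1-1', freq='D', periods=3))
    #   with engine.begin() as cn:
    #       tsh.insert(cn, ts, 'some-serie', 'babar')
    #
    # The first call creates the serie; later calls store only the diff
    # against the current state (see `_create` / `_update` below).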

    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        csetfilter = []
        if revision_date:
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            current = current[~current.isnull()]
        return current
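
    # Read-back sketch (assuming the insertion example above):
    #
    #   with engine.begin() as cn:
    #       current = tsh.get(cn, 'some-serie')
    #       asof = tsh.get(cn, 'some-serie',
    #                      revision_date=pd.Timestamp('2020-1-2', tz='UTC'))
    #
    # `revision_date` yields the serie as known at that date; nan values
    # (erasures) are pruned unless `_keep_nans` is set.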

    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.seriename == seriename
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[seriename] = meta
        return meta

    def update_metadata(self, cn, seriename, metadata, internal=False):
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, seriename)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=meta)
        self.metadatacache.pop(seriename)
        cn.execute(sql)
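
    # Metadata sketch (hypothetical user key 'unit'): internal keys such
    # as 'tzaware' or 'index_type' are written at registration time and
    # may only be touched with `internal=True`:
    #
    #   with engine.begin() as cn:
    #       tsh.update_metadata(cn, 'some-serie', {'unit': 'MW'})
    #       tsh.metadata(cn, 'some-serie')  # -> {..., 'unit': 'MW'}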

    def changeset_metadata(self, cn, csid):
        sql = 'select metadata from "{ns}".changeset where id = {id}'.format(
            ns=self.namespace,
            id=csid
        )
        return cn.execute(sql).scalar()

    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        if from_value_date or to_value_date:
            revsql = revsql.where(
                closed_overlaps(from_value_date, to_value_date)
            )

        revs = cn.execute(
            revsql,
            {'fromdate': from_value_date, 'todate': to_value_date}
        ).fetchall()
        if not revs:
            return {}

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs
        else:
            series = [
                (idate, ts.dropna())
                for idate, ts in series
            ]

        return {
            idate: serie
            for idate, serie in series
        }
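
    # History sketch: the result maps insertion dates to series, whole
    # versions by default, incremental diffs with `diffmode=True`:
    #
    #   with engine.begin() as cn:
    #       hist = tsh.get_history(cn, 'some-serie')
    #   for idate, ts in hist.items():
    #       ...  # ts is the serie as known at idate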

    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """Compute a series whose value dates are bounded to be
        at least `delta` time after their insertion dates, keeping
        the most recent value for each value date.
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )
        if histo is None:
            return None

        vimap = {}
        vvmap = {}
        for idate, series in histo.items():
            for vdate, value in series.items():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        ts = subset(pd.Series(vvmap).sort_index(), from_value_date, to_value_date)
        ts.name = seriename
        return ts
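
    # Sketch: get_delta(cn, 'some-serie', pd.Timedelta(days=1)) keeps,
    # for each value date, the most recent value inserted at least one
    # day ahead of it -- e.g. a "day-ahead" view of a forecast serie.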

    def exists(self, cn, seriename):
        return self._get_ts_table(cn, seriename) is not None

    def latest_insertion_date(self, cn, seriename):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()

    def last_id(self, cn, seriename):
        snapshot = Snapshot(cn, self, seriename)
        return snapshot.last_id()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()
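
    # Mode semantics (sketch): 'strict' wants an exact insertion-date
    # match, 'before' accepts changesets at or before `revdate`,
    # 'after' at or after it:
    #
    #   csid = tsh.changeset_at(cn, 'some-serie',
    #                           pd.Timestamp('2020-1-2', tz='UTC'),
    #                           mode='before')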

    def delete(self, cn, seriename):
        assert not isinstance(cn, Engine), 'use a transaction object'
        assert self.exists(cn, seriename)
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        # cn *must* be a transaction scope
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()

    def strip(self, cn, seriename, csid):
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete the changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
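
    # Stripping sketch: given a changeset id (e.g. from `changeset_at`),
    # that revision and everything after it are removed:
    #
    #   csid = tsh.changeset_at(cn, 'some-serie', revdate, mode='after')
    #   tsh.strip(cn, 'some-serie', csid)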

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from "{}".registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from "{}".changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct seriename from "{}".registry order by seriename'.format(
            self.namespace
        )
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats
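
    # e.g. (illustrative values only):
    #   tsh.info(cn)
    #   # -> {'series count': 2, 'changeset count': 5,
    #   #     'serie names': ['other-serie', 'some-serie']}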

    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log
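
    # Log sketch: entries come back in chronological order; `limit`
    # keeps the most recent changesets:
    #
    #   tsh.log(cn, limit=10, names=('some-serie',))
    #   # -> [{'rev': 1, 'author': 'babar',
    #   #      'date': Timestamp(...), 'meta': {},
    #   #      'names': ['some-serie']}, ...]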

    def interval(self, cn, seriename, notz=False):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        sql = (f'select start, "end" '
               f'from "{self.namespace}.timeserie"."{tablename}" '
               f'order by cset desc limit 1')
        res = cn.execute(sql).fetchone()
        start, end = res.start, res.end
        if self.metadata(cn, seriename).get('tzaware') and not notz:
            start, end = pd.Timestamp(start, tz='UTC'), pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')
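
    # Interval sketch: the current value-date span of the serie as a
    # closed interval, tz-aware when the serie is (unless `notz`):
    #
    #   tsh.interval(cn, 'some-serie')
    #   # -> Interval(Timestamp('2020-01-01', tz='UTC'),
    #   #             Timestamp('2020-01-03', tz='UTC'), closed='both')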

    # /API
    # Helpers

    # creation / update

    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        start, end = start_end(newts, notz=False)
        if start is None:
            assert end is None
            # this is just full of nans
            return None
        # chop off unwanted nans
        newts = newts.loc[start:end]
        if len(newts) == 0:
            return None

        # at creation time we take an exclusive lock to avoid
        # a deadlock on created tables against the changeset-series fk
        cn.execute(
            'select pg_advisory_xact_lock({})'.format(self.create_lock_id)
        )
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        table = self._make_ts_table(cn, seriename)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        # compute series start/end stamps
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename, notz=True)
        start = min(tsstart or ival.left, ival.left)
        end = max(tsend or ival.right, ival.right)

        if pd.isnull(diff[0]) or pd.isnull(diff[-1]):
            # we *might* be shrinking, let's look at the full series
            # and yes, shrinkers have a slow path
            last = snapshot.last()
            patched = self.patch(last, diff).dropna()
            if not len(patched):
                raise ValueError('complete erasure of a series is forbidden')
            if pd.isnull(diff[0]):
                start = patched.index[0]
            if pd.isnull(diff[-1]):
                end = patched.index[-1]

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

    # serie table handling

    def _serie_to_tablename(self, cn, seriename):
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename

        reg = self.schema.registry
        sql = select([reg.c.table_name]).where(reg.c.seriename == seriename)
        tablename = cn.execute(sql).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                Column('start', TIMESTAMP, nullable=False),
                Column('end', TIMESTAMP, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                Index(NONE_NAME, 'start'),
                Index(NONE_NAME, 'end'),
                schema='{}.timeserie'.format(self.namespace),
                keep_existing=True
            )
        return table

    def _make_ts_table(self, cn, seriename):
        table = self._table_definition_for(cn, seriename)
        table.create(cn)
        return table

    def _register_serie(self, cn, seriename, ts):
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'index_dtype': index.dtype.str,
                'value_dtype': ts.dtypes.str,
                'value_type': ts.dtypes.name
            }
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid

    def _get_ts_table(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
            # `_table_definition_for` expects the serie name, not the
            # table name (it resolves the table name itself)
            return self._table_definition_for(cn, seriename)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        sql = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)

        return [
            row.seriename
            for row in cn.execute(sql).fetchall()
        ]

    def _previous_cset(self, cn, seriename, csid):
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select cset from "{}.timeserie"."{}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1').format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()

    # insertion handling

    def _validate(self, cn, ts, seriename):
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _name_to_regid(self, cn, seriename):
        regid = self.registry_map.get(seriename)
        if regid is not None:
            return regid

        registry = self.schema.registry
        sql = select([registry.c.id]).where(registry.c.seriename == seriename)
        regid = self.registry_map[seriename] = cn.execute(sql).scalar()
        return regid

    def _finalize_insertion(self, cn, csid, seriename):
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=self._name_to_regid(cn, seriename)
        )
        cn.execute(sql)

    def _resetcaches(self):
        TABLES.clear()
        SNAPTABLES.clear()
        self.metadatacache.clear()
        self.registry_map.clear()
        self.serie_tablename.clear()