from datetime import datetime
import logging

import pandas as pd

from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
from sqlalchemy.engine.base import Engine
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import TIMESTAMP

from tshistory.schema import tsschema
from tshistory.util import (
    closed_overlaps,
    num2float,
    subset,
    SeriesServices,
    start_end,
    tzaware_serie
)
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES

L = logging.getLogger('tshistory.tsio')
TABLES = {}


class TimeSerie(SeriesServices):
    namespace = 'tsh'
    schema = None
    metadatacache = None
    registry_map = None
    serie_tablename = None
    create_lock_id = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
        self.create_lock_id = sum(ord(c) for c in namespace)
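        # nb: this id doubles as the pg advisory-lock key taken in _create,
        # serializing concurrent series creations within a namespace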

    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        """
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert (_insertion_date is None or
                isinstance(_insertion_date, datetime)), 'Bad format for insertion date'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'

        assert newts.index.notna().all(), 'The index contains NaT entries'
        if not newts.index.is_monotonic_increasing:
            newts = newts.sort_index()

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                ('datetime' in str(newts.index.dtype) and
                 not isinstance(newts.index, pd.MultiIndex)))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
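
    # Usage sketch (illustrative, assuming an engine bound to a database
    # carrying the tshistory schema; names and values are made up):
    #
    #   from sqlalchemy import create_engine
    #   engine = create_engine('postgresql://localhost/mydb')
    #   tsh = TimeSerie()
    #   ts = pd.Series([1., 2., 3.],
    #                  index=pd.date_range('2018-1-1', periods=3, freq='D'))
    #   with engine.begin() as cn:
    #       tsh.insert(cn, ts, 'my.serie', 'babar')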

    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        csetfilter = []
        if revision_date:
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            current = current[~current.isnull()]
        return current
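
    # Sketch (illustrative): read the latest version, then a past version
    # as of a given insertion date.
    #
    #   with engine.begin() as cn:
    #       current = tsh.get(cn, 'my.serie')
    #       asof = tsh.get(cn, 'my.serie',
    #                      revision_date=pd.Timestamp('2018-1-2', tz='UTC'))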

    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.seriename == seriename
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[seriename] = meta
        return meta

    def update_metadata(self, cn, seriename, metadata, internal=False):
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, seriename)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=meta)
        self.metadatacache.pop(seriename)
        cn.execute(sql)
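
    # Sketch (illustrative): attach free-form user metadata; without
    # internal=True the new keys must not collide with existing ones
    # (enforced by the assert above).
    #
    #   with engine.begin() as cn:
    #       tsh.update_metadata(cn, 'my.serie', {'unit': 'MW'})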

    def changeset_metadata(self, cn, csid):
        sql = 'select metadata from "{ns}".changeset where id = {id}'.format(
            ns=self.namespace,
            id=csid
        )
        return cn.execute(sql).scalar()

    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        if from_value_date or to_value_date:
            revsql = revsql.where(
                closed_overlaps(from_value_date, to_value_date)
            )

        revs = cn.execute(
            revsql,
            {'fromdate': from_value_date, 'todate': to_value_date}
        ).fetchall()
        if not revs:
            return {}

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs

        return {
            idate: serie
            for idate, serie in series
        }
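
    # Shape of the returned structure (illustrative): insertion dates mapped
    # to series -- full versions, or successive diffs when diffmode=True.
    #
    #   with engine.begin() as cn:
    #       hist = tsh.get_history(cn, 'my.serie')
    #   # {Timestamp('2018-01-01 00:00:00+0000', tz='UTC'): <series>, ...}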

    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones
        """

        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )
        if histo is None:
            return None

        vimap = {}
        vvmap = {}
        for idate, series in histo.items():
            for vdate, value in series.iteritems():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        ts = subset(pd.Series(vvmap).sort_index(), from_value_date, to_value_date)
        ts.name = seriename
        return ts
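
    # Sketch (illustrative): for each value date, keep the most recent value
    # whose insertion date lies at least `delta` before it.
    #
    #   from datetime import timedelta
    #   with engine.begin() as cn:
    #       ts = tsh.get_delta(cn, 'my.serie', timedelta(hours=1))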

    def exists(self, cn, seriename):
        return self._get_ts_table(cn, seriename) is not None

    def latest_insertion_date(self, cn, seriename):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()

    def last_id(self, cn, seriename):
        snapshot = Snapshot(cn, self, seriename)
        return snapshot.last_id()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()

    def delete(self, cn, seriename):
        assert not isinstance(cn, Engine), 'use a transaction object'
        assert self.exists(cn, seriename)
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        # cn *must* be a transaction scope
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup stale changeset_series entries
        self._resetcaches()
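
    # Sketch (illustrative): deletion must run within a transaction scope,
    # never on a bare engine (enforced by the assert above).
    #
    #   with engine.begin() as cn:
    #       tsh.delete(cn, 'my.serie')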

    def strip(self, cn, seriename, csid):
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
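
    # Sketch (illustrative changeset id): drop every revision from changeset
    # 42 onward, leaving a trace in the changeset metadata.
    #
    #   with engine.begin() as cn:
    #       tsh.strip(cn, 'my.serie', 42)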

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from "{}".registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from "{}".changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct seriename from "{}".registry order by seriename'.format(
            self.namespace
        )
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats
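
    # Shape of the returned stats (illustrative values):
    #   {'series count': 2, 'changeset count': 10, 'serie names': ['a', 'b']}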

    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log
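
    # Shape of one log entry (illustrative values):
    #   {'rev': 1, 'author': 'babar',
    #    'date': pd.Timestamp('2018-01-01', tz='utc'),
    #    'meta': {}, 'names': ['my.serie']}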

    def interval(self, cn, seriename, notz=False):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        sql = (f'select start, "end" '
               f'from "{self.namespace}.timeserie"."{tablename}" '
               f'order by cset desc limit 1')
        res = cn.execute(sql).fetchone()
        start, end = res.start, res.end
        if self.metadata(cn, seriename).get('tzaware') and not notz:
            start, end = pd.Timestamp(start, tz='UTC'), pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')
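
    # Sketch (illustrative): the closed interval of value dates currently
    # covered by the series.
    #
    #   with engine.begin() as cn:
    #       ival = tsh.interval(cn, 'my.serie')
    #   ival.left, ival.right  # first and last value dates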

    # /API
    # Helpers

    # creation / update

    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        start, end = start_end(newts, notz=False)
        if start is None:
            assert end is None
            # this is just full of nans
            return None
        # chop off unwanted nans
        newts = newts.loc[start:end]
        if len(newts) == 0:
            return None

        # at creation time we take an exclusive lock to avoid
        # a deadlock on created tables against the changeset-series fk
        cn.execute(
            'select pg_advisory_xact_lock({})'.format(self.create_lock_id)
        )
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        table = self._make_ts_table(cn, seriename)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        # compute series start/end stamps
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename, notz=True)
        start = min(tsstart or ival.left, ival.left)
        end = max(tsend or ival.right, ival.right)

        if pd.isnull(diff[0]) or pd.isnull(diff[-1]):
            # we *might* be shrinking, let's look at the full series
            # and yes, shrinkers have a slow path
            last = snapshot.last()
            patched = self.patch(last, diff).dropna()
            if not len(patched):
                raise ValueError('complete erasure of a series is forbidden')
            if pd.isnull(diff[0]):
                start = patched.index[0]
            if pd.isnull(diff[-1]):
                end = patched.index[-1]

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff
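
    # Worked example of the diff-based storage (hypothetical values): with
    # {'2018-01-01': 1., '2018-01-02': 2.} already stored, inserting
    # {'2018-01-02': 2., '2018-01-03': 3.} writes only the diff
    # {'2018-01-03': 3.}; unchanged points are never stored twice.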

    # serie table handling

    def _serie_to_tablename(self, cn, seriename):
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename

        reg = self.schema.registry
        sql = select([reg.c.table_name]).where(reg.c.seriename == seriename)
        tablename = cn.execute(sql).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                Column('start', TIMESTAMP, nullable=False),
                Column('end', TIMESTAMP, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                Index(NONE_NAME, 'start'),
                Index(NONE_NAME, 'end'),
                schema='{}.timeserie'.format(self.namespace),
                keep_existing=True
            )
        return table

    def _make_ts_table(self, cn, seriename):
        table = self._table_definition_for(cn, seriename)
        table.create(cn)
        return table

    def _register_serie(self, cn, seriename, ts):
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'index_dtype': index.dtype.str,
                'value_dtype': ts.dtypes.str,
                'value_type': ts.dtypes.name
            }
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid

    def _get_ts_table(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
            return self._table_definition_for(cn, seriename)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        sql = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)

        return [
            row.seriename
            for row in cn.execute(sql).fetchall()
        ]

    def _previous_cset(self, cn, seriename, csid):
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select cset from "{}.timeserie"."{}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1').format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()

    # insertion handling

    def _validate(self, cn, ts, seriename):
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _name_to_regid(self, cn, seriename):
        regid = self.registry_map.get(seriename)
        if regid is not None:
            return regid

        registry = self.schema.registry
        sql = select([registry.c.id]).where(registry.c.seriename == seriename)
        regid = self.registry_map[seriename] = cn.execute(sql).scalar()
        return regid

    def _finalize_insertion(self, cn, csid, seriename):
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=self._name_to_regid(cn, seriename)
        )
        cn.execute(sql)

    def _resetcaches(self):
        TABLES.clear()
        SNAPTABLES.clear()
        self.metadatacache.clear()
        self.registry_map.clear()
        self.serie_tablename.clear()