tsio.py 23.1 KB
Newer Older
1
from datetime import datetime
2
from contextlib import contextmanager
3
import logging
4
import hashlib
5
6
7

import pandas as pd

8
9
from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
10
from sqlalchemy.engine.base import Engine
Aurélien Campéas's avatar
Aurélien Campéas committed
11
from sqlalchemy.sql.expression import select, func, desc
12
from sqlalchemy.dialects.postgresql import BYTEA, TIMESTAMP
13

14
from tshistory.schema import tsschema
15
from tshistory.util import (
16
    closed_overlaps,
17
    inject_in_index,
18
    num2float,
19
    subset,
20
    SeriesServices,
21
    start_end,
22
23
    tzaware_serie
)
24
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES
25

26
L = logging.getLogger('tshistory.tsio')
27
TABLES = {}
28
29


30
class TimeSerie(SeriesServices):
31
    namespace = 'tsh'
32
    schema = None
33
    metadatacache = None
34
    registry_map = None
35
    serie_tablename = None
36
37
38

    def __init__(self, namespace='tsh'):
        """Build a TimeSerie handler bound to a given schema namespace."""
        self.namespace = namespace
        # per-instance lookup caches
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
        # build the sqlalchemy description of the namespace tables
        self.schema = tsschema(namespace)
        self.schema.define()

45
    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        _insertion_date: optional forced insertion timestamp

        Returns the inserted serie on first insertion, the effective diff
        on update, or None when there was nothing to insert.
        """
        # cheap sanity checks on the input before touching the db
        assert isinstance(newts, pd.Series)
        assert isinstance(seriename, str)
        assert isinstance(author, str)
        assert metadata is None or isinstance(metadata, dict)
        assert _insertion_date is None or isinstance(_insertion_date, datetime)
        assert not newts.index.duplicated().any()
        assert newts.index.is_monotonic_increasing

        # normalize values (num2float: presumably coerces numeric dtypes
        # to float — see tshistory.util)
        newts = num2float(newts)

        if not len(newts):
            return

        # the index must be a plain datetime index (no MultiIndex)
        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            # first revision ever for this serie
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
81

82
    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie
        """
        if self._get_ts_table(cn, seriename) is None:
            # unknown serie
            return

        filters = []
        if revision_date:
            filters.append(lambda cset: cset.c.insertion_date <= revision_date)

        _, serie = Snapshot(cn, self, seriename).find(
            csetfilter=filters,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )

        if serie is None or _keep_nans:
            return serie
        serie.name = seriename
        # nan values encode point erasures: drop them from the output
        return serie[~serie.isnull()]
107

108
    def metadata(self, cn, seriename):
109
        """Return metadata dict of timeserie."""
110
111
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
112
113
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
114
            reg.c.seriename == seriename
115
116
        )
        meta = cn.execute(sql).scalar()
117
        self.metadatacache[seriename] = meta
118
119
        return meta

120
    def update_metadata(self, cn, seriename, metadata, internal=False):
121
        assert isinstance(metadata, dict)
122
        meta = self.metadata(cn, seriename)
123
124
125
126
127
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
128
            reg.c.seriename == seriename
129
130
131
        ).values(metadata=metadata)
        cn.execute(sql)

132
133
134
135
136
137
138
139
    def changeset_metadata(self, cn, csid):
        cset = self.schema.changeset
        sql = 'select metadata from "{ns}".changeset where id = {id}'.format(
            ns=self.namespace,
            id=csid
        )
        return cn.execute(sql).scalar()

140
    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        """Return a {insertion_date: serie} dict covering the revisions
        of `seriename`, or None for an unknown serie / empty selection.

        from/to_insertion_date: bound the revisions themselves
        from/to_value_date: bound the value span of each revision
        deltabefore/deltaafter: restrict each revision to a window around
        its own insertion date
        diffmode: yield, for each revision, the diff against the previous
        revision instead of the full serie
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        cset = self.schema.changeset
        # collect the (changeset id, insertion date) pairs of interest
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        if from_value_date or to_value_date:
            # only revisions whose [start, end] overlaps the asked span
            revsql = revsql.where(
                closed_overlaps(from_value_date, to_value_date)
            )

        revs = cn.execute(
            revsql,
            # bound parameters used by the closed_overlaps clause
            {'fromdate': from_value_date, 'todate': to_value_date}
        ).fetchall()
        if not revs:
            return

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            # per-revision window around the insertion date: one snapshot
            # lookup per revision
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                # the lambda captures csid but is consumed within this
                # very iteration, so late binding is not an issue
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            # bulk path: fetch all revisions in one go
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            diffs = []
            for (revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs

        return {
            idate: serie
            for idate, serie in series
        }
221

222
223
224
225
    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """Rebuild the serie as it was known `delta` ahead of time: for
        each value date, keep the value of the latest insertion made at
        least `delta` before that value date.
        """
        # deltabefore=-delta makes get_history clip each revision to
        # value dates >= insertion_date + delta (see get_history)
        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )
        # NOTE(review): histo is None for an unknown serie and would
        # crash on .items() below — confirm callers guard against this

        # tag each revision serie with its insertion date, then stack them
        for revdate, serie in histo.items():
            inject_in_index(serie, revdate)
        histo = pd.concat([serie for serie in histo.values()])

        df = histo.reset_index()

        # df_date is a dataframe with two columns: insertion_date and value_date
        df_date = df.loc[:, ['insertion_date', 'value_date']]

        # now in selected_dates each value_date has only one occurrence,
        # the one with the greatest (latest) insertion_date
        selected_dates = df_date.groupby('value_date').max().reset_index()

        ts = df[seriename]
        # ts is built from the df returned from get_history;
        # re-index it with plain (insertion_date, value_date) tuples
        ts.index = ((row.insertion_date, row.value_date)
                    for row in df.itertuples())
        # ts still holds every (insertion_date, value_date) couple; use
        # selected_dates to keep, per value_date, only the row carrying
        # the last insertion_date (in selected_dates' itertuples,
        # row[1] is value_date and row[2] is insertion_date)
        ts_select = ts[[(row[2], row[1])
                        for row in selected_dates.itertuples()]]

        # ts_select still has a simple index of
        # (insertion_date, value_date) tuples
        new_index = (elt[1] for elt in ts_select.index)

        # we only keep the value_date information from the index
        ts_select.index = new_index
        return subset(ts_select, from_value_date, to_value_date)
262
263


264
265
    def exists(self, cn, seriename):
        """Tell whether `seriename` is a known serie."""
        table = self._get_ts_table(cn, seriename)
        return table is not None
266

267
    def latest_insertion_date(self, cn, seriename):
        """Return the insertion date of the serie's most recent changeset."""
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        query = select(
            [func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(query).scalar()
273

274
275
276
277
    def last_id(self, cn, seriename):
        """Return the last snapshot id of the serie (delegates to Snapshot)."""
        return Snapshot(cn, self, seriename).last_id()

278
279
280
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return the changeset id of the serie at `revdate`.

        mode: 'strict' (exact match), 'before' (<=) or 'after' (>=).
        """
        comparators = {
            'strict': lambda col: col == revdate,
            'before': lambda col: col <= revdate,
            'after': lambda col: col >= revdate,
        }
        assert mode in comparators
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        ).where(
            comparators[mode](cset.c.insertion_date)
        )
        return cn.execute(sql).scalar()

293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
    def delete(self, cn, seriename):
        """Remove a serie: its revision and snapshot tables, its changesets
        and its registry row.

        cn *must* be a transaction scope (not a bare Engine) so the whole
        sequence commits or rolls back atomically.
        """
        assert not isinstance(cn, Engine), 'use a transaction object'
        assert self.exists(cn, seriename)
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        # cn *must* be a transaction scope
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        # instance and module caches may now hold stale entries: flush all
        self._resetcaches()

327
328
329
330
331
332
333
334
335
    def strip(self, cn, seriename, csid):
        """Erase all revisions of `seriename` from changeset `csid` onward.

        The changesets themselves survive (a marker is written in their
        metadata); their link to the serie and the revision rows go away.
        NOTE(review): snapshot chunks are not reclaimed here — confirm
        whether Snapshot handles that elsewhere.
        """
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changset_serie item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
352

353
    def info(self, cn):
354
355
        """Gather global statistics on the current tshistory repository
        """
356
        sql = 'select count(*) from {}.registry'.format(self.namespace)
357
        stats = {'series count': cn.execute(sql).scalar()}
358
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
359
        stats['changeset count'] = cn.execute(sql).scalar()
360
        sql = 'select distinct name from {}.registry order by name'.format(self.namespace)
361
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
362
363
        return stats

364
    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        limit: keep only the N most recent changesets (0 = no limit)
        names / authors: restrict to these series / authors
        fromrev / torev: inclusive changeset id bounds
        fromdate / todate: inclusive insertion date bounds
        stripped: also show changesets whose serie links were stripped
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        # ordered newest-first so that `limit` keeps the most recent ones
        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        # restore chronological (ascending changeset id) order
        log.sort(key=lambda rev: rev['rev'])
        return log

412
413
414
415
416
417
418
419
420
421
422
423
424
    def interval(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        sql = (f'select start, "end" '
               f'from "{self.namespace}.timeserie"."{tablename}" '
               f'order by cset desc limit 1')
        res = cn.execute(sql).fetchone()
        start, end = res.start, res.end
        if self.metadata(cn, seriename).get('tzaware'):
            start, end = pd.Timestamp(start, tz='UTC'), pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')

425
426
    # /API
    # Helpers
427

428
429
    # creation / update

430
    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        """First-ever insertion of a serie: register it, snapshot the whole
        serie and record its first revision row.

        Returns the inserted serie, or None when it was empty.
        """
        # initial insertion
        if len(newts) == 0:
            return None
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        # head: id of the snapshot produced for this revision
        head = snapshot.create(newts)
        start, end = start_end(newts)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        table = self._make_ts_table(cn, seriename, newts)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

453
    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Record a new revision of an existing serie as a diff against
        its current state.

        Returns the effective diff, or None when newts brings nothing new.
        """
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        # diff of newts against the stored serie restricted to newts' span
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        # new revision interval = union of the stored interval and newts'
        # interval; the stored bounds may be tz-aware (see interval()),
        # so strip tzinfo to compare with start_end's output
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename)
        start = min(tsstart, ival.left.replace(tzinfo=None))
        end = max(tsend, ival.right.replace(tzinfo=None))
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

484
    # serie table handling
485

486
487
488
489
    def _serie_to_tablename(self, cn, seriename):
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename
490

491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
        reg = self.schema.registry
        sql = select([reg.c.table_name]).where(reg.c.seriename == seriename)
        tablename = cn.execute(sql).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        """Return the sqlalchemy Table describing the revision table of
        `seriename`, building it and caching it in the module-level
        TABLES dict (keyed by '<namespace>.timeserie.<tablename>').
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                # surrogate primary key
                Column('id', Integer, primary_key=True),
                # changeset which produced this revision
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                # value-date span covered by this revision
                Column('start', TIMESTAMP, nullable=False),
                Column('end', TIMESTAMP, nullable=False),
                # snapshot id recorded for this revision (see _create/_update)
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                Index(NONE_NAME, 'start'),
                Index(NONE_NAME, 'end'),
                schema='{}.timeserie'.format(self.namespace),
                # tolerate re-declaration within the same process
                keep_existing=True
            )
        return table
528

529
    def _make_ts_table(self, cn, seriename, ts):
        """Create the revision table of a brand new serie and return it.

        `ts` is part of the signature but unused in this implementation.
        """
        tdef = self._table_definition_for(cn, seriename)
        tdef.create(cn)
        return tdef

    def _register_serie(self, cn, seriename, ts):
        """Insert the registry row of a new serie and cache its id."""
        idx = ts.index
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': idx.dtype.name,
                'index_names': [name for name in idx.names if name],
                'value_type': ts.dtypes.name
            },
        )
        self.registry_map[seriename] = cn.execute(sql).inserted_primary_key[0]
549

550
    def _get_ts_table(self, cn, seriename):
551
552
553
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
            return self._table_definition_for(cn, tablename)
554

555
556
    # changeset handling

557
    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
558
        table = self.schema.changeset
559
560
561
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
562
563
        sql = table.insert().values(
            author=author,
564
            metadata=metadata,
565
            insertion_date=idate)
566
        return cn.execute(sql).inserted_primary_key[0]
567

568
    def _changeset_series(self, cn, csid):
        """List the names of the series touched by changeset `csid`."""
        reg = self.schema.registry
        links = self.schema.changeset_series
        sql = select(
            [reg.c.seriename]
        ).where(links.c.cset == csid
        ).where(links.c.serie == reg.c.id)
        return [row.seriename for row in cn.execute(sql).fetchall()]
580

581
582
583
584
585
586
587
    def _previous_cset(self, cn, seriename, csid):
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select cset from "{}.timeserie"."{}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1').format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()

588
589
    # insertion handling

590
    def _validate(self, cn, ts, seriename):
591
592
        if ts.isnull().all():
            # ts erasure
593
            return
594
        tstype = ts.dtype
595
        meta = self.metadata(cn, seriename)
596
        if tstype != meta['value_type']:
597
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
598
                seriename, tstype, meta['value_type'])
599
            raise Exception(m)
600
        if ts.index.dtype.name != meta['index_type']:
601
            raise Exception('Incompatible index types')
602

603
604
    def _name_to_regid(self, cn, seriename):
        regid = self.registry_map.get(seriename)
605
606
607
608
        if regid is not None:
            return regid

        registry = self.schema.registry
609
        sql = select([registry.c.id]).where(registry.c.seriename == seriename)
610
        regid = self.registry_map[seriename] = cn.execute(sql).scalar()
611
612
        return regid

613
    def _finalize_insertion(self, cn, csid, seriename):
614
615
616
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
617
            serie=self._name_to_regid(cn, seriename)
618
619
        )
        cn.execute(sql)
620

621
622
623
    def _resetcaches(self):
        """Flush the module-level table caches and every per-instance
        lookup cache (used after destructive operations like delete)."""
        for cache in (TABLES, SNAPTABLES,
                      self.metadatacache,
                      self.registry_map,
                      self.serie_tablename):
            cache.clear()