from datetime import datetime
import logging

import pandas as pd

from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
from sqlalchemy.engine.base import Engine
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import TIMESTAMP

from tshistory.schema import tsschema
from tshistory.util import (
    closed_overlaps,
    num2float,
    subset,
    SeriesServices,
    start_end,
    tzaware_serie
)
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES

L = logging.getLogger('tshistory.tsio')
TABLES = {}


class TimeSerie(SeriesServices):
    namespace = 'tsh'
    schema = None
    metadatacache = None
    registry_map = None
    serie_tablename = None
    create_lock_id = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
        self.create_lock_id = sum(ord(c) for c in namespace)

    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        """
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert _insertion_date is None or isinstance(_insertion_date, datetime), 'Bad format for insertion date'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'
        assert newts.index.is_monotonic_increasing, 'The index is not monotonic'

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
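
    # Usage sketch for insert(): assumes a SQLAlchemy engine bound to a
    # database where the tshistory schema has been initialized; the DSN,
    # series name and author below are placeholders.
    #
    #   from sqlalchemy import create_engine
    #   engine = create_engine('postgresql://localhost/tsdb')
    #   tsh = TimeSerie()
    #   ts = pd.Series(
    #       [1.0, 2.0, 3.0],
    #       index=pd.date_range('2020-1-1', periods=3, freq='D')
    #   )
    #   with engine.begin() as cn:
    #       tsh.insert(cn, ts, 'example.series', 'babar')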

    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        csetfilter = []
        if revision_date:
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            current = current[~current.isnull()]
        return current
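
    # Usage sketch for get(): the revision_date filter replays the series as
    # it was known at that point in time. Assumes the series inserted in the
    # sketch above; the name and date are placeholders.
    #
    #   with engine.begin() as cn:
    #       current = tsh.get(cn, 'example.series')
    #       asof = tsh.get(cn, 'example.series',
    #                      revision_date=pd.Timestamp('2020-1-2', tz='UTC'))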

    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.seriename == seriename
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[seriename] = meta
        return meta

    def update_metadata(self, cn, seriename, metadata, internal=False):
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, seriename)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=meta)
        self.metadatacache.pop(seriename)
        cn.execute(sql)

    def changeset_metadata(self, cn, csid):
        sql = 'select metadata from "{ns}".changeset where id = {id}'.format(
            ns=self.namespace,
            id=csid
        )
        return cn.execute(sql).scalar()

    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        if from_value_date or to_value_date:
            revsql = revsql.where(
                closed_overlaps(from_value_date, to_value_date)
            )

        revs = cn.execute(
            revsql,
            {'fromdate': from_value_date, 'todate': to_value_date}
        ).fetchall()
        if not revs:
            return {}

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs

        return {
            idate: serie
            for idate, serie in series
        }
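
    # Usage sketch for get_history(): returns a dict mapping each insertion
    # date to the series visible at that revision. Assumes the series from
    # the sketches above; the name is a placeholder.
    #
    #   with engine.begin() as cn:
    #       histo = tsh.get_history(cn, 'example.series')
    #       for idate, serie in histo.items():
    #           print(idate, len(serie))
    #       # diffmode=True yields, per revision, only the points that
    #       # changed relative to the previous revision
    #       diffs = tsh.get_history(cn, 'example.series', diffmode=True)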

    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones
        """

        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )

        vimap = {}
        vvmap = {}
        for idate, series in histo.items():
            for vdate, value in series.iteritems():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        ts = subset(pd.Series(vvmap), from_value_date, to_value_date)
        ts.name = seriename
        return ts
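
    # Usage sketch for get_delta(): value dates are bounded to at most
    # `delta` after their insertion date, keeping the most recent observation
    # for each value date. Assumes the series above carries several
    # revisions; the timedelta is a placeholder.
    #
    #   with engine.begin() as cn:
    #       ts = tsh.get_delta(cn, 'example.series', pd.Timedelta(hours=1))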

    def exists(self, cn, seriename):
        return self._get_ts_table(cn, seriename) is not None

    def latest_insertion_date(self, cn, seriename):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()

    def last_id(self, cn, seriename):
        snapshot = Snapshot(cn, self, seriename)
        return snapshot.last_id()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()
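
    # Usage sketch for the revision lookups above: latest_insertion_date()
    # returns the last changeset timestamp and changeset_at() maps a revision
    # date back to a changeset id (usable e.g. with strip()). Placeholders as
    # in the earlier sketches.
    #
    #   with engine.begin() as cn:
    #       last = tsh.latest_insertion_date(cn, 'example.series')
    #       csid = tsh.changeset_at(cn, 'example.series', last, mode='before')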

    def delete(self, cn, seriename):
        assert not isinstance(cn, Engine), 'use a transaction object'
        assert self.exists(cn, seriename)
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        # cn *must* be a transaction scope
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()

    def strip(self, cn, seriename, csid):
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from "{}".registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from "{}".changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct seriename from "{}".registry order by seriename'.format(
            self.namespace
        )
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log
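
    # Usage sketch for log(): each entry describes one changeset touching the
    # selected series. The filters shown are placeholders.
    #
    #   with engine.begin() as cn:
    #       for rev in tsh.log(cn, names=['example.series'], limit=10):
    #           print(rev['rev'], rev['date'], rev['author'], rev['names'])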

    def interval(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        sql = (f'select start, "end" '
               f'from "{self.namespace}.timeserie"."{tablename}" '
               f'order by cset desc limit 1')
        res = cn.execute(sql).fetchone()
        start, end = res.start, res.end
        if self.metadata(cn, seriename).get('tzaware'):
            start, end = pd.Timestamp(start, tz='UTC'), pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')
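
    # Usage sketch for interval(): returns a closed pandas Interval spanning
    # the first and last value dates known for the series (tz-aware when the
    # series metadata says so). Series name is a placeholder.
    #
    #   with engine.begin() as cn:
    #       ival = tsh.interval(cn, 'example.series')
    #       print(ival.left, ival.right)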

    # /API
    # Helpers

    # creation / update

    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        # initial insertion
        if len(newts) == 0:
            return None

        # at creation time we take an exclusive lock to avoid
        # a deadlock on created tables against the changeset-series fk
        cn.execute(
            'select pg_advisory_xact_lock({})'.format(self.create_lock_id)
        )
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        table = self._make_ts_table(cn, seriename)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename)
        start = min(tsstart, ival.left.replace(tzinfo=None))
        end = max(tsend, ival.right.replace(tzinfo=None))
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

    # serie table handling

    def _serie_to_tablename(self, cn, seriename):
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename

        reg = self.schema.registry
        sql = select([reg.c.table_name]).where(reg.c.seriename == seriename)
        tablename = cn.execute(sql).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                Column('start', TIMESTAMP, nullable=False),
                Column('end', TIMESTAMP, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                Index(NONE_NAME, 'start'),
                Index(NONE_NAME, 'end'),
                schema='{}.timeserie'.format(self.namespace),
                keep_existing=True
            )
        return table

    def _make_ts_table(self, cn, seriename):
        table = self._table_definition_for(cn, seriename)
        table.create(cn)
        return table

    def _register_serie(self, cn, seriename, ts):
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'index_dtype': index.dtype.str,
                'value_dtype': ts.dtypes.str,
                'value_type': ts.dtypes.name
            }
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid

    def _get_ts_table(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
            return self._table_definition_for(cn, tablename)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        sql = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)

        return [
            row.seriename
            for row in cn.execute(sql).fetchall()
        ]

    def _previous_cset(self, cn, seriename, csid):
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select cset from "{}.timeserie"."{}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1').format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()

    # insertion handling

    def _validate(self, cn, ts, seriename):
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _name_to_regid(self, cn, seriename):
        regid = self.registry_map.get(seriename)
        if regid is not None:
            return regid

        registry = self.schema.registry
        sql = select([registry.c.id]).where(registry.c.seriename == seriename)
        regid = self.registry_map[seriename] = cn.execute(sql).scalar()
        return regid

    def _finalize_insertion(self, cn, csid, seriename):
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=self._name_to_regid(cn, seriename)
        )
        cn.execute(sql)

    def _resetcaches(self):
        TABLES.clear()
        SNAPTABLES.clear()
        self.metadatacache.clear()
        self.registry_map.clear()
        self.serie_tablename.clear()