tsio.py 22.5 KB
Newer Older
1
from datetime import datetime
2
import logging
3
4
5

import pandas as pd

6
7
from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
8
from sqlalchemy.engine.base import Engine
Aurélien Campéas's avatar
Aurélien Campéas committed
9
from sqlalchemy.sql.expression import select, func, desc
Aurélien Campéas's avatar
Aurélien Campéas committed
10
from sqlalchemy.dialects.postgresql import TIMESTAMP
11

12
from tshistory.schema import tsschema
13
from tshistory.util import (
14
    closed_overlaps,
15
    num2float,
16
    subset,
17
    SeriesServices,
18
    start_end,
19
20
    tzaware_serie
)
21
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES
22

23
L = logging.getLogger('tshistory.tsio')

# module-level memo of the sqlalchemy Table objects describing the
# per-serie revision tables, keyed by '<namespace>.timeserie.<tablename>'
TABLES = {}


class TimeSerie(SeriesServices):
    """Versioned time series repository living in one database namespace.

    Every instance works inside a single namespace (default 'tsh') and
    keeps per-instance caches, keyed by serie name, of the registry
    metadata, the registry ids and the revision table names.
    """
    namespace = 'tsh'
    schema = None
    metadatacache = None
    registry_map = None
    serie_tablename = None

    def __init__(self, namespace='tsh'):
        """Bind to `namespace`, build its schema and call its `define()`."""
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        # caches keyed by serie name
        self.metadatacache, self.registry_map, self.serie_tablename = {}, {}, {}

    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index (no duplicates, increasing)
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        _insertion_date: optional datetime used as the changeset date
        (must be tz-aware, see `_newchangeset`); defaults to now

        Returns None for an empty series; otherwise the result of
        `_create` (first insertion) or `_update` (later insertions).
        Invalid arguments raise AssertionError.
        """
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert (_insertion_date is None or
                isinstance(_insertion_date, datetime)), 'Bad format for insertion date'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'
        assert newts.index.is_monotonic_increasing, 'The index is not monotonic'

        newts = num2float(newts)
        if not len(newts):
            return

        index = newts.index
        # `and` binds tighter than `or`: a datetime64[ns] index is always
        # accepted, other datetime-like dtypes must not be a MultiIndex
        assert ('<M8[ns]' == index.dtype or
                ('datetime' in str(index.dtype) and
                 not isinstance(index, pd.MultiIndex)))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)
        if table is not None:
            return self._update(cn, table, newts, seriename, author,
                                metadata, _insertion_date)
        return self._create(cn, newts, seriename, author,
                            metadata, _insertion_date)

    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie (only changesets inserted at or before it are considered)
        from_value_date, to_value_date: optional bounds on the value dates
        _keep_nans: if True, NaN points are kept in the result

        Returns None if the serie does not exist; otherwise what
        `Snapshot.find` yields, named after the serie when not None.
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        csetfilter = []
        if revision_date:
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None:
            # name the result in both modes (previously skipped when
            # _keep_nans was set)
            current.name = seriename
            if not _keep_nans:
                current = current[~current.isnull()]
        return current

    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie.

        Found metadata is cached per serie name and the cached dict itself
        is returned. None is returned, and NOT cached, when the serie is not
        registered: caching that miss would keep returning None after the
        serie gets created.
        """
        meta = self.metadatacache.get(seriename)
        if meta is not None:
            return meta

        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.seriename == seriename
        )
        meta = cn.execute(sql).scalar()
        if meta is not None:
            self.metadatacache[seriename] = meta
        return meta

    def update_metadata(self, cn, seriename, metadata, internal=False):
        """Merge `metadata` into the registry metadata of a serie.

        metadata: dict of entries to add
        internal: when False (default), none of the keys may already exist
        in the serie metadata (AssertionError otherwise); when True,
        existing keys are overwritten

        Raises ValueError if the serie is not registered.
        """
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, seriename)
        if meta is None:
            raise ValueError(f'no such serie: {seriename}')
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        # `metadata()` may hand back the cached dict: merge into a copy
        # rather than mutating an object other callers may hold
        meta = dict(meta)
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=meta)
        # invalidate the cache; the next `metadata()` call rereads the row
        self.metadatacache.pop(seriename, None)
        cn.execute(sql)

    def changeset_metadata(self, cn, csid):
        """Return the metadata stored on changeset `csid`.

        Returns None when the changeset has no metadata or does not exist.
        The id is passed as a bound parameter instead of being formatted
        into the SQL text.
        """
        cset = self.schema.changeset
        sql = select([cset.c.metadata]).where(cset.c.id == csid)
        return cn.execute(sql).scalar()

    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        """Return the successive versions of a serie keyed by insertion date.

        from_insertion_date, to_insertion_date: optional inclusive bounds on
        the changeset insertion dates
        from_value_date, to_value_date: optional value-date bounds; they
        filter the revisions through `closed_overlaps` and bound the
        fetched versions when no delta is given
        deltabefore, deltaafter: optional timedeltas; when at least one is
        given, each version is restricted to value dates in
        [idate - deltabefore, idate + deltaafter] around its own insertion
        date (a missing side stays unbounded)
        diffmode: if True, each entry holds the diff against the preceding
        version instead of the version itself

        Returns a dict {insertion date: series}, or None when the serie does
        not exist or no revision matches.

        NOTE(review): in diffmode the prepended "previous" revision has no
        insertion date (None), so combining diffmode with deltabefore or
        deltaafter fails on `None - timedelta` — confirm whether that
        combination should be supported.
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        cset = self.schema.changeset
        revsql = select([cset.c.id, cset.c.insertion_date]).order_by(
            cset.c.id
        ).where(table.c.cset == cset.c.id)

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)
        if from_value_date or to_value_date:
            # presumably uses the fromdate/todate bind parameters supplied
            # at execution time below
            revsql = revsql.where(closed_overlaps(from_value_date, to_value_date))

        bindparams = {'fromdate': from_value_date, 'todate': to_value_date}
        revs = cn.execute(revsql, bindparams).fetchall()
        if not revs:
            return

        if diffmode:
            # prepend the revision that precedes the first selected one, so
            # that the first selected revision has something to diff against
            previous_csid = self._previous_cset(cn, seriename, revs[0][0])
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        if deltabefore is None and deltaafter is None:
            series = snapshot.findall(revs, from_value_date, to_value_date)
        else:
            series = []
            for csid, idate in revs:
                lower = None if deltabefore is None else idate - deltabefore
                upper = None if deltaafter is None else idate + deltaafter
                # the filter is evaluated within this iteration, so the
                # lambda sees the current csid
                _, version = snapshot.find(
                    csetfilter=[lambda cset: cset.c.id == csid],
                    from_value_date=lower,
                    to_value_date=upper
                )
                series.append((idate, version))

        if diffmode:
            # pairwise diffs; a missing previous version (history scanned
            # from its very beginning) means the next version is reported
            # as a diff against nothing, i.e. as is
            series = [
                (idate_b, serie_b if serie_a is None else self.diff(serie_a, serie_b))
                for (_, serie_a), (idate_b, serie_b) in zip(series, series[1:])
            ]

        return dict(series)

    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones

        Each revision is restricted to value dates >= its insertion date
        + `delta` (through `get_history(deltabefore=-delta)`). For every
        value date, the value from the latest insertion date providing it
        wins.

        Returns None when the serie does not exist or no revision matches.
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )
        if histo is None:
            return None

        latest_idate = {}
        latest_value = {}
        for idate, series in histo.items():
            if series is None:
                continue
            # `Series.items` works on every pandas version, unlike the
            # removed `iteritems`
            for vdate, value in series.items():
                if vdate not in latest_idate or latest_idate[vdate] < idate:
                    latest_idate[vdate] = idate
                    latest_value[vdate] = value

        # dict order follows revision order, not value dates: sort before
        # the value-date slicing done by `subset`
        ts = subset(pd.Series(latest_value).sort_index(),
                    from_value_date, to_value_date)
        ts.name = seriename
        return ts

    def exists(self, cn, seriename):
        """Tell whether `seriename` is registered (has a revision table)."""
        table = self._get_ts_table(cn, seriename)
        return table is not None

    def latest_insertion_date(self, cn, seriename):
        """Return the insertion date of the latest changeset of a serie.

        Returns None if the serie does not exist (previously this raised
        AttributeError on the missing table).
        """
        tstable = self._get_ts_table(cn, seriename)
        if tstable is None:
            return None
        cset = self.schema.changeset
        sql = select([func.max(cset.c.insertion_date)]).where(
            tstable.c.cset == cset.c.id
        )
        return cn.execute(sql).scalar()

    def last_id(self, cn, seriename):
        """Return what `Snapshot.last_id()` reports for the serie."""
        return Snapshot(cn, self, seriename).last_id()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return the id of a changeset of `seriename` located by `revdate`.

        mode:
        * 'strict': a changeset inserted exactly at `revdate`
        * 'before': the latest changeset inserted at or before `revdate`
        * 'after': the earliest changeset inserted at or after `revdate`

        Returns None when no changeset matches.
        """
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            # without an ORDER BY the database may return any matching row;
            # pick the one closest to revdate
            sql = sql.where(
                cset.c.insertion_date <= revdate
            ).order_by(desc(cset.c.insertion_date), desc(cset.c.id))
        else:
            sql = sql.where(
                cset.c.insertion_date >= revdate
            ).order_by(cset.c.insertion_date, cset.c.id)
        return cn.execute(sql.limit(1)).scalar()

    def delete(self, cn, seriename):
        """Completely remove a serie from the repository.

        cn *must* be a transaction scope (an Engine is rejected), since the
        removal spans several statements.

        Steps, in order: drop the serie's timeserie and snapshot tables,
        delete the changesets linked to the serie through changeset_series,
        delete the registry row, then reset every cache.

        NOTE(review): changesets are deleted by id; if a changeset were
        shared with another serie this would affect that serie too (or fail
        on its foreign key) — confirm changesets are single-serie here.
        """
        assert not isinstance(cn, Engine), 'use a transaction object'
        assert self.exists(cn, seriename)
        ns = self.namespace

        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(ns),
            seriename=seriename
        ).fetchone()

        # the two per-serie tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(ns, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(ns, tablename)
        )

        # the changesets of this serie (the changeset table would otherwise
        # keep ghost entries)
        cn.execute(
            'with csets as ('
            ' select cset from "{ns}".changeset_series '
            ' where serie = %(rid)s'
            ') '
            'delete from "{ns}".changeset as cset using csets '
            'where cset.id = csets.cset'.format(ns=ns),
            rid=rid
        )

        # presumably this also removes the remaining changeset_series rows
        # transitively (cascade on the registry id) — confirm in the schema
        cn.execute(
            'delete from "{}".registry '
            'where id = %(rid)s'.format(ns),
            rid=rid
        )
        self._resetcaches()

    def strip(self, cn, seriename, csid):
        """Remove the revisions of `seriename` from changeset `csid` onwards.

        Every changeset of the serie with id >= csid is kept but tagged
        through its 'tshistory.info' metadata entry and unlinked from the
        serie in changeset_series; then the serie revision rows with
        cset >= csid are deleted.
        """
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        changeset = self.schema.changeset
        links = self.schema.changeset_series
        regid = self._name_to_regid(cn, seriename)
        note = 'got stripped from {}'.format(csid)

        for entry in logs:
            rev = entry['rev']
            # record the stripping in the changeset metadata
            meta = self.changeset_metadata(cn, rev) or {}
            meta['tshistory.info'] = note
            cn.execute(
                changeset.update().where(changeset.c.id == rev).values(metadata=meta)
            )
            # unlink the changeset from this serie
            cn.execute(
                links.delete().where(links.c.cset == rev).where(links.c.serie == regid)
            )

        # wipe the revision rows
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))

    def info(self, cn):
        """Gather global statistics on the current tshistory repository

        Returns a dict with:
        * 'series count': number of registry rows
        * 'changeset count': the highest changeset id (it can exceed the
          real number of changesets once some have been deleted)
        * 'serie names': distinct serie names, sorted
        """
        ns = self.namespace
        count_sql = 'select count(*) from "{}".registry'.format(ns)
        maxid_sql = 'select max(id) from "{}".changeset'.format(ns)
        names_sql = 'select distinct seriename from "{}".registry order by seriename'.format(
            ns
        )
        return {
            'series count': cn.execute(count_sql).scalar(),
            'changeset count': cn.execute(maxid_sql).scalar(),
            'serie names': [name for name, in cn.execute(names_sql).fetchall()],
        }

    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        limit: keep only the `limit` highest changeset ids (0: no limit)
        names, authors: optional collections restricting the changesets
        fromrev, torev, fromdate, todate: optional inclusive bounds on the
        changeset ids / insertion dates
        stripped: if True, changesets no longer linked to any serie are
        also reported

        Returns a list of dicts with keys 'rev', 'author', 'date' (UTC
        Timestamp), 'meta' and 'names', sorted by increasing 'rev'.

        NOTE(review): with stripped=True the registry table is not joined
        to changeset_series, so a `names` filter does not restrict the
        changesets — confirm whether that combination is meant to work.
        """
        cset = self.schema.changeset
        cset_series = self.schema.changeset_series
        reg = self.schema.registry

        sql = select(
            [cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)

        if stripped:
            # outer join so that dead (unlinked) changesets show up
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(
                cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rows = cn.execute(sql).fetchall()
        entries = [
            {'rev': csetid, 'author': author,
             'date': pd.Timestamp(revdate, tz='utc'),
             'meta': meta or {},
             'names': self._changeset_series(cn, csetid)}
            for csetid, author, revdate, meta in rows
        ]
        # rows come newest first (so that `limit` keeps the latest ones);
        # report them oldest first
        return sorted(entries, key=lambda entry: entry['rev'])

    def interval(self, cn, seriename):
        """Return the closed value-date interval of the latest revision.

        The bounds are the start/end columns of the serie's revision row
        with the highest cset; they are made UTC-aware when the serie
        metadata has a truthy 'tzaware' entry.

        Raises ValueError if the serie is not registered.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')

        row = cn.execute(
            f'select start, "end" '
            f'from "{self.namespace}.timeserie"."{tablename}" '
            f'order by cset desc limit 1'
        ).fetchone()
        start, end = row.start, row.end
        if self.metadata(cn, seriename).get('tzaware'):
            start = pd.Timestamp(start, tz='UTC')
            end = pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')

    # /API (end of the public API)
    # Helpers

    # creation / update

    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Initial insertion of a serie.

        Registers the serie, opens a changeset, stores the series through
        `Snapshot.create`, creates the revision table with its first row
        and links the changeset to the serie.

        Returns `newts`, or None if it is empty.
        """
        if not len(newts):
            return None

        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        row = dict(cset=csid, snapshot=head, start=start, end=end)

        table = self._make_ts_table(cn, seriename)
        cn.execute(table.insert().values(row))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Insert a new revision of an already registered serie.

        `newts` is validated against the registry metadata, then diffed
        (`self.diff`) against the stored version restricted to newts'
        index range. An empty diff is only logged: no changeset is created
        and None is returned. Otherwise the diff is stored and returned.
        """
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        index = newts.index
        current = snapshot.last(index.min(), index.max())
        diff = self.diff(current, newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)

        # the stored interval is the union of the previous one and newts'
        # own bounds; interval() may return UTC-aware bounds, hence the
        # tzinfo removal before comparing
        tsstart, tsend = start_end(newts)
        previous = self.interval(cn, seriename)
        start = min(tsstart, previous.left.replace(tzinfo=None))
        end = max(tsend, previous.right.replace(tzinfo=None))

        head = snapshot.update(diff)
        cn.execute(table.insert().values({
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }))
        self._finalize_insertion(cn, csid, seriename)
        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

    # serie table handling

    def _serie_to_tablename(self, cn, seriename):
        """Return the revision table name registered for `seriename`.

        Known names are cached; None is returned (and not cached) while
        the serie is not registered yet, i.e. at creation time.
        """
        cached = self.serie_tablename.get(seriename)
        if cached is not None:
            return cached

        reg = self.schema.registry
        tablename = cn.execute(
            select([reg.c.table_name]).where(reg.c.seriename == seriename)
        ).scalar()
        if tablename is not None:
            self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        """Return the sqlalchemy Table describing a serie's revision table.

        The definition is built on first use and memoized in the
        module-level TABLES under '<namespace>.timeserie.<tablename>'. For a
        serie not registered yet, the table name comes from
        `_make_tablename`. Nothing is created in the database here.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(seriename)

        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        known = TABLES.get(fq_tablename)
        if known is not None:
            return known

        table = Table(
            tablename, self.schema.meta,
            Column('id', Integer, primary_key=True),
            Column('cset', Integer,
                   ForeignKey('{}.changeset.id'.format(self.namespace)),
                   nullable=False),
            Column('start', TIMESTAMP, nullable=False),
            Column('end', TIMESTAMP, nullable=False),
            Column('snapshot', Integer,
                   ForeignKey('{}.snapshot.{}.id'.format(
                       self.namespace,
                       tablename))),
            Index(NONE_NAME, 'cset'),
            Index(NONE_NAME, 'snapshot'),
            Index(NONE_NAME, 'start'),
            Index(NONE_NAME, 'end'),
            schema='{}.timeserie'.format(self.namespace),
            keep_existing=True
        )
        TABLES[fq_tablename] = table
        return table

    def _make_ts_table(self, cn, seriename):
        """Create a serie's revision table in the database and return its
        Table definition."""
        definition = self._table_definition_for(cn, seriename)
        definition.create(cn)
        return definition

    def _register_serie(self, cn, seriename, ts):
        """Insert the registry row of a new serie and cache its id.

        The row stores the table name (from `_make_tablename`) and a
        metadata dict describing `ts`: tz-awareness, index dtype/name(s)
        and value dtype.
        """
        index = ts.index
        meta = {
            'tzaware': tzaware_serie(ts),
            'index_type': index.dtype.name,
            # only the non-empty index names
            'index_names': list(filter(None, index.names)),
            'index_dtype': index.dtype.str,
            'value_dtype': ts.dtypes.str,
            'value_type': ts.dtypes.name
        }
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(seriename),
            metadata=meta
        )
        result = cn.execute(sql)
        self.registry_map[seriename] = result.inserted_primary_key[0]

    def _get_ts_table(self, cn, seriename):
        """Return the Table definition of a registered serie, or None.

        `_table_definition_for` expects a *serie* name; passing it the
        table name (as before) made it look up a serie named like the
        table. That could resolve another serie's table, and otherwise
        relied on `_make_tablename` being idempotent.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if not tablename:
            return None
        return self._table_definition_for(cn, seriename)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        """Insert a new changeset row and return its id.

        insertion_date: optional tz-aware datetime (AssertionError if
        naive), stored converted to UTC; defaults to the current time.
        metadata: optional changeset metadata (stored as given).
        """
        table = self.schema.changeset
        if insertion_date is None:
            idate = pd.Timestamp.now(tz='UTC')
        else:
            assert insertion_date.tzinfo is not None
            # pd.Timestamp(aware, tz=...) raises ValueError in recent
            # pandas: convert the aware timestamp explicitly instead
            idate = pd.Timestamp(insertion_date).tz_convert('UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate
        )
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        """Return the names of the series linked to changeset `csid`."""
        link = self.schema.changeset_series
        registry = self.schema.registry
        query = select([registry.c.seriename]).where(
            link.c.cset == csid
        ).where(link.c.serie == registry.c.id)
        rows = cn.execute(query).fetchall()
        return [row.seriename for row in rows]

    def _previous_cset(self, cn, seriename, csid):
        """Return the highest cset of the serie strictly below `csid`,
        or None when there is none."""
        tablename = self._serie_to_tablename(cn, seriename)
        query = (
            'select cset from "{}.timeserie"."{}" '
            'where cset < %(csid)s '
            'order by cset desc limit 1'
        ).format(self.namespace, tablename)
        return cn.execute(query, csid=csid).scalar()

    # insertion handling

    def _validate(self, cn, ts, seriename):
        """Check that `ts` is compatible with the stored serie.

        An all-NaN series (an erasure) is always accepted. Otherwise its
        value dtype and index dtype must match the 'value_type' and
        'index_type' registry metadata; a plain Exception is raised on
        mismatch.
        """
        if ts.isnull().all():
            # ts erasure
            return

        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        expected = meta['value_type']
        if tstype != expected:
            raise Exception(
                'Type error when inserting {}, new type is {}, type in base is {}'.format(
                    seriename, tstype, expected)
            )
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _name_to_regid(self, cn, seriename):
        """Return the registry id of a serie, or None if not registered.

        Ids are cached in `registry_map`; a cached None is treated as a
        miss, so the registry is queried again next time.
        """
        found = self.registry_map.get(seriename)
        if found is None:
            registry = self.schema.registry
            query = select([registry.c.id]).where(registry.c.seriename == seriename)
            found = cn.execute(query).scalar()
            self.registry_map[seriename] = found
        return found

    def _finalize_insertion(self, cn, csid, seriename):
        """Link changeset `csid` to the serie in changeset_series."""
        regid = self._name_to_regid(cn, seriename)
        link = self.schema.changeset_series
        cn.execute(link.insert().values(cset=csid, serie=regid))

    def _resetcaches(self):
        """Empty every cache: the module-level Table memos (TABLES and the
        snapshot SNAPTABLES) and this instance's metadata, registry id and
        table name caches."""
        for cache in (TABLES, SNAPTABLES,
                      self.metadatacache, self.registry_map,
                      self.serie_tablename):
            cache.clear()