from datetime import datetime
from contextlib import contextmanager
import logging
import hashlib

import pandas as pd

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.schema import tsschema
from tshistory.util import (
    inject_in_index,
    num2float,
    subset,
    SeriesServices,
    tzaware_serie
)
from tshistory.snapshot import Snapshot

L = logging.getLogger('tshistory.tsio')
TABLES = {}


class TimeSerie(SeriesServices):
    namespace = 'tsh'
    schema = None
    metadatacache = None
    registry_map = None
    serie_tablename = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}

    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata

        """
        assert isinstance(newts, pd.Series)
        assert isinstance(seriename, str)
        assert isinstance(author, str)
        assert metadata is None or isinstance(metadata, dict)
        assert _insertion_date is None or isinstance(_insertion_date, datetime)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)

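    # Usage sketch for `insert` (illustrative; assumes a sqlalchemy engine
    # bound to a database carrying the tshistory schema; `engine` and the
    # series name are hypothetical):
    #
    #   tsh = TimeSerie()
    #   ts = pd.Series([1., 2., 3.],
    #                  index=pd.date_range('2018-1-1', periods=3, freq='D'))
    #   with engine.begin() as cn:
    #       tsh.insert(cn, ts, 'some.series', 'babar',
    #                  metadata={'source': 'demo'})
    #
    # The first call creates the series; subsequent calls store only the
    # diff against the previous state (see _create/_update below).
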
    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        csetfilter = []
        if revision_date:
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            current = current[~current.isnull()]
        return current

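    # Sketch of `get` with and without the `revision_date` filter (engine
    # and dates are hypothetical):
    #
    #   with engine.begin() as cn:
    #       current = tsh.get(cn, 'some.series')
    #       asof = tsh.get(cn, 'some.series',
    #                      revision_date=datetime(2018, 1, 15))
    #
    # `asof` is rebuilt from the changesets whose insertion_date is <= the
    # given revision_date (None if the series did not exist at that point).
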
    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.seriename == seriename
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[seriename] = meta
        return meta

    def update_metadata(self, cn, seriename, metadata, internal=False):
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, seriename)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=meta)
        cn.execute(sql)

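    # Sketch of `update_metadata` (hypothetical key): unless internal=True,
    # only keys that are not already present may be added, per the
    # intersection assertion above.
    #
    #   with engine.begin() as cn:
    #       tsh.update_metadata(cn, 'some.series', {'unit': 'MW'})
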
    def changeset_metadata(self, cn, csid):
        cset = self.schema.changeset
        sql = 'select metadata from "{ns}".changeset where id = {id}'.format(
            ns=self.namespace,
            id=csid
        )
        return cn.execute(sql).scalar()

    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        if deltabefore is not None or deltaafter is not None:
            assert from_value_date is None
            assert to_value_date is None

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        revs = cn.execute(revsql).fetchall()
        if not revs:
            return

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            for csid, idate in revs:
                from_value_date = None
                to_value_date = None
                if deltabefore is not None:
                    from_value_date = idate - deltabefore
                if deltaafter is not None:
                    to_value_date = idate + deltaafter
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_value_date,
                                  to_value_date=to_value_date)[1]
                ))
        else:
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            diffs = []
            for (revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs

        return {
            idate: serie
            for idate, serie in series
        }

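    # Sketch of `get_history` (hypothetical engine/dates): the returned dict
    # maps each changeset insertion date to the series as it stood then, or
    # to successive diffs when diffmode=True; None is returned when the
    # series or the requested revisions do not exist.
    #
    #   with engine.begin() as cn:
    #       hist = tsh.get_history(cn, 'some.series',
    #                              from_insertion_date=datetime(2018, 1, 1))
    #   for idate, ts in sorted(hist.items()):
    #       ...  # idate: insertion date, ts: pandas.Series
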
    def _previous_cset(self, cn, seriename, csid):
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select cset from "{}.timeserie"."{}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1').format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()

    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):

        histo = self.get_history(
            cn, seriename, deltabefore=-delta
        )
        for revdate, serie in histo.items():
            inject_in_index(serie, revdate)
        histo = pd.concat([serie for serie in histo.values()])

        df = histo.reset_index()

        # df_date is a dataframe with two columns: value_date and insertion_date
        df_date = df.loc[:, ['insertion_date', 'value_date']]

        # in selected_dates each value_date now has a single occurrence,
        # the one with the latest insertion_date
        selected_dates = df_date.groupby('value_date').max().reset_index()

        ts = df[seriename]
        # ts is built from the df returned by get_history;
        # its index is now a plain index of (insertion_date, value_date) tuples
        ts.index = ((row.insertion_date, row.value_date)
                    for row in df.itertuples())
        # ts still holds every (insertion_date, value_date) couple.
        # We now use selected_dates to keep, for each value_date, only the
        # couple corresponding to the last insertion_date.
        ts_select = ts[[(row[2], row[1])
                        for row in selected_dates.itertuples()]]

        # ts_select still has a plain index of (insertion_date, value_date) tuples
        new_index = (elt[1] for elt in ts_select.index)

        # we only keep the value_date information from the index
        ts_select.index = new_index
        return subset(ts_select, from_value_date, to_value_date)

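    # Reading of `get_delta` (hypothetical engine): for each value_date it
    # keeps the value coming from the latest changeset inserted at least
    # `delta` before that value_date, e.g. the day-ahead view of a series:
    #
    #   with engine.begin() as cn:
    #       dayahead = tsh.get_delta(cn, 'some.series',
    #                                delta=pd.Timedelta(days=1))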

    def exists(self, cn, seriename):
        return self._get_ts_table(cn, seriename) is not None

    def latest_insertion_date(self, cn, seriename):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()

    def last_id(self, cn, seriename):
        snapshot = Snapshot(cn, self, seriename)
        return snapshot.last_id()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()

    def strip(self, cn, seriename, csid):
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct seriename from {}.registry order by seriename'.format(self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log

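    # Shape of the structure returned by `log` (values are hypothetical):
    #
    #   with engine.begin() as cn:
    #       history = tsh.log(cn, names=['some.series'], limit=10)
    #   # [{'rev': 42, 'author': 'babar',
    #   #   'date': pd.Timestamp('2018-01-01 12:00:00+0000'),
    #   #   'meta': {}, 'names': ['some.series']},
    #   #  ...]
    #
    # Entries are sorted by ascending changeset id.
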
    # /API
    # Helpers

    # creation / update

    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        # initial insertion
        if len(newts) == 0:
            return None
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        value = {
            'cset': csid,
            'snapshot': head
        }
        table = self._make_ts_table(cn, seriename, newts)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

    # serie table handling

    def _serie_to_tablename(self, cn, seriename):
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename

        reg = self.schema.registry
        sql = select([reg.c.table_name]).where(reg.c.seriename == seriename)
        tablename = cn.execute(sql).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       index=True, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename)),
                       index=True),
                schema='{}.timeserie'.format(self.namespace),
                extend_existing=True
            )
        return table

    def _make_ts_table(self, cn, seriename, ts):
        table = self._table_definition_for(cn, seriename)
        table.create(cn)
        return table

    def _register_serie(self, cn, seriename, ts):
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'value_type': ts.dtypes.name
            },
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid

    def _get_ts_table(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
            return self._table_definition_for(cn, tablename)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        sql = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)

        return [
            row.seriename
            for row in cn.execute(sql).fetchall()
        ]

    # insertion handling

    def _validate(self, cn, ts, seriename):
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _name_to_regid(self, cn, seriename):
        regid = self.registry_map.get(seriename)
        if regid is not None:
            return regid

        registry = self.schema.registry
        sql = select([registry.c.id]).where(registry.c.seriename == seriename)
        regid = self.registry_map[seriename] = cn.execute(sql).scalar()
        return regid

    def _finalize_insertion(self, cn, csid, seriename):
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=self._name_to_regid(cn, seriename)
        )
        cn.execute(sql)

    # don't use this

    def resetcaches(self):
        self.metadatacache.clear()
        self.registry_map.clear()
        self.serie_tablename.clear()