from datetime import datetime
from contextlib import contextmanager
import logging
import hashlib

import pandas as pd

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.schema import tsschema
from tshistory.util import (
    inject_in_index,
    num2float,
    subset,
    SeriesServices,
    tzaware_serie
)
from tshistory.snapshot import Snapshot

L = logging.getLogger('tshistory.tsio')
TABLES = {}


class TimeSerie(SeriesServices):
    namespace = 'tsh'
    schema = None
    metadatacache = None
    registry_map = None
    serie_tablename = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}

    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        """
        assert isinstance(newts, pd.Series)
        assert isinstance(seriename, str)
        assert isinstance(author, str)
        assert metadata is None or isinstance(metadata, dict)
        assert _insertion_date is None or isinstance(_insertion_date, datetime)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)

    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        csetfilter = []
        if revision_date:
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            current = current[~current.isnull()]
        return current
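
    # A usage sketch, not part of the original module: the engine URI and
    # series name below are made up, and we assume the tshistory schema has
    # already been created in the target database. Only the insert/get call
    # sequence is illustrated.
    #
    #     from sqlalchemy import create_engine
    #     engine = create_engine('postgresql://localhost/tsdb')
    #     tsh = TimeSerie()
    #     ts = pd.Series([1., 2., 3.],
    #                    index=pd.date_range('2018-1-1', periods=3, freq='D'))
    #     with engine.begin() as cn:
    #         tsh.insert(cn, ts, 'my.series', 'babar')
    #         assert tsh.get(cn, 'my.series').equals(ts)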

    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.seriename == seriename
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[seriename] = meta
        return meta

    def update_metadata(self, cn, seriename, metadata, internal=False):
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, seriename)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=meta)  # persist the merged dict, not just the new keys
        cn.execute(sql)
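
    # Illustrative sketch only, reusing the hypothetical `engine`/`tsh` names
    # from the sketch above: update_metadata merges user keys into the
    # metadata dict of an already registered series.
    #
    #     with engine.begin() as cn:
    #         tsh.update_metadata(cn, 'my.series', {'unit': 'MW'})
    #         assert tsh.metadata(cn, 'my.series')['unit'] == 'MW'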

    def changeset_metadata(self, cn, csid):
        cset = self.schema.changeset
        sql = 'select metadata from "{ns}".changeset where id = {id}'.format(
            ns=self.namespace,
            id=csid
        )
        return cn.execute(sql).scalar()

    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        """Return a dict mapping insertion dates to the serie as it was
        known at each of these dates (or, with diffmode=True, to the
        successive diffs between revisions).
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        if deltabefore is not None or deltaafter is not None:
            assert from_value_date is None
            assert to_value_date is None

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        revs = cn.execute(revsql).fetchall()
        if not revs:
            return

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            for csid, idate in revs:
                from_value_date = None
                to_value_date = None
                if deltabefore is not None:
                    from_value_date = idate - deltabefore
                if deltaafter is not None:
                    to_value_date = idate + deltaafter
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_value_date,
                                  to_value_date=to_value_date)[1]
                ))
        else:
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date)

        if diffmode:
            diffs = []
            for (revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs

        return {
            idate: serie
            for idate, serie in series
        }
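
    # A sketch of what get_history returns (hypothetical names, reusing the
    # `engine`/`tsh` sketch above): a dict keyed by insertion date, with the
    # serie as known at that date, or successive diffs when diffmode=True.
    #
    #     with engine.begin() as cn:
    #         histo = tsh.get_history(cn, 'my.series')
    #         for idate, revision in histo.items():
    #             print(idate, len(revision))
    #         diffs = tsh.get_history(cn, 'my.series', diffmode=True)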

    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """Compute the serie as it was known `delta` ahead of each value
        date: for every value date, keep the value from the latest
        insertion made at least `delta` before that date.
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta
        )
        for revdate, serie in histo.items():
            inject_in_index(serie, revdate)
        histo = pd.concat([serie for serie in histo.values()])

        df = histo.reset_index()

        # df_date is a dataframe with two columns: value_date and insertion_date
        df_date = df.loc[:, ['insertion_date', 'value_date']]

        # in selected_dates each value_date occurs only once,
        # associated with its latest insertion_date
        selected_dates = df_date.groupby('value_date').max().reset_index()

        ts = df[seriename]
        # ts is built from the df returned by get_history;
        # its index becomes a plain index of (insertion_date, value_date) tuples
        ts.index = ((row.insertion_date, row.value_date)
                    for row in df.itertuples())
        # ts still holds every (insertion_date, value_date) couple;
        # use selected_dates to keep, for each value_date, only the couple
        # carrying the latest insertion_date
        ts_select = ts[[(row[2], row[1])
                        for row in selected_dates.itertuples()]]

        # ts_select still has a plain index of (insertion_date, value_date) tuples;
        # keep only the value_date part
        new_index = (elt[1] for elt in ts_select.index)
        ts_select.index = new_index
        return subset(ts_select, from_value_date, to_value_date)
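
    # Illustrative only (delta value and names are made up, reusing the
    # sketch above): get_delta rebuilds the serie keeping, for each value
    # date, the last value inserted at least `delta` before that date.
    #
    #     with engine.begin() as cn:
    #         lagged = tsh.get_delta(cn, 'my.series', pd.Timedelta(hours=1))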

    def exists(self, cn, seriename):
        return self._get_ts_table(cn, seriename) is not None

    def latest_insertion_date(self, cn, seriename):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()

    def last_id(self, cn, seriename):
        snapshot = Snapshot(cn, self, seriename)
        return snapshot.last_id()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()

    def strip(self, cn, seriename, csid):
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete the changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
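
    # Sketch of revision surgery (the changeset id and `some_insertion_date`
    # are hypothetical, reusing the `engine`/`tsh` names from the sketch
    # above): changeset_at resolves an insertion date to a changeset id, and
    # strip removes that changeset and everything after it for the serie.
    #
    #     with engine.begin() as cn:
    #         csid = tsh.changeset_at(cn, 'my.series', some_insertion_date)
    #         tsh.strip(cn, 'my.series', csid)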

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct seriename from {}.registry order by seriename'.format(self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log
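
    # Example of the structure produced by log() (values and names are made
    # up, reusing the `engine`/`tsh` sketch above). Each entry is a dict with
    # 'rev', 'author', 'date', 'meta' and 'names' keys, sorted by ascending
    # changeset id.
    #
    #     with engine.begin() as cn:
    #         for rev in tsh.log(cn, names=['my.series'], limit=10):
    #             print(rev['rev'], rev['author'], rev['date'], rev['names'])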

    # /API
    # Helpers

    # creation / update

    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        # initial insertion
        if len(newts) == 0:
            return None
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        value = {
            'cset': csid,
            'snapshot': head
        }
        table = self._make_ts_table(cn, seriename, newts)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

    # serie table handling

    def _serie_to_tablename(self, cn, seriename):
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename

        reg = self.schema.registry
        sql = select([reg.c.table_name]).where(reg.c.seriename == seriename)
        tablename = cn.execute(sql).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       index=True, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename)),
                       index=True),
                schema='{}.timeserie'.format(self.namespace),
                extend_existing=True
            )
        return table

    def _make_ts_table(self, cn, seriename, ts):
        table = self._table_definition_for(cn, seriename)
        table.create(cn)
        return table

    def _register_serie(self, cn, seriename, ts):
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'value_type': ts.dtypes.name
            },
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid

    def _get_ts_table(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
            # _table_definition_for expects the serie name, not the table name
            return self._table_definition_for(cn, seriename)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        sql = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)

        return [
            row.seriename
            for row in cn.execute(sql).fetchall()
        ]

    def _previous_cset(self, cn, seriename, csid):
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select cset from "{}.timeserie"."{}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1').format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()

    # insertion handling

    def _validate(self, cn, ts, seriename):
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _name_to_regid(self, cn, seriename):
        regid = self.registry_map.get(seriename)
        if regid is not None:
            return regid

        registry = self.schema.registry
        sql = select([registry.c.id]).where(registry.c.seriename == seriename)
        regid = self.registry_map[seriename] = cn.execute(sql).scalar()
        return regid

    def _finalize_insertion(self, cn, csid, seriename):
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=self._name_to_regid(cn, seriename)
        )
        cn.execute(sql)

    # don't use this

    def resetcaches(self):
        self.metadatacache.clear()
        self.registry_map.clear()
        self.serie_tablename.clear()