from datetime import datetime
from contextlib import contextmanager
import logging
import hashlib

import pandas as pd

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.schema import tsschema
from tshistory.util import (
    inject_in_index,
    num2float,
    subset,
    SeriesServices,
    tzaware_serie
)
from tshistory.snapshot import Snapshot

L = logging.getLogger('tshistory.tsio')
TABLES = {}


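# Illustrative usage sketch (not part of the original module): assuming a
# SQLAlchemy engine bound to a database where the tshistory schema has already
# been created, inserting a series and reading it back could look like this
# ('engine', the series name and the author are hypothetical):
#
#   from sqlalchemy import create_engine
#   engine = create_engine('postgresql://localhost/tsdb')
#   tsh = TimeSerie()
#   with engine.begin() as cn:
#       tsh.insert(cn, pd.Series([1., 2., 3.],
#                                index=pd.date_range('2018-1-1', periods=3)),
#                  'my.series', 'babar')
#       ts = tsh.get(cn, 'my.series')
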
class TimeSerie(SeriesServices):
    namespace = 'tsh'
    schema = None
    metadatacache = None
    registry_map = None
    serie_tablename = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}

    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata

        """
        assert isinstance(newts, pd.Series)
        assert isinstance(seriename, str)
        assert isinstance(author, str)
        assert metadata is None or isinstance(metadata, dict)
        assert _insertion_date is None or isinstance(_insertion_date, datetime)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)

    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        csetfilter = []
        if revision_date:
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            current = current[~current.isnull()]
        return current

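    # Sketch (not from the original source): because each insert() creates a
    # new changeset, get() can return the serie as it was known at an earlier
    # time, e.g. tsh.get(cn, 'my.series', revision_date=datetime(2018, 1, 15))
    # ('tsh', 'cn' and the series name come from the usage sketch above; a
    # timezone-aware datetime may be required depending on the schema).
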
    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.seriename == seriename
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[seriename] = meta
        return meta

    def update_metadata(self, cn, seriename, metadata, internal=False):
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, seriename)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=metadata)
        cn.execute(sql)

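    # Sketch (not from the original source): update_metadata only accepts keys
    # not already present unless internal=True, so a call could look like
    #   tsh.update_metadata(cn, 'my.series', {'unit': 'MW'})
    # where 'unit' is a hypothetical user-defined key.
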
    def changeset_metadata(self, cn, csid):
        cset = self.schema.changeset
        sql = 'select metadata from "{ns}".changeset where id = {id}'.format(
            ns=self.namespace,
            id=csid
        )
        return cn.execute(sql).scalar()

    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None):
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        if deltabefore is not None or deltaafter is not None:
            assert from_value_date is None
            assert to_value_date is None

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        revs = cn.execute(revsql).fetchall()
        if not revs:
            return

        snapshot = Snapshot(cn, self, seriename)
        series = []
        for csid, idate in revs:
            if (deltabefore, deltaafter) != (None, None):
                from_value_date = None
                to_value_date = None
                if deltabefore is not None:
                    from_value_date = idate - deltabefore
                if deltaafter is not None:
                    to_value_date = idate + deltaafter
            series.append((
                idate,
                snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                              from_value_date=from_value_date,
                              to_value_date=to_value_date)[1]
            ))

        for revdate, serie in series:
            inject_in_index(serie, revdate)

        serie = pd.concat([serie for revdate_, serie in series])
        serie.name = seriename
        return serie

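    # Sketch (not from the original source): get_history returns one serie per
    # changeset, concatenated with the insertion date injected into the index
    # (via inject_in_index), e.g.
    #   histo = tsh.get_history(cn, 'my.series',
    #                           from_insertion_date=datetime(2018, 1, 1))
    # 'tsh', 'cn' and the series name are the hypothetical objects used above.
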
    def _previous_cset(self, cn, seriename, csid):
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select cset from "{}.timeserie"."{}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1').format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()

    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        histo = self.get_history(
            cn, seriename, deltabefore=-delta
        )

        df = histo.reset_index()

        # df_date is a dataframe with two columns: value_date and insertion_date
        df_date = df.loc[:, ['insertion_date', 'value_date']]

        # now in selected_dates each value_date has only one occurrence,
        # which is the last inserted one
        selected_dates = df_date.groupby('value_date').max().reset_index()

        ts = df[seriename]
        # ts is built from the df returned by get_history;
        # its index is now a plain index of (insertion_date, value_date) tuples
        ts.index = ((row.insertion_date, row.value_date)
                    for row in df.itertuples())
        # at this point ts still carries every (insertion_date, value_date) couple.
        # We now use selected_dates to keep, for each value_date, only the
        # couple corresponding to the last insertion_date.
        ts_select = ts[[(row[2], row[1])
                        for row in selected_dates.itertuples()]]

        # ts_select still has a plain index of (insertion_date, value_date) tuples
        new_index = (elt[1] for elt in ts_select.index)

        # we only keep the value_date information from the index
        ts_select.index = new_index
        return subset(ts_select, from_value_date, to_value_date)

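    # Sketch (not from the original source): get_delta answers, for each
    # value_date, "what value came from the latest insertion made at least
    # `delta` ahead of that value_date?", e.g.
    #   tsh.get_delta(cn, 'my.series', pd.Timedelta(hours=1))
    # using the hypothetical 'tsh'/'cn' objects from the earlier sketch.
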
    def exists(self, cn, seriename):
        return self._get_ts_table(cn, seriename) is not None

    def latest_insertion_date(self, cn, seriename):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()

    def last_id(self, cn, seriename):
        snapshot = Snapshot(cn, self, seriename)
        return snapshot.last_id()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()

    def strip(self, cn, seriename, csid):
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete the changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))

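    # Sketch (not from the original source): changeset_at and strip can be
    # combined to roll a serie back to a given revision date, e.g.
    #   csid = tsh.changeset_at(cn, 'my.series', revdate, mode='after')
    #   tsh.strip(cn, 'my.series', csid)
    # which wipes every changeset from csid onwards ('tsh', 'cn' and 'revdate'
    # are hypothetical).
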
    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct name from {}.registry order by name'.format(self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log

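    # Sketch (not from the original source): each entry returned by log() is a
    # dict shaped like
    #   {'rev': 42, 'author': 'babar', 'date': pd.Timestamp(...),
    #    'meta': {}, 'names': ['my.series']}
    # with the numbers and names purely illustrative.
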
    # /API
    # Helpers

    # creation / update

    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        # initial insertion
        if len(newts) == 0:
            return None
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        value = {
            'cset': csid,
            'snapshot': head
        }
        table = self._make_ts_table(cn, seriename, newts)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

    # serie table handling

    def _serie_to_tablename(self, cn, seriename):
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename

        reg = self.schema.registry
        sql = select([reg.c.table_name]).where(reg.c.seriename == seriename)
        tablename = cn.execute(sql).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       index=True, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename)),
                       index=True),
                schema='{}.timeserie'.format(self.namespace),
                extend_existing=True
            )
        return table

    def _make_ts_table(self, cn, seriename, ts):
        table = self._table_definition_for(cn, seriename)
        table.create(cn)
        return table

    def _register_serie(self, cn, seriename, ts):
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'value_type': ts.dtypes.name
            },
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid

    def _get_ts_table(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
            return self._table_definition_for(cn, tablename)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        sql = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)

        return [
            row.seriename
            for row in cn.execute(sql).fetchall()
        ]

    # insertion handling

    def _validate(self, cn, ts, seriename):
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _name_to_regid(self, cn, seriename):
        regid = self.registry_map.get(seriename)
        if regid is not None:
            return regid

        registry = self.schema.registry
        sql = select([registry.c.id]).where(registry.c.seriename == seriename)
        regid = self.registry_map[seriename] = cn.execute(sql).scalar()
        return regid

    def _finalize_insertion(self, cn, csid, seriename):
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=self._name_to_regid(cn, seriename)
        )
        cn.execute(sql)

    # don't use this

    def resetcaches(self):
        self.metadatacache.clear()
        self.registry_map.clear()
        self.serie_tablename.clear()