from datetime import datetime
from contextlib import contextmanager
import logging
import hashlib

import pandas as pd

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.schema import tsschema
from tshistory.util import (
    inject_in_index,
    num2float,
    subset,
    SeriesServices,
    tzaware_serie
)
from tshistory.snapshot import Snapshot

L = logging.getLogger('tshistory.tsio')
TABLES = {}


class TimeSerie(SeriesServices):
    namespace = 'tsh'
    schema = None
    metadatacache = None
    registry_map = None
    serie_tablename = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}

    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        """
        assert isinstance(newts, pd.Series)
        assert isinstance(seriename, str)
        assert isinstance(author, str)
        assert metadata is None or isinstance(metadata, dict)
        assert _insertion_date is None or isinstance(_insertion_date, datetime)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
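
    # A hypothetical usage sketch (the series name, author and dates below are
    # illustrative, not part of this module):
    #   series = pd.Series([1.0, 2.0], index=pd.date_range('2020-1-1', periods=2))
    #   tsh.insert(cn, series, 'my.series', 'babar')
    # A later insert overlapping the same dates goes through _update and stores
    # only the computed diff as a new changeset.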

    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        csetfilter = []
        if revision_date:
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            current = current[~current.isnull()]
        return current
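
    # A hypothetical usage sketch for reading back an older revision (name and
    # timestamp are illustrative):
    #   ts_now = tsh.get(cn, 'my.series')
    #   ts_then = tsh.get(cn, 'my.series',
    #                     revision_date=pd.Timestamp('2020-1-1', tz='UTC'))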

    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.seriename == seriename
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[seriename] = meta
        return meta

    def update_metadata(self, cn, seriename, metadata, internal=False):
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, seriename)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        # merge the new keys into the existing metadata and persist the
        # merged dict (not just the incoming keys)
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=meta)
        cn.execute(sql)
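
    # A hypothetical usage sketch: non-internal callers can only add new keys,
    # e.g. tsh.update_metadata(cn, 'my.series', {'unit': 'MW'}); reusing an
    # already-present key (such as 'value_type') trips the assertion above
    # unless internal=True.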

    def changeset_metadata(self, cn, csid):
        cset = self.schema.changeset
        sql = 'select metadata from "{ns}".changeset where id = {id}'.format(
            ns=self.namespace,
            id=csid
        )
        return cn.execute(sql).scalar()

    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None):
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        if deltabefore is not None or deltaafter is not None:
            assert from_value_date is None
            assert to_value_date is None

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        revs = cn.execute(revsql).fetchall()
        if not revs:
            return

        snapshot = Snapshot(cn, self, seriename)
        series = []
        for csid, idate in revs:
            if (deltabefore, deltaafter) != (None, None):
                from_value_date = None
                to_value_date = None
                if deltabefore is not None:
                    from_value_date = idate - deltabefore
                if deltaafter is not None:
                    to_value_date = idate + deltaafter
            series.append((
                idate,
                snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                              from_value_date=from_value_date,
                              to_value_date=to_value_date)[1]
            ))

        for revdate, serie in series:
            inject_in_index(serie, revdate)

        serie = pd.concat([serie for revdate_, serie in series])
        serie.name = seriename
        return serie
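
    # A hypothetical usage sketch (the timedelta is illustrative):
    #   tsh.get_history(cn, 'my.series', deltabefore=pd.Timedelta(hours=1))
    # limits each revision to value dates starting one hour before its
    # insertion date; deltabefore/deltaafter cannot be combined with
    # from_value_date/to_value_date (see the asserts above).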

    def get_delta(self, cn, seriename, delta):
        histo = self.get_history(
            cn, seriename, deltabefore=-delta
        )

        df = histo.reset_index()
        # df_date is a dataframe with two columns: value_date and insertion_date
        df_date = df.loc[:, ['insertion_date', 'value_date']]

        # in selected_dates each value_date now appears only once,
        # paired with its latest insertion_date
        selected_dates = df_date.groupby('value_date').max().reset_index()

        ts = df[seriename]
        # ts is built from the df returned by get_history;
        # its index is now a plain index of (insertion_date, value_date) tuples
        ts.index = ((row.insertion_date, row.value_date)
                    for row in df.itertuples())
        # ts still holds every (insertion_date, value_date) couple;
        # we use selected_dates to keep, for each value_date, only the
        # couple carrying the last insertion_date
        ts_select = ts[[(row[2], row[1])
                        for row in selected_dates.itertuples()]]

        # ts_select still has a plain index of (insertion_date, value_date) tuples
        new_index = (elt[1] for elt in ts_select.index)

        # keep only the value_date part of the index
        ts_select.index = new_index
        return ts_select
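
    # A made-up illustration of the selection above: if the history holds
    #   (insertion_date=d1, value_date=v1) -> 1.0
    #   (insertion_date=d2, value_date=v1) -> 2.0   (re-inserted later, d2 > d1)
    #   (insertion_date=d1, value_date=v2) -> 3.0
    # then selected_dates pairs v1 with d2 and v2 with d1, so ts_select returns
    # 2.0 for v1 and 3.0 for v2, indexed by value_date only.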


    def exists(self, cn, seriename):
        return self._get_ts_table(cn, seriename) is not None

    def latest_insertion_date(self, cn, seriename):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()

    def last_id(self, cn, seriename):
        snapshot = Snapshot(cn, self, seriename)
        return snapshot.last_id()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()

    def strip(self, cn, seriename, csid):
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete the changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
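
    # A hypothetical usage sketch: tsh.strip(cn, 'my.series', csid) deletes every
    # changeset >= csid for that series and records the strip in the metadata of
    # the affected changesets.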

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct seriename from {}.registry order by seriename'.format(
            self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log
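
    # A hypothetical usage sketch (filter values are illustrative):
    #   for rev in tsh.log(cn, names=['my.series'], limit=10):
    #       print(rev['rev'], rev['date'], rev['author'], rev['names'])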

    # /API
    # Helpers

    # creation / update

    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        # initial insertion
        if len(newts) == 0:
            return None
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        value = {
            'cset': csid,
            'snapshot': head
        }
        table = self._make_ts_table(cn, seriename, newts)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

    # serie table handling

    def _serie_to_tablename(self, cn, seriename):
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename

        reg = self.schema.registry
        sql = select([reg.c.table_name]).where(reg.c.seriename == seriename)
        tablename = cn.execute(sql).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       index=True, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename)),
                       index=True),
                schema='{}.timeserie'.format(self.namespace),
                extend_existing=True
            )
        return table

    def _make_ts_table(self, cn, seriename, ts):
        table = self._table_definition_for(cn, seriename)
        table.create(cn)
        return table

    def _register_serie(self, cn, seriename, ts):
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'value_type': ts.dtypes.name
            },
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid

    def _get_ts_table(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
            return self._table_definition_for(cn, tablename)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        sql = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)

        return [
            row.seriename
            for row in cn.execute(sql).fetchall()
        ]

    # insertion handling

    def _validate(self, cn, ts, seriename):
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _name_to_regid(self, cn, seriename):
        regid = self.registry_map.get(seriename)
        if regid is not None:
            return regid

        registry = self.schema.registry
        sql = select([registry.c.id]).where(registry.c.seriename == seriename)
        regid = self.registry_map[seriename] = cn.execute(sql).scalar()
        return regid

    def _finalize_insertion(self, cn, csid, seriename):
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=self._name_to_regid(cn, seriename)
        )
        cn.execute(sql)

    # don't use this

    def resetcaches(self):
        self.metadatacache.clear()
        self.registry_map.clear()
        self.serie_tablename.clear()
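

# Minimal end-to-end sketch, not part of the library proper: the DSN, series
# name and author are assumptions for illustration, and the tshistory schema
# is expected to already exist in the target database.
if __name__ == '__main__':
    from sqlalchemy import create_engine

    engine = create_engine('postgresql://localhost/tshistory_example')
    tsh = TimeSerie()
    series = pd.Series(
        [1.0, 2.0, 3.0],
        index=pd.date_range('2020-1-1', periods=3, freq='D')
    )
    with engine.begin() as cn:
        # first insertion creates the series; later calls store diffs
        tsh.insert(cn, series, 'example.series', 'babar')
        print(tsh.get(cn, 'example.series'))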