from datetime import datetime
from contextlib import contextmanager
import logging
import hashlib

import pandas as pd

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.schema import tsschema
from tshistory.util import (
    inject_in_index,
    num2float,
    subset,
    SeriesServices,
    tzaware_serie
)
from tshistory.snapshot import Snapshot

L = logging.getLogger('tshistory.tsio')
TABLES = {}


class TimeSerie(SeriesServices):
    namespace = 'tsh'
    schema = None
    metadatacache = None
    registry_map = None
    serie_tablename = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}

    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata

        """
        assert isinstance(newts, pd.Series)
        assert isinstance(seriename, str)
        assert isinstance(author, str)
        assert metadata is None or isinstance(metadata, dict)
        assert _insertion_date is None or isinstance(_insertion_date, datetime)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
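
    # Hypothetical usage sketch for `insert` (the engine, serie name and
    # values below are illustrative, not part of this module):
    #
    #   tsh = TimeSerie()
    #   with engine.begin() as cn:
    #       ts = pd.Series([1.0, 2.0, 3.0],
    #                      index=pd.date_range('2018-1-1', periods=3, freq='D'))
    #       tsh.insert(cn, ts, 'my.serie', 'babar')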

    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        csetfilter = []
        if revision_date:
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            current = current[~current.isnull()]
        return current
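
    # Hypothetical usage sketch for `get` (names are illustrative): fetch the
    # current version of a serie, then the version known as of a past
    # revision date.
    #
    #   current = tsh.get(cn, 'my.serie')
    #   asof = tsh.get(cn, 'my.serie',
    #                  revision_date=pd.Timestamp('2018-1-1', tz='UTC'))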

    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.seriename == seriename
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[seriename] = meta
        return meta
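
    # The returned dict holds the keys written at registration time (see
    # `_register_serie` below), typically something like:
    #
    #   {'tzaware': False, 'index_type': 'datetime64[ns]',
    #    'index_names': [], 'value_type': 'float64'}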

    def update_metadata(self, cn, seriename, metadata, internal=False):
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, seriename)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=meta)
        # keep the in-memory metadata cache consistent with what was written
        self.metadatacache[seriename] = meta
        cn.execute(sql)

    def changeset_metadata(self, cn, csid):
        cset = self.schema.changeset
        sql = 'select metadata from "{ns}".changeset where id = {id}'.format(
            ns=self.namespace,
            id=csid
        )
        return cn.execute(sql).scalar()

    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None):
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        if deltabefore is not None or deltaafter is not None:
            assert from_value_date is None
            assert to_value_date is None

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        revs = cn.execute(revsql).fetchall()
        if not revs:
            return

        snapshot = Snapshot(cn, self, seriename)
        series = []
        for csid, idate in revs:
            if (deltabefore, deltaafter) != (None, None):
                from_value_date = None
                to_value_date = None
                if deltabefore is not None:
                    from_value_date = idate - deltabefore
                if deltaafter is not None:
                    to_value_date = idate + deltaafter
            series.append((
                idate,
                snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                              from_value_date=from_value_date,
                              to_value_date=to_value_date)[1]
            ))

        for revdate, serie in series:
            inject_in_index(serie, revdate)

        serie = pd.concat([serie for revdate_, serie in series])
        serie.name = seriename
        return serie
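
    # `get_history` returns a single serie whose index combines each
    # revision's insertion date with the value dates (via `inject_in_index`),
    # e.g. (hypothetical values):
    #
    #   insertion_date             value_date
    #   2018-01-01 00:00:00+00:00  2018-01-01    1.0
    #                              2018-01-02    2.0
    #   2018-01-02 00:00:00+00:00  2018-01-02    2.5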

    def get_delta(self, cn, seriename, delta):
        histo = self.get_history(
            cn, seriename, deltabefore=-delta
        )

        df = histo.reset_index()
        # df_date is a dataframe with two columns: value_date and insertion_date
        df_date = df.loc[:, ['insertion_date', 'value_date']]

        # in selected_dates each value_date now has a single occurrence:
        # the one with the latest insertion_date
        selected_dates = df_date.groupby('value_date').max().reset_index()

        ts = df[seriename]
        # ts is built from the df returned by get_history;
        # its index is now a plain index of (insertion_date, value_date) tuples
        ts.index = ((row.insertion_date, row.value_date)
                    for row in df.itertuples())
        # ts still holds every (insertion_date, value_date) couple;
        # we use selected_dates to keep, for each value_date, only the
        # couple corresponding to the last insertion_date
        ts_select = ts[[(row[2], row[1])
                        for row in selected_dates.itertuples()]]

        # ts_select still has a plain index of (insertion_date, value_date) tuples
        new_index = (elt[1] for elt in ts_select.index)

        # keep only the value_date information from the index
        ts_select.index = new_index
        return ts_select
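
    # Hypothetical example: with delta=pd.Timedelta(hours=1),
    #
    #   tsh.get_delta(cn, 'my.serie', pd.Timedelta(hours=1))
    #
    # yields, for each value_date, the value as it was known at least one
    # hour before that value_date (i.e. from the latest insertion made no
    # later than value_date - delta).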


    def exists(self, cn, seriename):
        return self._get_ts_table(cn, seriename) is not None

    def latest_insertion_date(self, cn, seriename):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()

    def strip(self, cn, seriename, csid):
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete the changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct seriename from {}.registry order by seriename'.format(
            self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats
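
    # The returned dict looks like (values are illustrative):
    #
    #   {'series count': 12, 'changeset count': 340,
    #    'serie names': ['serie_1', 'serie_2', ...]}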

    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log
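
    # Each log entry is a dict shaped like (values are illustrative):
    #
    #   {'rev': 42, 'author': 'babar',
    #    'date': pd.Timestamp('2018-01-01 12:00:00+0000', tz='UTC'),
    #    'meta': {}, 'names': ['my.serie']}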

    # /API
    # Helpers

    # creation / update

    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        # initial insertion
        if len(newts) == 0:
            return None
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        value = {
            'cset': csid,
            'snapshot': head
        }
        table = self._make_ts_table(cn, seriename, newts)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

    # ts serialisation

    def _ensure_tz_consistency(self, cn, ts):
        """Return timeserie with tz aware index or not depending on metadata
        tzaware.
        """
        assert ts.name is not None
        metadata = self.metadata(cn, ts.name)
        if metadata and metadata.get('tzaware', False):
            return ts.tz_localize('UTC')
        return ts

    # serie table handling

    def _serie_to_tablename(self, cn, seriename):
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename

        reg = self.schema.registry
        sql = select([reg.c.table_name]).where(reg.c.seriename == seriename)
        tablename = cn.execute(sql).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename
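
    # Minimal sketch of the serie-name -> table-name mapping assumed by
    # `_table_definition_for` and `_register_serie`: use the serie name
    # directly and hash it when it exceeds the 63-char postgres identifier
    # limit (the concrete scheme may differ).
    def _make_tablename(self, seriename):
        if len(seriename) > 63:
            # postgres truncates identifiers beyond 63 characters
            return hashlib.sha1(seriename.encode('utf-8')).hexdigest()
        return seriename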

    def _table_definition_for(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       index=True, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename)),
                       index=True),
                schema='{}.timeserie'.format(self.namespace),
                extend_existing=True
            )
        return table

    def _make_ts_table(self, cn, seriename, ts):
        table = self._table_definition_for(cn, seriename)
        table.create(cn)
        return table

    def _register_serie(self, cn, seriename, ts):
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'value_type': ts.dtypes.name
            },
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid

    def _get_ts_table(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
            return self._table_definition_for(cn, seriename)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        sql = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)

        return [
            row.seriename
            for row in cn.execute(sql).fetchall()
        ]

    # insertion handling

    def _validate(self, cn, ts, seriename):
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _name_to_regid(self, cn, seriename):
        regid = self.registry_map.get(seriename)
        if regid is not None:
            return regid

        registry = self.schema.registry
        sql = select([registry.c.id]).where(registry.c.seriename == seriename)
        regid = self.registry_map[seriename] = cn.execute(sql).scalar()
        return regid

    def _finalize_insertion(self, cn, csid, seriename):
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=self._name_to_regid(cn, seriename)
        )
        cn.execute(sql)

    # don't use this

    def resetcaches(self):
        self.metadatacache.clear()
        self.registry_map.clear()
        self.serie_tablename.clear()