tsio.py 18 KB
Newer Older
1
from datetime import datetime
2
from contextlib import contextmanager
3
import logging
4
import hashlib
5
6
7

import pandas as pd

8
from sqlalchemy import Table, Column, Integer, ForeignKey
Aurélien Campéas's avatar
Aurélien Campéas committed
9
from sqlalchemy.sql.expression import select, func, desc
10
from sqlalchemy.dialects.postgresql import BYTEA
11

12
from tshistory.schema import tsschema
13
14
from tshistory.util import (
    inject_in_index,
15
    num2float,
16
    subset,
17
    SeriesServices,
18
19
    tzaware_serie
)
20
from tshistory.snapshot import Snapshot
21

22
# module logger
L = logging.getLogger(name='tshistory.tsio')
# process-wide cache of sqlalchemy Table objects for the series tables,
# keyed by qualified table name (filled by TimeSerie._table_definition_for)
TABLES = dict()
24
25


26
class TimeSerie(SeriesServices):
    """Versioned time series storage on top of a postgres schema.

    Every series has its own table in the ``<namespace>.timeserie`` schema,
    holding one row per revision: a reference to a changeset and to the
    snapshot row produced by `Snapshot.create` / `Snapshot.update`.
    Series are declared in ``<namespace>.registry`` (with their metadata);
    changesets live in ``<namespace>.changeset`` and are tied to series
    through ``<namespace>.changeset_series``.
    """
    namespace = 'tsh'
    schema = None
    metadatacache = None
    registry_map = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        # per-instance caches: series name -> registry metadata / registry id
        self.metadatacache = {}
        self.registry_map = {}

    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        _insertion_date: optional tz-aware datetime forcing the changeset
        insertion date (defaults to now)

        Returns None for an empty series; otherwise the inserted series on
        first insertion, the stored diff on update (None if nothing changed).
        """
        assert isinstance(newts, pd.Series)
        assert isinstance(seriename, str)
        assert isinstance(author, str)
        assert metadata is None or isinstance(metadata, dict)
        assert _insertion_date is None or isinstance(_insertion_date, datetime)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)

    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie (only changesets inserted at or before it are considered)
        from_value_date / to_value_date: optional value date bounds
        _keep_nans: when false (default), NaN points are dropped

        Returns None for an unknown series.
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        csetfilter = []
        if revision_date:
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            current = current[~current.isnull()]
        return current

    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie (None for an unknown series).

        Found metadata is cached on the instance; a missing series is NOT
        cached, so that the series is picked up once it gets created.
        """
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]

        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.name == seriename
        )
        meta = cn.execute(sql).scalar()
        if meta is not None:
            self.metadatacache[seriename] = meta
        return meta

    def update_metadata(self, cn, seriename, metadata, internal=False):
        """Merge `metadata` into the registry metadata of `seriename`.

        Unless `internal` is true, the new keys must not collide with the
        existing ones (e.g. the tzaware/index_type/index_names/value_type
        keys recorded at creation).
        """
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, seriename)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        # `meta` is the cached dict: updating it in place keeps the cache
        # in sync with what we store below
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.name == seriename
        ).values(metadata=meta)  # store the merged dict, not just the new keys
        cn.execute(sql)

    def changeset_metadata(self, cn, csid):
        """Return the metadata attached to changeset `csid` (or None)."""
        cset = self.schema.changeset
        sql = select([cset.c.metadata]).where(cset.c.id == csid)
        return cn.execute(sql).scalar()

    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None):
        """Return all the revisions of a series as one concatenated series.

        Each revision's series gets its insertion date injected in its
        index (via `inject_in_index`) before concatenation.

        from_insertion_date / to_insertion_date: bounds on the changesets
        from_value_date / to_value_date: fixed value date bounds
        deltabefore / deltaafter: per-revision value date window
        [insertion date - deltabefore, insertion date + deltaafter];
        incompatible with the fixed value date bounds.

        Returns None for an unknown series or when no revision matches.
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        if deltabefore is not None or deltaafter is not None:
            assert from_value_date is None
            assert to_value_date is None

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        revs = cn.execute(revsql).fetchall()
        if not revs:
            return

        snapshot = Snapshot(cn, self, seriename)
        series = []
        for csid, idate in revs:
            if (deltabefore, deltaafter) != (None, None):
                # the value date window moves with each insertion date
                from_value_date = None
                to_value_date = None
                if deltabefore is not None:
                    from_value_date = idate - deltabefore
                if deltaafter is not None:
                    to_value_date = idate + deltaafter
            # bind csid eagerly so the filter always targets this revision
            _, serie = snapshot.find(
                csetfilter=[lambda cset, csid=csid: cset.c.id == csid],
                from_value_date=from_value_date,
                to_value_date=to_value_date
            )
            # Snapshot.find may yield None (see `get`): nothing to inject
            if serie is not None:
                series.append((idate, serie))

        if not series:
            return

        for idate, serie in series:
            inject_in_index(serie, idate)

        history = pd.concat([serie for _, serie in series])
        history.name = seriename
        return history

    def get_delta(self, cn, seriename, delta):
        """Return, per value date, the value of the latest revision that
        was inserted at least `delta` before that value date.

        Each revision only contributes the values dated from its insertion
        date + `delta` onwards; for every value date the most recent such
        revision wins. Returns None for an unknown series (or no history).
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta
        )
        if histo is None:
            return None

        df = histo.reset_index()
        # df_date is a dataframe with two columns: insertion_date and value_date
        df_date = df.loc[:, ['insertion_date', 'value_date']]

        # now in selected_dates each value_date has only one occurence
        # which is the last inserted
        selected_dates = df_date.groupby('value_date').max().reset_index()

        ts = df[seriename]
        # index the values by (insertion_date, value_date) couples
        ts.index = ((row.insertion_date, row.value_date)
                    for row in df.itertuples())
        # ts still holds every (insertion_date, value_date) couple; keep
        # only the ones pairing each value_date with its latest
        # insertion_date (selected_dates rows are
        # (index, value_date, insertion_date))
        ts_select = ts[[(row[2], row[1])
                        for row in selected_dates.itertuples()]]

        # ts_select is indexed by (insertion_date, value_date):
        # we only keep the value_date information
        ts_select.index = (elt[1] for elt in ts_select.index)
        return ts_select

    def exists(self, cn, seriename):
        """Tell whether `seriename` is registered."""
        return self._get_ts_table(cn, seriename) is not None

    def latest_insertion_date(self, cn, seriename):
        """Return the insertion date of the last changeset of `seriename`
        (None for an unknown series)."""
        tstable = self._get_ts_table(cn, seriename)
        if tstable is None:
            return None
        cset = self.schema.changeset
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return the id of a changeset of `seriename` matching `revdate`.

        mode 'strict': insertion date == revdate
        mode 'before': insertion date <= revdate
        mode 'after': insertion date >= revdate
        Returns None when nothing matches.
        """
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        # NOTE(review): there is no ORDER BY, so with 'before'/'after' and
        # several matches the returned changeset is not guaranteed to be
        # the closest one to revdate -- confirm the intended semantics
        return cn.execute(sql).scalar()

    def strip(self, cn, seriename, csid):
        """Remove the revisions of `seriename` from changeset `csid` onwards.

        The changesets are kept, but get a 'tshistory.info' metadata entry
        and are detached from the series in changeset_series; the series
        table rows with cset >= csid are deleted.
        """
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        # the registry id does not change while stripping: fetch it once
        regid = self._name_to_regid(cn, seriename)
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changset_serie item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == regid
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))

    def info(self, cn):
        """Gather global statistics on the current tshistory repository

        Note: 'changeset count' is the highest changeset id.
        """
        reg = self.schema.registry
        cset = self.schema.changeset
        sql = select([func.count()]).select_from(reg)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = select([func.max(cset.c.id)])
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = select([reg.c.name]).distinct().order_by(reg.c.name)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        Returns a list of dicts with keys 'rev', 'author', 'date' (UTC
        pd.Timestamp), 'meta' and 'names', sorted by ascending revision.
        `limit` keeps the `limit` most recent changesets; `stripped` also
        shows changesets no longer attached to any series.
        """
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        # descending order so that `limit` keeps the most recent changesets
        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            # NOTE(review): with stripped=True the registry is not joined
            # to changeset_series, so this filter does not restrict the
            # changesets per series -- confirm intended behaviour
            sql = sql.where(reg.c.name.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        log = []
        for csetid, author, revdate, meta in cn.execute(sql).fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': self._utc_timestamp(revdate),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log

    # /API
    # Helpers

    @staticmethod
    def _utc_timestamp(stamp):
        """Return `stamp` as a UTC pd.Timestamp.

        Naive values are taken as UTC, aware ones are converted to UTC
        (recent pandas refuses `pd.Timestamp(aware, tz=...)`).
        """
        stamp = pd.Timestamp(stamp)
        if stamp.tzinfo is None:
            return stamp.tz_localize('UTC')
        return stamp.tz_convert('UTC')

    # creation / update

    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        """First insertion of a series: create its table, registry entry
        and first changeset. Returns `newts` (None if it is empty)."""
        # initial insertion
        if len(newts) == 0:
            return None
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        value = {
            'cset': csid,
            'snapshot': head
        }
        table = self._make_ts_table(cn, seriename, newts)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Store a new revision made of the differences between `newts`
        and the current state over newts' value date span.

        Returns the diff, or None when there is no difference (then no
        changeset is created).
        """
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

    # ts serialisation

    def _ensure_tz_consistency(self, cn, ts):
        """Return timeserie with tz aware index or not depending on metadata
        tzaware.

        When the series metadata says tzaware, the index of `ts` is
        localized to UTC; otherwise `ts` is returned unchanged.
        """
        assert ts.name is not None
        metadata = self.metadata(cn, ts.name)
        if metadata and metadata.get('tzaware', False):
            return ts.tz_localize('UTC')
        return ts

    # serie table handling

    def _ts_table_name(self, seriename):
        """Qualified name '<namespace>.timeserie.<table name>' of a series."""
        seriename = self._tablename(seriename)
        return '{}.timeserie.{}'.format(self.namespace, seriename)

    def _table_definition_for(self, seriename):
        """Return the (module-level cached) sqlalchemy Table of a series.

        Only the definition is built here; the table is created in the
        database by `_make_ts_table`.
        """
        tablename = self._ts_table_name(seriename)
        seriename = self._tablename(seriename)
        table = TABLES.get(tablename)
        if table is None:
            TABLES[tablename] = table = Table(
                seriename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       index=True, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           seriename)),
                       index=True),
                schema='{}.timeserie'.format(self.namespace),
                extend_existing=True
            )
        return table

    def _make_ts_table(self, cn, seriename, ts):
        """Create the table of a new series and register it, recording the
        index/value types of `ts` in the registry metadata."""
        tablename = self._ts_table_name(seriename)
        table = self._table_definition_for(seriename)
        table.create(cn)
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            name=seriename,
            table_name=tablename,
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'value_type': ts.dtypes.name
            },
        )
        cn.execute(sql)
        # the series now exists: forget anything cached about this name
        self.metadatacache.pop(seriename, None)
        self.registry_map.pop(seriename, None)
        return table

    def _get_ts_table(self, cn, seriename):
        """Return the Table of a registered series, None if unknown."""
        reg = self.schema.registry
        tablename = self._ts_table_name(seriename)
        sql = reg.select().where(reg.c.table_name == tablename)
        # scalar() yields the first registry column of the matching row
        tid = cn.execute(sql).scalar()
        if tid is None:
            return None
        return self._table_definition_for(seriename)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        """Insert a changeset row and return its id.

        `insertion_date` must be tz-aware (it is converted to UTC); it
        defaults to the current UTC time.
        """
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = self._utc_timestamp(insertion_date or datetime.utcnow())
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        """Return the names of the series attached to changeset `csid`."""
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        sql = select(
            [reg.c.name]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)

        return [
            row.name
            for row in cn.execute(sql).fetchall()
        ]

    # insertion handling

    def _validate(self, cn, ts, seriename):
        """Check `ts` against the value and index types recorded for the
        series; an all-NaN series (an erasure) is always accepted.

        Raises Exception on a type mismatch.
        """
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _name_to_regid(self, cn, seriename):
        """Return the registry id of a series (None if unknown).

        Found ids are cached; a missing series is not, so that it can be
        resolved once created.
        """
        regid = self.registry_map.get(seriename)
        if regid is not None:
            return regid

        registry = self.schema.registry
        sql = select([registry.c.id]).where(registry.c.name == seriename)
        regid = cn.execute(sql).scalar()
        if regid is not None:
            self.registry_map[seriename] = regid
        return regid

    def _finalize_insertion(self, cn, csid, seriename):
        """Attach changeset `csid` to the series in changeset_series."""
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=self._name_to_regid(cn, seriename)
        )
        cn.execute(sql)