from datetime import datetime
from contextlib import contextmanager
import logging
import hashlib

import pandas as pd

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.schema import tsschema
from tshistory.util import (
    inject_in_index,
    num2float,
    subset,
    SeriesServices,
    tzaware_serie
)
from tshistory.snapshot import Snapshot

L = logging.getLogger('tshistory.tsio')
TABLES = {}


class TimeSerie(SeriesServices):
    namespace = 'tsh'
    schema = None
    metadatacache = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}

    def insert(self, cn, newts, name, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        name: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata

        """
        assert isinstance(newts, pd.Series)
        assert isinstance(name, str)
        assert isinstance(author, str)
        assert metadata is None or isinstance(metadata, dict)
        assert _insertion_date is None or isinstance(_insertion_date, datetime)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = name
        table = self._get_ts_table(cn, name)

        if table is None:
            return self._create(cn, newts, name, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, name, author,
                            metadata, _insertion_date)

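    # A hedged usage sketch (names below are illustrative, not part of this
    # module): `engine` stands for an sqlalchemy engine bound to a tshistory
    # database and `ts` for a pandas Series with a datetime index.
    #
    #   tsh = TimeSerie()
    #   with engine.begin() as cn:
    #       tsh.insert(cn, ts, 'some.series', 'john.doe',
    #                  metadata={'source': 'example'})
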
    def get(self, cn, name, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        csetfilter = []
        if revision_date:
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, name)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = name
            current = current[~current.isnull()]
        return current

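    # A hedged usage sketch (reusing the illustrative `engine` and series name
    # from the insert example above): fetch the current version, then the
    # version as it was known at a past revision date.
    #
    #   with engine.begin() as cn:
    #       current = tsh.get(cn, 'some.series')
    #       past = tsh.get(cn, 'some.series',
    #                      revision_date=datetime(2018, 1, 1))
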
    def metadata(self, cn, tsname):
        """Return metadata dict of timeserie."""
        if tsname in self.metadatacache:
            return self.metadatacache[tsname]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.name == tsname
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[tsname] = meta
        return meta

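    # For a series created by this class, the registry metadata dict contains
    # at least the keys written by _make_ts_table below, e.g. (illustrative
    # values):
    #   {'tzaware': False, 'index_type': 'datetime64[ns]',
    #    'index_names': [], 'value_type': 'float64'}
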
    def update_metadata(self, cn, tsname, metadata, internal=False):
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, tsname)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.name == tsname
        ).values(metadata=meta)
        cn.execute(sql)

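    # A hedged usage sketch (the 'unit' key is purely illustrative):
    #   tsh.update_metadata(cn, 'some.series', {'unit': 'MW'})
    # A non-internal call asserts that none of the new keys already exist in
    # the stored metadata.
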
    def changeset_metadata(self, cn, csid):
        cset = self.schema.changeset
        sql = 'select metadata from "{ns}".changeset where id = {id}'.format(
            ns=self.namespace,
            id=csid
        )
        return cn.execute(sql).scalar()

    def get_history(self, cn, name,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None):
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        if deltabefore is not None or deltaafter is not None:
            assert from_value_date is None
            assert to_value_date is None

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        revs = cn.execute(revsql).fetchall()
        if not revs:
            return

        snapshot = Snapshot(cn, self, name)
        series = []
        for csid, idate in revs:
            if (deltabefore, deltaafter) != (None, None):
                from_value_date = None
                to_value_date = None
                if deltabefore is not None:
                    from_value_date = idate - deltabefore
                if deltaafter is not None:
                    to_value_date = idate + deltaafter
            series.append((
                idate,
                snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                              from_value_date=from_value_date,
                              to_value_date=to_value_date)[1]
            ))

        for revdate, serie in series:
            inject_in_index(serie, revdate)

        serie = pd.concat([serie for revdate_, serie in series])
        serie.name = name
        return serie

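    # Shape of the result (illustrative): a single series whose index couples
    # the insertion date of each kept changeset with the value dates, roughly:
    #
    #   2018-01-01 10:00:00  2018-01-01    1.0
    #                        2018-01-02    2.0
    #   2018-01-02 10:00:00  2018-01-02    2.5
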
    def get_delta(self, cn, name, delta):
        histo = self.get_history(
            cn, name, deltabefore=-delta
        )

        df = histo.reset_index()
        # df_date is a dataframe with two columns: value_date and insertion_date
        df_date = df.loc[:, ['insertion_date', 'value_date']]

        # in selected_dates each value_date occurs only once,
        # associated with its last insertion_date
        selected_dates = df_date.groupby('value_date').max().reset_index()

        ts = df[name]
        # ts is built from the df returned from get_history
        # its index becomes a plain index of tuples (insertion_date, value_date)
        ts.index = ((row.insertion_date, row.value_date)
                    for row in df.itertuples())
        # ts still holds every (insertion_date, value_date) couple;
        # we now use selected_dates to keep, for each value_date,
        # only the couple corresponding to the last insertion_date
        ts_select = ts[[(row[2], row[1])
                        for row in selected_dates.itertuples()]]

        # ts_select still has a plain index of tuples (insertion_date, value_date)
        new_index = (elt[1] for elt in ts_select.index)

        # we only keep the value_date information from the index
        ts_select.index = new_index
        return ts_select

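    # Illustration of the selection above (hypothetical data): if value_date
    # 2018-01-02 shows up under insertion dates d1 < d2 in the history, only
    # the (d2, 2018-01-02) point survives the groupby/max step, and the
    # returned series is indexed by value_date alone.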

    def exists(self, cn, name):
        return self._get_ts_table(cn, name) is not None

    def latest_insertion_date(self, cn, name):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, name)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()

    def strip(self, cn, seriename, csid):
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete the changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == seriename
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct name from {}.registry order by name'.format(self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

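    # The returned mapping looks roughly like this (values are illustrative):
    #   {'series count': 2,
    #    'changeset count': 10,
    #    'serie names': ['series.a', 'series.b']}
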
    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.name.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.name)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log

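    # Each log entry is a dict of this shape (values are illustrative):
    #   {'rev': 1,
    #    'author': 'john.doe',
    #    'date': pd.Timestamp('2018-01-01 10:00:00+0000', tz='utc'),
    #    'meta': {},
    #    'names': ['some.series']}
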
    # /API
    # Helpers

    # creation / update

    def _create(self, cn, newts, name, author,
                metadata=None, insertion_date=None):
        # initial insertion
        if len(newts) == 0:
            return None
        snapshot = Snapshot(cn, self, name)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        value = {
            'cset': csid,
            'snapshot': head
        }
        table = self._make_ts_table(cn, name, newts)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)
        L.info('first insertion of %s (size=%s) by %s',
               name, len(newts), author)
        return newts

    def _update(self, cn, table, newts, name, author,
                metadata=None, insertion_date=None):
        self._validate(cn, newts, name)
        snapshot = Snapshot(cn, self, name)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   name, author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), name, author)
        return diff

    # ts serialisation

    def _ensure_tz_consistency(self, cn, ts):
        """Return timeserie with tz aware index or not depending on metadata
        tzaware.
        """
        assert ts.name is not None
        metadata = self.metadata(cn, ts.name)
        if metadata and metadata.get('tzaware', False):
            return ts.tz_localize('UTC')
        return ts

    # serie table handling

    def _ts_table_name(self, seriename):
        seriename = self._tablename(seriename)
        return '{}.timeserie.{}'.format(self.namespace, seriename)

    def _table_definition_for(self, seriename):
        tablename = self._ts_table_name(seriename)
        seriename = self._tablename(seriename)
        table = TABLES.get(tablename)
        if table is None:
            TABLES[tablename] = table = Table(
                seriename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       index=True, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           seriename)),
                       index=True),
                schema='{}.timeserie'.format(self.namespace),
                extend_existing=True
            )
        return table

    def _make_ts_table(self, cn, name, ts):
        tablename = self._ts_table_name(name)
        table = self._table_definition_for(name)
        table.create(cn)
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            name=name,
            table_name=tablename,
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'value_type': ts.dtypes.name
            },
        )
        cn.execute(sql)
        return table

    def _get_ts_table(self, cn, name):
        reg = self.schema.registry
        tablename = self._ts_table_name(name)
        sql = reg.select().where(reg.c.table_name == tablename)
        tid = cn.execute(sql).scalar()
        if tid:
            return self._table_definition_for(name)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        sql = select([cset_serie.c.serie]
        ).where(cset_serie.c.cset == csid)

        return [seriename for seriename, in cn.execute(sql).fetchall()]

    # insertion handling

    def _validate(self, cn, ts, name):
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, name)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                name, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _finalize_insertion(self, cn, csid, name):
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=name
        )
        cn.execute(sql)