from datetime import datetime
from contextlib import contextmanager
import logging

import pandas as pd

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.schema import tsschema
from tshistory.util import (
    inject_in_index,
    num2float,
    subset,
    SeriesServices,
    tzaware_serie
)
from tshistory.snapshot import Snapshot

L = logging.getLogger('tshistory.tsio')
TABLES = {}


class TimeSerie(SeriesServices):
    namespace = 'tsh'
    schema = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}

    def insert(self, cn, newts, name, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        name: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        """
        assert isinstance(newts, pd.Series)
        assert isinstance(name, str)
        assert isinstance(author, str)
        assert metadata is None or isinstance(metadata, dict)
        assert _insertion_date is None or isinstance(_insertion_date, datetime)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = name
        table = self._get_ts_table(cn, name)

        if table is None:
            return self._create(cn, newts, name, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, name, author,
                            metadata, _insertion_date)

    def get(self, cn, name, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        qfilter = []
        if revision_date:
            qfilter.append(lambda cset, _: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, name)
        _, current = snap.find(qfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = name
            current = current[~current.isnull()]
        return current

    def metadata(self, cn, tsname):
        """Return metadata dict of timeserie."""
        if tsname in self.metadatacache:
            return self.metadatacache[tsname]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.name == tsname
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[tsname] = meta
        return meta

    def update_metadata(self, cn, tsname, metadata, internal=False):
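        """Update the registry metadata dict of a serie.

        Unless `internal` is True, keys already present in the stored
        metadata may not be overridden.
        """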
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, tsname)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        meta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.name == tsname
        ).values(metadata=meta)  # persist the merged dict, not only the new keys
        cn.execute(sql)

    def changeset_metadata(self, cn, csid):
        cset = self.schema.changeset
        sql = 'select metadata from "{ns}".changeset where id = {id}'.format(
            ns=self.namespace,
            id=csid
        )
        return cn.execute(sql).scalar()

    def get_history(self, cn, name,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None):
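        """Return all revisions of a serie stacked into a single series.

        The insertion date of each revision is injected into the index (see
        inject_in_index). from/to_insertion_date filter the revisions,
        from/to_value_date restrict the value range, while deltabefore and
        deltaafter instead select a window around each insertion date.
        """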
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        if deltabefore is not None or deltaafter is not None:
            assert from_value_date is None
            assert to_value_date is None

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        revs = cn.execute(revsql).fetchall()
        if not revs:
            return

        snapshot = Snapshot(cn, self, name)
        series = []
        for csid, idate in revs:
            if deltabefore or deltaafter:
                from_value_date = idate
                to_value_date = idate
                if deltabefore:
                    from_value_date = idate - deltabefore
                if deltaafter:
                    to_value_date = idate + deltaafter
            series.append((
                idate,
                snapshot.find([lambda cset, _: cset.c.id == csid],
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)[1]
            ))

        for revdate, serie in series:
            inject_in_index(serie, revdate)

        serie = pd.concat([serie for revdate_, serie in series])
        serie.name = name
        return serie

    def exists(self, cn, name):
        return self._get_ts_table(cn, name) is not None

    def latest_insertion_date(self, cn, name):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, name)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
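        """Return the id of a changeset of `seriename` picked by insertion
        date: equal to `revdate` ('strict'), at or before it ('before'),
        or at or after it ('after').
        """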
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()

    def strip(self, cn, seriename, csid):
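        """Erase the revisions of `seriename` from changeset `csid` onwards,
        leaving a 'got stripped from ...' note in the metadata of the
        affected changesets.
        """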
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete the changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == seriename
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct name from {}.registry order by name'.format(self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, diff=False, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.name.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.name)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        if diff:
            for rev in log:
                rev['diff'] = {name: self.diff_at(cn, rev['rev'], name)
                               for name in rev['names']}

        log.sort(key=lambda rev: rev['rev'])
        return log

    # /API
    # Helpers

    # creation / update

    def _create(self, cn, newts, name, author,
                metadata=None, insertion_date=None):
        # initial insertion
        if len(newts) == 0:
            return None
        snapshot = Snapshot(cn, self, name)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        value = {
            'cset': csid,
            'snapshot': head
        }
        table = self._make_ts_table(cn, name, newts)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)
        L.info('first insertion of %s (size=%s) by %s',
               name, len(newts), author or self._author)
        return newts

    def _update(self, cn, table, newts, name, author,
                metadata=None, insertion_date=None):
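        # subsequent insertions only store the diff against the reconstructed
        # previous state, plus a new snapshot head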
        self._validate(cn, newts, name)
        snapshot = Snapshot(cn, self, name)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   name, author or self._author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'diff': self._serialize(diff),
            'snapshot': head
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), name, author or self._author)
        return diff

    # ts serialisation

    def _ensure_tz_consistency(self, cn, ts):
        """Return timeserie with tz aware index or not depending on metadata
        tzaware.
        """
        assert ts.name is not None
        metadata = self.metadata(cn, ts.name)
        if metadata and metadata.get('tzaware', False):
            return ts.tz_localize('UTC')
        return ts

    # serie table handling

    def _ts_table_name(self, seriename):
        # -> '<namespace>.timeserie.<seriename>'
        return '{}.timeserie.{}'.format(self.namespace, seriename)

    def _table_definition_for(self, seriename):
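        # build the sqlalchemy Table object mapping the per-serie
        # diff/snapshot table, caching it in the module-level TABLES dict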
        tablename = self._ts_table_name(seriename)
        table = TABLES.get(tablename)
        if table is None:
            TABLES[tablename] = table = Table(
                seriename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       index=True, nullable=False),
                Column('diff', BYTEA),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           seriename)),
                       index=True),
                schema='{}.timeserie'.format(self.namespace),
                extend_existing=True
            )
        return table

    def _make_ts_table(self, cn, name, ts):
        tablename = self._ts_table_name(name)
        table = self._table_definition_for(name)
        table.create(cn)
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            name=name,
            table_name=tablename,
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'value_type': ts.dtypes.name
            },
        )
        cn.execute(sql)
        return table

    def _get_ts_table(self, cn, name):
        reg = self.schema.registry
        tablename = self._ts_table_name(name)
        sql = reg.select().where(reg.c.table_name == tablename)
        tid = cn.execute(sql).scalar()
        if tid:
            return self._table_definition_for(name)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
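        # insert a changeset row and return its id; the insertion date
        # defaults to the current UTC time and must be tz-aware when provided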
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        sql = select([cset_serie.c.serie]
        ).where(cset_serie.c.cset == csid)

        return [seriename for seriename, in cn.execute(sql).fetchall()]

    # insertion handling

    def _validate(self, cn, ts, name):
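        # reject a series whose value or index type does not match the types
        # recorded in the registry metadata (an all-nan series means erasure
        # and is always accepted)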
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, name)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                name, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _finalize_insertion(self, cn, csid, name):
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=name
        )
        cn.execute(sql)

    def diff_at(self, cn, csetid, name):
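        # return the diff stored for the serie at changeset `csetid`
        # (the full initial snapshot if this is the first revision)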
        table = self._get_ts_table(cn, name)
        cset = self.schema.changeset

        def filtercset(sql):
            return sql.where(table.c.cset == cset.c.id
            ).where(cset.c.id == csetid)

        sql = filtercset(select([table.c.id]))
        tsid = cn.execute(sql).scalar()

        if tsid == 1:
            return Snapshot(cn, self, name).first

        sql = filtercset(select([table.c.diff]))
        ts = self._deserialize(cn.execute(sql).scalar(), name)
        return self._ensure_tz_consistency(cn, ts)