from contextlib import contextmanager
from datetime import datetime
import logging

import pandas as pd
from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.dialects.postgresql import BYTEA
from sqlalchemy.sql.expression import select, func, desc

from tshistory.schema import tsschema
from tshistory.snapshot import Snapshot
from tshistory.util import (
    inject_in_index,
    num2float,
    subset,
    SeriesServices,
    tzaware_serie
)

L = logging.getLogger('tshistory.tsio')
class TimeSerie(SeriesServices):
    """Versioned time-serie store.

    Each serie lives in its own table under the `<namespace>.timeserie`
    schema; every insertion is recorded as a changeset, and only diffs
    against the previous state are stored (snapshots are handled by
    tshistory.snapshot.Snapshot).
    """
    # changeset id currently being built by the `newchangeset` context
    # manager; None outside of it
    _csid = None
    # postgres schema namespace under which all tables live
    namespace = 'tsh'
    # tshistory.schema.tsschema instance, set up in __init__
    schema = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        # per-instance memoization of serie metadata (name -> dict)
        self.metadatacache = {}
    # API : changeset, insert, get, delete
    @contextmanager
    def newchangeset(self, cn, author, _insertion_date=None):
        """A context manager to allow insertion of several series within the
        same changeset identifier

        This allows to group changes to several series, hence
        producing a macro-change.

        _insertion_date is *only* provided for migration purposes and
        not part of the API.
        """
        assert self._csid is None
        self._csid = self._newchangeset(cn, author, _insertion_date)
        self._author = author
        try:
            yield
        finally:
            # always clear the changeset state: if the managed body
            # raises, a leftover _csid would poison every subsequent
            # call (the assert above would fire, or worse, a later
            # insert would silently reuse this changeset)
            del self._csid
            del self._author
    def insert(self, cn, newts, name, author=None, _insertion_date=None,
               extra_scalars=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index

        name: str unique identifier of the serie

        author: str free-form author name (mandatory, unless provided
        to the newchangeset context manager).

        Returns the inserted serie on first insertion, the computed
        diff on subsequent ones, or None when there is nothing to do.
        """
        # NOTE: default used to be a shared mutable `{}`; normalize the
        # None default to a fresh dict so a subclass hook mutating it
        # cannot leak state across calls
        extra_scalars = extra_scalars if extra_scalars is not None else {}
        assert self._csid or author, 'author is mandatory'
        if self._csid and author:
            L.info('author {} will not be used when in a changeset'.format(author))
            author = None
        assert isinstance(newts, pd.Series)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) or
                isinstance(newts.index, pd.MultiIndex))

        newts.name = name
        table = self._get_ts_table(cn, name)

        if isinstance(newts.index, pd.MultiIndex):
            # we impose an order to survive roundtrips
            newts = newts.reorder_levels(sorted(newts.index.names))

        snapshot = Snapshot(cn, self, name)

        if table is None:
            # initial insertion
            if newts.isnull().all():
                return None
            newts = newts[~newts.isnull()]
            csid = self._csid or self._newchangeset(cn, author, _insertion_date)
            snapshot.create(csid, newts)
            value = {
                'cset': csid
            }
            # callback for extenders
            self._complete_insertion_value(value, extra_scalars)
            table = self._make_ts_table(cn, name, newts)
            cn.execute(table.insert().values(value))
            self._finalize_insertion(cn, csid, name)
            L.info('first insertion of %s (size=%s) by %s',
                   name, len(newts), author or self._author)
            return newts

        self._validate(cn, newts, name)
        diff = self.diff(snapshot.last, newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   name, author or self._author, len(newts))
            return

        csid = self._csid or self._newchangeset(cn, author, _insertion_date)
        snapshot.update(csid, diff)
        value = {
            'cset': csid,
            'diff': self._serialize(diff),
        }
        # callback for extenders
        self._complete_insertion_value(value, extra_scalars)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), name, author or self._author)
        return diff
    def get(self, cn, name, revision_date=None,
            from_value_date=None, to_value_date=None):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        from_value_date / to_value_date: bound the value index of the
        returned serie.

        Returns None for an unknown serie.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        # optional changeset filter: only consider changesets inserted
        # at or before revision_date
        qfilter = []
        if revision_date:
            qfilter.append(lambda cset, _: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, name)
        current = snap.build_upto(qfilter,
                                  from_value_date=from_value_date,
                                  to_value_date=to_value_date)

        if current is not None:
            current.name = name
            # drop erasure markers (nulls) from the materialized state
            current = current[~current.isnull()]
        return current
    def metadata(self, cn, tsname):
158
159
160
161
162
163
164
165
166
167
168
        """Return metadata dict of timeserie."""
        if tsname in self.metadatacache:
            return self.metadatacache[tsname]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.name == tsname
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[tsname] = meta
        return meta

169
170
    def get_group(self, cn, name, revision_date=None):
        """Return a dict mapping serie name -> serie, for every serie
        touched by the latest changeset of `name` (at revision_date,
        when given)."""
        csid = self._latest_csid_for(cn, name)
        candidates = (
            (seriename, self.get(cn, seriename, revision_date))
            for seriename in self._changeset_series(cn, csid)
        )
        return {
            seriename: serie
            for seriename, serie in candidates
            if serie is not None
        }
    def get_history(self, cn, name,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    diffmode=False):
        """Return the whole history of a serie as one series whose index
        is augmented with each revision's insertion date (via
        inject_in_index).

        from/to_insertion_date restrict which changesets are
        considered; from/to_value_date restrict the value range.

        diffmode=True yields the raw per-changeset diffs instead of the
        reconstructed successive states.

        Returns None for an unknown serie or an empty revision range.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        # compute diffs above the snapshot
        cset = self.schema.changeset
        diffsql = select([cset.c.id, cset.c.insertion_date, table.c.diff]
        ).order_by(cset.c.id
        ).where(table.c.cset == cset.c.id)

        if from_insertion_date:
            diffsql = diffsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            diffsql = diffsql.where(cset.c.insertion_date <= to_insertion_date)

        diffs = cn.execute(diffsql).fetchall()
        if not diffs:
            # it's fine to ask for an insertion date range
            # where nothing did happen, but you get nothing
            return

        if diffmode:
            snapshot = Snapshot(cn, self, name)
            series = []
            for csid, revdate, diff in diffs:
                if diff is None:  # we must fetch the initial snapshot
                    serie = subset(snapshot.first, from_value_date, to_value_date)
                else:
                    serie = subset(self._deserialize(diff, name), from_value_date, to_value_date)
                    serie = self._ensure_tz_consistency(cn, serie)
                inject_in_index(serie, revdate)
                series.append(serie)
            series = pd.concat(series)
            series.name = name
            return series

        # non-diff mode: rebuild the state at the first changeset, then
        # patch it forward, one changeset at a time
        csid, revdate, diff_ = diffs[0]
        snap = Snapshot(cn, self, name)
        snapshot = snap.build_upto([lambda cset, _: cset.c.id <= csid],
                                   from_value_date, to_value_date)

        series = [(revdate, subset(snapshot, from_value_date, to_value_date))]
        for csid_, revdate, diff in diffs[1:]:
            diff = subset(self._deserialize(diff, table.name),
                          from_value_date, to_value_date)
            diff = self._ensure_tz_consistency(cn, diff)

            # each state is the previous state patched with the diff
            serie = self.patch(series[-1][1], diff)
            series.append((revdate, serie))

        for revdate, serie in series:
            inject_in_index(serie, revdate)

        serie = pd.concat([serie for revdate_, serie in series])
        serie.name = name
        return serie
    def exists(self, cn, name):
        """Tell whether a serie with that name is registered."""
        table = self._get_ts_table(cn, name)
        return table is not None
    def latest_insertion_date(self, cn, name):
        """Return the insertion date of the most recent changeset that
        touched the given serie.

        NOTE(review): assumes the serie exists — an unknown name would
        make _get_ts_table return None and this would raise.
        """
        tstable = self._get_ts_table(cn, name)
        cset = self.schema.changeset
        query = select(
            [func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(query).scalar()
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return the changeset id of `seriename` matching `revdate`.

        mode: 'strict' matches the insertion date exactly, 'before'
        accepts changesets at or before it, 'after' at or after it.
        """
        assert mode in ('strict', 'before', 'after')
        table = self._table_definition_for(seriename)
        cset = self.schema.changeset
        # translate the mode into the corresponding date predicate
        datefilter = {
            'strict': cset.c.insertion_date == revdate,
            'before': cset.c.insertion_date <= revdate,
            'after': cset.c.insertion_date >= revdate
        }[mode]
        query = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        ).where(datefilter)
        return cn.execute(query).scalar()
    def strip(self, cn, seriename, csid):
        """Remove all revisions of `seriename` from changeset `csid`
        onwards.

        The stripped changesets are kept but annotated in their
        metadata, and detached from the serie; the serie's diff rows
        are deleted and its top-level snapshot rebuilt.
        """
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = cn.execute(
                select([cset.c.metadata]).where(cset.c.id == log['rev'])
            ).scalar() or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == seriename
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))

        # rebuild the top-level snapshot
        cstip = self._latest_csid_for(cn, seriename)
        Snapshot(cn, self, seriename).strip_at(cstip)
    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        ns = self.namespace
        countsql = 'select count(*) from {}.registry'.format(ns)
        maxidsql = 'select max(id) from {}.changeset'.format(ns)
        namessql = 'select distinct name from {}.registry order by name'.format(ns)
        return {
            'series count': cn.execute(countsql).scalar(),
            'changeset count': cn.execute(maxidsql).scalar(),
            'serie names': [name for name, in cn.execute(namessql).fetchall()]
        }
    def log(self, cn, limit=0, diff=False, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        limit: keep only the `limit` most recent changesets
        diff: also attach each changeset's per-serie diff
        names / authors: restrict to these series / authors
        stripped: include changesets detached from their series
        fromrev / torev: changeset id bounds (inclusive)
        fromdate / todate: insertion date bounds (inclusive)
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        # newest first so that `limit` keeps the most recent entries;
        # the result is re-sorted chronologically at the end
        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)

        if names:
            sql = sql.where(reg.c.name.in_(names))

        if authors:
            sql = sql.where(cset.c.author.in_(authors))

        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)

        if torev:
            sql = sql.where(cset.c.id <= torev)

        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)

        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)

        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.name)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author, 'date': revdate,
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        if diff:
            for rev in log:
                rev['diff'] = {name: self.diff_at(cn, rev['rev'], name)
                               for name in rev['names']}

        log.sort(key=lambda rev: rev['rev'])
        return log
    # /API
    # Helpers
370

371
372
    # ts serialisation

373
374
375
376
377
    def _ensure_tz_consistency(self, cn, ts):
        """Return timeserie with tz aware index or not depending on metadata
        tzaware.

        Series are stored tz-naive; this localizes the deserialized
        index to UTC when the serie's registry metadata says it is
        tz-aware.
        """
        assert ts.name is not None
        metadata = self.metadata(cn, ts.name)
        if metadata and metadata.get('tzaware', False):
            if isinstance(ts.index, pd.MultiIndex):
                # localize every level of a multi-index in place
                for i in range(len(ts.index.levels)):
                    ts.index = ts.index.set_levels(
                        ts.index.levels[i].tz_localize('UTC'),
                        level=i)
                return ts
            return ts.tz_localize('UTC')
        return ts
    # serie table handling
390

391
392
    def _ts_table_name(self, seriename):
        # namespace.seriename
393
        return '{}.timeserie.{}'.format(self.namespace, seriename)
394

395
    def _table_definition_for(self, seriename):
        """Build the sqlalchemy Table holding the diffs of one serie
        (one row per changeset touching it)."""
        nspace = self.namespace
        csfk = ForeignKey('{}.changeset.id'.format(nspace))
        return Table(
            seriename, self.schema.meta,
            Column('id', Integer, primary_key=True),
            Column('cset', Integer, csfk,
                   index=True, nullable=False),
            # constraint: there is either .diff or .snapshot
            Column('diff', BYTEA),
            schema='{}.timeserie'.format(nspace),
            extend_existing=True
        )
    def _make_ts_table(self, cn, name, ts):
        """Create the diff table for a new serie and register it,
        recording index/value metadata derived from `ts`.

        Returns the sqlalchemy Table object.
        """
        tablename = self._ts_table_name(name)
        table = self._table_definition_for(name)
        table.create(cn)
        index = ts.index
        # keep only named index levels; renamed from `name` to avoid
        # shadowing the `name` parameter used below
        inames = [iname for iname in index.names if iname]
        sql = self.schema.registry.insert().values(
            name=name,
            table_name=tablename,
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'value_type': ts.dtypes.name
            },
        )
        cn.execute(sql)
        return table
    def _get_ts_table(self, cn, name):
        """Return the Table of `name` if it is registered, else None."""
        registry = self.schema.registry
        query = registry.select().where(
            registry.c.table_name == self._ts_table_name(name)
        )
        if cn.execute(query).scalar():
            return self._table_definition_for(name)
        return None
    # changeset handling

437
    def _newchangeset(self, cn, author, _insertion_date=None):
        """Insert a new changeset row and return its id.

        _insertion_date is for migrations only; by default the current
        (naive) local time is recorded.
        """
        insertion_date = _insertion_date or datetime.now()
        cset = self.schema.changeset
        query = cset.insert().values(
            author=author,
            insertion_date=insertion_date
        )
        return cn.execute(query).inserted_primary_key[0]
    def _latest_csid_for(self, cn, name):
        """Return the id of the latest changeset that touched `name`."""
        tstable = self._get_ts_table(cn, name)
        query = select([func.max(tstable.c.cset)])
        return cn.execute(query).scalar()
    def _changeset_series(self, cn, csid):
        """List the names of the series touched by changeset `csid`."""
        cs = self.schema.changeset_series
        query = select([cs.c.serie]).where(cs.c.cset == csid)
        rows = cn.execute(query).fetchall()
        return [seriename for seriename, in rows]
    # insertion handling

458
    def _validate(self, cn, ts, name):
459
460
        if ts.isnull().all():
            # ts erasure
461
            return
462
        tstype = ts.dtype
463
        meta = self.metadata(cn, name)
464
        if tstype != meta['value_type']:
465
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
466
                name, tstype, meta['value_type'])
467
            raise Exception(m)
468
        if ts.index.dtype.name != meta['index_type']:
469
            raise Exception('Incompatible index types')
470
471
472
473
474
        inames = [name for name in ts.index.names if name]
        if inames != meta['index_names']:
            raise Exception('Incompatible multi indexes: {} vs {}'.format(
                meta['index_names'], inames)
            )
475

476
477
    def _complete_insertion_value(self, value, extra_scalars):
        """Extension hook: subclasses may enrich the row `value` dict
        (inserted into the serie's diff table) using `extra_scalars`.

        The base implementation does nothing.
        """
        pass
    def _finalize_insertion(self, cn, csid, name):
        """Record the changeset <-> serie association."""
        cset_serie = self.schema.changeset_series
        cn.execute(
            cset_serie.insert().values(cset=csid, serie=name)
        )
    def diff_at(self, cn, csetid, name):
        """Return the diff stored for serie `name` at changeset `csetid`.

        The very first revision has no diff row: the initial snapshot
        is returned instead.
        """
        table = self._get_ts_table(cn, name)
        cset = self.schema.changeset

        def at_cset(query):
            # restrict a query to the requested changeset
            return query.where(
                table.c.cset == cset.c.id
            ).where(cset.c.id == csetid)

        tsid = cn.execute(at_cset(select([table.c.id]))).scalar()
        if tsid == 1:
            # initial revision: only a snapshot was stored
            return Snapshot(cn, self, name).first

        raw = cn.execute(at_cset(select([table.c.diff]))).scalar()
        ts = self._deserialize(raw, name)
        return self._ensure_tz_consistency(cn, ts)