tsio.py 16 KB
Newer Older
1
from datetime import datetime
2
from contextlib import contextmanager
3
import logging
4
5
6

import pandas as pd

7
from sqlalchemy import Table, Column, Integer, ForeignKey
Aurélien Campéas's avatar
Aurélien Campéas committed
8
from sqlalchemy.sql.expression import select, func, desc
9
from sqlalchemy.dialects.postgresql import BYTEA
10

11
from tshistory.schema import tsschema
12
13
from tshistory.util import (
    inject_in_index,
14
15
    mindate,
    maxdate,
16
    num2float,
17
    subset,
18
    SeriesServices,
19
20
    tzaware_serie
)
21
from tshistory.snapshot import Snapshot
22

23
# logger for this module
L = logging.getLogger(name='tshistory.tsio')
# process-wide cache of sqlalchemy Table definitions, keyed by
# '<namespace>.timeserie.<seriename>' (filled by TimeSerie._table_definition_for)
TABLES = dict()
class TimeSerie(SeriesServices):
    """Versioned time series storage on top of PostgreSQL.

    Each series gets its own table in the ``<namespace>.timeserie`` schema.
    Every row of that table links a changeset to either the initial
    revision (``diff`` is NULL) or a serialized diff against the previous
    revision, plus a reference to a snapshot head.
    """
    namespace = 'tsh'
    schema = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        # per-instance cache of registry metadata, keyed by series name
        self.metadatacache = {}

    def insert(self, cn, newts, name, author, _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        name: str unique identifier of the serie
        author: str free-form author name
        _insertion_date: optional tz-aware datetime recorded as the
          revision date (defaults to the current UTC time)

        Returns None when nothing was stored (empty input, or no
        difference with the stored series); otherwise the inserted series
        (first insertion) or the stored diff (update).
        """
        assert isinstance(newts, pd.Series)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        # accept a datetime64 index, or any other datetime-typed
        # (e.g. tz-aware) index that is not a MultiIndex
        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = name
        table = self._get_ts_table(cn, name)

        if table is None:
            return self._create(cn, newts, name, author, _insertion_date)

        return self._update(cn, table, newts, name, author, _insertion_date)

    def get(self, cn, name, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie
        from_value_date, to_value_date: optional bounds on the index
        _keep_nans: when True, NaN points are kept in the result
          (presumably they encode erased values)

        Returns None if the series does not exist.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        qfilter = []
        if revision_date:
            qfilter.append(lambda cset, _: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, name)
        _, current = snap.find(qfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None:
            # name the result in both modes, not only when dropping NaNs
            current.name = name
            if not _keep_nans:
                current = current[~current.isnull()]
        return current

    def metadata(self, cn, tsname):
        """Return metadata dict of timeserie.

        Returns None for an unknown series. Only found metadata is cached,
        so a lookup made before the series is created does not hide its
        metadata once it exists.
        """
        if tsname in self.metadatacache:
            return self.metadatacache[tsname]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.name == tsname
        )
        meta = cn.execute(sql).scalar()
        if meta is not None:
            self.metadatacache[tsname] = meta
        return meta

    def get_history(self, cn, name,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    diffmode=False):
        """Return the revisions of a series as a single series whose
        index also carries each revision's insertion date (via
        ``inject_in_index``).

        With ``diffmode`` the successive stored diffs (the initial
        snapshot for the first revision) are returned instead of the
        successive full states.

        Returns None if the series does not exist or if no revision falls
        in the requested insertion date range.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        # (changeset id, insertion date, diff) rows of this series
        cset = self.schema.changeset
        diffsql = select([cset.c.id, cset.c.insertion_date, table.c.diff]
        ).order_by(cset.c.id
        ).where(table.c.cset == cset.c.id)

        if from_insertion_date:
            diffsql = diffsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            diffsql = diffsql.where(cset.c.insertion_date <= to_insertion_date)

        diffs = cn.execute(diffsql).fetchall()

        if not diffs:
            # it's fine to ask for an insertion date range
            # where nothing did happen, but you get nothing
            return

        if diffmode:
            snapshot = Snapshot(cn, self, name)
            series = []
            for csid, revdate, diff in diffs:
                if diff is None:  # we must fetch the initial snapshot
                    serie = subset(snapshot.first, from_value_date, to_value_date)
                else:
                    serie = subset(self._deserialize(diff, name),
                                   from_value_date, to_value_date)
                    serie = self._ensure_tz_consistency(cn, serie)
                inject_in_index(serie, revdate)
                series.append(serie)
            series = pd.concat(series)
            series.name = name
            return series

        # rebuild the state at the first selected revision from the
        # snapshots, then replay the following diffs on top of it
        csid, revdate, _ = diffs[0]
        snap = Snapshot(cn, self, name)
        _, snapshot = snap.find([lambda cset, _: cset.c.id <= csid],
                                from_value_date, to_value_date)

        series = [(revdate, subset(snapshot, from_value_date, to_value_date))]
        for _, revdate, diff in diffs[1:]:
            diff = subset(self._deserialize(diff, name),
                          from_value_date, to_value_date)
            diff = self._ensure_tz_consistency(cn, diff)

            serie = self.patch(series[-1][1], diff)
            series.append((revdate, serie))

        for revdate, serie in series:
            inject_in_index(serie, revdate)

        serie = pd.concat([serie for _, serie in series])
        serie.name = name
        return serie

    def exists(self, cn, name):
        """Tell whether a series of this name is registered."""
        return self._get_ts_table(cn, name) is not None

    def latest_insertion_date(self, cn, name):
        """Return the insertion date of the latest revision of a series,
        or None if the series does not exist."""
        tstable = self._get_ts_table(cn, name)
        if tstable is None:
            return None
        cset = self.schema.changeset
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return the id of a changeset of `seriename` relative to `revdate`.

        mode:
        * 'strict': a changeset inserted exactly at revdate
        * 'before': the latest changeset inserted at or before revdate
        * 'after': the earliest changeset inserted at or after revdate

        Returns None when no changeset matches.
        """
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        # explicit ordering: without it scalar() picks an arbitrary match
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate
            ).order_by(desc(cset.c.insertion_date), desc(cset.c.id))
        else:
            sql = sql.where(cset.c.insertion_date >= revdate
            ).order_by(cset.c.insertion_date, cset.c.id)
        return cn.execute(sql.limit(1)).scalar()

    def strip(self, cn, seriename, csid):
        """Remove the revisions of `seriename` from changeset `csid` onwards.

        The affected changesets are kept, but their metadata gets a
        'tshistory.info' note and their link to the series is deleted.
        The series table rows with cset >= csid are deleted.
        """
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for entry in logs:
            rev = entry['rev']
            # update changeset.metadata
            metadata = cn.execute(
                select([cset.c.metadata]).where(cset.c.id == rev)
            ).scalar() or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            cn.execute(
                cset.update().where(cset.c.id == rev).values(metadata=metadata)
            )
            # delete the changeset_series item
            cn.execute(
                cset_serie.delete().where(
                    cset_serie.c.cset == rev
                ).where(
                    cset_serie.c.serie == seriename
                )
            )

        # wipe the diffs
        table = self._table_definition_for(seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        # NOTE(review): max(id), not count(*): equals the count only if
        # changeset ids have no gaps
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct name from {}.registry order by name'.format(self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, diff=False, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        Each entry is a dict with 'rev', 'author', 'date' (UTC Timestamp),
        'meta' and 'names' (series linked to the changeset), plus 'diff'
        (series name -> diff_at result) when `diff` is true.

        limit: keep only the `limit` most recent changesets (0: no limit)
        names, authors: restrict to these series names / authors
        stripped: also list changesets that lost their series link
          (see strip)
        fromrev, torev, fromdate, todate: inclusive bounds on the
          changeset id and insertion date
        """
        entries = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)

        if names:
            sql = sql.where(reg.c.name.in_(names))

        if authors:
            sql = sql.where(cset.c.author.in_(authors))

        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)

        if torev:
            sql = sql.where(cset.c.id <= torev)

        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)

        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)

        if stripped:
            # outerjoin to show dead things
            source = cset.outerjoin(cset_series)
            if names:
                # join the registry explicitly: otherwise the name filter
                # above adds it as an unrelated FROM (a cross join) and
                # filters nothing
                source = source.outerjoin(
                    reg, cset_series.c.serie == reg.c.name
                )
            sql = sql.select_from(source)
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.name)

        for csetid, author, revdate, meta in cn.execute(sql).fetchall():
            entries.append({'rev': csetid, 'author': author,
                            'date': self._to_utc(revdate),
                            'meta': meta or {},
                            'names': self._changeset_series(cn, csetid)})

        if diff:
            for rev in entries:
                rev['diff'] = {name: self.diff_at(cn, rev['rev'], name)
                               for name in rev['names']}

        entries.sort(key=lambda rev: rev['rev'])
        return entries

    # /API
    # Helpers

    # creation / update

    def _create(self, cn, newts, name, author, insertion_date=None):
        """Store the first revision of a new series.

        Creates a snapshot, a changeset, the series table (plus registry
        entry) and a first row without diff, then links the changeset to
        the series. Returns `newts`, or None if it is empty.
        """
        # initial insertion
        if len(newts) == 0:
            return None
        snapshot = Snapshot(cn, self, name)
        csid = self._newchangeset(cn, author, insertion_date)
        head = snapshot.create(newts)
        # no 'diff' value: the creation row is recognized by a NULL diff
        value = {
            'cset': csid,
            'snapshot': head
        }
        table = self._make_ts_table(cn, name, newts)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)
        # NOTE(review): `self._author` is not defined in this file;
        # presumably provided by SeriesServices -- confirm
        L.info('first insertion of %s (size=%s) by %s',
               name, len(newts), author or self._author)
        return newts

    def _update(self, cn, table, newts, name, author, insertion_date=None):
        """Store a new revision of an existing series as a diff against
        its last state over the index range of `newts`.

        Returns the stored diff, or None when there is no difference.
        """
        self._validate(cn, newts, name)
        snapshot = Snapshot(cn, self, name)
        diff = self.diff(snapshot.last(mindate(newts), maxdate(newts)), newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   name, author or self._author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'diff': self._serialize(diff),
            'snapshot': head
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), name, author or self._author)
        return diff

    # ts serialisation

    def _ensure_tz_consistency(self, cn, ts):
        """Return timeserie with tz aware index or not depending on metadata
        tzaware.

        When the series is tz-aware, the index of `ts` is localized to UTC
        (so it is assumed to be naive here -- NOTE(review): confirm for
        every caller).
        """
        assert ts.name is not None
        metadata = self.metadata(cn, ts.name)
        if metadata and metadata.get('tzaware', False):
            return ts.tz_localize('UTC')
        return ts

    # time handling

    @staticmethod
    def _to_utc(stamp):
        """Return `stamp` as a UTC pandas Timestamp.

        Naive values are taken to be UTC already and get localized; aware
        values are converted. (Newer pandas rejects
        ``pd.Timestamp(aware, tz=...)``, hence the explicit split.)
        """
        stamp = pd.Timestamp(stamp)
        if stamp.tzinfo is None:
            return stamp.tz_localize('UTC')
        return stamp.tz_convert('UTC')

    # serie table handling

    def _ts_table_name(self, seriename):
        # <namespace>.timeserie.<seriename>
        return '{}.timeserie.{}'.format(self.namespace, seriename)

    def _table_definition_for(self, seriename):
        """Return the sqlalchemy Table of a series, building it once and
        caching it in the module-level TABLES dict.

        Only the definition is built here; the table is not created in
        the database (see _make_ts_table).
        """
        tablename = self._ts_table_name(seriename)
        table = TABLES.get(tablename)
        if table is None:
            TABLES[tablename] = table = Table(
                seriename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       index=True, nullable=False),
                Column('diff', BYTEA),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           seriename)),
                       index=True),
                schema='{}.timeserie'.format(self.namespace),
                extend_existing=True
            )
        return table

    def _make_ts_table(self, cn, name, ts):
        """Create the table of series `name` in the database and register
        it, recording metadata derived from `ts` (tz-awareness, index dtype
        and non-empty index names, value dtype). Returns the Table."""
        tablename = self._ts_table_name(name)
        table = self._table_definition_for(name)
        table.create(cn)
        index = ts.index
        # distinct loop name: under Python 2 a comprehension variable leaks
        # and would clobber the `name` parameter used in the insert below
        inames = [iname for iname in index.names if iname]
        sql = self.schema.registry.insert().values(
            name=name,
            table_name=tablename,
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'value_type': ts.dtypes.name
            },
        )
        cn.execute(sql)
        return table

    def _get_ts_table(self, cn, name):
        """Return the Table of series `name` if it is registered, else None."""
        reg = self.schema.registry
        tablename = self._ts_table_name(name)
        sql = reg.select().where(reg.c.table_name == tablename)
        tid = cn.execute(sql).scalar()
        if tid:
            return self._table_definition_for(name)
        return None

    # changeset handling

    def _newchangeset(self, cn, author, _insertion_date=None):
        """Insert a new changeset row and return its id.

        _insertion_date, when given, must be tz-aware; it is stored as a
        UTC timestamp. Defaults to the current UTC time.
        """
        table = self.schema.changeset
        if _insertion_date is not None:
            assert _insertion_date.tzinfo is not None
            idate = self._to_utc(_insertion_date)
        else:
            idate = self._to_utc(datetime.utcnow())
        sql = table.insert().values(
            author=author,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        """Return the names of the series linked to changeset `csid`."""
        cset_serie = self.schema.changeset_series
        sql = select([cset_serie.c.serie]
        ).where(cset_serie.c.cset == csid)

        return [seriename for seriename, in cn.execute(sql).fetchall()]

    # insertion handling

    def _validate(self, cn, ts, name):
        """Check that `ts` matches the value dtype and index dtype recorded
        in the registry metadata of series `name`. An all-NaN series (an
        erasure) is accepted without checks.

        Raises Exception on mismatch.
        """
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, name)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                name, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _finalize_insertion(self, cn, csid, name):
        """Link changeset `csid` to series `name`."""
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=name
        )
        cn.execute(sql)

    def diff_at(self, cn, csetid, name):
        """Return what changeset `csetid` changed in series `name`.

        For the changeset that created the series this is the initial
        snapshot; otherwise the stored diff. Returns None when the series
        does not exist or has no row for that changeset.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return None

        row = cn.execute(
            select([table.c.diff]).where(table.c.cset == csetid)
        ).fetchone()
        if row is None:
            return None

        diff = row[0]
        if diff is None:
            # the creation row stores no diff (see _create): it stands for
            # the initial snapshot -- same convention as in get_history
            return Snapshot(cn, self, name).first

        ts = self._deserialize(diff, name)
        return self._ensure_tz_consistency(cn, ts)