from datetime import datetime
from contextlib import contextmanager
import logging
import hashlib

import pandas as pd

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.schema import tsschema
from tshistory.util import (
    inject_in_index,
    num2float,
    subset,
    SeriesServices,
    tzaware_serie
)
from tshistory.snapshot import Snapshot

L = logging.getLogger('tshistory.tsio')
TABLES = {}


class TimeSerie(SeriesServices):
    namespace = 'tsh'
    schema = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}

    def insert(self, cn, newts, name, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        name: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        """
        assert isinstance(newts, pd.Series)
        assert isinstance(name, str)
        assert isinstance(author, str)
        assert metadata is None or isinstance(metadata, dict)
        assert _insertion_date is None or isinstance(_insertion_date, datetime)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = name
        table = self._get_ts_table(cn, name)

        if table is None:
            return self._create(cn, newts, name, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, name, author,
                            metadata, _insertion_date)
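
    # Usage sketch (illustrative only, not part of the module): given a
    # SQLAlchemy connection `cn` to a database carrying the tshistory schema,
    # re-inserting an overlapping series stores only the difference as a new
    # changeset:
    #
    #   tsh = TimeSerie()
    #   base = pd.Series([1., 2., 3.],
    #                    index=pd.date_range('2018-1-1', periods=3, freq='D'))
    #   tsh.insert(cn, base, 'my_serie', 'babar')
    #   update = pd.Series([2., 2.5, 4.],
    #                      index=pd.date_range('2018-1-2', periods=3, freq='D'))
    #   tsh.insert(cn, update, 'my_serie', 'babar')  # only the diff is stored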

    def get(self, cn, name, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        qfilter = []
        if revision_date:
            qfilter.append(lambda cset, _: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, name)
        _, current = snap.find(qfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = name
            current = current[~current.isnull()]
        return current
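
    # Illustrative read patterns (assuming the inserts sketched above):
    #
    #   tsh.get(cn, 'my_serie')                             # latest version
    #   tsh.get(cn, 'my_serie',
    #           revision_date=datetime(2018, 1, 1, 12))     # as of that instant
    #   tsh.get(cn, 'my_serie',
    #           from_value_date=datetime(2018, 1, 2),
    #           to_value_date=datetime(2018, 1, 3))         # value-date slice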

    def metadata(self, cn, tsname):
        """Return metadata dict of timeserie."""
        if tsname in self.metadatacache:
            return self.metadatacache[tsname]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.name == tsname
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[tsname] = meta
        return meta

    def update_metadata(self, cn, tsname, metadata, internal=False):
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, tsname)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        meta.update(metadata)
        reg = self.schema.registry
        # persist the merged metadata, not just the new keys
        sql = reg.update().where(
            reg.c.name == tsname
        ).values(metadata=meta)
        cn.execute(sql)
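
    # Illustrative metadata round-trip: `metadata` returns the dict stored at
    # series creation time (keys such as 'tzaware', 'index_type',
    # 'index_names', 'value_type'), and `update_metadata` merges user keys
    # into it, refusing (unless internal=True) to overwrite existing keys:
    #
    #   tsh.metadata(cn, 'my_serie')['tzaware']        # -> False (naive index)
    #   tsh.update_metadata(cn, 'my_serie', {'unit': 'MW'})  # 'unit' is made up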

    def changeset_metadata(self, cn, csid):
        cset = self.schema.changeset
        sql = 'select metadata from "{ns}".changeset where id = {id}'.format(
            ns=self.namespace,
            id=csid
        )
        return cn.execute(sql).scalar()

    def get_history(self, cn, name,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None):
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        if deltabefore is not None or deltaafter is not None:
            assert from_value_date is None
            assert to_value_date is None

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        revs = cn.execute(revsql).fetchall()
        if not revs:
            return

        snapshot = Snapshot(cn, self, name)
        series = []
        for csid, idate in revs:
            if deltabefore or deltaafter:
                from_value_date = idate
                to_value_date = idate
                if deltabefore:
                    from_value_date = idate - deltabefore
                if deltaafter:
                    to_value_date = idate + deltaafter
            series.append((
                idate,
                snapshot.find([lambda cset, _: cset.c.id == csid],
                              from_value_date=from_value_date,
                              to_value_date=to_value_date)[1]
            ))

        for revdate, serie in series:
            inject_in_index(serie, revdate)

        serie = pd.concat([serie for revdate_, serie in series])
        serie.name = name
        return serie
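
    # Illustrative calls (hedged: the exact output layout depends on
    # `inject_in_index`, which tags each revision block with its insertion
    # date before everything is concatenated into a single series):
    #
    #   hist = tsh.get_history(cn, 'my_serie',
    #                          from_insertion_date=datetime(2018, 1, 1))
    #   recent = tsh.get_history(cn, 'my_serie',
    #                            deltabefore=pd.Timedelta('1 hour'),
    #                            deltaafter=pd.Timedelta('1 hour'))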

    def exists(self, cn, name):
        return self._get_ts_table(cn, name) is not None

    def latest_insertion_date(self, cn, name):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, name)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()

    def strip(self, cn, seriename, csid):
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete the changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == seriename
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
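
    # Illustrative (and destructive) maintenance flow, assuming `revdate` is a
    # known insertion date: find the changeset at or before it, then strip it
    # and every later changeset of the series:
    #
    #   csid = tsh.changeset_at(cn, 'my_serie', revdate, mode='before')
    #   tsh.strip(cn, 'my_serie', csid)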

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct name from {}.registry order by name'.format(self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats
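
    # The returned mapping looks like (values are illustrative):
    #   {'series count': 2, 'changeset count': 10, 'serie names': ['a', 'b']}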

    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.name.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.name)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log
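
    # Each entry is a dict of the form (values are illustrative):
    #   {'rev': 1, 'author': 'babar',
    #    'date': pd.Timestamp('2018-01-01 12:00:00+0000', tz='utc'),
    #    'meta': {}, 'names': ['my_serie']}
    # e.g. `tsh.log(cn, names=('my_serie',), limit=5)` yields the five most
    # recent changesets touching that series, returned oldest first.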

    # /API
    # Helpers

    # creation / update

    def _create(self, cn, newts, name, author,
                metadata=None, insertion_date=None):
        # initial insertion
        if len(newts) == 0:
            return None
        snapshot = Snapshot(cn, self, name)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        value = {
            'cset': csid,
            'snapshot': head
        }
        table = self._make_ts_table(cn, name, newts)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)
        L.info('first insertion of %s (size=%s) by %s',
               name, len(newts), author)
        return newts

    def _update(self, cn, table, newts, name, author,
                metadata=None, insertion_date=None):
        self._validate(cn, newts, name)
        snapshot = Snapshot(cn, self, name)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   name, author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), name, author)
        return diff

    # ts serialisation

    def _ensure_tz_consistency(self, cn, ts):
        """Return timeserie with tz aware index or not depending on metadata
        tzaware.
        """
        assert ts.name is not None
        metadata = self.metadata(cn, ts.name)
        if metadata and metadata.get('tzaware', False):
            return ts.tz_localize('UTC')
        return ts

    # serie table handling

    def _ts_table_name(self, seriename):
        seriename = self._tablename(seriename)
        return '{}.timeserie.{}'.format(self.namespace, seriename)

    def _table_definition_for(self, seriename):
        tablename = self._ts_table_name(seriename)
        seriename = self._tablename(seriename)
        table = TABLES.get(tablename)
        if table is None:
            TABLES[tablename] = table = Table(
                seriename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       index=True, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           seriename)),
                       index=True),
                schema='{}.timeserie'.format(self.namespace),
                extend_existing=True
            )
        return table

    def _make_ts_table(self, cn, name, ts):
        tablename = self._ts_table_name(name)
        table = self._table_definition_for(name)
        table.create(cn)
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            name=name,
            table_name=tablename,
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'value_type': ts.dtypes.name
            },
        )
        cn.execute(sql)
        return table

    def _get_ts_table(self, cn, name):
        reg = self.schema.registry
        tablename = self._ts_table_name(name)
        sql = reg.select().where(reg.c.table_name == tablename)
        tid = cn.execute(sql).scalar()
        if tid:
            return self._table_definition_for(name)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        sql = select([cset_serie.c.serie]
        ).where(cset_serie.c.cset == csid)

        return [seriename for seriename, in cn.execute(sql).fetchall()]

    # insertion handling

    def _validate(self, cn, ts, name):
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, name)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                name, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _finalize_insertion(self, cn, csid, name):
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=name
        )
        cn.execute(sql)