from datetime import datetime
from contextlib import contextmanager
import logging
import hashlib

import pandas as pd

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.schema import tsschema
from tshistory.util import (
    inject_in_index,
    num2float,
    subset,
    SeriesServices,
    tzaware_serie
)
from tshistory.snapshot import Snapshot

L = logging.getLogger('tshistory.tsio')
TABLES = {}


class TimeSerie(SeriesServices):
    namespace = 'tsh'
    schema = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}

    def insert(self, cn, newts, name, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        name: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        """
        assert isinstance(newts, pd.Series)
        assert isinstance(name, str)
        assert isinstance(author, str)
        assert metadata is None or isinstance(metadata, dict)
        assert _insertion_date is None or isinstance(_insertion_date, datetime)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = name
        table = self._get_ts_table(cn, name)

        if table is None:
            return self._create(cn, newts, name, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, name, author,
                            metadata, _insertion_date)
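
    # Usage sketch (illustrative, not part of the module): inserting a first
    # revision of a series. The connection `cn` (a SQLAlchemy connectable on a
    # database carrying the tshistory schema) and the series name 'my.series'
    # are assumptions made for the example.
    #
    #   tsh = TimeSerie()
    #   series = pd.Series([1., 2., 3.],
    #                      index=pd.date_range('2018-1-1', periods=3, freq='D'))
    #   tsh.insert(cn, series, 'my.series', 'babar',
    #              metadata={'source': 'example'})
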
    def get(self, cn, name, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        qfilter = []
        if revision_date:
            qfilter.append(lambda cset, _: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, name)
        _, current = snap.find(qfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = name
            current = current[~current.isnull()]
        return current
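
    # Usage sketch: reading a series back, optionally as of a past revision
    # date. The tz-aware timestamp below is an assumption for the example
    # (insertion dates are stored as UTC by _newchangeset).
    #
    #   current = tsh.get(cn, 'my.series')
    #   asof = tsh.get(cn, 'my.series',
    #                  revision_date=pd.Timestamp('2018-1-2', tz='UTC'))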

    def metadata(self, cn, tsname):
        """Return metadata dict of timeserie."""
        if tsname in self.metadatacache:
            return self.metadatacache[tsname]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.name == tsname
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[tsname] = meta
        return meta

    def update_metadata(self, cn, tsname, metadata, internal=False):
        assert isinstance(metadata, dict)
        meta = self.metadata(cn, tsname)
        if not internal:
            assert set(meta.keys()).intersection(metadata.keys()) == set()
        # merge the new keys into the existing metadata, persist the merged
        # dict (not just the incoming keys) and keep the cache in sync
        meta.update(metadata)
        self.metadatacache[tsname] = meta
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.name == tsname
        ).values(metadata=meta)
        cn.execute(sql)
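
    # Usage sketch: attaching free-form metadata to a registered series. When
    # `internal` is left False, the new keys must not collide with existing
    # ones (enforced by the assert above).
    #
    #   tsh.update_metadata(cn, 'my.series', {'unit': 'MW'})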

    def changeset_metadata(self, cn, csid):
        cset = self.schema.changeset
        sql = 'select metadata from "{ns}".changeset where id = {id}'.format(
            ns=self.namespace,
            id=csid
        )
        return cn.execute(sql).scalar()

    def get_history(self, cn, name,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None):
        """Compute the history of a serie: the values of each changeset
        (optionally bounded by insertion dates and value dates), stacked
        into a single serie stamped with their insertion date.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        if deltabefore is not None or deltaafter is not None:
            assert from_value_date is None
            assert to_value_date is None

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        revs = cn.execute(revsql).fetchall()
        if not revs:
            return

        snapshot = Snapshot(cn, self, name)
        series = []
        for csid, idate in revs:
            if (deltabefore, deltaafter) != (None, None):
                from_value_date = None
                to_value_date = None
                if deltabefore is not None:
                    from_value_date = idate - deltabefore
                if deltaafter is not None:
                    to_value_date = idate + deltaafter
            series.append((
                idate,
                snapshot.find([lambda cset, _: cset.c.id == csid],
                              from_value_date=from_value_date,
                              to_value_date=to_value_date)[1]
            ))

        for revdate, serie in series:
            inject_in_index(serie, revdate)

        serie = pd.concat([serie for revdate_, serie in series])
        serie.name = name
        return serie
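
    # Usage sketch: the full history, or only a window around each insertion
    # date. `deltabefore`/`deltaafter` are mutually exclusive with the value
    # date bounds (see the asserts above); pd.Timedelta values are assumed.
    #
    #   full = tsh.get_history(cn, 'my.series')
    #   windowed = tsh.get_history(cn, 'my.series',
    #                              deltabefore=pd.Timedelta(hours=1),
    #                              deltaafter=pd.Timedelta(hours=1))
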
    def get_delta(self, cn, name, delta):
        histo = self.get_history(
            cn, name, deltabefore=-delta
        )

        df = histo.reset_index()
        # df_date is a dataframe with two columns: value_date and insertion_date
        df_date = df.loc[:, ['insertion_date', 'value_date']]

        # now in selected_dates each value_date has only one occurrence
        # which is the last inserted
        selected_dates = df_date.groupby('value_date').max().reset_index()

        ts = df[name]
        # ts is built from the df returned from get_history
        # ts index is now a simple index of tuples (insertion_date, value_date)
        ts.index = ((row.insertion_date, row.value_date)
                    for row in df.itertuples())
        # in ts there is still every (insertion_date, value_date) couple
        # we now use selected_dates to keep, for each value_date, only the
        # couple with the latest insertion_date
        ts_select = ts[[(row[2], row[1])
                        for row in selected_dates.itertuples()]]

        # ts_select still has a simple index of tuples (insertion_date, value_date)
        new_index = (elt[1] for elt in ts_select.index)

        # we only keep the value_date information from the index
        ts_select.index = new_index
        return ts_select
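
    # Usage sketch: for each value date, keep the most recent value that was
    # inserted at least `delta` before that value date (get_history is called
    # with deltabefore=-delta, hence value_date >= insertion_date + delta).
    #
    #   lagged = tsh.get_delta(cn, 'my.series', pd.Timedelta(days=1))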


    def exists(self, cn, name):
        return self._get_ts_table(cn, name) is not None

    def latest_insertion_date(self, cn, name):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, name)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.cset == cset.c.id)
        return cn.execute(sql).scalar()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            sql = sql.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            sql = sql.where(cset.c.insertion_date <= revdate)
        else:
            sql = sql.where(cset.c.insertion_date >= revdate)
        return cn.execute(sql).scalar()

    def strip(self, cn, seriename, csid):
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == seriename
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
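
    # Usage sketch (destructive): look up a changeset id for the series
    # relative to a revision date, then erase that changeset and every later
    # one for the series. The revision date below is an assumption.
    #
    #   csid = tsh.changeset_at(cn, 'my.series',
    #                           pd.Timestamp('2018-1-2', tz='UTC'),
    #                           mode='before')
    #   tsh.strip(cn, 'my.series', csid)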

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct name from {}.registry order by name'.format(self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.name.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.name)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log
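
    # Usage sketch: the last few changesets touching a given series; each
    # entry carries the revision id, author, insertion date, metadata and the
    # names of the series it touched.
    #
    #   for rev in tsh.log(cn, limit=5, names=['my.series']):
    #       print(rev['rev'], rev['author'], rev['date'], rev['names'])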

    # /API
    # Helpers

    # creation / update

    def _create(self, cn, newts, name, author,
                metadata=None, insertion_date=None):
        # initial insertion
        if len(newts) == 0:
            return None
        snapshot = Snapshot(cn, self, name)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        value = {
            'cset': csid,
            'snapshot': head
        }
        table = self._make_ts_table(cn, name, newts)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)
        L.info('first insertion of %s (size=%s) by %s',
               name, len(newts), author)
        return newts

    def _update(self, cn, table, newts, name, author,
                metadata=None, insertion_date=None):
        self._validate(cn, newts, name)
        snapshot = Snapshot(cn, self, name)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   name, author, len(newts))
            return

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), name, author)
        return diff

    # ts serialisation

    def _ensure_tz_consistency(self, cn, ts):
        """Return timeserie with tz aware index or not depending on metadata
        tzaware.
        """
        assert ts.name is not None
        metadata = self.metadata(cn, ts.name)
        if metadata and metadata.get('tzaware', False):
            return ts.tz_localize('UTC')
        return ts

    # serie table handling

    def _ts_table_name(self, seriename):
        seriename = self._tablename(seriename)
        return '{}.timeserie.{}'.format(self.namespace, seriename)

    def _table_definition_for(self, seriename):
        tablename = self._ts_table_name(seriename)
        seriename = self._tablename(seriename)
        table = TABLES.get(tablename)
        if table is None:
            TABLES[tablename] = table = Table(
                seriename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       index=True, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           seriename)),
                       index=True),
                schema='{}.timeserie'.format(self.namespace),
                extend_existing=True
            )
        return table

    def _make_ts_table(self, cn, name, ts):
        tablename = self._ts_table_name(name)
        table = self._table_definition_for(name)
        table.create(cn)
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            name=name,
            table_name=tablename,
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'value_type': ts.dtypes.name
            },
        )
        cn.execute(sql)
        return table

    def _get_ts_table(self, cn, name):
        reg = self.schema.registry
        tablename = self._ts_table_name(name)
        sql = reg.select().where(reg.c.table_name == tablename)
        tid = cn.execute(sql).scalar()
        if tid:
            return self._table_definition_for(name)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        sql = select([cset_serie.c.serie]
        ).where(cset_serie.c.cset == csid)

        return [seriename for seriename, in cn.execute(sql).fetchall()]

    # insertion handling

    def _validate(self, cn, ts, name):
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, name)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                name, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _finalize_insertion(self, cn, csid, name):
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=name
        )
        cn.execute(sql)