from datetime import datetime
from contextlib import contextmanager
import logging

import pickle
import zlib

import pandas as pd
import numpy as np

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import JSONB, BYTEA

from tshistory import schema


def setuplogging():
    logger = logging.getLogger('tshistory.tsio')
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.INFO)
    return logger


L = setuplogging()


def tojson(ts):
    if ts is None:
        return None

    if not isinstance(ts.index, pd.MultiIndex):
        return ts.to_json(date_format='iso')

    # multi index case
    return ts.to_frame().reset_index().to_json(date_format='iso')


def num2float(pdobj):
    # get a Series or a DataFrame column
    if str(pdobj.dtype).startswith('int'):
        return pdobj.astype('float64')
    return pdobj


def fromjson(jsonb, tsname):
    return _fromjson(jsonb, tsname).fillna(value=np.nan)


def _fromjson(jsonb, tsname):
    if jsonb == '{}':
        return pd.Series(name=tsname)

    result = pd.read_json(jsonb, typ='series', dtype=False)
    if isinstance(result.index, pd.DatetimeIndex):
        result = num2float(result)
        return result

    # multi index case
    columns = result.index.values.tolist()
    columns.remove(tsname)
    result = pd.read_json(jsonb, typ='frame',
                          convert_dates=columns)
    result.set_index(sorted(columns), inplace=True)
    return num2float(result.iloc[:, 0])  # get a Series object
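

# A hedged, illustrative round-trip check for tojson/fromjson above (not
# part of the API; the function name and sample data are made up):
def _example_json_roundtrip():
    ts = pd.Series([1, 2, 3],
                   index=pd.date_range('2017-1-1', periods=3, freq='D'),
                   name='example')
    # integer inputs come back as float64 (see num2float)
    assert fromjson(tojson(ts), 'example').equals(num2float(ts))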


class TimeSerie(object):
    _csid = None
    _snapshot_interval = 10
    _precision = 1e-14

    # API : changeset, insert, get, delete
    @contextmanager
    def newchangeset(self, cn, author, _insertion_date=None):
        """A context manager to allow insertion of several series within the
        same changeset identifier

        This allows grouping changes to several series, hence
        producing a macro-change.

        _insertion_date is *only* provided for migration purposes and
        is not part of the API.
        """
        assert self._csid is None
        self._csid = self._newchangeset(cn, author, _insertion_date)
        self._author = author
        yield
        del self._csid
        del self._author

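    # Hedged usage sketch for the context manager above (names are
    # illustrative; assumes a db connection `cn` and `tso = TimeSerie()`):
    #
    #   with tso.newchangeset(cn, 'babar'):
    #       tso.insert(cn, ts1, 'one_series')
    #       tso.insert(cn, ts2, 'another_series')
    #
    # both series are then recorded under the same changeset id
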
    def insert(self, cn, newts, name, author=None,
               extra_scalars={}):
        """Create a new revision of a given time series

        newts: pandas.Series with date index

        name: str unique identifier of the series

        author: str free-form author name (mandatory, unless provided
        to the newchangeset context manager).
        """
        assert self._csid or author, 'author is mandatory'
        if self._csid and author:
            L.info('author will not be used when in a changeset')
        assert isinstance(newts, pd.Series)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        newts.name = name
        table = self._get_ts_table(cn, name)

        if isinstance(newts.index, pd.MultiIndex):
            # we impose an order to survive roundtrips
            newts = newts.reorder_levels(sorted(newts.index.names))

        if table is None:
            # initial insertion
            if newts.isnull().all():
                return None
            newts = newts[~newts.isnull()]
            table = self._make_ts_table(cn, name)
            csid = self._csid or self._newchangeset(cn, author)
            value = {
                'csid': csid,
                'snapshot': self._serialize(newts),
            }
            # callback for extenders
            self._complete_insertion_value(value, extra_scalars)
            cn.execute(table.insert().values(value))
            self._finalize_insertion(cn, csid, name)
            L.info('first insertion of %s (size=%s) by %s',
                   name, len(newts), author or self._author)
            return newts

        diff, newsnapshot = self._compute_diff_and_newsnapshot(
            cn, table, newts, **extra_scalars
        )
        if diff is None:
            L.info('no difference in %s by %s (for ts of size %s)',
                   name, author or self._author, len(newts))
            return

        tip_id = self._get_tip_id(cn, table)
        csid = self._csid or self._newchangeset(cn, author)
        value = {
            'csid': csid,
            'diff': self._serialize(diff),
            'snapshot': self._serialize(newsnapshot),
            'parent': tip_id,
        }
        # callback for extenders
        self._complete_insertion_value(value, extra_scalars)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)

        if tip_id > 1 and tip_id % self._snapshot_interval:
            self._purge_snapshot_at(cn, table, tip_id)
        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), name, author or self._author)
        return diff

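    # Hedged note: re-inserting an unchanged series is a no-op returning
    # None; otherwise only the diff (plus a refreshed snapshot) is stored,
    # and the diff is returned.
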
    def get(self, cn, name, revision_date=None):
        """Compute and return the series of a given name

        revision_date: datetime filter to get previous versions of the
        series
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        qfilter = []
        if revision_date:
            qfilter.append(lambda cset, _: cset.c.insertion_date <= revision_date)
        current = self._build_snapshot_upto(cn, table, qfilter)

        if current is not None:
            current.name = name
            current = current[~current.isnull()]
        return current

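    # Hedged usage sketch (names are illustrative): tso.get(cn, 'one_series')
    # returns the latest state; tso.get(cn, 'one_series',
    # revision_date=datetime(2017, 1, 1)) replays only the changesets whose
    # insertion date is <= that timestamp.
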
    def get_group(self, cn, name, revision_date=None):
        csid = self._latest_csid_for(cn, name)

        group = {}
        for seriename in self._changeset_series(cn, csid):
            serie = self.get(cn, seriename, revision_date)
            if serie is not None:
                group[seriename] = serie
        return group

    def get_history(self, cn, name,
                    from_insertion_date=None,
                    to_insertion_date=None):
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        logs = self.log(cn, names=[name],
                        fromdate=from_insertion_date,
                        todate=to_insertion_date)
        series = []
        for log in logs:
            serie = self.get(cn, name, revision_date=log['date'])
            revdate = pd.Timestamp(log['date'])
            mindex = [(revdate, valuestamp) for valuestamp in serie.index]
            serie.index = pd.MultiIndex.from_tuples(mindex, names=['insertion_date', 'value_date'])
            series.append(serie)
        return pd.concat(series)
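
    # Hedged illustration: get_history returns a Series indexed by an
    # (insertion_date, value_date) MultiIndex, e.g. (made-up values):
    #
    #   insertion_date       value_date
    #   2017-01-01 10:00:00  2017-01-01    1.0
    #                        2017-01-02    2.0
    #   2017-01-02 10:00:00  2017-01-02    2.5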

    def exists(self, cn, name):
        return self._get_ts_table(cn, name) is not None

    def latest_insertion_date(self, cn, name):
        cset = schema.changeset
        tstable = self._get_ts_table(cn, name)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.csid == cset.c.id)
        return cn.execute(sql).scalar()

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from registry'
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from changeset'
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct name from registry order by name'
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

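    # Hedged illustration of the returned mapping (values are made up):
    #
    #   {'series count': 2, 'changeset count': 5,
    #    'serie names': ['another_series', 'one_series']}
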
    def log(self, cn, limit=0, diff=False, names=None, authors=None,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = schema.changeset, schema.changeset_series, schema.registry

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)

        if names:
            sql = sql.where(reg.c.name.in_(names))

        if authors:
            sql = sql.where(cset.c.author.in_(authors))

        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)

        if torev:
            sql = sql.where(cset.c.id <= torev)

        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)

        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)

        sql = sql.where(cset.c.id == cset_series.c.csid
        ).where(cset_series.c.serie == reg.c.name)

        rset = cn.execute(sql)
        for csetid, author, revdate in rset.fetchall():
            log.append({'rev': csetid, 'author': author, 'date': revdate,
                        'names': self._changeset_series(cn, csetid)})

        if diff:
            for rev in log:
                rev['diff'] = {name: self._diff(cn, rev['rev'], name)
                               for name in rev['names']}

        log.sort(key=lambda rev: rev['rev'])
        return log

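    # Hedged illustration of one log entry (values are made up):
    #
    #   {'rev': 2, 'author': 'babar',
    #    'date': datetime(2017, 1, 2, 10, 0),
    #    'names': ['one_series']}
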
    # /API
    # Helpers

    # ts serialisation

    def _serialize(self, ts):
        return tojson(ts)

    def _deserialize(self, ts, name):
        return fromjson(ts, name)

    # serie table handling

    def _ts_table_name(self, seriename):
        # namespace.seriename
        return 'timeserie.%s' % seriename

    def _table_definition_for(self, seriename):
        return Table(
            seriename, schema.meta,
            Column('id', Integer, primary_key=True),
            Column('csid', Integer, ForeignKey('changeset.id'),
                   index=True, nullable=False),
            # constraint: there is either .diff or .snapshot
            Column('diff', JSONB(none_as_null=True)),
            Column('snapshot', JSONB(none_as_null=True)),
            Column('parent',
                   Integer,
                   ForeignKey('timeserie.%s.id' % seriename,
                              ondelete='cascade'),
                   nullable=True,
                   unique=True,
                   index=True),
            schema='timeserie',
            extend_existing=True
        )

    def _make_ts_table(self, cn, name):
        tablename = self._ts_table_name(name)
        table = self._table_definition_for(name)
        table.create(cn)
        sql = schema.registry.insert().values(
            name=name,
            table_name=tablename)
        cn.execute(sql)
        return table

    def _get_ts_table(self, cn, name):
        reg = schema.registry
        tablename = self._ts_table_name(name)
        sql = reg.select().where(reg.c.table_name == tablename)
        tid = cn.execute(sql).scalar()
        if tid:
            return self._table_definition_for(name)

    # changeset handling

    def _newchangeset(self, cn, author, _insertion_date=None):
        table = schema.changeset
        sql = table.insert().values(
            author=author,
            insertion_date=_insertion_date or datetime.now())
        return cn.execute(sql).inserted_primary_key[0]

    def _latest_csid_for(self, cn, name):
        table = self._get_ts_table(cn, name)
        sql = select([func.max(table.c.csid)])
        return cn.execute(sql).scalar()

    def _changeset_series(self, cn, csid):
        cset_serie = schema.changeset_series
        sql = select([cset_serie.c.serie]
        ).where(cset_serie.c.csid == csid)

        return [seriename for seriename, in cn.execute(sql).fetchall()]

    # insertion handling

    def _get_tip_id(self, cn, table):
        sql = select([func.max(table.c.id)])
        return cn.execute(sql).scalar()

    def _complete_insertion_value(self, value, extra_scalars):
        pass

    def _finalize_insertion(self, cn, csid, name):
        table = schema.changeset_series
        sql = table.insert().values(
            csid=csid,
            serie=name
        )
        cn.execute(sql)

    # snapshot handling

    def _purge_snapshot_at(self, cn, table, diffid):
        cn.execute(
            table.update(
            ).where(table.c.id == diffid
            ).values(snapshot=None)
        )

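    # Hedged note: combined with the `tip_id % _snapshot_interval` test in
    # insert(), purging keeps roughly one snapshot every _snapshot_interval
    # revisions (plus the latest row), bounding how many diffs
    # _build_snapshot_upto has to replay.
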
    def _validate_type(self, oldts, newts, name):
        if (oldts is None or
            oldts.isnull().all() or
            newts.isnull().all()):
            return
        old_type = oldts.dtype
        new_type = newts.dtype
        if new_type != old_type:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                name, new_type, old_type)
            raise Exception(m)

    def _compute_diff_and_newsnapshot(self, cn, table, newts, **extra_scalars):
        snapshot = self._build_snapshot_upto(cn, table)
        self._validate_type(snapshot, newts, table.name)
        diff = self._compute_diff(snapshot, newts)

        if len(diff) == 0:
            return None, None

        # full state computation & insertion
        newsnapshot = self._apply_diff(snapshot, diff)
        return diff, newsnapshot

    def _find_snapshot(self, cn, table, qfilter=(), column='snapshot'):
        cset = schema.changeset
        sql = select([table.c.id, table.c[column]]
        ).order_by(desc(table.c.id)
        ).limit(1
        ).where(table.c[column] != None)

        if qfilter:
            sql = sql.where(table.c.csid == cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        try:
            snapid, snapdata = cn.execute(sql).fetchone()
        except TypeError:
            return None, None
        return snapid, self._deserialize(snapdata, table.name)

    def _build_snapshot_upto(self, cn, table, qfilter=()):
        snapid, snapshot = self._find_snapshot(cn, table, qfilter)
        if snapid is None:
            return None

        cset = schema.changeset
        sql = select([table.c.id,
                      table.c.diff,
                      table.c.parent,
                      cset.c.insertion_date]
        ).order_by(table.c.id
        ).where(table.c.id > snapid)

        if qfilter:
            sql = sql.where(table.c.csid == cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        alldiffs = pd.read_sql(sql, cn)

        if len(alldiffs) == 0:
            return snapshot

        # initial ts
        ts = snapshot
        for _, row in alldiffs.iterrows():
            diff = self._deserialize(row['diff'], table.name)
            ts = self._apply_diff(ts, diff)
        assert ts.index.dtype.name == 'datetime64[ns]' or len(ts) == 0
        return ts

    # diff handling

    def _diff(self, cn, csetid, name):
        table = self._get_ts_table(cn, name)
        cset = schema.changeset

        def filtercset(sql):
            return sql.where(table.c.csid == cset.c.id
            ).where(cset.c.id == csetid)

        sql = filtercset(select([table.c.id]))
        tsid = cn.execute(sql).scalar()

        if tsid == 1:
            sql = select([table.c.snapshot])
        else:
            sql = select([table.c.diff])
        sql = filtercset(sql)

        return self._deserialize(cn.execute(sql).scalar(), name)

    def _compute_diff(self, fromts, tots):
        """Compute the difference between fromts and tots
        (like in tots - fromts).
        """
        if fromts is None:
            return tots
        fromts = fromts[~fromts.isnull()]
        if not len(fromts):
            return tots

        mask_overlap = tots.index.isin(fromts.index)
        fromts_overlap = fromts[tots.index[mask_overlap]]
        tots_overlap = tots[mask_overlap]

        if fromts.dtype == 'float64':
            mask_equal = np.isclose(fromts_overlap, tots_overlap,
                                    rtol=0, atol=self._precision)
        else:
            mask_equal = fromts_overlap == tots_overlap

        mask_na_equal = fromts_overlap.isnull() & tots_overlap.isnull()
        mask_equal = mask_equal | mask_na_equal

        diff_overlap = tots[mask_overlap][~mask_equal]
        diff_new = tots[~mask_overlap]
        diff_new = diff_new[~diff_new.isnull()]
        return pd.concat([diff_overlap, diff_new])

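    # Hedged illustration of the diff semantics (made-up values): with
    # fromts = {d1: 1.0, d2: 2.0} and tots = {d2: 2.5, d3: 3.0},
    # _compute_diff returns {d2: 2.5, d3: 3.0} -- changed points plus new
    # points, while untouched points (d1) are absent from the diff.
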
    def _apply_diff(self, base_ts, new_ts):
        """Produce a new ts using base_ts as a base and taking any
        intersecting and new values from new_ts.
        """
        if base_ts is None:
            return new_ts
        if new_ts is None:
            return base_ts
        # a scalar 0.0 broadcasts over the whole union index (a one-element
        # list would raise on indexes longer than one)
        result_ts = pd.Series(0.0, index=base_ts.index.union(new_ts.index))
        result_ts[base_ts.index] = base_ts
        result_ts[new_ts.index] = new_ts
        result_ts.sort_index(inplace=True)
        result_ts.name = base_ts.name
        return result_ts


class BigdataTimeSerie(TimeSerie):

    def _table_definition_for(self, seriename):
        return Table(
            seriename, schema.meta,
            Column('id', Integer, primary_key=True),
            Column('csid', Integer, ForeignKey('changeset.id'),
                   index=True, nullable=False),
            # constraint: there is either .diff or .snapshot
            Column('diff', BYTEA),
            Column('snapshot', BYTEA),
            Column('parent',
                   Integer,
                   ForeignKey('timeserie.%s.id' % seriename,
                              ondelete='cascade'),
                   nullable=True,
                   unique=True,
                   index=True),
            schema='timeserie',
            extend_existing=True
        )

    def _serialize(self, ts):
        return zlib.compress(tojson(ts).encode('utf-8'))

    def _deserialize(self, ts, name):
        return fromjson(zlib.decompress(ts).decode('utf-8'), name)
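

# Hedged sketch: the BYTEA variant round-trips through zlib-compressed JSON
# (illustrative only; the function name and sample data are made up):
def _example_bigdata_serialization():
    tso = BigdataTimeSerie()
    ts = pd.Series([1.0, 2.0],
                   index=pd.date_range('2017-1-1', periods=2, freq='D'))
    raw = tso._serialize(ts)  # bytes, suitable for a BYTEA column
    assert tso._deserialize(raw, 'example').equals(ts)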