# tsio.py
from contextlib import contextmanager
from datetime import datetime
import io
import logging
import math
import zlib

import pandas as pd
import numpy as np

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, func, desc, and_
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.schema import SCHEMAS


17
L = logging.getLogger('tshistory.tsio')
18
19


20
def tojson(ts):
    """Serialize a pandas Series to a JSON string.

    A series with a simple index is dumped directly, with iso-formatted
    dates.  A MultiIndex series is first flattened into a frame, whose
    index levels become regular columns.

    Both paths use the same float precision, derived from
    ``TimeSerie._precision``.  Values then survive a round-trip with the
    precision used when diffing, so re-inserting an unchanged multi-index
    series does not produce spurious diffs.
    """
    precision = -int(math.log10(TimeSerie._precision))
    if not isinstance(ts.index, pd.MultiIndex):
        return ts.to_json(date_format='iso', double_precision=precision)

    # multi index case: index levels become columns of a frame
    return ts.to_frame().reset_index().to_json(date_format='iso',
                                               double_precision=precision)
def num2float(pdobj):
    """Cast an integer-typed Series (or DataFrame column) to float64.

    Objects whose dtype name does not start with 'int' are handed back
    as-is (the very same object, no copy).
    """
    dtype_name = str(pdobj.dtype)
    if not dtype_name.startswith('int'):
        return pdobj
    return pdobj.astype('float64')
def fromjson(jsonb, tsname):
    """Turn a JSON payload back into a Series named `tsname`.

    Parsing is delegated to `_fromjson`; missing values of the result
    are then normalized to ``np.nan``.
    """
    serie = _fromjson(jsonb, tsname)
    return serie.fillna(value=np.nan)
def _fromjson(jsonb, tsname):
41
42
43
    if jsonb == '{}':
        return pd.Series(name=tsname)

44
45
    result = pd.read_json(jsonb, typ='series', dtype=False)
    if isinstance(result.index, pd.DatetimeIndex):
46
        result = num2float(result)
47
48
49
50
51
52
53
54
        return result

    # multi index case
    columns = result.index.values.tolist()
    columns.remove(tsname)
    result = pd.read_json(jsonb, typ='frame',
                          convert_dates=columns)
    result.set_index(sorted(columns), inplace=True)
Aurélien Campéas's avatar
Aurélien Campéas committed
55
    return num2float(result.iloc[:, 0])  # get a Series object
56
57


58
59
60
61
62
63
def subset(ts, fromdate, todate):
    """Return the part of `ts` whose index lies within [fromdate, todate].

    Either bound may be None, meaning open-ended.  With no bound at all,
    `ts` itself is returned untouched.

    Label-based slicing needs a sorted index: on a non-monotonic one,
    pandas either raises (bounds absent from the index) or slices by
    position.  An unsorted `ts`, such as a deserialized diff, is therefore
    sorted before slicing.
    """
    if fromdate is None and todate is None:
        return ts
    if not ts.index.is_monotonic_increasing:
        ts = ts.sort_index()
    return ts.loc[fromdate:todate]
class TimeSerie(object):
    """Versioned storage of pandas series in a postgres database.

    Every serie gets its own table, registered in the namespace's
    `registry` table.  Each row of a serie table is one revision, tied to
    a changeset (author + insertion date).  The first row holds a full
    snapshot; later rows hold the diff against the previous state.

    Full snapshots are only retained on the first row, on the latest row
    and on rows whose id is a multiple of `_snapshot_interval`.  This
    bounds the number of diffs to replay when rebuilding a state.
    """
    _csid = None  # changeset id, only set inside `newchangeset`
    _snapshot_interval = 10  # keep a full snapshot every N revisions
    _precision = 1e-14  # absolute tolerance when comparing float values
    namespace = 'tsh'
    schema = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = SCHEMAS[namespace]

    # API : changeset, insert, get, delete
    @contextmanager
    def newchangeset(self, cn, author, _insertion_date=None):
        """A context manager to allow insertion of several series within the
        same changeset identifier

        This allows to group changes to several series, hence
        producing a macro-change.

        _insertion_date is *only* provided for migration purposes and
        not part of the API.
        """
        assert self._csid is None
        self._csid = self._newchangeset(cn, author, _insertion_date)
        self._author = author
        try:
            yield
        finally:
            # reset even when the block raises.  Otherwise the instance
            # stays bound to a stale changeset, and the assert above
            # fails on every later call.
            del self._csid
            del self._author

    def insert(self, cn, newts, name, author=None,
               extra_scalars=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index

        name: str unique identifier of the serie

        author: str free-form author name (mandatory, unless provided
        to the newchangeset context manager).

        extra_scalars: optional dict handed over to the extension hooks
        (`_complete_insertion_value`, `_compute_diff_and_newsnapshot`).

        Returns:
        - on first insertion, the stored series (NaNs dropped);
        - afterwards, the stored diff;
        - None when nothing was stored: empty input, all-NaN initial
          series, or no difference with the current state.

        Beware: the passed series may get its `.name` set to `name`.
        """
        if extra_scalars is None:
            extra_scalars = {}
        assert self._csid or author, 'author is mandatory'
        if self._csid and author:
            L.info('author will not be used when in a changeset')
        assert isinstance(newts, pd.Series)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        newts.name = name
        table = self._get_ts_table(cn, name)

        if isinstance(newts.index, pd.MultiIndex):
            # we impose an order to survive round-trips
            newts = newts.reorder_levels(sorted(newts.index.names))

        if table is None:
            # initial insertion
            if newts.isnull().all():
                return None
            newts = newts[~newts.isnull()]
            table = self._make_ts_table(cn, name)
            csid = self._csid or self._newchangeset(cn, author)
            value = {
                'csid': csid,
                'snapshot': self._serialize(newts),
            }
            # callback for extenders
            self._complete_insertion_value(value, extra_scalars)
            cn.execute(table.insert().values(value))
            self._finalize_insertion(cn, csid, name)
            L.info('first insertion of %s (size=%s) by %s',
                   name, len(newts), author or self._author)
            return newts

        diff, newsnapshot = self._compute_diff_and_newsnapshot(
            cn, table, newts, **extra_scalars
        )
        if diff is None:
            L.info('no difference in %s by %s (for ts of size %s)',
                   name, author or self._author, len(newts))
            return

        tip_id = self._get_tip_id(cn, table)
        csid = self._csid or self._newchangeset(cn, author)
        value = {
            'csid': csid,
            'diff': self._serialize(diff),
            'snapshot': self._serialize(newsnapshot),
            'parent': tip_id,
        }
        # callback for extenders
        self._complete_insertion_value(value, extra_scalars)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)

        # the previous tip loses its snapshot, unless it is the very
        # first row or falls on the snapshot interval
        if tip_id > 1 and tip_id % self._snapshot_interval:
            self._purge_snapshot_at(cn, table, tip_id)
        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), name, author or self._author)
        return diff

    def get(self, cn, name, revision_date=None):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        Returns None when the serie does not exist, or has no revision
        up to `revision_date`.  NaN values are dropped from the result.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        qfilter = []
        if revision_date:
            qfilter.append(lambda cset, _: cset.c.insertion_date <= revision_date)
        current = self._build_snapshot_upto(cn, table, qfilter)

        if current is not None:
            current.name = name
            current = current[~current.isnull()]
        return current

    def get_group(self, cn, name, revision_date=None):
        """Return, as a {name: serie} dict, all the series inserted in the
        latest changeset that touched serie `name`.

        Each serie is read at `revision_date`, which does not influence
        the choice of the changeset.  An unknown `name` gives an empty dict.
        """
        csid = self._latest_csid_for(cn, name)
        if csid is None:
            return {}

        group = {}
        for seriename in self._changeset_series(cn, csid):
            serie = self.get(cn, seriename, revision_date)
            if serie is not None:
                group[seriename] = serie
        return group

    def get_history(self, cn, name,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    diffmode=False):
        """Return the revisions of a serie as one serie indexed by
        (insertion_date, value_date).

        Revisions can be restricted to an insertion date range, and values
        to a value date range (both bounds inclusive).

        In `diffmode`, each revision holds only what changed in it; the
        very first revision has no diff, so its initial snapshot is used.
        Otherwise each revision holds the full state at that point.

        Returns None for an unknown serie, or when no revision falls in
        the insertion date range.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        # compute diffs above the snapshot
        cset = self.schema.changeset
        diffsql = select([cset.c.id, cset.c.insertion_date, table.c.diff]
        ).order_by(cset.c.id
        ).where(table.c.csid == cset.c.id)

        if from_insertion_date:
            diffsql = diffsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            diffsql = diffsql.where(cset.c.insertion_date <= to_insertion_date)

        diffs = cn.execute(diffsql).fetchall()
        if not diffs:
            # it's fine to ask for an insertion date range
            # where nothing did happen, but you get nothing
            return

        if diffmode:
            series = []
            for csid, revdate, diff in diffs:
                if diff is None:  # we must fetch the initial snapshot
                    sql = select([table.c.snapshot]).where(table.c.csid == csid)
                    diff = cn.execute(sql).scalar()
                serie = subset(self._deserialize(diff, name),
                               from_value_date, to_value_date)
                series.append(self._stamp_insertion_date(serie, revdate))
            series = pd.concat(series)
            series.name = name
            return series

        # full state at the first selected revision ...
        csid, revdate, _ = diffs[0]
        snapshot = self._build_snapshot_upto(
            cn, table, [lambda cset, _: cset.c.id == csid],
            from_value_date, to_value_date
        )
        states = [(revdate, subset(snapshot, from_value_date, to_value_date))]
        # ... then roll the following diffs forward
        for _, revdate, diff in diffs[1:]:
            diff = subset(self._deserialize(diff, table.name),
                          from_value_date, to_value_date)
            states.append((revdate, self._apply_diff(states[-1][1], diff)))

        # stamp only once every state is computed: stamping replaces the
        # index, and each state serves as the base of the next one
        serie = pd.concat([
            self._stamp_insertion_date(state, revdate)
            for revdate, state in states
        ])
        serie.name = name
        return serie

    def exists(self, cn, name):
        """Tell whether serie `name` is registered."""
        return self._get_ts_table(cn, name) is not None

    def latest_insertion_date(self, cn, name):
        """Return the insertion date of the latest changeset touching
        serie `name`, or None when the serie does not exist."""
        tstable = self._get_ts_table(cn, name)
        if tstable is None:
            return None
        cset = self.schema.changeset
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.csid == cset.c.id)
        return cn.execute(sql).scalar()

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct name from {}.registry order by name'.format(self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, diff=False, names=None, authors=None,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        Each entry is a dict with keys 'rev', 'author', 'date' and 'names'
        (plus 'diff', a {name: diff} dict, when `diff` is true).  `limit`
        keeps the most recent changesets.  The other parameters filter by
        serie names, authors, changeset id range and insertion date range.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)

        if names:
            sql = sql.where(reg.c.name.in_(names))

        if authors:
            sql = sql.where(cset.c.author.in_(authors))

        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)

        if torev:
            sql = sql.where(cset.c.id <= torev)

        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)

        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)

        sql = sql.where(cset.c.id == cset_series.c.csid
        ).where(cset_series.c.serie == reg.c.name)

        rset = cn.execute(sql)
        for csetid, author, revdate in rset.fetchall():
            log.append({'rev': csetid, 'author': author, 'date': revdate,
                        'names': self._changeset_series(cn, csetid)})

        if diff:
            for rev in log:
                rev['diff'] = {name: self._diff(cn, rev['rev'], name)
                               for name in rev['names']}

        log.sort(key=lambda rev: rev['rev'])
        return log

    # /API
    # Helpers

    # ts serialisation

    def _serialize(self, ts):
        """Compress the JSON form of `ts`; None stays None."""
        if ts is None:
            return None
        return zlib.compress(tojson(ts).encode('utf-8'))

    def _deserialize(self, ts, name):
        """Inverse of `_serialize`: rebuild a serie named `name`."""
        return fromjson(zlib.decompress(ts).decode('utf-8'), name)

    # serie table handling

    def _ts_table_name(self, seriename):
        # <namespace>.timeserie.<seriename>
        return '{}.timeserie.{}'.format(self.namespace, seriename)

    def _table_definition_for(self, seriename):
        return Table(
            seriename, self.schema.meta,
            Column('id', Integer, primary_key=True),
            Column('csid', Integer,
                   ForeignKey('{}.changeset.id'.format(self.namespace)),
                   index=True, nullable=False),
            # constraint: there is either .diff or .snapshot
            Column('diff', BYTEA),
            Column('snapshot', BYTEA),
            Column('parent',
                   Integer,
                   ForeignKey('{}.timeserie.{}.id'.format(self.namespace,
                                                          seriename),
                              ondelete='cascade'),
                   nullable=True,
                   unique=True,
                   index=True),
            schema='{}.timeserie'.format(self.namespace),
            extend_existing=True
        )

    def _make_ts_table(self, cn, name):
        """Create the table of serie `name` and register it."""
        tablename = self._ts_table_name(name)
        table = self._table_definition_for(name)
        table.create(cn)
        sql = self.schema.registry.insert().values(
            name=name,
            table_name=tablename)
        cn.execute(sql)
        return table

    def _get_ts_table(self, cn, name):
        """Return the table of serie `name`, or None if not registered."""
        reg = self.schema.registry
        tablename = self._ts_table_name(name)
        sql = reg.select().where(reg.c.table_name == tablename)
        tid = cn.execute(sql).scalar()
        if tid:
            return self._table_definition_for(name)

    # changeset handling

    def _newchangeset(self, cn, author, _insertion_date=None):
        """Insert a changeset row and return its id."""
        table = self.schema.changeset
        sql = table.insert().values(
            author=author,
            insertion_date=_insertion_date or datetime.now())
        return cn.execute(sql).inserted_primary_key[0]

    def _latest_csid_for(self, cn, name):
        """Return the highest changeset id in the table of serie `name`,
        or None when the serie does not exist."""
        table = self._get_ts_table(cn, name)
        if table is None:
            return None
        sql = select([func.max(table.c.csid)])
        return cn.execute(sql).scalar()

    def _changeset_series(self, cn, csid):
        """Return the names of the series touched by changeset `csid`."""
        cset_serie = self.schema.changeset_series
        sql = select([cset_serie.c.serie]
        ).where(cset_serie.c.csid == csid)

        return [seriename for seriename, in cn.execute(sql).fetchall()]

    # insertion handling

    def _get_tip_id(self, cn, table):
        """Return the highest row id of a serie table."""
        sql = select([func.max(table.c.id)])
        return cn.execute(sql).scalar()

    def _complete_insertion_value(self, value, extra_scalars):
        # extension hook: may add columns to the row dict `value`
        pass

    def _finalize_insertion(self, cn, csid, name):
        """Record that changeset `csid` touched serie `name`."""
        table = self.schema.changeset_series
        sql = table.insert().values(
            csid=csid,
            serie=name
        )
        cn.execute(sql)

    # snapshot handling

    def _purge_snapshot_at(self, cn, table, diffid):
        """Drop the full snapshot stored on row `diffid`."""
        cn.execute(
            table.update(
            ).where(table.c.id == diffid
            ).values(snapshot=None)
        )

    def _validate_type(self, oldts, newts, name):
        """Raise when `newts` and the stored `oldts` differ in dtype.

        The check is skipped when there is no stored serie, or when either
        side is all-NaN.
        """
        if (oldts is None or
            oldts.isnull().all() or
            newts.isnull().all()):
            return
        old_type = oldts.dtype
        new_type = newts.dtype
        if new_type != old_type:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                name, new_type, old_type)
            raise Exception(m)

    def _compute_diff_and_newsnapshot(self, cn, table, newts, **extra_scalars):
        """Return (diff, new full state), or (None, None) if nothing changed."""
        snapshot = self._build_snapshot_upto(cn, table)
        self._validate_type(snapshot, newts, table.name)
        diff = self._compute_diff(snapshot, newts)

        if len(diff) == 0:
            return None, None

        # full state computation & insertion
        newsnapshot = self._apply_diff(snapshot, diff)
        return diff, newsnapshot

    def _find_snapshot(self, cn, table, qfilter=(), column='snapshot',
                       from_value_date=None, to_value_date=None):
        """Return (row id, deserialized data) of the most recent row having
        a non-null `column` and matching `qfilter`, or (None, None)."""
        cset = self.schema.changeset
        sql = select([table.c.id, table.c[column]]
        ).order_by(desc(table.c.id)
        ).limit(1
        ).where(table.c[column] != None)  # SQL "IS NOT NULL"

        if qfilter:
            sql = sql.where(table.c.csid <= cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        row = cn.execute(sql).fetchone()
        if row is None:
            return None, None
        snapid, snapdata = row
        snapdata = subset(self._deserialize(snapdata, table.name),
                          from_value_date, to_value_date)
        return snapid, snapdata

    def _build_snapshot_upto(self, cn, table, qfilter=(),
                             from_value_date=None, to_value_date=None):
        """Rebuild the serie state by replaying, on top of the latest
        matching snapshot, the diffs of the later rows matching `qfilter`.

        Snapshot and diffs are all restricted to the value date range.
        Returns None when no snapshot matches.
        """
        snapid, snapshot = self._find_snapshot(
            cn, table, qfilter,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )
        if snapid is None:
            return None

        cset = self.schema.changeset
        # the changeset table only enters the query through the join
        # below.  Selecting one of its columns without the join would
        # make a cartesian product, replaying every diff once per
        # changeset.
        sql = select([table.c.id, table.c.diff]
        ).order_by(table.c.id
        ).where(table.c.id > snapid)

        if qfilter:
            sql = sql.where(table.c.csid == cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        alldiffs = pd.read_sql(sql, cn)
        if len(alldiffs) == 0:
            return snapshot

        # squash the successive diffs, then lay them over the snapshot
        ts = None
        for row in alldiffs.itertuples():
            diff = subset(self._deserialize(row.diff, table.name),
                          from_value_date, to_value_date)
            ts = self._apply_diff(ts, diff)
        ts = self._apply_diff(snapshot, ts)

        assert ts.index.dtype.name == 'datetime64[ns]' or len(ts) == 0
        return ts

    @staticmethod
    def _stamp_insertion_date(serie, revdate):
        """Replace, in place, the index of `serie` by
        (revdate, value_date) pairs, and return `serie`."""
        serie.index = pd.MultiIndex.from_tuples(
            [(revdate, valuestamp) for valuestamp in serie.index],
            names=['insertion_date', 'value_date']
        )
        return serie

    # diff handling

    def _diff(self, cn, csetid, name):
        """Return what changeset `csetid` stored for serie `name`: the
        snapshot for the table's first row, the diff otherwise."""
        table = self._get_ts_table(cn, name)
        cset = self.schema.changeset

        def filtercset(sql):
            return sql.where(table.c.csid == cset.c.id
            ).where(cset.c.id == csetid)

        sql = filtercset(select([table.c.id]))
        tsid = cn.execute(sql).scalar()

        if tsid == 1:
            sql = select([table.c.snapshot])
        else:
            sql = select([table.c.diff])
        sql = filtercset(sql)

        return self._deserialize(cn.execute(sql).scalar(), name)

    def _compute_diff(self, fromts, tots):
        """Compute the difference between fromts and tots
        (like in tots - fromts).

        The diff holds:
        - values of `tots` that differ from `fromts` on common labels
          (floats compared with absolute tolerance `_precision`; a NaN in
          `tots` counts as a change, i.e. an erasure);
        - non-NaN values of `tots` on new labels.

        NaNs of `fromts` are ignored.  `tots` is returned as-is when there
        is nothing to compare against.
        """
        if fromts is None:
            return tots
        fromts = fromts[~fromts.isnull()]
        if not len(fromts):
            return tots

        mask_overlap = tots.index.isin(fromts.index)
        fromts_overlap = fromts.loc[tots.index[mask_overlap]]
        tots_overlap = tots[mask_overlap]

        if fromts.dtype == 'float64':
            mask_equal = np.isclose(fromts_overlap, tots_overlap,
                                    rtol=0, atol=self._precision)
        else:
            mask_equal = fromts_overlap == tots_overlap

        mask_na_equal = fromts_overlap.isnull() & tots_overlap.isnull()
        mask_equal = mask_equal | mask_na_equal

        diff_overlap = tots_overlap[~mask_equal]
        diff_new = tots[~mask_overlap]
        diff_new = diff_new[~diff_new.isnull()]
        return pd.concat([diff_overlap, diff_new])

    def _apply_diff(self, base_ts, new_ts):
        """Produce a new ts using base_ts as a base and taking any
        intersecting and new values from new_ts.

        Values of `new_ts` (NaNs included) override those of `base_ts`.
        The result is sorted by index and carries the name of `base_ts`.
        When either side is None, the other one is returned unchanged.
        """
        if base_ts is None:
            return new_ts
        if new_ts is None:
            return base_ts
        # scalar fill: a length-1 list is no longer broadcast to the
        # index length by recent pandas versions
        result_ts = pd.Series(0.0, index=base_ts.index.union(new_ts.index))
        result_ts[base_ts.index] = base_ts
        result_ts[new_ts.index] = new_ts
        result_ts.sort_index(inplace=True)
        result_ts.name = base_ts.name
        return result_ts