tsio.py 18.7 KB
Newer Older
1
from datetime import datetime
2
from contextlib import contextmanager
3
import logging
4
import zlib
5
6
7
8

import pandas as pd
import numpy as np

9
from sqlalchemy import Table, Column, Integer, ForeignKey
10
from sqlalchemy.sql.expression import select, func, desc, and_
11
from sqlalchemy.dialects.postgresql import BYTEA
12

13
from tshistory.schema import SCHEMAS
14
15


16
L = logging.getLogger('tshistory.tsio')
17
18


19
def tojson(ts):
    """Serialize the series `ts` to a JSON string with ISO-formatted dates.

    A series with a plain index is dumped as-is.  A MultiIndex series is
    first flattened into a frame (one column per index level, plus the
    values column) so that every level survives the round-trip.
    """
    if isinstance(ts.index, pd.MultiIndex):
        # multi index case: turn the levels into regular columns
        flat = ts.to_frame().reset_index()
        return flat.to_json(date_format='iso')
    return ts.to_json(date_format='iso')

Aurélien Campéas's avatar
Aurélien Campéas committed
26

27
28
29
30
31
def num2float(pdobj):
    """Return `pdobj` (a Series or a DataFrame column) with integer values
    converted to float64; any other dtype is returned unchanged (same object).

    Both signed and unsigned numpy integer dtypes are converted, so that an
    integer series always ends up with the same float64 dtype it will have
    once read back from storage.
    """
    dtype = pdobj.dtype
    # numpy dtype kinds: 'i' = signed, 'u' = unsigned integers.  Testing the
    # kind (rather than the dtype name prefix 'int') also avoids matching
    # pandas extension dtypes such as 'interval[int64, right]', which cannot
    # be cast to float64.
    if isinstance(dtype, np.dtype) and dtype.kind in 'iu':
        return pdobj.astype('float64')
    return pdobj
32

Aurélien Campéas's avatar
Aurélien Campéas committed
33

34
def fromjson(jsonb, tsname):
    """Deserialize the JSON string `jsonb` into a series named `tsname`.

    Missing values in the parsed series are filled with NaN.
    """
    parsed = _fromjson(jsonb, tsname)
    return parsed.fillna(value=np.nan)


def _fromjson(jsonb, tsname):
    """Parse the JSON produced by `tojson` back into a series.

    jsonb: JSON string
    tsname: name of the values column for the multi-indexed case; also
    the name of the empty series returned for an empty payload

    A payload whose keys parse as dates yields a plain datetime-indexed
    series.  Otherwise the payload is treated as a flattened multi-indexed
    series: every column but `tsname` becomes an index level (levels sorted
    by name).  Integer values are converted to float.
    """
    # pandas >= 2.1 deprecates passing a literal JSON string to read_json;
    # a file-like object is accepted by every pandas version
    from io import StringIO

    if jsonb == '{}':
        # explicit dtype: the default dtype of an empty Series moved from
        # float64 to object in recent pandas
        return pd.Series(name=tsname, dtype='float64')

    result = pd.read_json(StringIO(jsonb), typ='series', dtype=False)
    if isinstance(result.index, pd.DatetimeIndex):
        return num2float(result)

    # multi index case: the first parse gave us the column names
    columns = result.index.values.tolist()
    columns.remove(tsname)
    result = pd.read_json(StringIO(jsonb), typ='frame',
                          convert_dates=columns)
    result.set_index(sorted(columns), inplace=True)
    return num2float(result.iloc[:, 0])  # get a Series object
54
55


56
57
58
59
60
61
def subset(ts, fromdate, todate):
    """Restrict `ts` to the label range [fromdate, todate].

    Either bound may be None, meaning open-ended on that side.  When both
    are None, `ts` itself is returned (not a copy).
    """
    if fromdate is not None or todate is not None:
        return ts.loc[fromdate:todate]
    return ts


62
class TimeSerie(object):
    """Versioned storage of pandas series in a postgres schema.

    Each serie gets its own table (``<namespace>.timeserie.<name>``) and an
    entry in the ``registry`` table.  Each insertion adds one row holding a
    compressed JSON snapshot of the full state and, except for the first
    row, the diff against the previous state.  Snapshots of superseded rows
    are purged unless the row id is a multiple of `_snapshot_interval`
    (or is the first row).  Rows are attached to changesets (``changeset``
    table); ``changeset_series`` records which series each changeset
    touched.
    """
    _csid = None  # changeset id opened by `newchangeset`, if any
    _snapshot_interval = 10
    _precision = 1e-14  # absolute tolerance used when comparing floats
    namespace = 'tsh'
    schema = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = SCHEMAS[namespace]

    # API : changeset, insert, get, history, log
    @contextmanager
    def newchangeset(self, cn, author, _insertion_date=None):
        """A context manager to allow insertion of several series within the
        same changeset identifier

        This allows to group changes to several series, hence
        producing a macro-change.

        _insertion_date is *only* provided for migration purposes and
        not part of the API.
        """
        assert self._csid is None
        self._csid = self._newchangeset(cn, author, _insertion_date)
        self._author = author
        try:
            yield
        finally:
            # always close the changeset, even when the body raised, so
            # that this instance can open a new one afterwards
            del self._csid
            del self._author

    def insert(self, cn, newts, name, author=None,
               extra_scalars=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index

        name: str unique identifier of the serie

        author: str free-form author name (mandatory, unless provided
        to the newchangeset context manager).

        extra_scalars: optional dict handed to the
        `_complete_insertion_value` hook and to the diff computation
        (for extenders); defaults to an empty dict.

        Returns the stored series on first insertion, the diff on later
        insertions, or None when nothing was stored.  The caller's
        series is not modified.
        """
        assert self._csid or author, 'author is mandatory'
        if self._csid and author:
            L.info('author will not be used when in a changeset')
        assert isinstance(newts, pd.Series)
        assert not newts.index.duplicated().any()
        if extra_scalars is None:
            extra_scalars = {}

        newts = num2float(newts)

        if not len(newts):
            return

        # name a (shallow) copy: do not rename the caller's series
        newts = newts.copy(deep=False)
        newts.name = name
        table = self._get_ts_table(cn, name)

        if isinstance(newts.index, pd.MultiIndex):
            # we impose an order to survive roundtrips
            newts = newts.reorder_levels(sorted(newts.index.names))

        if table is None:
            # initial insertion
            if newts.isnull().all():
                return None
            newts = newts[~newts.isnull()]
            table = self._make_ts_table(cn, name)
            csid = self._csid or self._newchangeset(cn, author)
            value = {
                'csid': csid,
                'snapshot': self._serialize(newts),
            }
            # callback for extenders
            self._complete_insertion_value(value, extra_scalars)
            cn.execute(table.insert().values(value))
            self._finalize_insertion(cn, csid, name)
            L.info('first insertion of %s (size=%s) by %s',
                   name, len(newts), author or self._author)
            return newts

        diff, newsnapshot = self._compute_diff_and_newsnapshot(
            cn, table, newts, **extra_scalars
        )
        if diff is None:
            L.info('no difference in %s by %s (for ts of size %s)',
                   name, author or self._author, len(newts))
            return

        tip_id = self._get_tip_id(cn, table)
        csid = self._csid or self._newchangeset(cn, author)
        value = {
            'csid': csid,
            'diff': self._serialize(diff),
            'snapshot': self._serialize(newsnapshot),
            'parent': tip_id,
        }
        # callback for extenders
        self._complete_insertion_value(value, extra_scalars)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)

        # the previous tip keeps its snapshot only if it is the first row
        # or its id is a multiple of the snapshot interval
        if tip_id > 1 and tip_id % self._snapshot_interval:
            self._purge_snapshot_at(cn, table, tip_id)
        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), name, author or self._author)
        return diff

    def get(self, cn, name, revision_date=None):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        Returns None when the serie does not exist.  Erased points
        (null values) are dropped from the result.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        qfilter = []
        if revision_date:
            qfilter.append(lambda cset, _: cset.c.insertion_date <= revision_date)
        current = self._build_snapshot_upto(cn, table, qfilter)

        if current is not None:
            current.name = name
            current = current[~current.isnull()]
        return current

    def get_group(self, cn, name, revision_date=None):
        """Return a {seriename: serie} dict for every serie touched by the
        latest changeset of serie `name`, each read at `revision_date`.

        Series for which `get` returns None are left out; an unknown
        `name` yields an empty dict.
        """
        csid = self._latest_csid_for(cn, name)
        if csid is None:
            return {}

        group = {}
        for seriename in self._changeset_series(cn, csid):
            serie = self.get(cn, seriename, revision_date)
            if serie is not None:
                group[seriename] = serie
        return group

    def get_history(self, cn, name,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None):
        """Return every state of serie `name` as a single series indexed by
        (insertion_date, value_date).

        Only revisions within the insertion date bounds are returned, each
        restricted to the value date bounds (all bounds inclusive, None
        meaning open-ended).  Returns None when the serie does not exist or
        has no revision in the insertion date range.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        # compute diffs above the snapshot
        cset = self.schema.changeset
        diffsql = select([cset.c.id, cset.c.insertion_date, table.c.diff]
        ).order_by(cset.c.id
        ).where(table.c.csid == cset.c.id)

        if from_insertion_date:
            diffsql = diffsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            diffsql = diffsql.where(cset.c.insertion_date <= to_insertion_date)

        diffs = cn.execute(diffsql).fetchall()
        if not diffs:
            # it's fine to ask for an insertion date range
            # where nothing did happen, but you get nothing
            return

        csid, revdate, _ = diffs[0]
        # state of the serie as of the first changeset of the range; the
        # filter must be `<=` (not `==`) so that the diffs stored between
        # the nearest surviving snapshot and `csid` are applied too
        snapshot = self._build_snapshot_upto(cn, table, [
            lambda cset, _: cset.c.id <= csid
        ])

        series = [(revdate, subset(snapshot, from_value_date, to_value_date))]
        for _, revdate, diff in diffs[1:]:
            diff = subset(self._deserialize(diff, table.name),
                          from_value_date, to_value_date)
            serie = self._apply_diff(series[-1][1], diff)
            series.append((revdate, serie))

        for revdate, serie in series:
            # from_product (unlike from_tuples) also copes with an
            # empty serie
            serie.index = pd.MultiIndex.from_product(
                [[revdate], serie.index],
                names=['insertion_date', 'value_date']
            )

        serie = pd.concat([serie for revdate_, serie in series])
        serie.name = name
        return serie

    def exists(self, cn, name):
        """Tell whether a serie named `name` is registered."""
        return self._get_ts_table(cn, name) is not None

    def latest_insertion_date(self, cn, name):
        """Return the insertion date of the most recent revision of serie
        `name`, or None when the serie does not exist."""
        tstable = self._get_ts_table(cn, name)
        if tstable is None:
            return None
        cset = self.schema.changeset
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.csid == cset.c.id)
        return cn.execute(sql).scalar()

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct name from {}.registry order by name'.format(self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, diff=False, names=None, authors=None,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        Each entry is a dict with 'rev', 'author', 'date' and 'names' keys,
        plus a 'diff' ({name: diff}) key when `diff` is true.  `limit`
        keeps only the changesets with the highest ids; the other arguments
        filter on serie names, authors, changeset id range and insertion
        date range (bounds inclusive).
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)

        if names:
            sql = sql.where(reg.c.name.in_(names))

        if authors:
            sql = sql.where(cset.c.author.in_(authors))

        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)

        if torev:
            sql = sql.where(cset.c.id <= torev)

        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)

        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)

        sql = sql.where(cset.c.id == cset_series.c.csid
        ).where(cset_series.c.serie == reg.c.name)

        rset = cn.execute(sql)
        for csetid, author, revdate in rset.fetchall():
            log.append({'rev': csetid, 'author': author, 'date': revdate,
                        'names': self._changeset_series(cn, csetid)})

        if diff:
            for rev in log:
                rev['diff'] = {name: self._diff(cn, rev['rev'], name)
                               for name in rev['names']}

        log.sort(key=lambda rev: rev['rev'])
        return log

    # /API
    # Helpers

    # ts serialisation

    def _serialize(self, ts):
        """Return `ts` as zlib-compressed UTF-8 JSON bytes (None stays None)."""
        if ts is None:
            return None
        return zlib.compress(tojson(ts).encode('utf-8'))

    def _deserialize(self, ts, name):
        """Inverse of `_serialize`: rebuild a series from compressed JSON."""
        return fromjson(zlib.decompress(ts).decode('utf-8'), name)

    # serie table handling

    def _ts_table_name(self, seriename):
        # namespace.seriename
        return '{}.timeserie.{}'.format(self.namespace, seriename)

    def _table_definition_for(self, seriename):
        """Return the sqlalchemy Table holding the revisions of `seriename`."""
        return Table(
            seriename, self.schema.meta,
            Column('id', Integer, primary_key=True),
            Column('csid', Integer,
                   ForeignKey('{}.changeset.id'.format(self.namespace)),
                   index=True, nullable=False),
            # constraint: there is either .diff or .snapshot
            Column('diff', BYTEA),
            Column('snapshot', BYTEA),
            Column('parent',
                   Integer,
                   ForeignKey('{}.timeserie.{}.id'.format(self.namespace,
                                                          seriename),
                              ondelete='cascade'),
                   nullable=True,
                   unique=True,
                   index=True),
            schema='{}.timeserie'.format(self.namespace),
            extend_existing=True
        )

    def _make_ts_table(self, cn, name):
        """Create the revision table of serie `name` and register it."""
        tablename = self._ts_table_name(name)
        table = self._table_definition_for(name)
        table.create(cn)
        sql = self.schema.registry.insert().values(
            name=name,
            table_name=tablename)
        cn.execute(sql)
        return table

    def _get_ts_table(self, cn, name):
        """Return the revision table of serie `name`, or None when the
        serie is not registered."""
        reg = self.schema.registry
        tablename = self._ts_table_name(name)
        sql = reg.select().where(reg.c.table_name == tablename)
        tid = cn.execute(sql).scalar()
        if tid:
            return self._table_definition_for(name)

    # changeset handling

    def _newchangeset(self, cn, author, _insertion_date=None):
        """Insert a changeset row and return its id."""
        table = self.schema.changeset
        sql = table.insert().values(
            author=author,
            insertion_date=_insertion_date or datetime.now())
        return cn.execute(sql).inserted_primary_key[0]

    def _latest_csid_for(self, cn, name):
        """Return the highest changeset id stored for serie `name`, or None
        when the serie does not exist."""
        table = self._get_ts_table(cn, name)
        if table is None:
            return None
        sql = select([func.max(table.c.csid)])
        return cn.execute(sql).scalar()

    def _changeset_series(self, cn, csid):
        """Return the names of the series touched by changeset `csid`."""
        cset_serie = self.schema.changeset_series
        sql = select([cset_serie.c.serie]
        ).where(cset_serie.c.csid == csid)

        return [seriename for seriename, in cn.execute(sql).fetchall()]

    # insertion handling

    def _get_tip_id(self, cn, table):
        """Return the highest row id of `table`."""
        sql = select([func.max(table.c.id)])
        return cn.execute(sql).scalar()

    def _complete_insertion_value(self, value, extra_scalars):
        """Hook for extenders: may complete the `value` dict about to be
        inserted.  Does nothing here."""
        pass

    def _finalize_insertion(self, cn, csid, name):
        """Record that changeset `csid` touched serie `name`."""
        table = self.schema.changeset_series
        sql = table.insert().values(
            csid=csid,
            serie=name
        )
        cn.execute(sql)

    # snapshot handling

    def _purge_snapshot_at(self, cn, table, diffid):
        """Null out the snapshot stored in row `diffid` of `table`."""
        cn.execute(
            table.update(
            ).where(table.c.id == diffid
            ).values(snapshot=None)
        )

    def _validate_type(self, oldts, newts, name):
        """Raise when `newts` and the stored `oldts` have different dtypes
        (the check is skipped when either side is missing or all-null)."""
        if (oldts is None or
            oldts.isnull().all() or
            newts.isnull().all()):
            return
        old_type = oldts.dtype
        new_type = newts.dtype
        if new_type != old_type:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                name, new_type, old_type)
            raise Exception(m)

    def _compute_diff_and_newsnapshot(self, cn, table, newts, **extra_scalars):
        """Return (diff, newsnapshot) for inserting `newts` on top of the
        current state of `table`, or (None, None) when nothing changes."""
        snapshot = self._build_snapshot_upto(cn, table)
        self._validate_type(snapshot, newts, table.name)
        diff = self._compute_diff(snapshot, newts)

        if len(diff) == 0:
            return None, None

        # full state computation & insertion
        newsnapshot = self._apply_diff(snapshot, diff)
        return diff, newsnapshot

    def _find_snapshot(self, cn, table, qfilter=(), column='snapshot'):
        """Return (row id, deserialized serie) for the most recent row of
        `table` whose `column` is not null, or (None, None) if none exists.

        qfilter: callables (cset, table) -> clause; when given, only rows
        whose changeset id is <= that of a changeset matching all the
        clauses are considered.
        """
        cset = self.schema.changeset
        sql = select([table.c.id, table.c[column]]
        ).order_by(desc(table.c.id)
        ).limit(1
        ).where(table.c[column] != None)  # noqa: E711 (sqlalchemy IS NOT NULL)

        if qfilter:
            sql = sql.where(table.c.csid <= cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        row = cn.execute(sql).fetchone()
        if row is None:
            return None, None
        snapid, snapdata = row
        return snapid, self._deserialize(snapdata, table.name)

    def _build_snapshot_upto(self, cn, table, qfilter=()):
        """Rebuild the state of the serie stored in `table`.

        Starts from the snapshot found by `_find_snapshot(qfilter)` and
        applies, in row id order, every later diff whose changeset matches
        `qfilter`.  Returns None when no snapshot is found.
        """
        snapid, snapshot = self._find_snapshot(cn, table, qfilter)
        if snapid is None:
            return None

        cset = self.schema.changeset
        # always join with the changeset table: selecting one of its
        # columns without this condition produces a cartesian product
        sql = select([table.c.id,
                      table.c.diff,
                      table.c.parent,
                      cset.c.insertion_date]
        ).order_by(table.c.id
        ).where(table.c.id > snapid
        ).where(table.c.csid == cset.c.id)

        for filtercb in qfilter:
            sql = sql.where(filtercb(cset, table))

        alldiffs = pd.read_sql(sql, cn)

        if len(alldiffs) == 0:
            return snapshot

        # fold the diffs together in order (the first one is the initial
        # ts, since _apply_diff(None, x) is x), then lay them on the snapshot
        ts = None
        for rawdiff in alldiffs['diff']:
            ts = self._apply_diff(ts, self._deserialize(rawdiff, table.name))
        ts = self._apply_diff(snapshot, ts)

        assert (ts.index.dtype.name == 'datetime64[ns]' or
                isinstance(ts.index, pd.MultiIndex) or
                len(ts) == 0)
        return ts

    # diff handling

    def _diff(self, cn, csetid, name):
        """Return what changeset `csetid` stored for serie `name`: the
        snapshot when it is the first row of the table, the diff otherwise."""
        table = self._get_ts_table(cn, name)
        cset = self.schema.changeset

        def filtercset(sql):
            return sql.where(table.c.csid == cset.c.id
            ).where(cset.c.id == csetid)

        sql = filtercset(select([table.c.id]))
        tsid = cn.execute(sql).scalar()

        if tsid == 1:
            sql = select([table.c.snapshot])
        else:
            sql = select([table.c.diff])
        sql = filtercset(sql)

        return self._deserialize(cn.execute(sql).scalar(), name)

    def _compute_diff(self, fromts, tots):
        """Compute the difference between fromts and tots
        (like in tots - fromts).

        The result holds the points of `tots` that are new (non-null only)
        or whose value differs from `fromts` (floats compared with an
        absolute tolerance of `_precision`; both-null counts as equal).
        """
        if fromts is None:
            return tots
        fromts = fromts[~fromts.isnull()]
        if not len(fromts):
            return tots

        mask_overlap = tots.index.isin(fromts.index)
        fromts_overlap = fromts[tots.index[mask_overlap]]
        tots_overlap = tots[mask_overlap]

        if fromts.dtype == 'float64':
            mask_equal = np.isclose(fromts_overlap, tots_overlap,
                                    rtol=0, atol=self._precision)
        else:
            mask_equal = fromts_overlap == tots_overlap

        mask_na_equal = fromts_overlap.isnull() & tots_overlap.isnull()
        mask_equal = mask_equal | mask_na_equal

        diff_overlap = tots[mask_overlap][~mask_equal]
        diff_new = tots[~mask_overlap]
        diff_new = diff_new[~diff_new.isnull()]
        return pd.concat([diff_overlap, diff_new])

    def _apply_diff(self, base_ts, new_ts):
        """Produce a new ts using base_ts as a base and taking any
        intersecting and new values from new_ts.

        Values of new_ts (including nulls, i.e. erasures) win over those of
        base_ts.  The result is sorted, named after base_ts, and neither
        input is modified.  A None side returns the other one unchanged.
        """
        if base_ts is None:
            return new_ts
        if new_ts is None:
            return base_ts
        # spread base_ts over the union of both indexes, then overwrite with
        # new_ts; this replaces pd.Series([0.0], index=...), which relied on
        # a one-element broadcasting that recent pandas versions reject
        result_ts = base_ts.reindex(base_ts.index.union(new_ts.index))
        result_ts.loc[new_ts.index] = new_ts
        result_ts = result_ts.sort_index()
        result_ts.name = base_ts.name
        return result_ts