tsio.py 17.2 KB
Newer Older
1
from datetime import datetime
2
from contextlib import contextmanager
3
import logging
4
5
import pickle
import zlib
6
7
8
9

import pandas as pd
import numpy as np

10
from sqlalchemy import Table, Column, Integer, ForeignKey
Aurélien Campéas's avatar
Aurélien Campéas committed
11
from sqlalchemy.sql.expression import select, func, desc
12
from sqlalchemy.dialects.postgresql import BYTEA
13

14
from tshistory import schema
15
16


17
# module-level logger for the tshistory I/O layer
L = logging.getLogger('tshistory.tsio')
18
19


20
21
22
23
def tojson(ts):
    """Serialize a series to a json string (None passes through)."""
    if ts is None:
        return None

    # plain (e.g. datetime-indexed) series serialize directly
    if not isinstance(ts.index, pd.MultiIndex):
        return ts.to_json(date_format='iso')

    # multi index case: flatten the index levels into columns first
    frame = ts.to_frame().reset_index()
    return frame.to_json(date_format='iso')

Aurélien Campéas's avatar
Aurélien Campéas committed
30

31
32
33
34
35
def num2float(pdobj):
    """Cast an integer-dtyped Series (or Dataframe column) to float64.

    Non-integer inputs are returned unchanged.
    """
    dtype_name = str(pdobj.dtype)
    if not dtype_name.startswith('int'):
        return pdobj
    return pdobj.astype('float64')
36

Aurélien Campéas's avatar
Aurélien Campéas committed
37

38
def fromjson(jsonb, tsname):
    """Deserialize a json string into a Series named `tsname`,
    normalizing missing values to NaN.
    """
    ts = _fromjson(jsonb, tsname)
    return ts.fillna(value=np.nan)


def _fromjson(jsonb, tsname):
    """Parse a json payload back into a Series, handling both the
    plain and the multi-index layouts produced by `tojson`.
    """
    if jsonb == '{}':
        # empty payload -> empty named series
        return pd.Series(name=tsname)

    result = pd.read_json(jsonb, typ='series', dtype=False)
    if isinstance(result.index, pd.DatetimeIndex):
        # simple datetime-indexed series
        return num2float(result)

    # multi index case: the payload was a flattened frame whose
    # non-value columns are the index levels
    columns = result.index.values.tolist()
    columns.remove(tsname)
    frame = pd.read_json(jsonb, typ='frame',
                         convert_dates=columns)
    frame.set_index(sorted(columns), inplace=True)
    return num2float(frame.iloc[:, 0])  # get a Series object
58
59


60
class TimeSerie(object):
    # id of the changeset opened by `newchangeset`; None outside a changeset
    _csid = None
    # a full snapshot is kept every `_snapshot_interval` diffs,
    # the intermediate ones are purged (see `insert`)
    _snapshot_interval = 10
    # absolute tolerance used for float comparison in `_compute_diff`
    _precision = 1e-14
    # database schema under which the registry/changeset/serie tables live
    namespace = 'tsh'

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
68
69
70

    # API : changeset, insert, get, delete
    @contextmanager
    def newchangeset(self, cn, author, _insertion_date=None):
        """A context manager to allow insertion of several series within the
        same changeset identifier

        This allows to group changes to several series, hence
        producing a macro-change.

        _insertion_date is *only* provided for migration purposes and
        not part of the API.
        """
        assert self._csid is None
        self._csid = self._newchangeset(cn, author, _insertion_date)
        self._author = author
        try:
            yield
        finally:
            # an exception in the `with` body must not leave a stale
            # changeset id / author on the instance, or the assert
            # above would fail on the next use
            del self._csid
            del self._author
87

88
    def insert(self, cn, newts, name, author=None,
               extra_scalars=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index

        name: str unique identifier of the serie

        author: str free-form author name (mandatory, unless provided
        to the newchangeset context manager).

        Returns the inserted series (first insertion), the computed
        diff (subsequent insertions) or None when there is nothing new.
        """
        # `extra_scalars` used to default to a mutable `{}`;
        # use the None-sentinel idiom instead
        if extra_scalars is None:
            extra_scalars = {}
        assert self._csid or author, 'author is mandatory'
        if self._csid and author:
            L.info('author will not be used when in a changeset')
        assert isinstance(newts, pd.Series)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        newts.name = name
        table = self._get_ts_table(cn, name)

        if isinstance(newts.index, pd.MultiIndex):
            # we impose an order to survive rountrips
            newts = newts.reorder_levels(sorted(newts.index.names))

        if table is None:
            # initial insertion
            if newts.isnull().all():
                return None
            newts = newts[~newts.isnull()]
            table = self._make_ts_table(cn, name)
            csid = self._csid or self._newchangeset(cn, author)
            value = {
                'csid': csid,
                'snapshot': self._serialize(newts),
            }
            # callback for extenders
            self._complete_insertion_value(value, extra_scalars)
            cn.execute(table.insert().values(value))
            self._finalize_insertion(cn, csid, name)
            L.info('first insertion of %s (size=%s) by %s',
                   name, len(newts), author or self._author)
            return newts

        diff, newsnapshot = self._compute_diff_and_newsnapshot(
            cn, table, newts, **extra_scalars
        )
        if diff is None:
            L.info('no difference in %s by %s (for ts of size %s)',
                   name, author or self._author, len(newts))
            return

        tip_id = self._get_tip_id(cn, table)
        csid = self._csid or self._newchangeset(cn, author)
        value = {
            'csid': csid,
            'diff': self._serialize(diff),
            'snapshot': self._serialize(newsnapshot),
            'parent': tip_id,
        }
        # callback for extenders
        self._complete_insertion_value(value, extra_scalars)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)

        # keep a full snapshot only every `_snapshot_interval` rows;
        # purge the one stored at `tip_id` otherwise
        if tip_id > 1 and tip_id % self._snapshot_interval:
            self._purge_snapshot_at(cn, table, tip_id)
        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), name, author or self._author)
        return diff
163

164
    def get(self, cn, name, revision_date=None):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return None

        filters = []
        if revision_date:
            # only consider changesets known at `revision_date`
            filters.append(
                lambda cset, _: cset.c.insertion_date <= revision_date
            )
        serie = self._build_snapshot_upto(cn, table, filters)
        if serie is None:
            return None

        serie.name = name
        # drop the nan markers
        return serie[~serie.isnull()]
184

185
186
    def get_group(self, cn, name, revision_date=None):
        """Return a dict (name -> serie) of all the series touched by
        the latest changeset of `name`, read at `revision_date`.
        """
        csid = self._latest_csid_for(cn, name)

        found = {}
        for seriename in self._changeset_series(cn, csid):
            ts = self.get(cn, seriename, revision_date)
            if ts is None:
                continue
            found[seriename] = ts
        return found

195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
    def get_history(self, cn, name,
                    from_insertion_date=None,
                    to_insertion_date=None):
        """Return all successive states of serie `name` stacked into a
        single Series under a (insertion_date, value_date) multi-index.

        Returns None when the serie does not exist or when no revision
        yields data within the insertion date interval.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        logs = self.log(cn, names=[name],
                        fromdate=from_insertion_date,
                        todate=to_insertion_date)
        series = []
        for log in logs:
            serie = self.get(cn, name, revision_date=log['date'])
            if serie is None:
                # no data known at that revision date: skip instead of
                # crashing on `serie.index` below
                continue
            revdate = pd.Timestamp(log['date'])
            mindex = [(revdate, valuestamp) for valuestamp in serie.index]
            serie.index = pd.MultiIndex.from_tuples(
                mindex, names=['insertion_date', 'value_date'])
            series.append(serie)
        if not series:
            # pd.concat raises on an empty sequence
            return None
        return pd.concat(series)

214
215
216
    def exists(self, cn, name):
        """Predicate: is there a serie registered under `name` ?"""
        table = self._get_ts_table(cn, name)
        return table is not None

217
    def latest_insertion_date(self, cn, name):
        """Return the insertion date of the most recent changeset
        touching serie `name`.
        """
        cset = schema.changeset
        table = self._get_ts_table(cn, name)
        sql = select(
            [func.max(cset.c.insertion_date)]
        ).where(table.c.csid == cset.c.id)
        return cn.execute(sql).scalar()
223

224
    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        stats = {}
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats['series count'] = cn.execute(sql).scalar()
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct name from {}.registry order by name'.format(self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

235
236
237
    def log(self, cn, limit=0, diff=False, names=None, authors=None,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        limit: keep at most that many changesets (0 means no limit)
        diff: when True, also attach the per-serie diff of each changeset
        names / authors: restrict to those serie names / those authors
        fromrev / torev: changeset id interval (inclusive)
        fromdate / todate: insertion date interval (inclusive)
        """
        log = []
        cset, cset_series, reg = schema.changeset, schema.changeset_series, schema.registry

        # newest first so that `limit` keeps the most recent changesets
        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)

        if names:
            sql = sql.where(reg.c.name.in_(names))

        if authors:
            sql = sql.where(cset.c.author.in_(authors))

        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)

        if torev:
            sql = sql.where(cset.c.id <= torev)

        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)

        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)

        # join changeset -> changeset_series -> registry
        sql = sql.where(cset.c.id == cset_series.c.csid
        ).where(cset_series.c.serie == reg.c.name)

        rset = cn.execute(sql)
        for csetid, author, revdate in rset.fetchall():
            log.append({'rev': csetid, 'author': author, 'date': revdate,
                        'names': self._changeset_series(cn, csetid)})

        if diff:
            for rev in log:
                rev['diff'] = {name: self._diff(cn, rev['rev'], name)
                               for name in rev['names']}

        # back to chronological (ascending rev) order
        log.sort(key=lambda rev: rev['rev'])
        return log

284
285
    # /API
    # Helpers
286

287
288
289
    # ts serialisation

    def _serialize(self, ts):
        """Encode a series as zlib-compressed json bytes."""
        payload = tojson(ts).encode('utf-8')
        return zlib.compress(payload)
291
292

    def _deserialize(self, ts, name):
        """Decode zlib-compressed json bytes back into a series."""
        raw = zlib.decompress(ts).decode('utf-8')
        return fromjson(raw, name)
294

295
    # serie table handling
296

297
298
    def _ts_table_name(self, seriename):
        # namespace.seriename
299
        return '{}.timeserie.{}'.format(self.namespace, seriename)
300

301
    def _table_definition_for(self, seriename):
        """Build the sqlalchemy Table holding every revision of one serie.

        Each row carries a diff and/or a periodic snapshot, plus a
        `parent` pointer forming a linear chain of revisions.
        """
        return Table(
            seriename, schema.meta,
            Column('id', Integer, primary_key=True),
            Column('csid', Integer,
                   ForeignKey('{}.changeset.id'.format(self.namespace)),
                   index=True, nullable=False),
            # constraint: there is either .diff or .snapshot
            Column('diff', BYTEA),
            Column('snapshot', BYTEA),
            # self-referencing fk: previous revision of the same serie
            Column('parent',
                   Integer,
                   ForeignKey('{}.timeserie.{}.id'.format(self.namespace,
                                                          seriename),
                              ondelete='cascade'),
                   nullable=True,
                   unique=True,
                   index=True),
            schema='{}.timeserie'.format(self.namespace),
            extend_existing=True
        )

323
    def _make_ts_table(self, cn, name):
        """Create the revisions table of a new serie and register it."""
        table = self._table_definition_for(name)
        table.create(cn)
        sql = schema.registry.insert().values(
            name=name,
            table_name=self._ts_table_name(name))
        cn.execute(sql)
        return table

333
    def _get_ts_table(self, cn, name):
        """Return the table object of serie `name`, or None if it is
        not registered.
        """
        reg = schema.registry
        sql = reg.select().where(
            reg.c.table_name == self._ts_table_name(name))
        if cn.execute(sql).scalar():
            return self._table_definition_for(name)
        return None
340

341
342
    # changeset handling

343
    def _newchangeset(self, cn, author, _insertion_date=None):
        """Insert a new changeset row and return its id."""
        if _insertion_date is None:
            _insertion_date = datetime.now()
        sql = schema.changeset.insert().values(
            author=author,
            insertion_date=_insertion_date)
        return cn.execute(sql).inserted_primary_key[0]
349

350
351
    def _latest_csid_for(self, cn, name):
        """Return the id of the latest changeset of serie `name`."""
        table = self._get_ts_table(cn, name)
        return cn.execute(select([func.max(table.c.csid)])).scalar()
354

355
    def _changeset_series(self, cn, csid):
        """List the names of the series touched by changeset `csid`."""
        cset_serie = schema.changeset_series
        sql = select([cset_serie.c.serie]
        ).where(cset_serie.c.csid == csid)
        rows = cn.execute(sql).fetchall()
        return [seriename for seriename, in rows]
361
362
363

    # insertion handling

364
    def _get_tip_id(self, cn, table):
        """Return the id of the most recent row of a serie table."""
        return cn.execute(select([func.max(table.c.id)])).scalar()
367

368
369
370
    def _complete_insertion_value(self, value, extra_scalars):
        # extension hook: subclasses may enrich the row dict `value`
        # with extra columns before insertion (no-op by default)
        pass

371
    def _finalize_insertion(self, cn, csid, name):
        """Record the changeset <-> serie association."""
        cn.execute(
            schema.changeset_series.insert().values(
                csid=csid,
                serie=name
            )
        )
378

379
380
    # snapshot handling

381
382
    def _purge_snapshot_at(self, cn, table, diffid):
        """Null out the snapshot of row `diffid` (its diff is kept)."""
        sql = table.update(
        ).where(table.c.id == diffid
        ).values(snapshot=None)
        cn.execute(sql)

388
    def _validate_type(self, oldts, newts, name):
389
390
391
        if (oldts is None or
            oldts.isnull().all() or
            newts.isnull().all()):
392
393
394
395
396
397
398
399
            return
        old_type = oldts.dtype
        new_type = newts.dtype
        if new_type != old_type:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                name, new_type, old_type)
            raise Exception(m)

400
401
    def _compute_diff_and_newsnapshot(self, cn, table, newts, **extra_scalars):
        """Diff `newts` against the current state of the serie.

        Returns (diff, new snapshot), or (None, None) when there is
        nothing new.
        """
        snapshot = self._build_snapshot_upto(cn, table)
        self._validate_type(snapshot, newts, table.name)
        diff = self._compute_diff(snapshot, newts)
        if not len(diff):
            return None, None

        # full state computation & insertion
        return diff, self._apply_diff(snapshot, diff)

412
    def _find_snapshot(self, cn, table, qfilter=(), column='snapshot'):
        """Return (row id, deserialized snapshot) of the most recent row
        whose `column` is non-null, honoring the qfilter callbacks.

        Returns (None, None) when no such row exists.
        """
        cset = schema.changeset
        sql = select([table.c.id, table.c[column]]
        ).order_by(desc(table.c.id)
        ).limit(1
        ).where(table.c[column] != None)

        if qfilter:
            # filters need the changeset join to see insertion dates
            sql = sql.where(table.c.csid == cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        try:
            # fetchone() yields None when no row matches; unpacking it
            # then raises TypeError, which we turn into (None, None)
            snapid, snapdata = cn.execute(sql).fetchone()
        except TypeError:
            return None, None
        return snapid, self._deserialize(snapdata, table.name)
429

430
431
    def _build_snapshot_upto(self, cn, table, qfilter=()):
        """Reconstruct the state of a serie: start from the latest
        stored snapshot (matching qfilter) and replay all subsequent
        diffs over it, in insertion order.

        Returns None when no snapshot matches the filters.
        """
        snapid, snapshot = self._find_snapshot(cn, table, qfilter)
        if snapid is None:
            return None

        cset = schema.changeset
        # all rows more recent than the snapshot
        sql = select([table.c.id,
                      table.c.diff,
                      table.c.parent,
                      cset.c.insertion_date]
        ).order_by(table.c.id
        ).where(table.c.id > snapid)

        if qfilter:
            sql = sql.where(table.c.csid == cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        alldiffs = pd.read_sql(sql, cn)

        if len(alldiffs) == 0:
            return snapshot

        # initial ts
        ts = snapshot
        for _, row in alldiffs.iterrows():
            diff = self._deserialize(row['diff'], table.name)
            ts = self._apply_diff(ts, diff)
        # sanity check: a non-empty serie must be datetime-indexed
        assert ts.index.dtype.name == 'datetime64[ns]' or len(ts) == 0
        return ts
460
461
462

    # diff handling

463
464
    def _diff(self, cn, csetid, name):
        """Return the deserialized change introduced by changeset
        `csetid` for serie `name`.

        The very first row of a serie table stores a snapshot instead
        of a diff, hence the special case on tsid == 1.
        """
        table = self._get_ts_table(cn, name)
        cset = schema.changeset

        def filtercset(sql):
            # restrict to the rows of this serie belonging to `csetid`
            return sql.where(table.c.csid == cset.c.id
            ).where(cset.c.id == csetid)

        sql = filtercset(select([table.c.id]))
        tsid = cn.execute(sql).scalar()

        if tsid == 1:
            sql = select([table.c.snapshot])
        else:
            sql = select([table.c.diff])
        sql = filtercset(sql)

        return self._deserialize(cn.execute(sql).scalar(), name)
481

482
    def _compute_diff(self, fromts, tots):
483
484
        """Compute the difference between fromts and tots
        (like in tots - fromts).
485
486

        """
487
        if fromts is None:
488
            return tots
489
490
491
        fromts = fromts[~fromts.isnull()]
        if not len(fromts):
            return tots
Aurélien Campéas's avatar
Aurélien Campéas committed
492

493
494
495
496
497
        mask_overlap = tots.index.isin(fromts.index)
        fromts_overlap = fromts[tots.index[mask_overlap]]
        tots_overlap = tots[mask_overlap]

        if fromts.dtype == 'float64':
498
            mask_equal = np.isclose(fromts_overlap, tots_overlap,
499
                                    rtol=0, atol=self._precision)
500
501
502
        else:
            mask_equal = fromts_overlap == tots_overlap

503
504
505
        mask_na_equal = fromts_overlap.isnull() & tots_overlap.isnull()
        mask_equal = mask_equal | mask_na_equal

506
507
        diff_overlap = tots[mask_overlap][~mask_equal]
        diff_new = tots[~mask_overlap]
508
        diff_new = diff_new[~diff_new.isnull()]
509
        return pd.concat([diff_overlap, diff_new])
510
511
512

    def _apply_diff(self, base_ts, new_ts):
        """Produce a new ts using base_ts as a base and taking any
513
        intersecting and new values from new_ts.
514
515
516
517
518
519
520
521
522
523

        """
        if base_ts is None:
            return new_ts
        if new_ts is None:
            return base_ts
        result_ts = pd.Series([0.0], index=base_ts.index.union(new_ts.index))
        result_ts[base_ts.index] = base_ts
        result_ts[new_ts.index] = new_ts
        result_ts.sort_index(inplace=True)
524
        result_ts.name = base_ts.name
525
        return result_ts