from datetime import datetime
from contextlib import contextmanager
import logging
import zlib

import pandas as pd
import numpy as np

from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.schema import SCHEMAS


L = logging.getLogger('tshistory.tsio')


def tojson(ts):
    if not isinstance(ts.index, pd.MultiIndex):
        return ts.to_json(date_format='iso')

    # multi index case
    return ts.to_frame().reset_index().to_json(date_format='iso')


def num2float(pdobj):
    # get a Series or a Dataframe column
    if str(pdobj.dtype).startswith('int'):
        return pdobj.astype('float64')
    return pdobj


def fromjson(jsonb, tsname):
    return _fromjson(jsonb, tsname).fillna(value=np.nan)


def _fromjson(jsonb, tsname):
    if jsonb == '{}':
        return pd.Series(name=tsname)

    result = pd.read_json(jsonb, typ='series', dtype=False)
    if isinstance(result.index, pd.DatetimeIndex):
        result = num2float(result)
        return result

    # multi index case
    columns = result.index.values.tolist()
    columns.remove(tsname)
    result = pd.read_json(jsonb, typ='frame',
                          convert_dates=columns)
    result.set_index(sorted(columns), inplace=True)
    return num2float(result.iloc[:, 0])  # get a Series object


class TimeSerie(object):
    _csid = None
    _snapshot_interval = 10
    _precision = 1e-14
    namespace = 'tsh'
    schema = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = SCHEMAS[namespace]

    # API : changeset, insert, get, delete
    @contextmanager
    def newchangeset(self, cn, author, _insertion_date=None):
        """A context manager to allow insertion of several series within the
        same changeset identifier

        This allows grouping changes to several series, hence
        producing a macro-change.

        _insertion_date is *only* provided for migration purposes and
        not part of the API.
        """
        assert self._csid is None
        self._csid = self._newchangeset(cn, author, _insertion_date)
        self._author = author
        try:
            yield
        finally:
            # clear the changeset state even if the block raises
            del self._csid
            del self._author
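
    # Usage sketch for the changeset context manager (`engine`,
    # `series_a` and `series_b` are illustrative, not part of this
    # module):
    #
    #   tsh = TimeSerie()
    #   with engine.connect() as cn:
    #       with tsh.newchangeset(cn, 'babar'):
    #           tsh.insert(cn, series_a, 'banana.price')
    #           tsh.insert(cn, series_b, 'banana.volume')
    #   # both series now share a single changeset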

    def insert(self, cn, newts, name, author=None,
               extra_scalars={}):
        """Create a new revision of a given time series

        newts: pandas.Series with date index

        name: str unique identifier of the serie

        author: str free-form author name (mandatory, unless provided
        to the newchangeset context manager).
        """
        assert self._csid or author, 'author is mandatory'
        if self._csid and author:
            L.info('author will not be used when in a changeset')
        assert isinstance(newts, pd.Series)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        newts.name = name
        table = self._get_ts_table(cn, name)

        if isinstance(newts.index, pd.MultiIndex):
            # we impose an order to survive roundtrips
            newts = newts.reorder_levels(sorted(newts.index.names))

        if table is None:
            # initial insertion
            if newts.isnull().all():
                return None
            newts = newts[~newts.isnull()]
            table = self._make_ts_table(cn, name)
            csid = self._csid or self._newchangeset(cn, author)
            value = {
                'csid': csid,
                'snapshot': self._serialize(newts),
            }
            # callback for extenders
            self._complete_insertion_value(value, extra_scalars)
            cn.execute(table.insert().values(value))
            self._finalize_insertion(cn, csid, name)
            L.info('first insertion of %s (size=%s) by %s',
                   name, len(newts), author or self._author)
            return newts

        diff, newsnapshot = self._compute_diff_and_newsnapshot(
            cn, table, newts, **extra_scalars
        )
        if diff is None:
            L.info('no difference in %s by %s (for ts of size %s)',
                   name, author or self._author, len(newts))
            return

        tip_id = self._get_tip_id(cn, table)
        csid = self._csid or self._newchangeset(cn, author)
        value = {
            'csid': csid,
            'diff': self._serialize(diff),
            'snapshot': self._serialize(newsnapshot),
            'parent': tip_id,
        }
        # callback for extenders
        self._complete_insertion_value(value, extra_scalars)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)

        # snapshots are retained only at revision ids that are multiples
        # of `_snapshot_interval`; otherwise the parent snapshot is purged
        if tip_id > 1 and tip_id % self._snapshot_interval:
            self._purge_snapshot_at(cn, table, tip_id)
        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), name, author or self._author)
        return diff
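
    # Sketch of the diff-based storage (values illustrative):
    #
    #   ts = pd.Series([1., 2., 3.],
    #                  index=pd.date_range('2017-1-1', periods=3))
    #   tsh.insert(cn, ts, 'demo', author='babar')  # full snapshot
    #   ts['2017-01-03'] = 42.
    #   tsh.insert(cn, ts, 'demo', author='babar')  # one-point diff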

    def get(self, cn, name, revision_date=None):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        qfilter = []
        if revision_date:
            qfilter.append(lambda cset, _: cset.c.insertion_date <= revision_date)
        current = self._build_snapshot_upto(cn, table, qfilter)

        if current is not None:
            current.name = name
            current = current[~current.isnull()]
        return current
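
    # Read-back sketch (illustrative):
    #
    #   current = tsh.get(cn, 'demo')
    #   asof = tsh.get(cn, 'demo', revision_date=datetime(2017, 1, 2))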

    def get_group(self, cn, name, revision_date=None):
        csid = self._latest_csid_for(cn, name)

        group = {}
        for seriename in self._changeset_series(cn, csid):
            serie = self.get(cn, seriename, revision_date)
            if serie is not None:
                group[seriename] = serie
        return group

    def get_history(self, cn, name,
                    from_insertion_date=None,
                    to_insertion_date=None):
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        logs = self.log(cn, names=[name],
                        fromdate=from_insertion_date,
                        todate=to_insertion_date)
        series = []
        for log in logs:
            serie = self.get(cn, name, revision_date=log['date'])
            revdate = pd.Timestamp(log['date'])
            mindex = [(revdate, valuestamp) for valuestamp in serie.index]
            serie.index = pd.MultiIndex.from_tuples(mindex, names=['insertion_date', 'value_date'])
            series.append(serie)
        return pd.concat(series)
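
    # The result carries an (insertion_date, value_date) multi-index;
    # shape sketch (values illustrative):
    #
    #   insertion_date       value_date
    #   2017-01-01 10:00:00  2017-01-01    1.0
    #                        2017-01-02    2.0
    #   2017-01-02 10:00:00  2017-01-02    2.5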

    def exists(self, cn, name):
        return self._get_ts_table(cn, name) is not None

    def latest_insertion_date(self, cn, name):
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, name)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.csid == cset.c.id)
        return cn.execute(sql).scalar()

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct name from {}.registry order by name'.format(self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, diff=False, names=None, authors=None,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)

        if names:
            sql = sql.where(reg.c.name.in_(names))

        if authors:
            sql = sql.where(cset.c.author.in_(authors))

        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)

        if torev:
            sql = sql.where(cset.c.id <= torev)

        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)

        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)

        sql = sql.where(cset.c.id == cset_series.c.csid
        ).where(cset_series.c.serie == reg.c.name)

        rset = cn.execute(sql)
        for csetid, author, revdate in rset.fetchall():
            log.append({'rev': csetid, 'author': author, 'date': revdate,
                        'names': self._changeset_series(cn, csetid)})

        if diff:
            for rev in log:
                rev['diff'] = {name: self._diff(cn, rev['rev'], name)
                               for name in rev['names']}

        log.sort(key=lambda rev: rev['rev'])
        return log
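
    # Each log entry is a plain dict (illustrative):
    #
    #   {'rev': 12, 'author': 'babar',
    #    'date': datetime(2017, 1, 2, 10, 0), 'names': ['demo']}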

    # /API
    # Helpers

    # ts serialisation

    def _serialize(self, ts):
        if ts is None:
            return None
        return zlib.compress(tojson(ts).encode('utf-8'))

    def _deserialize(self, ts, name):
        return fromjson(zlib.decompress(ts).decode('utf-8'), name)

    # serie table handling

    def _ts_table_name(self, seriename):
        # namespace.seriename
        return '{}.timeserie.{}'.format(self.namespace, seriename)

    def _table_definition_for(self, seriename):
        return Table(
            seriename, self.schema.meta,
            Column('id', Integer, primary_key=True),
            Column('csid', Integer,
                   ForeignKey('{}.changeset.id'.format(self.namespace)),
                   index=True, nullable=False),
            # constraint: there is either .diff or .snapshot
            Column('diff', BYTEA),
            Column('snapshot', BYTEA),
            Column('parent',
                   Integer,
                   ForeignKey('{}.timeserie.{}.id'.format(self.namespace,
                                                          seriename),
                              ondelete='cascade'),
                   nullable=True,
                   unique=True,
                   index=True),
            schema='{}.timeserie'.format(self.namespace),
            extend_existing=True
        )
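
    # Storage sketch: each row is one revision of the serie; `parent`
    # chains a revision to the previous tip, and a full `snapshot` is
    # retained only every `_snapshot_interval` revisions so that reads
    # replay at most a short run of diffs.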

    def _make_ts_table(self, cn, name):
        tablename = self._ts_table_name(name)
        table = self._table_definition_for(name)
        table.create(cn)
        sql = self.schema.registry.insert().values(
            name=name,
            table_name=tablename)
        cn.execute(sql)
        return table

    def _get_ts_table(self, cn, name):
        reg = self.schema.registry
        tablename = self._ts_table_name(name)
        sql = reg.select().where(reg.c.table_name == tablename)
        tid = cn.execute(sql).scalar()
        if tid:
            return self._table_definition_for(name)

    # changeset handling

    def _newchangeset(self, cn, author, _insertion_date=None):
        table = self.schema.changeset
        sql = table.insert().values(
            author=author,
            insertion_date=_insertion_date or datetime.now())
        return cn.execute(sql).inserted_primary_key[0]

    def _latest_csid_for(self, cn, name):
        table = self._get_ts_table(cn, name)
        sql = select([func.max(table.c.csid)])
        return cn.execute(sql).scalar()

    def _changeset_series(self, cn, csid):
        cset_serie = self.schema.changeset_series
        sql = select([cset_serie.c.serie]
        ).where(cset_serie.c.csid == csid)

        return [seriename for seriename, in cn.execute(sql).fetchall()]

    # insertion handling

    def _get_tip_id(self, cn, table):
        sql = select([func.max(table.c.id)])
        return cn.execute(sql).scalar()

    def _complete_insertion_value(self, value, extra_scalars):
        pass

    def _finalize_insertion(self, cn, csid, name):
        table = self.schema.changeset_series
        sql = table.insert().values(
            csid=csid,
            serie=name
        )
        cn.execute(sql)

    # snapshot handling

    def _purge_snapshot_at(self, cn, table, diffid):
        cn.execute(
            table.update(
            ).where(table.c.id == diffid
            ).values(snapshot=None)
        )

    def _validate_type(self, oldts, newts, name):
        if (oldts is None or
            oldts.isnull().all() or
            newts.isnull().all()):
            return
        old_type = oldts.dtype
        new_type = newts.dtype
        if new_type != old_type:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                name, new_type, old_type)
            raise Exception(m)

    def _compute_diff_and_newsnapshot(self, cn, table, newts, **extra_scalars):
        snapshot = self._build_snapshot_upto(cn, table)
        self._validate_type(snapshot, newts, table.name)
        diff = self._compute_diff(snapshot, newts)

        if len(diff) == 0:
            return None, None

        # full state computation & insertion
        newsnapshot = self._apply_diff(snapshot, diff)
        return diff, newsnapshot

    def _find_snapshot(self, cn, table, qfilter=(), column='snapshot'):
        cset = self.schema.changeset
        sql = select([table.c.id, table.c[column]]
        ).order_by(desc(table.c.id)
        ).limit(1
        ).where(table.c[column] != None)

        if qfilter:
            sql = sql.where(table.c.csid == cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        try:
            snapid, snapdata = cn.execute(sql).fetchone()
        except TypeError:
            return None, None
        return snapid, self._deserialize(snapdata, table.name)

    def _build_snapshot_upto(self, cn, table, qfilter=()):
        snapid, snapshot = self._find_snapshot(cn, table, qfilter)
        if snapid is None:
            return None

        cset = self.schema.changeset
        sql = select([table.c.id,
                      table.c.diff,
                      table.c.parent,
                      cset.c.insertion_date]
        ).order_by(table.c.id
        ).where(table.c.id > snapid)

        if qfilter:
            sql = sql.where(table.c.csid == cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        alldiffs = pd.read_sql(sql, cn)

        if len(alldiffs) == 0:
            return snapshot

        # initial ts
        ts = snapshot
        for _, row in alldiffs.iterrows():
            diff = self._deserialize(row['diff'], table.name)
            ts = self._apply_diff(ts, diff)
        assert ts.index.dtype.name == 'datetime64[ns]' or len(ts) == 0
        return ts
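
    # Read-path sketch: the latest retained snapshot is the base and the
    # newer diffs are replayed on top of it, in id order (illustrative):
    #
    #   snapshot(id=40) + diff(41) + diff(42) -> current state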

    # diff handling

    def _diff(self, cn, csetid, name):
        table = self._get_ts_table(cn, name)
        cset = self.schema.changeset

        def filtercset(sql):
            return sql.where(table.c.csid == cset.c.id
            ).where(cset.c.id == csetid)

        sql = filtercset(select([table.c.id]))
        tsid = cn.execute(sql).scalar()

        if tsid == 1:
            sql = select([table.c.snapshot])
        else:
            sql = select([table.c.diff])
        sql = filtercset(sql)

        return self._deserialize(cn.execute(sql).scalar(), name)

    def _compute_diff(self, fromts, tots):
        """Compute the difference between fromts and tots
        (like in tots - fromts).

        """
        if fromts is None:
            return tots
        fromts = fromts[~fromts.isnull()]
        if not len(fromts):
            return tots

        mask_overlap = tots.index.isin(fromts.index)
        fromts_overlap = fromts[tots.index[mask_overlap]]
        tots_overlap = tots[mask_overlap]

        if fromts.dtype == 'float64':
            mask_equal = np.isclose(fromts_overlap, tots_overlap,
                                    rtol=0, atol=self._precision)
        else:
            mask_equal = fromts_overlap == tots_overlap

        mask_na_equal = fromts_overlap.isnull() & tots_overlap.isnull()
        mask_equal = mask_equal | mask_na_equal

        diff_overlap = tots[mask_overlap][~mask_equal]
        diff_new = tots[~mask_overlap]
        diff_new = diff_new[~diff_new.isnull()]
        return pd.concat([diff_overlap, diff_new])
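
    # Worked example (illustrative): with fromts = {d1: 1., d2: 2.} and
    # tots = {d2: 2., d3: 3.}, d2 is equal within `_precision` and drops
    # out, so the diff reduces to {d3: 3.}.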

    def _apply_diff(self, base_ts, new_ts):
        """Produce a new ts using base_ts as a base and taking any
        intersecting and new values from new_ts.

        """
        if base_ts is None:
            return new_ts
        if new_ts is None:
            return base_ts
        # a scalar 0.0 broadcasts over the union index (a one-element
        # list would raise a length mismatch)
        result_ts = pd.Series(0.0, index=base_ts.index.union(new_ts.index))
        result_ts[base_ts.index] = base_ts
        result_ts[new_ts.index] = new_ts
        result_ts.sort_index(inplace=True)
        result_ts.name = base_ts.name
        return result_ts
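
    # Worked example (illustrative): applying the diff {d3: 3.} on top of
    # the base {d1: 1., d2: 2.} yields {d1: 1., d2: 2., d3: 3.}.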