# tsio.py (18.1 KB)
1
from datetime import datetime
2
from contextlib import contextmanager
3
import logging
4
import zlib
5
6
7
8

import pandas as pd
import numpy as np

9
from sqlalchemy import Table, Column, Integer, ForeignKey
10
from sqlalchemy.sql.expression import select, func, desc, and_
11
from sqlalchemy.dialects.postgresql import BYTEA
12

13
from tshistory.schema import SCHEMAS
14
15


16
L = logging.getLogger('tshistory.tsio')
17
18


19
def tojson(ts):
    """Dump a pandas Series to a JSON string, dates in ISO format.

    A Series carrying a MultiIndex is first turned into a frame whose
    index levels have been reset into regular columns.
    """
    if isinstance(ts.index, pd.MultiIndex):
        # multi index case
        flattened = ts.to_frame().reset_index()
        return flattened.to_json(date_format='iso')

    return ts.to_json(date_format='iso')

# (web-view residue) committed by Aurélien Campéas
26

27
28
29
30
31
def num2float(pdobj):
    """Cast `pdobj` to float64 when its dtype name starts with 'int'.

    `pdobj` is a Series or a Dataframe column; with any other dtype the
    very same object is handed back.
    """
    is_integer = str(pdobj.dtype).startswith('int')
    return pdobj.astype('float64') if is_integer else pdobj
32

# (web-view residue) committed by Aurélien Campéas
33

34
def fromjson(jsonb, tsname):
    """Rebuild the Series named `tsname` from its JSON form.

    Missing values of the parsed Series are filled with NaN.
    """
    parsed = _fromjson(jsonb, tsname)
    return parsed.fillna(value=np.nan)


def _fromjson(jsonb, tsname):
    """Parse `jsonb` into a Series, integer values being cast to float.

    '{}' yields an empty Series named `tsname`. A payload whose parsed
    index is not a DatetimeIndex is handled as a flattened multi index
    serie and read a second time as a frame.
    """
    if jsonb == '{}':
        return pd.Series(name=tsname)

    parsed = pd.read_json(jsonb, typ='series', dtype=False)
    if not isinstance(parsed.index, pd.DatetimeIndex):
        # multi index case: the labels of the first parse, minus the
        # serie name, are the columns to put back into the index
        levels = parsed.index.values.tolist()
        levels.remove(tsname)
        frame = pd.read_json(jsonb, typ='frame',
                             convert_dates=levels)
        frame.set_index(sorted(levels), inplace=True)
        return num2float(frame.iloc[:, 0])  # get a Series object

    return num2float(parsed)
54
55


56
class TimeSerie(object):
    """Versioned storage of pandas series, bound to one schema namespace."""

    # id of the changeset opened through `newchangeset`, None otherwise
    _csid = None
    # used by `insert` to decide which row keeps its snapshot
    _snapshot_interval = 10
    # absolute tolerance used when comparing float values in `_compute_diff`
    _precision = 1e-14
    # defaults, overridden per instance in `__init__`
    namespace = 'tsh'
    schema = None

    def __init__(self, namespace='tsh'):
        """Bind the instance to `namespace` and to its entry in SCHEMAS."""
        self.namespace = namespace
        self.schema = SCHEMAS[namespace]
66
67
68

    # API : changeset, insert, get, delete
    @contextmanager
69
    def newchangeset(self, cn, author, _insertion_date=None):
70
71
72
73
74
75
        """A context manager to allow insertion of several series within the
        same changeset identifier

        This allows to group changes to several series, hence
        producing a macro-change.

76
77
        _insertion_date is *only* provided for migration purposes and
        not part of the API.
78
        """
79
        assert self._csid is None
80
        self._csid = self._newchangeset(cn, author, _insertion_date)
81
        self._author = author
82
83
        yield
        del self._csid
84
        del self._author
85

86
    def insert(self, cn, newts, name, author=None,
               extra_scalars=None):
        """Create a new revision of a given time series

        cn: connection-like object on which every query is executed

        newts: pandas.Series with date index

        name: str unique identifier of the serie

        author: str free-form author name (mandatory, unless provided
        to the newchangeset context manager).

        extra_scalars: optional dict handed to the
        `_complete_insertion_value` hook and forwarded as keyword
        arguments to `_compute_diff_and_newsnapshot`

        Returns what has been stored: the serie itself (nulls removed)
        on first insertion, the diff against the stored state
        otherwise. Returns None when nothing was stored (empty or
        all-null first input, or no difference).
        """
        if extra_scalars is None:
            # one fresh dict per call: it is handed to an extension
            # hook, so a shared default could leak state across calls
            extra_scalars = {}

        assert self._csid or author, 'author is mandatory'
        if self._csid and author:
            L.info('author will not be used when in a changeset')
        assert isinstance(newts, pd.Series)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        newts.name = name
        table = self._get_ts_table(cn, name)

        if isinstance(newts.index, pd.MultiIndex):
            # we impose an order to survive roundtrips
            newts = newts.reorder_levels(sorted(newts.index.names))

        if table is None:
            # initial insertion
            if newts.isnull().all():
                return None
            newts = newts[~newts.isnull()]
            table = self._make_ts_table(cn, name)
            csid = self._csid or self._newchangeset(cn, author)
            value = {
                'csid': csid,
                'snapshot': self._serialize(newts),
            }
            # callback for extenders
            self._complete_insertion_value(value, extra_scalars)
            cn.execute(table.insert().values(value))
            self._finalize_insertion(cn, csid, name)
            L.info('first insertion of %s (size=%s) by %s',
                   name, len(newts), author or self._author)
            return newts

        diff, newsnapshot = self._compute_diff_and_newsnapshot(
            cn, table, newts, **extra_scalars
        )
        if diff is None:
            L.info('no difference in %s by %s (for ts of size %s)',
                   name, author or self._author, len(newts))
            return

        tip_id = self._get_tip_id(cn, table)
        csid = self._csid or self._newchangeset(cn, author)
        value = {
            'csid': csid,
            'diff': self._serialize(diff),
            'snapshot': self._serialize(newsnapshot),
            'parent': tip_id,
        }
        # callback for extenders
        self._complete_insertion_value(value, extra_scalars)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)

        # the new row carries the full snapshot: the one held by the
        # previous tip is dropped, unless that tip is row 1 or its id
        # is a multiple of the snapshot interval
        if tip_id > 1 and tip_id % self._snapshot_interval:
            self._purge_snapshot_at(cn, table, tip_id)
        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), name, author or self._author)
        return diff
161

162
    def get(self, cn, name, revision_date=None):
163
164
165
166
167
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

168
        """
169
        table = self._get_ts_table(cn, name)
170
171
        if table is None:
            return
172

173
174
175
        qfilter = []
        if revision_date:
            qfilter.append(lambda cset, _: cset.c.insertion_date <= revision_date)
176
        current = self._build_snapshot_upto(cn, table, qfilter)
177

178
179
        if current is not None:
            current.name = name
180
            current = current[~current.isnull()]
181
        return current
182

183
184
    def get_group(self, cn, name, revision_date=None):
        csid = self._latest_csid_for(cn, name)
185
186

        group = {}
187
188
        for seriename in self._changeset_series(cn, csid):
            serie = self.get(cn, seriename, revision_date)
189
190
191
192
            if serie is not None:
                group[seriename] = serie
        return group

193
194
195
196
197
198
199
    def get_history(self, cn, name,
                    from_insertion_date=None,
                    to_insertion_date=None):
        """Return the successive states of the serie `name` as one
        serie indexed by (insertion_date, value_date).

        from_insertion_date / to_insertion_date: optional inclusive
        bounds on the insertion date of the revisions to report

        Returns None when the serie is unknown or when no revision
        falls within the requested bounds.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        cset = self.schema.changeset
        diffsql = select([cset.c.id, cset.c.insertion_date, table.c.diff]
        ).order_by(cset.c.id
        ).where(table.c.csid == cset.c.id)

        if from_insertion_date:
            diffsql = diffsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            diffsql = diffsql.where(cset.c.insertion_date <= to_insertion_date)

        # the query is run once, its rows serve for both the first
        # state and the following diffs
        diffs = cn.execute(diffsql).fetchall()
        if not diffs:
            # no revision in the requested range
            return

        # the first reported state is rebuilt in full ...
        first_csid = diffs[0]['id']
        series = [(diffs[0]['insertion_date'],
                   self._build_snapshot_upto(
                       cn, table,
                       [lambda cset, _: cset.c.id <= first_csid]))
        ]
        # ... and each following one derives from its predecessor
        for csid_, revdate, diff in diffs[1:]:
            diff = self._deserialize(diff, table.name)
            serie = self._apply_diff(series[-1][1], diff)
            series.append((revdate, serie))

        for revdate, serie in series:
            mindex = [(revdate, valuestamp) for valuestamp in serie.index]
            serie.index = pd.MultiIndex.from_tuples(mindex, names=[
                'insertion_date', 'value_date']
            )

        history = pd.concat([serie for revdate_, serie in series])
        history.name = name
        return history
229

230
231
232
    def exists(self, cn, name):
        return self._get_ts_table(cn, name) is not None

233
    def latest_insertion_date(self, cn, name):
234
        cset = self.schema.changeset
235
        tstable = self._get_ts_table(cn, name)
236
237
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.csid == cset.c.id)
238
        return cn.execute(sql).scalar()
239

240
    def info(self, cn):
241
242
        """Gather global statistics on the current tshistory repository
        """
243
        sql = 'select count(*) from {}.registry'.format(self.namespace)
244
        stats = {'series count': cn.execute(sql).scalar()}
245
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
246
        stats['changeset count'] = cn.execute(sql).scalar()
247
        sql = 'select distinct name from {}.registry order by name'.format(self.namespace)
248
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
249
250
        return stats

251
252
253
    def log(self, cn, limit=0, diff=False, names=None, authors=None,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        limit: maximum number of changesets (most recent ones first)

        diff: when true, each entry gets a 'diff' dict (serie name ->
        what the changeset stored for that serie)

        names, authors: restrict to changesets touching these series /
        made by these authors

        fromrev, torev: inclusive bounds on the changeset id

        fromdate, todate: inclusive bounds on the insertion date

        Returns a list of dicts with the 'rev', 'author', 'date' and
        'names' keys, sorted by ascending 'rev'.
        """
        cset = self.schema.changeset
        cset_series = self.schema.changeset_series
        reg = self.schema.registry

        query = select(
            [cset.c.id, cset.c.author, cset.c.insertion_date]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            query = query.limit(limit)

        # optional filters first, in the order of the signature
        criteria = []
        if names:
            criteria.append(reg.c.name.in_(names))
        if authors:
            criteria.append(cset.c.author.in_(authors))
        if fromrev:
            criteria.append(cset.c.id >= fromrev)
        if torev:
            criteria.append(cset.c.id <= torev)
        if fromdate:
            criteria.append(cset.c.insertion_date >= fromdate)
        if todate:
            criteria.append(cset.c.insertion_date <= todate)
        # then the conditions tying changesets, their series and the registry
        criteria.append(cset.c.id == cset_series.c.csid)
        criteria.append(cset_series.c.serie == reg.c.name)

        for criterion in criteria:
            query = query.where(criterion)

        revisions = [
            {'rev': csetid, 'author': author, 'date': revdate,
             'names': self._changeset_series(cn, csetid)}
            for csetid, author, revdate in cn.execute(query).fetchall()
        ]

        if diff:
            for entry in revisions:
                entry['diff'] = {
                    name: self._diff(cn, entry['rev'], name)
                    for name in entry['names']
                }

        revisions.sort(key=lambda entry: entry['rev'])
        return revisions

304
305
    # /API
    # Helpers
306

307
308
309
    # ts serialisation

    def _serialize(self, ts):
310
311
        if ts is None:
            return None
312
        return zlib.compress(tojson(ts).encode('utf-8'))
313
314

    def _deserialize(self, ts, name):
        """Rebuild the serie `name` from its compressed json bytes `ts`."""
        jsonb = zlib.decompress(ts).decode('utf-8')
        return fromjson(jsonb, name)
316

317
    # serie table handling
318

319
320
    def _ts_table_name(self, seriename):
        # namespace.seriename
321
        return '{}.timeserie.{}'.format(self.namespace, seriename)
322

323
    def _table_definition_for(self, seriename):
        """Return the Table object describing the storage of `seriename`.

        The table lives in the '<namespace>.timeserie' schema; each row
        points to its changeset and, optionally, to a parent row of the
        same table.
        """
        changeset_fk = ForeignKey(
            '{}.changeset.id'.format(self.namespace))
        parent_fk = ForeignKey(
            '{}.timeserie.{}.id'.format(self.namespace,
                                        seriename),
            ondelete='cascade')

        columns = (
            Column('id', Integer, primary_key=True),
            Column('csid', Integer, changeset_fk,
                   index=True, nullable=False),
            # constraint: there is either .diff or .snapshot
            Column('diff', BYTEA),
            Column('snapshot', BYTEA),
            Column('parent', Integer, parent_fk,
                   nullable=True,
                   unique=True,
                   index=True),
        )
        return Table(
            seriename, self.schema.meta,
            *columns,
            schema='{}.timeserie'.format(self.namespace),
            extend_existing=True
        )

345
    def _make_ts_table(self, cn, name):
346
        tablename = self._ts_table_name(name)
347
        table = self._table_definition_for(name)
348
        table.create(cn)
349
        sql = self.schema.registry.insert().values(
350
351
            name=name,
            table_name=tablename)
352
        cn.execute(sql)
353
354
        return table

355
    def _get_ts_table(self, cn, name):
356
        reg = self.schema.registry
357
358
        tablename = self._ts_table_name(name)
        sql = reg.select().where(reg.c.table_name == tablename)
359
        tid = cn.execute(sql).scalar()
360
        if tid:
361
            return self._table_definition_for(name)
362

363
364
    # changeset handling

365
    def _newchangeset(self, cn, author, _insertion_date=None):
366
        table = self.schema.changeset
367
368
        sql = table.insert().values(
            author=author,
369
            insertion_date=_insertion_date or datetime.now())
370
        return cn.execute(sql).inserted_primary_key[0]
371

372
373
    def _latest_csid_for(self, cn, name):
        table = self._get_ts_table(cn, name)
Aurélien Campéas's avatar
Aurélien Campéas committed
374
        sql = select([func.max(table.c.csid)])
375
        return cn.execute(sql).scalar()
376

377
    def _changeset_series(self, cn, csid):
        """Return the list of serie names attached to the changeset `csid`."""
        links = self.schema.changeset_series
        query = select([links.c.serie]).where(links.c.csid == csid)
        rows = cn.execute(query).fetchall()
        return [serie for serie, in rows]
383
384
385

    # insertion handling

386
    def _get_tip_id(self, cn, table):
        """Return the highest row id of `table`."""
        highest_id = func.max(table.c.id)
        return cn.execute(select([highest_id])).scalar()
389

390
391
392
    def _complete_insertion_value(self, value, extra_scalars):
        pass

393
    def _finalize_insertion(self, cn, csid, name):
394
        table = self.schema.changeset_series
395
396
397
398
        sql = table.insert().values(
            csid=csid,
            serie=name
        )
399
        cn.execute(sql)
400

401
402
    # snapshot handling

403
404
    def _purge_snapshot_at(self, cn, table, diffid):
        cn.execute(
405
406
407
408
409
            table.update(
            ).where(table.c.id == diffid
            ).values(snapshot=None)
        )

410
    def _validate_type(self, oldts, newts, name):
411
412
413
        if (oldts is None or
            oldts.isnull().all() or
            newts.isnull().all()):
414
415
416
417
418
419
420
421
            return
        old_type = oldts.dtype
        new_type = newts.dtype
        if new_type != old_type:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                name, new_type, old_type)
            raise Exception(m)

422
423
    def _compute_diff_and_newsnapshot(self, cn, table, newts, **extra_scalars):
        snapshot = self._build_snapshot_upto(cn, table)
424
        self._validate_type(snapshot, newts, table.name)
425
426
427
428
429
430
431
432
433
        diff = self._compute_diff(snapshot, newts)

        if len(diff) == 0:
            return None, None

        # full state computation & insertion
        newsnapshot = self._apply_diff(snapshot, diff)
        return diff, newsnapshot

434
    def _find_snapshot(self, cn, table, qfilter=(), column='snapshot'):
        """Find the most recent row of `table` (highest id) whose
        `column` is not null.

        qfilter: callables taking (changeset table, serie table) and
        returning an extra condition; when any is given, the rows are
        tied to their changeset first.

        Returns (row id, deserialized content of `column`), or
        (None, None) when no row matches.
        """
        cset = self.schema.changeset
        sql = select([table.c.id, table.c[column]]
        ).order_by(desc(table.c.id)
        ).limit(1
        ).where(table.c[column] != None)  # noqa: E711 (sql "is not null")

        if qfilter:
            sql = sql.where(table.c.csid == cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        # test for the missing row explicitly rather than catching the
        # TypeError of a failed unpacking, which would also hide any
        # TypeError raised while running the query
        row = cn.execute(sql).fetchone()
        if row is None:
            return None, None
        snapid, snapdata = row
        return snapid, self._deserialize(snapdata, table.name)
451

452
453
    def _build_snapshot_upto(self, cn, table, qfilter=()):
        """Rebuild the state of the serie stored in `table`.

        Starts from the latest snapshot matching `qfilter` and applies,
        in id order, the diffs of the later rows that match it too.

        qfilter: callables taking (changeset table, serie table) and
        returning an extra condition

        Returns None when no snapshot is found.
        """
        snapid, snapshot = self._find_snapshot(cn, table, qfilter)
        if snapid is None:
            return None

        cset = self.schema.changeset
        sql = select([table.c.id,
                      table.c.diff,
                      table.c.parent,
                      cset.c.insertion_date]
        ).order_by(table.c.id
        ).where(table.c.id > snapid)

        # always tie each row to its own changeset: the changeset table
        # is part of the selected columns, so without this condition
        # every diff would come back once per existing changeset
        sql = sql.where(table.c.csid == cset.c.id)
        for filtercb in qfilter:
            sql = sql.where(filtercb(cset, table))

        alldiffs = pd.read_sql(sql, cn)

        if len(alldiffs) == 0:
            return snapshot

        # initial ts
        ts = snapshot
        for rawdiff in alldiffs['diff']:
            diff = self._deserialize(rawdiff, table.name)
            ts = self._apply_diff(ts, diff)
        assert ts.index.dtype.name == 'datetime64[ns]' or len(ts) == 0
        return ts
482
483
484

    # diff handling

485
486
    def _diff(self, cn, csetid, name):
        """Return what the changeset `csetid` stored for the serie `name`.

        The content is read from the `snapshot` column for the row of
        id 1 and from the `diff` column for any other row.
        """
        table = self._get_ts_table(cn, name)
        cset = self.schema.changeset

        def for_changeset(column):
            # restrict the selection to the row of this changeset
            return select([column]
            ).where(table.c.csid == cset.c.id
            ).where(cset.c.id == csetid)

        tsid = cn.execute(for_changeset(table.c.id)).scalar()
        source = table.c.snapshot if tsid == 1 else table.c.diff
        stored = cn.execute(for_changeset(source)).scalar()
        return self._deserialize(stored, name)
503

504
    def _compute_diff(self, fromts, tots):
505
506
        """Compute the difference between fromts and tots
        (like in tots - fromts).
507
508

        """
509
        if fromts is None:
510
            return tots
511
512
513
        fromts = fromts[~fromts.isnull()]
        if not len(fromts):
            return tots
Aurélien Campéas's avatar
Aurélien Campéas committed
514

515
516
517
518
519
        mask_overlap = tots.index.isin(fromts.index)
        fromts_overlap = fromts[tots.index[mask_overlap]]
        tots_overlap = tots[mask_overlap]

        if fromts.dtype == 'float64':
520
            mask_equal = np.isclose(fromts_overlap, tots_overlap,
521
                                    rtol=0, atol=self._precision)
522
523
524
        else:
            mask_equal = fromts_overlap == tots_overlap

525
526
527
        mask_na_equal = fromts_overlap.isnull() & tots_overlap.isnull()
        mask_equal = mask_equal | mask_na_equal

528
529
        diff_overlap = tots[mask_overlap][~mask_equal]
        diff_new = tots[~mask_overlap]
530
        diff_new = diff_new[~diff_new.isnull()]
531
        return pd.concat([diff_overlap, diff_new])
532
533
534

    def _apply_diff(self, base_ts, new_ts):
        """Produce a new ts using base_ts as a base and taking any
535
        intersecting and new values from new_ts.
536
537
538
539
540
541
542
543
544
545

        """
        if base_ts is None:
            return new_ts
        if new_ts is None:
            return base_ts
        result_ts = pd.Series([0.0], index=base_ts.index.union(new_ts.index))
        result_ts[base_ts.index] = base_ts
        result_ts[new_ts.index] = new_ts
        result_ts.sort_index(inplace=True)
546
        result_ts.name = base_ts.name
547
        return result_ts