tsio.py 20.1 KB
Newer Older
1
from datetime import datetime
2
from contextlib import contextmanager
3
import logging
4
import zlib
5
import math
6
7
8
9

import pandas as pd
import numpy as np

10
from sqlalchemy import Table, Column, Integer, ForeignKey
Aurélien Campéas's avatar
Aurélien Campéas committed
11
from sqlalchemy.sql.expression import select, func, desc
12
from sqlalchemy.dialects.postgresql import BYTEA
13

14
from tshistory.schema import SCHEMAS
15
16


17
# module-level logger shared by all the timeserie I/O operations below
L = logging.getLogger('tshistory.tsio')
18
19


20
def tojson(ts):
    """Serialize a series to its json wire format.

    A multi-indexed serie is flattened into a frame first so that every
    index level survives the round-trip.
    """
    if isinstance(ts.index, pd.MultiIndex):
        # multi index case
        return ts.to_frame().reset_index().to_json(date_format='iso')

    # keep exactly as many decimal digits as the storage precision allows
    digits = -int(math.log10(TimeSerie._precision))
    return ts.to_json(date_format='iso', double_precision=digits)

Aurélien Campéas's avatar
Aurélien Campéas committed
28

29
30
31
32
33
def num2float(pdobj):
    """Coerce an integer-dtyped Series (or DataFrame column) to float64.

    Anything already non-integer is handed back untouched.
    """
    dtypename = str(pdobj.dtype)
    if not dtypename.startswith('int'):
        return pdobj
    return pdobj.astype('float64')
34

Aurélien Campéas's avatar
Aurélien Campéas committed
35

36
def fromjson(jsonb, tsname):
    """Deserialize a json payload into a Series named *tsname*,
    normalizing whatever missing-value marker came out of parsing
    into a proper NaN.
    """
    parsed = _fromjson(jsonb, tsname)
    return parsed.fillna(value=np.nan)


def _fromjson(jsonb, tsname):
41
42
43
    if jsonb == '{}':
        return pd.Series(name=tsname)

44
45
    result = pd.read_json(jsonb, typ='series', dtype=False)
    if isinstance(result.index, pd.DatetimeIndex):
46
        result = num2float(result)
47
48
49
50
51
52
53
54
        return result

    # multi index case
    columns = result.index.values.tolist()
    columns.remove(tsname)
    result = pd.read_json(jsonb, typ='frame',
                          convert_dates=columns)
    result.set_index(sorted(columns), inplace=True)
Aurélien Campéas's avatar
Aurélien Campéas committed
55
    return num2float(result.iloc[:, 0])  # get a Series object
56
57


58
59
60
61
62
63
def subset(ts, fromdate, todate):
    """Restrict *ts* to the [fromdate, todate] label range.

    With both bounds None the serie is returned as-is (same object).
    """
    unbounded = fromdate is None and todate is None
    if unbounded:
        return ts
    return ts.loc[fromdate:todate]


64
class TimeSerie(object):
    """Postgres-backed store of versioned time series.

    Each serie gets its own table holding one row per changeset; a row
    carries either a diff against the previous revision or a full
    snapshot. A full snapshot is kept only every `_snapshot_interval`
    revisions (intermediate ones are purged) so that reads never have
    to replay the whole history.
    """

    # changeset id shared by all `insert` calls inside a `newchangeset`
    # block; None outside of such a block
    _csid = None
    # keep a full snapshot only every N revisions (see `insert`)
    _snapshot_interval = 10
    # absolute tolerance under which two float values count as equal
    _precision = 1e-14
    # postgres namespace (schema prefix); overridden per-instance
    namespace = 'tsh'
    # sqlalchemy schema object, resolved from the namespace
    schema = None

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = SCHEMAS[namespace]

    # API : changeset, insert, get, delete

    @contextmanager
    def newchangeset(self, cn, author, _insertion_date=None):
        """A context manager to allow insertion of several series within the
        same changeset identifier

        This allows to group changes to several series, hence
        producing a macro-change.

        _insertion_date is *only* provided for migration purposes and
        not part of the API.
        """
        assert self._csid is None
        self._csid = self._newchangeset(cn, author, _insertion_date)
        self._author = author
        try:
            yield
        finally:
            # fix: reset even when the managed block raises, otherwise a
            # stale _csid lingers and the assert above fires on the next
            # changeset of this instance
            del self._csid
            del self._author

    def insert(self, cn, newts, name, author=None,
               extra_scalars=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index

        name: str unique identifier of the serie

        author: str free-form author name (mandatory, unless provided
        to the newchangeset context manager).

        extra_scalars: optional dict of additional scalar values, handed
        to the `_complete_insertion_value` extension hook.
        """
        # fix: the default used to be a shared mutable `{}`; normalize a
        # None default locally instead
        if extra_scalars is None:
            extra_scalars = {}
        assert self._csid or author, 'author is mandatory'
        if self._csid and author:
            L.info('author r{} will not be used when in a changeset'.format(author))
            author = None
        assert isinstance(newts, pd.Series)
        assert not newts.index.duplicated().any()

        newts = num2float(newts)

        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) or
                isinstance(newts.index, pd.MultiIndex))

        newts.name = name
        table = self._get_ts_table(cn, name)

        if isinstance(newts.index, pd.MultiIndex):
            # we impose an order to survive rountrips
            newts = newts.reorder_levels(sorted(newts.index.names))

        if table is None:
            # initial insertion: store a plain snapshot, no diff
            if newts.isnull().all():
                return None
            newts = newts[~newts.isnull()]
            table = self._make_ts_table(cn, name)
            csid = self._csid or self._newchangeset(cn, author)
            value = {
                'csid': csid,
                'snapshot': self._serialize(newts),
            }
            # callback for extenders
            self._complete_insertion_value(value, extra_scalars)
            cn.execute(table.insert().values(value))
            self._finalize_insertion(cn, csid, name)
            L.info('first insertion of %s (size=%s) by %s',
                   name, len(newts), author or self._author)
            return newts

        diff, newsnapshot = self._compute_diff_and_newsnapshot(
            cn, table, newts, **extra_scalars
        )
        if diff is None:
            L.info('no difference in %s by %s (for ts of size %s)',
                   name, author or self._author, len(newts))
            return

        tip_id = self._get_tip_id(cn, table)
        csid = self._csid or self._newchangeset(cn, author)
        value = {
            'csid': csid,
            'diff': self._serialize(diff),
            'snapshot': self._serialize(newsnapshot),
            'parent': tip_id,
        }
        # callback for extenders
        self._complete_insertion_value(value, extra_scalars)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)

        # drop the parent's snapshot unless its id falls on a
        # snapshot-interval boundary: full snapshots survive only every
        # `_snapshot_interval` revisions
        if tip_id > 1 and tip_id % self._snapshot_interval:
            self._purge_snapshot_at(cn, table, tip_id)
        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), name, author or self._author)
        return diff

    def get(self, cn, name, revision_date=None):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        qfilter = []
        if revision_date:
            qfilter.append(lambda cset, _: cset.c.insertion_date <= revision_date)
        current = self._build_snapshot_upto(cn, table, qfilter)

        if current is not None:
            current.name = name
            current = current[~current.isnull()]
        return current

    def get_group(self, cn, name, revision_date=None):
        """Return a dict {seriename: serie} covering every serie touched
        by the latest changeset in which *name* was involved.
        """
        csid = self._latest_csid_for(cn, name)

        group = {}
        for seriename in self._changeset_series(cn, csid):
            serie = self.get(cn, seriename, revision_date)
            if serie is not None:
                group[seriename] = serie
        return group

    def get_history(self, cn, name,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    diffmode=False):
        """Return the revision history of *name* as one serie carrying an
        (insertion_date, value_date) multi-index.

        With diffmode=True each insertion date maps to the raw stored
        diff at that revision (the initial snapshot for the first one)
        instead of the reconstructed full serie.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        # compute diffs above the snapshot
        cset = self.schema.changeset
        diffsql = select([cset.c.id, cset.c.insertion_date, table.c.diff]
        ).order_by(cset.c.id
        ).where(table.c.csid == cset.c.id)

        if from_insertion_date:
            diffsql = diffsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            diffsql = diffsql.where(cset.c.insertion_date <= to_insertion_date)

        diffs = cn.execute(diffsql).fetchall()
        if not diffs:
            # it's fine to ask for an insertion date range
            # where noting did happen, but you get nothing
            return

        if diffmode:
            series = []
            for csid, revdate, diff in diffs:
                if diff is None:  # we must fetch the initial snapshot
                    sql = select([table.c.snapshot]).where(table.c.csid == csid)
                    diff = cn.execute(sql).scalar()
                serie = subset(self._deserialize(diff, name), from_value_date, to_value_date)
                mindex = [(revdate, valuestamp) for valuestamp in serie.index]
                serie.index = pd.MultiIndex.from_tuples(mindex, names=[
                    'insertion_date', 'value_date']
                )
                series.append(serie)
            series = pd.concat(series)
            series.name = name
            return series

        # rebuild the serie state at the first matching revision, then
        # replay each following diff on top of the previous state
        csid, revdate, diff_ = diffs[0]
        snapshot = self._build_snapshot_upto(cn, table, [
            lambda cset, _: cset.c.id == csid
        ], from_value_date, to_value_date)

        series = [(revdate, subset(snapshot, from_value_date, to_value_date))]
        for csid_, revdate, diff in diffs[1:]:
            diff = subset(self._deserialize(diff, table.name),
                          from_value_date, to_value_date)

            serie = self._apply_diff(series[-1][1], diff)
            series.append((revdate, serie))

        for revdate, serie in series:
            mindex = [(revdate, valuestamp) for valuestamp in serie.index]
            serie.index = pd.MultiIndex.from_tuples(mindex, names=[
                'insertion_date', 'value_date']
            )

        serie = pd.concat([serie for revdate_, serie in series])
        serie.name = name
        return serie

    def exists(self, cn, name):
        """Return True when a serie named *name* is registered."""
        return self._get_ts_table(cn, name) is not None

    def latest_insertion_date(self, cn, name):
        """Return the insertion date of the most recent revision of *name*.

        NOTE(review): raises AttributeError when the serie does not exist
        (`_get_ts_table` returns None) — confirm callers expect this.
        """
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, name)
        sql = select([func.max(cset.c.insertion_date)]
        ).where(tstable.c.csid == cset.c.id)
        return cn.execute(sql).scalar()

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from {}.registry'.format(self.namespace)
        stats = {'series count': cn.execute(sql).scalar()}
        sql = 'select max(id) from {}.changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = 'select distinct name from {}.registry order by name'.format(self.namespace)
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, diff=False, names=None, authors=None,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)

        if names:
            sql = sql.where(reg.c.name.in_(names))

        if authors:
            sql = sql.where(cset.c.author.in_(authors))

        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)

        if torev:
            sql = sql.where(cset.c.id <= torev)

        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)

        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)

        sql = sql.where(cset.c.id == cset_series.c.csid
        ).where(cset_series.c.serie == reg.c.name)

        rset = cn.execute(sql)
        for csetid, author, revdate in rset.fetchall():
            log.append({'rev': csetid, 'author': author, 'date': revdate,
                        'names': self._changeset_series(cn, csetid)})

        if diff:
            for rev in log:
                rev['diff'] = {name: self._diff(cn, rev['rev'], name)
                               for name in rev['names']}

        log.sort(key=lambda rev: rev['rev'])
        return log

    # /API
    # Helpers

    # ts serialisation

    def _serialize(self, ts):
        """zlib-compressed json payload of *ts* (None stays None)."""
        if ts is None:
            return None
        return zlib.compress(tojson(ts).encode('utf-8'))

    def _deserialize(self, ts, name):
        """Inverse of `_serialize`."""
        return fromjson(zlib.decompress(ts).decode('utf-8'), name)

    # serie table handling

    def _ts_table_name(self, seriename):
        # namespace.seriename
        return '{}.timeserie.{}'.format(self.namespace, seriename)

    def _table_definition_for(self, seriename):
        """Return the sqlalchemy Table describing a serie's storage."""
        return Table(
            seriename, self.schema.meta,
            Column('id', Integer, primary_key=True),
            Column('csid', Integer,
                   ForeignKey('{}.changeset.id'.format(self.namespace)),
                   index=True, nullable=False),
            # constraint: there is either .diff or .snapshot
            Column('diff', BYTEA),
            Column('snapshot', BYTEA),
            Column('parent',
                   Integer,
                   ForeignKey('{}.timeserie.{}.id'.format(self.namespace,
                                                          seriename),
                              ondelete='cascade'),
                   nullable=True,
                   unique=True,
                   index=True),
            schema='{}.timeserie'.format(self.namespace),
            extend_existing=True
        )

    def _make_ts_table(self, cn, name):
        """Create the storage table for *name* and register it."""
        tablename = self._ts_table_name(name)
        table = self._table_definition_for(name)
        table.create(cn)
        sql = self.schema.registry.insert().values(
            name=name,
            table_name=tablename)
        cn.execute(sql)
        return table

    def _get_ts_table(self, cn, name):
        """Return the Table for *name*, or None when unregistered."""
        reg = self.schema.registry
        tablename = self._ts_table_name(name)
        sql = reg.select().where(reg.c.table_name == tablename)
        tid = cn.execute(sql).scalar()
        if tid:
            return self._table_definition_for(name)

    # changeset handling

    def _newchangeset(self, cn, author, _insertion_date=None):
        """Insert a changeset row and return its primary key."""
        table = self.schema.changeset
        sql = table.insert().values(
            author=author,
            insertion_date=_insertion_date or datetime.now())
        return cn.execute(sql).inserted_primary_key[0]

    def _latest_csid_for(self, cn, name):
        # highest changeset id having touched the serie
        table = self._get_ts_table(cn, name)
        sql = select([func.max(table.c.csid)])
        return cn.execute(sql).scalar()

    def _changeset_series(self, cn, csid):
        """Return the names of all series touched by changeset *csid*."""
        cset_serie = self.schema.changeset_series
        sql = select([cset_serie.c.serie]
        ).where(cset_serie.c.csid == csid)

        return [seriename for seriename, in cn.execute(sql).fetchall()]

    # insertion handling

    def _get_tip_id(self, cn, table):
        # id of the latest revision row of the serie
        sql = select([func.max(table.c.id)])
        return cn.execute(sql).scalar()

    def _complete_insertion_value(self, value, extra_scalars):
        # extension hook: subclasses may enrich the row about to be
        # inserted with extra scalar columns
        pass

    def _finalize_insertion(self, cn, csid, name):
        # record the (changeset, serie) association
        table = self.schema.changeset_series
        sql = table.insert().values(
            csid=csid,
            serie=name
        )
        cn.execute(sql)

    # snapshot handling

    def _purge_snapshot_at(self, cn, table, diffid):
        # clear the snapshot column of a row (its diff remains)
        cn.execute(
            table.update(
            ).where(table.c.id == diffid
            ).values(snapshot=None)
        )

    def _validate_type(self, oldts, newts, name):
        """Refuse an insertion that would change the serie's dtype."""
        if (oldts is None or
            oldts.isnull().all() or
            newts.isnull().all()):
            return
        old_type = oldts.dtype
        new_type = newts.dtype
        if new_type != old_type:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                name, new_type, old_type)
            raise Exception(m)

    def _compute_diff_and_newsnapshot(self, cn, table, newts, **extra_scalars):
        """Diff *newts* against the current state.

        Returns (diff, new full state), or (None, None) when *newts*
        brings nothing new.
        """
        snapshot = self._build_snapshot_upto(cn, table)
        self._validate_type(snapshot, newts, table.name)
        diff = self._compute_diff(snapshot, newts)

        if len(diff) == 0:
            return None, None

        # full state computation & insertion
        newsnapshot = self._apply_diff(snapshot, diff)
        return diff, newsnapshot

    def _find_snapshot(self, cn, table, qfilter=(), column='snapshot',
                       from_value_date=None, to_value_date=None):
        """Return (id, deserialized data) of the latest row whose
        *column* is non-null, or (None, None) when there is none.
        """
        cset = self.schema.changeset
        sql = select([table.c.id, table.c[column]]
        ).order_by(desc(table.c.id)
        ).limit(1
        ).where(table.c[column] != None)

        if qfilter:
            sql = sql.where(table.c.csid <= cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        try:
            snapid, snapdata = cn.execute(sql).fetchone()
            snapdata = subset(self._deserialize(snapdata, table.name),
                              from_value_date, to_value_date)
        except TypeError:
            # fetchone() returned None: no row matched
            return None, None
        return snapid, snapdata

    def _build_snapshot_upto(self, cn, table, qfilter=(),
                             from_value_date=None, to_value_date=None):
        """Reconstruct the serie state by replaying the diffs stored
        after the latest snapshot matching *qfilter*.
        """
        # NOTE(review): from/to_value_date are not forwarded to
        # _find_snapshot, so the snapshot part is fetched unbounded;
        # callers appear to re-subset afterwards — confirm intended.
        snapid, snapshot = self._find_snapshot(cn, table, qfilter)
        if snapid is None:
            return None

        cset = self.schema.changeset
        sql = select([table.c.id,
                      table.c.diff,
                      table.c.parent,
                      cset.c.insertion_date]
        ).order_by(table.c.id
        ).where(table.c.id > snapid)

        if qfilter:
            sql = sql.where(table.c.csid == cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        alldiffs = pd.read_sql(sql, cn)

        if len(alldiffs) == 0:
            return snapshot

        # initial ts
        ts = self._deserialize(alldiffs.loc[0, 'diff'], table.name)
        for row in alldiffs.loc[1:].itertuples():
            diff = subset(self._deserialize(row.diff, table.name),
                          from_value_date, to_value_date)
            ts = self._apply_diff(ts, diff)
        ts = self._apply_diff(snapshot, ts)
        assert ts.index.dtype.name == 'datetime64[ns]' or len(ts) == 0
        return ts

    # diff handling

    def _diff(self, cn, csetid, name):
        """Return the raw change stored for *name* at changeset *csetid*
        (the snapshot for the very first revision, the diff otherwise).
        """
        table = self._get_ts_table(cn, name)
        cset = self.schema.changeset

        def filtercset(sql):
            return sql.where(table.c.csid == cset.c.id
            ).where(cset.c.id == csetid)

        sql = filtercset(select([table.c.id]))
        tsid = cn.execute(sql).scalar()

        if tsid == 1:
            sql = select([table.c.snapshot])
        else:
            sql = select([table.c.diff])
        sql = filtercset(sql)

        return self._deserialize(cn.execute(sql).scalar(), name)

    def _compute_diff(self, fromts, tots):
        """Compute the difference between fromts and tots
        (like in tots - fromts).

        """
        if fromts is None:
            return tots
        fromts = fromts[~fromts.isnull()]
        if not len(fromts):
            return tots

        mask_overlap = tots.index.isin(fromts.index)
        fromts_overlap = fromts[tots.index[mask_overlap]]
        tots_overlap = tots[mask_overlap]

        # float equality uses the storage precision as an absolute
        # tolerance, to ignore json round-trip noise
        if fromts.dtype == 'float64':
            mask_equal = np.isclose(fromts_overlap, tots_overlap,
                                    rtol=0, atol=self._precision)
        else:
            mask_equal = fromts_overlap == tots_overlap

        # two NaNs at the same stamp also count as equal
        mask_na_equal = fromts_overlap.isnull() & tots_overlap.isnull()
        mask_equal = mask_equal | mask_na_equal

        diff_overlap = tots[mask_overlap][~mask_equal]
        diff_new = tots[~mask_overlap]
        diff_new = diff_new[~diff_new.isnull()]
        return pd.concat([diff_overlap, diff_new])

    def _apply_diff(self, base_ts, new_ts):
        """Produce a new ts using base_ts as a base and taking any
        intersecting and new values from new_ts.

        """
        if base_ts is None:
            return new_ts
        if new_ts is None:
            return base_ts
        # fix: a scalar 0.0 broadcasts over the union index; the former
        # one-element list `[0.0]` raises ValueError on a length
        # mismatch under modern pandas
        result_ts = pd.Series(0.0, index=base_ts.index.union(new_ts.index))
        result_ts[base_ts.index] = base_ts
        result_ts[new_ts.index] = new_ts
        result_ts.sort_index(inplace=True)
        result_ts.name = base_ts.name
        return result_ts