tsio.py 15.5 KB
Newer Older
1
from datetime import datetime
2
from contextlib import contextmanager
3
import logging
4
5
6
7

import pandas as pd
import numpy as np

8
from sqlalchemy import Table, Column, Integer, ForeignKey
Aurélien Campéas's avatar
Aurélien Campéas committed
9
from sqlalchemy.sql.expression import select, func, desc
10
from sqlalchemy.dialects.postgresql import JSONB
11

12
from tshistory import schema
13
14


15
16
17
18
19
20
def setuplogging():
    """Return the module logger, set to INFO and emitting to stderr."""
    logger = logging.getLogger('tshistory.tsio')
    handler = logging.StreamHandler()
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger

Aurélien Campéas's avatar
Aurélien Campéas committed
21

22
23
24
# module-level logger, configured once at import time
L = setuplogging()


25
26
27
28
def tojson(ts):
    """Serialize a series to a json string (None passes through).

    A multi-indexed series is flattened into a frame first so the
    index levels survive the roundtrip.
    """
    if ts is None:
        return None

    if isinstance(ts.index, pd.MultiIndex):
        # multi index case
        return ts.to_frame().reset_index().to_json(date_format='iso')

    return ts.to_json(date_format='iso')

Aurélien Campéas's avatar
Aurélien Campéas committed
35

36
37
38
39
40
def num2float(pdobj):
    """Coerce an int-dtyped Series (or Dataframe column) to float64.

    Non-integer inputs are returned unchanged.
    """
    if not str(pdobj.dtype).startswith('int'):
        return pdobj
    return pdobj.astype('float64')
41

Aurélien Campéas's avatar
Aurélien Campéas committed
42

43
def fromjson(jsonb, tsname):
44
45
46
47
    return _fromjson(jsonb, tsname).fillna(value=np.nan)


def _fromjson(jsonb, tsname):
48
49
50
    if jsonb == '{}':
        return pd.Series(name=tsname)

51
52
    result = pd.read_json(jsonb, typ='series', dtype=False)
    if isinstance(result.index, pd.DatetimeIndex):
53
        result = num2float(result)
54
55
56
57
58
59
60
61
        return result

    # multi index case
    columns = result.index.values.tolist()
    columns.remove(tsname)
    result = pd.read_json(jsonb, typ='frame',
                          convert_dates=columns)
    result.set_index(sorted(columns), inplace=True)
Aurélien Campéas's avatar
Aurélien Campéas committed
62
    return num2float(result.iloc[:, 0])  # get a Series object
63
64


65
class TimeSerie(object):
    """Versioned time-series store.

    A serie is persisted as an initial snapshot followed by diffs,
    grouped into changesets; a full snapshot is also kept at regular
    revision intervals to bound reconstruction cost.
    """
    # changeset id in effect while inside a `newchangeset` block, else None
    _csid = None
    # keep a full snapshot every that many revisions
    _snapshot_interval = 10
    # absolute tolerance used when diffing float values
    _precision = 1e-14
69
70
71

    # API : changeset, insert, get, delete
    @contextmanager
72
    def newchangeset(self, cn, author, _insertion_date=None):
73
74
75
76
77
78
        """A context manager to allow insertion of several series within the
        same changeset identifier

        This allows to group changes to several series, hence
        producing a macro-change.

79
80
        _insertion_date is *only* provided for migration purposes and
        not part of the API.
81
        """
82
        assert self._csid is None
83
        self._csid = self._newchangeset(cn, author, _insertion_date)
84
85
86
        yield
        del self._csid

87
    def insert(self, cn, newts, name, author=None,
               extra_scalars=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index

        name: str unique identifier of the serie

        author: str free-form author name (mandatory, unless provided
        to the newchangeset context manager).

        extra_scalars: dict of extra row values handed to the
        `_complete_insertion_value` extension hook.  The previous
        default (`{}`) was a single shared mutable dict; `None` is
        normalized to a fresh dict instead.
        """
        assert self._csid or author, 'author is mandatory'
        if self._csid and author:
            L.info('author will not be used when in a changeset')
        assert isinstance(newts, pd.Series)
        assert not newts.index.duplicated().any()

        if extra_scalars is None:
            extra_scalars = {}

        newts = num2float(newts)

        if not len(newts):
            return

        newts.name = name
        table = self._get_ts_table(cn, name)

        if isinstance(newts.index, pd.MultiIndex):
            # we impose an order to survive roundtrips
            newts = newts.reorder_levels(sorted(newts.index.names))

        if table is None:
            # initial insertion
            if newts.isnull().all():
                return None
            newts = newts[~newts.isnull()]
            table = self._make_ts_table(cn, name)
            csid = self._csid or self._newchangeset(cn, author)
            value = {
                'csid': csid,
                # the first revision carries the full state
                'snapshot': tojson(newts),
            }
            # callback for extenders
            self._complete_insertion_value(value, extra_scalars)
            cn.execute(table.insert().values(value))
            self._finalize_insertion(cn, csid, name)
            L.info('First insertion of %s by %s', name, author)
            return newts

        diff, newsnapshot = self._compute_diff_and_newsnapshot(
            cn, table, newts, **extra_scalars
        )
        if diff is None:
            L.info('No difference in %s by %s', name, author)
            return

        tip_id = self._get_tip_id(cn, table)
        csid = self._csid or self._newchangeset(cn, author)
        value = {
            'csid': csid,
            'diff': tojson(diff),
            'snapshot': tojson(newsnapshot),
            'parent': tip_id,
        }
        # callback for extenders
        self._complete_insertion_value(value, extra_scalars)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, name)

        # drop the previous revision's snapshot unless it falls on a
        # snapshot-interval boundary (those are kept to bound replay cost)
        if tip_id > 1 and tip_id % self._snapshot_interval:
            self._purge_snapshot_at(cn, table, tip_id)
        L.info('Inserted diff for ts %s by %s', name, author)
        return diff
159

160
    def get(self, cn, name, revision_date=None):
161
162
163
164
165
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

166
        """
167
        table = self._get_ts_table(cn, name)
168
169
        if table is None:
            return
170

171
172
173
        qfilter = []
        if revision_date:
            qfilter.append(lambda cset, _: cset.c.insertion_date <= revision_date)
174
        current = self._build_snapshot_upto(cn, table, qfilter)
175

176
177
        if current is not None:
            current.name = name
178
            current = current[~current.isnull()]
179
        return current
180

181
182
    def get_group(self, cn, name, revision_date=None):
        csid = self._latest_csid_for(cn, name)
183
184

        group = {}
185
186
        for seriename in self._changeset_series(cn, csid):
            serie = self.get(cn, seriename, revision_date)
187
188
189
190
            if serie is not None:
                group[seriename] = serie
        return group

191
192
193
    def exists(self, cn, name):
        return self._get_ts_table(cn, name) is not None

194
    def latest_insertion_date(self, cn, name):
        """Return the insertion date of the most recent changeset that
        touched serie `name`.
        """
        cset = schema.changeset
        tstable = self._get_ts_table(cn, name)
        sql = select(
            [func.max(cset.c.insertion_date)]
        ).where(tstable.c.csid == cset.c.id)
        return cn.execute(sql).scalar()
200

201
    def info(self, cn):
202
203
204
        """Gather global statistics on the current tshistory repository
        """
        sql = 'select count(*) from registry'
205
        stats = {'series count': cn.execute(sql).scalar()}
206
        sql = 'select max(id) from changeset'
207
        stats['changeset count'] = cn.execute(sql).scalar()
208
        sql = 'select distinct name from registry order by name'
209
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
210
211
        return stats

212
    def log(self, cn, limit=0, diff=False, names=None, authors=None, fromrev=None, torev=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        limit: keep only that many changesets, counted from the most
        recent one (the query orders by descending id before limiting).
        diff: when true, also attach each changeset's per-serie diff.
        names / authors: restrict to these serie names / authors.
        fromrev / torev: inclusive changeset id bounds.
        """
        log = []
        cset, cset_series, reg = schema.changeset, schema.changeset_series, schema.registry

        # most recent changesets first, so that `limit` trims old history
        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)

        if names:
            sql = sql.where(reg.c.name.in_(names))

        if authors:
            sql = sql.where(cset.c.author.in_(authors))

        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)

        if torev:
            sql = sql.where(cset.c.id <= torev)

        # join changeset -> changeset_series -> registry
        sql = sql.where(cset.c.id == cset_series.c.csid
        ).where(cset_series.c.serie == reg.c.name)

        rset = cn.execute(sql)
        for csetid, author, revdate in rset.fetchall():
            log.append({'rev': csetid, 'author': author, 'date': revdate,
                        'names': self._changeset_series(cn, csetid)})

        if diff:
            for rev in log:
                rev['diff'] = {name: self._diff(cn, rev['rev'], name)
                               for name in rev['names']}

        # the query ran newest-first; present the log oldest-first
        log.sort(key=lambda rev: rev['rev'])
        return log

253
254
    # /API
    # Helpers
255

256
    # serie table handling
257

258
259
260
    def _ts_table_name(self, seriename):
        """Return the schema-qualified table name backing `seriename`."""
        # namespace.seriename
        return 'timeserie.%s' % seriename
261

262
    def _table_definition_for(self, seriename):
        """Build the sqlalchemy Table holding the revisions of one serie.

        Each row is a revision: a json diff chained to its parent row,
        plus (optionally) a full json snapshot of the state at that
        revision.
        """
        return Table(
            seriename, schema.meta,
            Column('id', Integer, primary_key=True),
            # changeset this revision belongs to
            Column('csid', Integer, ForeignKey('changeset.id'),
                   index=True, nullable=False),
            # constraint: there is either .diff or .snapshot
            Column('diff', JSONB(none_as_null=True)),
            Column('snapshot', JSONB(none_as_null=True)),
            # parent revision; unique => revisions form a single chain
            Column('parent',
                   Integer,
                   ForeignKey('timeserie.%s.id' % seriename,
                              ondelete='cascade'),
                   nullable=True,
                   unique=True,
                   index=True),
            schema='timeserie',
            extend_existing=True
        )

282
    def _make_ts_table(self, cn, name):
        """Create the revisions table for serie `name` and register it."""
        table = self._table_definition_for(name)
        table.create(cn)
        cn.execute(
            schema.registry.insert().values(
                name=name,
                table_name=self._ts_table_name(name)
            )
        )
        return table

292
    def _get_ts_table(self, cn, name):
        """Return the Table for serie `name`, or None when unregistered."""
        reg = schema.registry
        sql = reg.select().where(
            reg.c.table_name == self._ts_table_name(name)
        )
        if cn.execute(sql).scalar():
            return self._table_definition_for(name)
299

300
301
    # changeset handling

302
    def _newchangeset(self, cn, author, _insertion_date=None):
        """Insert a fresh changeset row and return its id.

        The insertion date defaults to now; `_insertion_date` exists
        for migration purposes only.
        """
        sql = schema.changeset.insert().values(
            author=author,
            insertion_date=_insertion_date or datetime.now())
        return cn.execute(sql).inserted_primary_key[0]
308

309
310
    def _latest_csid_for(self, cn, name):
        """Return the most recent changeset id for serie `name`."""
        tstable = self._get_ts_table(cn, name)
        return cn.execute(
            select([func.max(tstable.c.csid)])
        ).scalar()
313

314
    def _changeset_series(self, cn, csid):
        """List the names of the series touched by changeset `csid`."""
        cset_serie = schema.changeset_series
        sql = select(
            [cset_serie.c.serie]
        ).where(cset_serie.c.csid == csid)
        return [row[0] for row in cn.execute(sql).fetchall()]
320
321
322

    # insertion handling

323
    def _get_tip_id(self, cn, table):
        """Return the id of the latest revision row of `table`."""
        return cn.execute(
            select([func.max(table.c.id)])
        ).scalar()
326

327
328
329
    def _complete_insertion_value(self, value, extra_scalars):
        # callback for extenders: subclasses may enrich the row about to
        # be inserted (`value`) using `extra_scalars`; base does nothing
        pass

330
    def _finalize_insertion(self, cn, csid, name):
        """Record that changeset `csid` touched serie `name`."""
        cn.execute(
            schema.changeset_series.insert().values(
                csid=csid,
                serie=name
            )
        )
337

338
339
    # snapshot handling

340
341
    def _purge_snapshot_at(self, cn, table, diffid):
        """Null out the snapshot of revision `diffid` (the state remains
        reconstructible from an earlier snapshot plus the diff chain).
        """
        sql = table.update(
        ).where(table.c.id == diffid
        ).values(snapshot=None)
        cn.execute(sql)

347
    def _validate_type(self, oldts, newts, name):
348
349
350
        if (oldts is None or
            oldts.isnull().all() or
            newts.isnull().all()):
351
352
353
354
355
356
357
358
            return
        old_type = oldts.dtype
        new_type = newts.dtype
        if new_type != old_type:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                name, new_type, old_type)
            raise Exception(m)

359
360
    def _compute_diff_and_newsnapshot(self, cn, table, newts, **extra_scalars):
        snapshot = self._build_snapshot_upto(cn, table)
361
        self._validate_type(snapshot, newts, table.name)
362
363
364
365
366
367
368
369
370
        diff = self._compute_diff(snapshot, newts)

        if len(diff) == 0:
            return None, None

        # full state computation & insertion
        newsnapshot = self._apply_diff(snapshot, diff)
        return diff, newsnapshot

371
    def _find_snapshot(self, cn, table, qfilter=(), column='snapshot'):
        """Fetch the most recent revision with a non-null `column`
        (a snapshot by default), honoring the changeset filters.

        Returns (row id, deserialized series) or (None, None) when no
        row matches.
        """
        cset = schema.changeset
        sql = select([table.c.id, table.c[column]]
        ).order_by(desc(table.c.id)
        ).limit(1
        ).where(table.c[column] != None)

        if qfilter:
            # the filter callbacks constrain the changeset table: join it in
            sql = sql.where(table.c.csid == cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        try:
            snapid, snapdata = cn.execute(sql).fetchone()
        except TypeError:
            # fetchone() returned None (not unpackable): no matching row
            return None, None
        return snapid, fromjson(snapdata, table.name)
388

389
390
    def _build_snapshot_upto(self, cn, table, qfilter=()):
        """Reconstruct the serie's state by replaying, in id order,
        every diff recorded after the latest matching snapshot.

        Returns None when the serie has no (matching) snapshot at all.
        """
        snapid, snapshot = self._find_snapshot(cn, table, qfilter)
        if snapid is None:
            return None

        cset = schema.changeset
        # all the diffs that came after the snapshot row
        sql = select([table.c.id,
                      table.c.diff,
                      table.c.parent,
                      cset.c.insertion_date]
        ).order_by(table.c.id
        ).where(table.c.id > snapid)

        if qfilter:
            # same changeset filters as used to pick the snapshot
            sql = sql.where(table.c.csid == cset.c.id)
            for filtercb in qfilter:
                sql = sql.where(filtercb(cset, table))

        alldiffs = pd.read_sql(sql, cn)

        if len(alldiffs) == 0:
            return snapshot

        # initial ts
        ts = snapshot
        for _, row in alldiffs.iterrows():
            diff = fromjson(row['diff'], table.name)
            ts = self._apply_diff(ts, diff)
        assert ts.index.dtype.name == 'datetime64[ns]' or len(ts) == 0
        return ts
419
420
421

    # diff handling

422
423
    def _diff(self, cn, csetid, name):
        """Return the change serie `name` received in changeset `csetid`:
        the diff for an ordinary revision, or the snapshot for the very
        first one (which has no diff).
        """
        table = self._get_ts_table(cn, name)
        cset = schema.changeset

        def filtercset(sql):
            # constrain to the revision belonging to changeset `csetid`
            return sql.where(table.c.csid == cset.c.id
            ).where(cset.c.id == csetid)

        sql = filtercset(select([table.c.id]))
        tsid = cn.execute(sql).scalar()

        if tsid == 1:
            # the first revision stores a snapshot, not a diff
            sql = select([table.c.snapshot])
        else:
            sql = select([table.c.diff])
        sql = filtercset(sql)

        return fromjson(cn.execute(sql).scalar(), name)
440

441
    def _compute_diff(self, fromts, tots):
442
443
        """Compute the difference between fromts and tots
        (like in tots - fromts).
444
445

        """
446
        if fromts is None:
447
            return tots
448
449
450
        fromts = fromts[~fromts.isnull()]
        if not len(fromts):
            return tots
Aurélien Campéas's avatar
Aurélien Campéas committed
451

452
453
454
455
456
        mask_overlap = tots.index.isin(fromts.index)
        fromts_overlap = fromts[tots.index[mask_overlap]]
        tots_overlap = tots[mask_overlap]

        if fromts.dtype == 'float64':
457
            mask_equal = np.isclose(fromts_overlap, tots_overlap,
458
                                    rtol=0, atol=self._precision)
459
460
461
        else:
            mask_equal = fromts_overlap == tots_overlap

462
463
464
        mask_na_equal = fromts_overlap.isnull() & tots_overlap.isnull()
        mask_equal = mask_equal | mask_na_equal

465
466
        diff_overlap = tots[mask_overlap][~mask_equal]
        diff_new = tots[~mask_overlap]
467
        diff_new = diff_new[~diff_new.isnull()]
468
        return pd.concat([diff_overlap, diff_new])
469
470
471

    def _apply_diff(self, base_ts, new_ts):
        """Produce a new ts using base_ts as a base and taking any
472
        intersecting and new values from new_ts.
473
474
475
476
477
478
479
480
481
482

        """
        if base_ts is None:
            return new_ts
        if new_ts is None:
            return base_ts
        result_ts = pd.Series([0.0], index=base_ts.index.union(new_ts.index))
        result_ts[base_ts.index] = base_ts
        result_ts[new_ts.index] = new_ts
        result_ts.sort_index(inplace=True)
483
        result_ts.name = base_ts.name
484
        return result_ts