# tsio.py -- versioned time series storage over postgres
1
from datetime import datetime
2
import logging
3
4
import hashlib
import uuid
5
from threading import Lock
6
7
import json
from pathlib import Path
8
9
10

import pandas as pd

11
12
from deprecated import deprecated

13
from tshistory.schema import tsschema
14
from tshistory.util import (
15
    closed_overlaps,
16
17
    num2float,
    SeriesServices,
18
    start_end,
19
    sqlfile,
20
    tx,
21
22
    tzaware_serie
)
23
from tshistory.snapshot import Snapshot
24

25
L = logging.getLogger('tshistory.tsio')
26
SERIESSCHEMA = Path(__file__).parent / 'series.sql'
27
28


29
class timeseries(SeriesServices):
    """Postgres-backed store of versioned time series.

    Each write creates a changeset (author, insertion date, optional
    metadata); series states are materialized through Snapshot objects.
    """
    # default schema namespace; overridden per instance in __init__
    namespace = 'tsh'
    schema = None
    # per-instance cache: seriename -> metadata dict
    metadatacache = None
    # metadata keys reserved for internal use (guarded in update_metadata)
    metakeys = {
        'tzaware',
        'index_type',
        'index_names',
        'index_dtype',
        'value_dtype',
        'value_type'
    }
    # per-instance cache: seriename -> registry id
    registry_map = None
    # per-instance cache: seriename -> storage table name
    serie_tablename = None
    # namespace-derived key for the advisory lock taken in _create
    create_lock_id = None
    # class-wide lock guarding cache clearing in _resetcaches
    cachelock = Lock()

    def __init__(self, namespace='tsh'):
        """Bind to a namespace and ensure its db schema is defined."""
        self.namespace = namespace
        tsschema(namespace).define()
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
        # stable int derived from the namespace name, used as the
        # pg_advisory_xact_lock key at series creation time
        self.create_lock_id = sum(ord(c) for c in namespace)
53

54
    @tx
    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        _insertion_date: optional datetime overriding the recorded
            insertion date (must be tz-aware; enforced in _newchangeset)
        """
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert (_insertion_date is None or
                isinstance(_insertion_date, datetime)), 'Bad format for insertion date'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'
        assert newts.index.notna().all(), 'The index contains NaT entries'

        # normalize the input: sorted index, float-coerced values
        if not newts.index.is_monotonic_increasing:
            newts = newts.sort_index()
        newts = num2float(newts)

        if not len(newts):
            return

        # only datetime-like, non-multi indexes are supported
        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))
        newts.name = seriename
        tablename = self._serie_to_tablename(cn, seriename)

        # dispatch: no table yet means first insertion (series creation),
        # otherwise store a diff against the current state
        if tablename is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, tablename, newts, seriename, author,
                            metadata, _insertion_date)
95

96
    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie
        from_value_date/to_value_date: optional value-date bounds
        _keep_nans: when true, nan values (erasure markers) are kept

        Returns None for an unknown series.
        """
        if not self.exists(cn, seriename):
            return

        csetfilter = []
        if revision_date:
            # NOTE(review): the date is interpolated into the sql text
            # rather than bound as a parameter; safe only as long as
            # callers pass genuine datetime objects
            csetfilter.append(
                f'cset.insertion_date <= \'{revision_date.isoformat()}\''
            )
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            # drop nans: they encode point erasures
            current = current[~current.isnull()]
        return current
122

123
    def metadata(self, cn, seriename):
        """Return the metadata dict of a series.

        Results are memoized in `self.metadatacache`; a cache miss
        triggers a registry lookup.
        """
        cached = self.metadatacache.get(seriename)
        if cached is not None:
            return cached
        query = (
            f'select metadata from {self.namespace}.registry '
            'where seriename = %(seriename)s'
        )
        meta = cn.execute(query, seriename=seriename).scalar()
        if meta is None:
            return None
        self.metadatacache[seriename] = meta
        return meta

134
    @tx
    def update_metadata(self, cn, seriename, metadata, internal=False):
        """Merge `metadata` into the stored metadata of a series.

        Internal keys (self.metakeys) are preserved and may only be
        written when `internal` is true.
        """
        assert isinstance(metadata, dict)
        assert internal or not set(metadata.keys()) & self.metakeys
        meta = self.metadata(cn, seriename)
        # remove all but internal stuff
        newmeta = {
            key: meta[key]
            for key in self.metakeys
            if meta.get(key) is not None
        }
        newmeta.update(metadata)
        sql = (f'update "{self.namespace}".registry as reg '
               'set metadata = %(metadata)s '
               'where reg.seriename = %(seriename)s')
        # bug fix: a bare .pop(seriename) raised KeyError whenever the
        # metadata had never been read (hence cached) by this instance
        self.metadatacache.pop(seriename, None)
        cn.execute(
            sql,
            metadata=json.dumps(newmeta),
            seriename=seriename
        )
155

156
157
158
159
160
161
162
    def changeset_metadata(self, cn, csid):
        """Return the metadata dict of changeset `csid` (or None)."""
        query = (f'select metadata from "{self.namespace}".changeset '
                 f'where id = {csid}')
        return cn.execute(query).scalar()

163
164
165
    def type(self, cn, name):
        """Return the series type; plain stored series are always 'primary'."""
        return 'primary'

166
    @tx
    def history(self, cn, seriename,
                from_insertion_date=None,
                to_insertion_date=None,
                from_value_date=None,
                to_value_date=None,
                deltabefore=None,
                deltaafter=None,
                diffmode=False,
                _keep_nans=False):
        """Return {insertion_date (UTC): series} for every revision
        matching the filters, in insertion order.

        from/to_insertion_date: bound the revisions considered
        from/to_value_date: bound the value dates returned
        deltabefore/deltaafter: restrict each revision to a window
            around its own insertion date
        diffmode: return the diff introduced by each revision instead
            of the full state
        _keep_nans: keep nan values (erasure markers)
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            return

        # collect (changeset id, insertion date) of matching revisions
        revsql = [
            'select cset.id, cset.insertion_date '
            f'from "{self.namespace}".changeset as cset, '
            f'     "{self.namespace}.timeserie"."{tablename}" as ts '
            'where ts.cset = cset.id '
        ]

        if from_insertion_date:
            revsql.append('and cset.insertion_date >= %(from_idate)s ')
        if to_insertion_date:
            revsql.append('and cset.insertion_date <= %(to_idate)s ')

        if from_value_date or to_value_date:
            revsql.append(
                'and ' + closed_overlaps(from_value_date, to_value_date)
            )

        revsql.append('order by cset.id')
        revsql = ''.join(revsql)

        revs = cn.execute(
            revsql, {
                'fromdate': from_value_date,
                'todate': to_value_date,
                'from_idate': from_insertion_date,
                'to_idate': to_insertion_date
            }
        ).fetchall()
        if not revs:
            return {}

        if diffmode:
            # compute the previous serie value
            # (prepend it so that zip(series, series[1:]) below pairs
            # each revision with its predecessor)
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            # per-revision value-date window: one snapshot lookup each
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                series.append((
                    idate,
                    snapshot.find(csetfilter=[f'cset.id = {csid}'],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            # bulk path: fetch all revisions in one pass
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs
        else:
            series = [
                (idate, ts if _keep_nans else ts.dropna() )
                 for idate, ts in series
            ]

        return {
            pd.Timestamp(idate).astimezone('UTC'): serie
            for idate, serie in series
        }
259

260
    @tx
    def staircase(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones
        """
        # bug fix: this called self.get_history, which only exists on the
        # deprecated TimeSerie subclass -- `history` is the method on this
        # class, so timeseries().staircase(...) raised AttributeError
        histo = self.history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date,
            _keep_nans=True
        )
        if histo is None:
            return None

        # for each value date, keep the value of the latest insertion
        vimap = {}
        vvmap = {}
        for idate, series in histo.items():
            # .items() rather than .iteritems(): identical behavior,
            # and iteritems was removed in pandas 2.0
            for vdate, value in series.items():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        ts = pd.Series(vvmap).sort_index().loc[from_value_date:to_value_date]
        ts.name = seriename
        return ts.dropna()
288

289
    def exists(self, cn, seriename):
        """Tell whether `seriename` is registered in this namespace."""
        tablename = self._serie_to_tablename(cn, seriename)
        return tablename is not None
291

292
    def latest_insertion_date(self, cn, seriename):
        """Return the most recent insertion date of the series as a
        UTC timestamp.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        query = ('select max(insertion_date) '
                 f'from "{self.namespace}".changeset as cset, '
                 f'     "{self.namespace}.timeserie"."{tablename}" as tstable '
                 'where cset.id = tstable.cset')
        lastdate = cn.execute(query).scalar()
        return pd.Timestamp(lastdate).astimezone('UTC')
301

302
303
    def insertion_dates(self, cn, seriename,
                        fromdate=None, todate=None):
        """List the insertion dates of the series (oldest first) as
        UTC timestamps, optionally bounded by `fromdate`/`todate`.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        fromclause = ' and cset.insertion_date >= %(fromdate)s ' if fromdate else ''
        toclause = ' and cset.insertion_date <= %(todate)s ' if todate else ''
        query = ('select insertion_date '
                 f'from "{self.namespace}".changeset as cset, '
                 f'     "{self.namespace}.timeserie"."{tablename}" as tstable '
                 'where cset.id = tstable.cset '
                 f'{fromclause} {toclause}'
                 'order by cset.id')
        rows = cn.execute(query, fromdate=fromdate, todate=todate).fetchall()
        return [
            pd.Timestamp(idate).astimezone('UTC')
            for idate, in rows
        ]

324
325
326
327
    def last_id(self, cn, seriename):
        """Return the id of the latest snapshot chunk of the series."""
        return Snapshot(cn, self, seriename).last_id()

328
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return the changeset id whose insertion date matches `revdate`
        under `mode`: 'strict' (=), 'before' (<=) or 'after' (>=).
        """
        operators = {
            'strict': '=',
            'before': '<=',
            'after': '>='
        }
        assert mode in operators
        tablename = self._serie_to_tablename(cn, seriename)
        query = ('select cset '
                 f'from "{self.namespace}.timeserie"."{tablename}" as tstable, '
                 f'      "{self.namespace}".changeset as cset '
                 f'where cset.id = tstable.cset '
                 f'and   cset.insertion_date {operators[mode]} %(revdate)s')
        return cn.execute(query, revdate=revdate).scalar()
342

343
344
    @tx
    def rename(self, cn, oldname, newname):
        """Rename a series in the registry and drop the stale caches."""
        query = (f'update "{self.namespace}".registry '
                 'set seriename = %(newname)s '
                 'where seriename = %(oldname)s')
        cn.execute(query, oldname=oldname, newname=newname)
        self._resetcaches()

351
    @tx
    def delete(self, cn, seriename):
        """Permanently delete a series: its data table, its snapshot
        table, its changesets and its registry entry.
        """
        if not self.exists(cn, seriename):
            # idiom fix: library code should log, not print to stdout
            L.info('not deleting unknown series %s %s', seriename, self.namespace)
            return
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        # cn *must* be a transaction scope
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()
        L.info('deleted %s %s', seriename, self.namespace)
387

388
    @tx
    def strip(self, cn, seriename, csid):
        """Remove all revisions of the series from changeset `csid`
        onwards, recording a stripping note in the metadata of the
        affected changesets.
        """
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = (f'update "{self.namespace}".changeset '
                   'set metadata = %(metadata)s '
                   'where id = %(id)s')
            cn.execute(sql, id=log['rev'], metadata=json.dumps(metadata))
            # delete the changeset_series link
            sql = (f'delete from "{self.namespace}".changeset_series as css '
                   'where css.cset = %(rev)s '
                   'and   css.serie = %(name)s')
            cn.execute(
                sql,
                rev=log['rev'],
                name=self._name_to_regid(cn, seriename)
            )

        # wipe the diffs
        tablename = self._serie_to_tablename(cn, seriename)
        sql = (f'delete from "{self.namespace}.timeserie"."{tablename}" '
               'where cset >= %(csid)s')
        cn.execute(sql, csid=csid)
417

418
    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        count_query = f'select count(*) from "{self.namespace}".registry'
        stats = {'series count': cn.execute(count_query).scalar()}
        maxrev_query = f'select max(id) from "{self.namespace}".changeset'
        stats['changeset count'] = cn.execute(maxrev_query).scalar()
        names_query = (
            f'select distinct seriename from "{self.namespace}".registry '
            'order by seriename'
        )
        stats['serie names'] = [name for name, in cn.execute(names_query).fetchall()]
        return stats

431
    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        Each entry is a dict with keys: rev, author, date, meta, names.
        Filters: names/authors restrict the series/authors considered;
        fromrev/torev bound the changeset ids; fromdate/todate bound
        the insertion dates; limit caps the number of rows.
        """
        log = []

        sql = [
            'select distinct cset.id, cset.author, cset.insertion_date, cset.metadata '
            f'from "{self.namespace}".changeset as cset, '
            f'     "{self.namespace}".registry as reg, '
            f'     "{self.namespace}".changeset_series as css '
        ]
        wheres = []
        if stripped:
            # include changesets whose series link was stripped away
            # NOTE(review): with stripped=True and no other filter,
            # `wheres` stays empty and the generated 'where ' clause is
            # malformed -- confirm intended usage always passes a filter
            sql.append(f'left join "{self.namespace}".changeset as cset2 '
                       'on (cset2.id = css.cset) ')
        else:
            wheres.append('cset.id = css.cset and '
                          'css.serie = reg.id ')

        if names:
            # XXX check names exist
            # NOTE(review): names/authors are spliced in via repr(), not
            # bound parameters -- only safe for trusted inputs
            wheres.append('reg.seriename in (%s)' % ','.join(
                repr(name) for name in names)
            )
        if authors:
            wheres.append('cset.author in (%s)' % ','.join(
                repr(auth) for auth in authors)
            )
        if fromrev:
            wheres.append('cset.id >= %(fromrev)s')
        if torev:
            wheres.append('cset.id <= %(torev)s')
        if fromdate:
            wheres.append('cset.insertion_date >= %(fromdate)s')
        if todate:
            wheres.append('cset.insertion_date <= %(todate)s')

        sql.append('where ' + ' and '.join(wheres))
        if limit:
            sql.append('limit %(limit)s ')
        sql.append('order by cset.id desc')

        sql = ''.join(sql)

        rset = cn.execute(sql, {
            'fromdate': fromdate,
            'todate': todate,
            'fromrev': fromrev,
            'torev': torev
        })
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        # rows come back newest-first; present them oldest-first
        log.sort(key=lambda rev: rev['rev'])
        return log

494
    def interval(self, cn, seriename, notz=False):
        """Return the [start, end] value-date interval of the series,
        as recorded by its latest revision.

        Raises ValueError on an unknown series. When the series is
        tz-aware (per its metadata) and `notz` is false, the bounds
        are returned as UTC timestamps.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        # the highest cset row carries the current start/end stamps
        sql = (f'select tsstart, tsend '
               f'from "{self.namespace}.timeserie"."{tablename}" '
               f'order by cset desc limit 1')
        res = cn.execute(sql).fetchone()
        start, end = res.tsstart, res.tsend
        if self.metadata(cn, seriename).get('tzaware') and not notz:
            start, end = pd.Timestamp(start, tz='UTC'), pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')

507
508
    # /API
    # Helpers
509

510
511
    # creation / update

512
    def _create(self, cn, newts, seriename, author,
513
                metadata=None, insertion_date=None):
514
        start, end = start_end(newts, notz=False)
515
516
517
518
        if start is None:
            assert end is None
            # this is just full of nans
            return None
519
520
521
522
523
        # chop off unwanted nans
        newts = newts.loc[start:end]
        if len(newts) == 0:
            return None

524
525
526
527
528
        # at creation time we take an exclusive lock to avoid
        # a deadlock on created tables against the changeset-series fk
        cn.execute(
            'select pg_advisory_xact_lock({})'.format(self.create_lock_id)
        )
529
        self._register_serie(cn, seriename, newts)
530
        snapshot = Snapshot(cn, self, seriename)
531
        csid = self._newchangeset(cn, author, insertion_date, metadata)
532
        head = snapshot.create(newts)
533
        start, end = start_end(newts)
534
535
536
537
538
539
        tablename = self._make_ts_table(cn, seriename)
        sql = (f'insert into "{self.namespace}.timeserie"."{tablename}" '
               '(cset, snapshot, tsstart, tsend) '
               f'values (%s, %s, %s, %s)')
        cn.execute(sql, csid, head,
                   start.to_pydatetime(), end.to_pydatetime())
540
        self._finalize_insertion(cn, csid, seriename)
541
        L.info('first insertion of %s (size=%s) by %s',
542
               seriename, len(newts), author)
543
544
        return newts

545
    def _update(self, cn, tablename, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Store a new revision as a diff against the current state.

        Returns the stored diff, or None when the input brings no
        change.
        """
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        # compute series start/end stamps
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename, notz=True)
        start = min(tsstart or ival.left, ival.left)
        end = max(tsend or ival.right, ival.right)

        if pd.isnull(diff[0]) or pd.isnull(diff[-1]):
            # we *might* be shrinking, let's look at the full series
            # and yes, shrinkers have a slow path
            last = snapshot.last()
            patched = self.patch(last, diff).dropna()
            if not len(patched):
                raise ValueError('complete erasure of a series is forbidden')
            if pd.isnull(diff[0]):
                start = patched.index[0]
            if pd.isnull(diff[-1]):
                end = patched.index[-1]

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        # record the revision row (changeset, snapshot head, bounds)
        sql = (f'insert into "{self.namespace}.timeserie"."{tablename}" '
               '(cset, snapshot, tsstart, tsend) '
               'values (%s, %s, %s, %s)')
        cn.execute(sql, csid, head, start, end)
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

587
    # serie table handling
588

589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
    def _make_tablename(self, cn, seriename):
        """ compute the unqualified (no namespace) table name
        from a serie name, to allow arbitrary serie names
        """
        # postgresql table names are limited to 63 chars.
        # (the sha1 hexdigest is 40 chars, safely below the limit)
        if len(seriename) > 63:
            seriename = hashlib.sha1(seriename.encode('utf-8')).hexdigest()

        # collision detection (collision can happen after a rename)
        if cn.execute(f'select table_name '
                      f'from "{self.namespace}".registry '
                      f'where table_name = %(seriename)s',
                      seriename=seriename).scalar():
            # fall back to a random, collision-free name
            return str(uuid.uuid4())

        return seriename

606
607
608
609
    def _serie_to_tablename(self, cn, seriename):
        """Map a series name to its storage table name (cached).

        Returns None when the series does not exist yet.
        """
        cached = self.serie_tablename.get(seriename)
        if cached is not None:
            return cached

        found = cn.execute(
            f'select table_name from "{self.namespace}".registry '
            f'where seriename = %(seriename)s',
            seriename=seriename
        ).scalar()
        if found is None:
            # creation time
            return None
        self.serie_tablename[seriename] = found
        return found

    def _table_definition_for(self, cn, seriename):
        """Return (ddl, tablename) for the series table, generating a
        fresh table name at creation time.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(cn, seriename)
        # instantiate the per-series ddl template
        table = sqlfile(
            SERIESSCHEMA,
            namespace=self.namespace,
            tablename=tablename
        )
        return table, tablename
633

Aurélien Campéas's avatar
Aurélien Campéas committed
634
    def _make_ts_table(self, cn, seriename):
        """Create the storage table of a series and return its name."""
        ddl, tablename = self._table_definition_for(cn, seriename)
        cn.execute(ddl)
        return tablename
638
639

    def _register_serie(self, cn, seriename, ts):
        """Insert the registry row for a brand new series, computing the
        internal metadata from the given series, and cache the new
        registry id.
        """
        index = ts.index
        inames = [name for name in index.names if name]
        sql = (f'insert into {self.namespace}.registry '
               '(seriename, table_name, metadata) '
               'values (%s, %s, %s) '
               'returning id')
        table_name = self._make_tablename(cn, seriename)
        # NOTE(review): ts is a Series, so .dtypes is an alias of .dtype here
        metadata = json.dumps({
            'tzaware': tzaware_serie(ts),
            'index_type': index.dtype.name,
            'index_names': inames,
            'index_dtype': index.dtype.str,
            'value_dtype': ts.dtypes.str,
            'value_type': ts.dtypes.name
        })
        regid = cn.execute(
            sql,
            seriename,
            table_name,
            metadata
        ).scalar()
        self.registry_map[seriename] = regid
662

663
    def _get_ts_table(self, cn, seriename):
        """Return (ddl, tablename) for an existing series, else None."""
        tablename = self._serie_to_tablename(cn, seriename)
        if not tablename:
            return None
        return self._table_definition_for(cn, seriename)
667

668
669
    # changeset handling

670
671
672
673
    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        """Insert a changeset row and return its id."""
        if insertion_date is not None:
            # only tz-aware explicit insertion dates are acceptable
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        if metadata:
            metadata = json.dumps(metadata)
        query = (f'insert into {self.namespace}.changeset '
                 '(author, metadata, insertion_date) '
                 'values (%s, %s, %s) '
                 'returning id')
        return cn.execute(query, author, metadata, idate).scalar()
686

687
    def _changeset_series(self, cn, csid):
        """List the names of the series touched by changeset `csid`."""
        query = ('select seriename '
                 f'from "{self.namespace}".registry as reg, '
                 f'     "{self.namespace}".changeset_series as css '
                 'where css.serie = reg.id '
                 'and   css.cset = %(csid)s')
        rows = cn.execute(query, csid=csid).fetchall()
        return [row.seriename for row in rows]
698

699
700
701
702
703
704
705
    def _previous_cset(self, cn, seriename, csid):
        """Return the changeset id immediately preceding `csid` for the
        series (or None).
        """
        tablename = self._serie_to_tablename(cn, seriename)
        query = (f'select cset from "{self.namespace}.timeserie"."{tablename}" '
                 'where cset < %(csid)s '
                 'order by cset desc limit 1')
        return cn.execute(query, csid=csid).scalar()

706
707
    # insertion handling

708
    def _validate(self, cn, ts, seriename):
        """Reject an insertion whose dtypes do not match the stored series."""
        if ts.isnull().all():
            # ts erasure
            return
        meta = self.metadata(cn, seriename)
        tstype = ts.dtype
        if tstype != meta['value_type']:
            raise Exception(
                'Type error when inserting {}, new type is {}, type in base is {}'.format(
                    seriename, tstype, meta['value_type']
                )
            )
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')
720

721
722
    def _name_to_regid(self, cn, seriename):
        """Map a series name to its registry id (cached)."""
        cached = self.registry_map.get(seriename)
        if cached is not None:
            return cached

        query = ('select id '
                 f'from "{self.namespace}".registry '
                 'where seriename = %(seriename)s')
        regid = cn.execute(query, seriename=seriename).scalar()
        self.registry_map[seriename] = regid
        return regid

735
    def _finalize_insertion(self, cn, csid, seriename):
        """Link a changeset to the series it touched."""
        query = (f'insert into "{self.namespace}".changeset_series '
                 '(cset, serie) '
                 'values (%s, %s)')
        cn.execute(query, csid, self._name_to_regid(cn, seriename))
740

741
    def _resetcaches(self):
        """Drop the name/id/metadata caches, under the class-wide lock."""
        with self.cachelock:
            for cache in (self.metadatacache,
                          self.registry_map,
                          self.serie_tablename):
                cache.clear()
746
747
748
749
750
751
752
753



@deprecated(reason='use the `timeseries` object instead')
class TimeSerie(timeseries):
    # backward-compatibility shim: old method names forwarding to the
    # new api, each wrapped to emit a deprecation warning
    get_history = deprecated(timeseries.history)
    get_delta = deprecated(timeseries.staircase)