tsio.py 27.2 KB
Newer Older
1
from datetime import datetime
2
import logging
3
4
import hashlib
import uuid
5
from threading import Lock
6
7
import json
from pathlib import Path
8
9
10

import pandas as pd

11
from tshistory.schema import tsschema
12
from tshistory.util import (
13
    closed_overlaps,
14
15
    num2float,
    SeriesServices,
16
    start_end,
17
    sqlfile,
18
    tx,
19
20
    tzaware_serie
)
21
from tshistory.snapshot import Snapshot
22

23
L = logging.getLogger('tshistory.tsio')
24
SERIESSCHEMA = Path(__file__).parent / 'series.sql'
25
26


27
class TimeSerie(SeriesServices):
    """Versioned time-series store.

    Each series gets a registry entry, a per-series revision table and
    snapshot storage; every write is recorded under a changeset
    (author, insertion date, optional metadata).
    """
    namespace = 'tsh'
    # class-level defaults; instances get fresh dicts in __init__
    schema = None
    metadatacache = None
    # metadata keys managed internally; update_metadata refuses to let
    # callers override them unless internal=True
    metakeys = {
        'tzaware',
        'index_type',
        'index_names',
        'index_dtype',
        'value_dtype',
        'value_type'
    }
    registry_map = None
    serie_tablename = None
    create_lock_id = None
    # class-wide lock guarding cache resets
    cachelock = Lock()

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        # make sure the schema objects for this namespace are defined
        tsschema(namespace).define()
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
        # advisory-lock id derived from the namespace name (used by _create)
        self.create_lock_id = sum(ord(c) for c in namespace)
    @tx
    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata

        Returns the effectively inserted series (creation) or diff
        (update), or None when there is nothing to insert.
        """
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert (_insertion_date is None or
                isinstance(_insertion_date, datetime)), 'Bad format for insertion date'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'

        assert newts.index.notna().all(), 'The index contains NaT entries'
        if not newts.index.is_monotonic_increasing:
            newts = newts.sort_index()

        # normalize integral values to floats
        newts = num2float(newts)

        if not len(newts):
            return

        # only datetime-indexed (and non multi-indexed) series are supported
        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        tablename = self._serie_to_tablename(cn, seriename)

        if tablename is None:
            # initial insertion: no revision table exists yet
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, tablename, newts, seriename, author,
                            metadata, _insertion_date)
    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        Returns None for an unknown series; nan markers (erasures) are
        dropped unless _keep_nans is true.
        """
        if not self.exists(cn, seriename):
            return

        csetfilter = []
        if revision_date:
            # NOTE(review): the date is interpolated into the SQL text;
            # presumably safe as it comes from a datetime object — confirm
            csetfilter.append(
                f'cset.insertion_date <= \'{revision_date.isoformat()}\''
            )
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            # drop nan (erasure) markers unless explicitly kept
            current = current[~current.isnull()]
        return current
121
    def metadata(self, cn, seriename):
122
        """Return metadata dict of timeserie."""
123
124
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
125
126
127
        sql = (f'select metadata from {self.namespace}.registry '
               'where seriename = %(seriename)s')
        meta = cn.execute(sql, seriename=seriename).scalar()
128
129
        if meta is not None:
            self.metadatacache[seriename] = meta
130
131
        return meta

132
    @tx
    def update_metadata(self, cn, seriename, metadata, internal=False):
        """Merge `metadata` into the stored metadata of a series.

        Unless `internal` is true, callers may not override the
        internally-managed keys (self.metakeys).
        """
        assert isinstance(metadata, dict)
        assert internal or not set(metadata.keys()) & self.metakeys
        meta = self.metadata(cn, seriename)
        # remove all but internal stuff
        newmeta = {
            key: meta[key]
            for key in self.metakeys
            if meta.get(key) is not None
        }
        newmeta.update(metadata)
        sql = (f'update "{self.namespace}".registry as reg '
               'set metadata = %(metadata)s '
               'where reg.seriename = %(seriename)s')
        # invalidate the cache entry; tolerate its absence (a bare .pop
        # would raise KeyError if the entry was never cached)
        self.metadatacache.pop(seriename, None)
        cn.execute(
            sql,
            metadata=json.dumps(newmeta),
            seriename=seriename
        )
    def changeset_metadata(self, cn, csid):
        """Return the metadata dict stored on changeset `csid` (or None)."""
        query = f'select metadata from "{self.namespace}".changeset where id = {csid}'
        return cn.execute(query).scalar()
161
162
163
    def type(self, cn, name):
        return 'primary'

    @tx
    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False,
                    _keep_nans=False):
        """Return a {insertion_date (UTC): serie} dict of the revisions
        of a series.

        from/to_insertion_date filter the revisions considered;
        from/to_value_date bound the value window; deltabefore/deltaafter
        restrict each revision to a window around its own insertion date;
        diffmode yields each revision as a diff against the previous one.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            return

        # collect the changeset ids / insertion dates to consider
        revsql = [
            'select cset.id, cset.insertion_date '
            f'from "{self.namespace}".changeset as cset, '
            f'     "{self.namespace}.timeserie"."{tablename}" as ts '
            'where ts.cset = cset.id '
        ]

        if from_insertion_date:
            revsql.append('and cset.insertion_date >= %(from_idate)s ')
        if to_insertion_date:
            revsql.append('and cset.insertion_date <= %(to_idate)s ')

        if from_value_date or to_value_date:
            # keep only revisions whose value span overlaps the window
            revsql.append(
                'and ' + closed_overlaps(from_value_date, to_value_date)
            )

        revsql.append('order by cset.id')
        revsql = ''.join(revsql)

        revs = cn.execute(
            revsql, {
                'fromdate': from_value_date,
                'todate': to_value_date,
                'from_idate': from_insertion_date,
                'to_idate': to_insertion_date
            }
        ).fetchall()
        if not revs:
            return {}

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            # per-revision value window centered on each insertion date
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                series.append((
                    idate,
                    snapshot.find(csetfilter=[f'cset.id = {csid}'],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs
        else:
            series = [
                (idate, ts if _keep_nans else ts.dropna() )
                 for idate, ts in series
            ]

        return {
            pd.Timestamp(idate).astimezone('UTC'): serie
            for idate, serie in series
        }
    @tx
    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date,
            _keep_nans=True
        )
        if histo is None:
            return None

        # for each value date keep the value coming from the most
        # recent insertion
        vimap = {}  # value date -> latest insertion date seen so far
        vvmap = {}  # value date -> value at that insertion
        for idate, series in histo.items():
            # .items() replaces Series.iteritems, removed in pandas 2.0
            for vdate, value in series.items():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        ts = pd.Series(vvmap).sort_index().loc[from_value_date:to_value_date]
        ts.name = seriename
        return ts.dropna()
287
    def exists(self, cn, seriename):
288
        return self._serie_to_tablename(cn, seriename) is not None
289

    def latest_insertion_date(self, cn, seriename):
        """Return the UTC timestamp of the most recent insertion of a series."""
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select max(insertion_date) '
               f'from "{self.namespace}".changeset as cset, '
               f'     "{self.namespace}.timeserie"."{tablename}" as tstable '
               'where cset.id = tstable.cset')
        return pd.Timestamp(
            cn.execute(sql).scalar()
        ).astimezone('UTC')
    def insertion_dates(self, cn, seriename,
                        fromdate=None, todate=None):
        """List the insertion dates of a series as UTC timestamps,
        optionally bounded by fromdate/todate (inclusive).
        """
        tablename = self._serie_to_tablename(cn, seriename)
        fromclause, toclause = '', ''
        if fromdate:
            fromclause = ' and cset.insertion_date >= %(fromdate)s '
        if todate:
            toclause = ' and cset.insertion_date <= %(todate)s '
        sql = ('select insertion_date '
               f'from "{self.namespace}".changeset as cset, '
               f'     "{self.namespace}.timeserie"."{tablename}" as tstable '
               'where cset.id = tstable.cset '
               f'{fromclause} {toclause}'
               'order by cset.id')
        return [
            pd.Timestamp(idate).astimezone('UTC')
            for idate, in cn.execute(
                    sql, fromdate=fromdate, todate=todate
            ).fetchall()
        ]
322
323
324
325
    def last_id(self, cn, seriename):
        snapshot = Snapshot(cn, self, seriename)
        return snapshot.last_id()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return a changeset id of the series at `revdate`.

        mode: 'strict' (exact insertion date), 'before' (<=) or
        'after' (>=).
        """
        # map each mode to the comparison operator used in the query
        operators = {
            'strict': '=',
            'before': '<=',
            'after': '>='
        }
        tablename = self._serie_to_tablename(cn, seriename)
        assert mode in operators
        # NOTE(review): no `order by`/`limit`, so with 'before'/'after'
        # which matching changeset is returned is unspecified — confirm
        sql = ('select cset '
               f'from "{self.namespace}.timeserie"."{tablename}" as tstable, '
               f'      "{self.namespace}".changeset as cset '
               f'where cset.id = tstable.cset '
               f'and   cset.insertion_date {operators[mode]} %(revdate)s')
        return cn.execute(sql, revdate=revdate).scalar()
    @tx
    def rename(self, cn, oldname, newname):
        """Rename a series in the registry and invalidate all caches."""
        sql = (f'update "{self.namespace}".registry '
               'set seriename = %(newname)s '
               'where seriename = %(oldname)s')
        cn.execute(sql, oldname=oldname, newname=newname)
        # the caches are keyed by series name, so wipe them wholesale
        self._resetcaches()
    @tx
    def delete(self, cn, seriename):
        """Completely remove a series: its revision and snapshot tables,
        its changesets and its registry entry.
        """
        if not self.exists(cn, seriename):
            print('not deleting unknown series', seriename, self.namespace)
            return
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        # cn *must* be a transaction scope
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()
        print('deleted', seriename, self.namespace)
    @tx
    def strip(self, cn, seriename, csid):
        """Remove all revisions of `seriename` from changeset `csid`
        onward, recording the strip event in the changesets metadata.
        """
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = (f'update "{self.namespace}".changeset '
                   'set metadata = %(metadata)s '
                   'where id = %(id)s')
            cn.execute(sql, id=log['rev'], metadata=json.dumps(metadata))
            # delete changeset_series item
            sql = (f'delete from "{self.namespace}".changeset_series as css '
                   'where css.cset = %(rev)s '
                   'and   css.serie = %(name)s')
            cn.execute(
                sql,
                rev=log['rev'],
                name=self._name_to_regid(cn, seriename)
            )

        # wipe the diffs
        tablename = self._serie_to_tablename(cn, seriename)
        sql = (f'delete from "{self.namespace}.timeserie"."{tablename}" '
               'where cset >= %(csid)s')
        cn.execute(sql, csid=csid)
416
    def info(self, cn):
417
418
        """Gather global statistics on the current tshistory repository
        """
419
        sql = 'select count(*) from "{}".registry'.format(self.namespace)
420
        stats = {'series count': cn.execute(sql).scalar()}
421
        sql = 'select max(id) from "{}".changeset'.format(self.namespace)
422
        stats['changeset count'] = cn.execute(sql).scalar()
423
424
425
        sql = 'select distinct seriename from "{}".registry order by seriename'.format(
            self.namespace
        )
426
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
427
428
        return stats

429
    def log(self, cn, limit=0, names=None, authors=None,
430
            stripped=False,
431
432
            fromrev=None, torev=None,
            fromdate=None, todate=None):
433
434
435
436
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
437

438
439
440
441
442
443
444
445
446
447
448
449
450
        sql = [
            'select distinct cset.id, cset.author, cset.insertion_date, cset.metadata '
            f'from "{self.namespace}".changeset as cset, '
            f'     "{self.namespace}".registry as reg, '
            f'     "{self.namespace}".changeset_series as css '
        ]
        wheres = []
        if stripped:
            sql.append(f'left join "{self.namespace}".changeset as cset2 '
                       'on (cset2.id = css.cset) ')
        else:
            wheres.append('cset.id = css.cset and '
                          'css.serie = reg.id ')
451

452
        if names:
453
454
455
456
            # XXX check names exist
            wheres.append('reg.seriename in (%s)' % ','.join(
                repr(name) for name in names)
            )
457
        if authors:
458
459
460
            wheres.append('cset.author in (%s)' % ','.join(
                repr(auth) for auth in authors)
            )
461
        if fromrev:
462
            wheres.append('cset.id >= %(fromrev)s')
463
        if torev:
464
            wheres.append('cset.id <= %(torev)s')
465
        if fromdate:
466
            wheres.append('cset.insertion_date >= %(fromdate)s')
467
        if todate:
468
469
470
471
472
473
474
475
            wheres.append('cset.insertion_date <= %(todate)s')

        sql.append('where ' + ' and '.join(wheres))
        if limit:
            sql.append('limit %(limit)s ')
        sql.append('order by cset.id desc')

        sql = ''.join(sql)
476

477
478
479
480
481
482
        rset = cn.execute(sql, {
            'fromdate': fromdate,
            'todate': todate,
            'fromrev': fromrev,
            'torev': torev
        })
483
        for csetid, author, revdate, meta in rset.fetchall():
484
485
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
486
                        'meta': meta or {},
487
                        'names': self._changeset_series(cn, csetid)})
488

489
        log.sort(key=lambda rev: rev['rev'])
490
491
        return log

    def interval(self, cn, seriename, notz=False):
        """Return a closed pd.Interval of the current [start, end] value
        dates of a series.

        Raises ValueError for an unknown series.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        # the latest revision row carries the current boundaries
        sql = (f'select tsstart, tsend '
               f'from "{self.namespace}.timeserie"."{tablename}" '
               f'order by cset desc limit 1')
        res = cn.execute(sql).fetchone()
        start, end = res.tsstart, res.tsend
        if self.metadata(cn, seriename).get('tzaware') and not notz:
            # reattach the UTC timezone to the stored (naive) stamps
            start, end = pd.Timestamp(start, tz='UTC'), pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')
    # /API
    # Helpers

    # creation / update

    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        """First insertion of a series: register it, create its snapshot
        and revision table, and record the initial changeset.

        Returns the inserted series, or None when it is all-nan.
        """
        start, end = start_end(newts, notz=False)
        if start is None:
            assert end is None
            # this is just full of nans
            return None
        # chop off unwanted nans
        newts = newts.loc[start:end]
        if len(newts) == 0:
            return None

        # at creation time we take an exclusive lock to avoid
        # a deadlock on created tables against the changeset-series fk
        cn.execute(
            'select pg_advisory_xact_lock({})'.format(self.create_lock_id)
        )
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        tablename = self._make_ts_table(cn, seriename)
        # record the initial revision row with its value boundaries
        sql = (f'insert into "{self.namespace}.timeserie"."{tablename}" '
               '(cset, snapshot, tsstart, tsend) '
               f'values (%s, %s, %s, %s)')
        cn.execute(sql, csid, head,
                   start.to_pydatetime(), end.to_pydatetime())
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts
    def _update(self, cn, tablename, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Subsequent insertion: store only the diff against the current
        state, update the snapshot and the series boundaries.

        Returns the stored diff, or None when there is nothing new.
        """
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        # diff against the currently stored values over the same span
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        # compute series start/end stamps
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename, notz=True)
        start = min(tsstart or ival.left, ival.left)
        end = max(tsend or ival.right, ival.right)

        if pd.isnull(diff[0]) or pd.isnull(diff[-1]):
            # we *might* be shrinking, let's look at the full series
            # and yes, shrinkers have a slow path
            last = snapshot.last()
            patched = self.patch(last, diff).dropna()
            if not len(patched):
                raise ValueError('complete erasure of a series is forbidden')
            if pd.isnull(diff[0]):
                start = patched.index[0]
            if pd.isnull(diff[-1]):
                end = patched.index[-1]

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        sql = (f'insert into "{self.namespace}.timeserie"."{tablename}" '
               '(cset, snapshot, tsstart, tsend) '
               'values (%s, %s, %s, %s)')
        cn.execute(sql, csid, head, start, end)
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff
    # serie table handling

    def _make_tablename(self, cn, seriename):
        """ compute the unqualified (no namespace) table name
        from a serie name, to allow arbitrary serie names
        """
        # postgresql table names are limited to 63 chars.
        if len(seriename) > 63:
            seriename = hashlib.sha1(seriename.encode('utf-8')).hexdigest()

        # collision detection (collision can happen after a rename)
        if cn.execute(f'select table_name '
                      f'from "{self.namespace}".registry '
                      f'where table_name = %(seriename)s',
                      seriename=seriename).scalar():
            # fall back to a random, guaranteed-fresh name
            return str(uuid.uuid4())

        return seriename
604
605
606
607
    def _serie_to_tablename(self, cn, seriename):
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename
608

609
610
611
612
613
        tablename = cn.execute(
            f'select table_name from "{self.namespace}".registry '
            f'where seriename = %(seriename)s',
            seriename=seriename
        ).scalar()
614
615
616
617
618
619
620
621
622
623
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        """Return (sql definition, tablename) for the series revision
        table, computing a fresh table name at creation time.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(cn, seriename)
        # render the create-table statement from the sql template file
        table = sqlfile(
            SERIESSCHEMA,
            namespace=self.namespace,
            tablename=tablename
        )
        return table, tablename
Aurélien Campéas's avatar
Aurélien Campéas committed
632
    def _make_ts_table(self, cn, seriename):
633
634
635
        table, tablename = self._table_definition_for(cn, seriename)
        cn.execute(table)
        return tablename
636
637

    def _register_serie(self, cn, seriename, ts):
        """Create the registry entry for a new series, storing the
        index/value type metadata inferred from `ts`.
        """
        index = ts.index
        # keep only the named index levels
        inames = [name for name in index.names if name]
        sql = (f'insert into {self.namespace}.registry '
               '(seriename, table_name, metadata) '
               'values (%s, %s, %s) '
               'returning id')
        table_name = self._make_tablename(cn, seriename)
        metadata = json.dumps({
            'tzaware': tzaware_serie(ts),
            'index_type': index.dtype.name,
            'index_names': inames,
            'index_dtype': index.dtype.str,
            'value_dtype': ts.dtypes.str,
            'value_type': ts.dtypes.name
        })
        regid = cn.execute(
            sql,
            seriename,
            table_name,
            metadata
        ).scalar()
        # prime the name -> registry id cache
        self.registry_map[seriename] = regid
661
    def _get_ts_table(self, cn, seriename):
662
663
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
Aurélien Campéas's avatar
Aurélien Campéas committed
664
            return self._table_definition_for(cn, seriename)
665

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        """Insert a changeset row and return its id.

        insertion_date, when provided, must be timezone-aware;
        otherwise the current UTC time is used.
        """
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = (f'insert into {self.namespace}.changeset '
               '(author, metadata, insertion_date) '
               'values (%s, %s, %s) '
               'returning id')
        if metadata:
            metadata = json.dumps(metadata)
        return cn.execute(
            sql,
            author,
            metadata,
            idate
        ).scalar()
685
    def _changeset_series(self, cn, csid):
686
687
688
689
690
        sql = ('select seriename '
               f'from "{self.namespace}".registry as reg, '
               f'     "{self.namespace}".changeset_series as css '
               'where css.serie = reg.id '
               'and   css.cset = %(csid)s')
691

692
        return [
693
            row.seriename
694
            for row in cn.execute(sql, csid=csid).fetchall()
695
        ]
696

697
698
699
700
701
702
703
    def _previous_cset(self, cn, seriename, csid):
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select cset from "{}.timeserie"."{}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1').format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()

    # insertion handling

    def _validate(self, cn, ts, seriename):
        """Check that `ts` value/index dtypes match the series metadata."""
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')
719
720
    def _name_to_regid(self, cn, seriename):
        regid = self.registry_map.get(seriename)
721
722
723
        if regid is not None:
            return regid

724
725
726
727
728
729
730
        sql = ('select id '
               f'from "{self.namespace}".registry '
               'where seriename = %(seriename)s')
        regid = self.registry_map[seriename] = cn.execute(
            sql,
            seriename=seriename
        ).scalar()
731
732
        return regid

733
    def _finalize_insertion(self, cn, csid, seriename):
734
735
736
737
        sql = (f'insert into "{self.namespace}".changeset_series '
               '(cset, serie) '
               'values (%s, %s)')
        cn.execute(sql, csid, self._name_to_regid(cn, seriename))
738

739
    def _resetcaches(self):
740
741
742
743
        with self.cachelock:
            self.metadatacache.clear()
            self.registry_map.clear()
            self.serie_tablename.clear()