tsio.py 26.8 KB
Newer Older
1
from datetime import datetime
2
import logging
3
4
import hashlib
import uuid
5
from threading import Lock
6
7
8

import pandas as pd

9
10
from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
11
from sqlalchemy.engine.base import Engine
Aurélien Campéas's avatar
Aurélien Campéas committed
12
from sqlalchemy.sql.expression import select, func, desc
Aurélien Campéas's avatar
Aurélien Campéas committed
13
from sqlalchemy.dialects.postgresql import TIMESTAMP
14

15
from tshistory.schema import tsschema
16
from tshistory.util import (
17
    closed_overlaps,
18
19
    num2float,
    SeriesServices,
20
    start_end,
21
22
    tzaware_serie
)
23
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES
24

25
L = logging.getLogger('tshistory.tsio')
26
TABLES = {}
27
28


29
30
31
32
33
34
35
36
37
38
39
def tx(func):
    """Decorator enforcing that write-oriented API entry points are
    called with a transaction object rather than a bare engine or an
    autocommit connection.

    The check is bypassed when the instance carries a true `_testing`
    attribute, to keep ad-hoc test code lightweight.
    """
    # local import: keeps this fix self-contained wrt the file header
    from functools import wraps

    @wraps(func)
    def check_tx_and_call(self, cn, *a, **kw):
        # safety belt to make sure important api points are tx-safe
        if isinstance(cn, Engine) or not cn.in_transaction():
            if not getattr(self, '_testing', False):
                raise TypeError('You must use a transaction object')

        return func(self, cn, *a, **kw)

    return check_tx_and_call


40
class TimeSerie(SeriesServices):
    """Versioned time series store backed by postgres.

    Each series gets a registry row, a diff table (under
    "<namespace>.timeserie") and snapshot data; every write creates a
    changeset, allowing point-in-time reads (`get`, `get_history`).
    """
    # postgres schema namespace all the tables live under
    namespace = 'tsh'
    # tshistory.schema.tsschema object (built in __init__)
    schema = None
    # per-instance cache: seriename -> metadata dict
    metadatacache = None
    # metadata keys managed internally; update_metadata refuses to
    # touch them unless called with internal=True
    metakeys = {
        'tzaware',
        'index_type',
        'index_names',
        'index_dtype',
        'value_dtype',
        'value_type'
    }
    # per-instance cache: seriename -> registry id
    registry_map = None
    # per-instance cache: seriename -> unqualified table name
    serie_tablename = None
    # advisory-lock id used at series creation time (see _create)
    create_lock_id = None
    # guards cache clearing in _resetcaches
    cachelock = Lock()
56
57
58

    def __init__(self, namespace='tsh'):
        """Bind the store to `namespace` and set up per-instance caches."""
        self.namespace = namespace
        # build and declare the sql schema objects for this namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        # fresh caches: shared class-level defaults are None on purpose
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
        # cheap deterministic advisory-lock id derived from the namespace
        self.create_lock_id = sum(map(ord, namespace))
65

66
    @tx
    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata

        Returns the inserted series (first insertion), the diff that
        was stored (subsequent insertions), or None when there was
        nothing to insert.
        """
        # -- input validation
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert (_insertion_date is None or
                isinstance(_insertion_date, datetime)), 'Bad format for insertion date'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'
        assert newts.index.notna().all(), 'The index contains NaT entries'
        # normalize: sorted index, float values
        if not newts.index.is_monotonic_increasing:
            newts = newts.sort_index()
        newts = num2float(newts)
        if not len(newts):
            return
        # index must be a (non-multi) datetime index
        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))
        newts.name = seriename
        table = self._get_ts_table(cn, seriename)
        # dispatch: first insertion creates the tables, later ones diff
        if table is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)
        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
107

108
    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        """
        if not self.exists(cn, seriename):
            return

        filters = []
        if revision_date:
            filters.append(lambda cset: cset.c.insertion_date <= revision_date)

        snapshot = Snapshot(cn, self, seriename)
        _, current = snapshot.find(
            csetfilter=filters,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )

        if current is None or _keep_nans:
            return current
        # drop the nan markers (erased points) before handing back
        current.name = seriename
        return current[~current.isnull()]
132

133
    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        cached = self.metadatacache.get(seriename)
        if cached is not None:
            return cached

        reg = self.schema.registry
        meta = cn.execute(
            select([reg.c.metadata]).where(reg.c.seriename == seriename)
        ).scalar()
        if meta is not None:
            # only cache actual hits: unknown series stay uncached
            self.metadatacache[seriename] = meta
        return meta

146
    @tx
    def update_metadata(self, cn, seriename, metadata, internal=False):
        """Merge `metadata` into the stored metadata of `seriename`.

        Internal keys (see `metakeys`) are always carried over and may
        only be overridden when `internal` is true.
        """
        assert isinstance(metadata, dict)
        assert internal or not set(metadata.keys()) & self.metakeys
        meta = self.metadata(cn, seriename)
        # remove all but internal stuff
        newmeta = {
            key: meta[key]
            for key in self.metakeys
            if meta.get(key) is not None
        }
        newmeta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=newmeta)
        # invalidate the cache entry before writing; use a default so a
        # concurrently-cleared cache cannot make this raise KeyError
        self.metadatacache.pop(seriename, None)
        cn.execute(sql)

165
166
167
168
169
170
171
    def changeset_metadata(self, cn, csid):
        """Return the metadata dict of changeset `csid` (or None)."""
        # bind csid as a query parameter instead of interpolating it
        # into the sql text (as _previous_cset already does); the
        # namespace is a trusted identifier and stays formatted in
        sql = 'select metadata from "{ns}".changeset where id = %(csid)s'.format(
            ns=self.namespace
        )
        return cn.execute(sql, csid=csid).scalar()

172
173
174
    def type(self, cn, name):
        # this base store only knows "primary" series; presumably
        # overridden by subclasses providing other kinds — TODO confirm
        return 'primary'

175
    @tx
    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False,
                    _keep_nans=False):
        """Map insertion dates (UTC Timestamps) to series revisions.

        Revisions can be filtered by insertion-date and value-date
        bounds. With `deltabefore`/`deltaafter`, each revision is
        windowed around its own insertion date. With `diffmode`, each
        entry holds the diff wrt the previous revision instead of the
        full series. Returns None for an unknown series, {} when no
        revision matches.
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        cset = self.schema.changeset
        # all changesets of the series, oldest first
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        # keep only revisions whose [start, end] overlaps the wanted
        # value-date range (bound below as fromdate/todate)
        if from_value_date or to_value_date:
            revsql = revsql.where(
                closed_overlaps(from_value_date, to_value_date)
            )

        revs = cn.execute(
            revsql,
            {'fromdate': from_value_date, 'todate': to_value_date}
        ).fetchall()
        if not revs:
            return {}

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            # windowed mode: one snapshot lookup per revision
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                # NOTE: the lambda closes over the loop's csid but is
                # consumed immediately by .find, so late binding is safe
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            # bulk mode: fetch all revisions in one pass
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            # turn consecutive revisions into (date, diff) pairs
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs
        else:
            series = [
                (idate, ts if _keep_nans else ts.dropna() )
                 for idate, ts in series
            ]

        return {
            pd.Timestamp(idate).astimezone('UTC'): serie
            for idate, serie in series
        }
263

264
    @tx
    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date,
            _keep_nans=True
        )
        if histo is None:
            return None

        # for each value date, keep the value carried by the most
        # recent insertion date
        vimap = {}
        vvmap = {}
        for idate, series in histo.items():
            # .items(): Series.iteritems was deprecated in pandas 1.5
            # and removed in 2.0; .items() works on all versions
            for vdate, value in series.items():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        ts = pd.Series(vvmap).sort_index().loc[from_value_date:to_value_date]
        ts.name = seriename
        return ts.dropna()
292

293
294
    def exists(self, cn, seriename):
        # a series exists iff its diff table is known to the registry
        return self._get_ts_table(cn, seriename) is not None
295

296
    def latest_insertion_date(self, cn, seriename):
        """Return the most recent insertion date of `seriename` as a
        UTC timestamp."""
        tstable = self._get_ts_table(cn, seriename)
        cset = self.schema.changeset
        sql = select([func.max(cset.c.insertion_date)]).where(
            tstable.c.cset == cset.c.id
        )
        maxdate = cn.execute(sql).scalar()
        return pd.Timestamp(maxdate).astimezone('UTC')
307

308
309
    def insertion_dates(self, cn, seriename,
                        fromdate=None, todate=None):
        """Return the (optionally bounded) insertion dates of the
        series, oldest first, as UTC timestamps."""
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        query = select([cset.c.insertion_date]).where(
            tstable.c.cset == cset.c.id
        ).order_by(
            cset.c.id
        )

        if fromdate:
            query = query.where(cset.c.insertion_date >= fromdate)
        if todate:
            query = query.where(cset.c.insertion_date <= todate)

        rows = cn.execute(query).fetchall()
        return [pd.Timestamp(idate).astimezone('UTC') for idate, in rows]

330
331
332
333
    def last_id(self, cn, seriename):
        """Return the last id of the series snapshot."""
        return Snapshot(cn, self, seriename).last_id()

334
335
336
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return the changeset id of the series matching `revdate`
        under `mode` ('strict', 'before' or 'after')."""
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        idate = cset.c.insertion_date
        # translate the mode into a date predicate
        if mode == 'strict':
            datefilter = idate == revdate
        elif mode == 'before':
            datefilter = idate <= revdate
        else:
            datefilter = idate >= revdate
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        ).where(datefilter)
        return cn.execute(sql).scalar()

349
350
351
352
353
354
355
356
357
358
359
    @tx
    def rename(self, cn, oldname, newname):
        """Rename a series in the registry; all caches are flushed."""
        reg = self.schema.registry
        cn.execute(
            reg.update().where(
                reg.c.seriename == oldname
            ).values(seriename=newname)
        )
        self._resetcaches()

360
    @tx
    def delete(self, cn, seriename):
        """Completely delete a series: its diff & snapshot tables, its
        changesets and its registry entry.

        cn *must* be a transaction scope.
        """
        if not self.exists(cn, seriename):
            # log instead of printing to stdout: this is library code
            L.info('not deleting unknown series %s %s',
                   seriename, self.namespace)
            return
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()
        L.info('deleted %s %s', seriename, self.namespace)
396

397
    @tx
    def strip(self, cn, seriename, csid):
        """Remove from the series every revision from `csid` onwards.

        The stripped changesets are kept but marked in their metadata;
        their link to the series and the corresponding diff rows are
        deleted.
        """
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changset_serie item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
423

424
    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        ns = self.namespace
        seriescount = cn.execute(
            'select count(*) from "{}".registry'.format(ns)
        ).scalar()
        csetcount = cn.execute(
            'select max(id) from "{}".changeset'.format(ns)
        ).scalar()
        names = [
            name for name, in cn.execute(
                'select distinct seriename from "{}".registry order by seriename'.format(
                    ns
                )
            ).fetchall()
        ]
        return {
            'series count': seriescount,
            'changeset count': csetcount,
            'serie names': names
        }

437
    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        Each entry is a dict with 'rev', 'author', 'date', 'meta' and
        'names' (the series touched by the changeset). Filters on
        series names, authors, revision ids and insertion dates can be
        combined; `limit` caps the number of (most recent) changesets
        considered; `stripped` also shows changesets no longer linked
        to any series.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        # newest first so that `limit` keeps the most recent ones
        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        # chronological order for the caller
        log.sort(key=lambda rev: rev['rev'])
        return log

485
    def interval(self, cn, seriename, notz=False):
        """Return the closed [start, end] value-date interval of the
        latest revision of `seriename`.

        Raises ValueError on an unknown series.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        res = cn.execute(
            f'select start, "end" '
            f'from "{self.namespace}.timeserie"."{tablename}" '
            f'order by cset desc limit 1'
        ).fetchone()
        start = res.start
        end = res.end
        if self.metadata(cn, seriename).get('tzaware') and not notz:
            # re-attach the UTC timezone lost in storage
            start = pd.Timestamp(start, tz='UTC')
            end = pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')

498
499
    # /API
    # Helpers
500

501
502
    # creation / update

503
    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        """First insertion of a series: register it, create its tables
        and store the initial snapshot. Returns the stored series, or
        None when there is nothing worth storing.
        """
        start, end = start_end(newts, notz=False)
        if start is None:
            assert end is None
            # this is just full of nans
            return None
        # chop off unwanted nans
        newts = newts.loc[start:end]
        if len(newts) == 0:
            return None

        # at creation time we take an exclusive lock to avoid
        # a deadlock on created tables against the changeset-series fk
        cn.execute(
            'select pg_advisory_xact_lock({})'.format(self.create_lock_id)
        )
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        # first row of the diff table, pointing at the snapshot head
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        table = self._make_ts_table(cn, seriename)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

538
    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Subsequent insertion of a series: store only the diff wrt
        the current state. Returns the diff, or None when newts brings
        nothing new.
        """
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        # diff against the stored state restricted to newts' span
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        # compute series start/end stamps
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename, notz=True)
        start = min(tsstart or ival.left, ival.left)
        end = max(tsend or ival.right, ival.right)

        if pd.isnull(diff[0]) or pd.isnull(diff[-1]):
            # we *might* be shrinking, let's look at the full series
            # and yes, shrinkers have a slow path
            last = snapshot.last()
            patched = self.patch(last, diff).dropna()
            if not len(patched):
                raise ValueError('complete erasure of a series is forbidden')
            if pd.isnull(diff[0]):
                start = patched.index[0]
            if pd.isnull(diff[-1]):
                end = patched.index[-1]

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        # new row of the diff table, pointing at the new snapshot head
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

583
    # serie table handling
584

585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
    def _make_tablename(self, cn, seriename):
        """ compute the unqualified (no namespace) table name
        from a serie name, to allow arbitrary serie names
        """
        tablename = seriename
        # postgresql identifiers are capped at 63 chars: fall back to a
        # fixed-size digest for longer names
        if len(tablename) > 63:
            tablename = hashlib.sha1(tablename.encode('utf-8')).hexdigest()

        # a rename can leave the wanted name taken by another series
        # (collision): in that case pick a random unique name
        taken = cn.execute(f'select table_name '
                           f'from "{self.namespace}".registry '
                           f'where table_name = %(seriename)s',
                           seriename=tablename).scalar()
        if taken:
            return str(uuid.uuid4())

        return tablename

602
603
604
605
    def _serie_to_tablename(self, cn, seriename):
        """Map a series name to its unqualified table name, caching
        positive hits per instance."""
        cached = self.serie_tablename.get(seriename)
        if cached is not None:
            return cached

        reg = self.schema.registry
        found = cn.execute(
            select([reg.c.table_name]).where(reg.c.seriename == seriename)
        ).scalar()
        if found is None:
            # the series does not exist yet (creation time)
            return None
        self.serie_tablename[seriename] = found
        return found

    def _table_definition_for(self, cn, seriename):
        """Return the sqlalchemy Table of the series diff table,
        building (and caching in the module-level TABLES dict) its
        definition on first use.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(cn, seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            # one row per changeset: value-date span + snapshot head
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                Column('start', TIMESTAMP, nullable=False),
                Column('end', TIMESTAMP, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                Index(NONE_NAME, 'start'),
                Index(NONE_NAME, 'end'),
                schema='{}.timeserie'.format(self.namespace),
                keep_existing=True
            )
        return table
644

Aurélien Campéas's avatar
Aurélien Campéas committed
645
    def _make_ts_table(self, cn, seriename):
        """Create the postgres diff table of the series and return its
        sqlalchemy Table."""
        tstable = self._table_definition_for(cn, seriename)
        tstable.create(cn)
        return tstable

    def _register_serie(self, cn, seriename, ts):
        """Insert the registry row of a brand new series and record its
        id in the registry_map cache."""
        index = ts.index
        # internal metadata, derived from the initial series
        meta = {
            'tzaware': tzaware_serie(ts),
            'index_type': index.dtype.name,
            'index_names': [name for name in index.names if name],
            'index_dtype': index.dtype.str,
            'value_dtype': ts.dtypes.str,
            'value_type': ts.dtypes.name
        }
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(cn, seriename),
            metadata=meta
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid
667

668
    def _get_ts_table(self, cn, seriename):
        """Return the diff table of `seriename`, or None when the
        series is unknown."""
        if self._serie_to_tablename(cn, seriename):
            return self._table_definition_for(cn, seriename)
        return None
672

673
674
    # changeset handling

675
    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        """Insert a changeset row and return its id."""
        if insertion_date is not None:
            # forced insertion dates must be timezone-aware
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        cset = self.schema.changeset
        return cn.execute(
            cset.insert().values(
                author=author,
                metadata=metadata,
                insertion_date=idate
            )
        ).inserted_primary_key[0]
685

686
    def _changeset_series(self, cn, csid):
        """List the names of the series touched by changeset `csid`."""
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        sql = select([reg.c.seriename]).where(
            cset_serie.c.cset == csid
        ).where(
            cset_serie.c.serie == reg.c.id
        )
        return [row.seriename for row in cn.execute(sql).fetchall()]
698

699
700
701
702
703
704
705
    def _previous_cset(self, cn, seriename, csid):
        """Return the changeset id right before `csid` for the series,
        or None when `csid` is the first one."""
        tablename = self._serie_to_tablename(cn, seriename)
        sql = (f'select cset from "{self.namespace}.timeserie"."{tablename}" '
               f'where cset < %(csid)s '
               f'order by cset desc limit 1')
        return cn.execute(sql, csid=csid).scalar()

706
707
    # insertion handling

708
    def _validate(self, cn, ts, seriename):
        """Check that the incoming series dtypes are compatible with
        the stored metadata of `seriename`."""
        if ts.isnull().all():
            # an all-nan series means erasure: nothing to validate
            return
        meta = self.metadata(cn, seriename)
        tstype = ts.dtype
        basetype = meta['value_type']
        if tstype != basetype:
            raise Exception(
                'Type error when inserting {}, new type is {}, type in base is {}'.format(
                    seriename, tstype, basetype
                )
            )
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')
720

721
722
    def _name_to_regid(self, cn, seriename):
        """Return the registry id of `seriename`, caching hits per
        instance."""
        cached = self.registry_map.get(seriename)
        if cached is not None:
            return cached

        reg = self.schema.registry
        regid = cn.execute(
            select([reg.c.id]).where(reg.c.seriename == seriename)
        ).scalar()
        self.registry_map[seriename] = regid
        return regid

731
    def _finalize_insertion(self, cn, csid, seriename):
        """Link changeset `csid` to the series in changeset_series."""
        cn.execute(
            self.schema.changeset_series.insert().values(
                cset=csid,
                serie=self._name_to_regid(cn, seriename)
            )
        )
738

739
    def _resetcaches(self):
        """Flush the module-level table caches and every per-instance
        cache, under the class-wide lock."""
        with self.cachelock:
            for cache in (TABLES,
                          SNAPTABLES,
                          self.metadatacache,
                          self.registry_map,
                          self.serie_tablename):
                cache.clear()