tsio.py 26.7 KB
Newer Older
1
from datetime import datetime
2
import logging
3
4
import hashlib
import uuid
5
from threading import Lock
6
7
8

import pandas as pd

9
10
from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
11
from sqlalchemy.engine.base import Engine
Aurélien Campéas's avatar
Aurélien Campéas committed
12
from sqlalchemy.sql.expression import select, func, desc
Aurélien Campéas's avatar
Aurélien Campéas committed
13
from sqlalchemy.dialects.postgresql import TIMESTAMP
14

15
from tshistory.schema import tsschema
16
from tshistory.util import (
17
    closed_overlaps,
18
19
    num2float,
    SeriesServices,
20
    start_end,
21
22
    tzaware_serie
)
23
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES
24

25
L = logging.getLogger('tshistory.tsio')
26
TABLES = {}
27
28


29
30
31
32
33
34
35
36
37
38
39
def tx(func):
    """Decorator guarding important api entry points.

    Calling the decorated method with a bare Engine, or with a
    connection that has no open transaction, raises a TypeError --
    unless the object carries a truthy `_testing` attribute.
    """
    from functools import wraps

    @wraps(func)
    def check_tx_and_call(self, cn, *a, **kw):
        # safety belt to make sure important api points are tx-safe
        if isinstance(cn, Engine) or not cn.in_transaction():
            if not getattr(self, '_testing', False):
                raise TypeError('You must use a transaction object')

        return func(self, cn, *a, **kw)
    return check_tx_and_call


40
class TimeSerie(SeriesServices):
41
    namespace = 'tsh'
42
    schema = None
43
    metadatacache = None
44
45
46
47
48
49
50
51
    metakeys = {
        'tzaware',
        'index_type',
        'index_names',
        'index_dtype',
        'value_dtype',
        'value_type'
    }
52
    registry_map = None
53
    serie_tablename = None
54
    create_lock_id = None
55
    cachelock = Lock()
56
57
58

    def __init__(self, namespace='tsh'):
        """Bind the handler to `namespace` and prime its caches."""
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        # per-instance caches
        self.serie_tablename = {}
        self.registry_map = {}
        self.metadatacache = {}
        # advisory-lock key derived from the namespace name
        self.create_lock_id = sum(ord(c) for c in namespace)
65

66
    @tx
    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        """
        # cheap sanity checks first, fail early
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert (_insertion_date is None or
                isinstance(_insertion_date, datetime)), 'Bad format for insertion date'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'
        assert newts.index.notna().all(), 'The index contains NaT entries'

        if not newts.index.is_monotonic_increasing:
            newts = newts.sort_index()

        newts = num2float(newts)

        if not len(newts):
            return

        # only datetime-indexed, non-multiindex series are supported
        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        # no table yet -> first revision, else store a diff
        if table is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
107

108
    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie
        """
        if not self.exists(cn, seriename):
            return

        # restrict the changeset scan when a revision date is given
        csetfilter = []
        if revision_date:
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)

        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(
            csetfilter=csetfilter,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )

        if current is not None and not _keep_nans:
            current.name = seriename
            current = current.dropna()
        return current
132

133
    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        try:
            return self.metadatacache[seriename]
        except KeyError:
            pass

        reg = self.schema.registry
        meta = cn.execute(
            select([reg.c.metadata]).where(
                reg.c.seriename == seriename
            )
        ).scalar()
        self.metadatacache[seriename] = meta
        return meta

145
    @tx
    def update_metadata(self, cn, seriename, metadata, internal=False):
        """Merge `metadata` into the registry metadata of `seriename`.

        The internal keys (see `metakeys`) are protected: they may only
        be written with internal=True.
        """
        assert isinstance(metadata, dict)
        assert internal or not set(metadata.keys()) & self.metakeys
        meta = self.metadata(cn, seriename)
        # remove all but internal stuff
        newmeta = {
            key: meta[key]
            for key in self.metakeys
            if meta.get(key) is not None
        }
        newmeta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=newmeta)
        # invalidate the cache entry; tolerate a missing one
        # (e.g. when the cache was reset between the read and now)
        self.metadatacache.pop(seriename, None)
        cn.execute(sql)

164
165
166
167
168
169
170
    def changeset_metadata(self, cn, csid):
        """Return the metadata stored for changeset `csid` (or None)."""
        # bind csid as a query parameter rather than interpolating it
        # into the sql string (consistent with the other raw-sql helpers)
        sql = 'select metadata from "{ns}".changeset where id = %(csid)s'.format(
            ns=self.namespace
        )
        return cn.execute(sql, csid=csid).scalar()

171
172
173
    def type(self, cn, name):
        """Return the series kind; always 'primary' for this class."""
        return 'primary'

174
    @tx
    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False,
                    _keep_nans=False):
        """Return a {utc insertion date -> serie} mapping covering all
        the revisions matching the given filters.

        In diffmode each serie is the diff against the previous
        revision instead of a full snapshot.
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)
        if from_value_date or to_value_date:
            revsql = revsql.where(
                closed_overlaps(from_value_date, to_value_date)
            )

        revs = cn.execute(
            revsql,
            {'fromdate': from_value_date, 'todate': to_value_date}
        ).fetchall()
        if not revs:
            return {}

        if diffmode:
            # prepend the changeset right before the first match, so the
            # first diff can be computed against it
            previous_csid = self._previous_cset(cn, seriename, revs[0][0])
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            # a value window is carved around each insertion date
            for csid, idate in revs:
                from_date = idate - deltabefore if deltabefore is not None else None
                to_date = idate + deltaafter if deltaafter is not None else None
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset, csid=csid: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date)

        if diffmode:
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs
        else:
            series = [
                (idate, ts if _keep_nans else ts.dropna())
                for idate, ts in series
            ]

        return {
            pd.Timestamp(idate).astimezone('UTC'): serie
            for idate, serie in series
        }
262

263
    @tx
    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date,
            _keep_nans=True
        )
        if histo is None:
            return None

        # for each value date keep the value of the latest insertion
        vimap = {}  # value date -> latest insertion date seen
        vvmap = {}  # value date -> value at that insertion
        for idate, series in histo.items():
            # Series.iteritems() was removed in pandas 2.0: use items()
            for vdate, value in series.items():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        ts = pd.Series(vvmap).sort_index().loc[from_value_date:to_value_date]
        ts.name = seriename
        return ts.dropna()
291

292
293
    def exists(self, cn, seriename):
        """Return True if `seriename` denotes a known series."""
        return self._get_ts_table(cn, seriename) is not None
294

295
    def latest_insertion_date(self, cn, seriename):
        """Return the (utc) insertion date of the most recent revision."""
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        sql = select(
            [func.max(cset.c.insertion_date)]
        ).where(
            tstable.c.cset == cset.c.id
        )
        idate = cn.execute(sql).scalar()
        return pd.Timestamp(idate).astimezone('UTC')
306

307
308
    def insertion_dates(self, cn, seriename,
                        fromdate=None, todate=None):
        """Return the utc insertion dates of all revisions, oldest
        first, optionally restricted to [fromdate, todate].
        """
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        sql = select(
            [cset.c.insertion_date]
        ).where(
            tstable.c.cset == cset.c.id
        ).order_by(
            cset.c.id
        )
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)

        return [
            pd.Timestamp(idate).astimezone('UTC')
            for idate, in cn.execute(sql).fetchall()
        ]

329
330
331
332
    def last_id(self, cn, seriename):
        """Delegate to Snapshot.last_id for this series."""
        snapshot = Snapshot(cn, self, seriename)
        return snapshot.last_id()

333
334
335
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return the changeset id matching `revdate` for the series.

        mode: 'strict' (exact match), 'before' (<= revdate) or
        'after' (>= revdate).
        """
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        # pick the date predicate matching the requested mode
        datefilter = {
            'strict': cset.c.insertion_date == revdate,
            'before': cset.c.insertion_date <= revdate,
            'after': cset.c.insertion_date >= revdate
        }[mode]
        return cn.execute(sql.where(datefilter)).scalar()

348
349
350
351
352
353
354
355
356
357
358
    @tx
    def rename(self, cn, oldname, newname):
        """Rename a series in the registry and drop the stale caches."""
        reg = self.schema.registry
        cn.execute(
            reg.update().where(
                reg.c.seriename == oldname
            ).values(seriename=newname)
        )
        self._resetcaches()

359
    @tx
    def delete(self, cn, seriename):
        """Remove a series: its tables, its changesets and its registry
        entry. A no-op (with a log line) for an unknown series.
        """
        if not self.exists(cn, seriename):
            # use the module logger rather than print for operational
            # messages
            L.info('not deleting unknown series %s %s', seriename, self.namespace)
            return
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        # cn *must* be a transaction scope
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()
        L.info('deleted %s %s', seriename, self.namespace)
395

396
    @tx
    def strip(self, cn, seriename, csid):
        """Remove the revisions of `seriename` from changeset `csid`
        onward. The stripped changesets keep a note in their metadata
        and are detached from the series; the matching diffs are wiped.
        """
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        # loop-invariant: resolve the registry id once
        regid = self._name_to_regid(cn, seriename)
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == regid
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
422

423
    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        stats = {}
        stats['series count'] = cn.execute(
            'select count(*) from "{}".registry'.format(self.namespace)
        ).scalar()
        stats['changeset count'] = cn.execute(
            'select max(id) from "{}".changeset'.format(self.namespace)
        ).scalar()
        stats['serie names'] = [
            name for name, in cn.execute(
                'select distinct seriename from "{}".registry order by seriename'.format(
                    self.namespace
                )
            ).fetchall()
        ]
        return stats

436
    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        entries = [
            {'rev': csetid, 'author': author,
             'date': pd.Timestamp(revdate, tz='utc'),
             'meta': meta or {},
             'names': self._changeset_series(cn, csetid)}
            for csetid, author, revdate, meta in cn.execute(sql).fetchall()
        ]
        # the query is ordered newest-first; return oldest-first
        entries.sort(key=lambda rev: rev['rev'])
        return entries

484
    def interval(self, cn, seriename, notz=False):
        """Return a closed pd.Interval spanning the value start/end of
        the last revision of the series.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        res = cn.execute(
            f'select start, "end" '
            f'from "{self.namespace}.timeserie"."{tablename}" '
            f'order by cset desc limit 1'
        ).fetchone()
        start, end = res.start, res.end
        # relocalize the stored stamps to UTC for tzaware series,
        # unless the caller asked for naive stamps
        if self.metadata(cn, seriename).get('tzaware') and not notz:
            start, end = pd.Timestamp(start, tz='UTC'), pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')

497
498
    # /API
    # Helpers
499

500
501
    # creation / update

502
    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        """First insertion of a series: register it, create its tables
        and store the initial snapshot.
        """
        start, end = start_end(newts, notz=False)
        if start is None:
            assert end is None
            # this is just full of nans
            return None
        # chop off unwanted nans
        newts = newts.loc[start:end]
        if len(newts) == 0:
            return None

        # at creation time we take an exclusive lock to avoid
        # a deadlock on created tables against the changeset-series fk
        cn.execute(
            'select pg_advisory_xact_lock({})'.format(self.create_lock_id)
        )
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        table = self._make_ts_table(cn, seriename)
        cn.execute(
            table.insert().values({
                'cset': csid,
                'snapshot': head,
                'start': start,
                'end': end
            })
        )
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

537
    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Store a new revision of an existing series as a diff against
        its current state.
        """
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        # compute series start/end stamps
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename, notz=True)
        start = min(tsstart or ival.left, ival.left)
        end = max(tsend or ival.right, ival.right)

        if pd.isnull(diff[0]) or pd.isnull(diff[-1]):
            # we *might* be shrinking, let's look at the full series
            # and yes, shrinkers have a slow path
            last = snapshot.last()
            patched = self.patch(last, diff).dropna()
            if not len(patched):
                raise ValueError('complete erasure of a series is forbidden')
            if pd.isnull(diff[0]):
                start = patched.index[0]
            if pd.isnull(diff[-1]):
                end = patched.index[-1]

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        cn.execute(
            table.insert().values({
                'cset': csid,
                'snapshot': head,
                'start': start,
                'end': end
            })
        )
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

582
    # serie table handling
583

584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
    def _make_tablename(self, cn, seriename):
        """ compute the unqualified (no namespace) table name
        from a serie name, to allow arbitrary serie names
        """
        # postgresql table names are limited to 63 chars.
        if len(seriename) > 63:
            seriename = hashlib.sha1(seriename.encode('utf-8')).hexdigest()

        # collision detection (collision can happen after a rename)
        # fall back to a random name to sidestep the clash
        if cn.execute(f'select table_name '
                      f'from "{self.namespace}".registry '
                      f'where table_name = %(seriename)s',
                      seriename=seriename).scalar():
            return str(uuid.uuid4())

        return seriename

601
602
603
604
    def _serie_to_tablename(self, cn, seriename):
        """Map a series name to its table name (cached); None when the
        series is not yet registered.
        """
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename

        reg = self.schema.registry
        tablename = cn.execute(
            select([reg.c.table_name]).where(reg.c.seriename == seriename)
        ).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        """Return (building and caching it in TABLES if needed) the
        sqlalchemy Table of the series revisions.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(cn, seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                Column('start', TIMESTAMP, nullable=False),
                Column('end', TIMESTAMP, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                Index(NONE_NAME, 'start'),
                Index(NONE_NAME, 'end'),
                schema='{}.timeserie'.format(self.namespace),
                keep_existing=True
            )
        return table
643

Aurélien Campéas's avatar
Aurélien Campéas committed
644
    def _make_ts_table(self, cn, seriename):
        """Create the revisions table of a new series and return it."""
        table = self._table_definition_for(cn, seriename)
        table.create(cn)
        return table

    def _register_serie(self, cn, seriename, ts):
        """Insert the series into the registry (with its index/value
        metadata) and cache its registry id.
        """
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(cn, seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'index_dtype': index.dtype.str,
                'value_dtype': ts.dtypes.str,
                'value_type': ts.dtypes.name
            }
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid
666

667
    def _get_ts_table(self, cn, seriename):
        """Return the series revisions table, or None for an unknown
        series.
        """
        if self._serie_to_tablename(cn, seriename):
            return self._table_definition_for(cn, seriename)
671

672
673
    # changeset handling

674
    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        """Insert a new changeset row and return its id.

        `insertion_date`, when provided, must be timezone-aware.
        """
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
            # pd.Timestamp(aware_date, tz='UTC') raises on recent pandas
            # ("Cannot pass a datetime or Timestamp with tzinfo with the
            # tz parameter"): convert explicitly instead
            idate = pd.Timestamp(insertion_date).tz_convert('UTC')
        else:
            idate = pd.Timestamp(datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]
684

685
    def _changeset_series(self, cn, csid):
        """List the names of the series associated with changeset
        `csid`.
        """
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        sql = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)
        return [row.seriename for row in cn.execute(sql).fetchall()]
697

698
699
700
701
702
703
704
    def _previous_cset(self, cn, seriename, csid):
        """Return the changeset id right before `csid` for the series
        (None when csid is the first one).
        """
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select cset from "{}.timeserie"."{}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1').format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()

705
706
    # insertion handling

707
    def _validate(self, cn, ts, seriename):
        """Check that the dtypes of `ts` agree with the registered
        series metadata.
        """
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')
719

720
721
    def _name_to_regid(self, cn, seriename):
        """Return (with caching) the registry id of the series."""
        regid = self.registry_map.get(seriename)
        if regid is not None:
            return regid

        registry = self.schema.registry
        sql = select([registry.c.id]).where(registry.c.seriename == seriename)
        regid = self.registry_map[seriename] = cn.execute(sql).scalar()
        return regid

730
    def _finalize_insertion(self, cn, csid, seriename):
        """Associate changeset `csid` with the series in
        changeset_series.
        """
        table = self.schema.changeset_series
        cn.execute(
            table.insert().values(
                cset=csid,
                serie=self._name_to_regid(cn, seriename)
            )
        )
737

738
    def _resetcaches(self):
        """Drop every cache -- the module-level table maps and the
        per-instance maps -- under the cache lock.
        """
        with self.cachelock:
            for cache in (TABLES, SNAPTABLES,
                          self.metadatacache,
                          self.registry_map,
                          self.serie_tablename):
                cache.clear()