tsio.py 26.1 KB
Newer Older
1
from datetime import datetime
2
import logging
3
4
import hashlib
import uuid
5
from threading import Lock
6
7
8

import pandas as pd

9
10
from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
11
from sqlalchemy.engine.base import Engine
Aurélien Campéas's avatar
Aurélien Campéas committed
12
from sqlalchemy.sql.expression import select, func, desc
Aurélien Campéas's avatar
Aurélien Campéas committed
13
from sqlalchemy.dialects.postgresql import TIMESTAMP
14

15
from tshistory.schema import tsschema
16
from tshistory.util import (
17
    closed_overlaps,
18
19
    num2float,
    SeriesServices,
20
    start_end,
21
22
    tzaware_serie
)
23
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES
24

25
L = logging.getLogger('tshistory.tsio')
26
TABLES = {}
27
28


29
class TimeSerie(SeriesServices):
    """Versioned time series store backed by postgres.

    Revisions are recorded as changesets; reads go through the
    companion `Snapshot` objects.  Instances keep per-namespace caches
    (metadata, registry ids, table names) -- see `_resetcaches`.
    """
    namespace = 'tsh'
    # class-level placeholders; real per-instance values are set in __init__
    schema = None
    metadatacache = None
    # metadata keys managed internally -- update_metadata refuses to touch them
    metakeys = {
        'tzaware',
        'index_type',
        'index_names',
        'index_dtype',
        'value_dtype',
        'value_type'
    }
    registry_map = None
    serie_tablename = None
    create_lock_id = None
    # guards all cache mutations done in _resetcaches
    cachelock = Lock()

    def __init__(self, namespace='tsh'):
        """Bind the store to a given namespace (postgres schema prefix)."""
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        # per-instance caches, shadowing the class-level placeholders
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
        # deterministic per-namespace key for pg_advisory_xact_lock (see _create)
        self.create_lock_id = sum(ord(c) for c in namespace)
    def _check_tx(self, cn):
        """Safety belt: important api points must run within a transaction."""
        intx = not isinstance(cn, Engine) and cn.in_transaction()
        if not intx and not getattr(self, '_testing', False):
            raise TypeError('You must use a transaction object')
    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata

        Returns the actually-stored series (full series at creation,
        diff on update) or None when there is nothing to store.
        """
        self._check_tx(cn)
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert (_insertion_date is None or
                isinstance(_insertion_date, datetime)), 'Bad format for insertion date'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'
        assert newts.index.notna().all(), 'The index contains NaT entries'

        # storage relies on a sorted index
        if not newts.index.is_monotonic_increasing:
            newts = newts.sort_index()

        # normalize integral dtypes to float (so nans can be represented)
        newts = num2float(newts)

        if not len(newts):
            return

        # only datetime-like, non-multi indexes are supported
        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        # dispatch: first-ever insertion vs new revision of a known serie
        if table is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        Returns None when the serie does not exist.  Nan values (which
        encode erasures) are stripped unless `_keep_nans` is set.
        """
        if not self.exists(cn, seriename):
            return

        csetfilter = []
        if revision_date:
            # restrict to changesets known at `revision_date`
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            current = current[~current.isnull()]
        return current
    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie.

        Results are memoized in `self.metadatacache`.  Unknown series
        yield None, which is deliberately *not* cached: the serie may be
        created later and a cached None would then go stale.
        """
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.seriename == seriename
        )
        meta = cn.execute(sql).scalar()
        if meta is not None:
            self.metadatacache[seriename] = meta
        return meta
    def update_metadata(self, cn, seriename, metadata):
        """Replace the user-facing part of a serie's metadata.

        The internal keys (`self.metakeys`) are preserved as-is and may
        not appear in `metadata`.
        """
        self._check_tx(cn)
        assert isinstance(metadata, dict)
        assert not set(metadata.keys()) & self.metakeys
        meta = self.metadata(cn, seriename)
        # remove all but internal stuff
        newmeta = {key: meta[key] for key in self.metakeys}
        newmeta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=newmeta)
        # invalidate the cache entry; use the defaulted .pop so a cold
        # cache does not raise KeyError
        self.metadatacache.pop(seriename, None)
        cn.execute(sql)
    def changeset_metadata(self, cn, csid):
        """Fetch the metadata dict attached to changeset `csid` (or None)."""
        q = 'select metadata from "{ns}".changeset where id = {id}'
        return cn.execute(
            q.format(ns=self.namespace, id=csid)
        ).scalar()
    def type(self, cn, name):
        """Series kind: everything stored by this class is 'primary'."""
        return 'primary'
    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        """Return a {insertion date (UTC) -> serie} dict covering the
        requested slice of the serie's history.

        deltabefore/deltaafter: restrict each version's value dates to a
        window around its own insertion date.
        diffmode: return per-revision diffs instead of full versions.
        Returns None for an unknown serie, {} when no revision matches.
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        cset = self.schema.changeset
        # all changesets of this serie, in chronological (id) order
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        if from_value_date or to_value_date:
            # keep only revisions whose [start, end] overlaps the value window;
            # the bound parameters are consumed by closed_overlaps' clause
            revsql = revsql.where(
                closed_overlaps(from_value_date, to_value_date)
            )

        revs = cn.execute(
            revsql,
            {'fromdate': from_value_date, 'todate': to_value_date}
        ).fetchall()
        if not revs:
            return {}

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            # per-revision value window around each insertion date
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            # turn consecutive versions into (date, diff) pairs
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs
        else:
            # full versions: drop the nan (erasure) markers
            series = [
                (idate, ts.dropna())
                 for idate, ts in series
            ]

        return {
            pd.Timestamp(idate).astimezone('UTC'): serie
            for idate, serie in series
        }
    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones

        Returns None for an unknown serie.
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )
        if histo is None:
            return None

        # for each value date, keep the value seen at the most recent
        # insertion date
        vimap = {}  # value date -> latest insertion date seen so far
        vvmap = {}  # value date -> value at that insertion date
        for idate, series in histo.items():
            # Series.items() replaces .iteritems(), which was removed
            # in pandas 2.0; behavior is identical
            for vdate, value in series.items():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        ts = pd.Series(vvmap).sort_index().loc[from_value_date:to_value_date]
        ts.name = seriename
        return ts
    def exists(self, cn, seriename):
        """Tell if a serie of that name is known to the registry."""
        table = self._get_ts_table(cn, seriename)
        return table is not None
    def latest_insertion_date(self, cn, seriename):
        """Return the most recent insertion date of a serie, as a UTC
        pandas Timestamp.
        """
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        sql = select(
            [func.max(cset.c.insertion_date)]
        ).where(
            tstable.c.cset == cset.c.id
        )
        return pd.Timestamp(
            cn.execute(sql).scalar()
        ).astimezone('UTC')
    def insertion_dates(self, cn, seriename):
        """Return every insertion date of a serie, as UTC timestamps,
        in changeset (chronological) order.
        """
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        q = select(
            [cset.c.insertion_date]
        ).where(
            tstable.c.cset == cset.c.id
        ).order_by(
            cset.c.id
        )
        dates = []
        for row in cn.execute(q).fetchall():
            dates.append(pd.Timestamp(row[0]).astimezone('UTC'))
        return dates
    def last_id(self, cn, seriename):
        """Delegate to the serie's snapshot to fetch its last id."""
        return Snapshot(cn, self, seriename).last_id()
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return a changeset id of the serie whose insertion date matches
        `revdate` exactly ('strict'), or at/before it ('before'), or
        at/after it ('after').
        """
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        base = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            q = base.where(cset.c.insertion_date == revdate)
        elif mode == 'before':
            q = base.where(cset.c.insertion_date <= revdate)
        else:
            q = base.where(cset.c.insertion_date >= revdate)
        return cn.execute(q).scalar()
    def delete(self, cn, seriename):
        """Completely remove a serie: its tables, its registry entry and
        the changesets that only concern it.  No-op on unknown series.
        """
        self._check_tx(cn)
        assert not isinstance(cn, Engine), 'use a transaction object'
        if not self.exists(cn, seriename):
            print('not deleting unknown series', seriename, self.namespace)
            return
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        # cn *must* be a transaction scope
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()
        print('deleted', seriename, self.namespace)
    def strip(self, cn, seriename, csid):
        """Remove all revisions of a serie from changeset `csid` onwards.

        The stripped changesets are kept but tagged in their metadata and
        unlinked from the serie; the corresponding diff rows are wiped.
        """
        self._check_tx(cn)
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        nseries = cn.execute(
            'select count(*) from "{}".registry'.format(self.namespace)
        ).scalar()
        lastcset = cn.execute(
            'select max(id) from "{}".changeset'.format(self.namespace)
        ).scalar()
        names = cn.execute(
            'select distinct seriename from "{}".registry order by seriename'.format(
                self.namespace
            )
        ).fetchall()
        return {
            'series count': nseries,
            'changeset count': lastcset,
            'serie names': [name for name, in names]
        }
    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        Each entry is a dict with rev/author/date/meta/names keys.  The
        various keyword arguments are cumulative filters.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        # newest first here; re-sorted chronologically at the end
        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log
    def interval(self, cn, seriename, notz=False):
        """Return the [start, end] value-date interval of a serie, taken
        from its latest revision.  Raises ValueError on unknown series.

        notz: leave the bounds naive even for tzaware series.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        sql = (f'select start, "end" '
               f'from "{self.namespace}.timeserie"."{tablename}" '
               f'order by cset desc limit 1')
        res = cn.execute(sql).fetchone()
        start, end = res.start, res.end
        if self.metadata(cn, seriename).get('tzaware') and not notz:
            start, end = pd.Timestamp(start, tz='UTC'), pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')
    # /API
    # Helpers
469

470
471
    # creation / update

472
    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        """First-ever insertion of a serie: register it, snapshot the
        data and record the initial changeset.  Returns the stored serie
        or None when it only contains nans.
        """
        start, end = start_end(newts, notz=False)
        if start is None:
            assert end is None
            # this is just full of nans
            return None
        # chop off unwanted nans
        newts = newts.loc[start:end]
        if len(newts) == 0:
            return None

        # at creation time we take an exclusive lock to avoid
        # a deadlock on created tables against the changeset-series fk
        cn.execute(
            'select pg_advisory_xact_lock({})'.format(self.create_lock_id)
        )
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        table = self._make_ts_table(cn, seriename)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts
    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Record a new revision of an existing serie as a diff against
        its current state.  Returns the diff, or None when there is
        nothing new.
        """
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        # diff against the stored state over the incoming value range
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        # compute series start/end stamps
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename, notz=True)
        start = min(tsstart or ival.left, ival.left)
        end = max(tsend or ival.right, ival.right)

        if pd.isnull(diff[0]) or pd.isnull(diff[-1]):
            # we *might* be shrinking, let's look at the full series
            # and yes, shrinkers have a slow path
            last = snapshot.last()
            patched = self.patch(last, diff).dropna()
            if not len(patched):
                raise ValueError('complete erasure of a series is forbidden')
            if pd.isnull(diff[0]):
                start = patched.index[0]
            if pd.isnull(diff[-1]):
                end = patched.index[-1]

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff
    # serie table handling
553

554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
    def _make_tablename(self, cn, seriename):
        """ compute the unqualified (no namespace) table name
        from a serie name, to allow arbitrary serie names
        """
        name = seriename
        # postgresql table names are limited to 63 chars.
        if len(name) > 63:
            name = hashlib.sha1(name.encode('utf-8')).hexdigest()

        # collision detection (collision can happen after a rename)
        taken = cn.execute(f'select table_name '
                           f'from "{self.namespace}".registry '
                           f'where table_name = %(seriename)s',
                           seriename=name).scalar()
        if taken:
            return str(uuid.uuid4())

        return name
    def _serie_to_tablename(self, cn, seriename):
        """Map a serie name to its storage table name (memoized).

        Returns None when the serie is not registered yet.
        """
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename

        reg = self.schema.registry
        sql = select([reg.c.table_name]).where(reg.c.seriename == seriename)
        tablename = cn.execute(sql).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename
    def _table_definition_for(self, cn, seriename):
        """Build (or fetch from the module-level TABLES cache) the
        sqlalchemy Table object of a serie's revision table.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(cn, seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            # one row per changeset: value-date bounds + snapshot head
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                Column('start', TIMESTAMP, nullable=False),
                Column('end', TIMESTAMP, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                Index(NONE_NAME, 'start'),
                Index(NONE_NAME, 'end'),
                schema='{}.timeserie'.format(self.namespace),
                keep_existing=True
            )
        return table
    def _make_ts_table(self, cn, seriename):
        """Create the revision table of a serie in the db and return it."""
        tdef = self._table_definition_for(cn, seriename)
        tdef.create(cn)
        return tdef
    def _register_serie(self, cn, seriename, ts):
        """Insert the serie into the registry, with its internal
        metadata derived from the incoming series, and memoize its
        registry id.
        """
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(cn, seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'index_dtype': index.dtype.str,
                'value_dtype': ts.dtypes.str,
                'value_type': ts.dtypes.name
            }
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid
    def _get_ts_table(self, cn, seriename):
        """Return the table definition of a registered serie, or None
        when the serie does not exist yet.
        """
        if self._serie_to_tablename(cn, seriename):
            return self._table_definition_for(cn, seriename)
    # changeset handling

644
    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        """Insert a changeset row and return its id.

        A forced insertion_date must be timezone-aware; otherwise the
        current time is used (naive utcnow, localized as UTC).
        """
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]
    def _changeset_series(self, cn, csid):
        """List the names of the series linked to changeset `csid`."""
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        q = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)

        names = []
        for row in cn.execute(q).fetchall():
            names.append(row.seriename)
        return names
668
669
670
671
672
673
674
    def _previous_cset(self, cn, seriename, csid):
        """Return the id of the serie's changeset immediately before
        `csid`, or None when `csid` is the first one.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select cset from "{}.timeserie"."{}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1').format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()
    # insertion handling

677
    def _validate(self, cn, ts, seriename):
        """Check that an incoming serie is type-compatible with the
        stored one; raise otherwise.
        """
        if ts.isnull().all():
            # an all-nan serie is an erasure request: nothing to check
            return
        newtype = ts.dtype
        meta = self.metadata(cn, seriename)
        if newtype != meta['value_type']:
            raise Exception(
                'Type error when inserting {}, new type is {}, type in base is {}'.format(
                    seriename, newtype, meta['value_type']
                )
            )
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')
    def _name_to_regid(self, cn, seriename):
        """Map a serie name to its registry id (memoized)."""
        cached = self.registry_map.get(seriename)
        if cached is not None:
            return cached

        registry = self.schema.registry
        q = select([registry.c.id]).where(registry.c.seriename == seriename)
        regid = cn.execute(q).scalar()
        self.registry_map[seriename] = regid
        return regid
    def _finalize_insertion(self, cn, csid, seriename):
        """Link the changeset to the serie in changeset_series."""
        cs_table = self.schema.changeset_series
        cn.execute(
            cs_table.insert().values(
                cset=csid,
                serie=self._name_to_regid(cn, seriename)
            )
        )
    def _resetcaches(self):
        """Flush the module-level table caches and all instance caches,
        under the class-wide lock.
        """
        with self.cachelock:
            for cache in (TABLES,
                          SNAPTABLES,
                          self.metadatacache,
                          self.registry_map,
                          self.serie_tablename):
                cache.clear()