tsio.py 26 KB
Newer Older
1
from datetime import datetime
2
import logging
3
4
import hashlib
import uuid
5
6
7

import pandas as pd

8
9
from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
10
from sqlalchemy.engine.base import Engine
Aurélien Campéas's avatar
Aurélien Campéas committed
11
from sqlalchemy.sql.expression import select, func, desc
Aurélien Campéas's avatar
Aurélien Campéas committed
12
from sqlalchemy.dialects.postgresql import TIMESTAMP
13

14
from tshistory.schema import tsschema
15
from tshistory.util import (
16
    closed_overlaps,
17
18
    num2float,
    SeriesServices,
19
    start_end,
20
21
    tzaware_serie
)
22
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES
23

24
L = logging.getLogger('tshistory.tsio')
25
TABLES = {}
26
27


28
class TimeSerie(SeriesServices):
    """Versioned time series store backed by a postgres database.

    Each series lives in its own table (under `<namespace>.timeserie`),
    each write produces a changeset row, and snapshots hold the actual
    values (see tshistory.snapshot.Snapshot).
    """
    # default namespace; overridden per instance in __init__
    namespace = 'tsh'
    # tsschema object, built in __init__
    schema = None
    # per-instance cache: seriename -> metadata dict
    metadatacache = None

    # metadata keys reserved for internal bookkeeping;
    # update_metadata refuses to let callers touch them
    metakeys = {
        'tzaware',
        'index_type',
        'index_names',
        'index_dtype',
        'value_dtype',
        'value_type'
    }
    # per-instance cache: seriename -> registry row id
    registry_map = None
    # per-instance cache: seriename -> unqualified table name
    serie_tablename = None
    # advisory lock key derived from the namespace (used in _create)
    create_lock_id = None
43
44
45

    def __init__(self, namespace='tsh'):
        """Bind the handler to `namespace` and set up its schema and caches."""
        self.namespace = namespace
        schema = tsschema(namespace)
        schema.define()
        self.schema = schema
        # fresh per-instance caches (the class-level attributes are None)
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
        # stable advisory-lock key derived from the namespace name
        lockid = 0
        for char in namespace:
            lockid += ord(char)
        self.create_lock_id = lockid
52

53
54
55
56
57
58
    def _check_tx(self, cn):
        """Safety belt: important api points must be given a transaction.

        Raises TypeError when `cn` is a bare engine or a connection with
        no open transaction (bypassed when self._testing is set).
        """
        # short-circuit: never call .in_transaction() on an Engine
        intx = not isinstance(cn, Engine) and cn.in_transaction()
        if not intx and not getattr(self, '_testing', False):
            raise TypeError('You must use a transaction object')

59
    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        _insertion_date: optional tz-aware datetime overriding "now"
                         (for tests / backfills)

        Returns the created/inserted series fragment, or None when
        there was nothing to insert.
        """
        self._check_tx(cn)
        # input validation (assertion-based, consistent with the rest
        # of the class)
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert (_insertion_date is None or
                isinstance(_insertion_date, datetime)), 'Bad format for insertion date'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'
        assert newts.index.notna().all(), 'The index contains NaT entries'

        # normalize: sorted index, float values
        if not newts.index.is_monotonic_increasing:
            newts = newts.sort_index()

        newts = num2float(newts)

        if not len(newts):
            return

        # the index must be a (non multi) datetime index
        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        # first insertion creates the series, further ones store a diff
        if table is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
100

101
    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie
        """
        if not self.exists(cn, seriename):
            return

        filters = []
        if revision_date:
            filters.append(
                lambda cset: cset.c.insertion_date <= revision_date
            )

        snap = Snapshot(cn, self, seriename)
        _, ts = snap.find(
            csetfilter=filters,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )

        if ts is None or _keep_nans:
            return ts
        # drop the nan markers (erased points) before handing out
        ts.name = seriename
        return ts[~ts.isnull()]
125

126
    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        # fast path: already cached (possibly as None)
        try:
            return self.metadatacache[seriename]
        except KeyError:
            pass
        reg = self.schema.registry
        meta = cn.execute(
            select([reg.c.metadata]).where(
                reg.c.seriename == seriename
            )
        ).scalar()
        self.metadatacache[seriename] = meta
        return meta

138
    def update_metadata(self, cn, seriename, metadata):
        """Replace the user-editable part of the series metadata.

        The internal keys (self.metakeys) are preserved and may not be
        overridden by the caller.
        """
        self._check_tx(cn)
        assert isinstance(metadata, dict)
        assert not set(metadata.keys()) & self.metakeys
        meta = self.metadata(cn, seriename)
        # keep all internal entries, layer the user-provided ones on top
        newmeta = dict(metadata)
        for key in self.metakeys:
            newmeta[key] = meta[key]
        reg = self.schema.registry
        upd = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=newmeta)
        # invalidate the cache entry before writing
        self.metadatacache.pop(seriename)
        cn.execute(upd)

153
154
155
156
157
158
159
    def changeset_metadata(self, cn, csid):
        """Return the metadata dict of changeset `csid` (or None).

        The changeset id is bound as a query parameter rather than
        interpolated into the sql string, matching the convention of
        the other raw queries in this module.
        """
        sql = 'select metadata from "{ns}".changeset where id = %(csid)s'.format(
            ns=self.namespace
        )
        return cn.execute(sql, csid=csid).scalar()

160
161
162
    def type(self, cn, name):
        """Return the series kind handled here.

        NOTE(review): always 'primary' in this class; presumably other
        handlers answer differently — confirm against subclasses.
        """
        return 'primary'

163
    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        """Return {insertion date (utc) -> series} for all revisions.

        from/to_insertion_date: bound the revisions considered
        from/to_value_date: bound the value dates returned
        deltabefore/deltaafter: per-revision window around each
            insertion date (relative value-date bounds)
        diffmode: when true, return the successive diffs rather than
            the full series at each revision

        Returns None for an unknown series, {} when no revision
        matches the filters.
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        cset = self.schema.changeset
        # collect the candidate (changeset id, insertion date) pairs
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        if from_value_date or to_value_date:
            # only keep revisions whose [start, end] interval overlaps
            # the requested value-date window
            revsql = revsql.where(
                closed_overlaps(from_value_date, to_value_date)
            )

        revs = cn.execute(
            revsql,
            {'fromdate': from_value_date, 'todate': to_value_date}
        ).fetchall()
        if not revs:
            return {}

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            # per-revision window: one snapshot lookup per revision
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                # the lambda is consumed immediately by .find, so the
                # loop-variable capture is safe here
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            # bulk path: fetch all revisions in one go
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            # turn consecutive full series into successive diffs
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs
        else:
            # full-series mode: drop the nan (erasure) markers
            series = [
                (idate, ts.dropna())
                 for idate, ts in series
            ]

        return {
            pd.Timestamp(idate).astimezone('UTC'): serie
            for idate, serie in series
        }
249

250
251
252
    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )
        if histo is None:
            # unknown series
            return None

        # for each value date, keep the value coming from the most
        # recent insertion date
        vimap = {}   # value date -> best insertion date so far
        vvmap = {}   # value date -> corresponding value
        for idate, series in histo.items():
            # Series.items(): .iteritems() was removed in pandas 2.0
            for vdate, value in series.items():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        ts = pd.Series(vvmap).sort_index().loc[from_value_date:to_value_date]
        ts.name = seriename
        return ts
276

277
278
    def exists(self, cn, seriename):
        """True if `seriename` is a known series."""
        table = self._get_ts_table(cn, seriename)
        return table is not None
279

280
    def latest_insertion_date(self, cn, seriename):
        """Return the utc timestamp of the most recent revision."""
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        query = select(
            [func.max(cset.c.insertion_date)]
        ).where(
            tstable.c.cset == cset.c.id
        )
        stamp = cn.execute(query).scalar()
        return pd.Timestamp(stamp).astimezone('UTC')
291

292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
    def insertion_dates(self, cn, seriename):
        """Return all revision insertion dates (utc), oldest first."""
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        query = select(
            [cset.c.insertion_date]
        ).where(
            tstable.c.cset == cset.c.id
        ).order_by(
            cset.c.id
        )
        dates = []
        for row in cn.execute(query).fetchall():
            dates.append(pd.Timestamp(row[0]).astimezone('UTC'))
        return dates

307
308
309
310
    def last_id(self, cn, seriename):
        """Return the last id known to the series snapshot."""
        return Snapshot(cn, self, seriename).last_id()

311
312
313
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return a changeset id of the series at `revdate`.

        mode: 'strict' (exact insertion date), 'before' (<=) or
        'after' (>=).
        """
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        query = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        # pick the date predicate matching the requested mode
        predicates = {
            'strict': cset.c.insertion_date == revdate,
            'before': cset.c.insertion_date <= revdate,
            'after': cset.c.insertion_date >= revdate
        }
        query = query.where(predicates[mode])
        return cn.execute(query).scalar()

326
    def delete(self, cn, seriename):
327
        self._check_tx(cn)
328
        assert not isinstance(cn, Engine), 'use a transaction object'
329
330
331
        if not self.exists(cn, seriename):
            print('not deleting unknown series', seriename, self.namespace)
            return
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        # cn *must* be a transaction scope
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()
362
        print('deleted', seriename, self.namespace)
363

364
    def strip(self, cn, seriename, csid):
        """Remove all revisions of `seriename` from changeset `csid`
        (included) onwards.

        The stripped changesets keep a trace of the operation in their
        metadata ('tshistory.info' key).
        """
        self._check_tx(cn)
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
390

391
    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        seriescount = cn.execute(
            'select count(*) from "{}".registry'.format(self.namespace)
        ).scalar()
        csetcount = cn.execute(
            'select max(id) from "{}".changeset'.format(self.namespace)
        ).scalar()
        names = [
            name for name, in cn.execute(
                'select distinct seriename from "{}".registry '
                'order by seriename'.format(self.namespace)
            ).fetchall()
        ]
        return {
            'series count': seriescount,
            'changeset count': csetcount,
            'serie names': names
        }

404
    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        limit: cap the number of changesets returned
        names/authors: keep only changesets for these series/authors
        stripped: also show changesets no longer linked to any series
        fromrev/torev: changeset id bounds
        fromdate/todate: insertion date bounds

        Returns a list of {'rev', 'author', 'date', 'meta', 'names'}
        dicts sorted by changeset id.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        # newest first so that `limit` keeps the most recent ones;
        # re-sorted chronologically at the end
        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log

452
    def interval(self, cn, seriename, notz=False):
        """Return the [start, end] value-date interval of the series
        as of its latest revision (a closed pd.Interval).

        notz: return naive stamps even for tzaware series.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        row = cn.execute(
            f'select start, "end" '
            f'from "{self.namespace}.timeserie"."{tablename}" '
            f'order by cset desc limit 1'
        ).fetchone()
        start, end = row.start, row.end
        tzaware = self.metadata(cn, seriename).get('tzaware')
        if tzaware and not notz:
            start = pd.Timestamp(start, tz='UTC')
            end = pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')

465
466
    # /API
    # Helpers
467

468
469
    # creation / update

470
    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        """First insertion of a series: register it, create its tables
        and store the initial snapshot.

        Returns the (nan-trimmed) stored series, or None when there
        was nothing to store.
        """
        start, end = start_end(newts, notz=False)
        if start is None:
            assert end is None
            # this is just full of nans
            return None
        # chop off unwanted nans
        newts = newts.loc[start:end]
        if len(newts) == 0:
            return None

        # at creation time we take an exclusive lock to avoid
        # a deadlock on created tables against the changeset-series fk
        cn.execute(
            'select pg_advisory_xact_lock({})'.format(self.create_lock_id)
        )
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        # recompute start/end (default tz handling this time)
        start, end = start_end(newts)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        table = self._make_ts_table(cn, seriename)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

505
    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Store a new revision of an existing series as a diff against
        its current state.

        Returns the stored diff, or None when `newts` brings nothing
        new.
        """
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        # diff against the current state over the incoming date range
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        # compute series start/end stamps
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename, notz=True)
        start = min(tsstart or ival.left, ival.left)
        end = max(tsend or ival.right, ival.right)

        # nans at the diff boundaries signal point erasures, which may
        # shrink the series interval
        if pd.isnull(diff[0]) or pd.isnull(diff[-1]):
            # we *might* be shrinking, let's look at the full series
            # and yes, shrinkers have a slow path
            last = snapshot.last()
            patched = self.patch(last, diff).dropna()
            if not len(patched):
                raise ValueError('complete erasure of a series is forbidden')
            if pd.isnull(diff[0]):
                start = patched.index[0]
            if pd.isnull(diff[-1]):
                end = patched.index[-1]

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

550
    # serie table handling
551

552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
    def _make_tablename(self, cn, seriename):
        """ compute the unqualified (no namespace) table name
        from a serie name, to allow arbitrary serie names
        """
        # postgresql table names are limited to 63 chars.
        name = seriename
        if len(name) > 63:
            name = hashlib.sha1(name.encode('utf-8')).hexdigest()

        # collision detection (collision can happen after a rename)
        taken = cn.execute(
            f'select table_name '
            f'from "{self.namespace}".registry '
            f'where table_name = %(seriename)s',
            seriename=name
        ).scalar()
        if taken:
            return str(uuid.uuid4())

        return name

569
570
571
572
    def _serie_to_tablename(self, cn, seriename):
        """Map a series name to its storage table name (cached)."""
        cached = self.serie_tablename.get(seriename)
        if cached is not None:
            return cached

        reg = self.schema.registry
        tablename = cn.execute(
            select([reg.c.table_name]).where(reg.c.seriename == seriename)
        ).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        """Return the sqlalchemy Table of the series revisions
        (building and caching it in the module-level TABLES on first
        use).
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(cn, seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            # one row per revision: changeset id, value-date interval
            # and a pointer to the snapshot head
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                Column('start', TIMESTAMP, nullable=False),
                Column('end', TIMESTAMP, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                # NONE_NAME lets sqlalchemy pick the index names
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                Index(NONE_NAME, 'start'),
                Index(NONE_NAME, 'end'),
                schema='{}.timeserie'.format(self.namespace),
                keep_existing=True
            )
        return table
611

Aurélien Campéas's avatar
Aurélien Campéas committed
612
    def _make_ts_table(self, cn, seriename):
613
        table = self._table_definition_for(cn, seriename)
614
        table.create(cn)
615
616
617
        return table

    def _register_serie(self, cn, seriename, ts):
        """Insert the registry row of a brand new series.

        The internal metadata (index/value dtypes, tz-awareness) is
        derived from `ts`.
        """
        index = ts.index
        inames = [name for name in index.names if name]
        internalmeta = {
            'tzaware': tzaware_serie(ts),
            'index_type': index.dtype.name,
            'index_names': inames,
            'index_dtype': index.dtype.str,
            'value_dtype': ts.dtypes.str,
            'value_type': ts.dtypes.name
        }
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(cn, seriename),
            metadata=internalmeta
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid
634

635
    def _get_ts_table(self, cn, seriename):
        """Return the series table definition, or None when unknown."""
        if self._serie_to_tablename(cn, seriename):
            return self._table_definition_for(cn, seriename)
639

640
641
    # changeset handling

642
    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        """Insert a new changeset row and return its id."""
        if insertion_date is not None:
            # refuse naive insertion dates
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        ins = self.schema.changeset.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate
        )
        return cn.execute(ins).inserted_primary_key[0]
652

653
    def _changeset_series(self, cn, csid):
        """List the names of the series linked to changeset `csid`."""
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        query = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)
        rows = cn.execute(query).fetchall()
        return [row.seriename for row in rows]
665

666
667
668
669
670
671
672
    def _previous_cset(self, cn, seriename, csid):
        """Return the changeset id right before `csid` for the series
        (None if `csid` is the first one)."""
        tablename = self._serie_to_tablename(cn, seriename)
        query = (
            f'select cset from "{self.namespace}.timeserie"."{tablename}" '
            f'where cset < %(csid)s '
            f'order by cset desc limit 1'
        )
        return cn.execute(query, csid=csid).scalar()

673
674
    # insertion handling

675
    def _validate(self, cn, ts, seriename):
        """Check `ts` value/index types against the stored metadata."""
        if ts.isnull().all():
            # ts erasure
            return
        meta = self.metadata(cn, seriename)
        tstype = ts.dtype
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')
687

688
689
    def _name_to_regid(self, cn, seriename):
        """Return (and cache) the registry id of a series."""
        regid = self.registry_map.get(seriename)
        if regid is not None:
            return regid

        registry = self.schema.registry
        query = select([registry.c.id]).where(
            registry.c.seriename == seriename
        )
        regid = cn.execute(query).scalar()
        self.registry_map[seriename] = regid
        return regid

698
    def _finalize_insertion(self, cn, csid, seriename):
        """Link changeset `csid` to the series in changeset_series."""
        cn.execute(
            self.schema.changeset_series.insert().values(
                cset=csid,
                serie=self._name_to_regid(cn, seriename)
            )
        )
705

706
707
708
    def _resetcaches(self):
        """Flush the module-level table caches and the per-instance
        lookup caches."""
        TABLES.clear()
        SNAPTABLES.clear()
        for cache in (self.metadatacache,
                      self.registry_map,
                      self.serie_tablename):
            cache.clear()