tsio.py 25.9 KB
Newer Older
1
from datetime import datetime
2
import logging
3
4
import hashlib
import uuid
5
6
7

import pandas as pd

8
9
from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
10
from sqlalchemy.engine.base import Engine
Aurélien Campéas's avatar
Aurélien Campéas committed
11
from sqlalchemy.sql.expression import select, func, desc
Aurélien Campéas's avatar
Aurélien Campéas committed
12
from sqlalchemy.dialects.postgresql import TIMESTAMP
13

14
from tshistory.schema import tsschema
15
from tshistory.util import (
16
    closed_overlaps,
17
18
    num2float,
    SeriesServices,
19
    start_end,
20
21
    tzaware_serie
)
22
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES
23

24
L = logging.getLogger('tshistory.tsio')
25
TABLES = {}
26
27


28
class TimeSerie(SeriesServices):
    # postgres schema namespace; overridden per instance in __init__
    namespace = 'tsh'
    # tsschema object bound in __init__
    schema = None
    # per-instance cache: seriename -> metadata dict
    metadatacache = None
    # metadata keys managed internally by tshistory; user-supplied
    # metadata must never override these (enforced in update_metadata)
    metakeys = {
        'tzaware',
        'index_type',
        'index_names',
        'index_dtype',
        'value_dtype',
        'value_type'
    }
    # per-instance cache: seriename -> registry row id
    registry_map = None
    # per-instance cache: seriename -> unqualified table name
    serie_tablename = None
    # integer id for the pg advisory lock taken at series creation
    create_lock_id = None
43
44
45

    def __init__(self, namespace='tsh'):
        """Bind this handler to a postgres schema namespace and set up
        the per-instance caches.
        """
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
        # deterministic advisory-lock id derived from the namespace name
        self.create_lock_id = sum(ord(c) for c in namespace)
52

53
54
55
56
57
58
    def _check_tx(self, cn):
        """Safety belt: ensure `cn` is a transaction object, not a bare Engine.

        Raises TypeError unless the instance carries a truthy `_testing` flag.
        """
        in_tx = not isinstance(cn, Engine) and cn.in_transaction()
        testing = getattr(self, '_testing', False)
        if not in_tx and not testing:
            raise TypeError('You must use a transaction object')

59
    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        _insertion_date: optional tz-aware datetime overriding "now"
        """
        self._check_tx(cn)
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert (_insertion_date is None or
                isinstance(_insertion_date, datetime)), 'Bad format for insertion date'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'
        assert newts.index.notna().all(), 'The index contains NaT entries'
        # the storage layer expects a sorted index
        if not newts.index.is_monotonic_increasing:
            newts = newts.sort_index()
        # normalize numeric payloads to float
        newts = num2float(newts)

        if not len(newts):
            return

        # only datetime-like, non-multi indexes are supported
        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))
        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            # unknown series: first insertion
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
100

101
    def get(self, cn, seriename, revision_date=None,
102
103
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
104
105
106
107
108
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

109
        """
Aurélien Campéas's avatar
Aurélien Campéas committed
110
        if not self.exists(cn, seriename):
111
            return
112

113
        csetfilter = []
114
        if revision_date:
115
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
116
        snap = Snapshot(cn, self, seriename)
117
        _, current = snap.find(csetfilter=csetfilter,
118
119
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)
120

121
        if current is not None and not _keep_nans:
122
            current.name = seriename
123
            current = current[~current.isnull()]
124
        return current
125

126
    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        # serve from the per-instance cache when possible
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
        reg = self.schema.registry
        sql = select([reg.c.metadata]).where(
            reg.c.seriename == seriename
        )
        meta = cn.execute(sql).scalar()
        self.metadatacache[seriename] = meta
        return meta

138
    def update_metadata(self, cn, seriename, metadata):
        """Merge user `metadata` into the series registry entry.

        Internal keys (self.metakeys) are preserved and may not be
        overridden by the caller.
        """
        self._check_tx(cn)
        assert isinstance(metadata, dict)
        assert not set(metadata.keys()) & self.metakeys
        meta = self.metadata(cn, seriename)
        # remove all but internal stuff
        newmeta = {key: meta[key] for key in self.metakeys}
        newmeta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=newmeta)
        # invalidate the cache entry (guaranteed present: self.metadata
        # above populated it)
        self.metadatacache.pop(seriename)
        cn.execute(sql)

153
154
155
156
157
158
159
    def changeset_metadata(self, cn, csid):
        """Return the metadata dict of changeset `csid` (or None).

        The changeset id is passed as a bind parameter rather than
        string-interpolated, consistent with the other raw-SQL helpers
        (see _previous_cset, delete).
        """
        sql = 'select metadata from "{ns}".changeset where id = %(csid)s'.format(
            ns=self.namespace
        )
        return cn.execute(sql, csid=csid).scalar()

160
    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        """Return {insertion_date (UTC): series} for each revision of
        `seriename`, optionally bounded on insertion dates and value dates.

        deltabefore/deltaafter: per-revision window around the insertion
        date restricting the value dates returned.
        diffmode: when True, each entry is the diff against the previous
        revision instead of the full series.
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        cset = self.schema.changeset
        # collect the relevant (changeset id, insertion date) pairs
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        if from_value_date or to_value_date:
            # only keep revisions whose [start, end] overlaps the request
            revsql = revsql.where(
                closed_overlaps(from_value_date, to_value_date)
            )

        revs = cn.execute(
            revsql,
            {'fromdate': from_value_date, 'todate': to_value_date}
        ).fetchall()
        if not revs:
            return {}

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            # per-revision value-date window around each insertion date
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                # the lambda is consumed immediately by .find, so the
                # loop-variable capture is safe here
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs
        else:
            # drop erasure nans from each full-series revision
            series = [
                (idate, ts.dropna())
                for idate, ts in series
            ]

        return {
            pd.Timestamp(idate).astimezone('UTC'): serie
            for idate, serie in series
        }
246

247
248
249
    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )
        if histo is None:
            return None

        # for each value date, keep the value coming from the most
        # recent insertion date
        vimap = {}   # value date -> latest insertion date seen
        vvmap = {}   # value date -> value at that insertion date
        for idate, series in histo.items():
            # NOTE: Series.items() replaces .iteritems(), which was
            # removed in pandas 2.0 (identical semantics)
            for vdate, value in series.items():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        ts = pd.Series(vvmap).sort_index().loc[from_value_date:to_value_date]
        ts.name = seriename
        return ts
273

274
275
    def exists(self, cn, seriename):
        """Return True if `seriename` is a known series."""
        table = self._get_ts_table(cn, seriename)
        return table is not None
276

277
    def latest_insertion_date(self, cn, seriename):
        """Return the most recent insertion date of the series, as a
        UTC pandas Timestamp.
        """
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        sql = select(
            [func.max(cset.c.insertion_date)]
        ).where(
            tstable.c.cset == cset.c.id
        )
        return pd.Timestamp(
            cn.execute(sql).scalar()
        ).astimezone('UTC')
288

289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
    def insertion_dates(self, cn, seriename):
        """Return all insertion dates of the series, in changeset order,
        as UTC pandas Timestamps.
        """
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        sql = select(
            [cset.c.insertion_date]
        ).where(
            tstable.c.cset == cset.c.id
        ).order_by(
            cset.c.id
        )
        return [
            pd.Timestamp(idate).astimezone('UTC')
            for idate, in cn.execute(sql).fetchall()
        ]

304
305
306
307
    def last_id(self, cn, seriename):
        """Return the id of the latest snapshot chunk of the series."""
        snapshot = Snapshot(cn, self, seriename)
        return snapshot.last_id()

308
309
310
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return a changeset id of the series matching `revdate`.

        mode: 'strict' (exact insertion date), 'before' (at or before)
        or 'after' (at or after).
        """
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        base = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        idate = cset.c.insertion_date
        if mode == 'strict':
            sql = base.where(idate == revdate)
        elif mode == 'before':
            sql = base.where(idate <= revdate)
        else:
            sql = base.where(idate >= revdate)
        return cn.execute(sql).scalar()

323
    def delete(self, cn, seriename):
        """Completely remove a series: its data tables, its changesets
        and its registry entry.

        cn *must* be a transaction scope.
        """
        self._check_tx(cn)
        assert not isinstance(cn, Engine), 'use a transaction object'
        if not self.exists(cn, seriename):
            # use the module logger rather than print (consistent with
            # _create/_update which log through L)
            L.info('not deleting unknown series %s (namespace %s)',
                   seriename, self.namespace)
            return
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        # cn *must* be a transaction scope
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()
        L.info('deleted %s (namespace %s)', seriename, self.namespace)
360

361
    def strip(self, cn, seriename, csid):
        """Remove revisions of `seriename` from changeset `csid` onwards.

        The stripped changesets keep a metadata trace of the operation.
        """
        self._check_tx(cn)
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changeset_series item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
387

388
    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        ns = self.namespace
        count_sql = f'select count(*) from "{ns}".registry'
        lastcset_sql = f'select max(id) from "{ns}".changeset'
        names_sql = f'select distinct seriename from "{ns}".registry order by seriename'
        return {
            'series count': cn.execute(count_sql).scalar(),
            'changeset count': cn.execute(lastcset_sql).scalar(),
            'serie names': [name for name, in cn.execute(names_sql).fetchall()]
        }

401
    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        Each entry is a dict with keys: rev, author, date (UTC), meta, names.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        # note: the limit is applied before the filters below narrow
        # the changeset join
        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        # the query is descending (for the limit); deliver ascending
        log.sort(key=lambda rev: rev['rev'])
        return log

449
    def interval(self, cn, seriename, notz=False):
        """Return the [start, end] value-date interval of the series
        (from its latest revision) as a closed pandas Interval.

        notz: when True, leave the bounds naive even for tzaware series.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        sql = (f'select start, "end" '
               f'from "{self.namespace}.timeserie"."{tablename}" '
               f'order by cset desc limit 1')
        res = cn.execute(sql).fetchone()
        start, end = res.start, res.end
        if self.metadata(cn, seriename).get('tzaware') and not notz:
            start, end = pd.Timestamp(start, tz='UTC'), pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')

462
463
    # /API
    # Helpers
464

465
466
    # creation / update

467
    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        """First insertion of a series: register it, create its tables
        and snapshot, and record the initial changeset.

        Returns the inserted series, or None if it contained only nans.
        """
        start, end = start_end(newts, notz=False)
        if start is None:
            assert end is None
            # this is just full of nans
            return None
        # chop off unwanted nans
        newts = newts.loc[start:end]
        if len(newts) == 0:
            return None

        # at creation time we take an exclusive lock to avoid
        # a deadlock on created tables against the changeset-series fk
        cn.execute(
            'select pg_advisory_xact_lock({})'.format(self.create_lock_id)
        )
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        table = self._make_ts_table(cn, seriename)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

502
    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Insert a new revision of an existing series.

        Only the diff against the current state is stored. Returns the
        diff, or None when `newts` brings nothing new.
        """
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        # compute series start/end stamps
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename, notz=True)
        start = min(tsstart or ival.left, ival.left)
        end = max(tsend or ival.right, ival.right)

        # NOTE(review): diff[0]/diff[-1] is positional access; pandas
        # deprecates positional [] on Series — confirm before upgrading
        if pd.isnull(diff[0]) or pd.isnull(diff[-1]):
            # we *might* be shrinking, let's look at the full series
            # and yes, shrinkers have a slow path
            last = snapshot.last()
            patched = self.patch(last, diff).dropna()
            if not len(patched):
                raise ValueError('complete erasure of a series is forbidden')
            if pd.isnull(diff[0]):
                start = patched.index[0]
            if pd.isnull(diff[-1]):
                end = patched.index[-1]

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

547
    # serie table handling
548

549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
    def _make_tablename(self, cn, seriename):
        """ compute the unqualified (no namespace) table name
        from a serie name, to allow arbitrary serie names
        """
        # postgresql table names are limited to 63 chars.
        if len(seriename) > 63:
            seriename = hashlib.sha1(seriename.encode('utf-8')).hexdigest()

        # collision detection (collision can happen after a rename)
        if cn.execute(f'select table_name '
                      f'from "{self.namespace}".registry '
                      f'where table_name = %(seriename)s',
                      seriename=seriename).scalar():
            # fall back to a random unique name
            return str(uuid.uuid4())

        return seriename

566
567
568
569
    def _serie_to_tablename(self, cn, seriename):
        """Map a series name to its storage table name (memoized).

        Returns None for a not-yet-registered series.
        """
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename

        reg = self.schema.registry
        sql = select([reg.c.table_name]).where(reg.c.seriename == seriename)
        tablename = cn.execute(sql).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        """Return (building and caching if needed) the sqlalchemy Table
        holding the revisions of `seriename`.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(cn, seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        # module-level cache of Table objects, shared across instances
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                # value-date interval covered by this revision
                Column('start', TIMESTAMP, nullable=False),
                Column('end', TIMESTAMP, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                Index(NONE_NAME, 'start'),
                Index(NONE_NAME, 'end'),
                schema='{}.timeserie'.format(self.namespace),
                keep_existing=True
            )
        return table
608

Aurélien Campéas's avatar
Aurélien Campéas committed
609
    def _make_ts_table(self, cn, seriename):
        """Create the revisions table for a new series and return it."""
        table = self._table_definition_for(cn, seriename)
        table.create(cn)
        return table

    def _register_serie(self, cn, seriename, ts):
        """Insert the registry row for a new series, deriving the
        internal metadata from the series itself.
        """
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(cn, seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'index_dtype': index.dtype.str,
                'value_dtype': ts.dtypes.str,
                'value_type': ts.dtypes.name
            }
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid
631

632
    def _get_ts_table(self, cn, seriename):
        """Return the series revisions Table, or None for an unknown series."""
        tname = self._serie_to_tablename(cn, seriename)
        if not tname:
            return None
        return self._table_definition_for(cn, seriename)
636

637
638
    # changeset handling

639
    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        """Insert a changeset row and return its id.

        insertion_date, if given, must be timezone aware; otherwise
        "now" (UTC) is used.
        """
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]
649

650
    def _changeset_series(self, cn, csid):
        """Return the names of the series touched by changeset `csid`."""
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        sql = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)

        return [
            row.seriename
            for row in cn.execute(sql).fetchall()
        ]
662

663
664
665
666
667
668
669
    def _previous_cset(self, cn, seriename, csid):
        """Return the changeset id of the revision just before `csid`
        for this series (or None).
        """
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select cset from "{}.timeserie"."{}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1').format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()

670
671
    # insertion handling

672
    def _validate(self, cn, ts, seriename):
        """Check that `ts` is compatible (value and index types) with
        the stored series metadata.

        Raises ValueError on mismatch (previously a bare Exception;
        ValueError is a subclass so existing handlers still apply).
        """
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise ValueError(m)
        if ts.index.dtype.name != meta['index_type']:
            raise ValueError('Incompatible index types')
684

685
686
    def _name_to_regid(self, cn, seriename):
        """Map a series name to its registry row id (memoized per instance)."""
        cached = self.registry_map.get(seriename)
        if cached is not None:
            return cached

        reg = self.schema.registry
        query = select([reg.c.id]).where(reg.c.seriename == seriename)
        regid = cn.execute(query).scalar()
        self.registry_map[seriename] = regid
        return regid

695
    def _finalize_insertion(self, cn, csid, seriename):
        """Record the (changeset, series) association row."""
        table = self.schema.changeset_series
        sql = table.insert().values(
            cset=csid,
            serie=self._name_to_regid(cn, seriename)
        )
        cn.execute(sql)
702

703
704
705
    def _resetcaches(self):
        """Drop all cached state: module-level Table caches (shared with
        the snapshot module) and the per-instance name/id caches.
        """
        TABLES.clear()
        SNAPTABLES.clear()
        self.metadatacache.clear()
        self.registry_map.clear()
        self.serie_tablename.clear()