tsio.py 26.3 KB
Newer Older
1
from datetime import datetime
2
import logging
3
4
import hashlib
import uuid
5
from threading import Lock
6
7
8

import pandas as pd

9
10
from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
11
from sqlalchemy.engine.base import Engine
Aurélien Campéas's avatar
Aurélien Campéas committed
12
from sqlalchemy.sql.expression import select, func, desc
Aurélien Campéas's avatar
Aurélien Campéas committed
13
from sqlalchemy.dialects.postgresql import TIMESTAMP
14

15
from tshistory.schema import tsschema
16
from tshistory.util import (
17
    closed_overlaps,
18
19
    num2float,
    SeriesServices,
20
    start_end,
21
22
    tzaware_serie
)
23
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES
24

25
L = logging.getLogger('tshistory.tsio')
26
TABLES = {}
27
28


29
class TimeSerie(SeriesServices):
    """Postgres-backed, versioned time-series store.

    Each series lives in its own table (under the `<namespace>.timeserie`
    schema) and every insertion is recorded as a changeset, so past
    states of a series can be read back (`get`, `get_history`).
    """

    # default namespace; overridden per instance in __init__
    namespace = 'tsh'
    # tsschema instance (built in __init__)
    schema = None
    # per-instance cache: seriename -> metadata dict
    metadatacache = None
    # internally managed metadata keys -- update_metadata refuses
    # user metadata touching these
    metakeys = {
        'tzaware',
        'index_type',
        'index_names',
        'index_dtype',
        'value_dtype',
        'value_type'
    }
    # per-instance cache: seriename -> registry id
    registry_map = None
    # per-instance cache: seriename -> table name
    serie_tablename = None
    # advisory-lock key taken at series-creation time (set in __init__)
    create_lock_id = None
    # guards cache clearing in _resetcaches
    cachelock = Lock()

    def __init__(self, namespace='tsh'):
        """Bind this handle to `namespace` and build its schema objects."""
        self.namespace = namespace
        # schema objects must exist before any query is emitted
        self.schema = tsschema(namespace)
        self.schema.define()
        # fresh per-instance caches (the class-level defaults are None)
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
        # deterministic advisory-lock key derived from the namespace
        self.create_lock_id = sum(map(ord, namespace))
    def _check_tx(self, cn):
        """Safety belt: important api points must run inside a transaction.

        Raises TypeError when `cn` is a bare engine or a connection with
        no open transaction (unless the instance is in `_testing` mode).
        """
        transactional = not isinstance(cn, Engine) and cn.in_transaction()
        if not transactional and not getattr(self, '_testing', False):
            raise TypeError('You must use a transaction object')
    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata

        Returns the newly stored series (creation), the stored diff
        (update), or None when there is nothing to store.
        """
        self._check_tx(cn)
        # validate inputs before touching the database
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert (_insertion_date is None or
                isinstance(_insertion_date, datetime)), 'Bad format for insertion date'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'
        assert newts.index.notna().all(), 'The index contains NaT entries'
        # the storage layer assumes a sorted index
        if not newts.index.is_monotonic_increasing:
            newts = newts.sort_index()
        newts = num2float(newts)
        if not len(newts):
            return
        # only datetime (non-multi) indexes are supported
        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))
        newts.name = seriename
        table = self._get_ts_table(cn, seriename)
        # no table yet -> first insertion, else compute and store a diff
        if table is None:
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)
        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        """
        if not self.exists(cn, seriename):
            return None

        # an optional changeset filter restricts the lookup to what
        # was known at `revision_date`
        csetfilter = []
        if revision_date:
            csetfilter.append(
                lambda cset: cset.c.insertion_date <= revision_date
            )

        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(
            csetfilter=csetfilter,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )

        if current is None or _keep_nans:
            return current

        current.name = seriename
        return current.dropna()
    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        # EAFP cache lookup: a cached None (unknown series) is
        # returned as-is, exactly like the membership test would
        try:
            return self.metadatacache[seriename]
        except KeyError:
            pass
        reg = self.schema.registry
        meta = cn.execute(
            select([reg.c.metadata]).where(
                reg.c.seriename == seriename
            )
        ).scalar()
        self.metadatacache[seriename] = meta
        return meta
    def update_metadata(self, cn, seriename, metadata):
        """Replace the user-level metadata of a series.

        `metadata` may not redefine any of the internal keys
        (`self.metakeys`); those are preserved from the current record.
        """
        self._check_tx(cn)
        assert isinstance(metadata, dict)
        assert not set(metadata.keys()) & self.metakeys
        meta = self.metadata(cn, seriename)
        # remove all but internal stuff
        newmeta = {key: meta[key] for key in self.metakeys}
        newmeta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=newmeta)
        # invalidate the cache entry (populated by the metadata() call above)
        self.metadatacache.pop(seriename)
        cn.execute(sql)
    def changeset_metadata(self, cn, csid):
        """Return the metadata dict of changeset `csid` (or None).

        The changeset id is passed as a bound query parameter instead
        of being interpolated into the SQL string (consistent with
        `_previous_cset` and safer than string formatting).
        """
        sql = 'select metadata from "{ns}".changeset where id = %(csid)s'.format(
            ns=self.namespace
        )
        return cn.execute(sql, csid=csid).scalar()
    def type(self, cn, name):
        return 'primary'

165
    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        """Return {insertion_date (utc): series} for all revisions of
        `seriename`, optionally bounded on insertion and value dates.

        deltabefore/deltaafter: per-revision value-date window around
        each insertion date.
        diffmode: return the diff introduced by each revision instead
        of the full series state.
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            # unknown series
            return

        cset = self.schema.changeset
        # collect the relevant changesets, in chronological order
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        if from_value_date or to_value_date:
            # closed_overlaps references :fromdate/:todate bound below
            revsql = revsql.where(
                closed_overlaps(from_value_date, to_value_date)
            )

        revs = cn.execute(
            revsql,
            {'fromdate': from_value_date, 'todate': to_value_date}
        ).fetchall()
        if not revs:
            return {}

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            # per-revision value window: one snapshot lookup per revision
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                # the lambda is consumed immediately within this
                # iteration, so closing over csid is safe here
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            # pairwise diff between consecutive revision states
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs
        else:
            series = [
                (idate, ts.dropna())
                 for idate, ts in series
            ]

        return {
            pd.Timestamp(idate).astimezone('UTC'): serie
            for idate, serie in series
        }
    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )
        if histo is None:
            # unknown series
            return None

        # for each value date, keep the value seen at the most recent
        # insertion date
        vimap = {}
        vvmap = {}
        for idate, series in histo.items():
            # NOTE: Series.iteritems() was deprecated and removed in
            # pandas 2.0 -- .items() is the compatible spelling
            for vdate, value in series.items():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        ts = pd.Series(vvmap).sort_index().loc[from_value_date:to_value_date]
        ts.name = seriename
        return ts
    def exists(self, cn, seriename):
        """Tell whether a series by that name is registered."""
        table = self._get_ts_table(cn, seriename)
        return table is not None
    def latest_insertion_date(self, cn, seriename):
        """Return the (utc) insertion date of the latest revision."""
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        query = select(
            [func.max(cset.c.insertion_date)]
        ).where(
            tstable.c.cset == cset.c.id
        )
        lastdate = cn.execute(query).scalar()
        return pd.Timestamp(lastdate).astimezone('UTC')
    def insertion_dates(self, cn, seriename,
                        fromdate=None, todate=None):
        """List the (utc) insertion dates of a series, chronologically,
        optionally bounded by `fromdate`/`todate`.
        """
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        query = select(
            [cset.c.insertion_date]
        ).where(
            tstable.c.cset == cset.c.id
        ).order_by(
            cset.c.id
        )

        if fromdate:
            query = query.where(cset.c.insertion_date >= fromdate)
        if todate:
            query = query.where(cset.c.insertion_date <= todate)

        dates = []
        for idate, in cn.execute(query).fetchall():
            dates.append(pd.Timestamp(idate).astimezone('UTC'))
        return dates
    def last_id(self, cn, seriename):
        """Return the last snapshot id of the series."""
        return Snapshot(cn, self, seriename).last_id()
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return a changeset id of the series whose insertion date
        matches `revdate` exactly ('strict'), or is at/before it
        ('before'), or at/after it ('after').
        """
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        base = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            datefilter = cset.c.insertion_date == revdate
        elif mode == 'before':
            datefilter = cset.c.insertion_date <= revdate
        else:
            datefilter = cset.c.insertion_date >= revdate
        return cn.execute(base.where(datefilter)).scalar()
    def delete(self, cn, seriename):
        """Irreversibly remove a series: its data/snapshot tables, its
        registry entry and the changesets that referenced it.

        `cn` must be a transaction object.
        """
        self._check_tx(cn)
        assert not isinstance(cn, Engine), 'use a transaction object'
        if not self.exists(cn, seriename):
            # library code should log, not print to stdout
            L.info('not deleting unknown series %s %s',
                   seriename, self.namespace)
            return
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        # cn *must* be a transaction scope
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()
        L.info('deleted %s %s', seriename, self.namespace)
    def strip(self, cn, seriename, csid):
        """Erase all revisions of `seriename` starting at changeset
        `csid` (inclusive), recording the stripping event in each
        affected changeset's metadata.
        """
        self._check_tx(cn)
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changset_serie item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        stats = {}
        stats['series count'] = cn.execute(
            'select count(*) from "{}".registry'.format(self.namespace)
        ).scalar()
        stats['changeset count'] = cn.execute(
            'select max(id) from "{}".changeset'.format(self.namespace)
        ).scalar()
        stats['serie names'] = [
            name for name, in cn.execute(
                'select distinct seriename from "{}".registry order by seriename'.format(
                    self.namespace
                )
            ).fetchall()
        ]
        return stats
    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        limit: keep only the `limit` most recent changesets
        names/authors: restrict to these series names / authors
        stripped: include changesets with no remaining series link
        fromrev/torev, fromdate/todate: changeset id / insertion-date bounds
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        # most recent first (re-sorted chronologically at the end)
        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        # chronological order for the caller
        log.sort(key=lambda rev: rev['rev'])
        return log
    def interval(self, cn, seriename, notz=False):
        """Return the [start, end] value-date interval of the series,
        as recorded by its latest revision.

        notz: keep raw (naive) timestamps even for tzaware series.
        Raises ValueError for an unknown series.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        # the latest revision row carries the current start/end stamps
        sql = (f'select start, "end" '
               f'from "{self.namespace}.timeserie"."{tablename}" '
               f'order by cset desc limit 1')
        res = cn.execute(sql).fetchone()
        start, end = res.start, res.end
        if self.metadata(cn, seriename).get('tzaware') and not notz:
            start, end = pd.Timestamp(start, tz='UTC'), pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')
474
475
    # /API
    # Helpers
476

477
478
    # creation / update

479
    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        """First insertion of a series: register it, create its tables
        and store the initial snapshot. Returns the stored series, or
        None when there is nothing to store.
        """
        start, end = start_end(newts, notz=False)
        if start is None:
            assert end is None
            # this is just full of nans
            return None
        # chop off unwanted nans
        newts = newts.loc[start:end]
        if len(newts) == 0:
            return None

        # at creation time we take an exclusive lock to avoid
        # a deadlock on created tables against the changeset-series fk
        cn.execute(
            'select pg_advisory_xact_lock({})'.format(self.create_lock_id)
        )
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        table = self._make_ts_table(cn, seriename)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts
    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Store a new revision as a diff against the current state.
        Returns the stored diff, or None when `newts` brings nothing new.
        """
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        # compute series start/end stamps
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename, notz=True)
        start = min(tsstart or ival.left, ival.left)
        end = max(tsend or ival.right, ival.right)

        if pd.isnull(diff[0]) or pd.isnull(diff[-1]):
            # we *might* be shrinking, let's look at the full series
            # and yes, shrinkers have a slow path
            last = snapshot.last()
            patched = self.patch(last, diff).dropna()
            if not len(patched):
                raise ValueError('complete erasure of a series is forbidden')
            if pd.isnull(diff[0]):
                start = patched.index[0]
            if pd.isnull(diff[-1]):
                end = patched.index[-1]

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff
    # serie table handling
560

561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
    def _make_tablename(self, cn, seriename):
        """ compute the unqualified (no namespace) table name
        from a serie name, to allow arbitrary serie names
        """
        # postgresql table names are limited to 63 chars: fall back
        # to a digest of the name when too long
        name = seriename
        if len(name) > 63:
            name = hashlib.sha1(name.encode('utf-8')).hexdigest()

        # collision detection (collision can happen after a rename):
        # pick a random name when the candidate is already taken
        taken = cn.execute(f'select table_name '
                           f'from "{self.namespace}".registry '
                           f'where table_name = %(seriename)s',
                           seriename=name).scalar()
        if taken:
            return str(uuid.uuid4())

        return name
    def _serie_to_tablename(self, cn, seriename):
        """Map a series name to its table name, with caching.
        Returns None for a not-yet-created series.
        """
        cached = self.serie_tablename.get(seriename)
        if cached is not None:
            return cached

        reg = self.schema.registry
        tablename = cn.execute(
            select([reg.c.table_name]).where(reg.c.seriename == seriename)
        ).scalar()
        if tablename is None:
            # creation time: nothing registered yet, do not cache
            return None
        self.serie_tablename[seriename] = tablename
        return tablename
    def _table_definition_for(self, cn, seriename):
        """Return (building and caching it if needed) the sqlalchemy
        Table object describing the series revision table.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(cn, seriename)
        # module-level cache keyed by fully-qualified name
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                # the changeset that produced this revision
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                # value-date interval covered after this revision
                Column('start', TIMESTAMP, nullable=False),
                Column('end', TIMESTAMP, nullable=False),
                # head chunk of the snapshot for this revision
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                Index(NONE_NAME, 'start'),
                Index(NONE_NAME, 'end'),
                schema='{}.timeserie'.format(self.namespace),
                keep_existing=True
            )
        return table
    def _make_ts_table(self, cn, seriename):
        """Create the revision table of a new series and return it."""
        table = self._table_definition_for(cn, seriename)
        table.create(cn)
        return table
    def _register_serie(self, cn, seriename, ts):
        """Insert the registry row for a new series (name, table name
        and internal metadata derived from `ts`), caching its id.
        """
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(cn, seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'index_dtype': index.dtype.str,
                'value_dtype': ts.dtypes.str,
                'value_type': ts.dtypes.name
            }
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid
    def _get_ts_table(self, cn, seriename):
        """Return the Table of an existing series, None if unknown."""
        tablename = self._serie_to_tablename(cn, seriename)
        if not tablename:
            return None
        return self._table_definition_for(cn, seriename)
649
650
    # changeset handling

651
    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        """Insert a changeset row and return its id.

        A forced `insertion_date` must be timezone-aware; otherwise
        the current utc time is used.
        """
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        when = insertion_date or datetime.utcnow()
        idate = pd.Timestamp(when, tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]
662
    def _changeset_series(self, cn, csid):
        """Return the names of the series linked to changeset `csid`."""
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        sql = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)
        return [row.seriename for row in cn.execute(sql).fetchall()]
    def _previous_cset(self, cn, seriename, csid):
        """Return the changeset id just before `csid` for this series
        (None when `csid` is the first one).
        """
        tablename = self._serie_to_tablename(cn, seriename)
        sql = (f'select cset from "{self.namespace}.timeserie"."{tablename}" '
               f'where cset < %(csid)s '
               f'order by cset desc limit 1')
        return cn.execute(sql, csid=csid).scalar()
682
683
    # insertion handling

684
    def _validate(self, cn, ts, seriename):
        """Reject an insertion whose value/index dtypes do not match
        the dtypes recorded in the series metadata.
        """
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            raise Exception(
                'Type error when inserting {}, new type is {}, type in base is {}'.format(
                    seriename, tstype, meta['value_type']
                )
            )
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')
    def _name_to_regid(self, cn, seriename):
        """Return the registry id of a series, with caching."""
        cached = self.registry_map.get(seriename)
        if cached is not None:
            return cached

        registry = self.schema.registry
        regid = cn.execute(
            select([registry.c.id]).where(registry.c.seriename == seriename)
        ).scalar()
        self.registry_map[seriename] = regid
        return regid
    def _finalize_insertion(self, cn, csid, seriename):
        """Link changeset `csid` to the series in changeset_series."""
        table = self.schema.changeset_series
        regid = self._name_to_regid(cn, seriename)
        cn.execute(
            table.insert().values(
                cset=csid,
                serie=regid
            )
        )
    def _resetcaches(self):
        """Flush the module-level table caches and this instance's
        metadata/registry/tablename caches (under the cache lock).
        """
        with self.cachelock:
            for cache in (TABLES,
                          SNAPTABLES,
                          self.metadatacache,
                          self.registry_map,
                          self.serie_tablename):
                cache.clear()