tsio.py 26.4 KB
Newer Older
1
from datetime import datetime
2
import logging
3
4
import hashlib
import uuid
5
from threading import Lock
6
7
8

import pandas as pd

9
10
from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
11
from sqlalchemy.engine.base import Engine
Aurélien Campéas's avatar
Aurélien Campéas committed
12
from sqlalchemy.sql.expression import select, func, desc
Aurélien Campéas's avatar
Aurélien Campéas committed
13
from sqlalchemy.dialects.postgresql import TIMESTAMP
14

15
from tshistory.schema import tsschema
16
from tshistory.util import (
17
    closed_overlaps,
18
19
    num2float,
    SeriesServices,
20
    start_end,
21
22
    tzaware_serie
)
23
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES
24

25
L = logging.getLogger('tshistory.tsio')
26
TABLES = {}
27
28


29
class TimeSerie(SeriesServices):
    """Postgres-backed, fully versioned time series store.

    Each series lives in a dedicated table under `<namespace>.timeserie`;
    every insertion is recorded as a changeset.
    """
    # class-level defaults; real per-instance values are set in __init__
    namespace = 'tsh'
    schema = None
    metadatacache = None
    # internal metadata keys: user metadata must not collide with these
    metakeys = {
        'tzaware',
        'index_type',
        'index_names',
        'index_dtype',
        'value_dtype',
        'value_type'
    }
    registry_map = None
    serie_tablename = None
    create_lock_id = None
    # serializes cache resets against the module-level table caches
    cachelock = Lock()
    def __init__(self, namespace='tsh'):
        """Bind the handler to `namespace` and set up its schema and caches."""
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
        # deterministic, namespace-dependent advisory-lock id
        # (used by _create to serialize table creation)
        self.create_lock_id = sum(ord(char) for char in namespace)
    def _check_tx(self, cn):
        """Safety belt: important api points must receive a transaction object."""
        # a plain connection inside an open transaction is fine
        if not isinstance(cn, Engine) and cn.in_transaction():
            return
        # the test suite may opt out by setting self._testing
        if getattr(self, '_testing', False):
            return
        raise TypeError('You must use a transaction object')
    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        """
        self._check_tx(cn)
        # validate the inputs before touching the db
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert (_insertion_date is None or
                isinstance(_insertion_date, datetime)), 'Bad format for insertion date'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'
        assert newts.index.notna().all(), 'The index contains NaT entries'
        # normalize: sorted index, float-coerced values
        if not newts.index.is_monotonic_increasing:
            newts = newts.sort_index()
        newts = num2float(newts)

        if not len(newts):
            return

        # only datetime-like, non-multi indexes are supported
        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            # unknown series: first insertion creates the tables
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie
        """
        if not self.exists(cn, seriename):
            return

        # restrict to changesets up to `revision_date` when asked
        csetfilter = (
            [lambda cset: cset.c.insertion_date <= revision_date]
            if revision_date
            else []
        )
        snapshot = Snapshot(cn, self, seriename)
        _, current = snapshot.find(
            csetfilter=csetfilter,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )

        if current is None or _keep_nans:
            return current
        current.name = seriename
        # erasure points are stored as nans: strip them for the caller
        return current[~current.isnull()]
    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        try:
            return self.metadatacache[seriename]
        except KeyError:
            pass
        reg = self.schema.registry
        query = select([reg.c.metadata]).where(
            reg.c.seriename == seriename
        )
        meta = cn.execute(query).scalar()
        # memoize for subsequent lookups
        self.metadatacache[seriename] = meta
        return meta
    def update_metadata(self, cn, seriename, metadata):
        """Replace the user-facing metadata of a series.

        Internal keys (see `metakeys`) are preserved and may not be
        overridden by the caller.
        """
        self._check_tx(cn)
        assert isinstance(metadata, dict)
        assert not set(metadata.keys()) & self.metakeys
        meta = self.metadata(cn, seriename)

        # keep only the internal keys, then layer the new user metadata on top
        merged = {key: meta[key] for key in self.metakeys}
        merged.update(metadata)

        reg = self.schema.registry
        upd = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=merged)
        # drop the now-stale cache entry before writing
        self.metadatacache.pop(seriename)
        cn.execute(upd)
    def changeset_metadata(self, cn, csid):
        """Return the metadata dict of changeset `csid` (or None).

        The changeset id is passed as a bound parameter rather than
        interpolated into the sql string (consistent with _previous_cset,
        and immune to injection through a non-integer `csid`).
        """
        sql = 'select metadata from "{ns}".changeset where id = %(csid)s'.format(
            ns=self.namespace
        )
        return cn.execute(sql, csid=csid).scalar()
    def type(self, cn, name):
        """Return the series type; a base TimeSerie only holds primaries."""
        return 'primary'
    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False,
                    _keep_nans=False):
        """Return the whole history of a series as a dict mapping each
        insertion date (UTC timestamp) to the series revision.

        from/to_insertion_date: restrict the scanned changesets
        from/to_value_date: restrict the value span of each revision
        deltabefore/deltaafter: clip each revision around its own
        insertion date
        diffmode: yield the diff introduced by each changeset instead of
        the full series state
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        cset = self.schema.changeset
        # all changesets touching this series, oldest first
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        if from_value_date or to_value_date:
            # keep only changesets whose [start, end] span overlaps
            # the requested value range (bound below as fromdate/todate)
            revsql = revsql.where(
                closed_overlaps(from_value_date, to_value_date)
            )

        revs = cn.execute(
            revsql,
            {'fromdate': from_value_date, 'todate': to_value_date}
        ).fetchall()
        if not revs:
            return {}

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            # delta mode: fetch each revision individually, clipped
            # around its insertion date
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs
        else:
            series = [
                (idate, ts if _keep_nans else ts.dropna() )
                 for idate, ts in series
            ]

        return {
            pd.Timestamp(idate).astimezone('UTC'): serie
            for idate, serie in series
        }
    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date,
            _keep_nans=True
        )
        if histo is None:
            return None

        # for each value date, keep the value coming from the most
        # recent insertion
        vimap = {}  # value date -> insertion date
        vvmap = {}  # value date -> value
        for idate, series in histo.items():
            # Series.items() replaces .iteritems(), which was removed
            # in pandas 2.0
            for vdate, value in series.items():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        ts = pd.Series(vvmap).sort_index().loc[from_value_date:to_value_date]
        ts.name = seriename
        return ts.dropna()
    def exists(self, cn, seriename):
        """True if `seriename` is a known series of this namespace."""
        table = self._get_ts_table(cn, seriename)
        return table is not None
    def latest_insertion_date(self, cn, seriename):
        """Return the most recent insertion date of the series, as UTC."""
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        query = select(
            [func.max(cset.c.insertion_date)]
        ).where(
            tstable.c.cset == cset.c.id
        )
        maxdate = cn.execute(query).scalar()
        return pd.Timestamp(maxdate).astimezone('UTC')
    def insertion_dates(self, cn, seriename,
                        fromdate=None, todate=None):
        """List the insertion dates of a series (UTC), oldest first."""
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        query = select(
            [cset.c.insertion_date]
        ).where(
            tstable.c.cset == cset.c.id
        ).order_by(
            cset.c.id
        )

        if fromdate:
            query = query.where(cset.c.insertion_date >= fromdate)
        if todate:
            query = query.where(cset.c.insertion_date <= todate)

        rows = cn.execute(query).fetchall()
        return [
            pd.Timestamp(idate).astimezone('UTC')
            for idate, in rows
        ]
    def last_id(self, cn, seriename):
        """Return the id of the last snapshot chunk of the series."""
        return Snapshot(cn, self, seriename).last_id()
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return a changeset id of the series at `revdate`.

        mode: 'strict' (exact match), 'before' (<=) or 'after' (>=).
        """
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        query = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        idate = cset.c.insertion_date
        if mode == 'strict':
            query = query.where(idate == revdate)
        elif mode == 'before':
            query = query.where(idate <= revdate)
        else:
            query = query.where(idate >= revdate)
        return cn.execute(query).scalar()
    def delete(self, cn, seriename):
        """Remove a series and all its history from the database.

        cn *must* be a transaction scope.
        """
        self._check_tx(cn)
        assert not isinstance(cn, Engine), 'use a transaction object'
        if not self.exists(cn, seriename):
            # report through the module logger, not a bare print
            L.info('not deleting unknown series %s %s',
                   seriename, self.namespace)
            return
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()
        L.info('deleted %s %s', seriename, self.namespace)
    def strip(self, cn, seriename, csid):
        """Erase all revisions of a series from changeset `csid` onwards."""
        self._check_tx(cn)
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        # the registry id is loop-invariant (and cached anyway)
        regid = self._name_to_regid(cn, seriename)
        for rev in logs:
            # leave a trace of the stripping in the changeset metadata
            metadata = self.changeset_metadata(cn, rev['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            cn.execute(
                cset.update().where(
                    cset.c.id == rev['rev']
                ).values(metadata=metadata)
            )
            # unlink the changeset from the series
            cn.execute(
                cset_serie.delete().where(
                    cset_serie.c.cset == rev['rev']
                ).where(
                    cset_serie.c.serie == regid
                )
            )

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        ns = self.namespace
        return {
            'series count': cn.execute(
                'select count(*) from "{}".registry'.format(ns)
            ).scalar(),
            'changeset count': cn.execute(
                'select max(id) from "{}".changeset'.format(ns)
            ).scalar(),
            'serie names': [
                name for name, in cn.execute(
                    'select distinct seriename from "{}".registry '
                    'order by seriename'.format(ns)
                ).fetchall()
            ]
        }
    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select(
            [cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        # optional filters
        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)

        if stripped:
            # outerjoin to also show the stripped (dead) changesets
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(
                cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        entries = [
            {'rev': csetid,
             'author': author,
             'date': pd.Timestamp(revdate, tz='utc'),
             'meta': meta or {},
             'names': self._changeset_series(cn, csetid)}
            for csetid, author, revdate, meta in cn.execute(sql).fetchall()
        ]
        # chronological order
        entries.sort(key=lambda entry: entry['rev'])
        return entries
    def interval(self, cn, seriename, notz=False):
        """Return the [start, end] value-date interval of a series.

        notz: return naive timestamps even for a tzaware series.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        # the latest revision row carries the current series bounds
        res = cn.execute(
            f'select start, "end" '
            f'from "{self.namespace}.timeserie"."{tablename}" '
            f'order by cset desc limit 1'
        ).fetchone()
        start, end = res.start, res.end
        if self.metadata(cn, seriename).get('tzaware') and not notz:
            start = pd.Timestamp(start, tz='UTC')
            end = pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')
477
    # /API
    # Helpers
478

479
480
    # creation / update

481
    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        """First insertion of a series: register it, create its tables
        and record the initial changeset. Returns the stored series,
        or None when there was nothing to store.
        """
        start, end = start_end(newts, notz=False)
        if start is None:
            assert end is None
            # this is just full of nans
            return None
        # chop off unwanted nans
        newts = newts.loc[start:end]
        if len(newts) == 0:
            return None

        # at creation time we take an exclusive lock to avoid
        # a deadlock on created tables against the changeset-series fk
        cn.execute(
            'select pg_advisory_xact_lock({})'.format(self.create_lock_id)
        )
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        # first revision row: changeset, snapshot head and value bounds
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        table = self._make_ts_table(cn, seriename)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts
    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Subsequent insertion: store only the diff against the current
        state, as a new changeset. Returns the stored diff, or None when
        there was nothing new.
        """
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        # compute series start/end stamps
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename, notz=True)
        start = min(tsstart or ival.left, ival.left)
        end = max(tsend or ival.right, ival.right)

        if pd.isnull(diff[0]) or pd.isnull(diff[-1]):
            # we *might* be shrinking, let's look at the full series
            # and yes, shrinkers have a slow path
            last = snapshot.last()
            patched = self.patch(last, diff).dropna()
            if not len(patched):
                raise ValueError('complete erasure of a series is forbidden')
            if pd.isnull(diff[0]):
                start = patched.index[0]
            if pd.isnull(diff[-1]):
                end = patched.index[-1]

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        # new revision row: changeset, snapshot head and value bounds
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff
    # serie table handling
562

563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
    def _make_tablename(self, cn, seriename):
        """ compute the unqualified (no namespace) table name
        from a serie name, to allow arbitrary serie names
        """
        # postgresql table names are limited to 63 chars.
        name = seriename
        if len(name) > 63:
            name = hashlib.sha1(name.encode('utf-8')).hexdigest()

        # collision detection (collision can happen after a rename)
        collision = cn.execute(
            f'select table_name '
            f'from "{self.namespace}".registry '
            f'where table_name = %(seriename)s',
            seriename=name
        ).scalar()
        if collision:
            return str(uuid.uuid4())

        return name
    def _serie_to_tablename(self, cn, seriename):
        """Map a series name to its table name, through a cache."""
        try:
            return self.serie_tablename[seriename]
        except KeyError:
            pass

        reg = self.schema.registry
        tablename = cn.execute(
            select([reg.c.table_name]).where(reg.c.seriename == seriename)
        ).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename
    def _table_definition_for(self, cn, seriename):
        """Return the sqlalchemy table object for the series revisions,
        building (and caching it in module-level TABLES) on first use.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(cn, seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                # value-date bounds of the revision (indexed below for
                # fast overlap queries)
                Column('start', TIMESTAMP, nullable=False),
                Column('end', TIMESTAMP, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                Index(NONE_NAME, 'start'),
                Index(NONE_NAME, 'end'),
                schema='{}.timeserie'.format(self.namespace),
                keep_existing=True
            )
        return table
622

Aurélien Campéas's avatar
Aurélien Campéas committed
623
    def _make_ts_table(self, cn, seriename):
        """Create the revision table of a new series and return it."""
        table = self._table_definition_for(cn, seriename)
        table.create(cn)
        return table
    def _register_serie(self, cn, seriename, ts):
        """Add a registry row for a new series, with its initial metadata."""
        index = ts.index
        # only keep the named index levels
        inames = [name for name in index.names if name]
        meta = {
            'tzaware': tzaware_serie(ts),
            'index_type': index.dtype.name,
            'index_names': inames,
            'index_dtype': index.dtype.str,
            'value_dtype': ts.dtypes.str,
            'value_type': ts.dtypes.name
        }
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(cn, seriename),
            metadata=meta
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid
646
    def _get_ts_table(self, cn, seriename):
        """Return the series table definition, or None for an unknown series."""
        if self._serie_to_tablename(cn, seriename):
            return self._table_definition_for(cn, seriename)
    # changeset handling

653
    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        """Record a new changeset row and return its id."""
        if insertion_date is not None:
            # custom insertion dates must be timezone aware
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        table = self.schema.changeset
        ins = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate
        )
        return cn.execute(ins).inserted_primary_key[0]
664
    def _changeset_series(self, cn, csid):
        """List the names of the series touched by changeset `csid`."""
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        sql = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)
        return [row.seriename for row in cn.execute(sql).fetchall()]
    def _previous_cset(self, cn, seriename, csid):
        """Return the changeset id just before `csid` for the series (or None)."""
        tablename = self._serie_to_tablename(cn, seriename)
        sql = (
            'select cset from "{}.timeserie"."{}" '
            'where cset < %(csid)s '
            'order by cset desc limit 1'
        ).format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()
684
685
    # insertion handling

686
    def _validate(self, cn, ts, seriename):
        """Check an incoming series is type-compatible with the stored one."""
        if ts.isnull().all():
            # an all-nan series means erasure: nothing to validate
            return
        meta = self.metadata(cn, seriename)
        newtype = ts.dtype
        basetype = meta['value_type']
        if newtype != basetype:
            msg = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, newtype, basetype)
            raise Exception(msg)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')
700
    def _name_to_regid(self, cn, seriename):
        """Return (and cache) the registry id of a series."""
        regid = self.registry_map.get(seriename)
        if regid is None:
            registry = self.schema.registry
            sql = select([registry.c.id]).where(registry.c.seriename == seriename)
            regid = self.registry_map[seriename] = cn.execute(sql).scalar()
        return regid
709
    def _finalize_insertion(self, cn, csid, seriename):
        """Link a changeset to the series it touches."""
        table = self.schema.changeset_series
        cn.execute(
            table.insert().values(
                cset=csid,
                serie=self._name_to_regid(cn, seriename)
            )
        )
717
    def _resetcaches(self):
        """Drop all process-level caches (module tables and instance maps)."""
        with self.cachelock:
            for cache in (
                TABLES,
                SNAPTABLES,
                self.metadatacache,
                self.registry_map,
                self.serie_tablename,
            ):
                cache.clear()