tsio.py 26.5 KB
Newer Older
1
from datetime import datetime
2
import logging
3
4
import hashlib
import uuid
5
from threading import Lock
6
7
8

import pandas as pd

9
10
from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
11
from sqlalchemy.engine.base import Engine
Aurélien Campéas's avatar
Aurélien Campéas committed
12
from sqlalchemy.sql.expression import select, func, desc
Aurélien Campéas's avatar
Aurélien Campéas committed
13
from sqlalchemy.dialects.postgresql import TIMESTAMP
14

15
from tshistory.schema import tsschema
16
from tshistory.util import (
17
    closed_overlaps,
18
19
    num2float,
    SeriesServices,
20
    start_end,
21
22
    tzaware_serie
)
23
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES
24

25
L = logging.getLogger('tshistory.tsio')
26
TABLES = {}
27
28


29
30
31
32
33
34
35
36
37
38
39
def tx(func):
    """Decorator enforcing that `cn` is a live transaction object.

    Important api entry points must not be called with a bare Engine or
    an out-of-transaction connection (unless the instance is flagged
    `_testing`).
    """
    from functools import wraps

    @wraps(func)  # preserve name/docstring of the wrapped api point
    def check_tx_and_call(self, cn, *a, **kw):
        # safety belt to make sure important api points are tx-safe
        if isinstance(cn, Engine) or not cn.in_transaction():
            if not getattr(self, '_testing', False):
                raise TypeError('You must use a transaction object')

        return func(self, cn, *a, **kw)

    return check_tx_and_call


40
class TimeSerie(SeriesServices):
    """Postgres-backed, fully versioned time series store.

    Each series gets its own table under the `<namespace>.timeserie`
    schema; every insertion creates a changeset recording author,
    insertion date and optional metadata.
    """
    namespace = 'tsh'
    schema = None
    metadatacache = None  # per-instance {seriename: metadata dict}
    # metadata keys managed internally; plain callers may not overwrite them
    metakeys = {
        'tzaware',
        'index_type',
        'index_names',
        'index_dtype',
        'value_dtype',
        'value_type'
    }
    registry_map = None     # {seriename: registry row id}
    serie_tablename = None  # {seriename: unqualified table name}
    create_lock_id = None   # advisory-lock key derived from the namespace
    cachelock = Lock()      # guards the module-level table caches

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
        # cheap deterministic int for pg_advisory_xact_lock
        self.create_lock_id = sum(ord(c) for c in namespace)
65

66
    @tx
    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata

        Returns the stored series (first insertion), the stored diff
        (subsequent insertion), or None when there is nothing to store.
        """
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert (_insertion_date is None or
                isinstance(_insertion_date, datetime)), 'Bad format for insertion date'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'
        assert newts.index.notna().all(), 'The index contains NaT entries'

        if not newts.index.is_monotonic_increasing:
            newts = newts.sort_index()

        # normalize integral values to floats for storage consistency
        newts = num2float(newts)

        if not len(newts):
            return

        # only datetime-like, non-multi indexes are supported
        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))
        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            # unknown series: initial revision
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
107

108
    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        Returns None for an unknown series.
        """
        if not self.exists(cn, seriename):
            return

        csetfilter = []
        if revision_date:
            # only consider changesets up to the requested revision
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            # nans encode point erasures; strip them for the caller
            current = current[~current.isnull()]
        return current
132

133
    def metadata(self, cn, seriename):
        """Return the metadata dict of a series, memoized per instance."""
        cached = self.metadatacache
        if seriename not in cached:
            registry = self.schema.registry
            query = select([registry.c.metadata]).where(
                registry.c.seriename == seriename
            )
            cached[seriename] = cn.execute(query).scalar()
        return cached[seriename]

145
    @tx
    def update_metadata(self, cn, seriename, metadata, internal=False):
        """Merge `metadata` into the stored metadata of a series.

        Internal keys (see `metakeys`) may only be written when
        `internal` is true.
        """
        assert isinstance(metadata, dict)
        assert internal or not set(metadata.keys()) & self.metakeys
        meta = self.metadata(cn, seriename)
        # remove all but internal stuff
        newmeta = {
            key: meta[key]
            for key in self.metakeys
            if meta.get(key) is not None
        }
        newmeta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=newmeta)
        # invalidate the cache entry (tolerate a cold cache)
        self.metadatacache.pop(seriename, None)
        cn.execute(sql)

164
165
166
167
168
169
170
    def changeset_metadata(self, cn, csid):
        """Return the metadata dict of changeset `csid` (None if absent)."""
        # bind csid as a parameter instead of interpolating it into the
        # sql string (consistent with _previous_cset, injection-safe)
        sql = 'select metadata from "{ns}".changeset where id = %(csid)s'.format(
            ns=self.namespace
        )
        return cn.execute(sql, csid=csid).scalar()

171
172
173
    def type(self, cn, name):
        """Return the kind of series handled here: always 'primary'."""
        return 'primary'

174
    @tx
    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False,
                    _keep_nans=False):
        """Map insertion dates (utc timestamps) to the series as known
        at that date.

        In diffmode, each entry is the diff against the previous
        revision instead of the full series.  deltabefore/deltaafter
        restrict each revision to a window around its insertion date.
        Returns None for an unknown series, {} when no revision matches.
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        cset = self.schema.changeset
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        if from_value_date or to_value_date:
            # only keep revisions whose [start, end] overlaps the value range
            revsql = revsql.where(
                closed_overlaps(from_value_date, to_value_date)
            )

        revs = cn.execute(
            revsql,
            {'fromdate': from_value_date, 'todate': to_value_date}
        ).fetchall()
        if not revs:
            return {}

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            # sliding value-date window around each insertion date
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs
        else:
            series = [
                (idate, ts if _keep_nans else ts.dropna() )
                 for idate, ts in series
            ]

        return {
            pd.Timestamp(idate).astimezone('UTC'): serie
            for idate, serie in series
        }
262

263
    @tx
    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date,
            _keep_nans=True
        )
        if histo is None:
            return None

        # for each value date, keep the value seen at the most recent
        # insertion date
        vimap = {}
        vvmap = {}
        for idate, series in histo.items():
            # Series.items() — .iteritems() was deprecated and removed
            # in pandas 2.0
            for vdate, value in series.items():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        ts = pd.Series(vvmap).sort_index().loc[from_value_date:to_value_date]
        ts.name = seriename
        return ts.dropna()
291

292
293
    def exists(self, cn, seriename):
        """True when the series is known to the registry."""
        table = self._get_ts_table(cn, seriename)
        return table is not None
294

295
    def latest_insertion_date(self, cn, seriename):
        """Return the utc timestamp of the most recent insertion."""
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        sql = select(
            [func.max(cset.c.insertion_date)]
        ).where(
            tstable.c.cset == cset.c.id
        )
        return pd.Timestamp(
            cn.execute(sql).scalar()
        ).astimezone('UTC')
306

307
308
    def insertion_dates(self, cn, seriename,
                        fromdate=None, todate=None):
        """List the utc insertion dates of a series, oldest first."""
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        query = select(
            [cset.c.insertion_date]
        ).where(
            tstable.c.cset == cset.c.id
        ).order_by(
            cset.c.id
        )

        if fromdate:
            query = query.where(cset.c.insertion_date >= fromdate)
        if todate:
            query = query.where(cset.c.insertion_date <= todate)

        dates = []
        for row in cn.execute(query).fetchall():
            dates.append(pd.Timestamp(row[0]).astimezone('UTC'))
        return dates

329
330
331
332
    def last_id(self, cn, seriename):
        """Return the id of the latest snapshot chunk of the series."""
        return Snapshot(cn, self, seriename).last_id()

333
334
335
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Find the changeset id matching `revdate`.

        mode selects the comparison: exact date ('strict'), at-or-before
        ('before') or at-or-after ('after').
        """
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        sql = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        # pick the insertion-date comparison for the requested mode
        if mode == 'strict':
            clause = cset.c.insertion_date == revdate
        elif mode == 'before':
            clause = cset.c.insertion_date <= revdate
        else:
            clause = cset.c.insertion_date >= revdate
        return cn.execute(sql.where(clause)).scalar()

348
    @tx
    def delete(self, cn, seriename):
        """Irreversibly delete a series, its tables and its changesets.

        cn *must* be a transaction scope.
        """
        if not self.exists(cn, seriename):
            # use the module logger rather than raw prints
            L.info('not deleting unknown series %s %s', seriename, self.namespace)
            return
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()
        L.info('deleted %s %s', seriename, self.namespace)
384

385
    @tx
    def strip(self, cn, seriename, csid):
        """Remove from a series all revisions from changeset `csid` on.

        Stripped changesets keep a trace of the operation in their
        metadata.
        """
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changset_serie item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
411

412
    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        stats = {}
        countsql = 'select count(*) from "{}".registry'.format(self.namespace)
        stats['series count'] = cn.execute(countsql).scalar()
        maxsql = 'select max(id) from "{}".changeset'.format(self.namespace)
        stats['changeset count'] = cn.execute(maxsql).scalar()
        namesql = 'select distinct seriename from "{}".registry order by seriename'.format(
            self.namespace
        )
        stats['serie names'] = [name for name, in cn.execute(namesql).fetchall()]
        return stats

425
    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        Each entry carries rev id, author, utc date, metadata and the
        names of the series touched by the changeset.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        # the query is ordered newest-first: restore chronological order
        log.sort(key=lambda rev: rev['rev'])
        return log

473
    def interval(self, cn, seriename, notz=False):
        """Return the current [start, end] value-date interval.

        Raises ValueError for an unknown series.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        sql = (f'select start, "end" '
               f'from "{self.namespace}.timeserie"."{tablename}" '
               f'order by cset desc limit 1')
        res = cn.execute(sql).fetchone()
        start, end = res.start, res.end
        if self.metadata(cn, seriename).get('tzaware') and not notz:
            # stored stamps are naive; reattach utc for tz-aware series
            start, end = pd.Timestamp(start, tz='UTC'), pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')

486
487
    # /API
    # Helpers
488

489
490
    # creation / update

491
    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        """First insertion of a series: register it and store revision 1."""
        start, end = start_end(newts, notz=False)
        if start is None:
            assert end is None
            # this is just full of nans
            return None
        # chop off unwanted nans
        newts = newts.loc[start:end]
        if len(newts) == 0:
            return None

        # at creation time we take an exclusive lock to avoid
        # a deadlock on created tables against the changeset-series fk
        cn.execute(
            'select pg_advisory_xact_lock({})'.format(self.create_lock_id)
        )
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        table = self._make_ts_table(cn, seriename)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

526
    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Store a diff revision for an already known series."""
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        # only the points differing from the current state are stored
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        # compute series start/end stamps
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename, notz=True)
        start = min(tsstart or ival.left, ival.left)
        end = max(tsend or ival.right, ival.right)

        if pd.isnull(diff[0]) or pd.isnull(diff[-1]):
            # we *might* be shrinking, let's look at the full series
            # and yes, shrinkers have a slow path
            last = snapshot.last()
            patched = self.patch(last, diff).dropna()
            if not len(patched):
                raise ValueError('complete erasure of a series is forbidden')
            if pd.isnull(diff[0]):
                start = patched.index[0]
            if pd.isnull(diff[-1]):
                end = patched.index[-1]

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

571
    # serie table handling
572

573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
    def _make_tablename(self, cn, seriename):
        """ compute the unqualified (no namespace) table name
        from a serie name, to allow arbitrary serie names
        """
        # postgresql table names are limited to 63 chars.
        if len(seriename) > 63:
            seriename = hashlib.sha1(seriename.encode('utf-8')).hexdigest()

        # collision detection (collision can happen after a rename):
        # fall back to a random name when the computed one is taken
        if cn.execute(f'select table_name '
                      f'from "{self.namespace}".registry '
                      f'where table_name = %(seriename)s',
                      seriename=seriename).scalar():
            return str(uuid.uuid4())

        return seriename

590
591
592
593
    def _serie_to_tablename(self, cn, seriename):
        """Resolve a series name to its table name, memoized per instance."""
        cached = self.serie_tablename.get(seriename)
        if cached is not None:
            return cached

        reg = self.schema.registry
        query = select([reg.c.table_name]).where(reg.c.seriename == seriename)
        found = cn.execute(query).scalar()
        if found is None:
            # creation time
            return None
        self.serie_tablename[seriename] = found
        return found

    def _table_definition_for(self, cn, seriename):
        """Build (or fetch from the module cache) the sqlalchemy table
        object for a series.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(cn, seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                Column('start', TIMESTAMP, nullable=False),
                Column('end', TIMESTAMP, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                Index(NONE_NAME, 'start'),
                Index(NONE_NAME, 'end'),
                schema='{}.timeserie'.format(self.namespace),
                keep_existing=True
            )
        return table
632

Aurélien Campéas's avatar
Aurélien Campéas committed
633
    def _make_ts_table(self, cn, seriename):
        """Create the physical table of a series; return its definition."""
        tdef = self._table_definition_for(cn, seriename)
        tdef.create(cn)
        return tdef

    def _register_serie(self, cn, seriename, ts):
        """Add the series to the registry, recording index/value dtypes."""
        index = ts.index
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(cn, seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'index_dtype': index.dtype.str,
                'value_dtype': ts.dtypes.str,
                'value_type': ts.dtypes.name
            }
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid
655

656
    def _get_ts_table(self, cn, seriename):
        """Return the table definition of a known series, else None."""
        if self._serie_to_tablename(cn, seriename):
            return self._table_definition_for(cn, seriename)
        return None
660

661
662
    # changeset handling

663
    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        """Insert a changeset row and return its id."""
        table = self.schema.changeset
        if insertion_date is not None:
            # user-provided insertion dates must be tz-aware
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]
673

674
    def _changeset_series(self, cn, csid):
        """Names of all series touched by changeset `csid`."""
        cset_serie = self.schema.changeset_series
        reg = self.schema.registry
        query = select(
            [reg.c.seriename]
        ).where(cset_serie.c.cset == csid
        ).where(cset_serie.c.serie == reg.c.id)
        return [row.seriename for row in cn.execute(query).fetchall()]
686

687
688
689
690
691
692
693
    def _previous_cset(self, cn, seriename, csid):
        """Return the changeset id just before `csid` for this series,
        or None when `csid` is the first one.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select cset from "{}.timeserie"."{}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1').format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()

694
695
    # insertion handling

696
    def _validate(self, cn, ts, seriename):
        """Check the dtypes of `ts` against the stored series metadata."""
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
                seriename, tstype, meta['value_type'])
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')
708

709
710
    def _name_to_regid(self, cn, seriename):
        """Map a series name to its registry id, memoized per instance."""
        known = self.registry_map.get(seriename)
        if known is not None:
            return known

        registry = self.schema.registry
        query = select([registry.c.id]).where(registry.c.seriename == seriename)
        found = cn.execute(query).scalar()
        self.registry_map[seriename] = found
        return found

719
    def _finalize_insertion(self, cn, csid, seriename):
        """Record the changeset <-> series association."""
        cset_series = self.schema.changeset_series
        cn.execute(
            cset_series.insert().values(
                cset=csid,
                serie=self._name_to_regid(cn, seriename)
            )
        )
726

727
    def _resetcaches(self):
        # wipe the module-level table caches and the per-instance
        # lookup caches; serialized so concurrent handles stay coherent
        with self.cachelock:
            TABLES.clear()
            SNAPTABLES.clear()
            self.metadatacache.clear()
            self.registry_map.clear()
            self.serie_tablename.clear()