tsio.py 25 KB
Newer Older
1
from datetime import datetime
2
import logging
3
4
5

import pandas as pd

6
7
from sqlalchemy import Table, Column, Integer, ForeignKey, Index
from sqlalchemy.sql.elements import NONE_NAME
8
from sqlalchemy.engine.base import Engine
Aurélien Campéas's avatar
Aurélien Campéas committed
9
from sqlalchemy.sql.expression import select, func, desc
Aurélien Campéas's avatar
Aurélien Campéas committed
10
from sqlalchemy.dialects.postgresql import TIMESTAMP
11

12
from tshistory.schema import tsschema
13
from tshistory.util import (
14
    closed_overlaps,
15
16
    num2float,
    SeriesServices,
17
    start_end,
18
19
    tzaware_serie
)
20
from tshistory.snapshot import Snapshot, TABLES as SNAPTABLES
21

22
L = logging.getLogger('tshistory.tsio')
23
TABLES = {}
24
25


26
class TimeSerie(SeriesServices):
    """Store and retrieve versioned time series inside a postgres namespace."""
    # schema namespace; the class-level value is a default,
    # each instance gets its own in __init__
    namespace = 'tsh'
    # tsschema object (built in __init__)
    schema = None
    # per-instance cache: seriename -> metadata dict
    metadatacache = None
    # metadata keys managed internally by _register_serie; they cannot
    # be overridden through update_metadata
    metakeys = {
        'tzaware',
        'index_type',
        'index_names',
        'index_dtype',
        'value_dtype',
        'value_type'
    }
    # per-instance cache: seriename -> registry id
    registry_map = None
    # per-instance cache: seriename -> backing table name
    serie_tablename = None
    # advisory-lock key used at series creation time (see _create)
    create_lock_id = None

    def __init__(self, namespace='tsh'):
        """Bind this handler to a namespace and build its schema objects."""
        self.namespace = namespace
        self.schema = tsschema(namespace)
        self.schema.define()
        # fresh per-instance caches (the class-level None values are
        # only placeholders)
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
        # derive a stable advisory-lock key from the namespace name
        self.create_lock_id = sum(map(ord, namespace))
50

51
52
53
54
55
56
    def _check_tx(self, cn):
        # safety belt to make sure important api points are tx-safe
        notx = isinstance(cn, Engine) or not cn.in_transaction()
        if notx and not getattr(self, '_testing', False):
            raise TypeError('You must use a transaction object')

57
    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        _insertion_date: optional forced insertion date (must be a
        datetime; _newchangeset requires it to be tz-aware)

        Returns the created series on first insertion, the stored diff
        on subsequent insertions, or None when there is nothing to do.
        """
        self._check_tx(cn)
        # input sanity checks
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert (_insertion_date is None or
                isinstance(_insertion_date, datetime)), 'Bad format for insertion date'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'

        assert newts.index.notna().all(), 'The index contains NaT entries'
        # normalize the input: sorted index, float values
        if not newts.index.is_monotonic_increasing:
            newts = newts.sort_index()

        newts = num2float(newts)

        if not len(newts):
            return

        # only plain (non multi-index) datetime indexes are supported
        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        table = self._get_ts_table(cn, seriename)

        if table is None:
            # first revision ever: register and create the series
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        # subsequent revision: store a diff against the current state
        return self._update(cn, table, newts, seriename, author,
                            metadata, _insertion_date)
98

99
    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie
        from_value_date/to_value_date: restrict the value span
        _keep_nans: internal flag, keeps the nan erasure markers

        Returns None for an unknown series.
        """
        if not self.exists(cn, seriename):
            return

        csetfilter = []
        if revision_date:
            csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None:
            # fix: name the series even in the _keep_nans case, so
            # callers always get a properly labelled series
            current.name = seriename
            if not _keep_nans:
                # drop the nan erasure markers
                current = current[~current.isnull()]
        return current
123

124
    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie (cached per instance)."""
        try:
            return self.metadatacache[seriename]
        except KeyError:
            pass
        reg = self.schema.registry
        query = select([reg.c.metadata]).where(
            reg.c.seriename == seriename
        )
        meta = cn.execute(query).scalar()
        self.metadatacache[seriename] = meta
        return meta

136
    def update_metadata(self, cn, seriename, metadata):
        """Replace the user (non-internal) metadata of a series.

        The internal keys (self.metakeys) are preserved and cannot be
        overridden.
        """
        self._check_tx(cn)
        assert isinstance(metadata, dict)
        assert not set(metadata.keys()) & self.metakeys
        meta = self.metadata(cn, seriename)
        if meta is None:
            # unknown series: fail with a clear message instead of
            # crashing on the comprehension below
            raise ValueError('no metadata for unknown serie "{}"'.format(seriename))
        # remove all but internal stuff
        newmeta = {key: meta[key] for key in self.metakeys}
        newmeta.update(metadata)
        reg = self.schema.registry
        sql = reg.update().where(
            reg.c.seriename == seriename
        ).values(metadata=newmeta)
        # invalidate the cache entry (defensively tolerate a miss)
        self.metadatacache.pop(seriename, None)
        cn.execute(sql)

151
152
153
154
155
156
157
    def changeset_metadata(self, cn, csid):
        """Return the metadata dict of changeset `csid` (or None).

        The id is passed as a bound parameter (consistent with
        _previous_cset) rather than spliced into the SQL text.
        """
        sql = 'select metadata from "{ns}".changeset where id = %(csid)s'.format(
            ns=self.namespace
        )
        return cn.execute(sql, csid=csid).scalar()

158
    def get_history(self, cn, seriename,
                    from_insertion_date=None,
                    to_insertion_date=None,
                    from_value_date=None,
                    to_value_date=None,
                    deltabefore=None,
                    deltaafter=None,
                    diffmode=False):
        """Return a {utc insertion date -> series} dict of the successive
        revisions of `seriename`.

        from/to_insertion_date: restrict the considered revisions
        from/to_value_date: restrict the value span
        deltabefore/deltaafter: bound each revision's value window
        relative to its own insertion date
        diffmode: yield the diff introduced by each revision instead of
        the full series

        Returns None for an unknown series, {} when no revision matches.
        """
        table = self._get_ts_table(cn, seriename)
        if table is None:
            return

        cset = self.schema.changeset
        # revisions of interest, in chronological (cset id) order
        revsql = select(
            [cset.c.id, cset.c.insertion_date]
        ).order_by(
            cset.c.id
        ).where(
            table.c.cset == cset.c.id
        )

        if from_insertion_date:
            revsql = revsql.where(cset.c.insertion_date >= from_insertion_date)
        if to_insertion_date:
            revsql = revsql.where(cset.c.insertion_date <= to_insertion_date)

        if from_value_date or to_value_date:
            # only revisions whose [start, end] overlaps the wanted span
            revsql = revsql.where(
                closed_overlaps(from_value_date, to_value_date)
            )

        revs = cn.execute(
            revsql,
            {'fromdate': from_value_date, 'todate': to_value_date}
        ).fetchall()
        if not revs:
            return {}

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            # per-revision value window: one snapshot lookup per revision
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                series.append((
                    idate,
                    snapshot.find(csetfilter=[lambda cset: cset.c.id == csid],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            # bulk path: fetch all revisions in one go
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            # turn [full, full, ...] into [diff, diff, ...]
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs
        else:
            # drop the nan erasure markers from the full series
            series = [
                (idate, ts.dropna())
                 for idate, ts in series
            ]

        return {
            pd.Timestamp(idate).astimezone('UTC'): serie
            for idate, serie in series
        }
244

245
246
247
    def get_delta(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones
        """
        histo = self.get_history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )
        if histo is None:
            # unknown series
            return None

        # for each value date, keep the value coming from the most
        # recent insertion date
        vimap = {}
        vvmap = {}
        for idate, series in histo.items():
            # Series.items(): .iteritems was deprecated and removed in
            # pandas 2.0, .items works on every supported version
            for vdate, value in series.items():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        ts = pd.Series(vvmap).sort_index().loc[from_value_date:to_value_date]
        ts.name = seriename
        return ts
271

272
273
    def exists(self, cn, seriename):
        """True if `seriename` is a known series."""
        table = self._get_ts_table(cn, seriename)
        return table is not None
274

275
    def latest_insertion_date(self, cn, seriename):
        """Return the insertion date of the most recent revision, as a
        utc timestamp."""
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        query = select(
            [func.max(cset.c.insertion_date)]
        ).where(
            tstable.c.cset == cset.c.id
        )
        idate = cn.execute(query).scalar()
        return pd.Timestamp(idate).astimezone('UTC')
286

287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
    def insertion_dates(self, cn, seriename):
        """Return all insertion dates of the series, oldest first, as
        utc timestamps."""
        cset = self.schema.changeset
        tstable = self._get_ts_table(cn, seriename)
        query = select(
            [cset.c.insertion_date]
        ).where(
            tstable.c.cset == cset.c.id
        ).order_by(
            cset.c.id
        )
        out = []
        for row in cn.execute(query).fetchall():
            out.append(pd.Timestamp(row[0]).astimezone('UTC'))
        return out

302
303
304
305
    def last_id(self, cn, seriename):
        """Delegate to the series snapshot's last_id."""
        return Snapshot(cn, self, seriename).last_id()

306
307
308
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        """Return a changeset id of the series matching `revdate`
        exactly ('strict'), at or before it ('before'), or at or after
        it ('after')."""
        assert mode in ('strict', 'before', 'after')
        cset = self.schema.changeset
        table = self._table_definition_for(cn, seriename)
        query = select([table.c.cset]).where(
            table.c.cset == cset.c.id
        )
        if mode == 'strict':
            criterion = cset.c.insertion_date == revdate
        elif mode == 'before':
            criterion = cset.c.insertion_date <= revdate
        else:
            criterion = cset.c.insertion_date >= revdate
        return cn.execute(query.where(criterion)).scalar()

321
    def delete(self, cn, seriename):
        """Irrevocably delete a series: its diff and snapshot tables,
        its changesets and its registry entry."""
        self._check_tx(cn)
        assert not isinstance(cn, Engine), 'use a transaction object'
        assert self.exists(cn, seriename)
        # changeset will keep ghost entries
        # we cleanup changeset series, then registry
        # then we drop the two remaining tables
        # cn *must* be a transaction scope
        rid, tablename = cn.execute(
            'select id, table_name from "{}".registry '
            'where seriename = %(seriename)s'.format(self.namespace),
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            'drop table "{}.timeserie"."{}" cascade'.format(self.namespace, tablename)
        )
        cn.execute(
            'drop table "{}.snapshot"."{}" cascade'.format(self.namespace, tablename)
        )
        # cleanup changesets table
        cn.execute('with csets as ('
                   ' select cset from "{ns}".changeset_series '
                   ' where serie = %(rid)s'
                   ') '
                   'delete from "{ns}".changeset as cset using csets '
                   'where cset.id = csets.cset'.format(ns=self.namespace),
                   rid=rid
        )
        cn.execute('delete from "{}".registry '
                   'where id = %(rid)s'.format(self.namespace),
                   rid=rid)
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()

356
    def strip(self, cn, seriename, csid):
        """Remove all revisions of the series starting at changeset
        `csid`, annotating the stripped changesets' metadata."""
        self._check_tx(cn)
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        cset = self.schema.changeset
        cset_serie = self.schema.changeset_series
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = cset.update().where(cset.c.id == log['rev']
            ).values(metadata=metadata)
            cn.execute(sql)
            # delete changset_serie item
            sql = cset_serie.delete().where(
                cset_serie.c.cset == log['rev']
            ).where(
                cset_serie.c.serie == self._name_to_regid(cn, seriename)
            )
            cn.execute(sql)

        # wipe the diffs
        table = self._table_definition_for(cn, seriename)
        cn.execute(table.delete().where(table.c.cset >= csid))
382

383
    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        nbseries = cn.execute(
            'select count(*) from "{}".registry'.format(self.namespace)
        ).scalar()
        lastcset = cn.execute(
            'select max(id) from "{}".changeset'.format(self.namespace)
        ).scalar()
        allnames = cn.execute(
            'select distinct seriename from "{}".registry order by seriename'.format(
                self.namespace
            )
        ).fetchall()
        return {
            'series count': nbseries,
            'changeset count': lastcset,
            'serie names': [name for name, in allnames]
        }

396
    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        Each entry is a dict with 'rev' (changeset id), 'author',
        'date' (utc timestamp), 'meta' (changeset metadata or {}) and
        'names' (series touched by the changeset). The filters restrict
        by series names, authors, changeset id range and insertion date
        range; `stripped` also shows changesets no longer linked to any
        series.
        """
        log = []
        cset, cset_series, reg = (
            self.schema.changeset,
            self.schema.changeset_series,
            self.schema.registry
        )

        # most recent changesets first (re-sorted ascending at the end)
        sql = select([cset.c.id, cset.c.author, cset.c.insertion_date, cset.c.metadata]
        ).distinct().order_by(desc(cset.c.id))

        if limit:
            sql = sql.limit(limit)
        if names:
            sql = sql.where(reg.c.seriename.in_(names))
        if authors:
            sql = sql.where(cset.c.author.in_(authors))
        if fromrev:
            sql = sql.where(cset.c.id >= fromrev)
        if torev:
            sql = sql.where(cset.c.id <= torev)
        if fromdate:
            sql = sql.where(cset.c.insertion_date >= fromdate)
        if todate:
            sql = sql.where(cset.c.insertion_date <= todate)
        if stripped:
            # outerjoin to show dead things
            sql = sql.select_from(cset.outerjoin(cset_series))
        else:
            sql = sql.where(cset.c.id == cset_series.c.cset
            ).where(cset_series.c.serie == reg.c.id)

        rset = cn.execute(sql)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        # chronological order for the caller
        log.sort(key=lambda rev: rev['rev'])
        return log

444
    def interval(self, cn, seriename, notz=False):
        """Return the value-date interval of the series, as read from
        its latest revision row.

        For a tzaware series the bounds are utc timestamps, unless
        `notz` is set. Raises ValueError for an unknown series or one
        without any revision.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        sql = (f'select start, "end" '
               f'from "{self.namespace}.timeserie"."{tablename}" '
               f'order by cset desc limit 1')
        res = cn.execute(sql).fetchone()
        if res is None:
            # registered but no revision row yet: fail clearly instead
            # of crashing on attribute access
            raise ValueError(f'no interval for serie: {seriename}')
        start, end = res.start, res.end
        if self.metadata(cn, seriename).get('tzaware') and not notz:
            start, end = pd.Timestamp(start, tz='UTC'), pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')

457
458
    # /API
    # Helpers
459

460
461
    # creation / update

462
    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Register a brand new series and store its first revision.

        Returns the effectively stored series, or None when the input
        holds no actual values.
        """
        start, end = start_end(newts, notz=False)
        if start is None:
            assert end is None
            # this is just full of nans
            return None
        # chop off unwanted nans
        newts = newts.loc[start:end]
        if len(newts) == 0:
            return None

        # at creation time we take an exclusive lock to avoid
        # a deadlock on created tables against the changeset-series fk
        cn.execute(
            'select pg_advisory_xact_lock({})'.format(self.create_lock_id)
        )
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        # start/end recomputed (default tz handling this time)
        start, end = start_end(newts)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        table = self._make_ts_table(cn, seriename)
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

497
    def _update(self, cn, table, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Store a new revision of an existing series as a diff against
        its current state.

        Returns the stored diff, or None when the input brings nothing
        new.
        """
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        # diff against the current state over the input's span
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        # compute series start/end stamps
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename, notz=True)
        start = min(tsstart or ival.left, ival.left)
        end = max(tsend or ival.right, ival.right)

        if pd.isnull(diff[0]) or pd.isnull(diff[-1]):
            # we *might* be shrinking, let's look at the full series
            # and yes, shrinkers have a slow path
            last = snapshot.last()
            patched = self.patch(last, diff).dropna()
            if not len(patched):
                raise ValueError('complete erasure of a series is forbidden')
            if pd.isnull(diff[0]):
                start = patched.index[0]
            if pd.isnull(diff[-1]):
                end = patched.index[-1]

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        value = {
            'cset': csid,
            'snapshot': head,
            'start': start,
            'end': end
        }
        cn.execute(table.insert().values(value))
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

542
    # serie table handling
543

544
545
546
547
    def _serie_to_tablename(self, cn, seriename):
        """Map a series name to its backing table name (cached), or
        None when the series is not registered yet."""
        cached = self.serie_tablename.get(seriename)
        if cached is not None:
            return cached

        reg = self.schema.registry
        tablename = cn.execute(
            select([reg.c.table_name]).where(reg.c.seriename == seriename)
        ).scalar()
        if tablename is None:
            # creation time: nothing registered yet, do not cache
            return None
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        """Return the sqlalchemy Table of the series' revision log,
        building it (and caching it in the module-level TABLES dict)
        on first use."""
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(cn, seriename)
        fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
        table = TABLES.get(fq_tablename)
        if table is None:
            # one row per revision: changeset id, value-date span and
            # a pointer to the snapshot head chunk
            TABLES[fq_tablename] = table = Table(
                tablename, self.schema.meta,
                Column('id', Integer, primary_key=True),
                Column('cset', Integer,
                       ForeignKey('{}.changeset.id'.format(self.namespace)),
                       nullable=False),
                Column('start', TIMESTAMP, nullable=False),
                Column('end', TIMESTAMP, nullable=False),
                Column('snapshot', Integer,
                       ForeignKey('{}.snapshot.{}.id'.format(
                           self.namespace,
                           tablename))),
                Index(NONE_NAME, 'cset'),
                Index(NONE_NAME, 'snapshot'),
                Index(NONE_NAME, 'start'),
                Index(NONE_NAME, 'end'),
                schema='{}.timeserie'.format(self.namespace),
                keep_existing=True
            )
        return table
586

Aurélien Campéas's avatar
Aurélien Campéas committed
587
    def _make_ts_table(self, cn, seriename):
        """Create the revision-log table of a new series and return its
        definition."""
        tdef = self._table_definition_for(cn, seriename)
        tdef.create(cn)
        return tdef

    def _register_serie(self, cn, seriename, ts):
        """Insert the registry row of a new series, including its
        internal metadata (see `metakeys`), and cache its registry id."""
        index = ts.index
        # only the named index levels
        inames = [name for name in index.names if name]
        sql = self.schema.registry.insert().values(
            seriename=seriename,
            table_name=self._make_tablename(cn, seriename),
            metadata={
                'tzaware': tzaware_serie(ts),
                'index_type': index.dtype.name,
                'index_names': inames,
                'index_dtype': index.dtype.str,
                'value_dtype': ts.dtypes.str,
                'value_type': ts.dtypes.name
            }
        )
        regid = cn.execute(sql).inserted_primary_key[0]
        self.registry_map[seriename] = regid
609

610
    def _get_ts_table(self, cn, seriename):
        """Return the series' table definition, or None for an unknown
        series."""
        tablename = self._serie_to_tablename(cn, seriename)
        if not tablename:
            return
        return self._table_definition_for(cn, seriename)
614

615
616
    # changeset handling

617
    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        """Insert a new changeset row and return its id.

        A forced insertion_date must be tz-aware; otherwise a naive
        utcnow() gets localized to UTC.
        """
        table = self.schema.changeset
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        # NOTE(review): passing an already tz-aware datetime together
        # with tz='UTC' relies on pandas accepting that combination —
        # confirm across pandas versions
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = table.insert().values(
            author=author,
            metadata=metadata,
            insertion_date=idate)
        return cn.execute(sql).inserted_primary_key[0]
627

628
    def _changeset_series(self, cn, csid):
        """Return the names of the series touched by changeset `csid`."""
        links = self.schema.changeset_series
        reg = self.schema.registry
        query = select(
            [reg.c.seriename]
        ).where(links.c.cset == csid
        ).where(links.c.serie == reg.c.id)
        return [rec.seriename for rec in cn.execute(query).fetchall()]
640

641
642
643
644
645
646
647
    def _previous_cset(self, cn, seriename, csid):
        """Return the series' changeset id right before `csid`, or None."""
        tablename = self._serie_to_tablename(cn, seriename)
        sql = (
            f'select cset from "{self.namespace}.timeserie"."{tablename}" '
            f'where cset < %(csid)s '
            f'order by cset desc limit 1'
        )
        return cn.execute(sql, csid=csid).scalar()

648
649
    # insertion handling

650
    def _validate(self, cn, ts, seriename):
        """Check that `ts` is type-compatible with the stored series.

        Raises ValueError (an Exception subclass, so existing handlers
        still catch it) on value-type or index-type mismatch.
        """
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            raise ValueError(
                'Type error when inserting {}, new type is {}, type in base is {}'.format(
                    seriename, tstype, meta['value_type'])
            )
        if ts.index.dtype.name != meta['index_type']:
            raise ValueError('Incompatible index types')
662

663
664
    def _name_to_regid(self, cn, seriename):
        """Map a series name to its registry id (cached)."""
        cached = self.registry_map.get(seriename)
        if cached is not None:
            return cached

        reg = self.schema.registry
        query = select([reg.c.id]).where(reg.c.seriename == seriename)
        regid = self.registry_map[seriename] = cn.execute(query).scalar()
        return regid

673
    def _finalize_insertion(self, cn, csid, seriename):
        """Record the changeset <-> series association."""
        links = self.schema.changeset_series
        cn.execute(
            links.insert().values(
                cset=csid,
                serie=self._name_to_regid(cn, seriename)
            )
        )
680

681
682
683
    def _resetcaches(self):
        """Flush the module-level table caches and every per-instance
        cache."""
        for cache in (TABLES, SNAPTABLES,
                      self.metadatacache,
                      self.registry_map,
                      self.serie_tablename):
            cache.clear()