tsio.py 27.5 KB
Newer Older
1
from datetime import datetime
2
import logging
3
4
import hashlib
import uuid
5
from threading import Lock
6
7
import json
from pathlib import Path
8
9
10

import pandas as pd

11
12
from deprecated import deprecated

13
from tshistory.schema import tsschema
14
from tshistory.util import (
15
    closed_overlaps,
16
17
    num2float,
    SeriesServices,
18
    start_end,
19
    sqlfile,
20
    tx,
21
22
    tzaware_serie
)
23
from tshistory.snapshot import Snapshot
24

25
L = logging.getLogger('tshistory.tsio')
26
SERIESSCHEMA = Path(__file__).parent / 'series.sql'
27
28


29
class timeseries(SeriesServices):
    """Versioned time series store.

    Each series lives in its own table under the
    `<namespace>.timeserie` schema; every revision is recorded as a
    changeset row (author, insertion date, metadata) linked to the
    series through `changeset_series`.
    """
    # default postgres schema prefix; overridden per instance in __init__
    namespace = 'tsh'
    schema = None
    # per-instance cache: seriename -> metadata dict
    metadatacache = None
    # metadata keys maintained internally; update_metadata refuses to
    # touch them unless called with internal=True
    metakeys = {
        'tzaware',
        'index_type',
        'index_names',
        'index_dtype',
        'value_dtype',
        'value_type'
    }
    # per-instance cache: seriename -> registry id
    registry_map = None
    # per-instance cache: seriename -> backing table name
    serie_tablename = None
    # advisory-lock id used by _create (derived from the namespace)
    create_lock_id = None
    # guards cache clearing in _resetcaches
    cachelock = Lock()

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        # define the db schema objects for this namespace
        # (presumably idempotent -- confirm in tshistory.schema)
        tsschema(namespace).define()
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
        # cheap deterministic int for pg_advisory_xact_lock
        self.create_lock_id = sum(ord(c) for c in namespace)
53

54
    @tx
    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata

        Returns the actually-inserted series (full series at creation,
        diff on update), or None when there was nothing to insert.
        """
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert (_insertion_date is None or
                isinstance(_insertion_date, datetime)), 'Bad format for insertion date'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'
        assert newts.index.notna().all(), 'The index contains NaT entries'

        if not newts.index.is_monotonic_increasing:
            newts = newts.sort_index()

        # normalize integer-ish values to floats
        newts = num2float(newts)

        if not len(newts):
            return

        # the index must be a plain datetime index (no MultiIndex)
        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))
        newts.name = seriename
        tablename = self._serie_to_tablename(cn, seriename)

        if tablename is None:
            # first revision ever for this series
            return self._create(cn, newts, seriename, author,
                                metadata, _insertion_date)

        return self._update(cn, tablename, newts, seriename, author,
                            metadata, _insertion_date)
95

96
    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie

        Returns None for an unknown series.
        """
        if not self.exists(cn, seriename):
            return

        csetfilter = []
        if revision_date:
            # NOTE(review): the date is interpolated as an iso string,
            # not passed as a bind parameter
            csetfilter.append(
                f'cset.insertion_date <= \'{revision_date.isoformat()}\''
            )
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            # strip nan values unless the caller wants them kept
            current = current[~current.isnull()]
        return current
122

123
    def metadata(self, cn, seriename):
124
        """Return metadata dict of timeserie."""
125
126
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
127
128
129
        sql = (f'select metadata from {self.namespace}.registry '
               'where seriename = %(seriename)s')
        meta = cn.execute(sql, seriename=seriename).scalar()
130
131
        if meta is not None:
            self.metadatacache[seriename] = meta
132
133
        return meta

134
    @tx
    def update_metadata(self, cn, seriename, metadata, internal=False):
        """Merge `metadata` into the stored metadata of the series.

        The internal keys (see `metakeys`) are preserved from the
        current metadata and may only be overridden with internal=True.
        """
        assert isinstance(metadata, dict)
        assert internal or not set(metadata.keys()) & self.metakeys
        meta = self.metadata(cn, seriename)
        # remove all but internal stuff
        newmeta = {
            key: meta[key]
            for key in self.metakeys
            if meta.get(key) is not None
        }
        newmeta.update(metadata)
        sql = (f'update "{self.namespace}".registry as reg '
               'set metadata = %(metadata)s '
               'where reg.seriename = %(seriename)s')
        # drop the cache entry if present; use a default because another
        # thread may have cleared the cache (_resetcaches) since the
        # metadata() read above
        self.metadatacache.pop(seriename, None)
        cn.execute(
            sql,
            metadata=json.dumps(newmeta),
            seriename=seriename
        )
155

156
157
158
159
160
161
162
    def changeset_metadata(self, cn, csid):
        sql = 'select metadata from "{ns}".changeset where id = {id}'.format(
            ns=self.namespace,
            id=csid
        )
        return cn.execute(sql).scalar()

163
164
165
    def type(self, cn, name):
        """Return the type of the series.

        Plain stored series are always 'primary' (other types
        presumably exist in extensions -- can't tell from here).
        """
        return 'primary'

166
    @tx
    def history(self, cn, seriename,
                from_insertion_date=None,
                to_insertion_date=None,
                from_value_date=None,
                to_value_date=None,
                deltabefore=None,
                deltaafter=None,
                diffmode=False,
                _keep_nans=False):
        """Compute the revision history of a series.

        Returns a dict {insertion_date (UTC): series}, or None for an
        unknown series, or {} when no revision matches the filters.

        deltabefore/deltaafter: restrict each revision's value dates to
        a window around its insertion date.
        diffmode: return, for each revision, the diff against the
        previous one instead of the full state.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            return

        # collect the changesets (id, insertion date) touching the series
        revsql = [
            'select cset.id, cset.insertion_date '
            f'from "{self.namespace}".changeset as cset, '
            f'     "{self.namespace}.timeserie"."{tablename}" as ts '
            'where ts.cset = cset.id '
        ]

        if from_insertion_date:
            revsql.append('and cset.insertion_date >= %(from_idate)s ')
        if to_insertion_date:
            revsql.append('and cset.insertion_date <= %(to_idate)s ')

        if from_value_date or to_value_date:
            # only keep revisions whose value span overlaps the window
            revsql.append(
                'and ' + closed_overlaps(from_value_date, to_value_date)
            )

        revsql.append('order by cset.id')
        revsql = ''.join(revsql)

        revs = cn.execute(
            revsql, {
                'fromdate': from_value_date,
                'todate': to_value_date,
                'from_idate': from_insertion_date,
                'to_idate': to_insertion_date
            }
        ).fetchall()
        if not revs:
            return {}

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            # per-revision value window: one snapshot lookup per revision
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                if deltaafter is not None:
                    to_date = idate + deltaafter
                series.append((
                    idate,
                    snapshot.find(csetfilter=[f'cset.id = {csid}'],
                                  from_value_date=from_date,
                                  to_value_date=to_date)[1]
                ))
        else:
            # bulk path: fetch all revisions in one go
            series = snapshot.findall(revs,
                                      from_value_date,
                                      to_value_date
            )

        if diffmode:
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists no "previous" serie
                    # we therefore consider the first serie as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diffs.append((revdate_b, self.diff(serie_a, serie_b)))
            series = diffs
        else:
            series = [
                (idate, ts if _keep_nans else ts.dropna() )
                 for idate, ts in series
            ]

        return {
            pd.Timestamp(idate).astimezone('UTC'): serie
            for idate, serie in series
        }
259

260
    @tx
    def staircase(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones
        """
        histo = self.history(
            cn, seriename, deltabefore=-delta,
            from_value_date=from_value_date,
            to_value_date=to_value_date,
            _keep_nans=True
        )
        if histo is None:
            return None

        # for each value date, keep the value of the most recent insertion
        vimap = {}
        vvmap = {}
        for idate, series in histo.items():
            # Series.items() -- .iteritems() was removed in pandas 2.0
            for vdate, value in series.items():
                if vdate not in vimap or vimap[vdate] < idate:
                    vimap[vdate] = idate
                    vvmap[vdate] = value

        ts = pd.Series(vvmap).sort_index().loc[from_value_date:to_value_date]
        ts.name = seriename
        return ts.dropna()
288

289
    def exists(self, cn, seriename):
290
        return self._serie_to_tablename(cn, seriename) is not None
291

292
    def latest_insertion_date(self, cn, seriename):
        """Return the most recent insertion date of the series as a
        UTC timestamp.

        NOTE(review): an unknown series yields tablename None and a
        broken query -- callers presumably check exists() first.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select max(insertion_date) '
               f'from "{self.namespace}".changeset as cset, '
               f'     "{self.namespace}.timeserie"."{tablename}" as tstable '
               'where cset.id = tstable.cset')
        return pd.Timestamp(
            cn.execute(sql).scalar()
        ).astimezone('UTC')
301

302
303
    def insertion_dates(self, cn, seriename,
                        fromdate=None, todate=None):
        """Return all insertion dates of the series, as UTC timestamps,
        in changeset order, optionally bounded by fromdate/todate.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        fromclause, toclause = '', ''
        if fromdate:
            fromclause = ' and cset.insertion_date >= %(fromdate)s '
        if todate:
            toclause = ' and cset.insertion_date <= %(todate)s '
        sql = ('select insertion_date '
               f'from "{self.namespace}".changeset as cset, '
               f'     "{self.namespace}.timeserie"."{tablename}" as tstable '
               'where cset.id = tstable.cset '
               f'{fromclause} {toclause}'
               'order by cset.id')

        return [
            pd.Timestamp(idate).astimezone('UTC')
            for idate, in cn.execute(
                    sql, fromdate=fromdate, todate=todate
            ).fetchall()
        ]

324
325
326
327
    def last_id(self, cn, seriename):
        """Return the id of the last snapshot chunk of the series."""
        return Snapshot(cn, self, seriename).last_id()

328
    def changeset_at(self, cn, seriename, revdate, mode='strict'):
329
330
331
332
333
334
335
336
337
338
339
340
341
        operators = {
            'strict': '=',
            'before': '<=',
            'after': '>='
        }
        tablename = self._serie_to_tablename(cn, seriename)
        assert mode in operators
        sql = ('select cset '
               f'from "{self.namespace}.timeserie"."{tablename}" as tstable, '
               f'      "{self.namespace}".changeset as cset '
               f'where cset.id = tstable.cset '
               f'and   cset.insertion_date {operators[mode]} %(revdate)s')
        return cn.execute(sql, revdate=revdate).scalar()
342

343
344
    @tx
    def rename(self, cn, oldname, newname):
        """Rename a series in the registry and flush all caches."""
        cn.execute(
            f'update "{self.namespace}".registry '
            'set seriename = %(newname)s '
            'where seriename = %(oldname)s',
            oldname=oldname,
            newname=newname
        )
        self._resetcaches()

351
    @tx
    def delete(self, cn, seriename):
        """Delete a series: drop its tables and its registry entry.

        The changesets themselves are kept (cleaning them up would be
        costly); they are merely marked in their metadata as belonging
        to a deleted series.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # use the module logger rather than a bare print
            L.info('not deleting unknown series %s %s',
                   seriename, self.namespace)
            return
        # changeset will keep ghost entries
        # whose cleanup is costly
        # we will mark them as from a deleted series
        # update changeset.metadata
        msg = f'belonged to deleted series `{seriename}`'
        csetsql = f'select cset from "{self.namespace}.timeserie"."{tablename}"'
        for csid, in cn.execute(csetsql):
            metadata = self.changeset_metadata(cn, csid) or {}
            metadata['tshistory.info'] = msg
            cn.execute(
                f'update "{self.namespace}".changeset '
                'set metadata = %(metadata)s '
                'where id = %(csid)s',
                csid=csid,
                metadata=json.dumps(metadata)
            )

        rid, tablename = cn.execute(
            f'select id, table_name from "{self.namespace}".registry '
            'where seriename = %(seriename)s',
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            f'drop table "{self.namespace}.timeserie"."{tablename}" cascade'
        )
        cn.execute(
            f'drop table "{self.namespace}.snapshot"."{tablename}" cascade'
        )
        cn.execute(
            f'delete from "{self.namespace}".registry '
            'where id = %(rid)s',
            rid=rid
        )
        # -> this will transitively cleanup state changeset_series entries
        self._resetcaches()

392
    @tx
    def strip(self, cn, seriename, csid):
        """Remove all revisions of the series from `csid` onwards.

        The stripped changesets are kept but annotated in their
        metadata; the corresponding diffs are wiped.
        """
        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs

        # put stripping info in the metadata
        for log in logs:
            # update changeset.metadata
            metadata = self.changeset_metadata(cn, log['rev']) or {}
            metadata['tshistory.info'] = 'got stripped from {}'.format(csid)
            sql = (f'update "{self.namespace}".changeset '
                   'set metadata = %(metadata)s '
                   'where id = %(id)s')
            cn.execute(sql, id=log['rev'], metadata=json.dumps(metadata))
            # delete changeset_series item
            sql = (f'delete from "{self.namespace}".changeset_series as css '
                   'where css.cset = %(rev)s '
                   'and   css.serie = %(name)s')
            cn.execute(
                sql,
                rev=log['rev'],
                name=self._name_to_regid(cn, seriename)
            )

        # wipe the diffs
        tablename = self._serie_to_tablename(cn, seriename)
        sql = (f'delete from "{self.namespace}.timeserie"."{tablename}" '
               'where cset >= %(csid)s')
        cn.execute(sql, csid=csid)
421

422
    def info(self, cn):
423
424
        """Gather global statistics on the current tshistory repository
        """
425
        sql = 'select count(*) from "{}".registry'.format(self.namespace)
426
        stats = {'series count': cn.execute(sql).scalar()}
427
        sql = 'select max(id) from "{}".changeset'.format(self.namespace)
428
        stats['changeset count'] = cn.execute(sql).scalar()
429
430
431
        sql = 'select distinct seriename from "{}".registry order by seriename'.format(
            self.namespace
        )
432
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
433
434
        return stats

435
    def log(self, cn, limit=0, names=None, authors=None,
            stripped=False,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.

        Each entry is a dict with rev/author/date/meta/names keys.
        """
        log = []
        sql = [
            'select distinct cset.id, cset.author, cset.insertion_date, cset.metadata '
            f'from "{self.namespace}".changeset as cset, '
            f'     "{self.namespace}".registry as reg, '
            f'     "{self.namespace}".changeset_series as css '
        ]
        wheres = []
        if stripped:
            # also see the changesets no longer linked to any series
            sql.append(f'left join "{self.namespace}".changeset as cset2 '
                       'on (cset2.id = css.cset) ')
        else:
            wheres.append('cset.id = css.cset and '
                          'css.serie = reg.id ')

        if names:
            # XXX check names exist
            # NOTE(review): names/authors are repr-interpolated into the
            # sql text, not passed as bind parameters
            wheres.append('reg.seriename in (%s)' % ','.join(
                repr(name) for name in names)
            )
        if authors:
            wheres.append('cset.author in (%s)' % ','.join(
                repr(auth) for auth in authors)
            )
        if fromrev:
            wheres.append('cset.id >= %(fromrev)s')
        if torev:
            wheres.append('cset.id <= %(torev)s')
        if fromdate:
            wheres.append('cset.insertion_date >= %(fromdate)s')
        if todate:
            wheres.append('cset.insertion_date <= %(todate)s')

        sql.append('where ' + ' and '.join(wheres))
        if limit:
            sql.append('limit %(limit)s ')
        sql.append('order by cset.id desc')

        sql = ''.join(sql)

        rset = cn.execute(sql, {
            'fromdate': fromdate,
            'todate': todate,
            'fromrev': fromrev,
            'torev': torev
        })
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate, tz='utc'),
                        'meta': meta or {},
                        'names': self._changeset_series(cn, csetid)})

        # the query returns in descending id order; serve ascending
        log.sort(key=lambda rev: rev['rev'])
        return log

498
    def interval(self, cn, seriename, notz=False):
        """Return the [start, end] value-date interval of the series,
        as recorded by its latest revision.

        Raises ValueError for an unknown series.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        sql = (f'select tsstart, tsend '
               f'from "{self.namespace}.timeserie"."{tablename}" '
               f'order by cset desc limit 1')
        res = cn.execute(sql).fetchone()
        start, end = res.tsstart, res.tsend
        if self.metadata(cn, seriename).get('tzaware') and not notz:
            # NOTE(review): stamps look stored naive -- localize to UTC
            # for tzaware series; confirm against _update/start_end
            start, end = pd.Timestamp(start, tz='UTC'), pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')

511
512
    # /API
    # Helpers
513

514
515
    # creation / update

516
    def _create(self, cn, newts, seriename, author,
                metadata=None, insertion_date=None):
        """First insertion of a series: register it, create its tables
        and record its initial revision. Returns the inserted series
        (or None when there was nothing real to insert).
        """
        start, end = start_end(newts, notz=False)
        if start is None:
            assert end is None
            # this is just full of nans
            return None
        # chop off unwanted nans
        newts = newts.loc[start:end]
        if len(newts) == 0:
            return None

        # at creation time we take an exclusive lock to avoid
        # a deadlock on created tables against the changeset-series fk
        cn.execute(
            'select pg_advisory_xact_lock({})'.format(self.create_lock_id)
        )
        self._register_serie(cn, seriename, newts)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        tablename = self._make_ts_table(cn, seriename)
        sql = (f'insert into "{self.namespace}.timeserie"."{tablename}" '
               '(cset, snapshot, tsstart, tsend) '
               f'values (%s, %s, %s, %s)')
        cn.execute(sql, csid, head,
                   start.to_pydatetime(), end.to_pydatetime())
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

549
    def _update(self, cn, tablename, newts, seriename, author,
                metadata=None, insertion_date=None):
        """Record a new revision of an existing series.

        Only the difference with the current state is stored. Returns
        the diff, or None when there was no difference.
        """
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        # compute series start/end stamps
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename, notz=True)
        start = min(tsstart or ival.left, ival.left)
        end = max(tsend or ival.right, ival.right)

        if pd.isnull(diff[0]) or pd.isnull(diff[-1]):
            # we *might* be shrinking, let's look at the full series
            # and yes, shrinkers have a slow path
            last = snapshot.last()
            patched = self.patch(last, diff).dropna()
            if not len(patched):
                raise ValueError('complete erasure of a series is forbidden')
            if pd.isnull(diff[0]):
                start = patched.index[0]
            if pd.isnull(diff[-1]):
                end = patched.index[-1]

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        sql = (f'insert into "{self.namespace}.timeserie"."{tablename}" '
               '(cset, snapshot, tsstart, tsend) '
               'values (%s, %s, %s, %s)')
        cn.execute(sql, csid, head, start, end)
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

591
    # serie table handling
592

593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
    def _make_tablename(self, cn, seriename):
        """ compute the unqualified (no namespace) table name
        from a serie name, to allow arbitrary serie names
        """
        # postgresql table names are limited to 63 chars.
        if len(seriename) > 63:
            seriename = hashlib.sha1(seriename.encode('utf-8')).hexdigest()

        # collision detection (collision can happen after a rename)
        if cn.execute(f'select table_name '
                      f'from "{self.namespace}".registry '
                      f'where table_name = %(seriename)s',
                      seriename=seriename).scalar():
            return str(uuid.uuid4())

        return seriename

610
611
612
613
    def _serie_to_tablename(self, cn, seriename):
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename
614

615
616
617
618
619
        tablename = cn.execute(
            f'select table_name from "{self.namespace}".registry '
            f'where seriename = %(seriename)s',
            seriename=seriename
        ).scalar()
620
621
622
623
624
625
626
627
628
629
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        """Return (ddl, tablename) for the series revision table,
        rendering the series.sql template.
        """
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(cn, seriename)
        table = sqlfile(
            SERIESSCHEMA,
            namespace=self.namespace,
            tablename=tablename
        )
        return table, tablename
637

Aurélien Campéas's avatar
Aurélien Campéas committed
638
    def _make_ts_table(self, cn, seriename):
639
640
641
        table, tablename = self._table_definition_for(cn, seriename)
        cn.execute(table)
        return tablename
642
643

    def _register_serie(self, cn, seriename, ts):
        """Add the series to the registry, deriving its metadata
        (tzaware flag, index/value dtypes) from the first inserted
        series. Caches the new registry id.
        """
        index = ts.index
        inames = [name for name in index.names if name]
        sql = (f'insert into {self.namespace}.registry '
               '(seriename, table_name, metadata) '
               'values (%s, %s, %s) '
               'returning id')
        table_name = self._make_tablename(cn, seriename)
        metadata = json.dumps({
            'tzaware': tzaware_serie(ts),
            'index_type': index.dtype.name,
            'index_names': inames,
            'index_dtype': index.dtype.str,
            'value_dtype': ts.dtypes.str,
            'value_type': ts.dtypes.name
        })
        regid = cn.execute(
            sql,
            seriename,
            table_name,
            metadata
        ).scalar()
        self.registry_map[seriename] = regid
666

667
    def _get_ts_table(self, cn, seriename):
668
669
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
Aurélien Campéas's avatar
Aurélien Campéas committed
670
            return self._table_definition_for(cn, seriename)
671

672
673
    # changeset handling

674
675
676
677
    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        """Insert a changeset row and return its id.

        insertion_date, when provided, must be timezone-aware; it
        defaults to the current UTC time.
        """
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
        idate = pd.Timestamp(insertion_date or datetime.utcnow(), tz='UTC')
        sql = (f'insert into {self.namespace}.changeset '
               '(author, metadata, insertion_date) '
               'values (%s, %s, %s) '
               'returning id')
        if metadata:
            metadata = json.dumps(metadata)
        return cn.execute(
            sql,
            author,
            metadata,
            idate
        ).scalar()
690

691
    def _changeset_series(self, cn, csid):
692
693
694
695
696
        sql = ('select seriename '
               f'from "{self.namespace}".registry as reg, '
               f'     "{self.namespace}".changeset_series as css '
               'where css.serie = reg.id '
               'and   css.cset = %(csid)s')
697

698
        return [
699
            row.seriename
700
            for row in cn.execute(sql, csid=csid).fetchall()
701
        ]
702

703
704
705
706
707
708
709
    def _previous_cset(self, cn, seriename, csid):
        tablename = self._serie_to_tablename(cn, seriename)
        sql = ('select cset from "{}.timeserie"."{}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1').format(self.namespace, tablename)
        return cn.execute(sql, csid=csid).scalar()

710
711
    # insertion handling

712
    def _validate(self, cn, ts, seriename):
713
714
        if ts.isnull().all():
            # ts erasure
715
            return
716
        tstype = ts.dtype
717
        meta = self.metadata(cn, seriename)
718
        if tstype != meta['value_type']:
719
            m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
720
                seriename, tstype, meta['value_type'])
721
            raise Exception(m)
722
        if ts.index.dtype.name != meta['index_type']:
723
            raise Exception('Incompatible index types')
724

725
726
    def _name_to_regid(self, cn, seriename):
        regid = self.registry_map.get(seriename)
727
728
729
        if regid is not None:
            return regid

730
731
732
733
734
735
736
        sql = ('select id '
               f'from "{self.namespace}".registry '
               'where seriename = %(seriename)s')
        regid = self.registry_map[seriename] = cn.execute(
            sql,
            seriename=seriename
        ).scalar()
737
738
        return regid

739
    def _finalize_insertion(self, cn, csid, seriename):
740
741
742
743
        sql = (f'insert into "{self.namespace}".changeset_series '
               '(cset, serie) '
               'values (%s, %s)')
        cn.execute(sql, csid, self._name_to_regid(cn, seriename))
744

745
    def _resetcaches(self):
746
747
748
749
        with self.cachelock:
            self.metadatacache.clear()
            self.registry_map.clear()
            self.serie_tablename.clear()
750
751
752
753
754
755
756
757



@deprecated(reason='use the `timeseries` object instead')
class TimeSerie(timeseries):
    # backward-compatibility aliases for the renamed api entry points
    get_history = deprecated(timeseries.history)
    get_delta = deprecated(timeseries.staircase)