from datetime import datetime
import logging
import hashlib
import uuid
from threading import Lock
import json
from pathlib import Path

import pandas as pd

from deprecated import deprecated
from sqlhelp import sqlfile, select

from tshistory.util import (
    closed_overlaps,
    num2float,
    pruned_history,
    SeriesServices,
    start_end,
    tx,
    tzaware_serie
)
from tshistory.snapshot import Snapshot

L = logging.getLogger('tshistory.tsio')
SERIESSCHEMA = Path(__file__).parent / 'series.sql'


class timeseries(SeriesServices):
    namespace = 'tsh'
    schema = None
    metadatacache = None
    metakeys = {
        'tzaware',
        'index_type',
        'index_dtype',
        'value_dtype',
        'value_type'
    }
    registry_map = None
    serie_tablename = None
    create_lock_id = None
    delete_lock_id = None
    cachelock = Lock()

    def __init__(self, namespace='tsh'):
        self.namespace = namespace
        self.metadatacache = {}
        self.registry_map = {}
        self.serie_tablename = {}
        self.create_lock_id = sum(ord(c) for c in namespace)
        self.delete_lock_id = sum(ord(c) for c in namespace)

    @tx
    def insert(self, cn, newts, seriename, author,
               metadata=None,
               _insertion_date=None):
        """Create a new revision of a given time series

        newts: pandas.Series with date index
        seriename: str unique identifier of the serie
        author: str free-form author name
        metadata: optional dict for changeset metadata
        """
        assert isinstance(seriename, str), 'Name not a string'
        assert isinstance(author, str), 'Author not a string'
        assert metadata is None or isinstance(metadata, dict), 'Bad format for metadata'
        assert (_insertion_date is None or
                isinstance(_insertion_date, datetime)), 'Bad format for insertion date'
        newts = self._guard_insert(newts)
        if not len(newts):
            return

        assert ('<M8[ns]' == newts.index.dtype or
                'datetime' in str(newts.index.dtype) and not
                isinstance(newts.index, pd.MultiIndex))

        newts.name = seriename
        tablename = self._serie_to_tablename(cn, seriename)

        if tablename is None:
            seriesmeta = self._series_initial_meta(cn, seriename, newts)
            return self._create(cn, newts, seriename, author, seriesmeta,
                                metadata, _insertion_date)

        return self._update(cn, tablename, newts, seriename, author,
                            metadata, _insertion_date)
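
    # Usage sketch (illustrative, not part of the original module; assumes
    # `engine` is an engine/connection that the @tx decorator can wrap in a
    # transaction, and `series` is a pandas Series with a datetime index):
    #
    #   tsh = timeseries()
    #   tsh.insert(engine, series, 'power-load-fr', 'babar',
    #              metadata={'source': 'demo'})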

    def list_series(self, cn):
        """Return the mapping of all series to their type"""
        sql = f'select seriename from "{self.namespace}".registry'
        return {
            row.seriename: 'primary'
            for row in cn.execute(sql)
        }

    def get(self, cn, seriename, revision_date=None,
            from_value_date=None, to_value_date=None,
            _keep_nans=False):
        """Compute and return the serie of a given name

        revision_date: datetime filter to get previous versions of the
        serie
        """
        if not self.exists(cn, seriename):
            return

        csetfilter = []
        if revision_date:
            csetfilter.append(
                lambda q: q.where(
                    'cset.insertion_date <= %(idate)s', idate=revision_date
                )
            )
        snap = Snapshot(cn, self, seriename)
        _, current = snap.find(csetfilter=csetfilter,
                               from_value_date=from_value_date,
                               to_value_date=to_value_date)

        if current is not None and not _keep_nans:
            current.name = seriename
            current = current.dropna()
        return current
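
    # Sketch: reading the current value versus a past revision (series
    # name and `engine` are illustrative assumptions):
    #
    #   ts_now = tsh.get(engine, 'power-load-fr')
    #   ts_then = tsh.get(engine, 'power-load-fr',
    #                     revision_date=pd.Timestamp('2019-1-1', tz='UTC'))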

    def metadata(self, cn, seriename):
        """Return metadata dict of timeserie."""
        if seriename in self.metadatacache:
            return self.metadatacache[seriename]
        sql = (f'select metadata from "{self.namespace}".registry '
               'where seriename = %(seriename)s')
        meta = cn.execute(sql, seriename=seriename).scalar()
        if meta is not None:
            self.metadatacache[seriename] = meta
        return meta

    @tx
    def update_metadata(self, cn, seriename, metadata, internal=False):
        assert isinstance(metadata, dict)
        assert internal or not set(metadata.keys()) & self.metakeys
        meta = self.metadata(cn, seriename)
        # remove all but the internal stuff
        newmeta = {
            key: meta[key]
            for key in self.metakeys
            if meta.get(key) is not None
        }
        newmeta.update(metadata)
        sql = (f'update "{self.namespace}".registry as reg '
               'set metadata = %(metadata)s '
               'where reg.seriename = %(seriename)s')
        self.metadatacache.pop(seriename)
        cn.execute(
            sql,
            metadata=json.dumps(newmeta),
            seriename=seriename
        )

    def changeset_metadata(self, cn, csid):
        assert isinstance(csid, int)
        q = select(
            'metadata'
        ).table(
            f'"{self.namespace}".changeset'
        ).where(
            'id = %(csid)s', csid=csid
        )
        return q.do(cn).scalar()

    def type(self, cn, name):
        return 'primary'

    @tx
    def history(self, cn, seriename,
                from_insertion_date=None,
                to_insertion_date=None,
                from_value_date=None,
                to_value_date=None,
                deltabefore=None,
                deltaafter=None,
                diffmode=False,
                _wanted_insertion_dates=None,
                _keep_nans=False):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            return

        revs = self._revisions(
            cn, seriename,
            from_insertion_date,
            to_insertion_date,
            from_value_date,
            to_value_date
        )
        if _wanted_insertion_dates is not None:
            revs = self._pruned_revisions(
                cn, seriename,
                _wanted_insertion_dates,
                revs
            )

        if not revs:
            return {}

        if diffmode:
            # compute the previous serie value
            first_csid = revs[0][0]
            previous_csid = self._previous_cset(cn, seriename, first_csid)
            revs.insert(0, (previous_csid, None))

        snapshot = Snapshot(cn, self, seriename)
        series = []
        if (deltabefore, deltaafter) != (None, None):
            for csid, idate in revs:
                from_date = None
                to_date = None
                if deltabefore is not None:
                    from_date = idate - deltabefore
                else:
                    from_date = from_value_date
                if deltaafter is not None:
                    to_date = idate + deltaafter
                else:
                    to_date = to_value_date
                series.append((
                    idate,
                    snapshot.find(
                        csetfilter=[
                            lambda q: q.where('cset.id = %(csid)s', csid=csid)
                        ],
                        from_value_date=from_date,
                        to_value_date=to_date)[1]
                ))
        else:
            series = snapshot.findall(
                revs,
                from_value_date,
                to_value_date
            )

        if diffmode:
            diffs = []
            for (_revdate_a, serie_a), (revdate_b, serie_b) in zip(series, series[1:]):
                if serie_a is None:
                    # when we scan the entirety of the history: there exists
                    # no "previous" serie, so we consider the first serie
                    # as a diff to the "null" serie
                    diffs.append((revdate_b, serie_b))
                else:
                    diff = self.diff(serie_a, serie_b)
                    if len(diff):
                        diffs.append((revdate_b, diff))
            series = diffs
        else:
            series = [
                (idate, ts if _keep_nans else ts.dropna())
                for idate, ts in series
            ]

        hist = {
            idate: ts
            for idate, ts in series
        }

        if from_value_date or to_value_date:
            # now it's possible that the extremities cut
            # yields similar series for successive idates
            # and we are not interested in that
            hist = pruned_history(hist)

        return hist
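
    # Sketch: `history` maps each insertion date to the series as known
    # at that date (or to a diff when diffmode=True); names illustrative:
    #
    #   hist = tsh.history(engine, 'power-load-fr',
    #                      from_insertion_date=pd.Timestamp('2020-1-1', tz='UTC'))
    #   for idate, ts in hist.items():
    #       print(idate, len(ts))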

    @tx
    def staircase(self, cn, seriename, delta,
                  from_value_date=None,
                  to_value_date=None):
        """ compute a series whose value dates are bounded to be
        `delta` time after the insertion dates and where we
        keep the most recent ones
        """
        if not self.exists(cn, seriename):
            return

        base = self.get(
            cn, seriename,
            from_value_date=from_value_date,
            to_value_date=to_value_date,
            _keep_nans=True
        )
        if not len(base):
            return pd.Series(name=seriename)

        # prepare the needed revision dates
        shiftedvdates = [
            vdate - delta
            for vdate in base.index
        ]

        hcache = historycache(
            self, cn, seriename,
            from_value_date=from_value_date,
            to_value_date=to_value_date,
            _wanted_insertion_dates=shiftedvdates
        )

        return hcache.staircase(
            delta,
            from_value_date,
            to_value_date
        )
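
    # Sketch: a staircase view where each value is frozen `delta` after
    # its insertion date (illustrative):
    #
    #   from datetime import timedelta
    #   ts = tsh.staircase(engine, 'power-load-fr', delta=timedelta(days=1))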

    def exists(self, cn, seriename):
        return self._serie_to_tablename(cn, seriename) is not None

    def latest_insertion_date(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        q = select(
            'max(insertion_date)'
        ).table(
            f'"{self.namespace}".changeset as cset',
            f'"{self.namespace}.timeserie"."{tablename}" as tstable'
        ).where(
            'cset.id = tstable.cset'
        )
        return pd.Timestamp(
            q.do(cn).scalar()
        ).astimezone('UTC')

    def insertion_dates(self, cn, seriename,
                        fromdate=None, todate=None):
        revs = self._revisions(
            cn, seriename,
            from_insertion_date=fromdate,
            to_insertion_date=todate
        )
        return [
            idate
            for _cset, idate in revs
        ]

    def last_id(self, cn, seriename):
        snapshot = Snapshot(cn, self, seriename)
        return snapshot.last_id()

    def changeset_at(self, cn, seriename, revdate, mode='strict'):
        operators = {
            'strict': '=',
            'before': '<=',
            'after': '>='
        }
        tablename = self._serie_to_tablename(cn, seriename)
        assert mode in operators
        q = select(
            'cset'
        ).table(
            f'"{self.namespace}.timeserie"."{tablename}" as tstable',
            f'"{self.namespace}".changeset as cset'
        ).where(
            'cset.id = tstable.cset',
            f'cset.insertion_date {operators[mode]} %(revdate)s',
            revdate=revdate
        )
        return q.do(cn).scalar()

    @tx
    def rename(self, cn, oldname, newname):
        sql = (f'update "{self.namespace}".registry '
               'set seriename = %(newname)s '
               'where seriename = %(oldname)s')
        cn.execute(sql, oldname=oldname, newname=newname)
        self._resetcaches()

    @tx
    def delete(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            print('not deleting unknown series', seriename, self.namespace)
            return
        # serialize all deletions to avoid deadlocks
        cn.execute(
            f'select pg_advisory_xact_lock({self.delete_lock_id})'
        )
        # the changeset table will keep ghost entries whose cleanup is
        # costly: we mark them as coming from a deleted series
        # (update changeset.metadata)
        msg = f'belonged to deleted series `{seriename}`'
        csetsql = f'select cset from "{self.namespace}.timeserie"."{tablename}"'
        for csid, in cn.execute(csetsql):
            metadata = self.changeset_metadata(cn, csid) or {}
            metadata['tshistory.info'] = msg
            cn.execute(
                f'update "{self.namespace}".changeset '
                'set metadata = %(metadata)s '
                'where id = %(csid)s',
                csid=csid,
                metadata=json.dumps(metadata)
            )

        rid, tablename = cn.execute(
            f'select id, table_name from "{self.namespace}".registry '
            'where seriename = %(seriename)s',
            seriename=seriename
        ).fetchone()
        # drop series tables
        cn.execute(
            f'drop table "{self.namespace}.timeserie"."{tablename}" cascade'
        )
        cn.execute(
            f'drop table "{self.namespace}.snapshot"."{tablename}" cascade'
        )
        cn.execute(f'delete from "{self.namespace}".registry '
                   'where id = %(rid)s',
                   rid=rid)
        # -> this will transitively cleanup stale changeset_series entries
        self._resetcaches()

    @tx
    def strip(self, cn, seriename, csid):
        # wipe the diffs
        tablename = self._serie_to_tablename(cn, seriename)
        sql = (f'delete from "{self.namespace}.timeserie"."{tablename}" '
               'where cset >= %(csid)s')
        cn.execute(sql, csid=csid)

        logs = self.log(cn, fromrev=csid, names=(seriename,))
        assert logs
        for log in logs:
            # set in metadata the fact that this changeset
            # has been stripped (hence is no longer being referenced)
            rev = log['rev']
            metadata = self.changeset_metadata(cn, rev) or {}
            metadata['tshistory.info'] = f'got stripped from {csid}'
            sql = (f'update "{self.namespace}".changeset '
                   'set metadata = %(metadata)s '
                   'where id = %(rev)s')
            cn.execute(sql, rev=rev, metadata=json.dumps(metadata))
            # delete the changeset_series item
            sql = (f'delete from "{self.namespace}".changeset_series as css '
                   'where css.cset = %(rev)s')
            cn.execute(sql, rev=rev)

        snapshot = Snapshot(cn, self, seriename)
        snapshot.reclaim()

    def info(self, cn):
        """Gather global statistics on the current tshistory repository
        """
        sql = f'select count(*) from "{self.namespace}".registry'
        stats = {'series count': cn.execute(sql).scalar()}
        sql = f'select max(id) from "{self.namespace}".changeset'
        stats['changeset count'] = cn.execute(sql).scalar()
        sql = (f'select distinct seriename from "{self.namespace}".registry '
               'order by seriename')
        stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
        return stats

    def log(self, cn, limit=0, names=None, authors=None,
            fromrev=None, torev=None,
            fromdate=None, todate=None):
        """Build a structure showing the history of all the series in the db,
        per changeset, in chronological order.
        """
        log = []
        q = select(
            'cset.id', 'cset.author', 'cset.insertion_date', 'cset.metadata',
            opt='distinct'
        ).table(
            f'"{self.namespace}".changeset as cset'
        ).join(
            f'"{self.namespace}".changeset_series as css on css.cset = cset.id',
            f'"{self.namespace}".registry as reg on reg.id = css.serie'
        )

        if names:
            q.where(
                'reg.seriename in %(names)s',
                names=tuple(names)
            )
        if authors:
            q.where(
                'cset.author in %(authors)s',
                authors=tuple(authors)
            )
        if fromrev:
            q.where('cset.id >= %(fromrev)s', fromrev=fromrev)
        if torev:
            q.where('cset.id <= %(torev)s', torev=torev)
        if fromdate:
            q.where('cset.insertion_date >= %(fromdate)s', fromdate=fromdate)
        if todate:
            q.where('cset.insertion_date <= %(todate)s', todate=todate)

        q.order('cset.id', 'desc')
        if limit:
            q.limit(int(limit))

        rset = q.do(cn)
        for csetid, author, revdate, meta in rset.fetchall():
            log.append({'rev': csetid, 'author': author,
                        'date': pd.Timestamp(revdate).tz_convert('utc'),
                        'meta': meta or {},
                        'name': self._changeset_series(cn, csetid)})

        log.sort(key=lambda rev: rev['rev'])
        return log
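
    # Sketch: each log entry is a dict with 'rev', 'author', 'date',
    # 'meta' and 'name' keys (series name illustrative):
    #
    #   for rev in tsh.log(engine, limit=3, names=['power-load-fr']):
    #       print(rev['rev'], rev['author'], rev['date'])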

    def interval(self, cn, seriename, notz=False):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            raise ValueError(f'no such serie: {seriename}')
        sql = ('select tsstart, tsend '
               f'from "{self.namespace}.timeserie"."{tablename}" '
               'order by cset desc limit 1')
        res = cn.execute(sql).fetchone()
        start, end = res.tsstart, res.tsend
        if self.metadata(cn, seriename).get('tzaware') and not notz:
            start, end = pd.Timestamp(start, tz='UTC'), pd.Timestamp(end, tz='UTC')
        return pd.Interval(left=start, right=end, closed='both')
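
    # Sketch: `interval` yields a closed pd.Interval over the value dates
    # currently covered by the series (illustrative):
    #
    #   ival = tsh.interval(engine, 'power-load-fr')
    #   print(ival.left, ival.right)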

    # /API
    # Helpers

    # creation / update

    def _guard_insert(self, newts):
        assert isinstance(newts, pd.Series), 'Not a pd.Series'
        assert not newts.index.duplicated().any(), 'There are some duplicates in the index'
        assert newts.index.notna().all(), 'The index contains NaT entries'
        if not newts.index.is_monotonic_increasing:
            newts = newts.sort_index()

        return num2float(newts)

    def _create(self, cn, newts, seriename, author, seriesmeta,
                metadata=None, insertion_date=None):
        start, end = start_end(newts, notz=False)
        if start is None:
            assert end is None
            # this is just full of nans
            return None
        # chop off unwanted nans
        newts = newts.loc[start:end]
        if len(newts) == 0:
            return None

        # at creation time we take an exclusive lock to avoid
        # a deadlock on created tables against the changeset-series fk
        cn.execute(
            f'select pg_advisory_xact_lock({self.create_lock_id})'
        )
        self._register_serie(cn, seriename, seriesmeta)
        snapshot = Snapshot(cn, self, seriename)
        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.create(newts)
        start, end = start_end(newts)
        tablename = self._make_ts_table(cn, seriename)
        sql = (f'insert into "{self.namespace}.timeserie"."{tablename}" '
               '(cset, snapshot, tsstart, tsend) '
               'values (%s, %s, %s, %s)')
        cn.execute(sql, csid, head,
                   start.to_pydatetime(), end.to_pydatetime())
        self._finalize_insertion(cn, csid, seriename)
        L.info('first insertion of %s (size=%s) by %s',
               seriename, len(newts), author)
        return newts

    def _update(self, cn, tablename, newts, seriename, author,
                metadata=None, insertion_date=None):
        self._validate(cn, newts, seriename)
        snapshot = Snapshot(cn, self, seriename)
        diff = self.diff(snapshot.last(newts.index.min(),
                                       newts.index.max()),
                         newts)
        if not len(diff):
            L.info('no difference in %s by %s (for ts of size %s)',
                   seriename, author, len(newts))
            return

        # compute series start/end stamps
        tsstart, tsend = start_end(newts)
        ival = self.interval(cn, seriename, notz=True)
        start = min(tsstart or ival.left, ival.left)
        end = max(tsend or ival.right, ival.right)

        if pd.isnull(diff[0]) or pd.isnull(diff[-1]):
            # we *might* be shrinking, let's look at the full series
            # and yes, shrinkers have a slow path
            last = snapshot.last()
            patched = self.patch(last, diff).dropna()
            if not len(patched):
                raise ValueError('complete erasure of a series is forbidden')
            if pd.isnull(diff[0]):
                start = patched.index[0]
            if pd.isnull(diff[-1]):
                end = patched.index[-1]

        csid = self._newchangeset(cn, author, insertion_date, metadata)
        head = snapshot.update(diff)
        sql = (f'insert into "{self.namespace}.timeserie"."{tablename}" '
               '(cset, snapshot, tsstart, tsend) '
               'values (%s, %s, %s, %s)')
        cn.execute(sql, csid, head, start, end)
        self._finalize_insertion(cn, csid, seriename)

        L.info('inserted diff (size=%s) for ts %s by %s',
               len(diff), seriename, author)
        return diff

    # serie table handling

    def _make_tablename(self, cn, seriename):
        """ compute the unqualified (no namespace) table name
        from a serie name, to allow arbitrary serie names
        """
        # default
        tablename = seriename
        # postgresql table names are limited to 63 chars.
        if len(seriename) > 63:
            tablename = hashlib.sha1(seriename.encode('utf-8')).hexdigest()

        # collision detection (collision can happen after a rename)
        if cn.execute(f'select table_name '
                      f'from "{self.namespace}".registry '
                      f'where table_name = %(seriename)s',
                      seriename=seriename).scalar():
            tablename = str(uuid.uuid4())

        return tablename
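
    # Example: an over-long name falls back to its 40-char sha1 hex
    # digest, which fits the 63-char limit:
    #
    #   hashlib.sha1(('x' * 70).encode('utf-8')).hexdigest()  # 40 hex chars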

    def _serie_to_tablename(self, cn, seriename):
        tablename = self.serie_tablename.get(seriename)
        if tablename is not None:
            return tablename

        tablename = cn.execute(
            f'select table_name from "{self.namespace}".registry '
            f'where seriename = %(seriename)s',
            seriename=seriename
        ).scalar()
        if tablename is None:
            # creation time
            return
        self.serie_tablename[seriename] = tablename
        return tablename

    def _table_definition_for(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename is None:
            # creation time
            tablename = self._make_tablename(cn, seriename)
        table = sqlfile(
            SERIESSCHEMA,
            namespace=self.namespace,
            tablename=tablename
        )
        return table, tablename

    def _make_ts_table(self, cn, seriename):
        table, tablename = self._table_definition_for(cn, seriename)
        cn.execute(table)
        return tablename

    def _series_initial_meta(self, cn, name, ts):
        index = ts.index
        return {
            'tzaware': tzaware_serie(ts),
            'index_type': index.dtype.name,
            'index_dtype': index.dtype.str,
            'value_dtype': ts.dtypes.str,
            'value_type': ts.dtypes.name
        }

    def _register_serie(self, cn, seriename, seriesmeta):
        sql = (f'insert into "{self.namespace}".registry '
               '(seriename, table_name, metadata) '
               'values (%s, %s, %s) '
               'returning id')
        table_name = self._make_tablename(cn, seriename)
        regid = cn.execute(
            sql,
            seriename,
            table_name,
            json.dumps(seriesmeta)
        ).scalar()
        self.registry_map[seriename] = regid

    def _get_ts_table(self, cn, seriename):
        tablename = self._serie_to_tablename(cn, seriename)
        if tablename:
            return self._table_definition_for(cn, seriename)

    # changeset handling

    def _newchangeset(self, cn, author, insertion_date=None, metadata=None):
        if insertion_date is not None:
            assert insertion_date.tzinfo is not None
            idate = pd.Timestamp(insertion_date)
        else:
            idate = pd.Timestamp(datetime.utcnow(), tz='UTC')
        sql = (f'insert into "{self.namespace}".changeset '
               '(author, metadata, insertion_date) '
               'values (%s, %s, %s) '
               'returning id')
        if metadata:
            metadata = json.dumps(metadata)
        return cn.execute(
            sql,
            author,
            metadata,
            idate
        ).scalar()

    def _changeset_series(self, cn, csid):
        q = select(
            'seriename'
        ).table(
            f'"{self.namespace}".registry as reg',
        ).join(
            f'"{self.namespace}".changeset_series as css on css.serie = reg.id'
        ).where(
            'css.cset = %(csid)s', csid=csid
        )
        return q.do(cn).scalar()

    def _previous_cset(self, cn, seriename, csid):
        tablename = self._serie_to_tablename(cn, seriename)
        sql = (f'select cset from "{self.namespace}.timeserie"."{tablename}" '
               'where cset < %(csid)s '
               'order by cset desc limit 1')
        return cn.execute(sql, csid=csid).scalar()

    # insertion handling

    def _validate(self, cn, ts, seriename):
        if ts.isnull().all():
            # ts erasure
            return
        tstype = ts.dtype
        meta = self.metadata(cn, seriename)
        if tstype != meta['value_type']:
            m = (f'Type error when inserting {seriename}, '
                 f'new type is {tstype}, type in base is {meta["value_type"]}')
            raise Exception(m)
        if ts.index.dtype.name != meta['index_type']:
            raise Exception('Incompatible index types')

    def _name_to_regid(self, cn, seriename):
        regid = self.registry_map.get(seriename)
        if regid is not None:
            return regid

        sql = ('select id '
               f'from "{self.namespace}".registry '
               'where seriename = %(seriename)s')
        regid = self.registry_map[seriename] = cn.execute(
            sql,
            seriename=seriename
        ).scalar()
        return regid

    def _finalize_insertion(self, cn, csid, seriename):
        sql = (f'insert into "{self.namespace}".changeset_series '
               '(cset, serie) '
               'values (%s, %s)')
        cn.execute(sql, csid, self._name_to_regid(cn, seriename))

    def _revisions(self, cn, seriename,
                   from_insertion_date=None,
                   to_insertion_date=None,
                   from_value_date=None,
                   to_value_date=None):
        tablename = self._serie_to_tablename(cn, seriename)
        q = select(
            'cset.id', 'cset.insertion_date'
        ).table(
            f'"{self.namespace}.timeserie"."{tablename}" as ts'
        ).join(
            f'"{self.namespace}".changeset as cset on cset.id = ts.cset'
        )

        if from_insertion_date:
            q.where(
                'cset.insertion_date >= %(from_idate)s',
                from_idate=from_insertion_date
            )
        if to_insertion_date:
            q.where(
                'cset.insertion_date <= %(to_idate)s',
                to_idate=to_insertion_date
            )

        if from_value_date or to_value_date:
            q.where(
                closed_overlaps(from_value_date, to_value_date),
                fromdate=from_value_date,
                todate=to_value_date
            )

        q.order('cset.id')
        return [
            (csid, pd.Timestamp(idate).astimezone('UTC'))
            for csid, idate in q.do(cn).fetchall()
        ]

    def _pruned_revisions(self, cn, seriename,
                          wanted_revisions,
                          revisions):
        """Attempt to build a pruned list of insertion dates, using the
        wanted revisions as a driver: we want at most one
        insertion date for each wanted revision.

        This is useful when there are more insertion dates than
        requested points.
        """
        tzaware = self.metadata(cn, seriename).get('tzaware')
        pruned = []
        itervdates = reversed(wanted_revisions)
        iterrevs = reversed(revisions)

        # for each vdate we retain the nearest inferior insertion date
        # hence we never have more insertion dates than needed
        vdate = next(itervdates)
        while True:
            try:
                rev = next(iterrevs)
            except StopIteration:
                break
            compidate = rev[1]
            if not tzaware:
                compidate = compidate.replace(tzinfo=None)
            if vdate >= compidate:
                pruned.append(rev)
                try:
                    vdate = next(itervdates)
                except StopIteration:
                    break

        pruned.reverse()
        return pruned
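
    # Worked example (illustrative): with revisions at insertion dates
    # [d1, d2, d3, d4] and a single wanted date w with d2 <= w < d3,
    # the right-to-left scan skips d4 and d3, keeps the revision at d2
    # (the nearest insertion date at or before w), then stops.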

    def _resetcaches(self):
        with self.cachelock:
            self.metadatacache.clear()
            self.registry_map.clear()
            self.serie_tablename.clear()


class historycache:

    def __init__(self, tsh, cn, name,
                 from_value_date=None,
                 to_value_date=None,
                 _wanted_insertion_dates=None):
        self.name = name
        self.hist = tsh.history(
            cn, name,
            from_value_date=from_value_date,
            to_value_date=to_value_date,
            _wanted_insertion_dates=_wanted_insertion_dates,
            _keep_nans=True
        )

    def get(self, revision_date=None,
            from_value_date=None,
            to_value_date=None):
        if not len(self.hist):
            return pd.Series(name=self.name)

        if revision_date is None:
            return list(self.hist.values())[-1].dropna()