tsio.py 4.55 KB
Newer Older
1
2
3
4
5
6
import pandas as pd
import numpy as np

from sqlalchemy import Column, Boolean, select, desc
from sqlalchemy.dialects.postgresql import BYTEA

7
from tshistory.util import tx
8
from tshistory.tsio import timeseries as basets
9
10
11
12
13
14
15
16
17
18
19
20


def join_index(ts1, ts2):
    """Return the combined index of two optional series.

    Either argument may be None. When both are None the result is
    None; when only one is given its own index is returned; otherwise
    the union of both indexes is returned.
    """
    available = [ts for ts in (ts1, ts2) if ts is not None]
    if not available:
        return None
    if len(available) == 1:
        # only one side exists: its index is the answer
        return available[0].index
    left, right = available
    return left.index.union(right.index)


21
class timeseries(basets):
    """This class refines the base `tshistory.timeseries` by adding a
    specific workflow on top of it.

    We sometimes work with series that are automatically fetched from
    some upstream source, and then eventually manually corrected (by
    an expert in the data domain).

    Say, one day, series X comes with a bogus value -1 for a given
    timestamp. The end user sees it and fixes it.

    But:

    * we don't want the next upstream series fetch carrying the bogus
      value to override the fix

    * however whenever upstream fixes the value (that is, provides a
      new one) we want the manual override to be replaced by the new
      value.

    We can explain the workflow with a traditional DVCS graph, with
    two branches: "upstream" and "edited".

    All upstream fetches go into the upstream branch (and thus are
    diffed against each other).

    The edited series receives all the non-empty differences
    resulting from inserting into the upstream series, and also all
    the manual entries.

    The manual editions are computed as the diff between the edited
    and upstream series.

    """

    def __init__(self, *a, **kw):
        """Initialise the edited-series handler and its upstream twin.

        All arguments are forwarded to the base class. The upstream
        series are handled by a plain base handler bound to the
        `<namespace>-automatic` namespace.
        """
        super().__init__(*a, **kw)
        self.upstream = basets(namespace='{}-automatic'.format(self.namespace))

    @tx
    def insert(self, cn, ts, name, author,
               metadata=None,
               _insertion_date=None, manual=False):
        """Insert `ts` either as an upstream fetch or as a manual edit.

        With `manual=False` (the default) the series is first inserted
        into the upstream handler; the diff it returns is then inserted
        into the edited series. If the upstream insertion returns None
        nothing more is done and None is returned.

        With `manual=True` the upstream handler is left untouched and
        `ts` goes straight into the edited series.

        Returns whatever the base class `insert` returns for the
        edited series.
        """
        if manual:
            diff = ts
        else:
            # insert & compute diff over automatic
            diff = self.upstream.insert(
                cn, ts, name, author,
                metadata=metadata,
                _insertion_date=_insertion_date
            )
            if diff is None:
                return None

        # insert the diff over automatic or the manual edit into synthetic
        return super().insert(
            cn, diff, name, author,
            metadata=metadata,
            _insertion_date=_insertion_date
        )

    @tx
    def delete(self, cn, seriename):
        """Delete `seriename` from the edited series, then from upstream."""
        super().delete(cn, seriename)
        self.upstream.delete(cn, seriename)

    # supervision specific API

    def _upstream_edited_manual(self, cn, name, revision_date,
                                from_value_date, to_value_date):
        """Fetch the upstream and edited series and diff them.

        Returns an `(upstream, edited, manual)` triple where `manual`
        is `self.diff(upstream, edited)`. Any element may be None.
        """
        # NOTE(review): `_keep_nans=True` presumably keeps NaN points
        # in the fetched series so they take part in the diff -- confirm
        # against the base class.
        query = dict(
            revision_date=revision_date,
            from_value_date=from_value_date,
            to_value_date=to_value_date,
            _keep_nans=True
        )
        upstream = self.upstream.get(cn, name, **query)
        edited = self.get(cn, name, **query)
        return upstream, edited, self.diff(upstream, edited)

    @tx
    def get_overrides(self, cn, name, revision_date=None,
                      from_value_date=None, to_value_date=None):
        """Return the manual editions of series `name`.

        The result is the diff between the upstream and edited series
        (restricted by the optional revision and value dates), named
        `name`. None is returned when the diff is None.
        """
        _upstream, _edited, manual = self._upstream_edited_manual(
            cn, name, revision_date, from_value_date, to_value_date
        )
        # the diff can be None (get_ts_marker guards against it too):
        # do not blow up on the attribute assignment in that case
        if manual is not None:
            manual.name = name
        return manual

    @tx
    def get_ts_marker(self, cn, name, revision_date=None,
                      from_value_date=None, to_value_date=None):
        """Return the edited series along with a manual-edition mask.

        The result is a pair `(edited, mask)`:

        * `edited` is the edited series with its NaN entries dropped
        * `mask` is a boolean series indexed by the union of the
          upstream and manual indexes, True where a manual edition
          exists

        `(None, None)` is returned when the series has no table or
        when both the upstream series and the manual diff are None.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return None, None

        upstream, edited, manual = self._upstream_edited_manual(
            cn, name, revision_date, from_value_date, to_value_date
        )

        unionindex = join_index(upstream, manual)
        if unionindex is None:
            # this means both series are empty
            return None, None

        mask_manual = pd.Series([False] * len(unionindex), index=unionindex)
        if manual is not None:
            mask_manual[manual.index] = True
            mask_manual.name = name

        return edited.dropna(), mask_manual