tsio.py 4.85 KB
Newer Older
1
2
3
4
5
6
import pandas as pd
import numpy as np

from sqlalchemy import Column, Boolean, select, desc
from sqlalchemy.dialects.postgresql import BYTEA

7
from tshistory.util import tx
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from tshistory.tsio import TimeSerie as BaseTS


def join_index(ts1, ts2):
    if ts1 is None and ts2 is None:
        return None
    if ts1 is None:
        return ts2.index
    if ts2 is None:
        return ts1.index
    return ts1.index.union(ts2.index)


class TimeSerie(BaseTS):
    """This class refines the base `tshistory.TimeSerie` by adding a
    specific workflow on top of it.

    There are two kinds of series : automatically fetched, and
    manually imposed.  The idea is that some scrapper fetches the
    automatic series, and endusers sometimes override values from the
    automatic series.

    Say, one day, Serie X comes with a bogus value -1 for a given
    timestamp. The end user sees it and fixes it.

    But:

    * we don't want that the next automatic serie fetch with the bogus
      value override the fix

    * however whenever upstream fixes the value (that is provides a
      new one) we want the manual override to be replaced by the new
      value.

    We can explain the workflow like with a traditional DVCS graph,
43
    with two branches: "automatic" and "synthetic".
44
45
46
47

    All automatic fetches go into the automatic branch (and thus are
    diffed against each other).

48
    The synthetic series receive all the non-empty differences
49
50
51
52
53
    resulting from inserting to the automatic series, and also all the
    manual entries.

    The manual editions are computed as a diffs between synthetic and
    automatic series.
54
55
56

    """

57
58
59
60
    def __init__(self, *a, **kw):
        super().__init__(*a, **kw)
        self.auto_store = BaseTS(namespace='{}-automatic'.format(self.namespace))

61
    @tx
62
63
64
    def insert(self, cn, ts, name, author,
               metadata=None,
               _insertion_date=None, manual=False):
65
        if manual:
66
67
            diff = ts
        else:
68
            # insert & compute diff over automatic
69
            diff = self.auto_store.insert(
70
                cn, ts, name, author,
71
                metadata=metadata,
72
73
74
75
                _insertion_date=_insertion_date
            )
            if diff is None:
                return
76

77
        # insert the diff over automatic or the manual edit into synthetic
78
79
        a = super().insert(
            cn, diff, name, author,
80
            metadata=metadata,
81
82
83
            _insertion_date=_insertion_date
        )
        return a
84

85
    @tx
Aurélien Campéas's avatar
Aurélien Campéas committed
86
87
88
89
    def delete(self, cn, seriename):
        super().delete(cn, seriename)
        self.auto_store.delete(cn, seriename)

90
91
    # supervision specific API

92
    @tx
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
    def get_overrides(self, cn, name, revision_date=None,
                      from_value_date=None, to_value_date=None):
        autotsh = self.auto_store
        auto = autotsh.get(cn, name,
                           revision_date=revision_date,
                           from_value_date=from_value_date,
                           to_value_date=to_value_date,
                           _keep_nans=True)
        synth = self.get(cn, name,
                         revision_date=revision_date,
                         from_value_date=from_value_date,
                         to_value_date=to_value_date,
                         _keep_nans=True)
        manual = self.diff(auto, synth)

        manual.name = name
        return manual

111
    @tx
Aurélien Campéas's avatar
Aurélien Campéas committed
112
    def get_ts_marker(self, cn, name, revision_date=None,
113
                      from_value_date=None, to_value_date=None):
Aurélien Campéas's avatar
Aurélien Campéas committed
114
        table = self._get_ts_table(cn, name)
115
116
117
        if table is None:
            return None, None

118
        autotsh = self.auto_store
119
120
121
122
123
124
125
126
127
128
129
130
        auto = autotsh.get(cn, name,
                           revision_date=revision_date,
                           from_value_date=from_value_date,
                           to_value_date=to_value_date,
                           _keep_nans=True)
        synth = self.get(cn, name,
                         revision_date=revision_date,
                         from_value_date=from_value_date,
                         to_value_date=to_value_date,
                         _keep_nans=True)
        manual = self.diff(auto, synth)

131
132
133
134
135
        unionindex = join_index(auto, manual)
        if unionindex is None:
            # this means both series are empty
            return None, None

Arnaud Campeas's avatar
Arnaud Campeas committed
136
        mask_manual = pd.Series([False] * len(unionindex), index=unionindex)
137
138
139
140
        if manual is not None:
            mask_manual[manual.index] = True
            mask_manual.name = name

141
142
143
144
        ts = self.get(cn, name,
                      revision_date=revision_date,
                      from_value_date=from_value_date,
                      to_value_date=to_value_date)
145
146
        ts.name = name
        return ts, mask_manual