# tsio.py
import pandas as pd
import numpy as np

from sqlalchemy import Column, Boolean, select, desc
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.tsio import TimeSerie as BaseTS


def join_index(ts1, ts2):
    """Return the combined index of two optional series.

    Either argument may be None.  When both are None the result is
    None; when only one is provided its own index is returned as is;
    otherwise the union of both indexes is returned.
    """
    available = [ts for ts in (ts1, ts2) if ts is not None]
    if not available:
        return None
    if len(available) == 1:
        # only one side exists: hand back its index untouched
        return available[0].index
    return ts1.index.union(ts2.index)


class TimeSerie(BaseTS):
    """Refine the base `tshistory.TimeSerie` with a supervision workflow.

    There are two kinds of series: automatically fetched, and
    manually imposed.  The idea is that some scraper fetches the
    automatic series, and end users sometimes override values from
    the automatic series.

    Say, one day, series X comes with a bogus value -1 for a given
    timestamp.  The end user sees it and fixes it.

    But:

    * we don't want the next automatic fetch carrying the same bogus
      value to override the fix

    * however, whenever upstream fixes the value (that is, provides a
      new one) we want the manual override to be replaced by the new
      value.

    The workflow can be explained like a traditional DVCS graph,
    with three branches: "automatic", "manual" and "synthetic".

    All automatic fetches go into the automatic branch (and thus are
    diffed against each other).

    The synthetic series receive all the non-empty differences
    resulting from inserting to the automatic series, and also
    all the manual entries.

    The manual editions can be computed as a diff between synthetic
    and automatic series, but for convenience we also store them
    explicitly.
    """

    def __init__(self, *a, **kw):
        """Initialise the synthetic store and its two sibling stores.

        All arguments are forwarded to the base class.  Two extra
        base-class stores are then created, in namespaces derived from
        this instance's namespace with the `-manual` and `-automatic`
        suffixes.
        """
        super().__init__(*a, **kw)
        self.manual_store = BaseTS(namespace='{}-manual'.format(self.namespace))
        self.auto_store = BaseTS(namespace='{}-automatic'.format(self.namespace))

    def insert(self, cn, ts, name, author, _insertion_date=None, manual=False):
        """Insert `ts` into the manual or automatic branch, then into
        the synthetic series.

        With `manual=True` the series goes to the manual store and,
        provided that insertion returned something, the whole `ts` is
        then inserted into the synthetic series.  Otherwise the series
        goes to the automatic store and only the diff returned by that
        insertion is forwarded to the synthetic series.

        Returns None when the branch insertion returned None (taken
        here to mean there was nothing new to record), else whatever
        the base class `insert` returns for the synthetic series.
        """
        # insert & compute the diff over the relevant branch
        branch_store = self.manual_store if manual else self.auto_store
        diff = branch_store.insert(
            cn, ts, name, author,
            _insertion_date=_insertion_date
        )
        if diff is None:
            return None

        if manual:
            # a manual edit is pushed to synthetic in full, not as a diff
            diff = ts

        # insert the diff over automatic or the manual edit into synthetic
        return super().insert(
            cn, diff, name, author,
            _insertion_date=_insertion_date
        )

    # supervision specific API

    def get_ts_marker(self, cn, name, revision_date=None,
                      from_value_date=None, to_value_date=None):
        """Return the synthetic series along with a mask of manual edits.

        The result is a `(ts, mask)` pair.  `ts` is the series as
        returned by `get` for the given revision and value-date bounds.
        `mask` is a boolean series indexed by the union of the
        automatic series index and the manual diff index, set to True
        on the entries of the manual diff.

        `(None, None)` is returned when `_get_ts_table` yields None for
        `name`, or when both the automatic series and the manual diff
        are None.
        """
        if self._get_ts_table(cn, name) is None:
            return None, None

        query = dict(
            revision_date=revision_date,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )
        auto = self.auto_store.get(cn, name, _keep_nans=True, **query)
        synth = self.get(cn, name, _keep_nans=True, **query)
        # the manual edits are recomputed as the diff from automatic
        # to synthetic
        manual = self.diff(auto, synth)

        unionindex = join_index(auto, manual)
        if unionindex is None:
            # this means both series are empty
            return None, None

        mask_manual = pd.Series([False] * len(unionindex), index=unionindex)
        if manual is not None:
            mask_manual[manual.index] = True
            # NOTE(review): the mask is only named when manual edits
            # exist; kept as is to preserve the existing behaviour
            mask_manual.name = name

        # second read without `_keep_nans`: this is the series handed
        # back to the caller
        ts = self.get(cn, name, **query)
        ts.name = name
        return ts, mask_manual