# tsio.py
import pandas as pd
import numpy as np

from sqlalchemy import Column, Boolean, select, desc
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.tsio import TimeSerie as BaseTS


def join_index(ts1, ts2):
    """Return the combined index of two optional series.

    Either argument may be None.  With no series at all the result is
    None; with a single series it is that series' own index; with two
    series it is the union of both indexes.
    """
    available = [ts for ts in (ts1, ts2) if ts is not None]
    if not available:
        return None
    if len(available) == 1:
        # only one side exists: its index is returned untouched
        return available[0].index
    left, right = available
    return left.index.union(right.index)


class TimeSerie(BaseTS):
    """This class refines the base `tshistory.TimeSerie` by adding a
    specific workflow on top of it.

    There are two kinds of series: automatically fetched, and
    manually imposed.  The idea is that some scraper fetches the
    automatic series, and end users sometimes override values from the
    automatic series.

    Say, one day, series X comes with a bogus value -1 for a given
    timestamp. The end user sees it and fixes it.

    But:

    * we don't want the next automatic series fetch carrying the bogus
      value to override the fix

    * however, whenever upstream fixes the value (that is, provides a
      new one) we want the manual override to be replaced by the new
      value.

    We can explain the workflow like with a traditional DVCS graph,
    with three branches: "automatic", "manual" and "synthetic".

    All automatic fetches go into the automatic branch (and thus are
    diffed against each other).

    The synthetic series receive all the non-empty differences
    resulting from inserting to the automatic series, and also
    all the manual entries.

    The manual editions can be computed as a diff between synthetic
    and automatic series, but for convenience we also store them
    explicitly.

    """

    def insert(self, cn, ts, name, author, _insertion_date=None, manual=False):
        """Insert `ts` into its branch, then propagate to the synthetic series.

        With `manual=False` the series goes to the
        "<namespace>-automatic" branch and only the diff returned by
        that insertion is forwarded to the synthetic series.  With
        `manual=True` it goes to the "<namespace>-manual" branch and
        the complete `ts` is forwarded.

        Returns None when the branch insertion returns None (nothing
        is forwarded in that case), otherwise whatever the base class
        `insert` returns for the synthetic series.
        """
        branch = 'manual' if manual else 'automatic'
        basetsh = BaseTS(namespace='{}-{}'.format(self.namespace, branch))

        # insert & compute diff over the selected branch
        diff = basetsh.insert(
            cn, ts, name, author,
            _insertion_date=_insertion_date
        )
        if diff is None:
            return None

        if manual:
            # a manual edit is pushed to synthetic as given, not as
            # its diff against the manual branch
            diff = ts

        # insert the diff over automatic or the manual edit into synthetic
        return super().insert(
            cn, diff, name, author,
            _insertion_date=_insertion_date
        )

    # supervision specific API

    def get_ts_marker(self, cn, name, revision_date=None,
                      from_value_date=None, to_value_date=None):
        """Return the series `name` along with a mask of its manual values.

        The result is a `(ts, mask_manual)` pair:

        * `ts` is what `self.get` returns for the requested revision
          and value-date window (without `_keep_nans`)

        * `mask_manual` is a boolean series indexed by the union of
          the automatic series index and the index of
          `self.diff(automatic, synthetic)`; it is True on the
          timestamps present in that diff

        `(None, None)` is returned when the series has no table or
        when there is nothing to build the mask index from.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return None, None

        window = dict(
            revision_date=revision_date,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )

        autotsh = BaseTS(namespace='{}-automatic'.format(self.namespace))
        auto = autotsh.get(cn, name, _keep_nans=True, **window)
        synth = self.get(cn, name, _keep_nans=True, **window)
        # whatever differs between automatic and synthetic is
        # considered a manual edition
        manual = self.diff(auto, synth)

        unionindex = join_index(auto, manual)
        if unionindex is None:
            # this means both series are empty
            return None, None

        mask_manual = pd.Series([False] * len(unionindex), index=unionindex)
        if manual is not None:
            mask_manual[manual.index] = True
            mask_manual.name = name

        ts = self.get(cn, name, **window)
        if ts is not None:
            # `get` results are treated as possibly None above, so do
            # not assume there is a series to rename here
            ts.name = name
        return ts, mask_manual