tsio.py 7.67 KB
Newer Older
1
2
3
4
5
6
import pandas as pd
import numpy as np

from sqlalchemy import Column, Boolean, select, desc
from sqlalchemy.dialects.postgresql import BYTEA

7
from tshistory.util import tx
8
from tshistory.tsio import timeseries as basets
9
10
11
12
13
14
15
16
17
18
19
20


def join_index(ts1, ts2):
    """Return the union of the indexes of series `ts1` and `ts2`.

    Either argument may be None: the index of the other one is then
    returned unchanged. When both are None, None is returned.
    """
    indexes = [ts.index for ts in (ts1, ts2) if ts is not None]
    if not indexes:
        return None
    if len(indexes) == 1:
        return indexes[0]
    first, second = indexes
    return first.union(second)


21
class timeseries(basets):
    """This class refines the base `tshistory.timeseries` by adding a
    specific workflow on top of it.

    We sometimes work with series that are automatically fetched from
    some upstream source, and then possibly manually corrected (by an
    expert in the data domain).

    Say, one day, series X comes with a bogus value -1 for a given
    timestamp. The end user sees it and fixes it.

    But:

    * we don't want the next upstream fetch, still carrying the bogus
      value, to override the fix

    * however whenever upstream fixes the value (that is, provides a
      new one) we want the manual override to be replaced by the new
      value.

    We can explain the workflow as with a traditional DVCS graph,
    with two branches: "upstream" and "edited".

    All upstream fetches go into the upstream branch (and thus are
    diffed against each other).

    The edited series receives all the non-empty differences
    resulting from inserting into the upstream series, and also all
    the manual entries.

    The manual editions are computed as diffs between the edited and
    upstream series.

    Each series carries a `supervision_status` metadata entry:

    * 'unsupervised': no manual insert so far, no upstream copy
    * 'handcrafted': created by a manual insert, no upstream copy yet
    * 'supervised': has both an upstream and an edited branch

    """
    metakeys = {
        'tzaware',
        'index_type',
        'index_dtype',
        'value_dtype',
        'value_type',
        # novelty
        'supervision_status'
    }
    supervision_states = ('unsupervised', 'supervised', 'handcrafted')

    def __init__(self, *a, **kw):
        super().__init__(*a, **kw)
        # the upstream branch is a plain timeseries store living in a
        # sibling namespace
        self.upstream = basets(namespace=f'{self.namespace}-upstream')

    def supervision_status(self, cn, name):
        """Return the supervision status of series `name`.

        Defaults to 'unsupervised' when the series has no metadata or
        its metadata has no `supervision_status` entry.
        """
        meta = self.metadata(cn, name)
        if meta:
            return meta.get('supervision_status', 'unsupervised')
        return 'unsupervised'

    def _set_supervision_status(self, cn, name, status):
        """Store `status` as the `supervision_status` metadata of `name`."""
        meta = self.metadata(cn, name)
        meta['supervision_status'] = status
        self.update_metadata(cn, name, meta, internal=True)

    @tx
    def insert(self, cn, ts, name, author,
               metadata=None,
               _insertion_date=None, manual=False):
        """Insert `ts` into series `name` following the supervision workflow.

        `manual` tells whether `ts` is a manual edit (True) or an
        upstream fetch (False).

        * first insert: plain insert, then the series is flagged
          'handcrafted' (manual) or 'unsupervised' (upstream)
        * 'unsupervised' series: plain insert; a manual insert first
          copies the current series state into the upstream branch and
          flags the series 'supervised'
        * 'supervised' / 'handcrafted' series: manual edits go straight
          into the edited branch; upstream fetches are inserted into the
          upstream branch and only the resulting diff is forwarded to
          the edited branch. A 'handcrafted' series receiving an upstream
          fetch becomes 'supervised'.

        Returns the result of the insert into the edited branch, or None
        when the upstream insert yields no diff.
        """
        if not self.exists(cn, name):
            # initial insert
            diff = super().insert(
                cn, ts, name, author,
                metadata=metadata,
                _insertion_date=_insertion_date
            )
            # the super call creates the initial metadata; complete it
            self._set_supervision_status(
                cn, name, 'handcrafted' if manual else 'unsupervised'
            )
            return diff

        supervision_status = self.supervision_status(cn, name)

        if supervision_status == 'unsupervised':
            if manual:
                # first supervised insert: take a copy of the current
                # series state into upstream and proceed forward
                current = self.get(cn, name)
                self.upstream.insert(
                    cn, current, name, author,
                    metadata=metadata,
                    _insertion_date=_insertion_date
                )
                self._set_supervision_status(cn, name, 'supervised')
            # now insert what we got
            return super().insert(
                cn, ts, name, author,
                metadata=metadata,
                _insertion_date=_insertion_date
            )

        assert supervision_status in ('supervised', 'handcrafted')
        if manual:
            # manual edits go straight into the edited branch
            diff = ts
        else:
            # insert & compute diff over upstream
            diff = self.upstream.insert(
                cn, ts, name, author,
                metadata=metadata,
                _insertion_date=_insertion_date
            )
            if supervision_status == 'handcrafted':
                # the series now owns an upstream branch
                self._set_supervision_status(cn, name, 'supervised')
            if diff is None:
                return None

        # insert the diff over upstream or the manual edit into edited
        return super().insert(
            cn, diff, name, author,
            metadata=metadata,
            _insertion_date=_insertion_date
        )

    @tx
    def delete(self, cn, seriename):
        """Delete series `seriename` from the edited and upstream branches."""
        super().delete(cn, seriename)
        # only supervised series own an upstream copy
        if self.upstream.exists(cn, seriename):
            self.upstream.delete(cn, seriename)

    @tx
    def rename(self, cn, oldname, newname):
        """Rename series `oldname` to `newname` in both branches."""
        super().rename(cn, oldname, newname)
        # only supervised series own an upstream copy
        if self.upstream.exists(cn, oldname):
            self.upstream.rename(cn, oldname, newname)

    @tx
    def strip(self, cn, name, csid):
        """Strip series `name` from changeset `csid` onwards.

        Raises ValueError for supervised series, whose two branches
        would otherwise get out of sync.
        """
        if self.supervision_status(cn, name) == 'supervised':
            raise ValueError(f'supervised series `{name}` cannot be striped')

        super().strip(cn, name, csid)

    # supervision specific API

    @tx
    def get_overrides(self, cn, name, revision_date=None,
                      from_value_date=None, to_value_date=None):
        """Return the manual overrides of series `name`.

        They are computed as the diff between the upstream and edited
        branches (NaN points kept, so manual erasures show up). Returns
        None when `self.diff` returns None.
        """
        upstream = self.upstream.get(
            cn, name,
            revision_date=revision_date,
            from_value_date=from_value_date,
            to_value_date=to_value_date,
            _keep_nans=True
        )
        edited = self.get(
            cn, name,
            revision_date=revision_date,
            from_value_date=from_value_date,
            to_value_date=to_value_date,
            _keep_nans=True
        )
        manual = self.diff(upstream, edited)
        if manual is not None:
            manual.name = name
        return manual

    @tx
    def get_ts_marker(self, cn, name, revision_date=None,
                      from_value_date=None, to_value_date=None):
        """Return `(series, markers)` for series `name`.

        `series` is the edited series without its NaN points. `markers`
        is a boolean series telling which points were manually set: all
        False for 'unsupervised' series, all True for 'handcrafted'
        ones. For supervised series, `markers` is True on the points of
        the upstream/edited diff.

        Returns `(None, None)` when no table is found for the series or
        nothing is found at the requested revision.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return None, None

        edited = self.get(
            cn, name,
            revision_date=revision_date,
            from_value_date=from_value_date,
            to_value_date=to_value_date,
            _keep_nans=True
        )
        if edited is None:
            # because of a revision_date
            return None, None

        supervision = self.supervision_status(cn, name)
        if supervision in ('unsupervised', 'handcrafted'):
            # no upstream branch to compare with: the flag is uniform
            flags = pd.Series(supervision == 'handcrafted', index=edited.index)
            flags.name = name
            return edited.dropna(), flags

        upstream = self.upstream.get(
            cn, name,
            revision_date=revision_date,
            from_value_date=from_value_date,
            to_value_date=to_value_date,
            _keep_nans=True
        )
        manual = self.diff(upstream, edited)

        unionindex = join_index(upstream, manual)
        if unionindex is None:
            # this means both series are empty
            return None, None

        mask_manual = pd.Series(False, index=unionindex)
        if manual is not None:
            # label-based assignment: flag every manually set point
            mask_manual.loc[manual.index] = True
        mask_manual.name = name

        return edited.dropna(), mask_manual