tsio.py 10.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import pandas as pd
import numpy as np

from sqlalchemy import Column, Boolean, select, desc
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.tsio import TimeSerie as BaseTS


def join_index(ts1, ts2):
    """Return the union of the indexes of two optional series.

    Either argument may be `None`: the index of the other one is then
    returned unchanged.  When both are `None`, `None` is returned.
    """
    present = [ts for ts in (ts1, ts2) if ts is not None]
    if not present:
        return None
    if len(present) == 1:
        return present[0].index
    return ts1.index.union(ts2.index)


class TimeSerie(BaseTS):
    """This class refines the base `tshistory.TimeSerie` by adding a
    specific workflow on top of it.

    There are two kinds of series: automatically fetched, and
    manually imposed.  The idea is that some scraper fetches the
    automatic series, and end users sometimes override values from
    the automatic series.

    Say, one day, Serie X comes with a bogus value -1 for a given
    timestamp. The end user sees it and fixes it.

    But:

    * we don't want the next automatic serie fetch, still carrying the
      bogus value, to override the fix

    * however, whenever upstream fixes the value (that is, provides a
      new one), we want the manual override to be replaced by the new
      value.

    We can explain the workflow as we would a traditional DVCS graph,
    with two branches: "automatic" and "manual".

    All automatic fetches go into the automatic branch (and thus are
    diffed against each other).

    The manual series are rooted from the (current) top of the
    automatic series, but live in their own branch.

    As soon as a new automatic serie is inserted, it is also *merged*
    on top of the manual branch.

    Hence, the manual branch contains all the series + edits, and
    whenever an automatic serie fixes an old error, it is merged into
    the series + edits branch, which contains the correct synthesis.

    The concrete implementation is not Hg/Git since it uses a
    single-parent model. We use filtering and a modicum of ad-hoc
    transformations to achieve the desired effect.

    """
    _saveme = None
    _snapshot_interval = 100

65
66
67
    def insert(self, cn, ts, name, author=None,
               _insertion_date=None,
               extra_scalars=None):
        """Insert `ts` under `name` through the base class `insert`.

        On the very first insertion of a serie, unless
        `extra_scalars['manual']` is truthy, the null values are
        dropped from `ts` and the result is scheduled (through
        `self._saveme`) to be stored as the initial `autosnapshot`.

        `extra_scalars` is an optional dict of extra column values; it
        defaults to an empty dict.

        Returns `None` when that first automatic serie contains only
        nulls, else whatever the base class `insert` returns.
        """
        # build a fresh dict per call: a `{}` default would be a single
        # object shared by every invocation and handed to the base class
        if extra_scalars is None:
            extra_scalars = {}

        initial_insertion = not self.exists(cn, name)
        if initial_insertion and not extra_scalars.get('manual', False):
            if ts.isnull().all():
                return None
            ts = ts[~ts.isnull()]
            self._saveme = {'autosnapshot': ts}

        return super(TimeSerie, self).insert(cn, ts, name, author=author,
                                             _insertion_date=_insertion_date,
                                             extra_scalars=extra_scalars)

    # log

    def log(self, cn, *args, **kw):
        """Return the base class log with a `manual` entry on each revision.

        The added entry maps every serie name listed in the revision's
        `names` to the value returned by `_manual_value` for the
        revision's `rev`.
        """
        revisions = super(TimeSerie, self).log(cn, *args, **kw)

        for revision in revisions:
            revision['manual'] = {
                seriename: self._manual_value(cn, revision['rev'], seriename)
                for seriename in revision['names']
            }

        return revisions

    def _manual_value(self, cn, csetid, seriename):
        """Return the `manual` column of serie `seriename` for changeset `csetid`."""
        serie_table = self._table_definition_for(seriename)
        query = select([serie_table.c.manual])
        query = query.where(serie_table.c.csid == csetid)
        result = cn.execute(query)
        return result.scalar()

    # /log

    def _table_definition_for(self, tablename):
        """Return the base table definition extended with two columns.

        `manual` is an indexed boolean defaulting to False and
        `autosnapshot` is a BYTEA column.
        """
        definition = super(TimeSerie, self)._table_definition_for(tablename)
        extra_columns = (
            Column('manual', Boolean, default=False, index=True),
            Column('autosnapshot', BYTEA),
        )
        for column in extra_columns:
            definition.append_column(column)
        return definition

    def _complete_insertion_value(self, value, extra_scalars):
        """Complete, in place, the `value` dict about to be inserted.

        The `extra_scalars` entries are copied into `value`, followed
        by the serialized form of every item pending in `self._saveme`,
        which is then reset to `None`.
        """
        if extra_scalars:
            value.update(extra_scalars)

        pending = self._saveme
        if pending is None:
            return

        # serialize everything first, then update `value` in one go
        serialized = dict(
            (key, self._serialize(item))
            for key, item in pending.items()
        )
        value.update(serialized)
        self._saveme = None

Aurélien Campéas's avatar
Aurélien Campéas committed
115
    def _latest_item(self, cn, table, column):
        """Return the value of `column` on the row of `table` with the highest id.

        Used to fetch the top-level things (e.g. snapshot, autosnapshot).
        """
        newest_first = desc(table.c.id)
        query = select([table.c[column]]).order_by(newest_first).limit(1)
        return cn.execute(query).scalar()
121

Aurélien Campéas's avatar
Aurélien Campéas committed
122
123
    def _purge_snapshot_at(self, cn, table, diffid):
        """Set `snapshot` and `autosnapshot` to NULL on the row of id `diffid`."""
        statement = table.update().where(table.c.id == diffid)
        statement = statement.values(snapshot=None, autosnapshot=None)
        cn.execute(statement)

Aurélien Campéas's avatar
Aurélien Campéas committed
129
130
    def _compute_diff_and_newsnapshot(self, cn, table, newts, manual=False):
        """Compute the diff brought by `newts` and the resulting snapshot.

        The diff is computed against the synthetic (automatic + manual)
        state when `manual` is true, against the automatic state
        otherwise.

        Returns `(None, None)` when the diff is empty, else
        `(diff, synthetic state with the diff applied)`.  In the latter
        case `self._saveme` is also set so that the automatic snapshot
        gets stored alongside.
        """
        stored_auto = self._latest_item(cn, table, 'autosnapshot')
        if stored_auto is not None:
            auto = self._deserialize(stored_auto, table.name)
        else:
            # no stored automatic snapshot on the latest row: build one
            # with a filter on the `manual` column being False
            auto = self._build_snapshot_upto(
                cn, table,
                [lambda _, table: table.c.manual == False]
            )
        synthetic = self._build_snapshot_upto(cn, table)

        for reference in (auto, synthetic):
            self._validate_type(reference, newts, table.name)

        # the parent we diff against depends on the kind of insertion
        parent = synthetic if manual else auto
        diff = self._compute_diff(parent, newts)
        if not len(diff):
            return None, None

        # maintain the auto snapshot: a manual edit leaves it untouched
        if manual:
            new_auto = auto
        else:
            new_auto = self._apply_diff(auto, diff)
        self._saveme = {'autosnapshot': new_auto}

        return diff, self._apply_diff(synthetic, diff)

    # we still need a full-blown history reconstruction routine
    # for arbitrary revision_dates

Aurélien Campéas's avatar
Aurélien Campéas committed
157
    def _build_snapshots_upto(self, cn, table, qfilter,
                              from_value_date=None, to_value_date=None):
        """Rebuild the automatic serie and the residual manual edits.

        Starts from the snapshots returned by `_find_snapshot` (for the
        default column and for `autosnapshot`) and replays every later
        diff of `table` accepted by the `qfilter` callbacks.

        Returns `(None, None)` when no snapshot is found, else
        `(auto_ts, manual_ts)` where `manual_ts` is the diff between
        the automatic serie and the synthetic one.
        """
        value_dates = {
            'from_value_date': from_value_date,
            'to_value_date': to_value_date,
        }
        snapid, synthsnap = self._find_snapshot(cn, table, qfilter,
                                                **value_dates)
        auto_snapid, autosnap = self._find_snapshot(cn, table, qfilter,
                                                    column='autosnapshot',
                                                    **value_dates)
        if snapid is None:
            # yes, we can be asked fancy revision dates
            return None, None

        if auto_snapid is not None:
            assert snapid == auto_snapid

        cset = self.schema.changeset
        columns = [
            table.c.id,
            table.c.diff,
            table.c.parent,
            table.c.manual,
            cset.c.insertion_date,
        ]
        sql = select(columns).order_by(table.c.id)
        sql = sql.where(table.c.csid == cset.c.id)
        sql = sql.where(table.c.id > snapid)
        for filtercb in qfilter:
            sql = sql.where(filtercb(cset, table))

        alldiffs = pd.read_sql(sql, cn)
        if not len(alldiffs):
            # nothing after the snapshots: they already are the answer
            return autosnap, self._compute_diff(autosnap, synthsnap)

        # rebuild automatic & manual residual starting
        # from the last known (synthetic) state
        synth_ts = synthsnap
        auto_ts = pd.Series() if autosnap is None else autosnap

        for _, row in alldiffs.iterrows():
            diff = self._deserialize(row['diff'], table.name)

            if row['manual']:
                synth_ts = self._apply_diff(synth_ts, diff)
                continue

            auto_ts = self._apply_diff(auto_ts, diff)
            # merging auto into manual: we erase all the elements
            # that have been edited by the auto diff
            edited = synth_ts.index.isin(diff.index)
            synth_ts = synth_ts[~edited]

        return auto_ts, self._compute_diff(auto_ts, synth_ts)

Aurélien Campéas's avatar
Aurélien Campéas committed
211
    def _onthefly(self, cn, table, revision_date,
                  from_value_date=None, to_value_date=None):
        """Call `_build_snapshots_upto`, optionally bounded by `revision_date`.

        When `revision_date` is truthy, a filter keeping the changesets
        whose `insertion_date` is lower than or equal to it is passed
        along; otherwise no filter is used.
        """
        qfilter = (
            [lambda cset, _: cset.c.insertion_date <= revision_date]
            if revision_date else []
        )
        return self._build_snapshots_upto(
            cn, table, qfilter,
            from_value_date=from_value_date,
            to_value_date=to_value_date
        )
219
220
221

    # public API redefinition

Aurélien Campéas's avatar
Aurélien Campéas committed
222
223
    def get(self, cn, name, revision_date=None):
        """Return the serie `name` without its null values.

        With a `revision_date`, the serie is rebuilt on the fly from
        the automatic serie and the residual manual edits.  Otherwise
        the latest stored `snapshot` is used, or a rebuilt one when the
        latest row holds none.

        Returns `None` when there is no table for `name`.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return

        if revision_date:
            auto, residualmanual = self._onthefly(cn, table, revision_date)
            ts = self._apply_diff(auto, residualmanual)
        else:
            # fetch the top-level snapshot
            synthetic = self._latest_item(cn, table, 'snapshot')
            if synthetic is not None:
                ts = self._deserialize(synthetic, name)
            else:
                # head just got chopped
                ts = self._build_snapshot_upto(cn, table)

        if ts is None:
            return ts

        ts.name = name
        return ts[~ts.isnull()]

    # updated to account for the possibility of stripping changesets
    # of their series diffs

    def _diff(self, cn, csetid, name):
        """Return the deserialized diff of serie `name` for changeset `csetid`.

        An empty serie is returned when the changeset has no row for
        this serie (it was stripped).  For the row of id 1 the
        `snapshot` column is read instead of `diff`.
        """
        table = self._get_ts_table(cn, name)
        cset = self.schema.changeset

        def fetch(column):
            # value of `column` on the row of this serie bound to `csetid`
            query = select([column]).where(
                table.c.csid == cset.c.id
            ).where(
                cset.c.id == csetid
            )
            return cn.execute(query).scalar()

        tsid = fetch(table.c.id)
        if tsid is None:
            # that guy was stripped
            return pd.Series()

        payload = fetch(table.c.snapshot if tsid == 1 else table.c.diff)
        return self._deserialize(payload, name)

    # supervision specific API

Aurélien Campéas's avatar
Aurélien Campéas committed
271
    def get_ts_marker(self, cn, name, revision_date=None,
                      from_value_date=None, to_value_date=None):
        """Return the serie `name` and a mask of its manually edited points.

        The automatic serie and the residual manual edits are rebuilt
        through `_onthefly` (optionally bounded by `revision_date` and
        the value dates).

        Returns `(None, None)` when there is no table for `name` or
        when both rebuilt series are `None`.  Otherwise returns
        `(ts, mask_manual)`: `ts` is the automatic serie with the
        manual edits applied and the nulls removed, and `mask_manual`
        is a boolean serie over the union of both indexes, True where
        a manual edit exists.
        """
        table = self._get_ts_table(cn, name)
        if table is None:
            return None, None

        auto, manual = self._onthefly(cn, table, revision_date,
                                      from_value_date=from_value_date,
                                      to_value_date=to_value_date)
        unionindex = join_index(auto, manual)
        if unionindex is None:
            # this means both series are empty
            return None, None

        # a scalar is broadcast over the whole index, whereas a
        # one-element list is rejected by pandas as soon as the index
        # length differs from one
        mask_manual = pd.Series(False, index=unionindex)
        if manual is not None:
            mask_manual[manual.index] = True
            mask_manual.name = name

        ts = self._apply_diff(auto, manual)
        ts = ts[~ts.isnull()]
        ts.name = name
        return ts, mask_manual