tsio.py 10.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import pandas as pd
import numpy as np

from sqlalchemy import Column, Boolean, select, desc
from sqlalchemy.dialects.postgresql import BYTEA

from tshistory.tsio import TimeSerie as BaseTS


def join_index(ts1, ts2):
    """Return the union of the indexes of two optional series.

    Either argument may be None; when both are, None is returned.
    """
    if ts1 is None:
        return ts2.index if ts2 is not None else None
    if ts2 is None:
        return ts1.index
    return ts1.index.union(ts2.index)


class TimeSerie(BaseTS):
    """This class refines the base `tshistory.TimeSerie` by adding a
    specific workflow on top of it.

    There are two kinds of series : automatically fetched, and
    manually imposed.  The idea is that some scrapper fetches the
    automatic series, and endusers sometimes override values from the
    automatic series.

    Say, one day, Serie X comes with a bogus value -1 for a given
    timestamp. The end user sees it and fixes it.

    But:

    * we don't want that the next automatic serie fetch with the bogus
      value override the fix

    * however whenever upstream fixes the value (that is provides a
      new one) we want the manual override to be replaced by the new
      value.

    We can explain the workflow like with a traditional DVCS graph,
    with two branches: "automatic" and "manual".

    All automatic fetches go into the automatic branch (and thus are
    diffed against each other).

    The manual series are rooted from the (current) top of the
    automatic series, but live in their own branch.

    As soon as a new automatic serie is inserted, it is also *merged*
    on top of the manual branch.

    Hence, the manual branch contains all the series + edits, and
    whenever an automatic serie fixes an old error, it is merged into
    the series + edits branch, which contains the correct synthesis.

    The concrete implementation is not Hg/Git since it uses a
    single-parent model. We use filtering and a modicum of ad-hoc
    transformations to achieve the desired effect.

    """
    # stash of extra serialized column values (e.g. 'autosnapshot') set up
    # before an insertion and consumed once by `_complete_insertion_value`
    _saveme = None
    # snapshot every N revisions — presumably consumed by the base class
    # machinery; confirm against tshistory.tsio
    _snapshot_interval = 100

    def insert(self, cnx, ts, name, author=None, extra_scalars=None):
        """Insert a series, handling the initial-insertion special case.

        On the very first automatic insertion of a series, the null
        values are dropped and the resulting series is stashed as the
        initial 'autosnapshot' (picked up by
        `_complete_insertion_value`).

        Returns the diff actually inserted, or None when the initial
        series is entirely null.
        """
        # default was a shared mutable dict (`extra_scalars={}`);
        # use the None-sentinel idiom instead
        if extra_scalars is None:
            extra_scalars = {}
        initial_insertion = not self.exists(cnx, name)
        if initial_insertion and not extra_scalars.get('manual', False):
            if ts.isnull().all():
                return None
            ts = ts[~ts.isnull()]
            self._saveme = {'autosnapshot': ts}
        return super(TimeSerie, self).insert(cnx, ts, name, author=author,
                                             extra_scalars=extra_scalars)

    # log

    def log(self, cn, *args, **kw):
        """Base changeset log, enriched with the per-series 'manual' flag."""
        logs = super(TimeSerie, self).log(cn, *args, **kw)

        for rev in logs:
            rev['manual'] = {
                name: self._manual_value(cn, rev['rev'], name)
                for name in rev['names']
            }

        return logs

    def _manual_value(self, cn, csetid, seriename):
        # value of the 'manual' column for the given changeset / series
        table = self._table_definition_for(seriename)
        query = select([table.c.manual]).where(table.c.csid == csetid)
        return cn.execute(query).scalar()

    # /log

    def _table_definition_for(self, tablename):
        # extend the base series table with the supervision columns
        tdef = super(TimeSerie, self)._table_definition_for(tablename)
        for column in (Column('manual', Boolean, default=False, index=True),
                       Column('autosnapshot', BYTEA)):
            tdef.append_column(column)
        return tdef

    def _complete_insertion_value(self, value, extra_scalars):
        """Mutate `value` with the extra scalar columns and with the
        serialized stashed items (e.g. 'autosnapshot')."""
        if extra_scalars:
            value.update(extra_scalars)

        stash = self._saveme
        if stash is not None:
            # one-shot: the stash is consumed by exactly one insertion
            self._saveme = None
            for key, item in stash.items():
                value[key] = self._serialize(item)

    def _latest_item(self, cnx, table, column):
        """Fetch a top-level column (e.g. snapshot, autosnapshot)
        from the most recent row of `table`."""
        query = select(
            [table.c[column]]
        ).order_by(desc(table.c.id)).limit(1)
        return cnx.execute(query).scalar()

    def _purge_snapshot_at(self, cnx, table, diffid):
        # also null the supervision-specific 'autosnapshot'
        stmt = table.update().where(
            table.c.id == diffid
        ).values(snapshot=None, autosnapshot=None)
        cnx.execute(stmt)

    def _compute_diff_and_newsnapshot(self, cnx, table, newts, manual=False):
        """Diff `newts` against the relevant branch head and refresh the
        snapshots.

        An automatic insertion is diffed against the automatic branch
        head, a manual one against the synthetic (auto + edits) head.
        Returns (diff, new synthetic snapshot), or (None, None) when
        `newts` brings nothing new.
        """
        auto = self._latest_item(cnx, table, 'autosnapshot')
        if auto is None:
            # head got stripped: rebuild the automatic state from the
            # non-manual diffs (`== False` is sqlalchemy filter syntax,
            # do not rewrite it as `is False`)
            auto = self._build_snapshot_upto(cnx, table,
                                             [lambda _, table: table.c.manual == False])
        else:
            auto = self._deserialize(auto, table.name)
        synthetic = self._build_snapshot_upto(cnx, table)

        self._validate_type(auto, newts, table.name)
        self._validate_type(synthetic, newts, table.name)
        # this is the diff between our computed parent
        diff = self._compute_diff(synthetic if manual else auto,
                                  newts)

        if len(diff) == 0:
            return None, None

        # maintain the auto snapshot: a manual edit leaves the
        # automatic branch untouched
        self._saveme = {
            'autosnapshot': auto if manual else self._apply_diff(auto, diff)
        }

        return diff, self._apply_diff(synthetic, diff)

    # we still need a full-blown history reconstruction routine
    # for arbitrary revision_dates

    def _build_snapshots_upto(self, cnx, table, qfilter,
                              from_value_date=None, to_value_date=None):
        """Reconstruct the pair (automatic series, manual residual) at
        the point of history selected by `qfilter`.

        Starts from the closest stored snapshots (synthetic and
        automatic), then replays the subsequent diffs while keeping the
        automatic and manual contributions apart.

        Returns (None, None) when no snapshot matches (e.g. a revision
        date older than the first insertion).
        """
        snapid, synthsnap = self._find_snapshot(cnx, table, qfilter,
                                                from_value_date=from_value_date,
                                                to_value_date=to_value_date)
        auto_snapid, autosnap = self._find_snapshot(cnx, table, qfilter,
                                                    column='autosnapshot',
                                                    from_value_date=from_value_date,
                                                    to_value_date=to_value_date)
        if snapid is None:
            return None, None  # yes, we can be asked fancy revision dates

        if auto_snapid is not None:
            # both snapshot columns are written by the same row
            assert snapid == auto_snapid

        cset = self.schema.changeset
        # all diffs (and their 'manual' flag) recorded after the snapshot
        sql = select([table.c.id,
                      table.c.diff,
                      table.c.parent,
                      table.c.manual,
                      cset.c.insertion_date]
        ).order_by(table.c.id
        ).where(table.c.csid == cset.c.id
        ).where(table.c.id > snapid)

        for filtercb in qfilter:
            sql = sql.where(filtercb(cset, table))

        alldiffs = pd.read_sql(sql, cnx)

        if len(alldiffs) == 0:
            # nothing to replay: the manual residual is just the
            # difference between the two stored snapshots
            manual_ts = self._compute_diff(autosnap, synthsnap)
            return autosnap, manual_ts

        # rebuild automatic & manual residual starting
        # from the last known (synthetic) state
        synth_ts = synthsnap
        auto_ts = autosnap if autosnap is not None else pd.Series()

        for _, row in alldiffs.iterrows():
            diff = self._deserialize(row['diff'], table.name)

            if row['manual']:
                synth_ts = self._apply_diff(synth_ts, diff)
            else:
                auto_ts = self._apply_diff(auto_ts, diff)
                # merging auto into manual
                # we erase all the elements that have been edited
                # by the auto diff
                synth_ts = synth_ts[~synth_ts.index.isin(diff.index)]

        # the manual residual is what the synthetic state carries on
        # top of the automatic branch
        manual_ts = self._compute_diff(auto_ts, synth_ts)
        return auto_ts, manual_ts

208
209
    def _onthefly(self, cnx, table, revision_date,
                  from_value_date=None, to_value_date=None):
        """Rebuild the (auto, manual residual) pair, optionally capped
        at `revision_date`."""
        if revision_date:
            qfilter = [lambda cset, _: cset.c.insertion_date <= revision_date]
        else:
            qfilter = []

        return self._build_snapshots_upto(cnx, table, qfilter,
                                          from_value_date=from_value_date,
                                          to_value_date=to_value_date)
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267

    # public API redefinition

    def get(self, cnx, name, revision_date=None):
        """Return the synthetic state of a series, at `revision_date`
        when given, else the current head. None for unknown series."""
        table = self._get_ts_table(cnx, name)
        if table is None:
            return

        if revision_date:
            auto, residualmanual = self._onthefly(cnx, table, revision_date)
            ts = self._apply_diff(auto, residualmanual)
        else:
            # fast path: the stored top-level snapshot
            synthetic = self._latest_item(cnx, table, 'snapshot')
            if synthetic is not None:
                ts = self._deserialize(synthetic, name)
            else:
                # head just got chopped
                ts = self._build_snapshot_upto(cnx, table)

        if ts is None:
            return ts
        ts.name = name
        return ts[~ts.isnull()]

    # updated to account for the possibility of stripping changesets
    # of their series diffs

    def _diff(self, cn, csetid, name):
        """Return the diff a changeset contributed to a series; an
        empty series when the changeset was stripped."""
        table = self._get_ts_table(cn, name)
        cset = self.schema.changeset

        def incset(sql):
            return sql.where(
                table.c.csid == cset.c.id
            ).where(cset.c.id == csetid)

        tsid = cn.execute(incset(select([table.c.id]))).scalar()

        # that guy was stripped
        if tsid is None:
            return pd.Series()

        # the very first row holds a snapshot, not a diff
        column = table.c.snapshot if tsid == 1 else table.c.diff
        payload = cn.execute(incset(select([column]))).scalar()
        return self._deserialize(payload, name)

    # supervision specific API

268
269
    def get_ts_marker(self, cnx, name, revision_date=None,
                      from_value_date=None, to_value_date=None):
        """Return (series, mask) for a series, where `mask` is a
        boolean series (over the union of the auto and manual indexes)
        flagging the manually overridden points.

        Returns (None, None) for an unknown series or when both the
        automatic and manual parts are empty.
        """
        table = self._get_ts_table(cnx, name)
        if table is None:
            return None, None

        auto, manual = self._onthefly(cnx, table, revision_date,
                                      from_value_date=from_value_date,
                                      to_value_date=to_value_date)

        unionindex = join_index(auto, manual)
        if unionindex is None:
            # this means both series are empty
            return None, None

        # broadcast the scalar over the whole index:
        # `pd.Series([False], index=unionindex)` raises ValueError
        # whenever the index holds more than one entry
        mask_manual = pd.Series(False, index=unionindex)
        if manual is not None:
            mask_manual[manual.index] = True
            mask_manual.name = name

        ts = self._apply_diff(auto, manual)
        ts = ts[~ts.isnull()]
        ts.name = name
        return ts, mask_manual