Commit c2dccfb4 authored by Aurélien Campéas
Browse files

tsio: reimplement the supervision logic using only the high level API

We no longer need the very complicated logic/code we had before.

Resolves #1
parent d19b017142b7
......@@ -5,7 +5,7 @@ from sqlalchemy import create_engine, MetaData
from pytest_sa_pg import db
from tshistory.schema import init, reset
from tshistory.schema import init, reset, delete_schema
from tshistory_supervision.tsio import TimeSerie
DATADIR = Path(__file__).parent / 'test' / 'data'
......@@ -20,8 +20,10 @@ def engine(request):
meta = MetaData()
with e.connect() as cn:
reset(cn)
delete_schema(e, 'automatic')
with e.connect() as cn:
init(cn, meta)
init(cn, meta, 'automatic')
yield e
......
......@@ -42,7 +42,7 @@ def test_mercure_serie(engine, tsh):
""", pd.read_sql('select id, name, table_name from tsh.registry', engine))
assert_df("""
csid serie
cset serie
0 1 42
""", pd.read_sql('select * from tsh.changeset_series', engine))
......@@ -178,8 +178,7 @@ def test_manual_overrides(engine, tsh):
# edit the bogus upstream data: -1 -> 3
# also edit the next value
ts_manual = genserie(datetime(2010, 1, 4), 'D', 2, [3])
tsh.insert(engine, ts_manual, 'ts_mixte', 'test',
extra_scalars=dict(manual=True))
tsh.insert(engine, ts_manual, 'ts_mixte', 'test', manual=True)
tsh.get_ts_marker(engine, 'ts_mixte')
assert_df("""
......@@ -235,8 +234,7 @@ def test_manual_overrides(engine, tsh):
# another interleaved editing session
ts_edit = genserie(datetime(2010, 1, 4), 'D', 1, [2])
tsh.insert(engine, ts_edit, 'ts_mixte', 'test',
extra_scalars=dict(manual=True))
tsh.insert(engine, ts_edit, 'ts_mixte', 'test', manual=True)
assert 2 == tsh.get(engine, 'ts_mixte')['2010-01-04'] # still
ts, marker = tsh.get_ts_marker(engine, 'ts_mixte')
......@@ -253,8 +251,7 @@ def test_manual_overrides(engine, tsh):
# another interleaved editing session
drange = pd.date_range(start=datetime(2010, 1, 4), periods=1)
ts_edit = pd.Series([4], index=drange)
tsh.insert(engine, ts_edit, 'ts_mixte', 'test',
extra_scalars=dict(manual=True))
tsh.insert(engine, ts_edit, 'ts_mixte', 'test', manual=True)
assert 4 == tsh.get(engine, 'ts_mixte')['2010-01-04'] # still
ts_auto_resend_the_same = pd.Series([2], index=drange)
......@@ -282,12 +279,10 @@ def test_manual_overrides(engine, tsh):
""", ts_auto)
ts_manual = genserie(datetime(2010, 1, 5), 'D', 2, [3])
tsh.insert(engine, ts_manual, 'ts_mixte', 'test',
extra_scalars=dict(manual=True))
tsh.insert(engine, ts_manual, 'ts_mixte', 'test', manual=True)
ts_manual = genserie(datetime(2010, 1, 9), 'D', 1, [3])
tsh.insert(engine, ts_manual, 'ts_mixte', 'test',
extra_scalars=dict(manual=True))
tsh.insert(engine, ts_manual, 'ts_mixte', 'test', manual=True)
tsh.insert(engine, ts_auto, 'ts_mixte', 'test')
upstream_fix = pd.Series([2.5], index=[datetime(2010, 1, 5)])
......@@ -320,8 +315,7 @@ def test_manual_overrides(engine, tsh):
# just another override for the fun
ts_manual.iloc[0] = 4
tsh.insert(engine, ts_manual, 'ts_mixte', 'test',
extra_scalars=dict(manual=True))
tsh.insert(engine, ts_manual, 'ts_mixte', 'test', manual=True)
assert_df("""
2010-01-01 2.0
2010-01-02 2.0
......@@ -336,8 +330,7 @@ def test_manual_overrides(engine, tsh):
def test_first_manual(engine, tsh):
ts_begin = genserie(datetime(2010, 1, 1), 'D', 10)
tsh.insert(engine, ts_begin, 'ts_only', 'test',
extra_scalars=dict(manual=True))
tsh.insert(engine, ts_begin, 'ts_only', 'test', manual=True)
assert_df("""
2010-01-01 0.0
......@@ -353,8 +346,7 @@ def test_first_manual(engine, tsh):
""", tsh.get(engine, 'ts_only'))
# we should detect the emission of a message
tsh.insert(engine, ts_begin, 'ts_only', 'test',
extra_scalars=dict(manual=True))
tsh.insert(engine, ts_begin, 'ts_only', 'test', manual=True)
assert_df("""
2010-01-01 0.0
......@@ -376,8 +368,7 @@ def test_first_manual(engine, tsh):
tsh.get(engine, 'ts_only').to_string().strip()
# should be a noop
tsh.insert(engine, ts_slight_variation, 'ts_only', 'test',
extra_scalars=dict(manual=True))
tsh.insert(engine, ts_slight_variation, 'ts_only', 'test', manual=True)
_, marker = tsh.get_ts_marker(engine, 'ts_only')
assert_df("""
......@@ -393,19 +384,6 @@ def test_first_manual(engine, tsh):
2010-01-10 False
""", marker)
with engine.connect() as cn:
cn.execute('set search_path to "tsh.timeserie"')
df = pd.read_sql("select id, manual, diff, snapshot, autosnapshot from ts_only",
cn)
for attr in ('diff', 'snapshot', 'autosnapshot'):
df[attr] = df[attr].apply(lambda x: 0 if x is None else len(x))
assert """
id manual diff snapshot autosnapshot
0 1 True 0 82 0
1 2 False 84 84 84
""".strip() == df.to_string().strip()
def test_more_manual(engine, tsh):
ts = genserie(datetime(2015, 1, 1), 'D', 5)
......@@ -414,8 +392,7 @@ def test_more_manual(engine, tsh):
ts_man = genserie(datetime(2015, 1, 3), 'D', 3, -1)
ts_man.iloc[-1] = np.nan
# erasing of the last value for the date 5/1/2015
tsh.insert(engine, ts_man, 'ts_exp1', 'test',
extra_scalars={'manual': True})
tsh.insert(engine, ts_man, 'ts_exp1', 'test', manual=True)
ts_get = tsh.get(engine, 'ts_exp1')
......@@ -423,7 +400,8 @@ def test_more_manual(engine, tsh):
2015-01-01 0.0
2015-01-02 1.0
2015-01-03 -3.0
2015-01-04 -3.0""", ts_get)
2015-01-04 -3.0
""", ts_get)
ts_marker, marker = tsh.get_ts_marker(engine, 'ts_exp1')
assert ts_marker.equals(ts_get)
......@@ -432,55 +410,8 @@ def test_more_manual(engine, tsh):
2015-01-02 False
2015-01-03 True
2015-01-04 True
2015-01-05 True""", marker)
def test_revision_date(engine, tsh):
ts = genserie(datetime(2010, 1, 4), 'D', 4, [1], name='truc')
tsh.insert(engine, ts, 'ts_through_time', 'test',
_insertion_date=datetime(2015, 1, 1, 15, 43, 23))
ts = genserie(datetime(2010, 1, 4), 'D', 4, [2], name='truc')
tsh.insert(engine, ts, 'ts_through_time', 'test',
_insertion_date=datetime(2015, 1, 2, 15, 43, 23))
ts = genserie(datetime(2010, 1, 4), 'D', 4, [3], name='truc')
tsh.insert(engine, ts, 'ts_through_time', 'test',
_insertion_date=datetime(2015, 1, 3, 15, 43, 23))
ts = tsh.get(engine, 'ts_through_time')
assert_df("""
2010-01-04 3.0
2010-01-05 3.0
2010-01-06 3.0
2010-01-07 3.0
""", ts)
ts = tsh.get(engine, 'ts_through_time',
revision_date=datetime(2015, 1, 2, 18, 43, 23))
assert_df("""
2010-01-04 2.0
2010-01-05 2.0
2010-01-06 2.0
2010-01-07 2.0
""", ts)
ts = tsh.get(engine, 'ts_through_time',
revision_date=datetime(2015, 1, 1, 18, 43, 23))
assert_df("""
2010-01-04 1.0
2010-01-05 1.0
2010-01-06 1.0
2010-01-07 1.0
""", ts)
ts = tsh.get(engine, 'ts_through_time',
revision_date=datetime(2014, 1, 1, 18, 43, 23))
assert ts is None
2015-01-05 True
""", marker)
def test_before_first_insertion(engine, tsh):
......
......@@ -39,232 +39,41 @@ class TimeSerie(BaseTS):
value.
We can explain the workflow like with a traditional DVCS graph,
with two branches: "automatic" and "manual".
with two branches: "automatic" and "synthetic".
All automatic fetches go into the automatic branch (and thus are
diffed against each other).
The manual series are rooted from the (current) top of the
automatic series, but live in their own branch.
As soon as a new automatic serie is inserted, it is also *merged*
on top of the manual branch.
Hence, the manual branch contains all the series + edits, and
whenever an automatic serie fixes an old error, it is merged into
the series + edits branch, which contains the correct synthesis.
The concrete implementation is not Hg/Git since it uses a
single-parent model. We use filtering and a modicum of ad-hoc
transformations to achieve the desired effect.
The synthetic series receive all the non-empty differences
resulting from inserting to the automatic series, and also
all the manual entries.
The manual editions can be computed as a diff between synthetic
and automatic series.
"""
_saveme = None
_snapshot_interval = 100
def insert(self, cn, ts, name, author=None,
_insertion_date=None,
extra_scalars={}):
initial_insertion = not self.exists(cn, name)
if initial_insertion and not extra_scalars.get('manual', False):
if ts.isnull().all():
return None
ts = ts[~ts.isnull()]
self._saveme = {'autosnapshot': ts}
diff = super(TimeSerie, self).insert(cn, ts, name, author=author,
_insertion_date=_insertion_date,
extra_scalars=extra_scalars)
return diff
# log
def log(self, cn, *args, **kw):
logs = super(TimeSerie, self).log(cn, *args, **kw)
for rev in logs:
rev['manual'] = attrs = {}
for name in rev['names']:
attrs[name] = self._manual_value(cn, rev['rev'], name)
return logs
def _manual_value(self, cn, csetid, seriename):
table = self._table_definition_for(seriename)
sql = select([table.c.manual]).where(table.c.csid == csetid)
return cn.execute(sql).scalar()
# /log
def _table_definition_for(self, tablename):
tdef = super(TimeSerie, self)._table_definition_for(tablename)
tdef.append_column(Column('manual', Boolean, default=False, index=True))
tdef.append_column(Column('autosnapshot', BYTEA))
return tdef
def _complete_insertion_value(self, value, extra_scalars):
if extra_scalars:
value.update(extra_scalars)
if self._saveme is not None:
value.update({k: self._serialize(v)
for k, v in self._saveme.items()}
def insert(self, cn, ts, name, author, _insertion_date=None, manual=False):
if manual:
# insert & compute diff over synthetic
return super().insert(
cn, ts, name, author, _insertion_date=_insertion_date
)
self._saveme = None
def _latest_item(self, cn, table, column):
# fetch the top-level things (e.g. snapshot, autosnapshot)
sql = select([table.c[column]]
).order_by(desc(table.c.id)
).limit(1)
return cn.execute(sql).scalar()
def _purge_snapshot_at(self, cn, table, diffid):
cn.execute(
table.update(
).where(table.c.id == diffid
).values(snapshot=None, autosnapshot=None)
# insert into automatic & compute diff over automatic
basetsh = BaseTS(namespace='automatic')
diff = basetsh.insert(
cn, ts, name, author,
_insertion_date=_insertion_date
)
def _compute_diff_and_newsnapshot(self, cn, table, newts, manual=False):
auto = self._latest_item(cn, table, 'autosnapshot')
if auto is None:
auto = self._build_snapshot_upto(cn, table,
[lambda _, table: table.c.manual == False])
else:
auto = self._deserialize(auto, table.name)
synthetic = self._build_snapshot_upto(cn, table)
self._validate_type(auto, newts, table.name)
self._validate_type(synthetic, newts, table.name)
# this is the diff between our computed parent
diff = self._compute_diff(synthetic if manual else auto,
newts)
if len(diff) == 0:
return None, None
# maintain the auto snapshot
self._saveme = {
'autosnapshot': auto if manual else self._apply_diff(auto, diff)
}
return diff, self._apply_diff(synthetic, diff)
# we still need a full-blown history reconstruction routine
# for arbitrary revision_dates
def _build_snapshots_upto(self, cn, table, qfilter,
from_value_date=None, to_value_date=None):
snapid, synthsnap = self._find_snapshot(cn, table, qfilter,
from_value_date=from_value_date,
to_value_date=to_value_date)
auto_snapid, autosnap = self._find_snapshot(cn, table, qfilter,
column='autosnapshot',
from_value_date=from_value_date,
to_value_date=to_value_date)
if snapid is None:
return None, None # yes, we can be asked fancy revision dates
if auto_snapid is not None:
assert snapid == auto_snapid
cset = self.schema.changeset
sql = select([table.c.id,
table.c.diff,
table.c.parent,
table.c.manual,
cset.c.insertion_date]
).order_by(table.c.id
).where(table.c.csid == cset.c.id
).where(table.c.id > snapid)
for filtercb in qfilter:
sql = sql.where(filtercb(cset, table))
alldiffs = pd.read_sql(sql, cn)
if len(alldiffs) == 0:
manual_ts = self._compute_diff(autosnap, synthsnap)
return autosnap, manual_ts
# rebuild automatic & manual residual starting
# from the last known (synthetic) state
synth_ts = synthsnap
auto_ts = autosnap if autosnap is not None else pd.Series()
for _, row in alldiffs.iterrows():
diff = self._deserialize(row['diff'], table.name)
if row['manual']:
synth_ts = self._apply_diff(synth_ts, diff)
else:
auto_ts = self._apply_diff(auto_ts, diff)
# merging auto into manual
# we erase all the elements that have been edited
# by the auto diff
synth_ts = synth_ts[~synth_ts.index.isin(diff.index)]
manual_ts = self._compute_diff(auto_ts, synth_ts)
return auto_ts, manual_ts
def _onthefly(self, cn, table, revision_date,
from_value_date=None, to_value_date=None):
qfilter = []
if revision_date:
qfilter.append(lambda cset, _: cset.c.insertion_date <= revision_date)
return self._build_snapshots_upto(cn, table, qfilter,
from_value_date=from_value_date,
to_value_date=to_value_date)
# public API redefinition
def get(self, cn, name, revision_date=None):
table = self._get_ts_table(cn, name)
if table is None:
if diff is None:
return
if revision_date:
auto, residualmanual = self._onthefly(cn, table, revision_date)
ts = self._apply_diff(auto, residualmanual)
else:
# fetch the top-level snapshot
synthetic = self._latest_item(cn, table, 'snapshot')
if synthetic is None: # head just got chopped
ts = self._build_snapshot_upto(cn, table)
else:
ts = self._deserialize(synthetic, name)
if ts is not None:
ts.name = name
ts = ts[~ts.isnull()]
return ts
# updated to account for the possibility of stripping changesets
# of their series diffs
def _diff(self, cn, csetid, name):
table = self._get_ts_table(cn, name)
cset = self.schema.changeset
def filtercset(sql):
return sql.where(table.c.csid == cset.c.id
).where(cset.c.id == csetid)
sql = filtercset(select([table.c.id]))
tsid = cn.execute(sql).scalar()
# that guy was stripped
if tsid is None:
return pd.Series()
if tsid == 1:
sql = select([table.c.snapshot])
else:
sql = select([table.c.diff])
sql = filtercset(sql)
return self._deserialize(cn.execute(sql).scalar(), name)
# insert the diff over automatic into synthetic
a = super().insert(
cn, diff, name, author,
_insertion_date=_insertion_date
)
return a
# supervision specific API
......@@ -274,9 +83,19 @@ class TimeSerie(BaseTS):
if table is None:
return None, None
auto, manual = self._onthefly(cn, table, revision_date,
from_value_date=from_value_date,
to_value_date=to_value_date)
autotsh = BaseTS(namespace='automatic')
auto = autotsh.get(cn, name,
revision_date=revision_date,
from_value_date=from_value_date,
to_value_date=to_value_date,
_keep_nans=True)
synth = self.get(cn, name,
revision_date=revision_date,
from_value_date=from_value_date,
to_value_date=to_value_date,
_keep_nans=True)
manual = self.diff(auto, synth)
unionindex = join_index(auto, manual)
if unionindex is None:
# this means both series are empty
......@@ -287,7 +106,9 @@ class TimeSerie(BaseTS):
mask_manual[manual.index] = True
mask_manual.name = name
ts = self._apply_diff(auto, manual)
ts = ts[~ts.isnull()]
ts = self.get(cn, name,
revision_date=revision_date,
from_value_date=from_value_date,
to_value_date=to_value_date)
ts.name = name
return ts, mask_manual
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment