Commit 9b725370 authored by Aurélien Campéas's avatar Aurélien Campéas
Browse files

supervision: only pay the price if we do it

We don't want to pay for double storage (using the `upstream`
namespace) if we don't use it.

We have to use bits of metadata and introduce an explicit
supervision state: `unsupervised`, `supervised` and `handcrafted`.

`handcrafted` is the state given to a series whose first insertion
was done with manual=True. The current semantics should be reviewed.
parent df51059f9390
......@@ -27,8 +27,13 @@ def genserie(start, freq, repeat, initval=None, tz=None, name=None):
def test_rename(engine, tsh):
assert tsh.supervision_status(engine, 'rename-me') == 'unsupervised'
tsh.insert(engine, genserie(datetime(2010, 1, 1), 'D', 3),
'rename-me', 'Babar')
assert tsh.supervision_status(engine, 'rename-me') == 'unsupervised'
tsh.insert(engine, genserie(datetime(2010, 1, 2), 'D', 3),
'rename-me', 'Babar', manual=True)
assert tsh.supervision_status(engine, 'rename-me') == 'supervised'
tsh.rename(engine, 'rename-me', 'renamed')
tsh._resetcaches()
......@@ -133,6 +138,8 @@ def test_manual_overrides(engine, tsh):
ts_begin.loc['2010-01-04'] = -1
tsh.insert(engine, ts_begin, 'ts_mixte', 'test')
assert tsh.supervision_status(engine, 'ts_mixte') == 'unsupervised'
# -1 represents bogus upstream data
assert_df("""
2010-01-01 2.0
......@@ -174,12 +181,37 @@ def test_manual_overrides(engine, tsh):
2010-01-06 2.0
2010-01-07 2.0
""", tsh.get(engine, 'ts_mixte'))
assert tsh.supervision_status(engine, 'ts_mixte') == 'unsupervised'
assert tsh.upstream.get(engine, 'ts_mixte') is None
# edit the bogus upstream data: -1 -> 3
# also edit the next value
ts_manual = genserie(datetime(2010, 1, 4), 'D', 2, [3])
tsh.insert(engine, ts_manual, 'ts_mixte', 'test', manual=True)
tsh.get_ts_marker(engine, 'ts_mixte')
assert tsh.supervision_status(engine, 'ts_mixte') == 'supervised'
upstream = tsh.upstream.get(engine, 'ts_mixte')
assert_df("""
2010-01-01 2.0
2010-01-02 2.0
2010-01-03 2.0
2010-01-04 -1.0
2010-01-05 2.0
2010-01-06 2.0
2010-01-07 2.0
""", upstream)
ts, marker = tsh.get_ts_marker(engine, 'ts_mixte')
assert_df("""
2010-01-01 False
2010-01-02 False
2010-01-03 False
2010-01-04 True
2010-01-05 True
2010-01-06 False
2010-01-07 False
""", marker)
assert_df("""
2010-01-01 2.0
......@@ -189,7 +221,7 @@ def test_manual_overrides(engine, tsh):
2010-01-05 3.0
2010-01-06 2.0
2010-01-07 2.0
""", tsh.get(engine, 'ts_mixte'))
""", ts)
# refetch upstream: the fixed value override must remain in place
assert -1 == ts_begin['2010-01-04']
......@@ -334,7 +366,7 @@ def test_manual_overrides(engine, tsh):
""", manual)
def test_first_manual(engine, tsh):
def test_handcrafted(engine, tsh):
ts_begin = genserie(datetime(2010, 1, 1), 'D', 10)
tsh.insert(engine, ts_begin, 'ts_only', 'test', manual=True)
......@@ -351,28 +383,24 @@ def test_first_manual(engine, tsh):
2010-01-10 9.0
""", tsh.get(engine, 'ts_only'))
# we should detect the emission of a message
tsh.insert(engine, ts_begin, 'ts_only', 'test', manual=True)
ts_slight_variation = ts_begin.copy()
ts_slight_variation.iloc[3] = 0
ts_slight_variation.iloc[6] = 0
tsh.insert(engine, ts_slight_variation, 'ts_only', 'test')
assert_df("""
2010-01-01 0.0
2010-01-02 1.0
2010-01-03 2.0
2010-01-04 3.0
2010-01-04 0.0
2010-01-05 4.0
2010-01-06 5.0
2010-01-07 6.0
2010-01-07 0.0
2010-01-08 7.0
2010-01-09 8.0
2010-01-10 9.0
""", tsh.get(engine, 'ts_only'))
ts_slight_variation = ts_begin.copy()
ts_slight_variation.iloc[3] = 0
ts_slight_variation.iloc[6] = 0
tsh.insert(engine, ts_slight_variation, 'ts_only', 'test')
tsh.get(engine, 'ts_only').to_string().strip()
# should be a noop
tsh.insert(engine, ts_slight_variation, 'ts_only', 'test', manual=True)
_, marker = tsh.get_ts_marker(engine, 'ts_only')
......
......@@ -52,15 +52,69 @@ class timeseries(basets):
upstream series.
"""
# names of the metadata entries this class declares
# (presumably the internally managed ones -- confirm against the base class)
metakeys = {
    'index_dtype',
    'index_type',
    'tzaware',
    'value_dtype',
    'value_type',
    # new entry: carries the explicit supervision state
    'supervision_status',
}
# every value the `supervision_status` metadata entry can take
supervision_states = (
    'unsupervised',
    'supervised',
    'handcrafted',
)
def __init__(self, *a, **kw):
    """Initialise the base handler, then attach the upstream handler.

    All positional and keyword arguments are forwarded untouched to the
    parent constructor. A second `basets` handler is then created on
    the `<namespace>-upstream` namespace and exposed as `self.upstream`.
    """
    super().__init__(*a, **kw)
    # `self.namespace` is read after the parent constructor has run
    upstream_namespace = '{}-upstream'.format(self.namespace)
    self.upstream = basets(namespace=upstream_namespace)
def supervision_status(self, cn, name):
    """Return the supervision state recorded for the series `name`.

    The state is read from the `supervision_status` entry of the series
    metadata. When there is no metadata at all (or it is empty), or
    when the entry is absent, the series is reported as
    'unsupervised'.
    """
    # an absent/empty metadata mapping behaves like one without the key
    known = self.metadata(cn, name) or {}
    return known.get('supervision_status', 'unsupervised')
@tx
def insert(self, cn, ts, name, author,
metadata=None,
_insertion_date=None, manual=False):
if not self.exists(cn, name):
# initial insert
diff = super().insert(
cn, ts, name, author,
metadata=metadata,
_insertion_date=_insertion_date
)
# the super call creates the initial meta, let's complete it
meta = self.metadata(cn, name)
meta['supervision_status'] = 'handcrafted' if manual else 'unsupervised'
self.update_metadata(cn, name, meta, internal=True)
return diff
supervision_status = self.supervision_status(cn, name)
if supervision_status == 'unsupervised':
if manual:
# first supervised insert
# let's take a copy of the current series state
# into upstream and proceed forward
current = self.get(cn, name)
self.upstream.insert(
cn, current, name, author,
metadata=metadata,
_insertion_date=_insertion_date
)
# update supervision status
meta = self.metadata(cn, name)
meta['supervision_status'] = 'supervised'
self.update_metadata(cn, name, meta, internal=True)
# now insert what we got
return super().insert(
cn, ts, name, author,
metadata=metadata,
_insertion_date=_insertion_date
)
assert supervision_status in ('supervised', 'handcrafted')
if manual:
diff = ts
else:
......@@ -70,6 +124,13 @@ class timeseries(basets):
metadata=metadata,
_insertion_date=_insertion_date
)
if supervision_status == 'handcrafted':
# update supervision status
meta = self.metadata(cn, name)
meta['supervision_status'] = 'supervised'
self.update_metadata(cn, name, meta, internal=True)
if diff is None:
return
......@@ -119,15 +180,28 @@ class timeseries(basets):
if table is None:
return None, None
upstreamtsh = self.upstream
upstream = upstreamtsh.get(
edited = self.get(
cn, name,
revision_date=revision_date,
from_value_date=from_value_date,
to_value_date=to_value_date,
_keep_nans=True
)
edited = self.get(
if edited is None:
# because of a revision_date
return None, None
supervision = self.supervision_status(cn, name)
if supervision in ('unsupervised', 'handcrafted'):
flags = pd.Series(
[supervision == 'handcrafted'] * len(edited.index),
index=edited.index
)
flags.name = name
return edited.dropna(), flags
upstreamtsh = self.upstream
upstream = upstreamtsh.get(
cn, name,
revision_date=revision_date,
from_value_date=from_value_date,
......@@ -141,7 +215,10 @@ class timeseries(basets):
# this means both series are empty
return None, None
mask_manual = pd.Series([False] * len(unionindex), index=unionindex)
mask_manual = pd.Series(
[False] * len(unionindex),
index=unionindex
)
if manual is not None:
mask_manual[manual.index] = True
mask_manual.name = name
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment