Commit d9273ce1 authored by Aurélien Campéas's avatar Aurélien Campéas
Browse files

tsio: switch from functions to methods of a `TimeSerie` object

This will help a lot when specializing/extending functionality.
parent 3ac9de8be62b
......@@ -7,16 +7,18 @@ import pandas as pd
import numpy as np
from mock import patch
from tshistory.tsio import insert_ts, get_ts, delete_last_diff
from tshistory.tsio import TimeSerie
DATADIR = Path(__file__).parent / 'data'
def test_differential(engine):
# instantiate one time serie handler object
tso = TimeSerie()
ts_begin = pd.Series(range(10))
ts_begin.index = pd.date_range(start=datetime(2010, 1, 1), freq='D', periods=10)
insert_ts(engine, ts_begin, 'ts_test', 'test')
tso.insert(engine, ts_begin, 'ts_test', 'test')
assert """
2010-01-01 0.0
......@@ -29,10 +31,10 @@ def test_differential(engine):
2010-01-08 7.0
2010-01-09 8.0
2010-01-10 9.0
""".strip() == get_ts(engine, 'ts_test').to_string().strip()
""".strip() == tso.get(engine, 'ts_test').to_string().strip()
# we should detect the emission of a message
insert_ts(engine, ts_begin, 'ts_test', 'babar')
tso.insert(engine, ts_begin, 'ts_test', 'babar')
assert """
2010-01-01 0.0
......@@ -45,12 +47,12 @@ def test_differential(engine):
2010-01-08 7.0
2010-01-09 8.0
2010-01-10 9.0
""".strip() == get_ts(engine, 'ts_test').to_string().strip()
""".strip() == tso.get(engine, 'ts_test').to_string().strip()
ts_slight_variation = ts_begin.copy()
ts_slight_variation.iloc[3] = 0
ts_slight_variation.iloc[6] = 0
insert_ts(engine, ts_slight_variation, 'ts_test', 'celeste')
tso.insert(engine, ts_slight_variation, 'ts_test', 'celeste')
assert """
2010-01-01 0.0
......@@ -63,7 +65,7 @@ def test_differential(engine):
2010-01-08 7.0
2010-01-09 8.0
2010-01-10 9.0
""".strip() == get_ts(engine, 'ts_test').to_string().strip()
""".strip() == tso.get(engine, 'ts_test').to_string().strip()
ts_longer = pd.Series(range(15))
ts_longer.index = pd.date_range(start=datetime(2010, 1, 3), freq='D', periods=15)
......@@ -71,7 +73,7 @@ def test_differential(engine):
ts_longer.iloc[3] = 3.14
ts_longer.iloc[5] = ts_begin.iloc[7]
insert_ts(engine, ts_longer, 'ts_test', 'test')
tso.insert(engine, ts_longer, 'ts_test', 'test')
assert """
2010-01-01 0.00
......@@ -91,13 +93,13 @@ def test_differential(engine):
2010-01-15 12.00
2010-01-16 13.00
2010-01-17 14.00
""".strip() == get_ts(engine, 'ts_test').to_string().strip()
""".strip() == tso.get(engine, 'ts_test').to_string().strip()
# start testing manual overrides
ts_begin = pd.Series([2] * 5)
ts_begin.index = pd.date_range(start=datetime(2010, 1, 1), freq='D', periods=5)
ts_begin.loc['2010-01-04'] = -1
insert_ts(engine, ts_begin, 'ts_mixte', 'test')
tso.insert(engine, ts_begin, 'ts_mixte', 'test')
# -1 represents bogus upstream data
assert """
......@@ -106,13 +108,13 @@ def test_differential(engine):
2010-01-03 2.0
2010-01-04 -1.0
2010-01-05 2.0
""".strip() == get_ts(engine, 'ts_mixte').to_string().strip()
""".strip() == tso.get(engine, 'ts_mixte').to_string().strip()
# refresh all the period + 1 extra data point
ts_more = pd.Series([2] * 5)
ts_more.index = pd.date_range(start=datetime(2010, 1, 2), freq='D', periods=5)
ts_more.loc['2010-01-04'] = -1
insert_ts(engine, ts_more, 'ts_mixte', 'test')
tso.insert(engine, ts_more, 'ts_mixte', 'test')
assert """
2010-01-01 2.0
......@@ -121,12 +123,12 @@ def test_differential(engine):
2010-01-04 -1.0
2010-01-05 2.0
2010-01-06 2.0
""".strip() == get_ts(engine, 'ts_mixte').to_string().strip()
""".strip() == tso.get(engine, 'ts_mixte').to_string().strip()
# just append an extra data point
ts_one_more = pd.Series([3]) # with no intersection with the previous ts
ts_one_more.index = pd.date_range(start=datetime(2010, 1, 7), freq='D', periods=1)
insert_ts(engine, ts_one_more, 'ts_mixte', 'test')
tso.insert(engine, ts_one_more, 'ts_mixte', 'test')
assert """
2010-01-01 2.0
......@@ -136,7 +138,7 @@ def test_differential(engine):
2010-01-05 2.0
2010-01-06 2.0
2010-01-07 3.0
""".strip() == get_ts(engine, 'ts_mixte').to_string().strip()
""".strip() == tso.get(engine, 'ts_mixte').to_string().strip()
hist = pd.read_sql('select id, parent from ts_ts_test order by id',
engine)
......@@ -173,11 +175,11 @@ def test_differential(engine):
2010-01-05 2.0
2010-01-06 2.0
2010-01-07 3.0
""".strip() == get_ts(engine, 'ts_mixte',
revision_date=datetime.now()).to_string().strip()
""".strip() == tso.get(engine, 'ts_mixte',
revision_date=datetime.now()).to_string().strip()
# test stripping the last diff
delete_last_diff(engine, 'ts_mixte')
tso.delete_last_diff(engine, 'ts_mixte')
assert """
2010-01-01 2.0
......@@ -186,41 +188,45 @@ def test_differential(engine):
2010-01-04 -1.0
2010-01-05 2.0
2010-01-06 2.0
""".strip() == get_ts(engine, 'ts_mixte').to_string().strip()
""".strip() == tso.get(engine, 'ts_mixte').to_string().strip()
def test_bad_import(engine):
# instantiate one time serie handler object
tso = TimeSerie()
# the data were parsed as date by pd.read_json()
df_result = pd.read_csv(DATADIR / 'test_data.csv')
df_result['Gas Day'] = df_result['Gas Day'].apply(parser.parse, dayfirst=True, yearfirst=False)
df_result.set_index('Gas Day', inplace=True)
ts = df_result['SC']
insert_ts(engine, ts, 'SND_SC', 'test')
result = get_ts(engine, 'SND_SC')
tso.insert(engine, ts, 'SND_SC', 'test')
result = tso.get(engine, 'SND_SC')
assert result.dtype == 'float64'
# insertion of empty ts
ts = pd.Series(name='truc', dtype='object')
insert_ts(engine, ts, 'empty_ts', 'test')
assert get_ts(engine, 'empty_ts') is None
tso.insert(engine, ts, 'empty_ts', 'test')
assert tso.get(engine, 'empty_ts') is None
# nan in ts
# all na
ts = pd.Series([np.nan] * 10,
index=pd.date_range(start=datetime(2010, 1, 10),
freq='D', periods=10), name='truc')
insert_ts(engine, ts, 'test_nan', 'test')
assert get_ts(engine, 'test_nan') is None
tso.insert(engine, ts, 'test_nan', 'test')
assert tso.get(engine, 'test_nan') is None
# mixed na
ts = pd.Series([np.nan] * 5 + [3] * 5,
index=pd.date_range(start=datetime(2010, 1, 10),
freq='D', periods=10), name='truc')
insert_ts(engine, ts, 'test_nan', 'test')
result = get_ts(engine, 'test_nan')
tso.insert(engine, ts, 'test_nan', 'test')
result = tso.get(engine, 'test_nan')
insert_ts(engine, ts, 'test_nan', 'test')
result = get_ts(engine, 'test_nan')
tso.insert(engine, ts, 'test_nan', 'test')
result = tso.get(engine, 'test_nan')
assert """
2010-01-15 3.0
2010-01-16 3.0
......@@ -231,17 +237,20 @@ def test_bad_import(engine):
# get_ts with name not in database
get_ts(engine, 'inexisting_name', 'test')
tso.get(engine, 'inexisting_name', 'test')
def test_revision_date(engine):
# instantiate one time serie handler object
tso = TimeSerie()
with patch('tshistory.tsio.datetime') as mock_date:
mock_date.now.return_value = datetime(2015, 1, 1, 15, 43, 23)
ts = pd.Series([1] * 4,
index=pd.date_range(start=datetime(2010, 1, 4),
freq='D', periods=4), name='truc')
insert_ts(engine, ts, 'ts_through_time', 'test')
tso.insert(engine, ts, 'ts_through_time', 'test')
with patch('tshistory.tsio.datetime') as mock_date:
mock_date.now.return_value = datetime(2015, 1, 2, 15, 43, 23)
......@@ -249,7 +258,7 @@ def test_revision_date(engine):
ts = pd.Series([2] * 4,
index=pd.date_range(start=datetime(2010, 1, 4),
freq='D', periods=4), name='truc')
insert_ts(engine, ts, 'ts_through_time', 'test')
tso.insert(engine, ts, 'ts_through_time', 'test')
with patch('tshistory.tsio.datetime') as mock_date:
mock_date.now.return_value = datetime(2015, 1, 3, 15, 43, 23)
......@@ -257,9 +266,9 @@ def test_revision_date(engine):
ts = pd.Series([3] * 4,
index=pd.date_range(start=datetime(2010, 1, 4),
freq='D', periods=4), name='truc')
insert_ts(engine, ts, 'ts_through_time', 'test')
tso.insert(engine, ts, 'ts_through_time', 'test')
ts = get_ts(engine, 'ts_through_time')
ts = tso.get(engine, 'ts_through_time')
assert """
2010-01-04 3.0
......@@ -268,8 +277,8 @@ def test_revision_date(engine):
2010-01-07 3.0
""".strip() == ts.to_string().strip()
ts = get_ts(engine, 'ts_through_time',
revision_date=datetime(2015, 1, 2, 18, 43, 23) )
ts = tso.get(engine, 'ts_through_time',
revision_date=datetime(2015, 1, 2, 18, 43, 23) )
assert """
2010-01-04 2.0
......@@ -278,8 +287,8 @@ def test_revision_date(engine):
2010-01-07 2.0
""".strip() == ts.to_string().strip()
ts = get_ts(engine, 'ts_through_time',
revision_date=datetime(2015, 1, 1, 18, 43, 23))
ts = tso.get(engine, 'ts_through_time',
revision_date=datetime(2015, 1, 1, 18, 43, 23))
assert """
2010-01-04 1.0
......@@ -288,8 +297,8 @@ def test_revision_date(engine):
2010-01-07 1.0
""".strip() == ts.to_string().strip()
ts = get_ts(engine, 'ts_through_time',
revision_date=datetime(2014, 1, 1, 18, 43, 23))
ts = tso.get(engine, 'ts_through_time',
revision_date=datetime(2014, 1, 1, 18, 43, 23))
assert ts is None
......@@ -11,85 +11,6 @@ from tshistory.schema import make_ts_table, get_ts_table
PRECISION = 1e-14
def insert_ts(engine, newts, name, author):
    """Create a new revision of a given time series

    engine: object whose `connect()` result is used as the db connection
    newts: pandas.Series with date index and float values
    name: str unique identifier of the serie
    author: str free-form author name

    NaN points are dropped before anything else; if nothing remains,
    the call is a no-op. The first insertion for `name` creates the
    table and stores the whole series; later insertions store only the
    diff against the current tip, and nothing at all when that diff is
    empty. Always returns None.
    """
    assert isinstance(newts, pd.Series)
    newts = newts[~newts.isnull()]  # wipe the NaNs

    if not len(newts):
        # nothing left to store once the NaNs are gone
        return
    assert newts.index.dtype.name == 'datetime64[ns]'

    newts = newts.astype('float64')
    newts.name = name

    with engine.connect() as cnx:
        table = get_ts_table(cnx, name)
        if table is None:
            # initial insertion: the first row carries the full series
            # both as its data and as its snapshot
            table = make_ts_table(cnx, name)
            jsonts = tojson(newts)
            value = {
                'data': jsonts,
                'snapshot': jsonts,
                'insertion_date': datetime.now(),
                'author': author
            }
            cnx.execute(table.insert().values(value))
            print('First insertion of %s by %s' % (name, author))
            return

        snapshot, tip_id = _get_snapshot(cnx, table)

        # this is the diff between the current tip snapshot and the
        # incoming series
        diff = compute_diff(snapshot, newts)
        if len(diff) == 0:
            print('No difference in %s by %s' % (name, author))
            return

        assert tip_id is not None
        # full state computation & insertion
        newsnapshot = apply_diff(snapshot, diff)
        value = {
            'data': tojson(diff),
            'snapshot': tojson(newsnapshot),
            'insertion_date': datetime.now(),
            'author': author,
            'parent': tip_id,
        }
        cnx.execute(table.insert().values(value))

        # the new row now holds the snapshot: clear it on the former tip
        cnx.execute(
            table.update(
            ).where(table.c.id == tip_id
            ).values(snapshot=None)
        )
        print('Insertion differential of %s by %s' % (name, author))
def get_ts(cnx, name, revision_date=None):
    """Return the series stored under `name`, or None.

    cnx: connection handed to the table lookup and the readers
    name: str identifier of the serie
    revision_date: when given, the series is rebuilt by
        `apply_diffs_upto` with that date; otherwise the latest
        snapshot is read

    None is returned when `get_ts_table` yields no table, or when the
    selected reader yields no series. A returned series has its `name`
    attribute set to `name`.
    """
    table = get_ts_table(cnx, name)
    if table is None:
        return None

    if revision_date is not None:
        series, _ = apply_diffs_upto(cnx, table, revision_date)
    else:
        series, _ = _get_snapshot(cnx, table)

    if series is None:
        return None
    series.name = name
    return series
def tojson(ts):
if ts is None:
return None
......@@ -101,107 +22,188 @@ def fromjson(jsonb):
typ='series', dtype=False)
def _get_snapshot(cnx, table):
sql = select([table.c.id,
table.c.snapshot]
).order_by(desc(table.c.id)
).limit(1)
df = pd.read_sql(sql, cnx)
if len(df) == 0:
return None, None
assert len(df) == 1
diff_id = df['id'].iloc[0]
snapshot = fromjson(df['snapshot'].iloc[0])
class TimeSerie(object):
return snapshot, int(diff_id)
# API : insert, get
def insert(self, engine, newts, name, author):
"""Create a new revision of a given time series
ts: pandas.Series with date index and float values
name: str unique identifier of the serie
author: str free-form author name
"""
assert isinstance(newts, pd.Series)
newts = newts[~newts.isnull()] # wipe the the NaNs
def compute_diff(ts1, ts2):
mask_overlap = ts2.index.isin(ts1.index)
ts_bef_overlap = ts1[ts2.index[mask_overlap]]
ts_overlap = ts2[mask_overlap]
mask_equal = np.isclose(ts_bef_overlap, ts_overlap, atol=PRECISION)
ts_diff_overlap = ts2[mask_overlap][~mask_equal]
ts_diff_new = ts2[~mask_overlap]
ts_result = pd.concat([ts_diff_overlap, ts_diff_new])
return ts_result
if len(newts):
assert newts.index.dtype.name == 'datetime64[ns]'
else:
return
newts = newts.astype('float64')
newts.name = name
with engine.connect() as cnx:
table = get_ts_table(cnx, name)
if table is None:
# initial insertion
table = make_ts_table(cnx, name)
jsonts = tojson(newts)
value = {
'data': jsonts,
'snapshot': jsonts,
'insertion_date': datetime.now(),
'author': author
}
cnx.execute(table.insert().values(value))
print('Fisrt insertion of %s by %s' % (name, author))
return
snapshot, tip_id = self._get_snapshot(cnx, table)
# this is the diff between our computed parent
diff = self.compute_diff(snapshot, newts)
if len(diff) == 0:
print('No difference in %s by %s' % (name, author))
return
assert tip_id is not None
# full state computation & insertion
newsnapshot = self.apply_diff(snapshot, diff)
value = {
'data': tojson(diff),
'snapshot': tojson(newsnapshot),
'insertion_date': datetime.now(),
'author': author,
'parent': tip_id,
}
cnx.execute(table.insert().values(value))
def apply_diff(base_ts, new_ts):
"""Produce a new ts using base_ts as a base and
taking any intersecting and new values from new_ts
"""
if base_ts is None:
return new_ts
if new_ts is None:
return base_ts
result_ts = pd.Series([0.0], index=base_ts.index.union(new_ts.index))
result_ts[base_ts.index] = base_ts
result_ts[new_ts.index] = new_ts
result_ts.sort_index(inplace=True)
return result_ts
cnx.execute(
table.update(
).where(table.c.id == tip_id
).values(snapshot=None)
)
print('Insertion differential of %s by %s' % (name, author))
def get(self, cnx, name, revision_date=None):
"""Compute the top-most timeseries of a given name
with manual overrides applied
"""
table = get_ts_table(cnx, name)
if table is None:
return
if revision_date is None:
current, _ = self._get_snapshot(cnx, table)
else:
current, _ = self.apply_diffs_upto(cnx, table, revision_date)
def apply_diffs_upto(cnx, table, revision_date=None):
sql = select([table.c.id,
table.c.data,
table.c.parent,
table.c.insertion_date]
).order_by(table.c.id)
if current is not None:
current.name = name
return current
alldiffs = pd.read_sql(sql, cnx)
def delete_last_diff(self, engine, name):
with engine.connect() as cnx:
table = get_ts_table(cnx, name)
sql = select([table.c.id,
table.c.parent]
).order_by(desc(table.c.id)
).limit(1)
if revision_date:
alldiffs = alldiffs[alldiffs['insertion_date'] <= revision_date]
diff_id, parent_id = cnx.execute(sql).fetchone()
if not diff_id:
return False
if len(alldiffs) == 0:
return None, None
sql = table.delete().where(
table.c.id == diff_id
)
cnx.execute(sql)
base = alldiffs.loc[alldiffs.loc[:, 'parent'].isnull()]
# initial ts and its id
ts = fromjson(base['data'].iloc[0])
parent_id = base['id'].iloc[0] # actually the root
# apply on flat
current, parent_id = self.apply_diffs_upto(cnx, table)
if len(alldiffs) == 1:
assert ts.index.dtype.name == 'datetime64[ns]' or len(ts) == 0
return ts, parent_id
update_snapshot_sql = table.update(
).where(table.c.id == parent_id
).values(snapshot=tojson(current))
while True:
child_row = alldiffs.loc[alldiffs.loc[:, 'parent'] == parent_id, :]
child_ts = fromjson(child_row['data'].iloc[0])
parent_id = child_row['id'].iloc[0]
ts = apply_diff(ts, child_ts)
if parent_id not in alldiffs['parent'].tolist():
return ts, int(parent_id)
cnx.execute(update_snapshot_sql)
return True
# /API
# Helpers
def delete_last_diff(engine, name):
with engine.connect() as cnx:
table = get_ts_table(cnx, name)
def _get_snapshot(self, cnx, table):
sql = select([table.c.id,
table.c.parent]
table.c.snapshot]
).order_by(desc(table.c.id)
).limit(1)
diff_id, parent_id = cnx.execute(sql).fetchone()
if not diff_id:
return False
sql = table.delete().where(
table.c.id == diff_id
)
cnx.execute(sql)
# apply on flat
current, parent_id = apply_diffs_upto(cnx, table)
update_snapshot_sql = table.update(
).where(table.c.id == parent_id
).values(snapshot=tojson(current))
cnx.execute(update_snapshot_sql)
return True
df = pd.read_sql(sql, cnx)
if len(df) == 0:
return None, None
assert len(df) == 1
diff_id = df['id'].iloc[0]
snapshot = fromjson(df['snapshot'].iloc[0])
return snapshot, int(diff_id)
def compute_diff(self, ts1, ts2):
    """Return the points of `ts2` that are new or changed relative to `ts1`

    ts1: pandas.Series used as the reference state, or None
    ts2: pandas.Series holding the incoming values

    The result is the concatenation of
    * the points of `ts2` whose index exists in `ts1` but whose value
      is not close to the `ts1` value, and
    * the points of `ts2` whose index is absent from `ts1`.

    When `ts1` is None there is nothing to compare against and `ts2`
    is returned as is (mirrors `apply_diff`, which accepts a None base).

    NOTE: np.isclose is called with atol=PRECISION only, so numpy's
    default relative tolerance (rtol=1e-05) still applies to the
    comparison.
    """
    if ts1 is None:
        return ts2

    mask_overlap = ts2.index.isin(ts1.index)
    ts_overlap = ts2[mask_overlap]
    # reference values taken at the same positions as ts_overlap
    ts_bef_overlap = ts1[ts_overlap.index]
    mask_equal = np.isclose(ts_bef_overlap, ts_overlap, atol=PRECISION)
    ts_diff_overlap = ts_overlap[~mask_equal]
    ts_diff_new = ts2[~mask_overlap]
    return pd.concat([ts_diff_overlap, ts_diff_new])
def apply_diff(self, base_ts, new_ts):
    """Produce a new ts using base_ts as a base and
    taking any intersecting and new values from new_ts

    base_ts: pandas.Series or None
    new_ts: pandas.Series or None

    If either argument is None the other one is returned unchanged
    (same object). Otherwise a new float series indexed by the sorted
    union of both indexes is returned; where both series define a
    point, the value from new_ts wins.
    """
    if base_ts is None:
        return new_ts
    if new_ts is None:
        return base_ts
    # a scalar fill value is broadcast over the whole index, whereas a
    # one-element list only matches an index of length one
    result_ts = pd.Series(0.0, index=base_ts.index.union(new_ts.index))
    result_ts.loc[base_ts.index] = base_ts
    # written second so that new_ts overrides base_ts on shared dates
    result_ts.loc[new_ts.index] = new_ts
    return result_ts.sort_index()
def apply_diffs_upto(self, cnx, table, revision_date=None):
sql = select([table.c.id,
table.c.data,
table.c.parent,
table.c.insertion_date]
).order_by(table.c.id)
alldiffs = pd.read_sql(sql, cnx)
if revision_date:
alldiffs = alldiffs[alldiffs['insertion_date'] <= revision_date]
if len(alldiffs) == 0:
return None, None
base = alldiffs.loc[alldiffs.loc[:, 'parent'].isnull()]
# initial ts and its id
ts = fromjson(base['data'].iloc[0])
parent_id = base['id'].iloc[0] # actually the root