Commit 8ca087b6 authored by Aurélien Campéas's avatar Aurélien Campéas
Browse files

first release of ts history

parents
from pathlib import Path
from pytest_sa_pg.fixture import engine_fixture
from tshistory import schema
# Directory holding the data fixtures used by the test suite.
DATADIR = Path(__file__).parent / 'test' / 'data'
# NOTE(review): DBURI appears unused here -- the port (5433) is handed
# straight to engine_fixture below; confirm before removing.
DBURI = 'postgresql://localhost:5433/postgres'
# pytest fixture: a SQLAlchemy engine bound to a throwaway postgres
# instance (pytest_sa_pg) initialised with the tshistory schema.
engine = engine_fixture(schema.meta, DATADIR, 5433)
from pathlib import Path
from inireader.config import config
from sqlalchemy import create_engine
from tshistory.schema import meta
def init_db():
    """Reset the database: drop then recreate every table of the
    tshistory schema, using the connection uri read from data_hub.cfg.
    """
    cfgpath = Path(__file__).parent / 'data_hub.cfg'
    dburi = config(str(cfgpath))['db']['uri']
    print(dburi)
    engine = create_engine(dburi)
    meta.drop_all(engine)
    meta.create_all(engine)
if __name__ == '__main__':
    # destructive operation: require an explicit confirmation first
    answer = input('Are you sure. This will erase the database (y/N)')
    if answer.upper() == 'Y':
        print('creating the db schema ...')
        init_db()
from sqlalchemy import (Table, Column, Integer, String, DateTime,
MetaData, ForeignKey, UniqueConstraint)
from sqlalchemy.dialects.postgresql import JSONB
# Single metadata registry for all tshistory tables.
meta = MetaData()

# Append-only revision log: one row per insertion of a time series.
# Each row stores the diff against its parent revision ('data') and,
# for the tip of a chain only, the fully materialized series ('snapshot').
ts_revlog = Table(
    'ts_revlog', meta,
    Column('id', Integer, primary_key=True),
    # identifier of the time series this revision belongs to
    Column('name', String, index=True, nullable=False),
    # free-form author of the insertion
    Column('author', String, index=True, nullable=False),
    Column('insertion_date', DateTime, index=True, nullable=False),
    # json-serialized diff (full series for the root revision)
    Column('data', JSONB, nullable=False),
    # json-serialized full state; kept only on the tip revision
    Column('snapshot', JSONB),
    # previous revision of the same series; NULL for the root
    Column('parent',
           Integer,
           ForeignKey('ts_revlog.id', ondelete='cascade'),
           nullable=True,
           index=True),
    # at most one child per revision: chains, not trees
    UniqueConstraint('name', 'parent')
)
Gas Day,SC
01/10/2016,94616000
02/10/2016,97672000
03/10/2016,106085000
04/10/2016,107655000
05/10/2016,109368000
06/10/2016,111082000
07/10/2016,109294000
08/10/2016,105192000
09/10/2016,108090000
10/10/2016,116650000
11/10/2016,117935000
12/10/2016,119363000
13/10/2016,121362000
14/10/2016,119788000
15/10/2016,116040000
16/10/2016,119742000
# coding: utf-8
from pathlib import Path
from datetime import datetime
from dateutil import parser
import pandas as pd
import numpy as np
from mock import patch
from tshistory.tsio import insert_ts, get_ts, delete_last_diff
# Directory containing the csv fixtures used by these tests.
DATADIR = Path(__file__).parent / 'data'
def test_differential(engine):
    """Exercise the differential storage scheme end to end: successive
    insertions store only diffs, reads reconstruct the full series, and
    the last diff can be stripped.
    """
    # initial insertion: the whole series becomes the root revision
    ts_begin = pd.Series(range(10))
    ts_begin.index = pd.date_range(start=datetime(2010, 1, 1), freq='D', periods=10)
    insert_ts(engine, ts_begin, 'ts_test', 'test')

    assert """
2010-01-01 0.0
2010-01-02 1.0
2010-01-03 2.0
2010-01-04 3.0
2010-01-05 4.0
2010-01-06 5.0
2010-01-07 6.0
2010-01-08 7.0
2010-01-09 8.0
2010-01-10 9.0
""".strip() == get_ts(engine, 'ts_test').to_string().strip()

    # re-inserting identical data must be a no-op (no new revision)
    # we should detect the emission of a message
    insert_ts(engine, ts_begin, 'ts_test', 'babar')

    assert """
2010-01-01 0.0
2010-01-02 1.0
2010-01-03 2.0
2010-01-04 3.0
2010-01-05 4.0
2010-01-06 5.0
2010-01-07 6.0
2010-01-08 7.0
2010-01-09 8.0
2010-01-10 9.0
""".strip() == get_ts(engine, 'ts_test').to_string().strip()

    # two point changes inside the existing range -> a small diff
    ts_slight_variation = ts_begin.copy()
    ts_slight_variation.iloc[3] = 0
    ts_slight_variation.iloc[6] = 0
    insert_ts(engine, ts_slight_variation, 'ts_test', 'celeste')

    assert """
2010-01-01 0.0
2010-01-02 1.0
2010-01-03 2.0
2010-01-04 0.0
2010-01-05 4.0
2010-01-06 5.0
2010-01-07 0.0
2010-01-08 7.0
2010-01-09 8.0
2010-01-10 9.0
""".strip() == get_ts(engine, 'ts_test').to_string().strip()

    # an overlapping, longer series: overlap is overridden, tail appended
    ts_longer = pd.Series(range(15))
    ts_longer.index = pd.date_range(start=datetime(2010, 1, 3), freq='D', periods=15)
    ts_longer.iloc[1] = 2.48
    ts_longer.iloc[3] = 3.14
    ts_longer.iloc[5] = ts_begin.iloc[7]
    insert_ts(engine, ts_longer, 'ts_test', 'test')

    assert """
2010-01-01 0.00
2010-01-02 1.00
2010-01-03 0.00
2010-01-04 2.48
2010-01-05 2.00
2010-01-06 3.14
2010-01-07 4.00
2010-01-08 7.00
2010-01-09 6.00
2010-01-10 7.00
2010-01-11 8.00
2010-01-12 9.00
2010-01-13 10.00
2010-01-14 11.00
2010-01-15 12.00
2010-01-16 13.00
2010-01-17 14.00
""".strip() == get_ts(engine, 'ts_test').to_string().strip()

    # start testing manual overrides
    ts_begin = pd.Series([2] * 5)
    ts_begin.index = pd.date_range(start=datetime(2010, 1, 1), freq='D', periods=5)
    ts_begin.loc['2010-01-04'] = -1
    insert_ts(engine, ts_begin, 'ts_mixte', 'test')

    # -1 represents bogus upstream data
    assert """
2010-01-01 2.0
2010-01-02 2.0
2010-01-03 2.0
2010-01-04 -1.0
2010-01-05 2.0
""".strip() == get_ts(engine, 'ts_mixte').to_string().strip()

    # refresh all the period + 1 extra data point
    ts_more = pd.Series([2] * 5)
    ts_more.index = pd.date_range(start=datetime(2010, 1, 2), freq='D', periods=5)
    ts_more.loc['2010-01-04'] = -1
    insert_ts(engine, ts_more, 'ts_mixte', 'test')

    assert """
2010-01-01 2.0
2010-01-02 2.0
2010-01-03 2.0
2010-01-04 -1.0
2010-01-05 2.0
2010-01-06 2.0
""".strip() == get_ts(engine, 'ts_mixte').to_string().strip()

    # just append an extra data point
    ts_one_more = pd.Series([3])  # with no intersection with the previous ts
    ts_one_more.index = pd.date_range(start=datetime(2010, 1, 7), freq='D', periods=1)
    insert_ts(engine, ts_one_more, 'ts_mixte', 'test')

    assert """
2010-01-01 2.0
2010-01-02 2.0
2010-01-03 2.0
2010-01-04 -1.0
2010-01-05 2.0
2010-01-06 2.0
2010-01-07 3.0
""".strip() == get_ts(engine, 'ts_mixte').to_string().strip()

    # revision chains so far: 3 revisions for ts_test, 3 for ts_mixte
    allts = pd.read_sql('select name, id, parent from ts_revlog order by id',
                        engine)

    assert """
name id parent
0 ts_test 1 NaN
1 ts_test 2 1.0
2 ts_test 3 2.0
3 ts_mixte 4 NaN
4 ts_mixte 5 4.0
5 ts_mixte 6 5.0
""".strip() == allts.to_string().strip()

    # a revision_date of "now" must yield the same state as a plain read
    assert """
2010-01-01 2.0
2010-01-02 2.0
2010-01-03 2.0
2010-01-04 -1.0
2010-01-05 2.0
2010-01-06 2.0
2010-01-07 3.0
""".strip() == get_ts(engine, 'ts_mixte',
                      revision_date=datetime.now()).to_string().strip()

    # test striping the last diff
    delete_last_diff(engine, 'ts_mixte')

    assert """
2010-01-01 2.0
2010-01-02 2.0
2010-01-03 2.0
2010-01-04 -1.0
2010-01-05 2.0
2010-01-06 2.0
""".strip() == get_ts(engine, 'ts_mixte').to_string().strip()

    # the deleted revision is gone from the log
    allts = pd.read_sql('select name, id, parent from ts_revlog order by id',
                        engine)

    assert """
name id parent
0 ts_test 1 NaN
1 ts_test 2 1.0
2 ts_test 3 2.0
3 ts_mixte 4 NaN
4 ts_mixte 5 4.0
""".strip() == allts.to_string().strip()
def test_bad_import(engine):
    """Check the storage layer against awkward inputs: date-typed
    indexes from csv, empty series, all-NaN and partially-NaN series,
    and reads of unknown names.
    """
    # the data were parsed as date by pd.read_json()
    df_result = pd.read_csv(DATADIR / 'test_data.csv')
    df_result['Gas Day'] = df_result['Gas Day'].apply(parser.parse, dayfirst=True, yearfirst=False)
    df_result.set_index('Gas Day', inplace=True)
    ts = df_result['SC']

    insert_ts(engine, ts, 'SND_SC', 'test')
    result = get_ts(engine, 'SND_SC')
    # values are stored as floats whatever the input dtype
    assert result.dtype == 'float64'

    # insertion of empty ts
    ts = pd.Series(name='truc', dtype='object')
    insert_ts(engine, ts, 'empty_ts', 'test')
    assert get_ts(engine, 'empty_ts') is None

    # nan in ts
    # all na
    ts = pd.Series([np.nan] * 10,
                   index=pd.date_range(start=datetime(2010, 1, 10),
                                       freq='D', periods=10), name='truc')
    insert_ts(engine, ts, 'test_nan', 'test')
    assert get_ts(engine, 'test_nan') is None

    # mixe na: NaNs are dropped, and re-inserting is a no-op
    ts = pd.Series([np.nan] * 5 + [3] * 5,
                   index=pd.date_range(start=datetime(2010, 1, 10),
                                       freq='D', periods=10), name='truc')
    insert_ts(engine, ts, 'test_nan', 'test')
    result = get_ts(engine, 'test_nan')

    insert_ts(engine, ts, 'test_nan', 'test')
    result = get_ts(engine, 'test_nan')
    assert """
2010-01-15 3.0
2010-01-16 3.0
2010-01-17 3.0
2010-01-18 3.0
2010-01-19 3.0
""".strip() == result.to_string().strip()

    # get_ts with name not in database
    # BUGFIX: the third positional argument of get_ts is revision_date,
    # not author -- passing 'test' there compared a str to datetimes
    get_ts(engine, 'inexisting_name')
def test_revision_date(engine):
    """Insert three revisions of the same series on three (mocked)
    consecutive days, then read the series back as of various dates.
    """
    # one revision per day: value v inserted on 2015-01-v at 15:43:23
    for day in (1, 2, 3):
        with patch('tshistory.tsio.datetime') as mock_date:
            mock_date.now.return_value = datetime(2015, 1, day, 15, 43, 23)
            ts = pd.Series([day] * 4,
                           index=pd.date_range(start=datetime(2010, 1, 4),
                                               freq='D', periods=4), name='truc')
            insert_ts(engine, ts, 'ts_through_time', 'test')

    # no revision date: latest state
    ts = get_ts(engine, 'ts_through_time')
    assert """
2010-01-04 3.0
2010-01-05 3.0
2010-01-06 3.0
2010-01-07 3.0
""".strip() == ts.to_string().strip()

    # as of the evening of day 2: second revision
    ts = get_ts(engine, 'ts_through_time',
                revision_date=datetime(2015, 1, 2, 18, 43, 23))
    assert """
2010-01-04 2.0
2010-01-05 2.0
2010-01-06 2.0
2010-01-07 2.0
""".strip() == ts.to_string().strip()

    # as of the evening of day 1: first revision
    ts = get_ts(engine, 'ts_through_time',
                revision_date=datetime(2015, 1, 1, 18, 43, 23))
    assert """
2010-01-04 1.0
2010-01-05 1.0
2010-01-06 1.0
2010-01-07 1.0
""".strip() == ts.to_string().strip()

    # before any insertion: nothing to see
    ts = get_ts(engine, 'ts_through_time',
                revision_date=datetime(2014, 1, 1, 18, 43, 23))
    assert ts is None
from datetime import datetime
import pandas as pd
import numpy as np
from sqlalchemy.sql.expression import select, desc
from tshistory.schema import ts_revlog
# Absolute tolerance under which two float values are considered equal
# when diffing two series (see compute_diff).
PRECISION = 1e-14
def insert_ts(engine, newts, name, author):
    """Create a new revision of a given time series.

    newts: pandas.Series with a date index and float values
    name: str unique identifier of the serie
    author: str free-form author name

    NaN values are dropped before storage; an empty (or all-NaN)
    series inserts nothing. The first insertion of a name stores the
    full series; later insertions store only the diff against the
    current state, and the snapshot moves to the new tip revision.
    """
    assert isinstance(newts, pd.Series)
    newts = newts.dropna()  # NaNs are never stored
    if len(newts):
        assert newts.index.dtype.name == 'datetime64[ns]'
    else:
        return  # nothing left to insert
    newts = newts.astype('float64')
    newts.name = name

    newrev_sql = ts_revlog.insert()
    with engine.connect() as cnx:
        snapshot, tip_id = _get_snapshot(cnx, name)

        if snapshot is None:
            # initial insertion: diff and snapshot are both the full series
            jsonts = tojson(newts)
            value = {
                'data': jsonts,
                'snapshot': jsonts,
                'insertion_date': datetime.now(),
                'author': author,
                'name': name,
            }
            cnx.execute(newrev_sql.values(value))
            # BUGFIX: message used to read 'Fisrt insertion'
            print('First insertion of %s by %s' % (name, author))
            return

        # this is the diff between our computed parent
        diff = compute_diff(snapshot, newts)
        if len(diff) == 0:
            print('No difference in %s by %s' % (name, author))
            return

        assert tip_id is not None
        # full state computation & insertion
        newsnapshot = apply_diff(snapshot, diff)
        value = {
            'data': tojson(diff),
            'snapshot': tojson(newsnapshot),
            'insertion_date': datetime.now(),
            'author': author,
            'name': name,
            'parent': tip_id,
        }
        cnx.execute(newrev_sql.values(value))

        # only the tip of a chain keeps its snapshot
        cnx.execute(
            ts_revlog.update(
            ).where(ts_revlog.c.id == tip_id
            ).values(snapshot=None)
        )
        print('Insertion differential of %s by %s' % (name, author))
def get_ts(engine, name, revision_date=None):
    """Compute the top-most timeseries of a given name
    with manual overrides applied.

    When revision_date is given, reconstruct the state the series had
    at that date; otherwise read the current snapshot. Returns None
    for an unknown name.
    """
    if revision_date is None:
        ts, _ = _get_snapshot(engine, name)
    else:
        ts, _ = apply_diffs_upto(engine, name, revision_date)
    if ts is not None:
        ts.name = name
    return ts
def tojson(ts):
    """Serialize a series to its json 'split' representation
    (None passes through unchanged).
    """
    return None if ts is None else ts.to_json(orient='split',
                                              date_format='iso')
def fromjson(jsonb):
    """Deserialize a series from its json 'split' representation,
    leaving dtype inference to pandas (dtype=False).
    """
    return pd.read_json(
        jsonb,
        orient='split',
        typ='series',
        dtype=False,
    )
def _get_snapshot(engine, name):
    """Return (snapshot series, revision id) for the tip revision of
    `name`, or (None, None) when the series does not exist.
    """
    sql = select(
        [ts_revlog.c.id, ts_revlog.c.snapshot]
    ).where(
        ts_revlog.c.name == name
    ).order_by(
        desc(ts_revlog.c.id)
    ).limit(1)

    rows = pd.read_sql(sql, engine)
    if not len(rows):
        return None, None
    assert len(rows) == 1
    tip_id = int(rows['id'].iloc[0])
    return fromjson(rows['snapshot'].iloc[0]), tip_id
def compute_diff(ts1, ts2):
    """Return the part of ts2 that is not already in ts1: values at
    brand-new index entries, plus values that changed (beyond
    PRECISION) at shared entries.
    """
    shared = ts2.index.isin(ts1.index)
    old_shared = ts1[ts2.index[shared]]
    new_shared = ts2[shared]
    unchanged = np.isclose(old_shared, new_shared, atol=PRECISION)
    return pd.concat([new_shared[~unchanged], ts2[~shared]])
def apply_diff(base_ts, new_ts):
    """Produce a new ts using base_ts as a base and
    taking any intersecting and new values from new_ts.

    Either argument may be None, in which case the other one is
    returned unchanged. The result is sorted by index.
    """
    if base_ts is None:
        return new_ts
    if new_ts is None:
        return base_ts
    # BUGFIX: broadcast a scalar over the union index; the previous
    # one-element list ([0.0]) only matches an index of length 1 and
    # makes the Series constructor raise for longer unions
    result_ts = pd.Series(0.0, index=base_ts.index.union(new_ts.index))
    result_ts[base_ts.index] = base_ts
    result_ts[new_ts.index] = new_ts  # new values win on the overlap
    result_ts.sort_index(inplace=True)
    return result_ts
def apply_diffs_upto(engine, name, revision_date=None):
    """Rebuild the series `name` by replaying its revision chain from
    the root, optionally ignoring revisions inserted after
    `revision_date`.

    Returns (series, id of the last applied revision), or (None, None)
    when no revision matches.
    """
    sql = select([ts_revlog.c.id,
                  ts_revlog.c.data,
                  ts_revlog.c.parent,
                  ts_revlog.c.insertion_date]
    ).order_by(ts_revlog.c.id
    ).where(ts_revlog.c.name == name)
    alldiffs = pd.read_sql(sql, engine)

    if revision_date:
        alldiffs = alldiffs[alldiffs['insertion_date'] <= revision_date]
    if len(alldiffs) == 0:
        return None, None

    # the root revision is the only one without a parent
    base = alldiffs.loc[alldiffs.loc[:, 'parent'].isnull()]
    # initial ts and its id
    ts = fromjson(base['data'].iloc[0])
    parent_id = base['id'].iloc[0]  # actually the root

    if len(alldiffs) == 1:
        assert ts.index.dtype.name == 'datetime64[ns]' or len(ts) == 0
        return ts, parent_id

    # PERF: hoisted out of the loop -- the old code rebuilt
    # alldiffs['parent'].tolist() on every iteration (O(n^2) walk)
    parent_ids = set(alldiffs['parent'].tolist())
    # walk the parent -> child chain, applying each diff in turn
    while True:
        child_row = alldiffs.loc[alldiffs.loc[:, 'parent'] == parent_id, :]
        child_ts = fromjson(child_row['data'].iloc[0])
        parent_id = child_row['id'].iloc[0]
        ts = apply_diff(ts, child_ts)
        if parent_id not in parent_ids:
            # no revision claims this one as parent: we reached the tip
            return ts, int(parent_id)
def delete_last_diff(engine, name):
    """Remove the tip revision of the series `name` and restore the
    snapshot on the new tip.

    Returns True when a revision was deleted, False when the series
    has no revision at all.
    """
    with engine.connect() as cnx:
        sql = select([ts_revlog.c.id,
                      ts_revlog.c.parent]
        ).order_by(desc(ts_revlog.c.id)
        ).limit(1
        ).where(ts_revlog.c.name == name)

        # BUGFIX: fetchone() returns None for an unknown name; the old
        # code unpacked it unconditionally and raised a TypeError
        # before ever reaching its `if not diff_id` guard
        row = cnx.execute(sql).fetchone()
        if row is None:
            return False
        diff_id, _ = row

        sql = ts_revlog.delete().where(
            ts_revlog.c.id == diff_id
        )
        cnx.execute(sql)

        # apply on flat: recompute the full state and store it as the
        # snapshot of the new tip
        current, parent_id = apply_diffs_upto(cnx, name)
        update_snapshot_sql = ts_revlog.update(
        ).where(ts_revlog.c.id == parent_id
        ).values(snapshot=tojson(current))
        cnx.execute(update_snapshot_sql)
    return True
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment