Commit 670ba527 authored by Arnaud Campeas's avatar Arnaud Campeas
Browse files

backport from the proprietary `data_hub` product

parents
from pathlib import Path
import pytest
from sqlalchemy import create_engine, MetaData
from pytest_sa_pg import db
from tshistory.schema import init, reset
from tshistory_supervision.tsio import TimeSerie
DATADIR = Path(__file__).parent / 'test' / 'data'
@pytest.fixture(scope='session')
def engine(request):
port = 5433
db.setup_local_pg_cluster(request, DATADIR, port)
uri = 'postgresql://localhost:{}/postgres'.format(port)
e = create_engine(uri)
meta = MetaData()
with e.connect() as cn:
reset(cn)
with e.connect() as cn:
init(cn, meta)
yield e
@pytest.fixture(scope='session')
def tsh(request, engine):
return TimeSerie()
[metadata]
description-file = README.md
[bdist_wheel]
universal = 1
from setuptools import setup
setup(name='tshistory_supervision',
version='0.1.0',
author='Pythonian',
author_email='aurelien.campeas@pythonian.fr, arnaud.campeas@pythonian.fr',
url='https://bitbucket.org/pythonian/tshistory_supervision',
description='Store timeseries histories into postgres',
packages=['tshistory_supervision'],
install_requires=[
'tshistory'
],
classifiers=[
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
'Operating System :: OS Independent',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 3',
'Topic :: Database',
'Topic :: Scientific/Engineering',
'Topic :: Software Development :: Version Control'
]
)
from pathlib import Path
from datetime import datetime
import pytest
from mock import patch
import pandas as pd
import numpy as np
DATADIR = Path(__file__).parent / 'data'
def assert_df(expected, df):
assert expected.strip() == df.to_string().strip()
def genserie(start, freq, repeat, initval=None, tz=None, name=None):
if initval is None:
values = range(repeat)
else:
values = initval * repeat
return pd.Series(values,
name=name,
index=pd.date_range(start=start,
freq=freq,
periods=repeat,
tz=tz))
def test_mercure_serie(engine, tsh):
# mercure serie have numerical names
# this can cause issues if not properly handed to
# postgres ...
tsh.insert(engine, genserie(datetime(2010, 1, 1), 'D', 3),
'42', 'Babar')
s = tsh.get(engine, '42')
assert 3 == len(s)
assert_df("""
id name table_name
0 1 42 tsh.timeserie.42
""", pd.read_sql('select id, name, table_name from tsh.registry', engine))
assert_df("""
csid serie
0 1 42
""", pd.read_sql('select * from tsh.changeset_series', engine))
def test_differential(engine, tsh):
ts_begin = genserie(datetime(2010, 1, 1), 'D', 10)
tsh.insert(engine, ts_begin, 'ts_test', 'test')
assert_df("""
2010-01-01 0.0
2010-01-02 1.0
2010-01-03 2.0
2010-01-04 3.0
2010-01-05 4.0
2010-01-06 5.0
2010-01-07 6.0
2010-01-08 7.0
2010-01-09 8.0
2010-01-10 9.0
""", tsh.get(engine, 'ts_test'))
# we should detect the emission of a message
tsh.insert(engine, ts_begin, 'ts_test', 'test')
assert_df("""
2010-01-01 0.0
2010-01-02 1.0
2010-01-03 2.0
2010-01-04 3.0
2010-01-05 4.0
2010-01-06 5.0
2010-01-07 6.0
2010-01-08 7.0
2010-01-09 8.0
2010-01-10 9.0
""", tsh.get(engine, 'ts_test'))
ts_slight_variation = ts_begin.copy()
ts_slight_variation.iloc[3] = 0
ts_slight_variation.iloc[6] = 0
tsh.insert(engine, ts_slight_variation, 'ts_test', 'test')
assert_df("""
2010-01-01 0.0
2010-01-02 1.0
2010-01-03 2.0
2010-01-04 0.0
2010-01-05 4.0
2010-01-06 5.0
2010-01-07 0.0
2010-01-08 7.0
2010-01-09 8.0
2010-01-10 9.0
""", tsh.get(engine, 'ts_test'))
ts_longer = genserie(datetime(2010, 1, 3), 'D', 15)
ts_longer.iloc[1] = 2.48
ts_longer.iloc[3] = 3.14
ts_longer.iloc[5] = ts_begin.iloc[7]
tsh.insert(engine, ts_longer, 'ts_test', 'test')
assert_df("""
2010-01-01 0.00
2010-01-02 1.00
2010-01-03 0.00
2010-01-04 2.48
2010-01-05 2.00
2010-01-06 3.14
2010-01-07 4.00
2010-01-08 7.00
2010-01-09 6.00
2010-01-10 7.00
2010-01-11 8.00
2010-01-12 9.00
2010-01-13 10.00
2010-01-14 11.00
2010-01-15 12.00
2010-01-16 13.00
2010-01-17 14.00
""", tsh.get(engine, 'ts_test'))
def test_manual_overrides(engine, tsh):
tsh._snapshot_interval = 10
# start testing manual overrides
ts_begin = genserie(datetime(2010, 1, 1), 'D', 5, [2.])
ts_begin.loc['2010-01-04'] = -1
tsh.insert(engine, ts_begin, 'ts_mixte', 'test')
# -1 represents bogus upstream data
assert_df("""
2010-01-01 2.0
2010-01-02 2.0
2010-01-03 2.0
2010-01-04 -1.0
2010-01-05 2.0
""", tsh.get(engine, 'ts_mixte'))
# test marker for first inserstion
_, marker = tsh.get_ts_marker(engine, 'ts_mixte')
assert not marker.any()
# refresh all the period + 1 extra data point
ts_more = genserie(datetime(2010, 1, 2), 'D', 5, [2])
ts_more.loc['2010-01-04'] = -1
tsh.insert(engine, ts_more, 'ts_mixte', 'test')
assert_df("""
2010-01-01 2.0
2010-01-02 2.0
2010-01-03 2.0
2010-01-04 -1.0
2010-01-05 2.0
2010-01-06 2.0
""", tsh.get(engine, 'ts_mixte'))
# just append an extra data point
# with no intersection with the previous ts
ts_one_more = genserie(datetime(2010, 1, 7), 'D', 1, [2])
tsh.insert(engine, ts_one_more, 'ts_mixte', 'test')
assert_df("""
2010-01-01 2.0
2010-01-02 2.0
2010-01-03 2.0
2010-01-04 -1.0
2010-01-05 2.0
2010-01-06 2.0
2010-01-07 2.0
""", tsh.get(engine, 'ts_mixte'))
# edit the bogus upstream data: -1 -> 3
# also edit the next value
ts_manual = genserie(datetime(2010, 1, 4), 'D', 2, [3])
tsh.insert(engine, ts_manual, 'ts_mixte', 'test', dict(manual=True))
tsh.get_ts_marker(engine, 'ts_mixte')
assert_df("""
2010-01-01 2.0
2010-01-02 2.0
2010-01-03 2.0
2010-01-04 3.0
2010-01-05 3.0
2010-01-06 2.0
2010-01-07 2.0
""", tsh.get(engine, 'ts_mixte'))
# refetch upstream: the fixed value override must remain in place
assert -1 == ts_begin['2010-01-04']
tsh.insert(engine, ts_begin, 'ts_mixte', 'test')
assert_df("""
2010-01-01 2.0
2010-01-02 2.0
2010-01-03 2.0
2010-01-04 3.0
2010-01-05 3.0
2010-01-06 2.0
2010-01-07 2.0
""", tsh.get(engine, 'ts_mixte'))
# upstream provider fixed its bogus value: the manual override
# should be replaced by the new provider value
ts_begin_amend = ts_begin.copy()
ts_begin_amend.iloc[3] = 2
tsh.insert(engine, ts_begin_amend, 'ts_mixte', 'test')
ts, marker = tsh.get_ts_marker(engine, 'ts_mixte')
assert_df("""
2010-01-01 False
2010-01-02 False
2010-01-03 False
2010-01-04 False
2010-01-05 True
2010-01-06 False
2010-01-07 False
""", marker)
assert_df("""
2010-01-01 2.0
2010-01-02 2.0
2010-01-03 2.0
2010-01-04 2.0
2010-01-05 3.0
2010-01-06 2.0
2010-01-07 2.0
""", ts)
# another iterleaved editing session
ts_edit = genserie(datetime(2010, 1, 4), 'D', 1, [2])
tsh.insert(engine, ts_edit, 'ts_mixte', 'test', dict(manual=True))
assert 2 == tsh.get(engine, 'ts_mixte')['2010-01-04'] # still
ts, marker = tsh.get_ts_marker(engine, 'ts_mixte')
assert_df("""
2010-01-01 False
2010-01-02 False
2010-01-03 False
2010-01-04 False
2010-01-05 True
2010-01-06 False
2010-01-07 False
""", marker)
# another iterleaved editing session
drange = pd.date_range(start=datetime(2010, 1, 4), periods=1)
ts_edit = pd.Series([4], index=drange)
tsh.insert(engine, ts_edit, 'ts_mixte', 'test', dict(manual=True))
assert 4 == tsh.get(engine, 'ts_mixte')['2010-01-04'] # still
ts_auto_resend_the_same = pd.Series([2], index=drange)
tsh.insert(engine, ts_auto_resend_the_same, 'ts_mixte', 'test')
assert 4 == tsh.get(engine, 'ts_mixte')['2010-01-04'] # still
ts_auto_fix_value = pd.Series([7], index=drange)
tsh.insert(engine, ts_auto_fix_value, 'ts_mixte', 'test')
assert 7 == tsh.get(engine, 'ts_mixte')['2010-01-04'] # still
# test the marker logic
# which helps put nice colour cues in the excel sheet
# get_ts_marker returns a ts and its manual override mask
# test we get a proper ts
ts_auto, _ = tsh.get_ts_marker(engine, 'ts_mixte')
assert_df("""
2010-01-01 2.0
2010-01-02 2.0
2010-01-03 2.0
2010-01-04 7.0
2010-01-05 3.0
2010-01-06 2.0
2010-01-07 2.0
""", ts_auto)
ts_manual = genserie(datetime(2010, 1, 5), 'D', 2, [3])
tsh.insert(engine, ts_manual, 'ts_mixte', 'test', dict(manual=True))
ts_manual = genserie(datetime(2010, 1, 9), 'D', 1, [3])
tsh.insert(engine, ts_manual, 'ts_mixte', 'test', dict(manual=True))
tsh.insert(engine, ts_auto, 'ts_mixte', 'test')
upstream_fix = pd.Series([2.5], index=[datetime(2010, 1, 5)])
tsh.insert(engine, upstream_fix, 'ts_mixte', 'test')
# we had three manual overrides, but upstream fixed one of its values
tip_ts, tip_marker = tsh.get_ts_marker(engine, 'ts_mixte')
assert_df("""
2010-01-01 2.0
2010-01-02 2.0
2010-01-03 2.0
2010-01-04 7.0
2010-01-05 2.5
2010-01-06 3.0
2010-01-07 2.0
2010-01-09 3.0
""", tip_ts)
assert_df("""
2010-01-01 False
2010-01-02 False
2010-01-03 False
2010-01-04 False
2010-01-05 False
2010-01-06 True
2010-01-07 False
2010-01-09 True
""", tip_marker)
# just another override for the fun
ts_manual.iloc[0] = 4
tsh.insert(engine, ts_manual, 'ts_mixte', 'test', dict(manual=True))
assert_df("""
2010-01-01 2.0
2010-01-02 2.0
2010-01-03 2.0
2010-01-04 7.0
2010-01-05 2.5
2010-01-06 3.0
2010-01-07 2.0
2010-01-09 4.0
""", tsh.get(engine, 'ts_mixte'))
def test_first_manual(engine, tsh):
ts_begin = genserie(datetime(2010, 1, 1), 'D', 10)
tsh.insert(engine, ts_begin, 'ts_only', 'test', dict(manual=True))
assert_df("""
2010-01-01 0.0
2010-01-02 1.0
2010-01-03 2.0
2010-01-04 3.0
2010-01-05 4.0
2010-01-06 5.0
2010-01-07 6.0
2010-01-08 7.0
2010-01-09 8.0
2010-01-10 9.0
""", tsh.get(engine, 'ts_only'))
# we should detect the emission of a message
tsh.insert(engine, ts_begin, 'ts_only', 'test', dict(manual=True))
assert_df("""
2010-01-01 0.0
2010-01-02 1.0
2010-01-03 2.0
2010-01-04 3.0
2010-01-05 4.0
2010-01-06 5.0
2010-01-07 6.0
2010-01-08 7.0
2010-01-09 8.0
2010-01-10 9.0
""", tsh.get(engine, 'ts_only'))
ts_slight_variation = ts_begin.copy()
ts_slight_variation.iloc[3] = 0
ts_slight_variation.iloc[6] = 0
tsh.insert(engine, ts_slight_variation, 'ts_only', 'test')
tsh.get(engine, 'ts_only').to_string().strip()
# should be a noop
tsh.insert(engine, ts_slight_variation, 'ts_only', 'test', dict(manual=True))
_, marker = tsh.get_ts_marker(engine, 'ts_only')
assert_df("""
2010-01-01 False
2010-01-02 False
2010-01-03 False
2010-01-04 False
2010-01-05 False
2010-01-06 False
2010-01-07 False
2010-01-08 False
2010-01-09 False
2010-01-10 False
""", marker)
with engine.connect() as cn:
cn.execute('set search_path to "tsh.timeserie"')
df = pd.read_sql("select id, manual, diff, snapshot, autosnapshot from ts_only",
cn)
for attr in ('diff', 'snapshot', 'autosnapshot'):
df[attr] = df[attr].apply(lambda x: 0 if x is None else len(x))
assert """
id manual diff snapshot autosnapshot
0 1 True 0 82 0
1 2 False 84 84 84
""".strip() == df.to_string().strip()
def test_more_manual(engine, tsh):
ts = genserie(datetime(2015, 1, 1), 'D', 5)
tsh.insert(engine, ts, 'ts_exp1', 'test')
ts_man = genserie(datetime(2015, 1, 3), 'D', 3, -1)
ts_man.iloc[-1] = np.nan
# erasing of the laste value for the date 5/1/2015
tsh.insert(engine, ts_man, 'ts_exp1', 'test', {'manual': True})
ts_get = tsh.get(engine, 'ts_exp1')
assert_df("""
2015-01-01 0.0
2015-01-02 1.0
2015-01-03 -3.0
2015-01-04 -3.0""", ts_get)
ts_marker, marker = tsh.get_ts_marker(engine, 'ts_exp1')
assert ts_marker.equals(ts_get)
assert_df("""
2015-01-01 False
2015-01-02 False
2015-01-03 True
2015-01-04 True
2015-01-05 True""", marker)
def test_revision_date(engine, tsh):
with patch('tshistory.tsio.datetime') as mock_date:
mock_date.now.return_value = datetime(2015, 1, 1, 15, 43, 23)
ts = genserie(datetime(2010, 1, 4), 'D', 4, [1], name='truc')
tsh.insert(engine, ts, 'ts_through_time', 'test')
with patch('tshistory.tsio.datetime') as mock_date:
mock_date.now.return_value = datetime(2015, 1, 2, 15, 43, 23)
ts = genserie(datetime(2010, 1, 4), 'D', 4, [2], name='truc')
tsh.insert(engine, ts, 'ts_through_time', 'test')
with patch('tshistory.tsio.datetime') as mock_date:
mock_date.now.return_value = datetime(2015, 1, 3, 15, 43, 23)
ts = genserie(datetime(2010, 1, 4), 'D', 4, [3], name='truc')
tsh.insert(engine, ts, 'ts_through_time', 'test')
ts = tsh.get(engine, 'ts_through_time')
assert_df("""
2010-01-04 3.0
2010-01-05 3.0
2010-01-06 3.0
2010-01-07 3.0
""", ts)
ts = tsh.get(engine, 'ts_through_time',
revision_date=datetime(2015, 1, 2, 18, 43, 23))
assert_df("""
2010-01-04 2.0
2010-01-05 2.0
2010-01-06 2.0
2010-01-07 2.0
""", ts)
ts = tsh.get(engine, 'ts_through_time',
revision_date=datetime(2015, 1, 1, 18, 43, 23))
assert_df("""
2010-01-04 1.0
2010-01-05 1.0
2010-01-06 1.0
2010-01-07 1.0
""", ts)
ts = tsh.get(engine, 'ts_through_time',
revision_date=datetime(2014, 1, 1, 18, 43, 23))
assert ts is None
def test_before_first_insertion(engine, tsh):
tsh.insert(engine, genserie(datetime(2010, 1, 1), 'D', 11), 'ts_shtroumpf', 'test')
# test get_marker with an unknown series vs a serie displayed with
# a revision date before the first insertion
result = tsh.get_ts_marker(engine, 'unknown_ts')
assert (None, None) == result
result = tsh.get_ts_marker(engine, 'ts_shtroumpf', revision_date=datetime(1970, 1, 1))
assert (None, None) == result
def test_na_and_delete(engine, tsh):
ts_repushed = genserie(datetime(2010, 1, 1), 'D', 11)
ts_repushed[0:3] = np.nan
tsh.insert(engine, ts_repushed, 'ts_repushed', 'test')
diff = tsh.insert(engine, ts_repushed, 'ts_repushed', 'test')
assert diff is None
def test_exotic_name(engine, tsh):
ts = genserie(datetime(2010, 1, 1), 'D', 11)
tsh.insert(engine, ts, 'ts-with_dash', 'test')
tsh.get(engine, 'ts-with_dash')
def test_series_dtype(engine, tsh):
tsh.insert(engine,
genserie(datetime(2015, 1, 1),
'D',
11).astype('str'),
'error1',
'test')
with pytest.raises(Exception) as excinfo:
tsh.insert(engine,
genserie(datetime(2015, 1, 1),
'D',
11),
'error1',
'test')
assert 'Type error when inserting error1, new type is float64, type in base is object' == str(excinfo.value)
tsh.insert(engine,
genserie(datetime(2015, 1, 1),
'D',
11),
'error2',
'test')
with pytest.raises(Exception) as excinfo:
tsh.insert(engine,
genserie(datetime(2015, 1, 1),
'D',
11).astype('str'),
'error2',
'test')
assert 'Type error when inserting error2, new type is object, type in base is float64' == str(excinfo.value)
import pandas as pd
import numpy as np
from sqlalchemy import Column, Boolean, select, desc
from sqlalchemy.dialects.postgresql import BYTEA
from tshistory.tsio import TimeSerie as BaseTS
def join_index(ts1, ts2):
if ts1 is None and ts2 is No