Commit 3324094b authored by Aurélien Campéas

tsio/snapshots: we now have chunked snapshots

This should provide a significant speed boost for many common operations.
Notes on the contents of this commit below (a brief sketch of the
chunking scheme follows the notes):

* conftest: more robust cleanup at test startup time
  When debugging, we might be left with phantom inserts from a previous
  session because of an unclean exit.

* tsio: remove customization entry point
  It was not a good idea.

* tsio, snapshot: cache the sqlalchemy Table objects
  It turns out these are very expensive to instantiate,
  and we do that a lot (a sketch of the idea follows these notes).

* tests/perf: benchmark a forecast-like insertion

* tsio: slight optimisation in _create


Resolves #32.
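
For readers new to the storage scheme: a snapshot is now stored as a chain
of fixed-size chunks, each row pointing at its parent, so an append-only
insert reuses every existing chunk and only writes new ones at the head.
A minimal illustrative sketch of the chaining idea (plain Python, not the
actual tshistory implementation; the helper below is hypothetical):

# Illustrative sketch only: model a snapshot as parent-chained buckets.
# In tshistory the chunks live in a "<ns>.snapshot".<series> table with
# (id, parent, chunk) columns; the ids below stand in for the serial pk.

def chunk_series(points, bucket_size):
    # split sorted points into buckets of at most bucket_size points,
    # each bucket remembering the id of the previous one
    chunks = []
    parent = None
    for start in range(0, len(points), bucket_size):
        chunks.append({'id': len(chunks) + 1,
                       'parent': parent,
                       'points': points[start:start + bucket_size]})
        parent = chunks[-1]['id']
    return chunks

# 5 daily points with a bucket size of 2 -> 3 chunks with parents
# None, 1, 2 -- exactly the shape asserted in test_chunks below
chunks = chunk_series(list(range(5)), 2)
assert [(c['id'], c['parent']) for c in chunks] == [(1, None), (2, 1), (3, 2)]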
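
On the Table caching point above, a hedged sketch of the memoization
pattern (the cache name, helper signature and column set are illustrative,
not the actual tsio/snapshot code):

# Hypothetical sketch: build each sqlalchemy Table once and reuse it,
# since Table construction is expensive and we do it on every operation.
from sqlalchemy import Table, Column, Integer, MetaData
from sqlalchemy.dialects.postgresql import BYTEA

TABLES = {}  # (namespace, name) -> Table

def chunk_table(metadata, namespace, name):
    key = (namespace, name)
    if key not in TABLES:
        # building the Table also registers it on the metadata, so
        # caching avoids both the cost and a duplicate-definition error
        TABLES[key] = Table(
            name, metadata,
            Column('id', Integer, primary_key=True),
            Column('parent', Integer, index=True),
            Column('chunk', BYTEA),
            schema='{}.snapshot'.format(namespace)
        )
    # subsequent calls return the cached object instead of rebuilding it
    return TABLES[key]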
parent 300562497d27
@@ -8,6 +8,7 @@ import pytest
from pytest_sa_pg import db
from tshistory import schema, tsio
from tshistory.snapshot import Snapshot
DATADIR = Path(__file__).parent / 'test' / 'data'
@@ -30,8 +31,11 @@ def engine(request):
scope='session')
def tsh(request, engine):
namespace = request.param
-    schema.reset(engine, namespace)
+    schema.delete_schema(engine, namespace)
+    schema.init(engine, MetaData(), namespace)
+    if namespace == 'zzz':
+        Snapshot._bucket_size = 5
tsh = tsio.TimeSerie(namespace)
yield tsh
......
from datetime import datetime
from datetime import timedelta
from time import time
import calendar
from pprint import pprint
import pytest
import pandas as pd
import numpy as np
-from tshistory.testutil import genserie
+from tshistory.snapshot import Snapshot
+from tshistory.testutil import (
+    assert_df,
+    genserie,
+    utcdt,
+    tempattr
+)
@pytest.mark.perf
-def _test_bigdata(engine, tracker, ptsh):
+def test_hourly_forecast(engine, tracker, ptsh):
tsh = ptsh
# build a sin curve, like e.g. solar input
base = [np.sin(x) for x in np.linspace(0, 3.14, 20)]
base.insert(0, 0)
base.insert(0, 0)
base.append(0)
base.append(0)
base = np.array(base * 3) * 10
# hourly insertion dates over 5 years (each forecast below covers 48 hours)
forecasts = pd.date_range(start=utcdt(2013, 1, 1),
freq='H',
periods=24 * 365 * 5)
perturbations = [1 + np.random.binomial(10, .5) / 100.
for _ in range(5000)]
def create(name, bsize, limit=None):
with tempattr(Snapshot, '_bucket_size', bsize):
for idx, idate in enumerate(forecasts):
dr = pd.date_range(start=idate, freq='H', periods=48)
perturbation = perturbations[idx]
localbase = base * perturbation
serie = pd.Series(localbase[idate.hour:idate.hour + 48],
index=dr)
diff = tsh.insert(engine, serie, name, 'test',
_insertion_date=idate)
if limit and idx > limit:
break
sizes = (100,)  # (25, 50, 100, 200, 300)
for bsize in sizes:
t0 = time()
name = 'fcast_{}'.format(bsize)
create(name, bsize, limit=500)
t1 = time() - t0
sql = 'select parent, chunk from "{}.snapshot".{} order by id'.format(
tsh.namespace,
name
)
out = engine.execute(sql).fetchall()
ssize = sum([len(c) for _, c in out])
noparentcount = len([x for x, _ in out if x is None])
print('bucket_size, snap_size, noparent, time : ',
bsize, ssize, noparentcount, t1)
engine.execute('drop table if exists fcast_sql')
def sqlcreate(limit=None):
for idx, idate in enumerate(forecasts):
dr = pd.date_range(start=idate, freq='H', periods=48)
perturbation = perturbations[idx]
localbase = base * perturbation
serie = pd.DataFrame(
localbase[idate.hour:idate.hour + 48],
index=dr
).reset_index().rename(columns={
'index': 'value_date',
0: 'value'
})
serie['insertion'] = idate
serie = serie[['insertion', 'value_date', 'value']]
serie['value_date'] = serie['value_date'].apply(lambda x: pd.to_datetime(str(x)))
serie['insertion'] = serie['insertion'].apply(lambda x: pd.to_datetime(str(x)))
serie.to_sql('fcast_sql', engine, if_exists='append', index=False)
if limit and idx > limit:
break
t0 = time()
sqlcreate(500)
print('sql insert', time() - t0)
query = '''
WITH tmp as (
SELECT value_date, max(insertion) as insertion
FROM fcast_sql
GROUP BY value_date
)
SELECT main.value_date as "value_date",
main.value as value
FROM fcast_sql as main
JOIN tmp ON (tmp.value_date = main.value_date
AND tmp.insertion = main.insertion)
'''
t0 = time()
sqlts = pd.read_sql(query, engine).set_index('value_date').squeeze()
print('SQL GET', time() - t0)
t0 = time()
tshts = tsh.get(engine, 'fcast_100')
print('TSH GET', time() - t0)
@pytest.mark.perf
def test_bigdata(engine, tracker, ptsh):
tsh = ptsh
def create_data():
# 5 years of sub-hourly points
for year in range(2015, 2020):
-            date = datetime(year, 1, 1)
+            date = utcdt(year, 1, 1)
serie = genserie(date, '10Min', 6 * 24 * 365)
-            with tsh.newchangeset(engine, 'aurelien.campeas@pythonian.fr',
-                                  _insertion_date=date):
-                tsh.insert(engine, serie, 'big')
+            tsh.insert(engine, serie, 'big', 'aurelien.campeas@pythonian.fr',
+                       _insertion_date=date)
t0 = time()
create_data()
@@ -26,17 +127,15 @@ def _test_bigdata(engine, tracker, ptsh):
with engine.connect() as cn:
cn.execute('set search_path to "{}.timeserie"'.format(tsh.namespace))
-        df = pd.read_sql('select id, diff, snapshot from big order by id', cn)
+        df = pd.read_sql('select id, diff from big order by id', cn)
-    for attr in ('diff', 'snapshot'):
-        df[attr] = df[attr].apply(lambda x: 0 if x is None else len(x))
+    df['diff'] = df['diff'].apply(lambda x: 0 if x is None else len(x))
-    size = df[['diff', 'snapshot']].sum().to_dict()
+    size = df['diff'].sum()
tracker.append({'test': 'bigdata_insert',
'class': tshclass,
'time': t1,
-                    'diffsize': size['diff'],
-                    'snapsize': size['snapshot']})
+                    'diffsize': size})
t0 = time()
tsh.get_history(engine, 'big')
@@ -44,13 +143,12 @@ def _test_bigdata(engine, tracker, ptsh):
tracker.append({'test': 'bigdata_history_all',
'class': tshclass,
'time': t1,
-                    'diffsize': None,
-                    'snapsize': None})
+                    'diffsize': None})
t0 = time()
for year in (2015, 2017, 2019):
for month in (1, 5, 9, 12):
-            date = datetime(year, month, 1)
+            date = utcdt(year, month, 1)
tsh.get_history(engine, 'big',
from_insertion_date=date,
to_insertion_date=date + timedelta(days=31))
@@ -58,12 +156,11 @@ def _test_bigdata(engine, tracker, ptsh):
tracker.append({'test': 'bigdata_history_chunks',
'class': tshclass,
'time': t1,
-                    'diffsize': None,
-                    'snapsize': None})
+                    'diffsize': None})
@pytest.mark.perf
-def _test_lots_of_diffs(engine, tracker, ptsh):
+def test_lots_of_diffs(engine, tracker, ptsh):
tsh = ptsh
def create_data():
@@ -71,12 +168,13 @@ def _test_lots_of_diffs(engine, tracker, ptsh):
for month in range(1, 4):
days = calendar.monthrange(2017, month)[1]
for day in range(1, days + 1):
-                date = datetime(2017, month, day)
+                date = utcdt(2017, month, day)
serie = genserie(date, '10Min', 6 * 24)
with engine.connect() as cn:
-                    with tsh.newchangeset(cn, 'aurelien.campeas@pythonian.fr',
-                                          _insertion_date=date.replace(year=2018)):
-                        tsh.insert(cn, serie, 'manydiffs')
+                    tsh.insert(cn, serie, 'manydiffs',
+                               'aurelien.campeas@pythonian.fr',
+                               _insertion_date=date.replace(year=2018)
+                    )
t0 = time()
create_data()
@@ -85,17 +183,15 @@ def _test_lots_of_diffs(engine, tracker, ptsh):
with engine.connect() as cn:
cn.execute('set search_path to "{}.timeserie"'.format(tsh.namespace))
df = pd.read_sql("select id, diff, snapshot from manydiffs order by id ",
df = pd.read_sql("select id, diff from manydiffs order by id ",
cn)
-    for attr in ('diff', 'snapshot'):
-        df[attr] = df[attr].apply(lambda x: 0 if x is None else len(x))
+    df['diff'] = df['diff'].apply(lambda x: 0 if x is None else len(x))
-    size = df[['diff', 'snapshot']].sum().to_dict()
+    size = df['diff'].sum()
tracker.append({'test': 'manydiffs_insert',
'class': tshclass,
'time': t1,
-                    'diffsize': size['diff'],
-                    'snapsize': size['snapshot']})
+                    'diffsize': size})
t0 = time()
tsh.get_history(engine, 'manydiffs')
@@ -103,13 +199,12 @@ def _test_lots_of_diffs(engine, tracker, ptsh):
tracker.append({'test': 'manydiffs_history_all',
'class': tshclass,
'time': t1,
-                    'diffsize': None,
-                    'snapsize': None})
+                    'diffsize': None})
t0 = time()
for month in range(1, 3):
for day in range(1, 5):
-            date = datetime(2018, month, day)
+            date = utcdt(2018, month, day)
ts = tsh.get_history(engine, 'manydiffs',
from_insertion_date=date,
to_insertion_date=date + timedelta(days=31))
@@ -118,13 +213,12 @@ def _test_lots_of_diffs(engine, tracker, ptsh):
tracker.append({'test': 'manydiffs_history_chunks',
'class': tshclass,
'time': t1,
-                    'diffsize': None,
-                    'snapsize': None})
+                    'diffsize': None})
t0 = time()
for month in range(1, 3):
for day in range(1, 5):
-            date = datetime(2018, month, day)
+            date = utcdt(2018, month, day)
ts = tsh.get_history(engine, 'manydiffs',
from_insertion_date=date,
to_insertion_date=date + timedelta(days=31),
@@ -135,5 +229,4 @@ def _test_lots_of_diffs(engine, tracker, ptsh):
tracker.append({'test': 'manydiffs_history_chunks_valuedate',
'class': tshclass,
'time': t1,
-                    'diffsize': None,
-                    'snapsize': None})
+                    'diffsize': None})
@@ -12,7 +12,12 @@ import numpy as np
import pandas as pd
from tshistory.snapshot import Snapshot
-from tshistory.testutil import assert_group_equals, genserie, assert_df
+from tshistory.testutil import (
+    assert_df,
+    assert_group_equals,
+    genserie,
+    tempattr
+)
DATADIR = Path(__file__).parent / 'data'
@@ -292,6 +297,189 @@ a b c
assert err.value.args[0] == "Incompatible multi indexes: ['a', 'b', 'c'] vs ['a', 'b']"
def test_chunks(engine, tsh):
with tempattr(Snapshot, '_bucket_size', 2):
ts = genserie(datetime(2010, 1, 1), 'D', 5)
tsh.insert(engine, ts, 'chunks', 'test')
# we expect 3 chunks
sql = 'select parent, chunk from "{}.snapshot".chunks order by id'.format(
tsh.namespace
)
chunks = engine.execute(sql).fetchall()
assert len(chunks) == 3
assert chunks[0].parent is None
assert chunks[1].parent == 1
assert chunks[2].parent == 2
ts0 = tsh._deserialize(chunks[0].chunk, 'name')
ts1 = tsh._deserialize(chunks[1].chunk, 'name')
ts2 = tsh._deserialize(chunks[2].chunk, 'name')
assert_df("""
2010-01-01 0.0
2010-01-02 1.0
""", ts0)
assert_df("""
2010-01-03 2.0
2010-01-04 3.0
""", ts1)
assert_df("""
2010-01-05 4.0
""", ts2)
assert_df("""
2010-01-01 0.0
2010-01-02 1.0
2010-01-03 2.0
2010-01-04 3.0
2010-01-05 4.0
""", tsh.get(engine, 'chunks'))
ts = pd.Series([4, 5, 6, 7, 8],
index=pd.date_range(start=datetime(2010, 1, 5),
end=datetime(2010, 1, 9),
freq='D'))
tsh.insert(engine, ts, 'chunks', 'test')
whole = tsh.get(engine, 'chunks')
assert_df("""
2010-01-01 0.0
2010-01-02 1.0
2010-01-03 2.0
2010-01-04 3.0
2010-01-05 4.0
2010-01-06 5.0
2010-01-07 6.0
2010-01-08 7.0
2010-01-09 8.0
""", whole)
# we expect 6 chunks
sql = 'select id, parent, chunk from "{}.snapshot".chunks order by id'.format(
tsh.namespace
)
chunks = engine.execute(sql).fetchall()
assert len(chunks) == 6
assert chunks[4].parent == 4
assert chunks[5].parent == 5
assert {
1: None,
2: 1,
3: 2, # head of first commit
4: 2,
5: 4,
6: 5 # head of last commit
} == {
chunk.id: chunk.parent for chunk in chunks
}
ts3 = tsh._deserialize(chunks[3].chunk, 'name')
ts4 = tsh._deserialize(chunks[4].chunk, 'name')
ts5 = tsh._deserialize(chunks[5].chunk, 'name')
assert_df("""
2010-01-05 4.0
2010-01-06 5.0
""", ts3)
assert_df("""
2010-01-07 6.0
2010-01-08 7.0
""", ts4)
assert_df("""
2010-01-09 8.0
""", ts5)
# non-append edit
whole[2] = 0
whole[7] = 0
tsh.insert(engine, whole, 'chunks', 'test')
assert_df("""
2010-01-01 0.0
2010-01-02 1.0
2010-01-03 0.0
2010-01-04 3.0
2010-01-05 4.0
2010-01-06 5.0
2010-01-07 6.0
2010-01-08 0.0
2010-01-09 8.0
""", tsh.get(engine, 'chunks'))
assert_df("""
2010-01-05 4.0
2010-01-06 5.0
2010-01-07 6.0
2010-01-08 0.0
2010-01-09 8.0
""", tsh.get(engine, 'chunks', from_value_date=datetime(2010, 1, 5)))
# we expect 10 chunks
# because we edit from the second chunk
# and 4 new chunks have to be made
sql = 'select id, parent, chunk from "{}.snapshot".chunks order by id'.format(
tsh.namespace
)
chunks = engine.execute(sql).fetchall()
assert len(chunks) == 10
assert {
1: None,
2: 1,
3: 2, # head of first commit
4: 2,
5: 4,
6: 5, # head of second commit
7: 1, # base of third commit (we lost many shared chunks)
8: 7,
9: 8,
10: 9 # head of last commit
} == {
chunk.id: chunk.parent for chunk in chunks
}
# 2nd commit chunks without filtering
snap = Snapshot(engine, tsh, 'chunks')
chunks = {parent: len(ts) for parent, ts in snap.rawchunks(6)}
assert chunks == {
None: 2,
1: 2,
2: 2,
4: 2,
5: 1
}
# 2nd commit chunks with filtering
chunks = {
parent: len(ts)
for parent, ts in snap.rawchunks(6, datetime(2010, 1, 5))
}
assert chunks == {2: 2, 4: 2, 5: 1}
# 3rd commit chunks without filtering
chunks = {parent: len(ts) for parent, ts in snap.rawchunks(10)}
assert chunks == {
None: 2,
1: 2,
7: 2,
8: 2,
9: 1
}
# 3rd commit chunks with filtering
chunks = {
parent: len(ts)
for parent, ts in snap.rawchunks(10, datetime(2010, 1, 5))
}
assert chunks == {
7: 2,
8: 2,
9: 1
}
def test_differential(engine, tsh):
ts_begin = genserie(datetime(2010, 1, 1), 'D', 10)
tsh.insert(engine, ts_begin, 'ts_test', 'test')
@@ -486,8 +674,8 @@ def test_bad_import(engine, tsh):
def test_revision_date(engine, tsh):
# we prepare a good joke for the end of the test
-    ival = Snapshot._interval
-    Snapshot._interval = 3
+    # ival = Snapshot._interval
+    # Snapshot._interval = 3
for i in range(1, 5):
with engine.connect() as cn:
@@ -572,10 +760,10 @@ def test_revision_date(engine, tsh):
2017-01-04 2.0
""", oldstate)
-    Snapshot._interval = ival
+    # Snapshot._interval = ival
-def test_snapshots(engine, tsh):
+def _test_snapshots(engine, tsh):
baseinterval = Snapshot._interval
Snapshot._interval = 4
@@ -614,15 +802,15 @@ def test_snapshots(engine, tsh):
2015-01-10 1.0
""", ts)
df = pd.read_sql("select id, chunk from growing order by id", cn)
df['chunk'] = df['chunk'].apply(lambda x: 0 if x is None else len(x))
df = pd.read_sql("select id, chunkhead from growing order by id", cn)
df['chunkhead'] = df['chunkhead'].apply(lambda x: 0 if x is None else len(x))
assert_df("""
-   id  chunk
-0   1     35
-1   4     47
-2   8     59
-3  10     67
+   id  chunkhead
+0   1         35
+1   4         47
+2   8         59
+3  10         67
""", df)
# table = tsh._get_ts_table(engine, 'growing')
@@ -1272,6 +1460,40 @@ def test_get_from_to(engine, tsh):
ts = genserie(datetime(2015, 1, 1), 'D', 365)
tsh.insert(engine, ts, 'quitelong', 'aurelien.campeas@pythonian.fr')
snap = Snapshot(engine, tsh, 'quitelong')
if tsh.namespace == 'zzz':
sql = 'select id, parent from "zzz.snapshot".quitelong order by id'
chunks = engine.execute(sql).fetchall()
# should be perfectly chained
chunks = {
chunk.id: chunk.parent for chunk in chunks
}
chunks.pop(1)
assert all(k == v+1 for k, v in chunks.items())
chunks = {parent: len(ts) for parent, ts in snap.rawchunks(73)}
assert chunks == {None: 5, 1: 5, 2: 5, 3: 5, 4: 5, 5: 5, 6: 5, 7: 5,
8: 5, 9: 5, 10: 5, 11: 5, 12: 5, 13: 5, 14: 5, 15: 5,
16: 5, 17: 5, 18: 5, 19: 5, 20: 5, 21: 5, 22: 5,
23: 5, 24: 5, 25: 5, 26: 5, 27: 5, 28: 5, 29: 5,
30: 5, 31: 5, 32: 5, 33: 5, 34: 5, 35: 5, 36: 5,
37: 5, 38: 5, 39: 5, 40: 5, 41: 5, 42: 5, 43: 5, 44: 5,
45: 5, 46: 5, 47: 5, 48: 5, 49: 5, 50: 5, 51: 5, 52: 5,
53: 5, 54: 5, 55: 5, 56: 5, 57: 5, 58: 5, 59: 5, 60: 5,
61: 5, 62: 5, 63: 5, 64: 5, 65: 5, 66: 5, 67: 5, 68: 5,
69: 5, 70: 5, 71: 5, 72: 5}
chunks = {
parent: len(ts)
for parent, ts in snap.rawchunks(73, datetime(2015, 5, 1))
}
assert chunks == {24: 5, 25: 5, 26: 5, 27: 5, 28: 5, 29: 5, 30: 5, 31: 5,
32: 5, 33: 5, 34: 5, 35: 5, 36: 5, 37: 5, 38: 5, 39: 5,
40: 5, 41: 5, 42: 5, 43: 5, 44: 5, 45: 5, 46: 5, 47: 5,
48: 5, 49: 5, 50: 5, 51: 5, 52: 5, 53: 5, 54: 5, 55: 5,
56: 5, 57: 5, 58: 5, 59: 5, 60: 5, 61: 5, 62: 5, 63: 5,
64: 5, 65: 5, 66: 5, 67: 5, 68: 5, 69: 5, 70: 5, 71: 5,
72: 5}
serie = tsh.get(engine, 'quitelong')
assert serie.index[0] == pd.Timestamp('2015-01-01 00:00:00')
assert serie.index[-1] == pd.Timestamp('2015-12-31 00:00:00')
......
@@ -12,6 +12,14 @@ L = logging.getLogger('tshistory.schema')
SCHEMAS = {}
meta = MetaData()
def delete_schema(engine, ns):
with engine.connect() as cn:
for subns in ('timeserie', 'snapshot'):
cn.execute(
'drop schema if exists "{}.{}" cascade'.format(ns, subns)
)
cn.execute('drop schema if exists {} cascade'.format(ns))
class tsschema(object):
namespace = 'tsh'
@@ -93,12 +101,7 @@ class tsschema(object):
def destroy(self, engine):
L.info('destroy schema %s', self.namespace)
-        engine.execute(
-            'drop schema if exists "{}.timeserie" cascade'.format(self.namespace))
-        engine.execute(
-            'drop schema if exists "{}.snapshot" cascade'.format(self.namespace))
-        engine.execute(
-            'drop schema if exists {} cascade'.format(self.namespace))
+        delete_schema(engine, self.namespace)
del self.meta
del self.registry
del self.changeset
......
from collections import deque
import pandas as pd
from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, desc, func
from sqlalchemy.dial