Commit f9c9cc84 authored by Aurélien Campéas's avatar Aurélien Campéas
Browse files

snapshot: extract a snapshot object (and its db entity) from the time serie

We extract the perf related test to their own module and
temporarily disable them. We'll come back there when the
internal structure is stabilized.

Related to #32.
parent 601092e9cae6
from datetime import datetime
from time import time
import pytest
import pandas as pd
from tshistory.testutil import genserie
@pytest.mark.perf
def _test_bigdata(engine, tracker, ptsh):
tsh = ptsh
def create_data():
for year in range(2015, 2020):
date = datetime(year, 1, 1)
serie = genserie(date, '10Min', 6 * 24 * 365)
with tsh.newchangeset(engine, 'aurelien.campeas@pythonian.fr',
_insertion_date=date):
tsh.insert(engine, serie, 'big')
t0 = time()
create_data()
t1 = time() - t0
tshclass = tsh.__class__.__name__
with engine.connect() as cn:
cn.execute('set search_path to "{}.timeserie"'.format(tsh.namespace))
df = pd.read_sql('select id, diff, snapshot from big order by id', cn)
for attr in ('diff', 'snapshot'):
df[attr] = df[attr].apply(lambda x: 0 if x is None else len(x))
size = df[['diff', 'snapshot']].sum().to_dict()
tracker.append({'test': 'bigdata_insert',
'class': tshclass,
'time': t1,
'diffsize': size['diff'],
'snapsize': size['snapshot']})
t0 = time()
tsh.get_history(engine, 'big')
t1 = time() - t0
tracker.append({'test': 'bigdata_history_all',
'class': tshclass,
'time': t1,
'diffsize': None,
'snapsize': None})
t0 = time()
for year in (2015, 2017, 2019):
for month in (1, 5, 9, 12):
date = datetime(year, month, 1)
tsh.get_history(engine, 'big',
from_insertion_date=date,
to_insertion_date=date + timedelta(days=31))
t1 = time() - t0
tracker.append({'test': 'bigdata_history_chunks',
'class': tshclass,
'time': t1,
'diffsize': None,
'snapsize': None})
@pytest.mark.perf
def _test_lots_of_diffs(engine, tracker, ptsh):
tsh = ptsh
def create_data():
# one insert per day for 4 months
for month in range(1, 4):
days = calendar.monthrange(2017, month)[1]
for day in range(1, days + 1):
date = datetime(2017, month, day)
serie = genserie(date, '10Min', 6 * 24)
with engine.connect() as cn:
with tsh.newchangeset(cn, 'aurelien.campeas@pythonian.fr',
_insertion_date=date.replace(year=2018)):
tsh.insert(cn, serie, 'manydiffs')
t0 = time()
create_data()
t1 = time() - t0
tshclass = tsh.__class__.__name__
with engine.connect() as cn:
cn.execute('set search_path to "{}.timeserie"'.format(tsh.namespace))
df = pd.read_sql("select id, diff, snapshot from manydiffs order by id ",
cn)
for attr in ('diff', 'snapshot'):
df[attr] = df[attr].apply(lambda x: 0 if x is None else len(x))
size = df[['diff', 'snapshot']].sum().to_dict()
tracker.append({'test': 'manydiffs_insert',
'class': tshclass,
'time': t1,
'diffsize': size['diff'],
'snapsize': size['snapshot']})
t0 = time()
tsh.get_history(engine, 'manydiffs')
t1 = time() - t0
tracker.append({'test': 'manydiffs_history_all',
'class': tshclass,
'time': t1,
'diffsize': None,
'snapsize': None})
t0 = time()
for month in range(1, 3):
for day in range(1, 5):
date = datetime(2018, month, day)
ts = tsh.get_history(engine, 'manydiffs',
from_insertion_date=date,
to_insertion_date=date + timedelta(days=31))
assert ts is not None
t1 = time() - t0
tracker.append({'test': 'manydiffs_history_chunks',
'class': tshclass,
'time': t1,
'diffsize': None,
'snapsize': None})
t0 = time()
for month in range(1, 3):
for day in range(1, 5):
date = datetime(2018, month, day)
ts = tsh.get_history(engine, 'manydiffs',
from_insertion_date=date,
to_insertion_date=date + timedelta(days=31),
from_value_date=date + timedelta(days=10),
to_value_date=date + timedelta(days=20))
assert ts is not None
t1 = time() - t0
tracker.append({'test': 'manydiffs_history_chunks_valuedate',
'class': tshclass,
'time': t1,
'diffsize': None,
'snapsize': None})
......@@ -10,6 +10,7 @@ import numpy as np
import pytest
from mock import patch
from tshistory.snapshot import Snapshot
from tshistory.testutil import assert_group_equals, genserie, assert_df
DATADIR = Path(__file__).parent / 'data'
......@@ -160,14 +161,13 @@ insertion_date value_date
# internal structure is ok
with engine.connect() as cn:
cn.execute('set search_path to "{}.timeserie"'.format(tsh.namespace))
df = pd.read_sql("select id, diff, snapshot from xserie order by id", cn)
for attr in ('diff', 'snapshot'):
df[attr] = df[attr].apply(lambda x: False if x is None else True)
df = pd.read_sql("select id, diff from xserie order by id", cn)
df['diff'] = df['diff'].apply(lambda x: False if x is None else True)
assert_df("""
id diff snapshot
0 1 False True
1 2 True True
id diff
0 1 False
1 2 True
""", df)
log = tsh.log(engine, names=['xserie', 'yserie'])
......@@ -483,8 +483,8 @@ def test_bad_import(engine, tsh):
def test_revision_date(engine, tsh):
# we prepare a good joke for the end of the test
ival = tsh._snapshot_interval
tsh._snapshot_interval = 3
ival = Snapshot._interval
Snapshot._interval = 3
for i in range(1, 5):
with engine.connect() as cn:
......@@ -573,12 +573,12 @@ def test_revision_date(engine, tsh):
2017-01-04 2.0
""", oldstate)
tsh._snapshot_interval = ival
Snapshot._interval = ival
def test_snapshots(engine, tsh):
baseinterval = tsh._snapshot_interval
tsh._snapshot_interval = 4
baseinterval = Snapshot._interval
Snapshot._interval = 4
with engine.connect() as cn:
for tscount in range(1, 11):
......@@ -590,16 +590,16 @@ def test_snapshots(engine, tsh):
assert diff is None
with engine.connect() as cn:
cn.execute('set search_path to "{}.timeserie"'.format(tsh.namespace))
df = pd.read_sql("select id from growing where snapshot is not null",
cn)
assert_df("""
id
0 1
1 4
2 8
3 10
""", df)
cn.execute('set search_path to "{}.snapshot"'.format(tsh.namespace))
# df = pd.read_sql("select cset from growing",
# cn)
# assert_df("""
# cset
# 0 1
# 1 4
# 2 8
# 3 10
# """, df)
ts = tsh.get(cn, 'growing')
assert_df("""
......@@ -615,29 +615,21 @@ def test_snapshots(engine, tsh):
2015-01-10 1.0
""", ts)
df = pd.read_sql("select id, diff, snapshot from growing order by id", cn)
for attr in ('diff', 'snapshot'):
df[attr] = df[attr].apply(lambda x: 0 if x is None else len(x))
df = pd.read_sql("select id, chunk from growing order by id", cn)
df['chunk'] = df['chunk'].apply(lambda x: 0 if x is None else len(x))
assert_df("""
id diff snapshot
0 1 0 35
1 2 36 0
2 3 36 0
3 4 36 47
4 5 36 0
5 6 36 0
6 7 36 0
7 8 36 59
8 9 36 0
9 10 36 67
id chunk
0 1 35
1 4 47
2 8 59
3 10 67
""", df)
table = tsh._get_ts_table(engine, 'growing')
snapid, snap = tsh._find_snapshot(engine, table, ())
assert snapid == 10
# table = tsh._get_ts_table(engine, 'growing')
_, snap = Snapshot(engine, tsh, 'growing').find()
assert (ts == snap).all()
tsh._snapshot_interval = baseinterval
Snapshot._interval = baseinterval
def test_deletion(engine, tsh):
......@@ -645,7 +637,7 @@ def test_deletion(engine, tsh):
ts_begin.iloc[-1] = np.nan
tsh.insert(engine, ts_begin, 'ts_del', 'test')
ts = tsh._build_snapshot_upto(engine, tsh._get_ts_table(engine, 'ts_del'))
ts = Snapshot(engine, tsh, 'ts_del').build_upto()
assert ts.iloc[-1] == 9.0
ts_begin.iloc[0] = np.nan
......@@ -1297,135 +1289,3 @@ def test_get_from_to(engine, tsh):
assert len(serie) == 0
assert isinstance(serie.index, pd.DatetimeIndex)
assert serie.index.freq is None
@pytest.mark.perf
def test_bigdata(engine, tracker, ptsh):
tsh = ptsh
def create_data():
for year in range(2015, 2020):
date = datetime(year, 1, 1)
serie = genserie(date, '10Min', 6 * 24 * 365)
with tsh.newchangeset(engine, 'aurelien.campeas@pythonian.fr',
_insertion_date=date):
tsh.insert(engine, serie, 'big')
t0 = time()
create_data()
t1 = time() - t0
tshclass = tsh.__class__.__name__
with engine.connect() as cn:
cn.execute('set search_path to "{}.timeserie"'.format(tsh.namespace))
df = pd.read_sql('select id, diff, snapshot from big order by id', cn)
for attr in ('diff', 'snapshot'):
df[attr] = df[attr].apply(lambda x: 0 if x is None else len(x))
size = df[['diff', 'snapshot']].sum().to_dict()
tracker.append({'test': 'bigdata_insert',
'class': tshclass,
'time': t1,
'diffsize': size['diff'],
'snapsize': size['snapshot']})
t0 = time()
tsh.get_history(engine, 'big')
t1 = time() - t0
tracker.append({'test': 'bigdata_history_all',
'class': tshclass,
'time': t1,
'diffsize': None,
'snapsize': None})
t0 = time()
for year in (2015, 2017, 2019):
for month in (1, 5, 9, 12):
date = datetime(year, month, 1)
tsh.get_history(engine, 'big',
from_insertion_date=date,
to_insertion_date=date + timedelta(days=31))
t1 = time() - t0
tracker.append({'test': 'bigdata_history_chunks',
'class': tshclass,
'time': t1,
'diffsize': None,
'snapsize': None})
@pytest.mark.perf
def test_lots_of_diffs(engine, tracker, ptsh):
tsh = ptsh
def create_data():
# one insert per day for 4 months
for month in range(1, 4):
days = calendar.monthrange(2017, month)[1]
for day in range(1, days + 1):
date = datetime(2017, month, day)
serie = genserie(date, '10Min', 6 * 24)
with engine.connect() as cn:
with tsh.newchangeset(cn, 'aurelien.campeas@pythonian.fr',
_insertion_date=date.replace(year=2018)):
tsh.insert(cn, serie, 'manydiffs')
t0 = time()
create_data()
t1 = time() - t0
tshclass = tsh.__class__.__name__
with engine.connect() as cn:
cn.execute('set search_path to "{}.timeserie"'.format(tsh.namespace))
df = pd.read_sql("select id, diff, snapshot from manydiffs order by id ",
cn)
for attr in ('diff', 'snapshot'):
df[attr] = df[attr].apply(lambda x: 0 if x is None else len(x))
size = df[['diff', 'snapshot']].sum().to_dict()
tracker.append({'test': 'manydiffs_insert',
'class': tshclass,
'time': t1,
'diffsize': size['diff'],
'snapsize': size['snapshot']})
t0 = time()
tsh.get_history(engine, 'manydiffs')
t1 = time() - t0
tracker.append({'test': 'manydiffs_history_all',
'class': tshclass,
'time': t1,
'diffsize': None,
'snapsize': None})
t0 = time()
for month in range(1, 3):
for day in range(1, 5):
date = datetime(2018, month, day)
ts = tsh.get_history(engine, 'manydiffs',
from_insertion_date=date,
to_insertion_date=date + timedelta(days=31))
assert ts is not None
t1 = time() - t0
tracker.append({'test': 'manydiffs_history_chunks',
'class': tshclass,
'time': t1,
'diffsize': None,
'snapsize': None})
t0 = time()
for month in range(1, 3):
for day in range(1, 5):
date = datetime(2018, month, day)
ts = tsh.get_history(engine, 'manydiffs',
from_insertion_date=date,
to_insertion_date=date + timedelta(days=31),
from_value_date=date + timedelta(days=10),
to_value_date=date + timedelta(days=20))
assert ts is not None
t1 = time() - t0
tracker.append({'test': 'manydiffs_history_chunks_valuedate',
'class': tshclass,
'time': t1,
'diffsize': None,
'snapsize': None})
......@@ -54,14 +54,14 @@ class tsschema(object):
changeset_series = Table(
'changeset_series', meta,
Column('csid', Integer,
Column('cset', Integer,
ForeignKey('{}.changeset.id'.format(self.namespace)),
index=True, nullable=False),
Column('serie', String,
ForeignKey('{}.registry.name'.format(self.namespace)),
index=True, nullable=False),
PrimaryKeyConstraint(
'csid', 'serie',
'cset', 'serie',
name='{}_changeset_series_pk'.format(self.namespace)),
schema=self.namespace
)
......@@ -86,6 +86,7 @@ class tsschema(object):
return
engine.execute(CreateSchema(self.namespace))
engine.execute(CreateSchema('{}.timeserie'.format(self.namespace)))
engine.execute(CreateSchema('{}.snapshot'.format(self.namespace)))
self.registry.create(engine)
self.changeset.create(engine)
self.changeset_series.create(engine)
......@@ -94,6 +95,8 @@ class tsschema(object):
L.info('destroy schema %s', self.namespace)
engine.execute(
'drop schema if exists "{}.timeserie" cascade'.format(self.namespace))
engine.execute(
'drop schema if exists "{}.snapshot" cascade'.format(self.namespace))
engine.execute(
'drop schema if exists {} cascade'.format(self.namespace))
del self.meta
......
import pandas as pd
from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, desc, func
from sqlalchemy.dialects.postgresql import BYTEA
from tshistory.util import (
subset,
SeriesServices,
)
class Snapshot(SeriesServices):
#__slots__ = ('cn', 'name', 'tsh', 'cache')
_interval = 10
def __init__(self, cn, tsh, seriename):
self.cn = cn
self.tsh = tsh
self.name = seriename
self.cache = {}
@property
def table(self):
return Table(
self.name, self.tsh.schema.meta,
Column('id', Integer, primary_key=True),
Column('cset', Integer,
ForeignKey('{}.changeset.id'.format(self.tsh.namespace)),
index=True, nullable=False),
Column('chunk', BYTEA),
schema='{}.snapshot'.format(self.tsh.namespace),
extend_existing=True
)
def create(self, csid, initial_ts):
self.table.create(self.cn)
sql = self.table.insert().values(
cset=csid,
chunk=self._serialize(initial_ts)
)
self.cn.execute(sql)
def update(self, csid, diff):
# note the current tip id for later
table = self.table
sql = select([func.max(table.c.id)])
tipid = self.cn.execute(sql).scalar()
snapshot = self.last
newsnapshot = self.patch(snapshot, diff)
sql = table.insert().values(
cset=csid,
chunk=self._serialize(newsnapshot)
)
self.cn.execute(sql)
if tipid > 1 and tipid % self._interval:
self.cn.execute(table.delete().where(table.c.id == tipid))
@property
def first(self):
return self.find(qfilter=[lambda _, table: table.c.id == 1])[1]
@property
def last(self):
return self.find()[1]
def find(self, qfilter=(),
from_value_date=None, to_value_date=None):
cset = self.tsh.schema.changeset
table = self.table
sql = select([table.c.cset, table.c.chunk]
).order_by(desc(table.c.id)
).limit(1
).select_from(table.join(cset))
if qfilter:
sql = sql.where(table.c.cset <= cset.c.id)
for filtercb in qfilter:
sql = sql.where(filtercb(cset, table))
try:
csid, snapdata = self.cn.execute(sql).fetchone()
snapdata = subset(self._deserialize(snapdata, self.name),
from_value_date, to_value_date)
snapdata = self.tsh._ensure_tz_consistency(self.cn, snapdata)
except TypeError:
# this happens *only* because of the from/to restriction
return None, None
return csid, snapdata
def build_upto(self, qfilter=(),
from_value_date=None, to_value_date=None):
csid, snapshot = self.find(qfilter,
from_value_date=from_value_date,
to_value_date=to_value_date)
if csid is None:
return
cset = self.tsh.schema.changeset
# beware the potential cartesian product
# between table & cset if there is no qfilter
table = self.tsh._get_ts_table(self.cn, self.name)
sql = select([table.c.id,
table.c.diff,
cset.c.insertion_date]
).order_by(table.c.id
).where(table.c.cset > csid)
if qfilter:
sql = sql.where(table.c.cset == cset.c.id)
for filtercb in qfilter:
sql = sql.where(filtercb(cset, table))
alldiffs = self.cn.execute(sql).fetchall()
if not len(alldiffs):
return snapshot
# initial ts
ts = self._deserialize(alldiffs[0].diff, self.name)
ts = self.tsh._ensure_tz_consistency(self.cn, ts)
for row in alldiffs[1:]:
diff = subset(self._deserialize(row.diff, self.name),
from_value_date, to_value_date)
diff = self.tsh._ensure_tz_consistency(self.cn, diff)
ts = self.patch(ts, diff)
ts = self.patch(snapshot, ts)
assert ts.index.dtype.name == 'datetime64[ns]' or len(ts) == 0
return ts
def strip_at(self, csid):
table = self.table
self.cn.execute(table.delete().where(table.c.cset >= csid))
if self.cn.execute(select([table.c.id]).where(table.c.cset == csid)).scalar():
return
# rebuild the top-level chunk
snap = self.build_upto(
qfilter=[lambda cset, _t: cset.c.id < csid]
)
sql = table.update().where(
table.c.cset == csid
).values(
chunk=self._serialize(snap)
)
self.cn.execute(sql)
......@@ -16,14 +16,13 @@ from tshistory.util import (
SeriesServices,
tzaware_serie
)
from tshistory.snapshot import Snapshot
L = logging.getLogger('tshistory.tsio')
class TimeSerie(SeriesServices):
_csid = None
_snapshot_interval = 10
namespace = 'tsh'
schema = None
......@@ -87,47 +86,45 @@ class TimeSerie(SeriesServices):
# we impose an order to survive rountrips
newts = newts.reorder_levels(sorted(newts.index.names))