Commit 93a10fe1 authored by Aurélien Campéas's avatar Aurélien Campéas
Browse files

use a different table for each time series

This will scale better.
The `name` is now not part of the queries any more.
It will be easier to grok what is going on when looking directly
in the db.

`ts_registry` keeps track of the timeseries.
parent 5fbe3c3336fb
from pathlib import Path
from pytest_sa_pg.fixture import engine_fixture
from sqlalchemy import create_engine, select, Table, MetaData
import pytest
from pytest_sa_pg.fixture import db
from tshistory import schema
......@@ -9,5 +12,24 @@ DATADIR = Path(__file__).parent / 'test' / 'data'
DBURI = 'postgresql://localhost:5433/postgres'
engine = engine_fixture(schema.meta, DATADIR, 5433)
@pytest.fixture(scope='session')
def engine(request):
    """Session-scoped engine against a throwaway local postgres cluster.

    Drops any per-series tables recorded in ``ts_registry`` that a
    previous run may have left behind, then rebuilds the base schema.
    """
    port = 5433
    db.setup_local_pg_cluster(request, DATADIR, port)
    uri = 'postgresql://localhost:{}/postgres'.format(port)
    engine = create_engine(uri)
    # explicitly cleanup the ts tables: each series lives in its own
    # table, so dropping schema.meta alone would not remove them
    reg = schema.ts_registry
    if reg.exists(engine):
        meta = MetaData()
        # one connection for all the drops (the original opened a new
        # connection per table)
        with engine.connect() as cnx:
            for tname, in cnx.execute(select([reg.c.table_name])):
                Table(tname, meta).drop(cnx)
    # /cleanup
    metadata = schema.meta
    metadata.drop_all(engine)
    metadata.create_all(engine)
    # reuse the engine built above instead of creating a second one
    return engine
from sqlalchemy import (Table, Column, Integer, String, DateTime,
MetaData, ForeignKey, UniqueConstraint)
MetaData, ForeignKey, UniqueConstraint,
exc)
from sqlalchemy.dialects.postgresql import JSONB
meta = MetaData()
# Registry of all stored series: one row per series, mapping the
# user-facing series name to its dedicated revision-log table.
# (Reconstructed from the diff: the old monolithic `ts_revlog` table
# is removed by this commit and replaced by one table per series.)
ts_registry = Table(
    'ts_registry', meta,
    Column('id', Integer, primary_key=True),
    # user-facing series name, unique across the registry
    Column('name', String, index=True, nullable=False, unique=True),
    # name of the per-series table holding that series' revisions
    Column('table_name', String, index=True, nullable=False, unique=True)
)
def ts_table_name(name):
    """Return the name of the dedicated revision table for series *name*."""
    return 'ts_{}'.format(name)
def make_ts_table(cnx, name):
    """Create the revision-log table for series *name* and register it.

    The table is created through *cnx* and the (name, table_name)
    mapping is recorded in ``ts_registry``.  Returns the new table.
    """
    table_name = ts_table_name(name)
    columns = [
        Column('id', Integer, primary_key=True),
        Column('author', String, index=True, nullable=False),
        Column('insertion_date', DateTime, index=True, nullable=False),
        Column('data', JSONB, nullable=False),
        Column('snapshot', JSONB),
        # each revision points at its predecessor; deleting a revision
        # cascades to its children
        Column('parent',
               Integer,
               ForeignKey('%s.id' % table_name, ondelete='cascade'),
               nullable=True,
               unique=True,
               index=True),
    ]
    table = Table(table_name, meta, *columns)
    table.create(cnx)
    cnx.execute(
        ts_registry.insert().values(name=name, table_name=table_name)
    )
    return table
def get_ts_table(cnx, name):
    """Return the revision-log table for series *name*, or None.

    The series is looked up in ``ts_registry``; when registered, its
    table is reflected from the database.
    """
    lookup = ts_registry.select().where(ts_registry.c.name == name)
    registered_id = cnx.execute(lookup).scalar()
    if not registered_id:
        return None
    return Table(ts_table_name(name), meta,
                 autoload=True, autoload_with=cnx.engine)
......@@ -138,16 +138,31 @@ def test_differential(engine):
2010-01-07 3.0
""".strip() == get_ts(engine, 'ts_mixte').to_string().strip()
allts = pd.read_sql('select name, id, parent from ts_revlog order by id',
hist = pd.read_sql('select id, parent from ts_ts_test order by id',
engine)
assert """
name id parent
0 ts_test 1 NaN
1 ts_test 2 1.0
2 ts_test 3 2.0
3 ts_mixte 4 NaN
4 ts_mixte 5 4.0
5 ts_mixte 6 5.0
id parent
0 1 NaN
1 2 1.0
2 3 2.0
""".strip() == hist.to_string().strip()
hist = pd.read_sql('select id, parent from ts_ts_mixte order by id',
engine)
assert """
id parent
0 1 NaN
1 2 1.0
2 3 2.0
""".strip() == hist.to_string().strip()
allts = pd.read_sql('select id, name, table_name from ts_registry',
engine)
assert """
id name table_name
0 1 ts_test ts_ts_test
1 2 ts_mixte ts_ts_mixte
""".strip() == allts.to_string().strip()
assert """
......@@ -173,17 +188,6 @@ def test_differential(engine):
2010-01-06 2.0
""".strip() == get_ts(engine, 'ts_mixte').to_string().strip()
allts = pd.read_sql('select name, id, parent from ts_revlog order by id',
engine)
assert """
name id parent
0 ts_test 1 NaN
1 ts_test 2 1.0
2 ts_test 3 2.0
3 ts_mixte 4 NaN
4 ts_mixte 5 4.0
""".strip() == allts.to_string().strip()
def test_bad_import(engine):
# the data were parsed as date by pd.read_json()
......
......@@ -5,7 +5,7 @@ import numpy as np
from sqlalchemy.sql.expression import select, desc
from tshistory.schema import ts_revlog
from tshistory.schema import make_ts_table, get_ts_table
PRECISION = 1e-14
......@@ -28,24 +28,24 @@ def insert_ts(engine, newts, name, author):
newts = newts.astype('float64')
newts.name = name
newrev_sql = ts_revlog.insert()
with engine.connect() as cnx:
snapshot, tip_id = _get_snapshot(cnx, name)
table = get_ts_table(cnx, name)
if snapshot is None:
if table is None:
# initial insertion
table = make_ts_table(cnx, name)
jsonts = tojson(newts)
value = {
'data': jsonts,
'snapshot': jsonts,
'insertion_date': datetime.now(),
'author': author,
'name': name,
'author': author
}
cnx.execute(newrev_sql.values(value))
cnx.execute(table.insert().values(value))
print('Fisrt insertion of %s by %s' % (name, author))
return
snapshot, tip_id = _get_snapshot(cnx, table)
# this is the diff between our computed parent
diff = compute_diff(snapshot, newts)
......@@ -61,27 +61,30 @@ def insert_ts(engine, newts, name, author):
'snapshot': tojson(newsnapshot),
'insertion_date': datetime.now(),
'author': author,
'name': name,
'parent': tip_id,
}
cnx.execute(newrev_sql.values(value))
cnx.execute(table.insert().values(value))
cnx.execute(
ts_revlog.update(
).where(ts_revlog.c.id == tip_id
table.update(
).where(table.c.id == tip_id
).values(snapshot=None)
)
print('Insertion differential of %s by %s' % (name, author))
def get_ts(engine, name, revision_date=None):
def get_ts(cnx, name, revision_date=None):
"""Compute the top-most timeseries of a given name
with manual overrides applied
"""
table = get_ts_table(cnx, name)
if table is None:
return
if revision_date is None:
current, _ = _get_snapshot(engine, name)
current, _ = _get_snapshot(cnx, table)
else:
current, _ = apply_diffs_upto(engine, name, revision_date)
current, _ = apply_diffs_upto(cnx, table, revision_date)
if current is not None:
current.name = name
......@@ -98,14 +101,13 @@ def fromjson(jsonb):
typ='series', dtype=False)
def _get_snapshot(engine, name):
sql = select([ts_revlog.c.id,
ts_revlog.c.snapshot]
).order_by(desc(ts_revlog.c.id)
).limit(1
).where(ts_revlog.c.name == name)
def _get_snapshot(cnx, table):
sql = select([table.c.id,
table.c.snapshot]
).order_by(desc(table.c.id)
).limit(1)
df = pd.read_sql(sql, engine)
df = pd.read_sql(sql, cnx)
if len(df) == 0:
return None, None
......@@ -143,15 +145,14 @@ def apply_diff(base_ts, new_ts):
return result_ts
def apply_diffs_upto(engine, name, revision_date=None):
sql = select([ts_revlog.c.id,
ts_revlog.c.data,
ts_revlog.c.parent,
ts_revlog.c.insertion_date]
).order_by(ts_revlog.c.id
).where(ts_revlog.c.name == name)
def apply_diffs_upto(cnx, table, revision_date=None):
sql = select([table.c.id,
table.c.data,
table.c.parent,
table.c.insertion_date]
).order_by(table.c.id)
alldiffs = pd.read_sql(sql, engine)
alldiffs = pd.read_sql(sql, cnx)
if revision_date:
alldiffs = alldiffs[alldiffs['insertion_date'] <= revision_date]
......@@ -179,26 +180,26 @@ def apply_diffs_upto(engine, name, revision_date=None):
def delete_last_diff(engine, name):
with engine.connect() as cnx:
sql = select([ts_revlog.c.id,
ts_revlog.c.parent]
).order_by(desc(ts_revlog.c.id)
).limit(1
).where(ts_revlog.c.name == name)
table = get_ts_table(cnx, name)
sql = select([table.c.id,
table.c.parent]
).order_by(desc(table.c.id)
).limit(1)
diff_id, parent_id = cnx.execute(sql).fetchone()
if not diff_id:
return False
sql = ts_revlog.delete().where(
ts_revlog.c.id == diff_id
sql = table.delete().where(
table.c.id == diff_id
)
cnx.execute(sql)
# apply on flat
current, parent_id = apply_diffs_upto(cnx, name)
current, parent_id = apply_diffs_upto(cnx, table)
update_snapshot_sql = ts_revlog.update(
).where(ts_revlog.c.id == parent_id
update_snapshot_sql = table.update(
).where(table.c.id == parent_id
).values(snapshot=tojson(current))
cnx.execute(update_snapshot_sql)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment