Commit 1319d045 authored by Aurélien Campéas

schema, tsio: introduce a notion of changeset

a changeset has a globally unique identifier and is associated with
the standard metadata (author, insertion_date)

it allows inserting several series in the same coherent "change set",
for later (grouped) retrieval
parent 6d74943457f0
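
For orientation, a minimal sketch of how the new changeset API is meant to be used, mirroring the test_transaction test added below; `engine` is assumed to be a SQLAlchemy engine pointing at a database initialized with the tshistory schema:

from datetime import datetime
import pandas as pd
from tshistory.tsio import TimeSerie

tso = TimeSerie()
index = pd.date_range(start=datetime(2017, 1, 1), freq='D', periods=3)

# engine: assumed SQLAlchemy engine bound to an initialized tshistory database
with engine.connect() as cnx:
    # both inserts end up in the same changeset, sharing one
    # (author, insertion_date) pair in the ts_changeset table
    with tso.newchangeset(cnx, 'babar'):
        tso.insert(cnx, pd.Series([1, 2, 3], index=index), 'ts_values')
        tso.insert(cnx, pd.Series([5, 6, 7], index=index), 'ts_othervalues')

# outside a changeset an explicit author remains mandatory
tso.insert(engine, pd.Series([2, 3, 4], index=index), 'ts_values', author='babar')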
-from sqlalchemy import Table, Column, Integer, String, MetaData
+from sqlalchemy import Table, Column, Integer, String, MetaData, DateTime

 meta = MetaData()
@@ -9,3 +9,11 @@ ts_registry = Table(
     Column('name', String, index=True, nullable=False, unique=True),
     Column('table_name', String, index=True, nullable=False, unique=True)
 )
+
+
+ts_changeset = Table(
+    'ts_changeset', meta,
+    Column('id', Integer, primary_key=True),
+    Column('author', String, index=True, nullable=False),
+    Column('insertion_date', DateTime, index=True, nullable=False)
+)
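
The per-series tables defined in tsio.py below reference this table through a csid foreign key, which is what makes grouped retrieval possible: every row written under one newchangeset call points at the same ts_changeset row. This commit does not yet add a retrieval API for that grouping, so here is a hedged sketch going straight to SQL, assuming a series named 'ts_values' (whose table would be ts_ts_values, following the registry naming convention visible in the tests) and the same assumed engine as above:

import pandas as pd

# list the revisions of 'ts_values' together with the metadata of the
# changeset they were inserted under ('babar' is just an example author)
sql = (
    "select cset.id as csid, cset.author, cset.insertion_date, ts.id as rev "
    "from ts_changeset as cset "
    "join ts_ts_values as ts on ts.csid = cset.id "
    "where cset.author = 'babar' "
    "order by cset.insertion_date"
)
revisions = pd.read_sql(sql, engine)  # engine: assumed SQLAlchemy engine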
@@ -6,12 +6,28 @@ from dateutil import parser
 import pandas as pd
 import numpy as np
 from mock import patch
+import pytest

 from tshistory.tsio import TimeSerie

 DATADIR = Path(__file__).parent / 'data'


+def test_transaction(engine):
+    # instantiate one time serie handler object
+    tso = TimeSerie()
+
+    index = pd.date_range(start=datetime(2017, 1, 1), freq='D', periods=3)
+
+    with engine.connect() as cnx:
+        with tso.newchangeset(cnx, 'babar'):
+            tso.insert(cnx, pd.Series([1, 2, 3], index=index), 'ts_values')
+            tso.insert(cnx, pd.Series([5, 6, 7], index=index), 'ts_othervalues')
+
+    with pytest.raises(AssertionError):
+        tso.insert(engine, pd.Series([2, 3, 4], index=index), 'ts_values')
+
+
 def test_differential(engine):
     # instantiate one time serie handler object
     tso = TimeSerie()
@@ -158,13 +174,14 @@ def test_differential(engine):
 2  3    2.0
 """.strip() == hist.to_string().strip()

-    allts = pd.read_sql('select id, name, table_name from ts_registry',
+    allts = pd.read_sql("select name, table_name from ts_registry "
+                        "where name in ('ts_test', 'ts_mixte')",
                         engine)

     assert """
-   id      name   table_name
-0   1   ts_test   ts_ts_test
-1   2  ts_mixte  ts_ts_mixte
+       name   table_name
+0   ts_test   ts_ts_test
+1  ts_mixte  ts_ts_mixte
 """.strip() == allts.to_string().strip()

     assert """

 from datetime import datetime
+from contextlib import contextmanager

 import pandas as pd
 import numpy as np

-from sqlalchemy import Table, Column, Integer, String, DateTime, ForeignKey
+from sqlalchemy import Table, Column, Integer, String, ForeignKey
 from sqlalchemy.sql.expression import select, desc, func
 from sqlalchemy.dialects.postgresql import JSONB
@@ -23,20 +24,30 @@ def fromjson(jsonb):
                         typ='series', dtype=False)


 class TimeSerie(object):
-    # API : insert, get
+    _csid = None

-    def insert(self, engine, newts, name, author, extra_scalars={}):
+    # API : changeset, insert, get, delete
+
+    @contextmanager
+    def newchangeset(self, cnx, author):
+        assert self._csid is None
+        self._csid = self._newchangeset(cnx, author)
+        yield
+        del self._csid
+
+    def insert(self, engine, newts, name, author=None,
+               extra_scalars={}):
         """Create a new revision of a given time series

         ts: pandas.Series with date index and float values
         name: str unique identifier of the serie
         author: str free-form author name
         """
+        assert self._csid or author, 'author is mandatory'
+        if self._csid and author:
+            print('author will not be used when in a changeset')
         assert isinstance(newts, pd.Series)
-        newts = newts[~newts.isnull()]  # wipe the the NaNs
+        newts = newts[~newts.isnull()]  # wipe the NaNs
         if len(newts):
             assert newts.index.dtype.name == 'datetime64[ns]'
         else:
@@ -53,10 +64,9 @@ class TimeSerie(object):
             table = self._make_ts_table(cnx, name)
             jsonts = tojson(newts)
             value = {
+                'csid': self._csid or self._newchangeset(cnx, author),
                 'data': jsonts,
                 'snapshot': jsonts,
-                'insertion_date': datetime.now(),
-                'author': author
             }
             # callback for extenders
             self._complete_insertion_value(value, extra_scalars)
@@ -74,10 +84,9 @@ class TimeSerie(object):
             tip_id = self._get_tip_id(cnx, table)
             value = {
+                'csid': self._csid or self._newchangeset(cnx, author),
                 'data': tojson(diff),
                 'snapshot': tojson(newsnapshot),
-                'insertion_date': datetime.now(),
-                'author': author,
                 'parent': tip_id,
             }
             # callback for extenders
current = self._read_latest_snapshot(cnx, table)
else:
current = self._build_snapshot_upto(
cnx, table, lambda table: table.c.insertion_date <= revision_date
cnx, table, lambda cset, _: cset.c.insertion_date <= revision_date
)
if current is not None:
@@ -148,8 +157,8 @@ class TimeSerie(object):
         return Table(
             tablename, schema.meta,
             Column('id', Integer, primary_key=True),
-            Column('author', String, index=True, nullable=False),
-            Column('insertion_date', DateTime, index=True, nullable=False),
+            Column('csid', Integer, ForeignKey('ts_changeset.id'),
+                   nullable=False),
             Column('data', JSONB, nullable=False),
             Column('snapshot', JSONB),
             Column('parent',
return Table(self._ts_table_name(name), schema.meta,
autoload=True, autoload_with=cnx.engine)
def _newchangeset(self, cnx, author):
table = schema.ts_changeset
sql = table.insert().values(
author=author,
insertion_date=datetime.now())
return cnx.execute(sql).inserted_primary_key[0]
def _get_tip_id(self, cnx, table):
sql = select([func.max(table.c.id)])
return cnx.execute(sql).scalar()
@@ -236,14 +252,16 @@ class TimeSerie(object):
         return result_ts

     def _build_snapshot_upto(self, cnx, table, *qfilter):
+        cset = schema.ts_changeset
         sql = select([table.c.id,
                       table.c.data,
                       table.c.parent,
-                      table.c.insertion_date]
-        ).order_by(table.c.id)
+                      cset.c.insertion_date]
+        ).order_by(table.c.id
+        ).where(table.c.csid == cset.c.id)

         for filtercb in qfilter:
-            sql = sql.where(filtercb(table))
+            sql = sql.where(filtercb(cset, table))

         alldiffs = pd.read_sql(sql, cnx)
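
Since insertion_date now lives on the changeset row, date-based retrieval goes through ts_changeset, as the _build_snapshot_upto hunk above shows (the filter callback now receives the changeset table first). A rough usage sketch, assuming get keeps the revision_date keyword hinted at by the hunk around _read_latest_snapshot, with the same assumed engine and series name as before:

from datetime import datetime

tso = TimeSerie()

# latest version of the series
latest = tso.get(engine, 'ts_values')

# state as of January 2nd: only diffs whose changeset insertion_date is
# <= revision_date are replayed by _build_snapshot_upto
as_of = tso.get(engine, 'ts_values', revision_date=datetime(2017, 1, 2))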