Commit 051591e1 authored by Arnaud Campeas's avatar Arnaud Campeas
Browse files

tsio: handle multi-index timeseries


closes #10
parent abfda4d19709
......@@ -35,7 +35,6 @@ def genserie(start, freq, repeat, initval=None, tz=None, name=None):
periods=repeat,
tz=tz))
def test_changeset(engine):
# instantiate one time serie handler object
tso = TimeSerie()
......@@ -530,3 +529,142 @@ def test_deletion(engine):
tso.insert(engine, ts_begin, 'ts_null', 'test')
assert tso.get(engine, 'ts_null') is None
def test_multi_index(engine):
tso = TimeSerie()
appdate_0 = pd.DatetimeIndex(start=datetime(2015, 1, 1),
end=datetime(2015, 1, 2),
freq='D').values
pubdate_0 = [pd.datetime(2015, 1, 11, 12, 0, 0)] * 2
insertion_date_0 = [pd.datetime(2015, 1, 11, 12, 30, 0)] * 2
multi = [
appdate_0,
np.array(pubdate_0),
np.array(insertion_date_0)
]
ts_multi = pd.Series(range(2), index=multi)
ts_multi.index.rename(['b', 'c', 'a'], inplace=True)
tso.insert(engine, ts_multi, 'ts_multi_simple', 'test')
ts = tso.get(engine, 'ts_multi_simple')
assert_df("""
ts_multi_simple
a b c
2015-01-11 12:30:00 2015-01-01 2015-01-11 12:00:00 0
2015-01-02 2015-01-11 12:00:00 1
""", pd.DataFrame(ts))
diff = tso.insert(engine, ts_multi, 'ts_multi_simple', 'test')
assert diff is None
ts_multi_2 = pd.Series([0, 2], index=multi)
ts_multi_2.index.rename(['b', 'c', 'a'], inplace=True)
tso.insert(engine, ts_multi_2, 'ts_multi_simple', 'test')
ts = tso.get(engine, 'ts_multi_simple')
assert_df("""
ts_multi_simple
a b c
2015-01-11 12:30:00 2015-01-01 2015-01-11 12:00:00 0
2015-01-02 2015-01-11 12:00:00 2
""", pd.DataFrame(ts))
# bigger ts
appdate_0 = pd.DatetimeIndex(start=datetime(2015, 1, 1),
end=datetime(2015, 1, 4),
freq='D').values
pubdate_0 = [pd.datetime(2015, 1, 11, 12, 0, 0)] * 4
insertion_date_0 = [pd.datetime(2015, 1, 11, 12, 30, 0)] * 4
appdate_1 = pd.DatetimeIndex(start=datetime(2015, 1, 1),
end=datetime(2015, 1, 4),
freq='D').values
pubdate_1 = [pd.datetime(2015, 1, 21, 12, 0, 0)] * 4
insertion_date_1 = [pd.datetime(2015, 1, 21, 12, 30, 0)] * 4
multi = [
np.concatenate([appdate_0, appdate_1]),
np.array(pubdate_0 + pubdate_1),
np.array(insertion_date_0 + insertion_date_1)
]
ts_multi = pd.Series(range(8), index=multi)
ts_multi.index.rename(['a', 'c', 'b'], inplace=True)
tso.insert(engine, ts_multi, 'ts_multi', 'test')
ts = tso.get(engine, 'ts_multi')
assert_df("""
ts_multi
a b c
2015-01-01 2015-01-11 12:30:00 2015-01-11 12:00:00 0
2015-01-21 12:30:00 2015-01-21 12:00:00 4
2015-01-02 2015-01-11 12:30:00 2015-01-11 12:00:00 1
2015-01-21 12:30:00 2015-01-21 12:00:00 5
2015-01-03 2015-01-11 12:30:00 2015-01-11 12:00:00 2
2015-01-21 12:30:00 2015-01-21 12:00:00 6
2015-01-04 2015-01-11 12:30:00 2015-01-11 12:00:00 3
2015-01-21 12:30:00 2015-01-21 12:00:00 7
""", pd.DataFrame(ts.sort_index()))
# Note: the columnns are returned according to the alphabetic order
appdate_2 = pd.DatetimeIndex(start=datetime(2015, 1, 1),
end=datetime(2015, 1, 4),
freq='D').values
pubdate_2 = [pd.datetime(2015, 1, 31, 12, 0, 0)] * 4
insertion_date_2 = [pd.datetime(2015, 1, 31, 12, 30, 0)] * 4
multi_2 = [
np.concatenate([appdate_1, appdate_2]),
np.array(pubdate_1 + pubdate_2),
np.array(insertion_date_1 + insertion_date_2)
]
ts_multi_2 = pd.Series([4] * 8, index=multi_2)
ts_multi_2.index.rename(['a', 'c', 'b'], inplace=True)
# A second ts is inserted with some index in common with the first
# one: appdate_1, pubdate_1,and insertion_date_1. The value is set
# at 4, which matches the previous value of the "2015-01-01" point.
diff = tso.insert(engine, ts_multi_2, 'ts_multi', 'test')
assert_df("""
ts_multi
a b c
2015-01-01 2015-01-31 12:30:00 2015-01-31 12:00:00 4.0
2015-01-02 2015-01-21 12:30:00 2015-01-21 12:00:00 4.0
2015-01-31 12:30:00 2015-01-31 12:00:00 4.0
2015-01-03 2015-01-21 12:30:00 2015-01-21 12:00:00 4.0
2015-01-31 12:30:00 2015-01-31 12:00:00 4.0
2015-01-04 2015-01-21 12:30:00 2015-01-21 12:00:00 4.0
2015-01-31 12:30:00 2015-01-31 12:00:00 4.0
""", pd.DataFrame(diff.sort_index()))
# the differential skips a value for "2015-01-01"
# which does not change from the previous ts
ts = tso.get(engine, 'ts_multi')
assert_df("""
ts_multi
a b c
2015-01-01 2015-01-11 12:30:00 2015-01-11 12:00:00 0
2015-01-21 12:30:00 2015-01-21 12:00:00 4
2015-01-31 12:30:00 2015-01-31 12:00:00 4
2015-01-02 2015-01-11 12:30:00 2015-01-11 12:00:00 1
2015-01-21 12:30:00 2015-01-21 12:00:00 4
2015-01-31 12:30:00 2015-01-31 12:00:00 4
2015-01-03 2015-01-11 12:30:00 2015-01-11 12:00:00 2
2015-01-21 12:30:00 2015-01-21 12:00:00 4
2015-01-31 12:30:00 2015-01-31 12:00:00 4
2015-01-04 2015-01-11 12:30:00 2015-01-11 12:00:00 3
2015-01-21 12:30:00 2015-01-21 12:00:00 4
2015-01-31 12:30:00 2015-01-31 12:00:00 4
""", pd.DataFrame(ts.sort_index()))
# the result ts have now 3 values for each point in 'a'
......@@ -24,11 +24,27 @@ L = setuplogging()
def tojson(ts):
if ts is None:
return None
return ts.to_json(date_format='iso')
if not isinstance(ts.index, pd.MultiIndex):
return ts.to_json(date_format='iso')
def fromjson(jsonb):
return pd.read_json(jsonb, typ='series', dtype=False)
# multi index case
return ts.to_frame().reset_index().to_json(date_format='iso')
def fromjson(jsonb, tsname):
result = pd.read_json(jsonb, typ='series', dtype=False)
if isinstance(result.index, pd.DatetimeIndex):
return result
# multi index case
columns = result.index.values.tolist()
columns.remove(tsname)
result = pd.read_json(jsonb, typ='frame',
convert_dates=columns)
result.set_index(sorted(columns), inplace=True)
return result.iloc[:,0] # get a Series object
class TimeSerie(object):
......@@ -83,6 +99,10 @@ class TimeSerie(object):
newts.name = name
table = self._get_ts_table(cnx, name)
if isinstance(newts.index, pd.MultiIndex):
# we impose an order to survive rountrips
newts = newts.reorder_levels(sorted(newts.index.names))
if table is None:
# initial insertion
if newts.isnull().all():
......@@ -284,7 +304,7 @@ class TimeSerie(object):
snapid, snapdata = cnx.execute(sql).fetchone()
except TypeError:
return None, None
return snapid, fromjson(snapdata)
return snapid, fromjson(snapdata, table.name)
def _build_snapshot_upto(self, cnx, table, qfilter=()):
snapid, snapshot = self._find_snapshot(cnx, table, qfilter)
......@@ -312,7 +332,7 @@ class TimeSerie(object):
# initial ts
ts = snapshot
for _, row in alldiffs.iterrows():
diff = fromjson(row['diff'])
diff = fromjson(row['diff'], table.name)
ts = self._apply_diff(ts, diff)
assert ts.index.dtype.name == 'datetime64[ns]' or len(ts) == 0
return ts
......@@ -355,4 +375,5 @@ class TimeSerie(object):
result_ts[new_ts.index] = new_ts
result_ts = result_ts[~result_ts.isnull()]
result_ts.sort_index(inplace=True)
result_ts.name = base_ts.name
return result_ts
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment