Commit 8b895bca authored by Aurélien Campéas's avatar Aurélien Campéas
Browse files

util: concentrate more facilities in a SeriesService base object


This will be reused later in other objects,
and may help clarify a bit the overall interface
of TimeSerie.
parent 8816a36d713d
......@@ -778,13 +778,13 @@ Freq: D
assert diff is None
# there is no difference
assert 0 == len(tsh._compute_diff(ts_repushed, ts_repushed))
assert 0 == len(tsh.diff(ts_repushed, ts_repushed))
ts_add = genserie(datetime(2010, 1, 1), 'D', 15)
ts_add.iloc[0] = np.nan
ts_add.iloc[13:] = np.nan
ts_add.iloc[8] = np.nan
diff = tsh._compute_diff(ts_repushed, ts_add)
diff = tsh.diff(ts_repushed, ts_add)
assert_df("""
2010-01-02 1.0
......
from datetime import datetime
from contextlib import contextmanager
import logging
import zlib
import math
import pandas as pd
import numpy as np
from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, func, desc
......@@ -14,7 +11,9 @@ from sqlalchemy.dialects.postgresql import BYTEA
from tshistory.schema import tsschema
from tshistory.util import (
inject_in_index,
num2float,
subset,
SeriesServices,
tzaware_serie
)
......@@ -22,49 +21,9 @@ from tshistory.util import (
L = logging.getLogger('tshistory.tsio')
def tojson(ts):
if not isinstance(ts.index, pd.MultiIndex):
return ts.to_json(date_format='iso',
double_precision=-int(math.log10(TimeSerie._precision)))
# multi index case
return ts.to_frame().reset_index().to_json(date_format='iso')
def num2float(pdobj):
# get a Series or a Dataframe column
if str(pdobj.dtype).startswith('int'):
return pdobj.astype('float64')
return pdobj
def fromjson(jsonb, tsname):
return _fromjson(jsonb, tsname).fillna(value=np.nan)
def _fromjson(jsonb, tsname):
if jsonb == '{}':
return pd.Series(name=tsname)
result = pd.read_json(jsonb, typ='series', dtype=False)
result.name = tsname
if isinstance(result.index, pd.DatetimeIndex):
result = num2float(result)
return result
# multi index case
columns = result.index.values.tolist()
columns.remove(tsname)
result = pd.read_json(jsonb, typ='frame',
convert_dates=columns)
result.set_index(sorted(columns), inplace=True)
return num2float(result.iloc[:, 0]) # get a Series object
class TimeSerie(object):
class TimeSerie(SeriesServices):
_csid = None
_snapshot_interval = 10
_precision = 1e-14
namespace = 'tsh'
schema = None
......@@ -272,7 +231,7 @@ class TimeSerie(object):
from_value_date, to_value_date)
diff = self._ensure_tz_consistency(cn, diff)
serie = self._apply_diff(series[-1][1], diff)
serie = self.patch(series[-1][1], diff)
series.append((revdate, serie))
for revdate, serie in series:
......@@ -423,14 +382,6 @@ class TimeSerie(object):
# ts serialisation
def _serialize(self, ts):
if ts is None:
return None
return zlib.compress(tojson(ts).encode('utf-8'))
def _deserialize(self, ts, name):
return fromjson(zlib.decompress(ts).decode('utf-8'), name)
def _ensure_tz_consistency(self, cn, ts):
"""Return timeserie with tz aware index or not depending on metadata
tzaware.
......@@ -572,13 +523,13 @@ class TimeSerie(object):
self._validate(cn, table.name, newts)
snapshot = self._build_snapshot_upto(cn, table)
assert snapshot is not None
diff = self._compute_diff(snapshot, newts)
diff = self.diff(snapshot, newts)
if len(diff) == 0:
return None, None
# full state computation & insertion
newsnapshot = self._apply_diff(snapshot, diff)
newsnapshot = self.patch(snapshot, diff)
return diff, newsnapshot
def _find_snapshot(self, cn, table, qfilter=(), column='snapshot',
......@@ -639,8 +590,8 @@ class TimeSerie(object):
diff = subset(self._deserialize(row.diff, table.name),
from_value_date, to_value_date)
diff = self._ensure_tz_consistency(cn, diff)
ts = self._apply_diff(ts, diff)
ts = self._apply_diff(snapshot, ts)
ts = self.patch(ts, diff)
ts = self.patch(snapshot, ts)
assert ts.index.dtype.name == 'datetime64[ns]' or len(ts) == 0
return ts
......@@ -665,48 +616,3 @@ class TimeSerie(object):
ts = self._deserialize(cn.execute(sql).scalar(), name)
return self._ensure_tz_consistency(cn, ts)
def _compute_diff(self, fromts, tots):
"""Compute the difference between fromts and tots
(like in tots - fromts).
"""
if fromts is None:
return tots
fromts = fromts[~fromts.isnull()]
if not len(fromts):
return tots
mask_overlap = tots.index.isin(fromts.index)
fromts_overlap = fromts[tots.index[mask_overlap]]
tots_overlap = tots[mask_overlap]
if fromts.dtype == 'float64':
mask_equal = np.isclose(fromts_overlap, tots_overlap,
rtol=0, atol=self._precision)
else:
mask_equal = fromts_overlap == tots_overlap
mask_na_equal = fromts_overlap.isnull() & tots_overlap.isnull()
mask_equal = mask_equal | mask_na_equal
diff_overlap = tots[mask_overlap][~mask_equal]
diff_new = tots[~mask_overlap]
diff_new = diff_new[~diff_new.isnull()]
return pd.concat([diff_overlap, diff_new])
def _apply_diff(self, base_ts, new_ts):
"""Produce a new ts using base_ts as a base and taking any
intersecting and new values from new_ts.
"""
if base_ts is None:
return new_ts
if new_ts is None:
return base_ts
result_ts = pd.Series([0.0], index=base_ts.index.union(new_ts.index))
result_ts[base_ts.index] = base_ts
result_ts[new_ts.index] = new_ts
result_ts.sort_index(inplace=True)
result_ts.name = base_ts.name
return result_ts
import math
import zlib
import numpy as np
import pandas as pd
from pandas.api.types import is_datetimetz
......@@ -38,3 +42,94 @@ def inject_in_index(serie, revdate):
serie.index = pd.MultiIndex.from_tuples(mindex, names=[
'insertion_date', 'value_date']
)
def num2float(pdobj):
# get a Series or a Dataframe column
if str(pdobj.dtype).startswith('int'):
return pdobj.astype('float64')
return pdobj
def tojson(ts, precision=1e-14):
if not isinstance(ts.index, pd.MultiIndex):
return ts.to_json(date_format='iso',
double_precision=-int(math.log10(precision)))
# multi index case
return ts.to_frame().reset_index().to_json(date_format='iso')
def fromjson(jsonb, tsname):
return _fromjson(jsonb, tsname).fillna(value=np.nan)
def _fromjson(jsonb, tsname):
if jsonb == '{}':
return pd.Series(name=tsname)
result = pd.read_json(jsonb, typ='series', dtype=False)
result.name = tsname
if isinstance(result.index, pd.DatetimeIndex):
result = num2float(result)
return result
# multi index case
columns = result.index.values.tolist()
columns.remove(tsname)
result = pd.read_json(jsonb, typ='frame',
convert_dates=columns)
result.set_index(sorted(columns), inplace=True)
return num2float(result.iloc[:, 0]) # get a Series object
class SeriesServices(object):
_precision = 1e-14
# diff handling
def patch(self, base, diff):
assert base is not None
assert diff is not None
patched = pd.Series([0.0], index=base.index.union(diff.index))
patched[base.index] = base
patched[diff.index] = diff
patched.sort_index(inplace=True)
patched.name = base.name
return patched
def diff(self, base, other):
if base is None:
return other
base = base[~base.isnull()]
if not len(base):
return other
mask_overlap = other.index.isin(base.index)
base_overlap = base[other.index[mask_overlap]]
other_overlap = other[mask_overlap]
if base.dtype == 'float64':
mask_equal = np.isclose(base_overlap, other_overlap,
rtol=0, atol=self._precision)
else:
mask_equal = base_overlap == other_overlap
mask_na_equal = base_overlap.isnull() & other_overlap.isnull()
mask_equal = mask_equal | mask_na_equal
diff_overlap = other[mask_overlap][~mask_equal]
diff_new = other[~mask_overlap]
diff_new = diff_new[~diff_new.isnull()]
return pd.concat([diff_overlap, diff_new])
# serialization
def _serialize(self, ts):
if ts is None:
return None
return zlib.compress(tojson(ts, self._precision).encode('utf-8'))
def _deserialize(self, ts, name):
return fromjson(zlib.decompress(ts).decode('utf-8'), name)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment