Commit 8bb0496b authored by Aurélien Campéas's avatar Aurélien Campéas
Browse files

tsio: name -> seriename to make it clear what we are talking about

parent 14345ad740a4
......@@ -36,18 +36,18 @@ class TimeSerie(SeriesServices):
self.metadatacache = {}
self.registry_map = {}
def insert(self, cn, newts, name, author,
def insert(self, cn, newts, seriename, author,
metadata=None,
_insertion_date=None):
"""Create a new revision of a given time series
newts: pandas.Series with date index
name: str unique identifier of the serie
seriename: str unique identifier of the serie
author: str free-form author name
metadata: optional dict for changeset metadata
"""
assert isinstance(newts, pd.Series)
assert isinstance(name, str)
assert isinstance(seriename, str)
assert isinstance(author, str)
assert metadata is None or isinstance(metadata, dict)
assert _insertion_date is None or isinstance(_insertion_date, datetime)
......@@ -62,17 +62,17 @@ class TimeSerie(SeriesServices):
'datetime' in str(newts.index.dtype) and not
isinstance(newts.index, pd.MultiIndex))
newts.name = name
table = self._get_ts_table(cn, name)
newts.name = seriename
table = self._get_ts_table(cn, seriename)
if table is None:
return self._create(cn, newts, name, author,
return self._create(cn, newts, seriename, author,
metadata, _insertion_date)
return self._update(cn, table, newts, name, author,
return self._update(cn, table, newts, seriename, author,
metadata, _insertion_date)
def get(self, cn, name, revision_date=None,
def get(self, cn, seriename, revision_date=None,
from_value_date=None, to_value_date=None,
_keep_nans=False):
"""Compute and return the serie of a given name
......@@ -81,44 +81,44 @@ class TimeSerie(SeriesServices):
serie
"""
table = self._get_ts_table(cn, name)
table = self._get_ts_table(cn, seriename)
if table is None:
return
csetfilter = []
if revision_date:
csetfilter.append(lambda cset: cset.c.insertion_date <= revision_date)
snap = Snapshot(cn, self, name)
snap = Snapshot(cn, self, seriename)
_, current = snap.find(csetfilter=csetfilter,
from_value_date=from_value_date,
to_value_date=to_value_date)
if current is not None and not _keep_nans:
current.name = name
current.name = seriename
current = current[~current.isnull()]
return current
def metadata(self, cn, tsname):
def metadata(self, cn, seriename):
"""Return metadata dict of timeserie."""
if tsname in self.metadatacache:
return self.metadatacache[tsname]
if seriename in self.metadatacache:
return self.metadatacache[seriename]
reg = self.schema.registry
sql = select([reg.c.metadata]).where(
reg.c.name == tsname
reg.c.name == seriename
)
meta = cn.execute(sql).scalar()
self.metadatacache[tsname] = meta
self.metadatacache[seriename] = meta
return meta
def update_metadata(self, cn, tsname, metadata, internal=False):
def update_metadata(self, cn, seriename, metadata, internal=False):
assert isinstance(metadata, dict)
meta = self.metadata(cn, tsname)
meta = self.metadata(cn, seriename)
if not internal:
assert set(meta.keys()).intersection(metadata.keys()) == set()
meta.update(metadata)
reg = self.schema.registry
sql = reg.update().where(
reg.c.name == tsname
reg.c.name == seriename
).values(metadata=metadata)
cn.execute(sql)
......@@ -130,14 +130,14 @@ class TimeSerie(SeriesServices):
)
return cn.execute(sql).scalar()
def get_history(self, cn, name,
def get_history(self, cn, seriename,
from_insertion_date=None,
to_insertion_date=None,
from_value_date=None,
to_value_date=None,
deltabefore=None,
deltaafter=None):
table = self._get_ts_table(cn, name)
table = self._get_ts_table(cn, seriename)
if table is None:
return
......@@ -163,7 +163,7 @@ class TimeSerie(SeriesServices):
if not revs:
return
snapshot = Snapshot(cn, self, name)
snapshot = Snapshot(cn, self, seriename)
series = []
for csid, idate in revs:
if (deltabefore, deltaafter) != (None, None):
......@@ -184,12 +184,12 @@ class TimeSerie(SeriesServices):
inject_in_index(serie, revdate)
serie = pd.concat([serie for revdate_, serie in series])
serie.name = name
serie.name = seriename
return serie
def get_delta(self, cn, name, delta):
def get_delta(self, cn, seriename, delta):
histo = self.get_history(
cn, name, deltabefore=-delta
cn, seriename, deltabefore=-delta
)
df = histo.reset_index()
......@@ -200,7 +200,7 @@ class TimeSerie(SeriesServices):
# which is the last inserted
selected_dates = df_date.groupby('value_date').max().reset_index()
ts = df[name]
ts = df[seriename]
# ts is built from the df returned from get_history
# ts index is now a simple index of tuples (insert_date, value_date)
ts.index = ((row.insertion_date, row.value_date)
......@@ -220,12 +220,12 @@ class TimeSerie(SeriesServices):
return ts_select
def exists(self, cn, name):
return self._get_ts_table(cn, name) is not None
def exists(self, cn, seriename):
return self._get_ts_table(cn, seriename) is not None
def latest_insertion_date(self, cn, name):
def latest_insertion_date(self, cn, seriename):
cset = self.schema.changeset
tstable = self._get_ts_table(cn, name)
tstable = self._get_ts_table(cn, seriename)
sql = select([func.max(cset.c.insertion_date)]
).where(tstable.c.cset == cset.c.id)
return cn.execute(sql).scalar()
......@@ -335,35 +335,35 @@ class TimeSerie(SeriesServices):
# creation / update
def _create(self, cn, newts, name, author,
def _create(self, cn, newts, seriename, author,
metadata=None, insertion_date=None):
# initial insertion
if len(newts) == 0:
return None
snapshot = Snapshot(cn, self, name)
snapshot = Snapshot(cn, self, seriename)
csid = self._newchangeset(cn, author, insertion_date, metadata)
head = snapshot.create(newts)
value = {
'cset': csid,
'snapshot': head
}
table = self._make_ts_table(cn, name, newts)
table = self._make_ts_table(cn, seriename, newts)
cn.execute(table.insert().values(value))
self._finalize_insertion(cn, csid, name)
self._finalize_insertion(cn, csid, seriename)
L.info('first insertion of %s (size=%s) by %s',
name, len(newts), author)
seriename, len(newts), author)
return newts
def _update(self, cn, table, newts, name, author,
def _update(self, cn, table, newts, seriename, author,
metadata=None, insertion_date=None):
self._validate(cn, newts, name)
snapshot = Snapshot(cn, self, name)
self._validate(cn, newts, seriename)
snapshot = Snapshot(cn, self, seriename)
diff = self.diff(snapshot.last(newts.index.min(),
newts.index.max()),
newts)
if not len(diff):
L.info('no difference in %s by %s (for ts of size %s)',
name, author, len(newts))
seriename, author, len(newts))
return
csid = self._newchangeset(cn, author, insertion_date, metadata)
......@@ -373,10 +373,10 @@ class TimeSerie(SeriesServices):
'snapshot': head
}
cn.execute(table.insert().values(value))
self._finalize_insertion(cn, csid, name)
self._finalize_insertion(cn, csid, seriename)
L.info('inserted diff (size=%s) for ts %s by %s',
len(diff), name, author)
len(diff), seriename, author)
return diff
# ts serialisation
......@@ -418,14 +418,14 @@ class TimeSerie(SeriesServices):
)
return table
def _make_ts_table(self, cn, name, ts):
tablename = self._ts_table_name(name)
table = self._table_definition_for(name)
def _make_ts_table(self, cn, seriename, ts):
tablename = self._ts_table_name(seriename)
table = self._table_definition_for(seriename)
table.create(cn)
index = ts.index
inames = [name for name in index.names if name]
sql = self.schema.registry.insert().values(
name=name,
name=seriename,
table_name=tablename,
metadata={
'tzaware': tzaware_serie(ts),
......@@ -437,13 +437,13 @@ class TimeSerie(SeriesServices):
cn.execute(sql)
return table
def _get_ts_table(self, cn, name):
def _get_ts_table(self, cn, seriename):
reg = self.schema.registry
tablename = self._ts_table_name(name)
tablename = self._ts_table_name(seriename)
sql = reg.select().where(reg.c.table_name == tablename)
tid = cn.execute(sql).scalar()
if tid:
return self._table_definition_for(name)
return self._table_definition_for(seriename)
# changeset handling
......@@ -473,33 +473,33 @@ class TimeSerie(SeriesServices):
# insertion handling
def _validate(self, cn, ts, name):
def _validate(self, cn, ts, seriename):
if ts.isnull().all():
# ts erasure
return
tstype = ts.dtype
meta = self.metadata(cn, name)
meta = self.metadata(cn, seriename)
if tstype != meta['value_type']:
m = 'Type error when inserting {}, new type is {}, type in base is {}'.format(
name, tstype, meta['value_type'])
seriename, tstype, meta['value_type'])
raise Exception(m)
if ts.index.dtype.name != meta['index_type']:
raise Exception('Incompatible index types')
def _name_to_regid(self, cn, name):
regid = self.registry_map.get(name)
def _name_to_regid(self, cn, seriename):
regid = self.registry_map.get(seriename)
if regid is not None:
return regid
registry = self.schema.registry
sql = select([registry.c.id]).where(registry.c.name == name)
regid = self.registry_map[name] = cn.execute(sql).scalar()
sql = select([registry.c.id]).where(registry.c.name == seriename)
regid = self.registry_map[seriename] = cn.execute(sql).scalar()
return regid
def _finalize_insertion(self, cn, csid, name):
def _finalize_insertion(self, cn, csid, seriename):
table = self.schema.changeset_series
sql = table.insert().values(
cset=csid,
serie=self._name_to_regid(cn, name)
serie=self._name_to_regid(cn, seriename)
)
cn.execute(sql)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment