Commit 4dbd5e8d authored by Aurélien Campéas's avatar Aurélien Campéas
Browse files

tsio: always delegate snapshot construction to _build_snapshot_upto

Also, the initial record of a ts really si a snapshot, not
a diff. Fix that.
parent ba29c87d96bb
......@@ -61,7 +61,7 @@ def test_changeset(engine):
2017-01-03 c
""".strip() == tso.get(engine, 'ts_othervalues').to_string().strip()
tso.delete_last_changeset_for(engine, 'ts_values')
assert tso.delete_last_changeset_for(engine, 'ts_values')
assert """
2017-01-01 1.0
......@@ -75,12 +75,12 @@ def test_changeset(engine):
2017-01-03 c
""".strip() == tso.get(engine, 'ts_othervalues').to_string().strip()
tso.delete_last_changeset_for(engine, 'ts_values')
assert tso.delete_last_changeset_for(engine, 'ts_values')
assert tso.get(engine, 'ts_values') is None
assert tso.get(engine, 'ts_othervalues') is None
assert not tso.delete_last_changeset_for(engine, 'ts_values')
def test_differential(engine):
# instantiate one time serie handler object
......@@ -250,7 +250,7 @@ def test_differential(engine):
revision_date=datetime.now()).to_string().strip()
# test striping the last diff
tso.delete_last_changeset_for(engine, 'ts_mixte')
assert tso.delete_last_changeset_for(engine, 'ts_mixte')
assert """
2010-01-01 2.0
......
......@@ -82,12 +82,10 @@ class TimeSerie(object):
if table is None:
# initial insertion
table = self._make_ts_table(cnx, name)
jsonts = tojson(newts)
csid = self._csid or self._newchangeset(cnx, author)
value = {
'csid': csid,
'data': jsonts,
'snapshot': jsonts,
'snapshot': tojson(newts),
}
# callback for extenders
self._complete_insertion_value(value, extra_scalars)
......@@ -116,11 +114,12 @@ class TimeSerie(object):
cnx.execute(table.insert().values(value))
self._finalize_insertion(cnx, csid, name)
cnx.execute(
table.update(
).where(table.c.id == tip_id
).values(snapshot=None)
)
if tip_id > 1:
cnx.execute(
table.update(
).where(table.c.id == tip_id
).values(snapshot=None)
)
L.info('Insertion differential of %s by %s', name, author)
def get(self, cnx, name, revision_date=None):
......@@ -134,12 +133,10 @@ class TimeSerie(object):
if table is None:
return
if revision_date is None:
current = self._read_latest_snapshot(cnx, table)
else:
current = self._build_snapshot_upto(
cnx, table, lambda cset, _: cset.c.insertion_date <= revision_date
)
qfilter = []
if revision_date:
qfilter.append(lambda cset, _: cset.c.insertion_date <= revision_date)
current = self._build_snapshot_upto(cnx, table, *qfilter)
if current is not None:
current.name = name
......@@ -173,16 +170,6 @@ class TimeSerie(object):
table.c.csid == csid
)
cnx.execute(sql)
# apply on flat
newsnapshot = self._build_snapshot_upto(cnx, table)
parent_id = self._get_tip_id(cnx, table)
update_snapshot_sql = table.update(
).where(table.c.id == parent_id
).values(snapshot=tojson(newsnapshot))
cnx.execute(update_snapshot_sql)
return True
# /API
......@@ -199,7 +186,8 @@ class TimeSerie(object):
Column('id', Integer, primary_key=True),
Column('csid', Integer, ForeignKey('ts_changeset.id'),
nullable=False),
Column('data', JSONB, nullable=False),
# constraint: there is either .data or .snapshot
Column('data', JSONB),
Column('snapshot', JSONB),
Column('parent',
Integer,
......@@ -270,7 +258,7 @@ class TimeSerie(object):
def _compute_diff_and_newsnapshot(self, cnx, table, newts, **extra_scalars):
# NOTE: this depends on the snapshot being always maintained
# at the top-level
snapshot = self._read_latest_snapshot(cnx, table)
snapshot = self._build_snapshot_upto(cnx, table)
# this is the diff between our computed parent
diff = self._compute_diff(snapshot, newts)
......@@ -281,24 +269,36 @@ class TimeSerie(object):
newsnapshot = self._apply_diff(snapshot, diff)
return diff, newsnapshot
def _read_latest_snapshot(self, cnx, table):
sql = select([table.c.snapshot]
).order_by(desc(table.c.id)
).limit(1)
snapjson = cnx.execute(sql).scalar()
if snapjson is None:
return
return fromjson(snapjson)
def _find_snapshot(self, cnx, table, *qfilter):
cset = schema.ts_changeset
sql = select([func.max(table.c.id), table.c.snapshot]
).group_by(table.c.id, table.c.snapshot
).where(table.c.csid == cset.c.id
).where(table.c.snapshot != None)
if qfilter:
for filtercb in qfilter:
sql = sql.where(filtercb(cset, table))
try:
snapid, snapdata = cnx.execute(sql).fetchone()
except TypeError:
return None, None
return snapid, fromjson(snapdata)
def _build_snapshot_upto(self, cnx, table, *qfilter):
snapid, snapshot = self._find_snapshot(cnx, table, *qfilter)
if snapid is None:
return None
cset = schema.ts_changeset
sql = select([table.c.id,
table.c.data,
table.c.parent,
cset.c.insertion_date]
).order_by(table.c.id
).where(table.c.csid == cset.c.id)
).where(table.c.csid == cset.c.id
).where(table.c.id > snapid)
for filtercb in qfilter:
sql = sql.where(filtercb(cset, table))
......@@ -306,11 +306,11 @@ class TimeSerie(object):
alldiffs = pd.read_sql(sql, cnx)
if len(alldiffs) == 0:
return None
return snapshot
# initial ts
ts = fromjson(alldiffs.iloc[0]['data'])
for _, row in alldiffs[1:].iterrows():
ts = snapshot
for _, row in alldiffs.iterrows():
diff = fromjson(row['data'])
ts = self._apply_diff(ts, diff)
assert ts.index.dtype.name == 'datetime64[ns]' or len(ts) == 0
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment