Commit 29b55333 authored by Aurélien Campéas's avatar Aurélien Campéas
Browse files

style: use `cn` to denote db connections rather than `cnx`

Shorter and less meaningless.
parent b04e1a538e9c
......@@ -45,10 +45,10 @@ def test_changeset(engine):
with patch('tshistory.tsio.datetime') as mock_date:
mock_date.now.return_value = datetime(2020, 1, 1)
with engine.connect() as cnx:
with tso.newchangeset(cnx, 'babar'):
tso.insert(cnx, pd.Series(data, index=index), 'ts_values')
tso.insert(cnx, pd.Series(['a', 'b', 'c'], index=index), 'ts_othervalues')
with engine.connect() as cn:
with tso.newchangeset(cn, 'babar'):
tso.insert(cn, pd.Series(data, index=index), 'ts_values')
tso.insert(cn, pd.Series(['a', 'b', 'c'], index=index), 'ts_othervalues')
g = tso.get_group(engine, 'ts_values')
g2 = tso.get_group(engine, 'ts_othervalues')
......@@ -57,12 +57,12 @@ def test_changeset(engine):
with pytest.raises(AssertionError):
tso.insert(engine, pd.Series([2,3,4], index=index), 'ts_values')
with engine.connect() as cnx:
with engine.connect() as cn:
data.append(data.pop(0))
with tso.newchangeset(cnx, 'celeste'):
tso.insert(cnx, pd.Series(data, index=index), 'ts_values')
with tso.newchangeset(cn, 'celeste'):
tso.insert(cn, pd.Series(data, index=index), 'ts_values')
# below should be a noop
tso.insert(cnx, pd.Series(['a', 'b', 'c'], index=index), 'ts_othervalues')
tso.insert(cn, pd.Series(['a', 'b', 'c'], index=index), 'ts_othervalues')
g = tso.get_group(engine, 'ts_values')
assert ['ts_values'] == list(g.keys())
......@@ -414,10 +414,10 @@ def test_snapshots(engine):
tso = TimeSerie()
tso._snapshot_interval = 4
with engine.connect() as cnx:
with engine.connect() as cn:
for tscount in range(1, 11):
ts = genserie(datetime(2015, 1, 1), 'D', tscount, [1])
diff = tso.insert(cnx, ts, 'growing', 'babar')
diff = tso.insert(cn, ts, 'growing', 'babar')
assert diff.index[0] == diff.index[-1] == ts.index[-1]
diff = tso.insert(engine, ts, 'growing', 'babar')
......
......@@ -61,7 +61,7 @@ class TimeSerie(object):
# API : changeset, insert, get, delete
@contextmanager
def newchangeset(self, cnx, author, _insertion_date=None):
def newchangeset(self, cn, author, _insertion_date=None):
"""A context manager to allow insertion of several series within the
same changeset identifier
......@@ -72,11 +72,11 @@ class TimeSerie(object):
not part of the API.
"""
assert self._csid is None
self._csid = self._newchangeset(cnx, author, _insertion_date)
self._csid = self._newchangeset(cn, author, _insertion_date)
yield
del self._csid
def insert(self, cnx, newts, name, author=None,
def insert(self, cn, newts, name, author=None,
extra_scalars={}):
"""Create a new revision of a given time series
......@@ -101,7 +101,7 @@ class TimeSerie(object):
return
newts.name = name
table = self._get_ts_table(cnx, name)
table = self._get_ts_table(cn, name)
if isinstance(newts.index, pd.MultiIndex):
# we impose an order to survive rountrips
......@@ -112,28 +112,28 @@ class TimeSerie(object):
if newts.isnull().all():
return None
newts = newts[~newts.isnull()]
table = self._make_ts_table(cnx, name)
csid = self._csid or self._newchangeset(cnx, author)
table = self._make_ts_table(cn, name)
csid = self._csid or self._newchangeset(cn, author)
value = {
'csid': csid,
'snapshot': tojson(newts),
}
# callback for extenders
self._complete_insertion_value(value, extra_scalars)
cnx.execute(table.insert().values(value))
self._finalize_insertion(cnx, csid, name)
cn.execute(table.insert().values(value))
self._finalize_insertion(cn, csid, name)
L.info('First insertion of %s by %s', name, author)
return newts
diff, newsnapshot = self._compute_diff_and_newsnapshot(
cnx, table, newts, **extra_scalars
cn, table, newts, **extra_scalars
)
if diff is None:
L.info('No difference in %s by %s', name, author)
return
tip_id = self._get_tip_id(cnx, table)
csid = self._csid or self._newchangeset(cnx, author)
tip_id = self._get_tip_id(cn, table)
csid = self._csid or self._newchangeset(cn, author)
value = {
'csid': csid,
'diff': tojson(diff),
......@@ -142,63 +142,63 @@ class TimeSerie(object):
}
# callback for extenders
self._complete_insertion_value(value, extra_scalars)
cnx.execute(table.insert().values(value))
self._finalize_insertion(cnx, csid, name)
cn.execute(table.insert().values(value))
self._finalize_insertion(cn, csid, name)
if tip_id > 1 and tip_id % self._snapshot_interval:
self._purge_snapshot_at(cnx, table, tip_id)
self._purge_snapshot_at(cn, table, tip_id)
L.info('Inserted diff for ts %s by %s', name, author)
return diff
def get(self, cnx, name, revision_date=None):
def get(self, cn, name, revision_date=None):
"""Compute and return the serie of a given name
revision_date: datetime filter to get previous versions of the
serie
"""
table = self._get_ts_table(cnx, name)
table = self._get_ts_table(cn, name)
if table is None:
return
qfilter = []
if revision_date:
qfilter.append(lambda cset, _: cset.c.insertion_date <= revision_date)
current = self._build_snapshot_upto(cnx, table, qfilter)
current = self._build_snapshot_upto(cn, table, qfilter)
if current is not None:
current.name = name
return current
def get_group(self, cnx, name, revision_date=None):
csid = self._latest_csid_for(cnx, name)
def get_group(self, cn, name, revision_date=None):
csid = self._latest_csid_for(cn, name)
group = {}
for seriename in self._changeset_series(cnx, csid):
serie = self.get(cnx, seriename, revision_date)
for seriename in self._changeset_series(cn, csid):
serie = self.get(cn, seriename, revision_date)
if serie is not None:
group[seriename] = serie
return group
def latest_insertion_date(self, cnx, name):
def latest_insertion_date(self, cn, name):
cset = schema.changeset
tstable = self._get_ts_table(cnx, name)
tstable = self._get_ts_table(cn, name)
sql = select([func.max(cset.c.insertion_date)]
).where(tstable.c.csid == cset.c.id)
return cnx.execute(sql).scalar()
return cn.execute(sql).scalar()
def info(self, cnx):
def info(self, cn):
"""Gather global statistics on the current tshistory repository
"""
sql = 'select count(*) from registry'
stats = {'series count': cnx.execute(sql).scalar()}
stats = {'series count': cn.execute(sql).scalar()}
sql = 'select max(id) from changeset'
stats['changeset count'] = cnx.execute(sql).scalar()
stats['changeset count'] = cn.execute(sql).scalar()
sql = 'select distinct name from registry order by name'
stats['serie names'] = [row for row, in cnx.execute(sql).fetchall()]
stats['serie names'] = [row for row, in cn.execute(sql).fetchall()]
return stats
def log(self, cnx, limit=0, diff=False, names=None, fromrev=None, torev=None):
def log(self, cn, limit=0, diff=False, names=None, fromrev=None, torev=None):
"""Build a structure showing the history of all the series in the db,
per changeset, in chronological order.
"""
......@@ -223,14 +223,14 @@ class TimeSerie(object):
sql = sql.where(cset.c.id == cset_series.c.csid
).where(cset_series.c.serie == reg.c.name)
rset = cnx.execute(sql)
rset = cn.execute(sql)
for csetid, author, revdate in rset.fetchall():
log.append({'rev': csetid, 'author': author, 'date': revdate,
'names': self._changeset_series(cnx, csetid)})
'names': self._changeset_series(cn, csetid)})
if diff:
for rev in log:
rev['diff'] = {name: self._diff(cnx, rev['rev'], name)
rev['diff'] = {name: self._diff(cn, rev['rev'], name)
for name in rev['names']}
log.sort(key=lambda rev: rev['rev'])
......@@ -265,73 +265,73 @@ class TimeSerie(object):
extend_existing=True
)
def _make_ts_table(self, cnx, name):
def _make_ts_table(self, cn, name):
tablename = self._ts_table_name(name)
table = self._table_definition_for(name)
table.create(cnx)
table.create(cn)
sql = schema.registry.insert().values(
name=name,
table_name=tablename)
cnx.execute(sql)
cn.execute(sql)
return table
def _get_ts_table(self, cnx, name):
def _get_ts_table(self, cn, name):
reg = schema.registry
tablename = self._ts_table_name(name)
sql = reg.select().where(reg.c.table_name == tablename)
tid = cnx.execute(sql).scalar()
tid = cn.execute(sql).scalar()
if tid:
return self._table_definition_for(name)
# changeset handling
def _newchangeset(self, cnx, author, _insertion_date=None):
def _newchangeset(self, cn, author, _insertion_date=None):
table = schema.changeset
sql = table.insert().values(
author=author,
insertion_date=_insertion_date or datetime.now())
return cnx.execute(sql).inserted_primary_key[0]
return cn.execute(sql).inserted_primary_key[0]
def _latest_csid_for(self, cnx, name):
table = self._get_ts_table(cnx, name)
def _latest_csid_for(self, cn, name):
table = self._get_ts_table(cn, name)
sql = select([func.max(table.c.csid)])
return cnx.execute(sql).scalar()
return cn.execute(sql).scalar()
def _changeset_series(self, cnx, csid):
def _changeset_series(self, cn, csid):
cset_serie = schema.changeset_series
sql = select([cset_serie.c.serie]
).where(cset_serie.c.csid == csid)
return [seriename for seriename, in cnx.execute(sql).fetchall()]
return [seriename for seriename, in cn.execute(sql).fetchall()]
# insertion handling
def _get_tip_id(self, cnx, table):
def _get_tip_id(self, cn, table):
sql = select([func.max(table.c.id)])
return cnx.execute(sql).scalar()
return cn.execute(sql).scalar()
def _complete_insertion_value(self, value, extra_scalars):
pass
def _finalize_insertion(self, cnx, csid, name):
def _finalize_insertion(self, cn, csid, name):
table = schema.changeset_series
sql = table.insert().values(
csid=csid,
serie=name
)
cnx.execute(sql)
cn.execute(sql)
# snapshot handling
def _purge_snapshot_at(self, cnx, table, diffid):
cnx.execute(
def _purge_snapshot_at(self, cn, table, diffid):
cn.execute(
table.update(
).where(table.c.id == diffid
).values(snapshot=None)
)
def _compute_diff_and_newsnapshot(self, cnx, table, newts, **extra_scalars):
snapshot = self._build_snapshot_upto(cnx, table)
def _compute_diff_and_newsnapshot(self, cn, table, newts, **extra_scalars):
snapshot = self._build_snapshot_upto(cn, table)
diff = self._compute_diff(snapshot, newts)
if len(diff) == 0:
......@@ -341,7 +341,7 @@ class TimeSerie(object):
newsnapshot = self._apply_diff(snapshot, diff)
return diff, newsnapshot
def _find_snapshot(self, cnx, table, qfilter=(), column='snapshot'):
def _find_snapshot(self, cn, table, qfilter=(), column='snapshot'):
cset = schema.changeset
sql = select([table.c.id, table.c[column]]
).order_by(desc(table.c.id)
......@@ -354,13 +354,13 @@ class TimeSerie(object):
sql = sql.where(filtercb(cset, table))
try:
snapid, snapdata = cnx.execute(sql).fetchone()
snapid, snapdata = cn.execute(sql).fetchone()
except TypeError:
return None, None
return snapid, fromjson(snapdata, table.name)
def _build_snapshot_upto(self, cnx, table, qfilter=()):
snapid, snapshot = self._find_snapshot(cnx, table, qfilter)
def _build_snapshot_upto(self, cn, table, qfilter=()):
snapid, snapshot = self._find_snapshot(cn, table, qfilter)
if snapid is None:
return None
......@@ -377,7 +377,7 @@ class TimeSerie(object):
for filtercb in qfilter:
sql = sql.where(filtercb(cset, table))
alldiffs = pd.read_sql(sql, cnx)
alldiffs = pd.read_sql(sql, cn)
if len(alldiffs) == 0:
return snapshot
......@@ -392,8 +392,8 @@ class TimeSerie(object):
# diff handling
def _diff(self, cnx, csetid, name):
table = self._get_ts_table(cnx, name)
def _diff(self, cn, csetid, name):
table = self._get_ts_table(cn, name)
cset = schema.changeset
def filtercset(sql):
......@@ -401,7 +401,7 @@ class TimeSerie(object):
).where(cset.c.id == csetid)
sql = filtercset(select([table.c.id]))
tsid = cnx.execute(sql).scalar()
tsid = cn.execute(sql).scalar()
if tsid == 1:
sql = select([table.c.snapshot])
......@@ -409,7 +409,7 @@ class TimeSerie(object):
sql = select([table.c.diff])
sql = filtercset(sql)
return fromjson(cnx.execute(sql).scalar(), name)
return fromjson(cn.execute(sql).scalar(), name)
def _compute_diff(self, fromts, tots):
"""Compute the difference between fromts and tots
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment