Commit 0b3992f0 authored by Aurélien Campéas's avatar Aurélien Campéas
Browse files

schema, tsio: better handling of serie name -> table name

This is necessary for proper serie renaming.
Some latent bugs wrt name handling were fixed.

Noteworthy:
* registry tablename stores the unqualified name (no namespace)
* we have more local caches to mitigate the small queries price
parent 391fa0892bf5
......@@ -8,6 +8,7 @@ import numpy as np
import pandas as pd
from tshistory.snapshot import Snapshot
from tshistory.util import rename_series
from tshistory.testutil import (
assert_df,
assert_group_equals,
......@@ -176,14 +177,14 @@ def test_differential(engine, tsh):
with engine.connect() as cn:
cn.execute('set search_path to "{0}.timeserie", {0}, public'.format(tsh.namespace))
allts = pd.read_sql("select name, table_name from registry "
"where name in ('ts_test', 'ts_mixte')",
allts = pd.read_sql("select seriename, table_name from registry "
"where seriename in ('ts_test', 'ts_mixte')",
cn)
assert_df("""
name table_name
0 ts_test {0}.timeserie.ts_test
1 ts_mixte {0}.timeserie.ts_mixte
seriename table_name
0 ts_test ts_test
1 ts_mixte ts_mixte
""".format(tsh.namespace), allts)
assert_df("""
......@@ -1087,3 +1088,27 @@ insertion_date value_date
2015-01-04 05:00:00+00:00 5.0
2015-01-04 06:00:00+00:00 6.0
""", hist)
def test_rename(engine, tsh):
if tsh.namespace == 'zzz':
return # this test can only run once
serie = genserie(datetime(2020, 1, 1), 'D', 3)
tsh.insert(engine, serie, 'foo', 'Babar')
tsh.insert(engine, serie, 'bar', 'Babar')
tsh.insert(engine, serie, 'quux', 'Babar')
rename_series(engine, {
'foo': 'new-foo',
'bar': 'new-bar'
})
tsh.resetcaches()
assert tsh.get(engine, 'foo') is None
assert tsh.get(engine, 'bar') is None
for name in ('quux', 'new-foo', 'new-bar'):
assert tsh.get(engine, name) is not None
......@@ -44,7 +44,7 @@ class tsschema(object):
registry = Table(
'registry', meta,
Column('id', Integer, primary_key=True),
Column('name', String, index=True, nullable=False, unique=True),
Column('seriename', String, index=True, nullable=False, unique=True),
Column('table_name', String, index=True,
nullable=False, unique=True),
Column('metadata', JSONB(none_as_null=True)),
......
......@@ -25,7 +25,7 @@ class Snapshot(SeriesServices):
self.cn = cn
self.tsh = tsh
self.seriename = seriename
self.name = self._tablename(seriename)
self.name = self.tsh._serie_to_tablename(cn, seriename)
@property
def namespace(self):
......
......@@ -28,6 +28,7 @@ class TimeSerie(SeriesServices):
schema = None
metadatacache = None
registry_map = None
serie_tablename = None
def __init__(self, namespace='tsh'):
self.namespace = namespace
......@@ -35,6 +36,7 @@ class TimeSerie(SeriesServices):
self.schema.define()
self.metadatacache = {}
self.registry_map = {}
self.serie_tablename = {}
def insert(self, cn, newts, seriename, author,
metadata=None,
......@@ -104,7 +106,7 @@ class TimeSerie(SeriesServices):
return self.metadatacache[seriename]
reg = self.schema.registry
sql = select([reg.c.metadata]).where(
reg.c.name == seriename
reg.c.seriename == seriename
)
meta = cn.execute(sql).scalar()
self.metadatacache[seriename] = meta
......@@ -118,7 +120,7 @@ class TimeSerie(SeriesServices):
meta.update(metadata)
reg = self.schema.registry
sql = reg.update().where(
reg.c.name == seriename
reg.c.seriename == seriename
).values(metadata=metadata)
cn.execute(sql)
......@@ -233,7 +235,7 @@ class TimeSerie(SeriesServices):
def changeset_at(self, cn, seriename, revdate, mode='strict'):
assert mode in ('strict', 'before', 'after')
cset = self.schema.changeset
table = self._table_definition_for(seriename)
table = self._table_definition_for(cn, seriename)
sql = select([table.c.cset]).where(
table.c.cset == cset.c.id
)
......@@ -268,7 +270,7 @@ class TimeSerie(SeriesServices):
cn.execute(sql)
# wipe the diffs
table = self._table_definition_for(seriename)
table = self._table_definition_for(cn, seriename)
cn.execute(table.delete().where(table.c.cset >= csid))
def info(self, cn):
......@@ -302,7 +304,7 @@ class TimeSerie(SeriesServices):
if limit:
sql = sql.limit(limit)
if names:
sql = sql.where(reg.c.name.in_(names))
sql = sql.where(reg.c.seriename.in_(names))
if authors:
sql = sql.where(cset.c.author.in_(authors))
if fromrev:
......@@ -340,6 +342,7 @@ class TimeSerie(SeriesServices):
# initial insertion
if len(newts) == 0:
return None
self._register_serie(cn, seriename, newts)
snapshot = Snapshot(cn, self, seriename)
csid = self._newchangeset(cn, author, insertion_date, metadata)
head = snapshot.create(newts)
......@@ -393,17 +396,30 @@ class TimeSerie(SeriesServices):
# serie table handling
def _ts_table_name(self, seriename):
seriename = self._tablename(seriename)
return '{}.timeserie.{}'.format(self.namespace, seriename)
def _serie_to_tablename(self, cn, seriename):
tablename = self.serie_tablename.get(seriename)
if tablename is not None:
return tablename
def _table_definition_for(self, seriename):
tablename = self._ts_table_name(seriename)
seriename = self._tablename(seriename)
table = TABLES.get(tablename)
reg = self.schema.registry
sql = select([reg.c.table_name]).where(reg.c.seriename == seriename)
tablename = cn.execute(sql).scalar()
if tablename is None:
# creation time
return
self.serie_tablename[seriename] = tablename
return tablename
def _table_definition_for(self, cn, seriename):
tablename = self._serie_to_tablename(cn, seriename)
if tablename is None:
# creation time
tablename = self._make_tablename(seriename)
fq_tablename = '{}.timeserie.{}'.format(self.namespace, tablename)
table = TABLES.get(fq_tablename)
if table is None:
TABLES[tablename] = table = Table(
seriename, self.schema.meta,
TABLES[fq_tablename] = table = Table(
tablename, self.schema.meta,
Column('id', Integer, primary_key=True),
Column('cset', Integer,
ForeignKey('{}.changeset.id'.format(self.namespace)),
......@@ -411,7 +427,7 @@ class TimeSerie(SeriesServices):
Column('snapshot', Integer,
ForeignKey('{}.snapshot.{}.id'.format(
self.namespace,
seriename)),
tablename)),
index=True),
schema='{}.timeserie'.format(self.namespace),
extend_existing=True
......@@ -419,14 +435,16 @@ class TimeSerie(SeriesServices):
return table
def _make_ts_table(self, cn, seriename, ts):
tablename = self._ts_table_name(seriename)
table = self._table_definition_for(seriename)
table = self._table_definition_for(cn, seriename)
table.create(cn)
return table
def _register_serie(self, cn, seriename, ts):
index = ts.index
inames = [name for name in index.names if name]
sql = self.schema.registry.insert().values(
name=seriename,
table_name=tablename,
seriename=seriename,
table_name=self._make_tablename(seriename),
metadata={
'tzaware': tzaware_serie(ts),
'index_type': index.dtype.name,
......@@ -434,16 +452,13 @@ class TimeSerie(SeriesServices):
'value_type': ts.dtypes.name
},
)
cn.execute(sql)
return table
regid = cn.execute(sql).inserted_primary_key[0]
self.registry_map[seriename] = regid
def _get_ts_table(self, cn, seriename):
reg = self.schema.registry
tablename = self._ts_table_name(seriename)
sql = reg.select().where(reg.c.table_name == tablename)
tid = cn.execute(sql).scalar()
if tid:
return self._table_definition_for(seriename)
tablename = self._serie_to_tablename(cn, seriename)
if tablename:
return self._table_definition_for(cn, tablename)
# changeset handling
......@@ -462,12 +477,12 @@ class TimeSerie(SeriesServices):
cset_serie = self.schema.changeset_series
reg = self.schema.registry
sql = select(
[reg.c.name]
[reg.c.seriename]
).where(cset_serie.c.cset == csid
).where(cset_serie.c.serie == reg.c.id)
return [
row.name
row.seriename
for row in cn.execute(sql).fetchall()
]
......@@ -492,7 +507,7 @@ class TimeSerie(SeriesServices):
return regid
registry = self.schema.registry
sql = select([registry.c.id]).where(registry.c.name == seriename)
sql = select([registry.c.id]).where(registry.c.seriename == seriename)
regid = self.registry_map[seriename] = cn.execute(sql).scalar()
return regid
......@@ -503,3 +518,10 @@ class TimeSerie(SeriesServices):
serie=self._name_to_regid(cn, seriename)
)
cn.execute(sql)
# don't use this
def resetcaches(self):
self.metadatacache.clear()
self.registry_map.clear()
self.serie_tablename.clear()
......@@ -106,11 +106,14 @@ class SeriesServices(object):
# serialization
def _tablename(self, name):
def _make_tablename(self, seriename):
""" compute the unqualified (no namespace) table name
from a serie name, to allow arbitrary serie names
"""
# postgresql table names are limited to 63 chars.
if len(name) > 63:
return hashlib.sha1(name.encode('utf-8')).hexdigest()
return name
if len(seriename) > 63:
return hashlib.sha1(seriename.encode('utf-8')).hexdigest()
return seriename
def _serialize(self, ts):
if ts is None:
......@@ -119,3 +122,20 @@ class SeriesServices(object):
def _deserialize(self, ts, name):
return fromjson(zlib.decompress(ts).decode('utf-8'), name)
def rename_series(engine, serie_map, namespace='tsh'):
from tshistory.schema import tsschema
schema = tsschema(namespace)
reg = schema.registry
with engine.connect() as cn:
for old, new in serie_map.items():
print('{} -> {}'.format(old, new))
sql = reg.update().where(
reg.c.seriename == old
).values(
seriename=new
)
cn.execute(sql)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment