Commit 699045ed authored by Aurélien Campéas
Browse files

schema: allow using several namespaces (or `tshistory instances`) within the same db

resolves #24
parent 55128374e986
from time import time
from pathlib import Path
import logging
from sqlalchemy import create_engine
from sqlalchemy import create_engine, MetaData
import pytest
......@@ -13,25 +14,26 @@ from tshistory import schema, tsio
DATADIR = Path(__file__).parent / 'test' / 'data'
DBURI = 'postgresql://localhost:5433/postgres'
schema.L.addHandler(logging.StreamHandler())
tsio.L.addHandler(logging.StreamHandler())
@pytest.fixture(scope='session')
def engine(request):
port = 5433
db.setup_local_pg_cluster(request, DATADIR, port)
uri = 'postgresql://localhost:{}/postgres'.format(port)
engine = create_engine(uri)
schema.reset(engine)
schema.init(engine)
e = create_engine(uri)
yield e
@pytest.fixture(params=['tsh'],
@pytest.fixture(params=['tsh', 'zzz'],
scope='session')
def tsh(request, engine):
tsh = tsio.TimeSerie(request.param)
namespace = request.param
schema.reset(engine, namespace)
schema.init(engine, MetaData(), namespace)
tsh = tsio.TimeSerie(namespace)
yield tsh
# build a ts using the logs from another
......@@ -53,15 +55,7 @@ def tsh(request, engine):
for name in allnames:
assert (tsh.get(engine, name) == tsh.get(engine, 'new_' + name)).all()
schema.reset(engine)
meta = schema.MetaData()
# fix the schema module
r, c, cs = schema.make_schema(meta)
schema.registry = r
schema.changeset = c
schema.changeset_series = cs
schema.meta = meta
schema.init(engine)
schema.reset(engine, namespace)
OUT = []
......
......@@ -239,7 +239,7 @@ def test_differential(engine, tsh):
""", tsh.get(engine, 'ts_mixte'))
with engine.connect() as cn:
cn.execute('set search_path to "tsh.timeserie", tsh, public')
cn.execute('set search_path to "{0}.timeserie", {0}, public'.format(tsh.namespace))
hist = pd.read_sql('select id, parent from ts_test order by id',
cn)
assert_df("""
......@@ -264,9 +264,9 @@ def test_differential(engine, tsh):
assert_df("""
name table_name
0 ts_test tsh.timeserie.ts_test
1 ts_mixte tsh.timeserie.ts_mixte
""", allts)
0 ts_test {0}.timeserie.ts_test
1 ts_mixte {0}.timeserie.ts_mixte
""".format(tsh.namespace), allts)
assert_df("""
2010-01-01 2.0
......@@ -395,7 +395,7 @@ def test_snapshots(engine, tsh):
assert diff is None
with engine.connect() as cn:
cn.execute('set search_path to "tsh.timeserie", tsh, public')
cn.execute('set search_path to "{}.timeserie"'.format(tsh.namespace))
df = pd.read_sql("select id from growing where snapshot is not null",
cn)
assert_df("""
......@@ -894,7 +894,7 @@ def test_bigdata(engine, tracker, tsh):
tshclass = tsh.__class__.__name__
with engine.connect() as cn:
cn.execute('set search_path to "tsh.timeserie"')
cn.execute('set search_path to "{}.timeserie"'.format(tsh.namespace))
df = pd.read_sql('select id, diff, snapshot from big order by id', cn)
for attr in ('diff', 'snapshot'):
......@@ -924,7 +924,7 @@ def test_lots_of_diffs(engine, tracker, tsh):
tshclass = tsh.__class__.__name__
with engine.connect() as cn:
cn.execute('set search_path to "tsh.timeserie"')
cn.execute('set search_path to "{}.timeserie"'.format(tsh.namespace))
df = pd.read_sql("select id, diff, snapshot from manydiffs order by id ",
cn)
for attr in ('diff', 'snapshot'):
......
......@@ -180,7 +180,7 @@ def restore(out_path, db_uri):
def init_db(db_uri):
"""initialize a new db."""
init_schema(create_engine(db_uri))
init_schema(create_engine(db_uri), meta)
if __name__ == '__main__':
......
import logging
from sqlalchemy import (Table, Column, Integer, String, MetaData, DateTime,
ForeignKey, PrimaryKeyConstraint)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.schema import CreateSchema
L = logging.getLogger('tshistory.schema')
meta = MetaData()
def make_schema(meta, namespace='tsh'):
# schemas registry
SCHEMAS = {}
class tsschema(object):
namespace = 'tsh'
meta = None
registry = None
changeset = None
changeset_series = None
def __init__(self, namespace='tsh'):
self.namespace = namespace
def build(self, meta):
L.info('build schema %s', self.namespace)
self.meta = meta
registry = Table(
'registry', meta,
Column('id', Integer, primary_key=True),
Column('name', String, index=True, nullable=False, unique=True),
Column('table_name', String, index=True, nullable=False, unique=True),
Column('metadata', JSONB(none_as_null=True)),
schema=self.namespace
)
changeset = Table(
'changeset', meta,
Column('id', Integer, primary_key=True),
Column('author', String, index=True, nullable=False),
Column('insertion_date', DateTime, index=True, nullable=False),
Column('metadata', JSONB(none_as_null=True)),
schema=self.namespace
)
registry = Table(
'registry', meta,
Column('id', Integer, primary_key=True),
Column('name', String, index=True, nullable=False, unique=True),
Column('table_name', String, index=True, nullable=False, unique=True),
Column('metadata', JSONB(none_as_null=True)),
schema=namespace
)
changeset_series = Table(
'changeset_series', meta,
Column('csid', Integer,
ForeignKey('{}.changeset.id'.format(self.namespace)),
index=True, nullable=False),
Column('serie', String,
ForeignKey('{}.registry.name'.format(self.namespace)),
index=True, nullable=False),
PrimaryKeyConstraint(
'csid', 'serie',
name='{}_changeset_series_pk'.format(self.namespace)),
schema=self.namespace
)
self.registry = registry
self.changeset = changeset
self.changeset_series = changeset_series
SCHEMAS[self.namespace] = self
changeset = Table(
'changeset', meta,
Column('id', Integer, primary_key=True),
Column('author', String, index=True, nullable=False),
Column('insertion_date', DateTime, index=True, nullable=False),
Column('metadata', JSONB(none_as_null=True)),
schema=namespace
)
def exists(self, engine):
    """Tell whether a database schema named after this namespace exists.

    Queries `information_schema.schemata` through `engine`, filtering on
    `self.namespace`, and returns the scalar result of the `exists` test.
    """
    query = ('select exists(select schema_name '
             'from information_schema.schemata '
             'where schema_name = %(name)s)')
    outcome = engine.execute(query, name=self.namespace)
    return outcome.scalar()
changeset_series = Table(
'changeset_series', meta,
Column('csid', Integer, ForeignKey('{}.changeset.id'.format(namespace)),
index=True, nullable=False),
Column('serie', String, ForeignKey('{}.registry.name'.format(namespace)),
index=True, nullable=False),
PrimaryKeyConstraint('csid', 'serie',
name='{}_changeset_series_pk'.format(namespace)),
schema=namespace
)
def create(self, engine):
    """Create the namespace schemas and the three base tables.

    Two schemas are created through `engine`: one named after the
    namespace and one named `<namespace>.timeserie`. Then the
    `registry`, `changeset` and `changeset_series` tables held by this
    object are created, in that order.

    The log line also reports the result of `self.exists(engine)`, which
    is evaluated before anything is created.
    """
    L.info('create schema %s %s', self.namespace, self.exists(engine))
    schema_names = (
        self.namespace,
        '{}.timeserie'.format(self.namespace),
    )
    for schema_name in schema_names:
        engine.execute(CreateSchema(schema_name))
    # changeset_series declares foreign keys to the two other tables,
    # hence it comes last
    for table in (self.registry, self.changeset, self.changeset_series):
        table.create(engine)
return registry, changeset, changeset_series
def destroy(self, engine):
    """Drop the namespace schemas and forget the table definitions.

    The `<namespace>.timeserie` schema is dropped first, then the
    namespace schema itself, both with `if exists ... cascade`.
    Afterwards the `meta`, `registry`, `changeset` and
    `changeset_series` attributes are deleted from this instance.
    """
    L.info('destroy schema %s', self.namespace)
    drop_statements = (
        'drop schema if exists "{}.timeserie" cascade',
        'drop schema if exists {} cascade',
    )
    for statement in drop_statements:
        engine.execute(statement.format(self.namespace))
    # NOTE(review): these raise AttributeError if `build` never set them
    # on the instance -- confirm callers always build before destroying
    for attrname in ('meta', 'registry', 'changeset', 'changeset_series'):
        delattr(self, attrname)
# default schema
registry, changeset, changeset_series = make_schema(meta)
# create and register default db structure
tsschema().build(meta)
def init(engine, namespace='tsh'):
from sqlalchemy.schema import CreateSchema
engine.execute(CreateSchema(namespace))
engine.execute(CreateSchema('{}.timeserie'.format(namespace)))
registry.create(engine)
changeset.create(engine)
changeset_series.create(engine)
def init(engine, meta, namespace='tsh'):
    """Build and create the schema for `namespace` unless it already exists.

    engine: the engine used to check for and create the schema
    meta: the metadata object handed to `tsschema.build`
    namespace: the name of the schema to set up (defaults to 'tsh')

    When the namespace is already present in the database, a warning is
    logged and nothing is built or created.
    """
    target = tsschema(namespace)
    if not target.exists(engine):
        target.build(meta)
        target.create(engine)
        return
    L.warning('cannot create already existing namespace %s', namespace)
def reset(engine, namespace='tsh'):
engine.execute('drop schema if exists "{}.timeserie" cascade'.format(namespace))
engine.execute('drop schema if exists {} cascade'.format(namespace))
if namespace not in SCHEMAS:
L.warning('unknown ns %s cannot be reset', namespace)
return
schem = SCHEMAS.pop(namespace)
schem.destroy(engine)
......@@ -11,7 +11,7 @@ from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.sql.expression import select, func, desc
from sqlalchemy.dialects.postgresql import BYTEA
from tshistory import schema
from tshistory.schema import SCHEMAS
L = logging.getLogger('tshistory.tsio')
......@@ -62,9 +62,11 @@ class TimeSerie(object):
_snapshot_interval = 10
_precision = 1e-14
namespace = 'tsh'
schema = None
def __init__(self, namespace='tsh'):
    """Bind this object to the schema registered for `namespace`.

    The schema object is looked up in `SCHEMAS`; a namespace that has
    not been registered there raises `KeyError`.
    """
    self.namespace = namespace
    self.schema = SCHEMAS[namespace]
# API : changeset, insert, get, delete
@contextmanager
......@@ -215,7 +217,7 @@ class TimeSerie(object):
return self._get_ts_table(cn, name) is not None
def latest_insertion_date(self, cn, name):
cset = schema.changeset
cset = self.schema.changeset
tstable = self._get_ts_table(cn, name)
sql = select([func.max(cset.c.insertion_date)]
).where(tstable.c.csid == cset.c.id)
......@@ -239,7 +241,11 @@ class TimeSerie(object):
per changeset, in chronological order.
"""
log = []
cset, cset_series, reg = schema.changeset, schema.changeset_series, schema.registry
cset, cset_series, reg = (
self.schema.changeset,
self.schema.changeset_series,
self.schema.registry
)
sql = select([cset.c.id, cset.c.author, cset.c.insertion_date]
).distinct().order_by(desc(cset.c.id))
......@@ -300,7 +306,7 @@ class TimeSerie(object):
def _table_definition_for(self, seriename):
return Table(
seriename, schema.meta,
seriename, self.schema.meta,
Column('id', Integer, primary_key=True),
Column('csid', Integer,
ForeignKey('{}.changeset.id'.format(self.namespace)),
......@@ -324,14 +330,14 @@ class TimeSerie(object):
tablename = self._ts_table_name(name)
table = self._table_definition_for(name)
table.create(cn)
sql = schema.registry.insert().values(
sql = self.schema.registry.insert().values(
name=name,
table_name=tablename)
cn.execute(sql)
return table
def _get_ts_table(self, cn, name):
reg = schema.registry
reg = self.schema.registry
tablename = self._ts_table_name(name)
sql = reg.select().where(reg.c.table_name == tablename)
tid = cn.execute(sql).scalar()
......@@ -341,7 +347,7 @@ class TimeSerie(object):
# changeset handling
def _newchangeset(self, cn, author, _insertion_date=None):
table = schema.changeset
table = self.schema.changeset
sql = table.insert().values(
author=author,
insertion_date=_insertion_date or datetime.now())
......@@ -353,7 +359,7 @@ class TimeSerie(object):
return cn.execute(sql).scalar()
def _changeset_series(self, cn, csid):
cset_serie = schema.changeset_series
cset_serie = self.schema.changeset_series
sql = select([cset_serie.c.serie]
).where(cset_serie.c.csid == csid)
......@@ -369,7 +375,7 @@ class TimeSerie(object):
pass
def _finalize_insertion(self, cn, csid, name):
table = schema.changeset_series
table = self.schema.changeset_series
sql = table.insert().values(
csid=csid,
serie=name
......@@ -410,7 +416,7 @@ class TimeSerie(object):
return diff, newsnapshot
def _find_snapshot(self, cn, table, qfilter=(), column='snapshot'):
cset = schema.changeset
cset = self.schema.changeset
sql = select([table.c.id, table.c[column]]
).order_by(desc(table.c.id)
).limit(1
......@@ -432,7 +438,7 @@ class TimeSerie(object):
if snapid is None:
return None
cset = schema.changeset
cset = self.schema.changeset
sql = select([table.c.id,
table.c.diff,
table.c.parent,
......@@ -462,7 +468,7 @@ class TimeSerie(object):
def _diff(self, cn, csetid, name):
table = self._get_ts_table(cn, name)
cset = schema.changeset
cset = self.schema.changeset
def filtercset(sql):
return sql.where(table.c.csid == cset.c.id
......
Supports Markdown
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment