Commit e6f54e4e authored by Aurélien Campéas

drop sqlalchemy.core for schema and query building

While doing so we also get rid of
* old migrations
* the rename utility
parent 306ee2ff89ab
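For illustration only (the table and helper names below are hypothetical, not taken from this diff), this is the kind of rewrite the commit title implies: a query assembled with sqlalchemy.core constructs is replaced by a plain SQL string executed directly on the engine, the style the remaining tshistory code already uses.

# hypothetical sketch -- not part of the commit
from sqlalchemy import MetaData, Table, Column, Integer, String, select

meta = MetaData(schema='tsh')
registry = Table(
    'registry', meta,
    Column('id', Integer, primary_key=True),
    Column('seriename', String)
)

def exists_before(engine, name):
    # before: query assembled with sqlalchemy.core
    q = select([registry.c.seriename]).where(registry.c.seriename == name)
    return bool(engine.execute(q).scalar())

def exists_after(engine, name, namespace='tsh'):
    # after: the same query as a plain sql string
    q = 'select seriename from "{}".registry where seriename = %(name)s'.format(namespace)
    return bool(engine.execute(q, name=name).scalar())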
from pathlib import Path
import logging
from sqlalchemy import create_engine, MetaData
from sqlalchemy import create_engine
import pandas as pd
import pytest
@@ -39,7 +39,7 @@ def tsh(request, engine):
namespace = request.param
sch = schema.tsschema(namespace)
sch.destroy(engine)
schema.init_schemas(engine, MetaData(), namespace)
schema.init_schemas(engine, namespace)
if namespace == 'zzz':
Snapshot._max_bucket_size = 5
@@ -51,7 +51,7 @@ def ptsh(engine):
sch = schema.tsschema()
sch.destroy(engine)
schema.register_schema(sch)
schema.init_schemas(engine, MetaData())
schema.init_schemas(engine)
return tsio.TimeSerie()
......
# sqlalchemy patch
from sqlalchemy.sql import elements, expression
from sqlalchemy.dialects.postgresql.base import PGDDLCompiler
elements.NONE_NAME = elements._NONE_NAME
def visit_create_index(self, create):
preparer = self.preparer
index = create.element
self._verify_index_table(index)
text = "CREATE "
if index.unique:
text += "UNIQUE "
text += "INDEX "
if self.dialect._supports_create_index_concurrently:
concurrently = index.dialect_options['postgresql']['concurrently']
if concurrently:
text += "CONCURRENTLY "
# PATCH
if index.name is None or index.name is elements.NONE_NAME:
# -> no name
text += "ON %s" % preparer.format_table(index.table)
else:
text += "%s ON %s " % (
self._prepared_index_name(index,
include_schema=False),
preparer.format_table(index.table)
)
# /PATCH
using = index.dialect_options['postgresql']['using']
if using:
text += "USING %s " % preparer.quote(using)
ops = index.dialect_options["postgresql"]["ops"]
text += "(%s)" \
% (
', '.join([
self.sql_compiler.process(
expr.self_group()
if not isinstance(expr, expression.ColumnClause)
else expr,
include_table=False, literal_binds=True) +
(
(' ' + ops[expr.key])
if hasattr(expr, 'key')
and expr.key in ops else ''
)
for expr in index.expressions
])
)
withclause = index.dialect_options['postgresql']['with']
if withclause:
text += " WITH (%s)" % (', '.join(
['%s = %s' % storage_parameter
for storage_parameter in withclause.items()]))
tablespace_name = index.dialect_options['postgresql']['tablespace']
if tablespace_name:
text += " TABLESPACE %s" % preparer.quote(tablespace_name)
whereclause = index.dialect_options["postgresql"]["where"]
if whereclause is not None:
where_compiled = self.sql_compiler.process(
whereclause, include_table=False,
literal_binds=True)
text += " WHERE " + where_compiled
return text
PGDDLCompiler.visit_create_index = visit_create_index
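Below is a minimal, assumed usage sketch (not part of the patch) of what the override enables: an index declared with the internal no-name marker -- the same pattern migration.py uses further down -- compiles to a CREATE INDEX statement without an explicit name, letting PostgreSQL pick one.

# assumed sketch: relies on the NONE_NAME alias set at the top of the patch
from sqlalchemy import MetaData, Table, Column, Integer, Index
from sqlalchemy.sql import elements

meta = MetaData()
demo = Table(
    'demo', meta,
    Column('id', Integer, primary_key=True),
    Column('start', Integer),
    # no explicit index name: the patched compiler emits "CREATE INDEX ON demo (start)"
    Index(elements.NONE_NAME, 'start')
)
# demo.create(engine)  # postgres then generates a name such as demo_start_idx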
@@ -8,7 +8,7 @@ from dateutil import parser
from json import dumps
import click
from sqlalchemy import create_engine, MetaData
from sqlalchemy import create_engine
from dateutil.parser import parse as temporal
import pandas as pd
@@ -16,8 +16,7 @@ from tshistory.tsio import TimeSerie
from tshistory.util import (
delete_series,
find_dburi,
fromjson,
rename_series
fromjson
)
import tshistory.schema
@@ -169,7 +168,11 @@ def rename(db_uri, mapfile, namespace='tsh'):
for p in pd.read_csv(mapfile).itertuples()
}
engine = create_engine(find_dburi(db_uri))
rename_series(engine, seriesmap, namespace)
tsh = TimeSerie(namespace)
for old, new in seriesmap.items():
with engine.begin() as cn:
print('rename', old, '->', new)
tsh.rename(cn, old, new)
@tsh.command()
@@ -214,9 +217,6 @@ def init_db(db_uri, reset=False, namespace='tsh'):
assert schem.exists(engine)
schem.destroy(engine)
# needed because of del self.meta & return in define() :
schem.meta = MetaData()
schem.create(engine)
@@ -268,161 +268,6 @@ def shell(db_uri, namespace='tsh'):
import pdb; pdb.set_trace()
# repair
@tsh.command(name='repair-start-end')
@click.argument('db-uri')
@click.option('--namespace', default='tsh')
@click.option('--processes', default=1)
@click.option('--series', default=None)
def repair_start_end(db_uri, namespace='tsh', processes=1, series=None):
from tshistory.migration import SnapshotMigrator
if series:
series = [series]
else:
engine = create_engine(find_dburi(db_uri))
sql = 'select seriename from "{}".registry order by seriename'.format(namespace)
series = [row.seriename for row in engine.execute(sql)]
engine.dispose()
def _migrate(seriename):
e = create_engine(find_dburi(db_uri), pool_size=1)
tsh = TimeSerie(namespace)
with e.begin() as cn:
m = SnapshotMigrator(cn, tsh, seriename)
m.fix_start_end()
e.dispose()
def migrate(seriename):
try:
_migrate(seriename)
except Exception:
import traceback as tb
tb.print_exc()
print(seriename, 'FAIL')
def run(proc, series):
seriescount = len(series)
pid = os.getpid()
for idx, ts in enumerate(series, 1):
print('migrate {} proc: {} [{}/{}]'.format(ts, proc, idx, seriescount))
migrate(ts)
if processes == 1:
run(0, series)
return
def chunks(l, n):
for i in range(0, len(l), n):
yield l[i:i + n]
# try to distribute the payload randomly as in practice it is definitely
# *not* evenly distributed along the lexical order ...
random.shuffle(series)
chunks = list(chunks(series, len(series) // processes))
print('running with {} processes'.format(len(chunks)))
pids = []
for idx, chunk in enumerate(chunks):
pid = os.fork()
if not pid:
# please the eyes
chunk.sort()
run(idx, chunk)
return
pids.append(pid)
try:
for pid in pids:
print('waiting for', pid)
os.waitpid(pid, 0)
except KeyboardInterrupt:
for pid in pids:
print('kill', pid)
os.kill(pid, signal.SIGINT)
# migration
@tsh.command(name='migrate-0.3-to-0.4')
@click.argument('db-uri')
@click.option('--namespace', default='tsh')
@click.option('--processes', default=1)
@click.option('--tryserie', default=None)
def migrate_zerodotthree_to_zerodotfour(db_uri, namespace='tsh', processes=1, tryserie=None):
""" in-place migration for going from 0.3 to 0.4
Will populate the start/end fields on series tables and
update the chunk representation on snapshots tables.
"""
from tshistory.migration import SnapshotMigrator
if tryserie:
series = [tryserie]
else:
engine = create_engine(find_dburi(db_uri))
sql = 'select seriename from "{}".registry order by seriename'.format(namespace)
series = [row.seriename for row in engine.execute(sql)]
engine.dispose()
def _migrate(seriename):
e = create_engine(find_dburi(db_uri), pool_size=1)
tsh = TimeSerie(namespace)
with e.begin() as cn:
m = SnapshotMigrator(cn, tsh, seriename)
m.migratechunks()
with e.begin() as cn:
m = SnapshotMigrator(cn, tsh, seriename)
m.migrateseries()
e.dispose()
def migrate(seriename):
try:
_migrate(seriename)
except Exception:
import traceback as tb
tb.print_exc()
print(seriename, 'FAIL')
def run(proc, series):
seriescount = len(series)
pid = os.getpid()
for idx, ts in enumerate(series, 1):
print('migrate {} proc: {} [{}/{}]'.format(ts, proc, idx, seriescount))
migrate(ts)
if processes == 1:
run(0, series)
return
def chunks(l, n):
for i in range(0, len(l), n):
yield l[i:i + n]
# try to distribute the payload randomly as in practice it is definitely
# *not* evenly distributed along the lexical order ...
random.shuffle(series)
chunks = list(chunks(series, len(series) // processes))
print('running with {} processes'.format(len(chunks)))
pids = []
for idx, chunk in enumerate(chunks):
pid = os.fork()
if not pid:
# please the eyes
chunk.sort()
run(idx, chunk)
return
pids.append(pid)
try:
for pid in pids:
print('waiting for', pid)
os.waitpid(pid, 0)
except KeyboardInterrupt:
for pid in pids:
print('kill', pid)
os.kill(pid, signal.SIGINT)
for ep in iter_entry_points('tshistory.subcommands'):
tsh.add_command(ep.load())
......
import struct
import zlib
import numpy as np
import pandas as pd
from sqlalchemy import Table, Column, Integer, ForeignKey, Index, bindparam
from sqlalchemy.sql.elements import NONE_NAME
from sqlalchemy.sql.expression import select, asc, desc
from sqlalchemy.dialects.postgresql import BYTEA, TIMESTAMP
from tshistory.util import fromjson
from tshistory.snapshot import Snapshot
TABLES = {}
class SnapshotMigrator:
__slots__ = ('cn', 'name', 'seriename', 'tsh')
def __init__(self, cn, tsh, seriename):
self.cn = cn
self.tsh = tsh
self.seriename = seriename
self.name = self.tsh._serie_to_tablename(cn, seriename)
@property
def namespace(self):
return '{}.snapshot'.format(self.tsh.namespace)
@property
def table(self):
tablename = '{}.{}'.format(self.namespace, self.name)
table = TABLES.get(tablename)
if table is None:
TABLES[tablename] = table = Table(
self.name, self.tsh.schema.meta,
Column('id', Integer, primary_key=True),
Column('start', TIMESTAMP(timezone=True)),
Column('end', TIMESTAMP(timezone=True)),
Column('chunk', BYTEA),
Column('parent', Integer,
ForeignKey('{}.{}.id'.format(
self.namespace,
self.name))),
Index(NONE_NAME, 'start'),
Index(NONE_NAME, 'end'),
Index(NONE_NAME, 'parent'),
schema=self.namespace,
keep_existing=True
)
return table
# new serializer
def _serialize(self, ts):
if ts is None:
return None
# use `view` as a workaround for "cannot include dtype 'M' in a buffer"
indexes = ts.index.view(np.uint8).data.tobytes()
indexes_size = struct.pack('!L', len(indexes))
if self.isstr:
# strings separated by 0 (NUL) and nones/nans represented as 3 (ETX)
END, ETX = b'\0'.decode(), b'\3'.decode()
# first, safety belt
for s in ts.values:
if not pd.isnull(s):
assert END not in s and ETX not in s
values = b'\0'.join(b'\3' if pd.isnull(v) else v.encode('utf-8')
for v in ts.values)
else:
values = ts.values.data.tobytes()
return zlib.compress(indexes_size + indexes + values)
# old
def _deserialize(self, bytestring):
return zlib.decompress(bytestring)
def _ensure_tz_consistency(self, ts):
"""Return timeserie with tz aware index or not depending on metadata
tzaware.
"""
assert ts.name is not None
metadata = self.tsh.metadata(self.cn, ts.name)
if metadata and metadata.get('tzaware', False):
return ts.tz_localize('UTC')
return ts
def _chunk_to_ts(self, chunk):
body = b'{' + self._deserialize(chunk) + b'}'
return self._ensure_tz_consistency(
fromjson(body.decode('utf-8'), self.seriename)
)
# /serialisation
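As a reading aid, here is a hedged sketch (not used by the migration itself) of how a chunk produced by the new _serialize above could be decoded back, assuming a tz-naive float64 series; the dtypes are illustrative assumptions.

import struct
import zlib
import numpy as np
import pandas as pd

def _deserialize_new(raw):
    # layout written by _serialize: 4-byte big-endian index length,
    # then the raw index bytes, then the raw value bytes
    payload = zlib.decompress(raw)
    indexes_size = struct.unpack('!L', payload[:4])[0]
    index_bytes = payload[4:4 + indexes_size]
    value_bytes = payload[4 + indexes_size:]
    index = np.frombuffer(index_bytes, dtype='datetime64[ns]')
    values = np.frombuffer(value_bytes, dtype='float64')
    return pd.Series(values, index=index)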
allsql = """
select chunk.id, chunk.chunk
from "{namespace}"."{table}" as chunk
"""
def allrawchunks(self):
sql = self.allsql.format(
namespace=self.namespace,
table=self.name
)
return self.cn.execute(sql)
@property
def isstr(self):
return self.tsh.metadata(self.cn, self.seriename)['value_type'] == 'object'
def migratechunks(self):
table = self.table
chunks = self.allrawchunks()
for idx, (cid, chunk) in enumerate(chunks):
ts = self._chunk_to_ts(chunk)
sql = table.update().values(
chunk=self._serialize(ts)
).where(
table.c.id == cid
)
self.cn.execute(sql)
self.tsh.update_metadata(
self.cn, self.seriename,
{
'index_dtype': ts.index.dtype.str,
'value_dtype': ts.dtypes.str if not self.isstr else '|O'
},
internal=True
)
print('chunks for serie {}: {}'.format(self.seriename, idx + 1))
rawdatessql = """
with recursive allchunks as (
select chunks.id as cid,
chunks.parent as parent,
chunks.start as start,
chunks."end" as "end"
from "{namespace}"."{table}" as chunks
where chunks.id in ({heads})
union
select chunks.id as cid,
chunks.parent as parent,
chunks.start as start,
chunks."end" as "end"
from "{namespace}"."{table}" as chunks
join allchunks on chunks.id = allchunks.parent
)
select cid, parent, start, "end" from allchunks
"""
def alldates(self, heads):
sql = self.rawdatessql.format(
namespace=self.namespace,
table=self.name,
heads=','.join(str(head) for head in heads)
)
res = self.cn.execute(sql)
dates = {cid: (parent, start, end)
for cid, parent, start, end in res.fetchall()}
return dates
def findall_startend(self, csets):
snap = Snapshot(self.cn, self.tsh, self.seriename)
sql = snap.cset_heads_query((lambda cset: cset.c.id >= min(csets),
lambda cset: cset.c.id <= max(csets)),
order=asc)
cset_snap_map = {
row.cset: row.snapshot
for row in self.cn.execute(sql).fetchall()
}
alldates = self.alldates(
sorted(cset_snap_map.values())
)
series = []
for cset in csets:
head = cset_snap_map[cset]
start, end = None, None
while True:
parent, cstart, cend = alldates.get(head, (None, None, None))
if cstart is None:
break
start = min(start or cstart, cstart)
end = max(end or cend, cend)
head = parent
series.append((cset, start, end))
return series
def migrateseries(self):
sql = 'select id, cset from "{ns}.timeserie"."{name}"'.format(
ns=self.tsh.namespace,
name=self.name
)
revs = [row for row in self.cn.execute(sql)]
sql = ('alter table "{}.timeserie"."{}" '
'add column "start" timestamp, '
'add column "end" timestamp '
).format(self.tsh.namespace, self.name)
self.cn.execute(sql)
sql = 'create index on "{}.timeserie"."{}" (start)'.format(self.tsh.namespace, self.name)
self.cn.execute(sql)
sql = 'create index on "{}.timeserie"."{}" ("end")'.format(self.tsh.namespace, self.name)
self.cn.execute(sql)
fromto = self.findall_startend([row.cset for row in revs])
cset_id_map = {
row.cset: row.id
for row in revs
}
table = self.tsh._get_ts_table(self.cn, self.seriename)
sql = table.update().where(
table.c.id == bindparam('id_')
).values({
'start': bindparam('start'),
'end': bindparam('end')
})
self.cn.execute(sql, [
{'id_': cset_id_map[cset], 'start': start, 'end': end}
for cset, start, end in fromto
])
print('versions for serie {}: {}'.format(self.seriename, len(fromto)))
def fix_start_end(self):
sql = 'select id, cset from "{ns}.timeserie"."{name}"'.format(
ns=self.tsh.namespace,
name=self.name
)
revs = [row for row in self.cn.execute(sql)]
fromto = self.findall_startend([row.cset for row in revs])
cset_id_map = {
row.cset: row.id
for row in revs
}
table = self.tsh._get_ts_table(self.cn, self.seriename)
sql = table.update().where(
table.c.id == bindparam('id_')
).values({
'start': bindparam('start'),
'end': bindparam('end')
})
self.cn.execute(sql, [
{'id_': cset_id_map[cset], 'start': start, 'end': end}
for cset, start, end in fromto
])
print('versions for serie {}: {}'.format(self.seriename, len(fromto)))
import logging
from threading import Lock
from pathlib import Path
from sqlalchemy import (Table, Column, Integer, String, MetaData, TIMESTAMP,
ForeignKey, UniqueConstraint)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.schema import CreateSchema
from tshistory.util import unilist
from tshistory.util import unilist, sqlfile
L = logging.getLogger('tshistory.schema')
CREATEFILE = Path(__file__).parent / 'schema.sql'
# schemas registry
_SCHLOCK = Lock()
@@ -23,9 +20,9 @@ def register_schema(schema):
_SCHEMA_HANDLERS.append(schema)
def init_schemas(engine, meta, namespace='tsh'):
def init_schemas(engine, namespace='tsh'):
for schema in _SCHEMA_HANDLERS:
schema.define(meta)
schema.define()
schema.create(engine)
@@ -45,7 +42,6 @@ def _delete_schema(engine, ns):
class tsschema(object):
namespace = 'tsh'
meta = None
registry = None
changeset = None
changeset_series = None
@@ -62,51 +58,11 @@ class tsschema(object):
self.namespace = namespace
register_schema(self)
def define(self, meta=MetaData()):
def define(self):
with _SCHLOCK:
if self.namespace in self.SCHEMAS:
return
L.info('build schema %s', self.namespace)
self.meta = meta
registry = Table(
'registry', meta,