Commit 0ba960a8 authored by Aurélien Campéas's avatar Aurélien Campéas
Browse files

replace sqlalchemy core api with sqlhelp

parent 26b984531f95
...@@ -14,16 +14,14 @@ from flask import ( ...@@ -14,16 +14,14 @@ from flask import (
url_for url_for
) )
from sqlalchemy import select
from pml import HTML from pml import HTML
from pygments import highlight from pygments import highlight
from pygments.lexers import PythonTracebackLexer from pygments.lexers import PythonTracebackLexer
from pygments.formatters import HtmlFormatter from pygments.formatters import HtmlFormatter
from sqlhelp import select, update
from rework import api from rework import api
from rework.helper import utcnow from rework.helper import utcnow
from rework.schema import task, worker, operation, monitor
from rework.task import Task from rework.task import Task
from rework_ui.helper import argsdict from rework_ui.helper import argsdict
...@@ -114,17 +112,21 @@ def reworkui(engine, ...@@ -114,17 +112,21 @@ def reworkui(engine,
if t is None: if t is None:
return json.dumps(0) return json.dumps(0)
servicename, hostid, domain = engine.execute( op = select(
select([operation.c.name, operation.c.host, operation.c.domain] 'name', 'host', 'domain'
).where(operation.c.id == task.c.operation ).table(
).where(task.c.id == t.tid) 'rework.operation'
).fetchone() ).join(
'rework.task as task on (task.operation = operation.id)'
).where(
'task.id = %(tid)s', tid=t.tid
).do(engine).fetchone()
newtask = api.schedule(engine, newtask = api.schedule(engine,
servicename, op.name,
rawinputdata=t.raw_input, rawinputdata=t.raw_input,
domain=domain, domain=op.domain,
hostid=hostid, hostid=op.host,
metadata=t.metadata) metadata=t.metadata)
return json.dumps(newtask.tid) return json.dumps(newtask.tid)
...@@ -199,15 +201,17 @@ def reworkui(engine, ...@@ -199,15 +201,17 @@ def reworkui(engine,
@bp.route('/shutdown-worker/<wid>') @bp.route('/shutdown-worker/<wid>')
def shutdown_worker(wid): def shutdown_worker(wid):
with engine.begin() as cn: with engine.begin() as cn:
cn.execute(worker.update().where(worker.c.id == wid update('rework.worker').where(id=wid).values(
).values(shutdown=True)) shutdown=True
).do(cn)
return json.dumps(True) return json.dumps(True)
@bp.route('/kill-worker/<wid>') @bp.route('/kill-worker/<wid>')
def kill_worker(wid): def kill_worker(wid):
with engine.begin() as cn: with engine.begin() as cn:
cn.execute(worker.update().where(worker.c.id == wid update('rework.worker').where(id=wid).values(
).values(kill=True)) kill=True
).do(cn)
return json.dumps(True) return json.dumps(True)
class uiargsdict(argsdict): class uiargsdict(argsdict):
...@@ -218,23 +222,30 @@ def reworkui(engine, ...@@ -218,23 +222,30 @@ def reworkui(engine,
@bp.route('/workers-table') @bp.route('/workers-table')
def list_workers(): def list_workers():
# workers # workers
sql = select([worker.c.id, worker.c.host, worker.c.domain, worker.c.pid, q = select(
worker.c.mem, worker.c.shutdown, worker.c.kill, 'id', 'host', 'domain', 'pid', 'mem', 'shutdown',
worker.c.debugport, worker.c.started] 'kill', 'debugport', 'started'
).order_by(worker.c.id ).table('rework.worker'
).where(worker.c.running == True) ).where('running = true'
).order('id')
domain = uiargsdict(request.args).domain domain = uiargsdict(request.args).domain
if domain != 'all': if domain != 'all':
sql = sql.where(worker.c.domain == domain) q.where(domain=domain)
workers = engine.execute(sql).fetchall() workers = q.do(engine).fetchall()
# monitors # monitors
sql = select([monitor.c.id, monitor.c.domain, monitor.c.lastseen, monitor.c.options]) q = select(
'id', 'domain', 'lastseen', 'options'
).table('rework.monitor')
if domain != 'all': if domain != 'all':
sql = sql.where(monitor.c.domain == domain) q.where(domain=domain)
monitors = {row.domain: row for row in engine.execute(sql).fetchall()} monitors = {
row.domain: row
for row in q.do(engine).fetchall()
}
now = utcnow().astimezone(TZ) now = utcnow().astimezone(TZ)
h = HTML() h = HTML()
...@@ -364,13 +375,14 @@ def reworkui(engine, ...@@ -364,13 +375,14 @@ def reworkui(engine,
@bp.route('/services-table') @bp.route('/services-table')
def list_services(): def list_services():
args = uiargsdict(request.args) args = uiargsdict(request.args)
sql = select([operation.c.id, operation.c.host, operation.c.name, q = select(
operation.c.path, operation.c.domain] 'id', 'host', 'name', 'path', 'domain'
).order_by(operation.c.domain, operation.c.name) ).table('rework.operation'
).order('domain, name')
if args.domain != 'all': if args.domain != 'all':
sql = sql.where(operation.c.domain == args.domain) q.where(domain=args.domain)
ops = engine.execute(sql) ops = q.do(engine)
h = HTML() h = HTML()
h.br() h.br()
with h.table(klass='table table-sm table-bordered table-striped table-hover') as t: with h.table(klass='table table-sm table-bordered table-striped table-hover') as t:
......
from sqlalchemy import Table, Column, Integer, String from pathlib import Path
from rework.schema import meta from sqlhelp import sqlfile
taskstable = Table( SCHEMAFILE = Path(__file__).parent / 'schema.sql'
'taskstable', meta,
Column('id', Integer, primary_key=True),
Column('domain', String, default='default', index=True),
Column('hash', String, nullable=False, index=True),
Column('content', String, nullable=False),
schema='rework'
)
def init(engine): def init(engine):
sql = sqlfile(SCHEMAFILE, ns='rework')
with engine.begin() as cn: with engine.begin() as cn:
taskstable.create(cn) cn.execute(sql)
def reset(engine):
with engine.begin() as cn:
taskstable.drop(cn, checkfirst=True)
create table {ns}.taskstable (
id serial primary key,
domain text default 'default',
hash text not null,
content text not null
);
create index ix_{ns}_taskstable_domain on {ns}.taskstable (domain);
create index ix_{ns}_taskstable_hash on {ns}.taskstable (hash);
...@@ -4,25 +4,18 @@ from time import sleep ...@@ -4,25 +4,18 @@ from time import sleep
from pkg_resources import iter_entry_points from pkg_resources import iter_entry_points
from pml import HTML from pml import HTML
from sqlalchemy import select, desc from sqlhelp import select, insert
from rework.task import Task from rework.task import Task
from rework.schema import task, operation
from rework_ui.schema import taskstable
def latest_table_hash(engine, domain): def latest_table_hash(engine, domain):
sql = select( q = select(
[taskstable.c.hash] 'hash'
).order_by( ).table('rework.taskstable'
desc(taskstable.c.id) ).order('hash'
).limit( ).limit(1
1 ).where(domain=domain)
).where( return q.do(engine).scalar()
taskstable.c.domain == domain
)
return engine.execute(sql).scalar()
def refresh_tasks(engine, inithash, domain): def refresh_tasks(engine, inithash, domain):
...@@ -30,22 +23,20 @@ def refresh_tasks(engine, inithash, domain): ...@@ -30,22 +23,20 @@ def refresh_tasks(engine, inithash, domain):
thash = md5(str(taskstates).encode('ascii')).hexdigest() thash = md5(str(taskstates).encode('ascii')).hexdigest()
if thash != inithash: if thash != inithash:
htmltable = generate_tasks_table(engine, taskstates) htmltable = generate_tasks_table(engine, taskstates)
sql = taskstable.insert().values( q = insert('rework.taskstable').values(
hash=thash, hash=thash,
domain=domain, domain=domain,
content=htmltable content=htmltable
) )
with engine.begin() as cn: with engine.begin() as cn:
cn.execute(sql) q.do(cn)
inithash = thash inithash = thash
# cleanup old tables # cleanup old tables
sql = taskstable.delete().where( sql = ('delete from rework.taskstable '
taskstable.c.hash != thash 'where hash != %(hash)s '
).where( 'and domain = %(domain)s')
taskstable.c.domain == domain
)
with engine.begin() as cn: with engine.begin() as cn:
cn.execute(sql) cn.execute(sql, hash=thash, domain=domain)
return inithash return inithash
...@@ -76,15 +67,14 @@ def refresh_tasks_file(engine, loop=False, sleeptime=2): ...@@ -76,15 +67,14 @@ def refresh_tasks_file(engine, loop=False, sleeptime=2):
def tasks_info(engine, domain): def tasks_info(engine, domain):
with engine.begin() as cn: with engine.begin() as cn:
sql = select( q = select(
[task.c.id, task.c.status, operation.c.domain] 't.id', 't.status', 'op.domain'
).order_by(desc(task.c.id) ).table('rework.task as t'
).where(task.c.operation == operation.c.id) ).join('rework.operation as op on (op.id = t.operation)'
).order('t.id', 'desc')
if domain != 'all': if domain != 'all':
sql = sql.where( q.where(domain=domain)
operation.c.domain == domain return q.do(cn).fetchall()
)
return cn.execute(sql).fetchall()
MORE_TASKS_ACTIONS = set() MORE_TASKS_ACTIONS = set()
......
...@@ -19,8 +19,7 @@ def engine(request): ...@@ -19,8 +19,7 @@ def engine(request):
db.setup_local_pg_cluster(request, DATADIR, PORT) db.setup_local_pg_cluster(request, DATADIR, PORT)
uri = 'postgresql://localhost:{}/postgres'.format(PORT) uri = 'postgresql://localhost:{}/postgres'.format(PORT)
e = create_engine(uri) e = create_engine(uri)
reworkschema.reset(e) reworkschema.init(e, drop=True)
reworkschema.init(e)
ruischema.init(e) ruischema.init(e)
api.freeze_operations(e) api.freeze_operations(e)
return e return e
......
...@@ -92,6 +92,19 @@ def test_abort(engine, client): ...@@ -92,6 +92,19 @@ def test_abort(engine, client):
assert t.aborted assert t.aborted
def test_relaunch(engine, client):
with workers(engine) as mon:
res = client.put('/schedule-task/good_job?user=Babar',
upload_files=[('input_file', 'input.xml', b'the file', 'text/xml')])
tid = int(res.body)
t = Task.byid(engine, tid)
t.join()
res = client.put(f'/relaunch-task/{tid}')
newtid = int(res.body)
t2 = Task.byid(engine, newtid)
t2.join()
def test_task_life_cycle(engine, client, refresh): def test_task_life_cycle(engine, client, refresh):
with workers(engine): with workers(engine):
tasks = [] tasks = []
...@@ -243,7 +256,7 @@ def test_tasks_table(engine, client, refresh): ...@@ -243,7 +256,7 @@ def test_tasks_table(engine, client, refresh):
t.join() t.join()
taskstable.refresh_tasks_file(engine) taskstable.refresh_tasks_file(engine)
res = client.get('/tasks-table-hash?domain=uranus') res = client.get('/tasks-table-hash?domain=uranus')
assert res.text == 'cbcf36e551ad8fdc0aef16fbefd7c6be' assert res.text == '05265be5adad9bb8b0ee50f837535cfa'
res = client.get('/tasks-table?domain=uranus') res = client.get('/tasks-table?domain=uranus')
refpath = DATADIR / 'tasks-table-uranus.html' refpath = DATADIR / 'tasks-table-uranus.html'
if refresh: if refresh:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment