Commit 970e761f authored by Aurélien Campéas's avatar Aurélien Campéas
Browse files

web/tasks-table: use a cached tasks table, add abitlity to schedule a job, and other things

This is an heavy backport wich carries a bit too much baggage ...

Noteworthy:
* we add a db table to store tasks table views per domain (and complete)
* the client checks a hash rather than reloading the whole table: only on changes
  does the reload happen
* a domain selector is introduced
* a dedicated url for task errors is provided
* we provide an extension point `tasks_actions` to allow specification
  of callbacks from unknown tiers
parent 9ebe55322f13
import io
import base64
import json
from flask import (
......@@ -7,6 +8,7 @@ from flask import (
make_response,
request,
render_template,
render_template_string,
send_file,
url_for
)
......@@ -14,12 +16,16 @@ from flask import (
from sqlalchemy import select
from pml import HTML
from pygments import highlight
from pygments.lexers import PythonTracebackLexer
from pygments.formatters import HtmlFormatter
from rework import api
from rework.schema import task, worker, operation
from rework.task import Task
from rework_ui.helper import argsdict
from rework_ui import taskstable
bp = Blueprint('reworkui', __name__,
......@@ -41,7 +47,62 @@ class sliceargs(argsdict):
}
def reworkui(engine, serviceactions=None):
def _schedule_job(engine,
service,
args,
inputfile):
user = args.user
if user is None:
abort(400, 'user parameter is mandatory')
hostid = args.hostid or api.host()
domain = args.domain
metadata = {'user': user}
if args.options:
metadata['options'] = args.options
try:
task = api.schedule(engine, service,
rawinputdata=inputfile,
hostid=hostid,
domain=domain,
metadata=metadata)
except Exception as err:
abort(400, str(err))
return json.dumps(task.tid)
def alldomains(engine):
return [
dom for dom, in engine.execute(
'select domain from rework.operation group by domain order by domain'
).fetchall()
]
def initialdomain(domains):
return 'all' if len(domains) > 1 else domains and domains[0] or 'default'
def reworkui(engine,
serviceactions=None,
alttemplate=None):
@bp.route('/new_job/<service>', methods=['PUT'])
def submit_job(service):
args = argsdict()
args.update(argsdict(request.form))
args.update(argsdict(request.args))
fileargs = argsdict(request.files)
if 'input_file' not in fileargs:
abort(400, 'input file is mandatory')
return _schedule_job(engine,
service,
args,
fileargs.input_file.read())
@bp.route('/relaunch-task/<int:tid>', methods=['PUT'])
def relaunch_task(tid):
......@@ -73,7 +134,6 @@ def reworkui(engine, serviceactions=None):
return send_file(io.BytesIO(archive),
mimetype='application/octet-stream')
@bp.route('/job_results/<jobid>')
def job_results(jobid):
job = getjob(engine, jobid)
......@@ -161,7 +221,7 @@ def reworkui(engine, serviceactions=None):
class uiargsdict(argsdict):
defaults = {
'domain': 'all'
'domain': initialdomain(alldomains(engine))
}
@bp.route('/workers-table')
......@@ -196,7 +256,7 @@ def reworkui(engine, serviceactions=None):
with r.td() as col:
with col.button() as b:
if shutdown:
b('shutdown asked', klass='btn glyphicon glyphicon-ban-circle')
b('shutdown asked', klass='btn gltyphicon glyphicon-ban-circle')
else:
b('shutdown', type='button', klass='btn btn-warning btn-sm',
onclick='shutdown_worker({})'.format(wid))
......@@ -225,62 +285,40 @@ def reworkui(engine, serviceactions=None):
cn.execute(sql)
return json.dumps(True)
@bp.route('/tasks-table')
def list_tasks():
tids = engine.execute('select id from rework.task order by id desc').fetchall()
opsql = 'select id, name from rework.operation'
ops = dict(engine.execute(opsql).fetchall())
h = HTML()
with h.table(klass='table table-sm table-bordered table-striped table-hover') as t:
with t.thead(klass='thead-inverse') as th:
with th.tr() as r:
r.th('#')
r.th('service')
r.th('created')
r.th('user')
r.th('worker')
r.th('status')
r.th('action')
for tid, in tids:
task = Task.byid(engine, tid)
if task is None:
continue # avoid a `delete` + refresh tasks race condition
with t.tr() as r:
r.th(str(task.tid), scope='row')
with r.td() as col:
col.a(ops[task.operation],
title='show the tasks log (if any)',
target='_blank',
href='tasklogs/{}'.format(tid))
r.td(task._propvalue('created').strftime('%Y-%m-%d %H:%M:%S'))
r.td(task.metadata.get('user', '<unknown>'))
@bp.route('/taskerror/<int:taskid>')
def taskerror(taskid):
job = getjob(engine, taskid)
if job is None:
abort(404, 'job does not exists')
worker = task._propvalue('worker')
r.td('#{}'.format(worker or ''))
formatter = HtmlFormatter()
traceback = highlight(job.traceback,
PythonTracebackLexer(),
formatter)
return render_template(
'taskerror.html',
tid=taskid,
css=formatter.get_style_defs(),
traceback=traceback
)
state = task.state
stateattrs = {'klass': state}
if state == 'failed':
stateattrs['title'] = task.traceback
r.td(state, **stateattrs)
@bp.route('/tasks-table-hash')
def tasks_table_hash():
args = uiargsdict(request.args)
thash = taskstable.latest_table_hash(engine, args.domain)
return thash or 'no-hash-yet'
with r.td() as col:
state = task.state
with col.button() as b:
if state == 'running':
b('abort', type='button', klass='btn btn-danger btn-sm',
onclick='abort_task({})'.format(task.tid))
elif state == 'aborting':
b('wait', klass='btn glyphicon glyphicon-ban-circle')
else:
b('delete', type='button', klass='btn btn-warning btn-sm',
onclick='delete_task({})'.format(task.tid))
col.span(' ')
@bp.route('/tasks-table')
def list_tasks():
args = uiargsdict(request.args)
content = engine.execute('select content from rework.taskstable '
'where domain = %(domain)s '
'order by id desc limit 1',
domain=args.domain).scalar()
if content is None:
return '<p>Table under construction ...</p>'
return str(h)
return content
@bp.route('/tasklogs/<int:taskid>')
def tasklogs(taskid):
......@@ -326,8 +364,29 @@ def reworkui(engine, serviceactions=None):
return str(h)
@bp.route('/rework')
@bp.route('/')
def home():
return render_template('home.html')
domains = alldomains(engine)
h = HTML()
firstdomain = initialdomain(domains)
with h.select(id='domain-filter', name='domain-filter',
title='domain',
onchange='setdomain(this)')as s:
if len(domains) > 1:
s.option('all')
for domain in domains:
s.option(domain, value=domain)
else:
s.option(domains[0])
if alttemplate:
return render_template_string(alttemplate,
domain_filter=str(h),
initialdomain=firstdomain)
return render_template('rui_home.html',
domain_filter=str(h),
initialdomain=firstdomain)
return bp
......@@ -3,8 +3,10 @@ import socket
import webbrowser
import click
from sqlalchemy import create_engine
from rework_ui.app import startapp
from rework_ui import schema, taskstable
@click.command()
......@@ -20,3 +22,23 @@ def view(db_uri):
webbrowser.open('http://{ipaddr}:{port}/rework'.format(ipaddr=ipaddr, port=port))
input()
@click.command(name='complete-db')
@click.argument('dburi')
def complete_db(dburi):
"""create the db table necessary for handling big tasks table views in the client"""
engine = create_engine(dburi)
schema.reset(engine)
schema.init(engine)
@click.command(name='generate-tasks-table')
@click.argument('dburi')
@click.option('--loop', is_flag=True, default=False)
@click.option('--period', type=int, default=2)
def generate_tasks_table(dburi, loop=False, period=2):
"""fill (periodically if needed) the tasks table used by the tasks view"""
engine = create_engine(dburi)
taskstable.refresh_tasks_file(engine, loop=loop, sleeptime=period)
......@@ -13,9 +13,30 @@ function update(domid, html) {
elt.innerHTML = html
}
function refresh_tasks() {
fetch(`tasks-table-hash?domain=${domain}`).then(
resp => resp.text()
).then(
newhash => {
if (newhash != hash) {
hash = newhash
refresh_section('tasks')
}
}
)
}
function setdomain(form) {
domain = form.selectedOptions[0].value
refresh_section('tasks')
refresh_section('services')
refresh_section('workers')
}
function refresh_section(section) {
fetch(`${section}-table`).then(
fetch(`${section}-table?domain=${domain}`).then(
resp => resp.text()
).then(
resp => update(section, resp)
......@@ -23,6 +44,13 @@ function refresh_section(section) {
}
function start_job(operation, form) {
fetch(`new_job/${operation}?user=WEBUI`,
{method: 'PUT', body: new FormData(form)})
refresh_section('services')
}
function shutdown_worker(wid) {
fetch(`shutdown-worker/${wid}`).then(
() => refresh_section('workers')
......@@ -44,6 +72,13 @@ function abort_task(tid) {
}
function relaunch_task(tid) {
fetch(`relaunch-task/${tid}`, {method: 'PUT'}).then(
() => refresh_section('tasks')
)
}
function delete_task(tid) {
fetch(`delete-task/${tid}`).then(
() => refresh_section('tasks')
......@@ -56,7 +91,13 @@ function show_logs(logsliceuri) {
console.log('logslice uri', logsliceuri)
function _getmore() {
fetch(`${logsliceuri}?from_log_id=${lastid}`).then(
resp => resp.json()
resp => {
if (resp.status != 200) {
clearInterval(ival)
throw `task at ${logsliceuri} is gone`
}
return resp.json()
}
).then(logs => {
logs.forEach(id_line => {
// let's be ruthlessly inefficient :)
......@@ -64,9 +105,9 @@ function show_logs(logsliceuri) {
lastid = id
append('logs', line)
})
})
}).catch(err => console.log(err))
}
_getmore()
setInterval(_getmore, 3000)
const ival = setInterval(_getmore, 3000)
}
......@@ -2,6 +2,10 @@
{% block body %}
<div id="filter" style="float: right">
{{ domain_filter | safe }}
</div>
<h1>Rework Monitoring UI</h1>
<h2>Tasks</h2>
......@@ -18,6 +22,7 @@
<script>
let domain = '{{initialdomain}}'
document.onreadystatechange = function () {
if (document.readyState == 'complete') {
refresh_section('services')
......
{% extends "rui_base.html" %}
{% block body %}
<style>
{{css}}
</style>
<h1>Task #{{tid}}</h1>
<div id="error">
{{traceback | safe}}
</div>
{% endblock %}
from sqlalchemy import Table, Column, Integer, String
from rework.schema import meta
taskstable = Table(
'taskstable', meta,
Column('id', Integer, primary_key=True),
Column('domain', String, default='default', index=True),
Column('hash', String, nullable=False, index=True),
Column('content', String, nullable=False),
schema='rework'
)
def init(engine):
with engine.connect() as cn:
taskstable.create(cn)
def reset(engine):
with engine.connect() as cn:
taskstable.drop(cn, checkfirst=True)
from hashlib import md5
from time import sleep
from pkg_resources import iter_entry_points
from pml import HTML
from sqlalchemy import select, desc
from rework.task import Task
from rework.schema import task, operation
from rework_ui.schema import taskstable
def latest_table_hash(engine, domain):
sql = select(
[taskstable.c.hash]
).order_by(
desc(taskstable.c.id)
).limit(
1
).where(
taskstable.c.domain == domain
)
return engine.execute(sql).scalar()
def refresh_tasks(engine, inithash, domain):
taskstates = tasks_info(engine, domain)
thash = md5(str(taskstates).encode('ascii')).hexdigest()
if thash != inithash:
htmltable = generate_tasks_table(engine, taskstates)
sql = taskstable.insert().values(
hash=thash,
domain=domain,
content=htmltable
)
with engine.connect() as cn:
cn.execute(sql)
inithash = thash
# cleanup old tables
sql = taskstable.delete().where(
taskstable.c.hash != thash
).where(
taskstable.c.domain == domain
)
with engine.connect() as cn:
cn.execute(sql)
return inithash
def refresh_tasks_file(engine, loop=False, sleeptime=2):
domains = [dom for dom, in engine.execute(
'select domain from rework.operation group by domain order by domain'
).fetchall()]
if len(domains) > 1:
domains.insert(0, 'all')
inithashes = {domain: latest_table_hash(engine, domain)
for domain in domains}
inithashes = {domain: refresh_tasks(engine, inithashes[domain], domain)
for domain in domains}
if loop:
print('Looping. Type Ctrl-C to stop.')
while loop:
newhashes = {domain: refresh_tasks(engine, inithashes[domain], domain)
for domain in domains}
if newhashes != inithashes:
print('tasks set changed')
inithashes = newhashes
else:
print('nothing changed')
sleep(sleeptime)
def tasks_info(engine, domain):
with engine.connect() as cn:
sql = select(
[task.c.id, task.c.status, operation.c.domain]
).order_by(desc(task.c.id)
).where(task.c.operation == operation.c.id)
if domain != 'all':
sql = sql.where(
operation.c.domain == domain
)
return cn.execute(sql).fetchall()
MORE_TASKS_ACTIONS = set()
def add_plugin_actions():
for ep in iter_entry_points('tasks_actions'):
MORE_TASKS_ACTIONS.add(ep.load())
add_plugin_actions()
def generate_tasks_table(engine, taskstates):
opsql = 'select id, name from rework.operation'
ops = dict(engine.execute(opsql).fetchall())
h = HTML()
with h.table(klass='table table-sm table-bordered table-striped table-hover') as t:
with t.thead(klass='thead-inverse') as th:
with th.tr() as r:
r.th('#')
r.th('service')
r.th('domain')
r.th('created')
r.th('user')
r.th('worker')
r.th('status')
r.th('action')
for row in taskstates:
job = Task.byid(engine, row.id)
with t.tr() as r:
r.th(str(job.tid), scope='row')
with r.td() as col:
with col.span() as sp:
sp.a(ops[job.operation],
title='show the tasks log (if any)',
target='_blank',
href='tasklogs/{}'.format(row.id))
if job.traceback:
with col.span() as sp:
sp(' ')
sp.a('[traceback]',
title='show the error',
target='_blank',
href='taskerror/{}'.format(row.id))
r.td(row.domain)
r.td(job._propvalue('created').strftime('%Y-%m-%d %H:%M:%S'))
# user plus maybe run name
meta = job.metadata
user = meta.get('user', '<unknown>')
run_name = meta.get('options', {}).get('run_name', None)
if run_name:
user = '{} [{}]'.format(user, run_name)
r.td(user)
worker = job._propvalue('worker')
r.td('#{}'.format(worker or ''))
state = job.state
stateattrs = {'klass': state}
if state == 'failed':
stateattrs['title'] = job.traceback
r.td(state, **stateattrs)
with r.td() as col:
with col.button() as b:
if state == 'running':
b('abort', type='button', klass='btn btn-danger btn-sm',
onclick='abort_task({})'.format(job.tid))
elif state == 'aborting':
b('wait', klass='btn glyphicon glyphicon-ban-circle')
else:
b('delete', type='button', klass='btn btn-warning btn-sm',
onclick='delete_task({})'.format(job.tid))
if row.status == 'done':
col.span(' ')
with col.button() as b:
b('relaunch', type='button', klass='btn btn-primary btn-sm',
onclick='relaunch_task({})'.format(job.tid))
for action in MORE_TASKS_ACTIONS:
action(col, job, state, ops)
return str(h)
......@@ -13,6 +13,7 @@ setup(name='rework_ui',
'rework',
'flask',
'pml',
'sqlalchemy',
'webtest'
],
package_data={'rework_ui': [
......@@ -20,7 +21,9 @@ setup(name='rework_ui',
'rui_templates/*'
]},
entry_points={'rework.subcommands': [
'view=rework_ui.cli:view'
'view=rework_ui.cli:view',
'complete-db=rework_ui.cli:complete_db',
'generate-tasks-table=rework_ui.cli:generate_tasks_table'
]},