Commit bf7f4120 authored by Aurélien Campéas's avatar Aurélien Campéas
Browse files

a web ui for rework

Rework-UI is a [rework][rework] plugin which provides a `view`
subcommand to the `rework` command.
The command launches a simple Web front-end to monitor and control
workers, tasks and operations of `rework`.
It works like this:
$ rework view postgres://babar:celeste@jobstore:5432/
* Running on (Press CTRL+C to quit) - - [10/Nov/2017 14:32:04] "GET /rework HTTP/1.1" 200 - - - [10/Nov/2017 14:32:04] "GET /services-table HTTP/1.1" 200 - - - [10/Nov/2017 14:32:04] "GET /workers-table HTTP/1.1" 200 - - - [10/Nov/2017 14:32:07] "GET /tasks-table HTTP/1.1"
200 -
At the same time, a browser tab opens, and we can see the big picture.
![rework view](
from sqlalchemy import create_engine
from flask import Flask
from rework_ui.blueprint import reworkui
def startapp(host, port, dburi):
engine = create_engine(dburi)
app = Flask('rework')
app.register_blueprint(reworkui(engine)), port=port, threaded=True)
import json
from flask import Blueprint, request, render_template, url_for
from pml import HTML
from rework.schema import task, worker
from rework.task import Task
from rework_ui.helper import argsdict
bp = Blueprint('reworkui', __name__,
def getjob(engine, jobid):
return Task.byid(engine, int(jobid))
return None
class newjobargs(argsdict):
defaults = {
'user': '<unknown>'
class sliceargs(argsdict):
types = {
'from_log_id': int
def reworkui(engine):
def job_status(jobid):
job = getjob(engine, jobid)
if job is None:
return 'NO SUCH JOB'
return job.state
def job_logslice(jobid):
job = getjob(engine, jobid)
if job is None:
args = sliceargs(request.args)
logs = job.logs(fromid=args.from_log_id)
return json.dumps([[lid, line] for lid, line in logs])
def kill_job(jobid):
job = getjob(engine, jobid)
if job is None:
return 'no such job'
if job.aborted:
return 'was already aborted'
return 'job terminated'
def list_jobs():
with engine.connect() as cn:
tsql = 'select id from rework.task order by id'
jobids = cn.execute(tsql).fetchall()
opsql = 'select id, name from rework.operation'
ops = dict(cn.execute(opsql).fetchall())
output = []
for jid, in jobids:
job = getjob(jid)
stat = job.status
if stat == 'done':
if job.traceback:
stat = 'failed'
elif job.aborted:
stat = 'aborted'
output.append((jid, ops[job.operation], stat))
return json.dumps(output)
def shutdown_worker(wid):
with engine.connect() as cn:
cn.execute(worker.update().where( == wid
return json.dumps(True)
def list_workers():
workers = engine.execute('select id, host, pid, mem, shutdown from rework.worker '
'where running = true '
'order by id'
h = HTML()
with h.table(klass='table table-sm table-bordered table-striped table-hover') as t:
with t.thead(klass='thead-inverse') as th:
with as r:'#')'pid@host')'memory (Mb)')'action')
for wid, host, pid, mem, shutdown in workers:
with as r:, scope='row')'{}@{}'.format(pid, host))
with as col:
with col.button() as b:
if shutdown:
b('shutdown asked', klass='btn glyphicon glyphicon-ban-circle')
b('shutdown', type='button', klass='btn btn-danger btn-sm',
return str(h)
def delete_task(tid):
with engine.connect() as cn:
cn.execute("delete from rework.task where id = %(tid)s and status != 'running'",
return json.dumps(True)
def abort_task(tid):
with engine.connect() as cn:
sql = task.update().where( == tid
return json.dumps(True)
def list_tasks():
tids = engine.execute('select id from rework.task order by id desc').fetchall()
opsql = 'select id, name from rework.operation'
ops = dict(engine.execute(opsql).fetchall())
h = HTML()
with h.table(klass='table table-sm table-bordered table-striped table-hover') as t:
with t.thead(klass='thead-inverse') as th:
with as r:'#')'service')'created')'user')'worker')'status')'action')
for tid, in tids:
task = Task.byid(engine, tid)
if task is None:
continue # avoid a `delete` + refresh tasks race condition
with as r:, scope='row')
with as col:
title='show the tasks log (if any)',
href='tasklogs/{}'.format(tid))'created').strftime('%Y-%m-%d %H:%M:%S'))'user', '<unknown>'))
worker = task._propvalue('worker')'#{}'.format(worker or ''))
state = task.state
stateattrs = {'klass': state}
if state == 'failed':
stateattrs['title'] = task.traceback, **stateattrs)
with as col:
state = task.state
with col.button() as b:
if state == 'running':
b('abort', type='button', klass='btn btn-danger btn-sm',
elif state == 'aborting':
b('wait', klass='btn glyphicon glyphicon-ban-circle')
b('delete', type='button', klass='btn btn-warning btn-sm',
col.span(' ')
return str(h)
def tasklogs(taskid):
return render_template(
logsliceuri=url_for('reworkui.job_logslice', jobid=taskid)
def list_services():
sql = 'select id, host, name, path from rework.operation order by id'
ops = engine.execute(sql)
h = HTML()
with h.table(klass='table table-sm table-bordered table-striped table-hover') as t:
with t.thead(klass='thead-inverse') as th:
with as r:'#')'host')'name')'path')
for opid, host, name, path in ops.fetchall():
with as r:, scope='row')
return str(h)
def home():
return render_template('home.html')
return bp
from threading import Thread
import socket
import webbrowser
import click
from import startapp
def view(db_uri):
"""monitor and control workers and tasks"""
ipaddr = socket.gethostbyname(socket.gethostname())
port = 5679
server = Thread(name='reworkui.webapp', target=startapp,
kwargs={'host': ipaddr, 'port': port, 'dburi': db_uri})
server.daemon = True
server.start()'http://{ipaddr}:{port}/rework'.format(ipaddr=ipaddr, port=port))
from warnings import warn
from werkzeug import ImmutableMultiDict
class dictobj(dict):
""" a dict-like object:
* whose values can also be get/set using the `obj.key` notation
* object[key] returns None if the key is not known
def __getattr__(self, name):
return self[name]
def __setattr__(self, name, value):
self[name] = value
def __getitem__(self, name):
if name in self:
return super(dictobj, self).__getitem__(name)
return None
def copy(self):
return self.__class__((k, self[k]) for k in self)
class argsdict(dictobj):
types = {}
defaults = {}
def __init__(self, reqargs=None, defaults=None, types=None):
""" transforms the request args (or any such dict) for convenience :
* be a malleable dictobj (whose missing attrs/keys default to None)
* set the default values (if any, defaults is a mapping from keys
to a scalar or a collable)
* coerce to the wanted types (if any, types is a mapping from keys
to a type or factory function)
super(argsdict, self).__init__()
if reqargs is None: # copy constructor
if not isinstance(reqargs, ImmutableMultiDict):
for k, v in reqargs.items():
self[k] = v
defaults = defaults or self.defaults
types = types or self.types
for key, val in reqargs.to_dict(flat=False).items():
# when sending json, attributes land as `<attribute>[]`
islist = key.endswith('[]')
key = key.rstrip('[]')
targettype = types.get(key)
# signal if there is any discrepancy and force to tuple
if islist and targettype not in (list, tuple):
warn('element %r is a sequence but its expected type is %r' %
(key, targettype))
targettype = tuple
# val can be an str or a sequence of strs
# hence `not filter(None, val)` gets us all
# the falsy values ('', [''])
if not list(filter(None, val)): # py3k: force to list
# no value -> default value
default = defaults.get(key)
self[key] = default() if callable(default) else default
self[key] = val if targettype in (list, tuple) else val[0]
# type coercion
if targettype:
self[key] = targettype(self[key])
def _set_defaults(self, defaults=None):
defaults = defaults or self.defaults
# complete entries with mandatory defaults
for key, val in defaults.items():
if key not in self:
self[key] = val() if callable(val) else val
def copy(self):
new = self.__class__()
for k in self:
new[k] = self[k]
return new
.queued {
color: blue;
.running {
color: blue;
.done {
color: green;
.aborting {
color: blue;
.aborted {
color: orange;
.failed {
color: red;
"use strict"
function append(domid, html) {
const div = document.getElementById(domid)
const span = document.createElement('span')
span.innerHTML = html + '<br/>'
function update(domid, html) {
const elt = document.getElementById(domid)
elt.innerHTML = html
function refresh_section(section) {
resp => resp.text()
resp => update(section, resp)
function shutdown_worker(wid) {
() => refresh_section('workers')
function abort_task(tid) {
() => refresh_section('tasks')
function delete_task(tid) {
() => refresh_section('tasks')
function show_logs(logsliceuri) {
let lastid = 0
console.log('logslice uri', logsliceuri)
function _getmore() {
resp => resp.json()
).then(logs => {
logs.forEach(id_line => {
// let's be ruthlessly inefficient :)
const [id, line] = id_line
lastid = id
append('logs', line)
setInterval(_getmore, 3000)
<!doctype html>
<title>Rework Monitor</title>
<link rel="stylesheet"
<link rel="stylesheet" href="{{ url_for('reworkui.static', filename='style.css') }}">
<script src="{{ url_for('reworkui.static', filename='util.js') }}"></script>
{% block body %}
{% endblock %}
{% extends "base.html" %}
{% block body %}
<h1>Rework Monitoring UI</h1>
<div id="tasks">
<div id="workers">
<div id="services">
document.onreadystatechange = function () {
if (document.readyState == 'complete') {
setInterval(() => refresh_section('workers'), 10000)
setInterval(() => refresh_section('tasks'), 2000)
{% endblock %}
{% extends "base.html" %}
{% block body %}
<h1>Task #{{tid}}</h1>
<div id="logs" style="color: green"></div>
document.onreadystatechange = function () {
if (document.readyState == 'complete') {
{% endblock %}
description-file =
universal = 1
from setuptools import setup
author='Aurelien Campeas',
description='A web ui for the rework distributed task dispatcher',
package_data={'rework_ui': [
entry_points={'rework.subcommands': [
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)',
'Operating System :: OS Independent',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 3',
'Environment :: Web Environment',
'Topic :: System :: Distributed Computing',
'Topic :: Software Development :: User Interfaces'
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment