Commit 51c4d9ed authored by jssuzanne's avatar jssuzanne
Browse files

Flake8 for anybus module

parent 46b5891c2d7a
#!/usr/bin/env python
#==============================================================================
# =
# anybus module for OpenERP, Use the lib anybus and broker rabbitmq to send and receive data
# Copyright (C) 2012 Anybox (<http://http://anybox.fr>)
# Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
# =
# This file is a part of anybus
# =
# anybus is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License v3 or later
# as published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
# =
# anybus is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License v3 or later for more details.
# =
# You should have received a copy of the GNU Affero General Public License
# v3 or later along with this program.
# If not, see <http://www.gnu.org/licenses/>.
# =
#==============================================================================
# -*- coding: utf-8 -*-
#flake8: noqa
import process
import connection
......
#!/usr/bin/env python
#==============================================================================
# =
# anybus module for OpenERP, Use the lib anybus and broker rabbitmq to send and receive data
# anybus module for OpenERP, Use the lib anybus and broker rabbitmq to send
# and receive data
# Copyright (C) 2012 Anybox (<http://http://anybox.fr>)
# Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
# =
......@@ -53,5 +54,4 @@
'auto_install': False,
'license': 'AGPL-3',
}
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
......@@ -12,15 +12,17 @@ class anybusConnection(osv.Model):
_columns = {
'name': fields.char('Name', size=64, required=True),
'uri': fields.char('URI', size=255, required=True,
help='Define the connection URI for the AMQP broker (ex: RabbitMQ)'),
'state': fields.selection([('draft', 'Draft'), ('declared', 'Declared')],
'State', readonly=True,
help='Define the state of the connection'),
'exchange_ids': fields.one2many('anybus.entity.exchange',
'connection_id', 'Exchanges', readonly=True),
'uri': fields.char(
'URI', size=255, required=True, help='Define the connection URI '
'for the AMQP broker (ex: RabbitMQ)'),
'state': fields.selection(
[('draft', 'Draft'), ('declared', 'Declared')], 'State',
readonly=True, help='Define the state of the connection'),
'exchange_ids': fields.one2many(
'anybus.entity.exchange', 'connection_id', 'Exchanges',
readonly=True),
'queue_ids': fields.one2many('anybus.entity.queue', 'connection_id',
'Queues', readonly=True),
'Queues', readonly=True),
}
_defaults = {
......@@ -46,7 +48,7 @@ class anybusConnection(osv.Model):
except IOError, e:
error = "No vhost found for %r" % uri
raise osv.except_osv(_('Anybus Connection error'),
_(error))
_(error))
return {}
def action_redraft(self, cr, uid, ids, context=None):
......
......@@ -4,32 +4,37 @@ from kombu import Connection
from osv import osv, fields
from tools.translate import _
class AbstractAnybusEntity(osv.AbstractModel):
_name = 'abstract.anybus.entity'
_description = 'Abstract anybus entity'
_columns = {
'name': fields.char('Name of the entity', size=64, required=True,
states={'declared':[('readonly', True)]},
help="You can use %(dbname)s to add the base name in name"),
'connection_id': fields.many2one('anybus.connection', 'Connection',
states={'declared':[('readonly', True)]}, required=True),
'durable': fields.boolean('Durable', states={'declared':
[('readonly', True)]}, help=(
'name': fields.char(
'Name of the entity', size=64, required=True,
states={'declared': [('readonly', True)]},
help="You can use %(dbname)s to add the base name in name"),
'connection_id': fields.many2one(
'anybus.connection', 'Connection',
states={'declared': [('readonly', True)]}, required=True),
'durable': fields.boolean(
'Durable', states={'declared': [('readonly', True)]}, help=(
"Durable entity remain active when a server restarts."
" Non-durable enity (transient entity) are purged "
"when a server restarts.")),
'auto_delete': fields.boolean('Auto delete', states={'declared':
[('readonly', True)]}, help=(
"If set, the entity is deleted when "
"all entities have finished using it.")),
'arguments': fields.char('Arguments', size=255, required=True,
states={'declared':[('readonly', True)]},
help='Additional arguments to specify when the entity is declared.'),
'already_declared': fields.boolean('Alread declared',
states={'declared':[('readonly', True)]}),
'state': fields.selection([('draft', 'Draft'),
('declared', 'Declared')], 'State'),
'auto_delete': fields.boolean(
'Auto delete', states={'declared': [('readonly', True)]}, help=(
"If set, the entity is deleted when "
"all entities have finished using it.")),
'arguments': fields.char(
'Arguments', size=255, required=True,
states={'declared': [('readonly', True)]},
help='Additional arguments to specify when the entity is '
'declared.'),
'already_declared': fields.boolean(
'Alread declared', states={'declared': [('readonly', True)]}),
'state': fields.selection(
[('draft', 'Draft'), ('declared', 'Declared')], 'State'),
}
_defaults = {
......@@ -45,16 +50,17 @@ class AbstractAnybusEntity(osv.AbstractModel):
default = {}
entity = self.browse(cr, uid, id, context=context)
default.update({'state': 'draft', 'name': 'Copie ' + entity.name})
return super(AbstractAnybusEntity, self).copy(cr, uid, id, default,
context=context)
return super(AbstractAnybusEntity, self).copy(
cr, uid, id, default, context=context)
def unlink(self, cr, uid, ids, context=None):
for entity in self.browse(cr, uid, ids, context=context):
if entity.state != 'draft':
raise osv.except_osv(_('Entity error'),
raise osv.except_osv(
_('Entity error'),
_('Only the draft entity can be being delete'))
return super(AbstractAnybusEntity, self).unlink(cr, uid, ids,
context=context)
return super(AbstractAnybusEntity, self).unlink(
cr, uid, ids, context=context)
def name_get(self, cr, uid, ids, context=None):
if not len(ids):
......@@ -65,34 +71,34 @@ class AbstractAnybusEntity(osv.AbstractModel):
return res.items()
def _get_entity_name(self, cr, uid, id, context=None):
r= self.read(cr, uid, id, ['name'], context=context)
r = self.read(cr, uid, id, ['name'], context=context)
return r['name'] % {'dbname': cr.dbname}
def _get_entity(self, cr, uid, entity, channel, context=None, **kwargs):
raise osv.except_osv(_('Entity error'), _('No entity defined'))
def _get_already_declared_entity(self, cr, uid, entity, channel,
context=None, **kwargs):
context=None, **kwargs):
raise osv.except_osv(_('Entity error'), _('No entity defined'))
def _action_entity_method(self, cr, uid, entity, channel, method,
context=None):
context=None):
e = self._get_entity(cr, uid, entity, channel, context=context)
f = getattr(e, method)
f()
def _action_method(self, cr, uid, ids, method="declare", onerror=True,
write={'state': 'declared'}, context=None):
write={'state': 'declared'}, context=None):
for entity in self.browse(cr, uid, ids, context=context):
if entity.connection_id.state == 'draft':
entity.connection_id.action_declare()
uri = self.pool.get('anybus.connection')._get_uri(cr, uid,
entity.connection_id, context=context)
uri = self.pool.get('anybus.connection')._get_uri(
cr, uid, entity.connection_id, context=context)
try:
with Connection(uri) as conn:
self._action_entity_method(cr, uid, entity,
conn.channel(), method, context=context)
self._action_entity_method(cr, uid, entity, conn.channel(),
method, context=context)
except Exception, e:
if onerror:
raise osv.except_osv(_('Entity error'), e)
......@@ -104,6 +110,6 @@ class AbstractAnybusEntity(osv.AbstractModel):
def action_redraft(self, cr, uid, ids, context=None):
self._action_method(cr, uid, ids, method="delete", onerror=False,
write={'state': 'draft'}, context=context)
write={'state': 'draft'}, context=context)
# vim:expandtab:smartindent:tabstop=4:softtabstop=2:shiftwidth=4:
......@@ -11,30 +11,13 @@ class AnybusExchange(osv.Model):
_inherit = 'abstract.anybus.entity'
_columns = {
'type': fields.selection([('direct', 'Direct'), ('topic', 'Topic'),
('fanout', 'Fanout'), ('headers', 'Headers')], 'Exchange types',
states={'declared':[('readonly', True)]},
help="""
*Direct match between the routing key in the message, and the routing criteria
used when a queue is bound to this exchange.\n
*Topic Wildcard match between the routing key and the routing pattern specified
in the exchange/queue binding. The routing key is treated as zero or more
words delimited by ”.” and supports special wildcard characters. “*” matches a
single word and “#” matches zero or more words.\n
*FanoutQueues are bound to this exchange with no arguments. Hence any message
sent to this exchange will be forwarded to all queues bound to this exchange.\n
*Headers: Queues are bound to this exchange with a table of arguments
containing headers and values (optional). A special argument named “x-match”
determines the matching algorithm, where “all” implies an AND (all pairs must
match) and “any” implies OR (at least one pair must match). arguments is used
to specify the arguments.
"""),
'delivery_mode': fields.selection([(1, 'Transient'), (2, 'Persistent')]\
, 'Delivery mode', states={'declared':[('readonly', True)]},
help='The message is transient. Which means \
it is stored in memory only, and is lost if the server dies or restarts.\nThe \
message is persistent. Which means the message is stored both in-memory, and on\
disk, and therefore preserved if the server dies or restarts.'),
'type': fields.selection(
[('direct', 'Direct'), ('topic', 'Topic'), ('fanout', 'Fanout'),
('headers', 'Headers')], 'Exchange types',
states={'declared': [('readonly', True)]}),
'delivery_mode': fields.selection(
[(1, 'Transient'), (2, 'Persistent')], 'Delivery mode',
states={'declared': [('readonly', True)]}),
}
_defaults = {
......@@ -69,7 +52,7 @@ message is persistent. Which means the message is stored both in-memory, and on\
)
def _get_already_declared_entity(self, cr, uid, exchange, channel,
context=None, **kwargs):
context=None, **kwargs):
name = self._get_entity_name(cr, uid, exchange.id, context=context)
return Exchange(
name=name,
......@@ -79,8 +62,8 @@ message is persistent. Which means the message is stored both in-memory, and on\
def publish(self, cr, uid, id, body, key, context=None):
exchange = self.browse(cr, uid, id, context=context)
ex_name = self._get_entity_name(cr, uid, id, context=context)
uri = self.pool.get('anybus.connection')._get_uri(cr, uid,
exchange.connection_id, context=context)
uri = self.pool.get('anybus.connection')._get_uri(
cr, uid, exchange.connection_id, context=context)
with Connection(uri) as conn:
ex = Exchange(name=ex_name, channel=conn.channel())
message = ex.Message(body)
......
......@@ -10,13 +10,17 @@ class AbstractAnybusCommonMessageFields(osv.AbstractModel):
_name = 'abstract.anybus.common.message.fields'
_columns = {
'onerror': fields.selection([('continue', 'Continue on the next data'),
('block', 'Block the fifo'), ('retry', 'Put at the end of the FIFO')],
'On error', help="""If some data is on error you can "Continue"
'onerror': fields.selection(
[('continue', 'Continue on the next data'),
('block', 'Block the fifo'),
('retry', 'Put at the end of the FIFO')], 'On error',
help="""If some data is on error you can "Continue"
with the next data and avoid blocking the FIFO,
or block the fifo with block or put in the end of the fifo and retry""", required=True),
'delete_message': fields.boolean('Delete message', help='If check, the \
message will be delete after process'),
or block the fifo with block or put in the end of the fifo and retry""",
required=True),
'delete_message': fields.boolean(
'Delete message',
help='If check, the message will be delete after process'),
'properties': fields.char('Properties', size=512,
help='Properties add of the message'),
}
......@@ -37,16 +41,20 @@ class AbstractAnybusMessage(osv.AbstractModel):
]
_columns = {
'connection_id': fields.many2one('anybus.connection', 'Connection',
'connection_id': fields.many2one(
'anybus.connection', 'Connection',
states={'draft': [('readonly', False)]}, readonly=True),
'user_id': fields.many2one('res.users', 'Process user', required=True,
'user_id': fields.many2one(
'res.users', 'Process user', required=True,
states={'draft': [('readonly', False)]}, readonly=True),
'seq': fields.integer('Sequence', readonly=True,
'seq': fields.integer(
'Sequence', readonly=True,
states={'draft': [('readonly', False)]}),
'create_date': fields.datetime('Create Date', readonly=True),
'write_date': fields.datetime('Write Date', readonly=True),
'message': fields.binary('Message',
states={'draft': [('readonly', False)]}, readonly=True),
states={'draft': [('readonly', False)]},
readonly=True),
'result': fields.text('Result', readonly=True),
'state': fields.selection([('draft', 'Draft'), ('blocked', 'Blocked'),
('done', 'Done')], 'state', readonly=True),
......@@ -70,14 +78,18 @@ class AbstractAnybusMessage(osv.AbstractModel):
def unlink(self, cr, uid, ids, context=None):
for message in self.browse(cr, uid, ids, context=context):
if message.state == "blocked":
raise osv.except_osv(_('Message error'), _('You can\' t unlink a blocked message'))
return super(AbstractAnybusMessage, self).unlink(cr, uid, ids, context=context)
raise osv.except_osv(_('Message error'),
_('You can\' t unlink a blocked message'))
return super(AbstractAnybusMessage, self).unlink(
cr, uid, ids, context=context)
def action_redraft(self, cr, uid, ids, context=None):
self.write(cr, uid, ids, {'state': 'draft'}, context=context)
def _action_process_message(self, cr, uid, message, context=None):
raise osv.except_osv(_('Anybus Message'), _('Action fordidden, this action must be inherit'))
raise osv.except_osv(
_('Anybus Message'),
_('Action fordidden, this action must be inherit'))
def action_process_message(self, cr, uid, id, retry=False, context=None):
this = self.browse(cr, uid, id, context=context)
......@@ -92,7 +104,9 @@ class AbstractAnybusMessage(osv.AbstractModel):
return True
except osv.except_osv, e:
if this.onerror == 'continue' or retry:
result = "".join(traceback.format_exception(*sys.exc_info())) + '\n' + repr(e)
result = "".join(
traceback.format_exception(
*sys.exc_info())) + '\n' + repr(e)
this.write({'state': 'blocked', 'result': result})
elif this.onerror == 'retry':
return False
......@@ -100,22 +114,24 @@ class AbstractAnybusMessage(osv.AbstractModel):
raise
def action_process_messages(self, cr, uid, ids, context=None):
search_ids = self.search(cr, uid, [('state', '=', 'draft')], context=context)
search_ids = self.search(
cr, uid, [('state', '=', 'draft')], context=context)
retry = []
for id in search_ids:
if not self.action_process_message(cr, uid, id, context=None):
if not self.action_process_message(cr, uid, id, context=None):
retry.append(id)
for id in retry:
self.action_process_message(cr, uid, id, context=None)
def on_change_defaut_values(self, cr, uid, ids, defaultvalues_id, context=None):
def on_change_defaut_values(self, cr, uid, ids, defaultvalues_id,
context=None):
res = {}
if defaultvalues_id and self._default_values_model:
obj = self.pool.get(self._default_values_model)
if obj:
read = obj.read(cr, uid, defaultvalues_id,
self._default_values_reads, context=context,
load="_classic_write")
read = obj.read(
cr, uid, defaultvalues_id, self._default_values_reads,
context=context, load='_classic_write')
del read['id']
res['value'] = read
return res
......
......@@ -25,8 +25,8 @@ class AbstractAnybusProcess(osv.AbstractModel):
('field_id', 'in', fields_ids),
]
model_ids = model_obj.search(cr, uid, domain, context=context)
res = model_obj.read(cr, uid, model_ids, ['model', 'name'],
context=context)
res = model_obj.read(
cr, uid, model_ids, ['model', 'name'], context=context)
return [(r['model'], r['name']) for r in res]
def force_not_default(self, cr, uid, id, context=None):
......@@ -38,11 +38,12 @@ class AbstractAnybusProcess(osv.AbstractModel):
self.write(cr, uid, ids, {'isdefault': False}, context=context)
def write(self, cr, uid, ids, values, context=None):
res = super(AbstractAnybusProcess, self).write(cr, uid, ids, values,
context=context)
res = super(AbstractAnybusProcess, self).write(
cr, uid, ids, values, context=context)
if values.get('isdefault', False):
if len(ids) > 1:
raise osv.except_osv(_('process Error'),
raise osv.except_osv(
_('process Error'),
_('You can\'t define more than one default process'))
self.force_not_default(cr, uid, ids[0], context=context)
return res
......@@ -51,21 +52,21 @@ class AbstractAnybusProcess(osv.AbstractModel):
if context is None:
context = {}
ctx = context.copy()
if values.get('field_id', None) is None and \
context.get('process_model', None) is not None:
field_id = values.get('field_id', None)
if field_id is None and context.get('process_model') is not None:
domain = [
('model_id.model', '=', context.get('process_model')),
('ttype', '=', 'many2one'),
('relation', '=', self._name),
]
fields_ids = self.pool.get('ir.model.fields').search(cr, uid,
domain, context=context)
fields_ids = self.pool.get('ir.model.fields').search(
cr, uid, domain, context=context)
if fields_ids:
values['field_id'] = fields_ids[0]
else:
ctx.update({'process_model': self._name})
id = super(AbstractAnybusProcess, self).create(cr, uid, values,
context=ctx)
id = super(AbstractAnybusProcess, self).create(
cr, uid, values, context=ctx)
if values.get('isdefault', False):
self.force_not_default(cr, uid, id, context=context)
if self._inherit_create_method:
......@@ -73,8 +74,8 @@ class AbstractAnybusProcess(osv.AbstractModel):
process_id = this[this.field_id.name].id
obj = self.pool.get(this.field_id.relation)
obj.write(cr, uid, process_id,
{'real_process': '%s,%d' % (self._name, id)},
context=context)
{'real_process': '%s,%d' % (self._name, id)},
context=context)
return id
def _process_process(self, cr, uid, id, model, model_ids, context=None):
......@@ -85,7 +86,8 @@ class AbstractAnybusProcess(osv.AbstractModel):
process = this.real_process._name
process_id = this.real_process.id
obj = self.pool.get(process)
return obj._process_process(cr, uid, process_id, model, model_ids, context=context)
return obj._process_process(
cr, uid, process_id, model, model_ids, context=context)
class AbstractAnybusProcessTable(osv.AbstractModel):
......@@ -97,16 +99,16 @@ class AbstractAnybusProcessTable(osv.AbstractModel):
return self._get_model(cr, uid, self._name, context=context)
_columns = {
'name': fields.char('Name of the process', size=64,
required=True),
'real_process': fields.reference('Real process',
selection=__get_model, size=128,
readonly=True),
'field_id': fields.many2one('ir.model.fields', 'Relation field',
required=True),
'name': fields.char(
'Name of the process', size=64, required=True),
'real_process': fields.reference(
'Real process', selection=__get_model, size=128, readonly=True),
'field_id': fields.many2one(
'ir.model.fields', 'Relation field', required=True),
'model_id': fields.many2one('ir.model', 'Model'),
'isdefault': fields.boolean('Is default', help='If check this \
value will be the default process'),
'isdefault': fields.boolean(
'Is default',
help='If check this value will be the default process'),
}
_defaults = {
......
......@@ -11,12 +11,11 @@ class AnybusQueue(osv.Model):
_inherit = 'abstract.anybus.entity'
_columns = {
'exclusive': fields.boolean('Exclusive', states={'declared':
[('readonly', True)]}, help='Exclusive queues may only be \
consumed from by the current connection. Setting the ‘exclusive’ flag always \
implies ‘auto-delete’.'),
'binding_ids': fields.one2many('anybus.binding', 'queue_id', 'Bindings',
states={'declared': [('readonly', True)]}),
'exclusive': fields.boolean(
'Exclusive', states={'declared': [('readonly', True)]}),
'binding_ids': fields.one2many(
'anybus.binding', 'queue_id', 'Bindings',
states={'declared': [('readonly', True)]}),
}
_defaults = {
......@@ -29,7 +28,7 @@ implies ‘auto-delete’.'),
]
def _get_entity(self, cr, uid, queue, channel, context=None,
binding=None):
binding=None):
name = self._get_entity_name(cr, uid, queue.id, context=context)
if context is None:
context = {}
......@@ -43,10 +42,11 @@ implies ‘auto-delete’.'),
}
if binding:
exchange_obj = self.pool.get('anybus.entity.exchange')
exchange=exchange_obj._get_entity(cr,uid, binding.exchange_id,
None, context=context)
exchange_name=exchange_obj._get_entity_name(cr,uid,
binding.exchange_id.id, context=context)
exchange = exchange_obj._get_entity(
cr, uid, binding.exchange_id, None,
context=context)
exchange_name = exchange_obj._get_entity_name(
cr, uid, binding.exchange_id.id, context=context)
if binding.routing_key:
routing_key = binding.routing_key % {
'dbname': cr.dbname,
......@@ -58,7 +58,7 @@ implies ‘auto-delete’.'),
return Queue(
name=name,
exchange=exchange,
routing_key = routing_key,
routing_key=routing_key,
channel=channel,
durable=queue.durable,
exclusive=queue.exclusive,
......@@ -76,7 +76,7 @@ implies ‘auto-delete’.'),
)
def _get_already_declared_entity(self, cr, uid, queue, channel,
context=None):
context=None):
name = self._get_entity_name(cr, uid, queue.id, context=context)
return Queue(
name=name,
......@@ -84,28 +84,28 @@ implies ‘auto-delete’.'),
)
def _action_entity_method(self, cr, uid, queue, channel, method,
context=None):
context=None):
if queue.binding_ids:
for binding in queue.binding_ids:
new_queue = self._get_entity(cr, uid, queue, channel,
context=context, binding=binding)
context=context, binding=binding)
f = getattr(new_queue, method)
f()
else:
new_queue = self._get_entity(cr, uid, queue, channel,
context=context)
context=context)
f = getattr(new_queue, method)
f()
def action_purge(self, cr, uid, ids, context=None):
self._action_method(cr, uid, ids, method="purge", onerror=False,
write={}, context=context)
write={}, context=context)
def get(self, cr, uid, id, context=None):
queue = self.browse(cr, uid, id, context=context)
queue_name = self._get_entity_name(cr, uid, id, context=context)
uri = self.pool.get('anybus.connection')._get_uri(cr, uid,
queue.connection_id, context=context)
uri = self.pool.get('anybus.connection')._get_uri(
cr, uid, queue.connection_id, context=context)
with Connection(uri) as conn:
q = Queue(name=queue_name, channel=conn.channel())
message = q.get()
......@@ -115,28 +115,29 @@ implies ‘auto-delete’.'),
return body
return None
class AnybusBinding(osv.Model):
_name = 'anybus.binding'
_description = 'anybus binding'
_rec_name = 'routing_key'
_columns = {
'queue_id': fields.many2one('anybus.entity.queue', 'Queue', required=True),
'exchange_id': fields.many2one('anybus.entity.exchange', 'Exchange',
required=True),
'queue_id': fields.many2one(
'anybus.entity.queue', 'Queue', required=True),
'exchange_id': fields.many2one(
'anybus.entity.exchange', 'Exchange', required=True),
'routing_key': fields.char('Routing key', size=128),
'binding_arguments': fields.char('Arguments', size=255, required=True,
help='Additional arguments to specify when binding the queue'),
'binding_arguments': fields.char('Arguments', size=255, required=True),
}
_defaults = {