Commit 0f0220f9 authored by Clovis NZOUENDJOU's avatar Clovis NZOUENDJOU
Browse files

Fix flake8

parent bde809c26cd5
......@@ -18,15 +18,15 @@ class anybusConnection(osv.Model):
'consumer_ids': fields.one2many('anybus.consumer', 'connection_id',
'Consumers', readonly=True),
'cmessage_ids': fields.one2many('anybus.consumer.message',
'connection_id', 'Consumers message',
domain=[('state', '!=', 'done')], readonly=True),
'connection_id', 'Consumers message',
domain=[('state', '!=', 'done')], readonly=True),
'cprocess_ids': fields.function(_get_process,
method=True,
string='Incoming processings',
type='many2many',
relation='anybus.consumer.process',
readonly=True,
store=False),
method=True,
string='Incoming processings',
type='many2many',
relation='anybus.consumer.process',
readonly=True,
store=False),
}
## vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
......@@ -16,20 +16,20 @@ class AnybusConsumer(osv.Model):
_columns = {
'name': fields.char('Name of the consumer', size=64, required=True),
'connection_id': fields.many2one('anybus.connection', 'Connection',
domain=[('state', '=', 'declared')], required=True),
domain=[('state', '=', 'declared')], required=True),
'process_id': fields.many2one('anybus.consumer.process',
'process', required=True),
'process', required=True),
'queue_ids': fields.many2many('anybus.entity.queue',
'consumers_queues_rel', 'consumer_id', 'queue_id', 'Queues list'),
'consumers_queues_rel', 'consumer_id',
'queue_id', 'Queues list'),
'domain': fields.char('Domain', size=512, required=True,
help="this field is evaluated. Use properties to use one property"),
help="this field is evaluated. Use properties to use one property"),
'state': fields.selection([('draft', 'Draft'), ('consume', 'Consume')],
'State'),
}
_defaults = {
'process_id': lambda self, cr, uid, c={}: \
self._get_default_process(cr, uid, context=c),
'process_id': lambda self, cr, uid, c={}: self._get_default_process(cr, uid, context=c),
'domain': lambda *a: 'True',
'state': lambda *a: 'draft',
}
......@@ -38,7 +38,8 @@ class AnybusConsumer(osv.Model):
def _get_default_process(self, cr, uid, context=None):
ids = self.pool.get('anybus.consumer.process').search(cr, uid,
[('isdefault', '=', True)], context=context)
[('isdefault', '=', True)],
context=context)
if ids:
return ids[0]
return False
......@@ -47,10 +48,12 @@ class AnybusConsumer(osv.Model):
this = self.browse(cr, uid, id, context=context)
process_id = this.process_id.id
process_obj = self.pool.get('anybus.consumer.process')
process_obj.process_process(cr, uid, process_id, 'anybus.consumer.message', [message_id], context=context)
process_obj.process_process(cr, uid, process_id, 'anybus.consumer.message',
message_id, context=context)
def _action_consume(self, cr, uid, consumer, context=None, test_cursor=None):
connection_obj = self.pool.get('anybus.connection')
def thread_action_consumer(self, dbname, uid, id, domain, uri, queue_ids, context):
def callback_method(body, message):
if test_cursor is not None:
......@@ -60,7 +63,7 @@ class AnybusConsumer(osv.Model):
message_obj = self.pool.get('anybus.consumer.message')
values = message_obj.on_change_defaut_values(cr, uid, None, id,
context=context).get('value')
context=context).get('value')
if isinstance(body, dict):
properties = body.copy()
else:
......@@ -91,25 +94,26 @@ class AnybusConsumer(osv.Model):
cursor = pooler.get_db(dbname).cursor()
with Connection(uri) as conn:
queues = []
for queue in self.pool.get('anybus.entity.queue').browse(cursor, uid, queue_ids, context=context):
for queue in self.pool.get('anybus.entity.queue').browse(cursor, uid, queue_ids,
context=context):
if queue.binding_ids:
for binding in queue.binding_ids:
new_queue = queue_obj._get_entity(cursor, uid, queue, conn.channel(),
context=context, binding=binding)
context=context, binding=binding)
queues.append(new_queue)
else:
new_queue = queue_obj._get_entity(cursor, uid, queue, conn.channel(),
context=context)
context=context)
queues.append(new_queue)
c = Consumer(conn.channel(), queues=queues, no_ack=False,
auto_declare=False, callbacks=[callback_method])
auto_declare=False, callbacks=[callback_method])
c.consume()
while True:
try:
conn.drain_events(timeout=1)
except socket.timeout:
pass
except AttributeError, err:
except AttributeError: # , err:
pass
finally:
if not self._consumer.get(id):
......@@ -123,7 +127,8 @@ class AnybusConsumer(osv.Model):
if queue.state == 'declared':
queue_ids.append(queue.id)
uri = connection_obj._get_uri(cr, uid, consumer.connection_id, context=context)
thread.start_new_thread(thread_action_consumer, (self, cr.dbname, uid, consumer.id, consumer.domain, uri, queue_ids, context))
thread.start_new_thread(thread_action_consumer, (self, cr.dbname, uid, consumer.id,
consumer.domain, uri, queue_ids, context))
def action_consume(self, cr, uid, ids, context=None, test_cursor=None):
for consumer in self.browse(cr, uid, ids, context=context):
......
......@@ -13,10 +13,10 @@ class AnybusConsumerMessage(osv.Model):
_columns = {
'consumer_id': fields.many2one('anybus.consumer', 'Consumer',
state={
'blocked': [('readonly', True)],
'done': [('readonly', True)],
}, required=True),
state={
'blocked': [('readonly', True)],
'done': [('readonly', True)],
}, required=True),
}
_default_values_reads = [
......@@ -28,6 +28,7 @@ class AnybusConsumerMessage(osv.Model):
def _action_process_message(self, cr, uid, message, context=None):
self.pool.get('anybus.consumer').consume(cr, uid,
message.consumer_id.id, message.id, context=context)
message.consumer_id.id,
message.id, context=context)
## vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
......@@ -11,7 +11,7 @@ class AnybusConsumerprocessNoop(osv.Model):
_columns = {
'process_id': fields.many2one('anybus.consumer.process',
'process', required=True, ondelete="cascade"),
'process', required=True, ondelete="cascade"),
}
def _process_process(self, cr, uid, id, model, model_ids, context=None):
......
......@@ -56,12 +56,12 @@ class TestConsume(AnybusTransactionCase):
self.ex_model.publish(self.cr, self.uid, self.ex_demo_id, "", q2_rk)
qty = self.get_queue_reception_qty(queue_name)
if not qty:
self.fail("No message received for %r" %queue_name)
self.fail("No message received for %r" % queue_name)
qty = self.get_queue_reception_qty(q2_name)
if not qty:
self.fail("No message received for %r" % q2_name)
self.c_model.action_consume(self.cr, self.uid, [self.c_demo_id], test_cursor=self.cr)
sleep(0.1) # put a tempo because the thread don t create cursor and use the same
sleep(0.1) # put a tempo because the thread don t create cursor and use the same
self.c_model.action_consume(self.cr, self.uid, [c2], test_cursor=self.cr)
#self.c_model.action_consume(self.cr, self.uid, [self.c_demo_id, c2], test_cursor=self.cr)
sleep(2)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment