Commit 4b6efb5d authored by jssuzanne's avatar jssuzanne
Browse files

Update script to consume

--HG--
branch : 8.0
parent 58c5dbc62473
......@@ -38,6 +38,7 @@
],
'update_xml': [
'security/ir.model.access.csv',
'view/template.xml',
'view/menu.xml',
'view/connection.xml',
'view/message.xml',
......
# -*- coding: utf-8 -*-
from openerp import models, fields
from enum import Enum
class MessageStatus(Enum):
ACK = 0
NACK = 1
REJECT = 2
ERROR = 3
class anybusConnection(models.Model):
......@@ -33,7 +41,12 @@ class anybusConnection(models.Model):
def get_consumers(self):
# (queue name, model, method name)
return [('erp_ping', self._name, 'ping')]
return [
('erp_ping', self._name, 'ping'),
]
def ping(self, body=None):
print(' ==> ', body)
def publish(self, exchange, routing_key, data, contenttype):
pass
......
# -*- coding: utf-8 -*-
from openerp import models, fields
from openerp import models, fields, api
class AnybusMessage(models.Model):
......@@ -17,7 +17,7 @@ class AnybusMessage(models.Model):
binding_key = fields.Char(readonly=True)
queue = fields.Char(readonly=True)
model = fields.Many2one('res.model')
model = fields.Many2one('ir.model')
method = fields.Char()
message = fields.Binary(required=True)
......@@ -34,6 +34,11 @@ class AnybusMessage(models.Model):
else:
this.name = "%s => %s" % (this.exchange, this.binding_key)
@api.v7
def action_process(self, cr, uid, ids, context=None):
for this in self.browse(cr, uid, ids, context=context):
this.process()
@api.v8
def process(self):
pass
# TODO
print(' ==> ', self._ids)
openerp.anybus = function(instance) {
instance.anybus.ProcessAll = function(parent, action) {
parent.rpc('/web/dataset/call_button', {
model: 'anybus.message',
method: 'action_process',
domain_id: null,
context_id: 0,
args: [action.context.active_ids],
});
};
instance.web.client_actions.add("anybus.process.all", "instance.anybus.ProcessAll");
}
......@@ -11,6 +11,7 @@
<field name="sequence" widget="handle"/>
<field name="name"/>
<field name="create_date"/>
<button string="Process" type="object" name="action_process"/>
</tree>
</field>
</record>
......@@ -109,5 +110,18 @@
parent="menu_anybus_general"
sequence="20"
action="act_open_anybus_message_view"/>
<record id="make_all_colidation" model="ir.actions.client">
<field name="name">Process all</field>
<field name="tag">anybus.process.all</field>
</record>
<record id="action_process_all" model="ir.values">
<field name="name">Process all</field>
<field name="model">anybus.message</field>
<field name="key">action</field>
<field name="key2">client_action_multi</field>
<field name="value" eval="'ir.actions.client,%d'%make_all_colidation"/>
</record>
</data>
</openerp>
<?xml version="1.0" encoding="UTF-8"?>
<openerp>
<data>
<template id="assets_backend"
name="web_live assets"
inherit_id="web.assets_backend">
<xpath expr="." position="inside">
<script
type="text/javascript"
src="/anybus/static/src/js/process.js"
/>
</xpath>
</template>
</data>
</openerp>
from pika import SelectConnection, URLParameters
from openerp.addons.anybus.connection import MessageStatus
from base64 import b64encode
import logging
from time import sleep
from uuid import uuid1
logger = logging.getLogger(__name__)
......@@ -93,15 +93,48 @@ class Consumer:
logger.info('Connexion stopped')
def declare_consumer(self, queue, model, method):
print(' ==> ', queue, model, method)
model_id = self.session.env['ir.model'].search(
[('model', '=', model)])[0].id
def on_message(unused_channel, basic_deliver, properties, body):
uuid = uuid1()
print('received', uuid)
sleep(2)
self._channel.basic_ack(basic_deliver.delivery_tag)
sleep(2)
print('ack', uuid)
logger.info('received on %r tag %r',
queue, basic_deliver.delivery_tag)
self.session.env.cr.rollback()
error = ""
try:
status = getattr(self.session.env[model], method)(body=body)
except Exception as e:
self.session.env.cr.rollback()
status = MessageStatus.ERROR
error = str(e)
if status is MessageStatus.ACK:
self._channel.basic_ack(basic_deliver.delivery_tag)
logger.info('ack queue %s tag %r',
queue, basic_deliver.delivery_tag)
elif status is MessageStatus.NACK:
self._channel.basic_nack(basic_deliver.delivery_tag)
logger.info('nack queue %s tag %r',
queue, basic_deliver.delivery_tag)
elif status is MessageStatus.REJECT:
self._channel.basic_reject(basic_deliver.delivery_tag)
logger.info('reject queue %s tag %r',
queue, basic_deliver.delivery_tag)
elif status is MessageStatus.ERROR or status is None:
self.session.env['anybus.message'].create(dict(
sequence=basic_deliver.delivery_tag,
queue=queue,
model=model_id,
method=method,
message=b64encode(body),
contenttype=properties.content_type,
error=error
))
self._channel.basic_ack(basic_deliver.delivery_tag)
logger.info('save message of the queue %s tag %r',
queue, basic_deliver.delivery_tag)
self.session.env.cr.commit()
self._consumer_tags.append(self._channel.basic_consume(
on_message, queue=queue, arguments=dict(model=model, method=method)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment