Commit bbdb3d93 authored by jssuzanne's avatar jssuzanne
Browse files

Publish message

--HG--
branch : 8.0
parent 940f1da7d79a
# flake8: noqa
import connection
import message
import ping
......@@ -43,6 +43,7 @@
'view/menu.xml',
'view/connection.xml',
'view/message.xml',
'view/ping.xml',
],
'demo_xml': [
],
......
# -*- coding: utf-8 -*-
from openerp import models, fields
from enum import Enum
from base64 import b64encode
from logging import getLogger
......@@ -54,5 +55,19 @@ class anybusConnection(models.Model):
return MessageStatus.ACK
def publish(self, exchange, routing_key, data, contenttype):
pass
# TODO
message = self.env['anybus.message']
existing_message = message.search(
[('exchange', '=', exchange)])
val = {
'exchange': exchange,
'routing_key': routing_key,
'contenttype': contenttype,
'message': b64encode(data),
}
if existing_message:
val['sequence'] = existing_message[-1].sequence + 1
message.create(val)
existing_message = message.search(
[('exchange', '!=', ''), ('routing_key', '!=', '')])
existing_message.process()
......@@ -2,6 +2,8 @@
from openerp import models, fields, api
from .connection import MessageStatus
from base64 import b64decode
from openerp.tools import config
import pika
from logging import getLogger
logger = getLogger(__name__)
......@@ -20,7 +22,7 @@ class AnybusMessage(models.Model):
sequence = fields.Integer(required=True, default=100)
exchange = fields.Char(readonly=True)
binding_key = fields.Char(readonly=True)
routing_key = fields.Char(readonly=True)
queue = fields.Char(readonly=True)
model = fields.Many2one('ir.model', track_visibility="onchange")
......@@ -28,7 +30,7 @@ class AnybusMessage(models.Model):
message = fields.Binary(required=True, track_visibility="onchange")
contenttype = fields.Char(required=True, track_visibility="onchange")
error = fields.Text(required=True, track_visibility="onchange")
error = fields.Text(readonly=True, track_visibility="onchange")
def _needaction_domain_get(self, *a, **kw):
return [('id', '!=', 0)]
......@@ -38,16 +40,30 @@ class AnybusMessage(models.Model):
if this.queue:
this.name = 'consume %s' % this.queue
else:
this.name = "%s => %s" % (this.exchange, this.binding_key)
this.name = "%s => %s" % (this.exchange, this.routing_key)
@api.v7
def action_process(self, cr, uid, ids, context=None):
for this in self.browse(cr, uid, ids, context=context):
this.process()
self.browse(cr, uid, ids, context=context).process()
@api.v8
def process(self):
logger.info("proces: %s", self.name)
logger.info("process all: %r", self)
for this in self:
logger.info("process: %s", this.name)
if this.exchange and this.routing_key:
res = this.process_publish()
elif this.queue:
res = this.process_consume()
else:
logger.warning("Unknown process: %r", this)
res = True
if not res:
logger.warning("Break process: %r", this)
break
def process_consume(self):
error = ""
try:
body = b64decode(self.message)
......@@ -61,6 +77,7 @@ class AnybusMessage(models.Model):
if status is MessageStatus.ACK:
logger.info('ack %s ', self.name)
self.unlink()
return True
elif status is MessageStatus.NACK:
self.message_post(body="Process has been nack", subject="Process")
logger.info('nack %s', self.name)
......@@ -74,3 +91,56 @@ class AnybusMessage(models.Model):
self.message_post(body="Process finished whitout status",
subject="Process")
logger.info('No status for %s', self.name)
return False
def process_publish(self):
profile = config.get('anybus_profile')
if not profile:
self.write({'error': 'No profile found in configuration'})
return False
connection = self.env['anybus.connection'].search(
[('name', '=', profile)])
if not connection:
self.write({'error': 'No profile found in the connections'})
return False
_connection = None
channel = None
try:
with self.env.cr.savepoint():
body = b64decode(self.message)
parameters = pika.URLParameters(connection.url)
_connection = pika.BlockingConnection(parameters)
channel = _connection.channel()
channel.confirm_delivery()
if channel.basic_publish(
exchange=self.exchange,
routing_key=self.routing_key,
body=body,
properties=pika.BasicProperties(
content_type=self.contenttype,
delivery_mode=1)
):
logger.info("Message published %r", self)
self.unlink()
# if for obscure raison the message can be deleted
# then the message that dont break all
else:
raise Exception("Can publish message")
except Exception as e:
self.write({'error': str(e)})
return False
finally:
if channel and not channel.is_closed and not channel.is_closing:
channel.close()
if (
_connection and
not _connection.is_closed and
not _connection.is_closing
):
_connection.close()
return True
from openerp import models, fields
from simplejson import dumps, loads
class AnyBusPublishPing(models.TransientModel):
_name = 'anybus.publish.ping'
exchange = fields.Char(required=True)
routing_key = fields.Char(required=True)
properties = fields.Text(default='{}', required=True)
def action_publish(self, cr, uid, ids, context=None):
for this in self.browse(cr, uid, ids, context=context):
this.publish()
def publish(self):
self.env['anybus.connection'].publish(
self.exchange, self.routing_key,
dumps({
'exchange': self.exchange,
'routing_key': self.routing_key,
'properties': loads(self.properties),
}),
'text/json'
)
......@@ -23,6 +23,14 @@
<field name="priority" eval="8"/>
<field name="arch" type="xml">
<form string="Messages a traiter">
<header>
<button
string="Process"
type="object"
name="action_process"
class="oe-hightlight"
/>
</header>
<sheet>
<h1>
<field name="name"/>
......@@ -52,8 +60,8 @@
</group>
<label for="exchange"/>
<field name="exchange"/>
<label for="binding_key"/>
<field name="binding_key"/>
<label for="routing_key"/>
<field name="routing_key"/>
</page>
</notebook>
</sheet>
......@@ -76,12 +84,12 @@
<field name="error" nolabel="1"/>
<field name="queue"/>
<field name="exchange"/>
<field name="binding_key"/>
<field name="routing_key"/>
<group string="Group by...">
<filter name="Content types" context="{'group_by': 'contenttype'}" />
<filter name="queue" context="{'group_by': 'queue'}" />
<filter name="Exchange" context="{'group_by': 'exchange'}" />
<filter name="Binding key" context="{'group_by': 'binding_key'}" />
<filter name="routing_key key" context="{'group_by': 'routing_key'}" />
</group>
</search>
</field>
......
<?xml version="1.0" encoding="UTF-8"?>
<openerp>
<data>
<record id="view_anybus_publish_ping_form" model="ir.ui.view">
<field name="name">anybus.publish.ping.form</field>
<field name="model">anybus.publish.ping</field>
<field name="type">form</field>
<field name="priority" eval="8"/>
<field name="arch" type="xml">
<form string="Do a ping" version="7.0">
<sheet>
<label for="exchange"/>
<field name="exchange"/>
<label for="routing_key"/>
<field name="routing_key"/>
<label for="properties"/>
<field name="properties"/>
</sheet>
<footer>
<button
string="Cancel"
special="cancel"
/>
<button
string="Publish"
type="object"
name="action_publish"
/>
</footer>
</form>
</field>
</record>
<record model="ir.actions.act_window" id="act_open_anybus_publish_ping_view">
<field name="name">Do a ping</field>
<field name="type">ir.actions.act_window</field>
<field name="res_model">anybus.publish.ping</field>
<field name="view_type">form</field>
<field name="view_mode">form</field>
<field name="target">new</field>
<field name="domain">[]</field>
<field name="context">{}</field>
</record>
<record model="ir.actions.act_window.view" id="act_open_anybus_publish_ping_view_form">
<field name="act_window_id" ref="act_open_anybus_publish_ping_view"/>
<field name="sequence" eval="20"/>
<field name="view_mode">form</field>
<field name="view_id" ref="view_anybus_publish_ping_form"/>
</record>
<menuitem id="menu_anybus_publish_ping"
parent="menu_anybus_general"
sequence="40"
action="act_open_anybus_publish_ping_view"/>
</data>
</openerp>
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment