Commit f522cf2e authored by jssuzanne's avatar jssuzanne
Browse files

Dont save publisher message

--HG--
branch : 8.0
parent 8a67d1fb6b80
......@@ -2,8 +2,6 @@
from openerp import models, fields, api
from .profile import MessageStatus
from base64 import b64decode
from openerp.tools import config
import pika
from logging import getLogger
logger = getLogger(__name__)
......@@ -20,13 +18,10 @@ class AnybusMessage(models.Model):
name = fields.Char(compute="get_name")
sequence = fields.Integer(required=True, default=100)
exchange = fields.Char(readonly=True)
routing_key = fields.Char(readonly=True)
queue = fields.Char(readonly=True)
model = fields.Many2one('ir.model', track_visibility="onchange")
method = fields.Char(track_visibility="onchange")
queue = fields.Char(readonly=True, required=True)
model = fields.Many2one(
'ir.model', track_visibility="onchange", required=True)
method = fields.Char(track_visibility="onchange", required=True)
message = fields.Binary(required=True, track_visibility="onchange")
contenttype = fields.Char(
......@@ -40,10 +35,7 @@ class AnybusMessage(models.Model):
def get_name(self):
for this in self:
if this.queue:
this.name = 'consume %s' % this.queue
else:
this.name = "%s => %s" % (this.exchange, this.routing_key)
this.name = 'consume %s' % this.queue
@api.v7
def action_process(self, cr, uid, ids, context=None):
......@@ -54,15 +46,7 @@ class AnybusMessage(models.Model):
logger.info("process all: %r", self)
for this in self:
logger.info("process: %s", this.name)
if this.exchange and this.routing_key:
res = this.process_publish()
elif this.queue:
res = this.process_consume()
else:
logger.warning("Unknown process: %r", this)
res = True
if not res:
if not this.process_consume():
logger.warning("Break process: %r", this)
break
......@@ -96,54 +80,3 @@ class AnybusMessage(models.Model):
logger.info('No status for %s', self.name)
return False
def process_publish(self):
profile = config.get('anybus_profile')
if not profile:
self.write({'error': 'No profile found in configuration'})
return False
connection = self.env['anybus.profile'].search(
[('name', '=', profile)])
if not connection:
self.write({'error': 'No profile found in the connections'})
return False
_connection = None
channel = None
try:
with self.env.cr.savepoint():
body = b64decode(self.message)
parameters = pika.URLParameters(connection.url)
_connection = pika.BlockingConnection(parameters)
channel = _connection.channel()
channel.confirm_delivery()
if channel.basic_publish(
exchange=self.exchange,
routing_key=self.routing_key,
body=body,
properties=pika.BasicProperties(
content_type=self.contenttype,
delivery_mode=1)
):
logger.info("Message published %r", self)
self.unlink()
# if for obscure raison the message can be deleted
# then the message that dont break all
else:
raise Exception("Can publish message")
except Exception as e:
self.write({'error': str(e)})
return False
finally:
if channel and not channel.is_closed and not channel.is_closing:
channel.close()
if (
_connection and
not _connection.is_closed and
not _connection.is_closing
):
_connection.close()
return True
# -*- coding: utf-8 -*-
from openerp import models, fields
from base64 import b64encode
from logging import getLogger
from .schema import MessageStatus, anybus_schema_validation, PingSchema
from openerp.tools import config
import pika
logger = getLogger(__name__)
......@@ -47,19 +48,50 @@ class AnyBusProfile(models.Model):
return MessageStatus.ACK
def publish(self, exchange, routing_key, data, contenttype):
message = self.env['anybus.message']
existing_message = message.search(
[('exchange', '=', exchange)])
val = {
'exchange': exchange,
'routing_key': routing_key,
'contenttype': contenttype,
'message': b64encode(data),
}
if existing_message:
val['sequence'] = existing_message[-1].sequence + 1
profile = config.get('anybus_profile')
if not profile:
self.write({'error': 'No profile found in configuration'})
return False
message.create(val)
existing_message = message.search(
[('exchange', '!=', ''), ('routing_key', '!=', '')])
existing_message.process()
connection = self.env['anybus.profile'].search(
[('name', '=', profile)])
if not connection:
self.write({'error': 'No profile found in the connections'})
return False
_connection = None
channel = None
try:
with self.env.cr.savepoint():
parameters = pika.URLParameters(connection.url)
_connection = pika.BlockingConnection(parameters)
channel = _connection.channel()
channel.confirm_delivery()
if channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
body=data,
properties=pika.BasicProperties(
content_type=contenttype,
delivery_mode=1)
):
logger.info("Message published %r", self)
# if for obscure raison the message can be deleted
# then the message that dont break all
else:
raise Exception("Can publish message")
except Exception as e:
self.write({'error': str(e)})
return False
finally:
if channel and not channel.is_closed and not channel.is_closing:
channel.close()
if (
_connection and
not _connection.is_closed and
not _connection.is_closing
):
_connection.close()
return True
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment