Commit 483fefdc authored by jssuzanne's avatar jssuzanne
Browse files

Fixed consumer

--HG--
branch : 8.0
parent a153ae3a0bff
import logging
from argparse import ArgumentParser
from openerp.tools import config
from .consumer import Consumer
from .consumer import ReconnectingConsumer
logger = logging.getLogger(__name__)
......@@ -22,7 +22,7 @@ def run(session):
logger.info("run on database %s with profile %s", dbname, profile)
session.open(db=parsed_args.database)
consumer = Consumer(session, profile)
consumer = ReconnectingConsumer(session, profile)
try:
consumer.run()
except KeyboardInterrupt:
......
......@@ -2,6 +2,7 @@ from pika import SelectConnection, URLParameters
from openerp.addons.anybus.profile import MessageStatus
from base64 import b64encode
import logging
import time
logger = logging.getLogger(__name__)
......@@ -16,6 +17,8 @@ class Consumer:
self._closing = False
self._consumer_tags = []
self.odoo_connection = None
self._consuming = False
self._prefetch_count = 1
def get_url(self):
connection = self.session.env['anybus.profile'].search(
......@@ -26,56 +29,105 @@ class Consumer:
raise Exception("Profile unknown")
def on_connection_open(self, *a):
def connect(self):
"""This method connects to RabbitMQ, returning the connection handle.
When the connection is established, the on_connection_open method
will be invoked by pika.
:rtype: pika.SelectConnection
"""
url = self.get_url()
logger.info('Connecting to %s', url)
return SelectConnection(
parameters=URLParameters(url),
on_open_callback=self.on_connection_open,
on_open_error_callback=self.on_connection_open_error,
on_close_callback=self.on_connection_closed)
def close_connection(self):
self._consuming = False
if self._connection.is_closing or self._connection.is_closed:
logger.info('Connection is closing or already closed')
else:
logger.info('Closing connection')
self._connection.close()
def on_connection_open(self, _unused_connection):
"""This method is called by pika once the connection to RabbitMQ has
been established. It passes the handle to the connection object in
case we need it, but in this case, we'll just mark it unused.
:param pika.SelectConnection _unused_connection: The connection
"""
self.odoo_connection.write({'state': 'connected'})
self.session.cr.commit()
self._connection.add_on_close_callback(self.on_connection_closed)
self._connection.channel(on_open_callback=self.on_channel_open)
logger.info('Connexion opened')
def on_channel_open(self, channel):
logger.info('Channel opened')
self._channel = channel
self._channel.add_on_close_callback(self.on_channel_closed)
for queue, model, method in self.odoo_connection.get_consumers():
self.declare_consumer(queue, model, method)
def on_channel_closed(self, channel, reply_code, reply_text):
logger.warning('Channel %i was closed: (%s) %s',
channel, reply_code, reply_text)
self._connection.close()
def on_connection_open_error(self, _unused_connection, err):
"""This method is called by pika if the connection to RabbitMQ
can't be established.
:param pika.SelectConnection _unused_connection: The connection
:param Exception err: The error
"""
logger.error('Connection open failed: %s', err)
self.reconnect()
def on_connection_closed(self, connection, reply_code, reply_text):
def on_connection_closed(self, _unused_connection, reason):
"""This method is invoked by pika when the connection to RabbitMQ is
closed unexpectedly. Since it is unexpected, we will reconnect to
RabbitMQ if it disconnects.
:param pika.connection.Connection connection: The closed connection obj
:param Exception reason: exception representing reason for loss of
connection.
"""
self._channel = None
if self._closing:
self._connection.ioloop.stop()
else:
logger.warning(
'Connection closed, reopening in 5 seconds: (%s) %s',
reply_code, reply_text)
self._connection.add_timeout(5, self.reconnect)
logger.warning('Connection closed, reconnect necessary: %s', reason)
self.reconnect()
def reconnect(self):
self._connection.ioloop.stop()
if not self._closing:
self._connection = self.connect()
self._connection.ioloop.start()
"""Will be invoked if the connection can't be opened or is
closed. Indicates that a reconnect is necessary then stops the
ioloop.
"""
self.should_reconnect = True
self.stop()
def connect(self):
url = self.get_url()
logger.info('Connecting to %s', url)
return SelectConnection(
URLParameters(url),
self.on_connection_open,
stop_ioloop_on_close=False
)
def on_channel_open(self, channel):
"""This method is invoked by pika when the channel has been opened.
The channel object is passed in so we can make use of it.
Since the channel is now open, we'll declare the exchange to use.
:param pika.channel.Channel channel: The channel object
"""
logger.info('Channel opened')
self._channel = channel
self._channel.add_on_close_callback(self.on_channel_closed)
for queue, model, method in self.odoo_connection.get_consumers():
self.declare_consumer(queue, model, method)
self.was_consuming = True
self._consuming = True
logger.info('All consumers is declared')
def on_channel_closed(self, channel, reason):
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
Channels are usually closed if you attempt to do something that
violates the protocol, such as re-declare an exchange or queue with
different parameters. In this case, we'll close the connection
to shutdown the object.
:param pika.channel.Channel: The closed channel
:param Exception reason: why the channel was closed
"""
logger.warning('Channel %i was closed: %s', channel, reason)
self.close_connection()
def stop_consuming(self):
self.odoo_connection.write({'state': 'disconnected'})
self.session.cr.commit()
if self._channel:
for consumer_tag in self._consumer_tags:
self._channel.basic_cancel(self.on_cancelok, consumer_tag)
self._channel.basic_cancel(consumer_tag, self.on_cancelok)
def on_cancelok(self, unused_frame):
logger.info('RabbitMQ acknowledged the cancellation of the consumer')
......@@ -86,11 +138,24 @@ class Consumer:
self._connection.ioloop.start()
def stop(self):
logger.info('Stopping connexion')
self._closing = True
self.stop_consuming()
self._connection.ioloop.start()
logger.info('Connexion stopped')
"""Cleanly shutdown the connection to RabbitMQ by stopping the consumer
with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
will be invoked by pika, which will then closing the channel and
connection. The IOLoop is started again because this method is invoked
when CTRL-C is pressed raising a KeyboardInterrupt exception. This
exception stops the IOLoop which needs to be running for pika to
communicate with RabbitMQ. All of the commands issued prior to starting
the IOLoop will be buffered but not processed.
"""
if not self._closing:
self._closing = True
logger.info('Stopping')
if self._consuming:
self.stop_consuming()
self._connection.ioloop.start()
else:
self._connection.ioloop.stop()
logger.info('Stopped')
def declare_consumer(self, queue, model, method):
model_id = self.session.env['ir.model'].search(
......@@ -151,5 +216,46 @@ class Consumer:
logger.info('Consume the queue %r', queue)
self._consumer_tags.append(self._channel.basic_consume(
on_message, queue=queue, arguments=dict(model=model, method=method)
queue, on_message, arguments=dict(model=model, method=method)
))
class ReconnectingConsumer(object):
"""This is an example consumer that will reconnect if the nested
ExampleConsumer indicates that a reconnect is necessary.
"""
def __init__(self, session, profile):
self._reconnect_delay = 0
self.session = session
self.profile = profile
self._consumer = Consumer(session, profile)
def run(self):
while True:
try:
self._consumer.run()
except KeyboardInterrupt:
self._consumer.stop()
break
self._maybe_reconnect()
def stop(self):
self._consumer.stop()
def _maybe_reconnect(self):
if self._consumer.should_reconnect:
self._consumer.stop()
reconnect_delay = self._get_reconnect_delay()
logger.info('Reconnecting after %d seconds', reconnect_delay)
time.sleep(reconnect_delay)
self._consumer = Consumer(self.session, self.profile)
def _get_reconnect_delay(self):
if self._consumer.was_consuming:
self._reconnect_delay = 0
else:
self._reconnect_delay += 1
if self._reconnect_delay > 30:
self._reconnect_delay = 30
return self._reconnect_delay
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment