From 89621110bea24cb8ba0a3b25ced64b6889adc711 Mon Sep 17 00:00:00 2001 From: Raafat Date: Wed, 26 Apr 2023 12:02:29 +0200 Subject: [PATCH 1/4] MRSAL-6: Add exchange_exists and queue_exists --- mrsal/mrsal.py | 58 ++++++++++++++++--- pyproject.toml | 2 +- .../test_exceptions.py | 27 ++++++++- 3 files changed, 78 insertions(+), 9 deletions(-) diff --git a/mrsal/mrsal.py b/mrsal/mrsal.py index 984e3d9..ee3a62b 100644 --- a/mrsal/mrsal.py +++ b/mrsal/mrsal.py @@ -8,6 +8,7 @@ import pika from pika import SSLOptions +from pika.exchange_type import ExchangeType from retry import retry from mrsal.config.logging import get_logger @@ -132,8 +133,11 @@ def setup_exchange(self, exchange: str, exchange_type: str, arguments: Dict[str, except AttributeError as err: self.log.error(f'Caught a attribute error: {err}') raise AttributeError + except pika.exceptions.ChannelClosedByBroker as err: + self.log.error(f'Caught ChannelClosedByBroker error: {err}') + raise pika.exceptions.ChannelClosedByBroker(404, str(err)) except pika.exceptions.ConnectionClosedByBroker as err: - self.log.error(f'Caught a connection closed by broker error: {err}') + self.log.error(f'Caught ConnectionClosedByBroker error: {err}') raise pika.exceptions.ConnectionClosedByBroker(503, str(err)) def setup_queue(self, queue: str, arguments: Dict[str, str] = None, durable: bool = True, @@ -172,9 +176,9 @@ def setup_queue(self, queue: str, arguments: Dict[str, str] = None, durable: boo passive=passive) self.log.success(f'Queue is declared successfully: {queue_declare_info}, result={queue_declare_result.method}') return queue_declare_result - except pika.exceptions.ChannelClosedByBroker as err: + except (pika.exceptions.ChannelClosedByBroker, pika.exceptions.ChannelWrongStateError) as err: self.log.error(f'Caught ChannelClosedByBroker: {err}') - raise pika.exceptions.ChannelClosedByBroker(503, str(err)) + raise pika.exceptions.ChannelClosedByBroker(404, str(err)) def setup_queue_binding(self, exchange: str, queue: str, routing_key: str = None, arguments=None): """Bind queue to exchange. @@ -246,6 +250,17 @@ def exchange_delete(self, exchange: str): def confirm_delivery(self): self._channel.confirm_delivery() + def exchange_exist(self, exchange: str, exchange_type: ExchangeType): + exch_result: pika.frame.Method = self.setup_exchange( + exchange=exchange, exchange_type=exchange_type, passive=True + ) + return exch_result + + def queue_exist(self, queue: str): + queue_result = self.setup_queue(queue=queue, passive=True) + # message_count1 = result1.method.message_count + return queue_result + # -------------------------------------------------------------- # -------------------------------------------------------------- # TODO NOT IN USE: Need to reformat it to publish messages to dead letters exchange after exceeding retries limit @@ -318,15 +333,29 @@ def start_consumer( :param bool callback_with_delivery_info: Specify whether the callback method needs delivery info. - spec.Basic.Deliver: Captures the fields for delivered message. E.g:(consumer_tag, delivery_tag, redelivered, exchange, routing_key). - spec.BasicProperties: Captures the client message sent to the server. E.g:(CONTENT_TYPE, DELIVERY_MODE, MESSAGE_ID, APP_ID). + :param bool fast_setup: + - when True, the method will create the specified exchange, queue + and bind them together using the routing kye. + - If False, this method will check if the specified exchange and queue + are already exist before start consuming. """ + print_thread_index = f"thread={str(thread_num)} -> " if thread_num is not None else "" + self.log.info(f'{print_thread_index} Consuming messages queue= {queue}, requeue= {requeue}, inactivity_timeout= {inactivity_timeout}') if fast_setup: # setting up the necessary connections self.setup_exchange(exchange=exchange, exchange_type=exchange_type) self.setup_queue(queue=queue) self.setup_queue_binding(exchange=exchange, queue=queue, routing_key=routing_key) - print_thread_index = f"thread={str(thread_num)} -> " if thread_num is not None else "" - self.log.info(f'{print_thread_index} Consuming messages queue= {queue}, requeue= {requeue}, inactivity_timeout= {inactivity_timeout}') - + else: + # Check if the necessary resources (exch & queue) are active + try: + if exchange is not None and exchange_type is not None: + self.exchange_exist(exchange=exchange, exchange_type=exchange_type) + self.queue_exist(queue=queue) + except (pika.exceptions.ChannelClosedByBroker, pika.exceptions.ConnectionClosedByBroker) as err: + self.log.error(f'{print_thread_index} Failed to check active resources. Cancel consumer. {str(err)}') + self._channel.cancel() + raise pika.exceptions.ChannelClosedByBroker(404, str(err)) try: self.consumer_tag = None method_frame: spec.Basic.Deliver @@ -435,7 +464,7 @@ def start_concurrence_consumer(self, total_threads: int, queue: str, callback: C @retry((pika.exceptions.UnroutableError), tries=2, delay=5, jitter=(1, 3)) def publish_message( - self, exchange: str, routing_key: str, message: Any, exchange_type: str = None, + self, exchange: str, routing_key: str, message: Any, exchange_type: ExchangeType = ExchangeType.direct, queue: str = None, fast_setup: bool = False, prop: pika.BasicProperties = None ): """Publish message to the exchange specifying routing key and properties. @@ -444,6 +473,11 @@ def publish_message( :param str routing_key: The routing key to bind on :param bytes body: The message body; empty string if no body :param pika.spec.BasicProperties properties: message properties + :param bool fast_setup: + - when True, will the method create the specified exchange, queue + and bind them together using the routing kye. + - If False, this method will check if the specified exchange and queue + are already exist before publishing. :raises UnroutableError: raised when a message published in publisher-acknowledgments mode (see @@ -459,6 +493,16 @@ def publish_message( self.setup_exchange(exchange=exchange, exchange_type=exchange_type) self.setup_queue(queue=queue) self.setup_queue_binding(exchange=exchange, queue=queue, routing_key=routing_key) + else: + # Check if the necessary resources (exch & queue) are active + try: + self.exchange_exist(exchange=exchange, exchange_type=exchange_type) + if queue is not None: + self.queue_exist(queue=queue) + except (pika.exceptions.ChannelClosedByBroker, pika.exceptions.ConnectionClosedByBroker) as err: + self.log.error(f'Failed to check active resources. Cancel consumer. {str(err)}') + self._channel.cancel() + raise pika.exceptions.ChannelClosedByBroker(404, str(err)) try: # Publish the message by serializing it in json dump diff --git a/pyproject.toml b/pyproject.toml index 3b6aa65..b36931b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "mrsal" -version = "0.4.0-alpha" +version = "0.4.1-alpha" description = "Mrsal is a simple to use message broker abstraction on top of RabbitMQ and Pika." authors = ["Raafat ", "Jon E Nesvold "] maintainers = ["Raafat ", "Jon E Nesvold "] diff --git a/tests/test_delay_and_dl_messages/test_exceptions.py b/tests/test_delay_and_dl_messages/test_exceptions.py index 6fd81ce..0727dc9 100644 --- a/tests/test_delay_and_dl_messages/test_exceptions.py +++ b/tests/test_delay_and_dl_messages/test_exceptions.py @@ -1,9 +1,11 @@ import os from socket import gaierror -import mrsal.config.config as config import pika import pytest +from pika.exchange_type import ExchangeType + +import mrsal.config.config as config import tests.config as test_config from mrsal.config.logging import get_logger from mrsal.mrsal import Mrsal @@ -118,7 +120,30 @@ def test_bind_exceptions(): assert (f'Caught a AttributeError exception caused by "NoneType" object has no attribute "queue_bind": {err2}') +def test_active_exchange_exceptions(): + mrsal = Mrsal(host=test_config.HOST, + port=config.RABBITMQ_PORT, + credentials=config.RABBITMQ_CREDENTIALS, + virtual_host=config.V_HOST) + mrsal.connect_to_server() + exchange = 'not_exist_exch' + with pytest.raises(pika.exceptions.ChannelClosedByBroker) as err1: + mrsal.exchange_exist(exchange=exchange, exchange_type=ExchangeType.direct) + +def test_active_queue_exceptions(): + mrsal = Mrsal(host=test_config.HOST, + port=config.RABBITMQ_PORT, + credentials=config.RABBITMQ_CREDENTIALS, + virtual_host=config.V_HOST) + mrsal.connect_to_server() + queue = 'not_exist_queue' + with pytest.raises(pika.exceptions.ChannelClosedByBroker) as err1: + mrsal.queue_exist(queue=queue) + + if __name__ == '__main__': test_connection_exceptions() test_exchange_exceptions() test_bind_exceptions() + test_active_exchange_exceptions() + test_active_queue_exceptions() From d099077c4601e699d7a1f8d542d0bff9e76fa86d Mon Sep 17 00:00:00 2001 From: Raafat Date: Wed, 26 Apr 2023 12:08:35 +0200 Subject: [PATCH 2/4] update readme --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index faf2c59..056e755 100644 --- a/README.md +++ b/README.md @@ -139,6 +139,7 @@ mrsal.start_consumer( callback_args=(test_config.HOST, 'friendship_queue'), inactivity_timeout=1, requeue=False, + fast_setup=True, callback_with_delivery_info=True ) ``` @@ -188,6 +189,7 @@ def test_concurrent_consumer(): exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE, routing_key=ROUTING_KEY, inactivity_timeout=INACTIVITY_TIMEOUT, + fast_setup=True, callback_with_delivery_info=True) print(f"Concurrent consumers are done") From 8ff35c0e5756d73c6d092f1eb6a1da1e41d00216 Mon Sep 17 00:00:00 2001 From: Raafat Date: Wed, 26 Apr 2023 12:09:19 +0200 Subject: [PATCH 3/4] update version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index b36931b..149d54d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "mrsal" -version = "0.4.1-alpha" +version = "0.4.2-alpha" description = "Mrsal is a simple to use message broker abstraction on top of RabbitMQ and Pika." authors = ["Raafat ", "Jon E Nesvold "] maintainers = ["Raafat ", "Jon E Nesvold "] From f989618922fe84d8cb041f565364ce230ff4afe2 Mon Sep 17 00:00:00 2001 From: Raafat Date: Wed, 26 Apr 2023 12:24:53 +0200 Subject: [PATCH 4/4] fix typo --- mrsal/mrsal.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mrsal/mrsal.py b/mrsal/mrsal.py index ee3a62b..f90fe46 100644 --- a/mrsal/mrsal.py +++ b/mrsal/mrsal.py @@ -337,7 +337,7 @@ def start_consumer( - when True, the method will create the specified exchange, queue and bind them together using the routing kye. - If False, this method will check if the specified exchange and queue - are already exist before start consuming. + already exist before start consuming. """ print_thread_index = f"thread={str(thread_num)} -> " if thread_num is not None else "" self.log.info(f'{print_thread_index} Consuming messages queue= {queue}, requeue= {requeue}, inactivity_timeout= {inactivity_timeout}') @@ -477,7 +477,7 @@ def publish_message( - when True, will the method create the specified exchange, queue and bind them together using the routing kye. - If False, this method will check if the specified exchange and queue - are already exist before publishing. + already exist before publishing. :raises UnroutableError: raised when a message published in publisher-acknowledgments mode (see