Skip to content

Commit

Permalink
Merge pull request #18 from NeoMedSys/MRSAL-6
Browse files Browse the repository at this point in the history
MRSAL-6: Add exchange_exist and queue_exist
  • Loading branch information
rafatzahran authored Apr 26, 2023
2 parents d46c530 + f989618 commit 38d33b6
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 9 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ mrsal.start_consumer(
callback_args=(test_config.HOST, 'friendship_queue'),
inactivity_timeout=1,
requeue=False,
fast_setup=True,
callback_with_delivery_info=True
)
```
Expand Down Expand Up @@ -188,6 +189,7 @@ def test_concurrent_consumer():
exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE,
routing_key=ROUTING_KEY,
inactivity_timeout=INACTIVITY_TIMEOUT,
fast_setup=True,
callback_with_delivery_info=True)
print(f"Concurrent consumers are done")

Expand Down
58 changes: 51 additions & 7 deletions mrsal/mrsal.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import pika
from pika import SSLOptions
from pika.exchange_type import ExchangeType
from retry import retry

from mrsal.config.logging import get_logger
Expand Down Expand Up @@ -132,8 +133,11 @@ def setup_exchange(self, exchange: str, exchange_type: str, arguments: Dict[str,
except AttributeError as err:
self.log.error(f'Caught a attribute error: {err}')
raise AttributeError
except pika.exceptions.ChannelClosedByBroker as err:
self.log.error(f'Caught ChannelClosedByBroker error: {err}')
raise pika.exceptions.ChannelClosedByBroker(404, str(err))
except pika.exceptions.ConnectionClosedByBroker as err:
self.log.error(f'Caught a connection closed by broker error: {err}')
self.log.error(f'Caught ConnectionClosedByBroker error: {err}')
raise pika.exceptions.ConnectionClosedByBroker(503, str(err))

def setup_queue(self, queue: str, arguments: Dict[str, str] = None, durable: bool = True,
Expand Down Expand Up @@ -172,9 +176,9 @@ def setup_queue(self, queue: str, arguments: Dict[str, str] = None, durable: boo
passive=passive)
self.log.success(f'Queue is declared successfully: {queue_declare_info}, result={queue_declare_result.method}')
return queue_declare_result
except pika.exceptions.ChannelClosedByBroker as err:
except (pika.exceptions.ChannelClosedByBroker, pika.exceptions.ChannelWrongStateError) as err:
self.log.error(f'Caught ChannelClosedByBroker: {err}')
raise pika.exceptions.ChannelClosedByBroker(503, str(err))
raise pika.exceptions.ChannelClosedByBroker(404, str(err))

def setup_queue_binding(self, exchange: str, queue: str, routing_key: str = None, arguments=None):
"""Bind queue to exchange.
Expand Down Expand Up @@ -246,6 +250,17 @@ def exchange_delete(self, exchange: str):
def confirm_delivery(self):
self._channel.confirm_delivery()

def exchange_exist(self, exchange: str, exchange_type: ExchangeType):
exch_result: pika.frame.Method = self.setup_exchange(
exchange=exchange, exchange_type=exchange_type, passive=True
)
return exch_result

def queue_exist(self, queue: str):
queue_result = self.setup_queue(queue=queue, passive=True)
# message_count1 = result1.method.message_count
return queue_result

# --------------------------------------------------------------
# --------------------------------------------------------------
# TODO NOT IN USE: Need to reformat it to publish messages to dead letters exchange after exceeding retries limit
Expand Down Expand Up @@ -318,15 +333,29 @@ def start_consumer(
:param bool callback_with_delivery_info: Specify whether the callback method needs delivery info.
- spec.Basic.Deliver: Captures the fields for delivered message. E.g:(consumer_tag, delivery_tag, redelivered, exchange, routing_key).
- spec.BasicProperties: Captures the client message sent to the server. E.g:(CONTENT_TYPE, DELIVERY_MODE, MESSAGE_ID, APP_ID).
:param bool fast_setup:
- when True, the method will create the specified exchange, queue
and bind them together using the routing kye.
- If False, this method will check if the specified exchange and queue
already exist before start consuming.
"""
print_thread_index = f"thread={str(thread_num)} -> " if thread_num is not None else ""
self.log.info(f'{print_thread_index} Consuming messages queue= {queue}, requeue= {requeue}, inactivity_timeout= {inactivity_timeout}')
if fast_setup:
# setting up the necessary connections
self.setup_exchange(exchange=exchange, exchange_type=exchange_type)
self.setup_queue(queue=queue)
self.setup_queue_binding(exchange=exchange, queue=queue, routing_key=routing_key)
print_thread_index = f"thread={str(thread_num)} -> " if thread_num is not None else ""
self.log.info(f'{print_thread_index} Consuming messages queue= {queue}, requeue= {requeue}, inactivity_timeout= {inactivity_timeout}')

else:
# Check if the necessary resources (exch & queue) are active
try:
if exchange is not None and exchange_type is not None:
self.exchange_exist(exchange=exchange, exchange_type=exchange_type)
self.queue_exist(queue=queue)
except (pika.exceptions.ChannelClosedByBroker, pika.exceptions.ConnectionClosedByBroker) as err:
self.log.error(f'{print_thread_index} Failed to check active resources. Cancel consumer. {str(err)}')
self._channel.cancel()
raise pika.exceptions.ChannelClosedByBroker(404, str(err))
try:
self.consumer_tag = None
method_frame: spec.Basic.Deliver
Expand Down Expand Up @@ -435,7 +464,7 @@ def start_concurrence_consumer(self, total_threads: int, queue: str, callback: C

@retry((pika.exceptions.UnroutableError), tries=2, delay=5, jitter=(1, 3))
def publish_message(
self, exchange: str, routing_key: str, message: Any, exchange_type: str = None,
self, exchange: str, routing_key: str, message: Any, exchange_type: ExchangeType = ExchangeType.direct,
queue: str = None, fast_setup: bool = False, prop: pika.BasicProperties = None
):
"""Publish message to the exchange specifying routing key and properties.
Expand All @@ -444,6 +473,11 @@ def publish_message(
:param str routing_key: The routing key to bind on
:param bytes body: The message body; empty string if no body
:param pika.spec.BasicProperties properties: message properties
:param bool fast_setup:
- when True, will the method create the specified exchange, queue
and bind them together using the routing kye.
- If False, this method will check if the specified exchange and queue
already exist before publishing.
:raises UnroutableError: raised when a message published in
publisher-acknowledgments mode (see
Expand All @@ -459,6 +493,16 @@ def publish_message(
self.setup_exchange(exchange=exchange, exchange_type=exchange_type)
self.setup_queue(queue=queue)
self.setup_queue_binding(exchange=exchange, queue=queue, routing_key=routing_key)
else:
# Check if the necessary resources (exch & queue) are active
try:
self.exchange_exist(exchange=exchange, exchange_type=exchange_type)
if queue is not None:
self.queue_exist(queue=queue)
except (pika.exceptions.ChannelClosedByBroker, pika.exceptions.ConnectionClosedByBroker) as err:
self.log.error(f'Failed to check active resources. Cancel consumer. {str(err)}')
self._channel.cancel()
raise pika.exceptions.ChannelClosedByBroker(404, str(err))

try:
# Publish the message by serializing it in json dump
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "mrsal"
version = "0.4.0-alpha"
version = "0.4.2-alpha"
description = "Mrsal is a simple to use message broker abstraction on top of RabbitMQ and Pika."
authors = ["Raafat <rafatzahran90@gmail.com>", "Jon E Nesvold <jnesvold@pm.me>"]
maintainers = ["Raafat <rafatzahran90@gmail.com>", "Jon E Nesvold <jnesvold@pm.me>"]
Expand Down
27 changes: 26 additions & 1 deletion tests/test_delay_and_dl_messages/test_exceptions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import os
from socket import gaierror

import mrsal.config.config as config
import pika
import pytest
from pika.exchange_type import ExchangeType

import mrsal.config.config as config
import tests.config as test_config
from mrsal.config.logging import get_logger
from mrsal.mrsal import Mrsal
Expand Down Expand Up @@ -118,7 +120,30 @@ def test_bind_exceptions():
assert (f'Caught a AttributeError exception caused by "NoneType" object has no attribute "queue_bind": {err2}')


def test_active_exchange_exceptions():
mrsal = Mrsal(host=test_config.HOST,
port=config.RABBITMQ_PORT,
credentials=config.RABBITMQ_CREDENTIALS,
virtual_host=config.V_HOST)
mrsal.connect_to_server()
exchange = 'not_exist_exch'
with pytest.raises(pika.exceptions.ChannelClosedByBroker) as err1:
mrsal.exchange_exist(exchange=exchange, exchange_type=ExchangeType.direct)

def test_active_queue_exceptions():
mrsal = Mrsal(host=test_config.HOST,
port=config.RABBITMQ_PORT,
credentials=config.RABBITMQ_CREDENTIALS,
virtual_host=config.V_HOST)
mrsal.connect_to_server()
queue = 'not_exist_queue'
with pytest.raises(pika.exceptions.ChannelClosedByBroker) as err1:
mrsal.queue_exist(queue=queue)


if __name__ == '__main__':
test_connection_exceptions()
test_exchange_exceptions()
test_bind_exceptions()
test_active_exchange_exceptions()
test_active_queue_exceptions()

0 comments on commit 38d33b6

Please sign in to comment.