diff --git a/dj_cqrs/transport/rabbit_mq.py b/dj_cqrs/transport/rabbit_mq.py index 2d7837c..478763e 100644 --- a/dj_cqrs/transport/rabbit_mq.py +++ b/dj_cqrs/transport/rabbit_mq.py @@ -109,18 +109,26 @@ def _produce_with_retries(cls, payload, retries): cls._produce_message(channel, exchange, payload) cls.log_produced(payload) except ( - exceptions.AMQPError, exceptions.ChannelError, exceptions.ReentrancyError, + exceptions.AMQPError, + exceptions.ChannelError, + exceptions.ReentrancyError, AMQPConnectorException, - ): - logger.exception("CQRS couldn't be published: pk = {0} ({1}).{2}".format( - payload.pk, payload.cqrs_id, " Reconnect..." if retries else "", - )) - + ) as e: # in case of any error - close connection and try to reconnect cls.clean_connection() - if retries: - cls._produce_with_retries(payload, retries - 1) + base_log_message = "CQRS couldn't be published: pk = {0} ({1}).".format( + payload.pk, payload.cqrs_id, + ) + if not retries: + logger.exception(base_log_message) + return + + logger.warning('{0} Error: {1}. Reconnect...'.format( + base_log_message, e.__class__.__name__, + )) + + cls._produce_with_retries(payload, retries - 1) @classmethod def _consume_message(cls, ch, method, properties, body, delay_queue): diff --git a/tests/test_transport/test_rabbit_mq.py b/tests/test_transport/test_rabbit_mq.py index 471fe68..e7c3ca1 100644 --- a/tests/test_transport/test_rabbit_mq.py +++ b/tests/test_transport/test_rabbit_mq.py @@ -8,7 +8,12 @@ import ujson from django.db import DatabaseError from pika.adapters.utils.connection_workflow import AMQPConnectorException -from pika.exceptions import AMQPError, ChannelError, ReentrancyError +from pika.exceptions import ( + AMQPError, + ChannelError, + ReentrancyError, + StreamLostError, +) from dj_cqrs.constants import ( DEFAULT_MASTER_AUTO_UPDATE_FIELDS, @@ -223,8 +228,47 @@ def test_produce_retry_on_error(rabbit_transport, mocker, caplog): SignalType.SAVE, 'CQRS_ID', {'id': 1}, 1, ), ) - assert "CQRS couldn't be published: pk = 1 (CQRS_ID). Reconnect..." in caplog.text - assert 'CQRS is published: pk = 1 (CQRS_ID)' in caplog.text + + assert caplog.record_tuples == [ + ( + 'django-cqrs', + logging.WARNING, + "CQRS couldn't be published: pk = 1 (CQRS_ID)." + " Error: AMQPConnectorException. Reconnect...", + ), + ( + 'django-cqrs', + logging.INFO, + 'CQRS is published: pk = 1 (CQRS_ID), correlation_id = None.', + ), + ] + + +def test_produce_retry_on_error_1(rabbit_transport, mocker, caplog): + mocker.patch.object(RabbitMQTransport, '_get_producer_rmq_objects', side_effect=[ + StreamLostError, + StreamLostError, + ]) + mocker.patch.object(RabbitMQTransport, '_produce_message', return_value=True) + + rabbit_transport.produce( + TransportPayload( + SignalType.SAVE, 'CQRS_ID', {'id': 1}, 1, + ), + ) + + assert caplog.record_tuples == [ + ( + 'django-cqrs', + logging.WARNING, + "CQRS couldn't be published: pk = 1 (CQRS_ID). Error: StreamLostError. Reconnect...", + ), + ( + 'django-cqrs', + logging.ERROR, + "CQRS couldn't be published: pk = 1 (CQRS_ID).", + ), + ] def test_produce_message_ok(mocker):