Skip to content

Commit

Permalink
fix: do not ensure queue and exchange creation after creating them
Browse files Browse the repository at this point in the history
  • Loading branch information
ClemDoum committed Nov 29, 2024
1 parent 5af262e commit cca44fb
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
14 changes: 8 additions & 6 deletions icij-worker/icij_worker/tests/worker/test_amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
from aio_pika import (
ExchangeType,
Message as AMQPMessage,
RobustConnection,
connect_robust,
)
from pydantic import Field

from conftest import RABBITMQ_TEST_PASSWORD, RABBITMQ_TEST_USER
from icij_common.pydantic_utils import safe_copy
from icij_common.test_utils import async_true_after, fail_if_exception
from icij_worker import (
Expand Down Expand Up @@ -45,7 +45,7 @@
TestableAMQPPublisher,
get_queue_size,
)
from icij_worker.utils.amqp import AMQPManagementClient, AMQPMixin
from icij_worker.utils.amqp import AMQPManagementClient, AMQPMixin, RobustConnection
from icij_worker.worker.amqp import AMQPWorker, AMQPWorkerConfig
from icij_worker.worker.worker import WE

Expand Down Expand Up @@ -106,6 +106,7 @@ def _create_publisher(self):
broker_url=self._broker_url,
connection_timeout_s=self._connection_timeout_s,
reconnection_wait_s=self._reconnection_wait_s,
is_qpid=self._is_qpid,
app_id=self._app.name,
)

Expand All @@ -120,8 +121,9 @@ def amqp_worker_config() -> TestableAMQPWorkerConfig:
rabbitmq_port=RABBITMQ_TEST_PORT,
rabbitmq_management_port=RABBITMQ_MANAGEMENT_PORT,
rabbitmq_vhost=DEFAULT_VHOST,
rabbitmq_user="guest",
rabbitmq_password="guest",
rabbitmq_user=RABBITMQ_TEST_USER,
rabbitmq_password=RABBITMQ_TEST_PASSWORD,
rabbitmq_is_qpid=False,
)
return config

Expand All @@ -147,7 +149,7 @@ def amqp_worker(

@pytest.fixture
async def populate_tasks(rabbit_mq: str, request):
connection = await connect_robust(rabbit_mq)
connection = await connect_robust(rabbit_mq, connection_class=RobustConnection)
routing_strategy = RoutingStrategy()
group = getattr(request, "param", None)
task_routing = routing_strategy.amqp_task_routing(group)
Expand All @@ -168,7 +170,7 @@ async def populate_tasks(rabbit_mq: str, request):
),
]
async with connection:
channel = await connection.channel()
channel = await connection.channel(publisher_confirms=False)
task_ex = await channel.declare_exchange(
task_routing.exchange.name, durable=True
)
Expand Down
4 changes: 2 additions & 2 deletions icij-worker/icij_worker/utils/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ async def _get_queue_iterator(
declare_queues=declare_queues,
durable_queues=durable_queues,
)
ex = await self._channel.get_exchange(routing.exchange.name, ensure=True)
queue = await self._channel.get_queue(routing.queue_name, ensure=True)
ex = await self._channel.get_exchange(routing.exchange.name, ensure=False)
queue = await self._channel.get_queue(routing.queue_name, ensure=False)
kwargs = dict()
if self._inactive_after_s is not None:
kwargs["timeout"] = self._inactive_after_s
Expand Down

0 comments on commit cca44fb

Please sign in to comment.