Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: amqp TM and Worker connection workflows #47

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions icij-worker/icij_worker/event_publisher/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ def __init__(
self._connection_timeout_s = connection_timeout_s
self._reconnection_wait_s = reconnection_wait_s
self._exit_stack = AsyncExitStack()
# We don't declare and bind anything here, the task manager is in charge of it.
# We use this flag only for testing where we want to set everything up easily
self._declare_and_bind = False
self._declare_and_bind = True

async def __aenter__(self) -> AMQPPublisher:
self.info("starting publisher connection workflow...")
Expand Down
9 changes: 7 additions & 2 deletions icij-worker/icij_worker/task_manager/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ async def _aenter__(self) -> AMQPTaskManager:
await self._connection_workflow()
self._manager_messages_it = (
await self._get_queue_iterator(
self.manager_evt_routing(), declare_exchanges=False
self.manager_evt_routing(),
declare_exchanges=False,
declare_queues=False,
)
)[0]
return self
Expand Down Expand Up @@ -215,7 +217,10 @@ async def _connection_workflow(self):
durable_queues=True,
)
await self._create_routing(
AMQPMixin.worker_evt_routing(), declare_exchanges=True, declare_queues=False
self.worker_evt_routing(), declare_exchanges=True, declare_queues=False
)
await self._create_routing(
self.manager_evt_routing(), declare_exchanges=True, declare_queues=True
)
self._task_x = await self._channel.get_exchange(
self.default_task_routing().exchange.name, ensure=True
Expand Down
5 changes: 4 additions & 1 deletion icij-worker/icij_worker/tests/event_publisher/test_ampq.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,13 @@ async def test_publish_event(rabbit_mq: str, hello_world_task: Task):
assert received_event == event


async def test_publisher_not_create_and_bind_exchanges_and_queues(rabbit_mq: str):
async def test_publisher_should_not_create_and_bind_exchanges_and_queues(
rabbit_mq: str,
):
# Given
broker_url = rabbit_mq
publisher = AMQPPublisher(broker_url=broker_url)
publisher._declare_and_bind = False # pylint: disable=protected-access

# When
msg = "NOT_FOUND - no exchange 'exchangeManagerEvents' in vhost '/'"
Expand Down
Loading