Skip to content

Commit

Permalink
Fixed the shutdown procedure of the Redis event broker
Browse files Browse the repository at this point in the history
The client was not closed when the RedisEventBroker was initialized using RedisEventBroker.from_url().
Also, the pub-sub instance was not closed if the broker was shut down normally.
Additionally, when it was shut down due to an error, it used the deprecated .close() method to shut down the pub-sub instance.
  • Loading branch information
agronholm committed May 11, 2024
1 parent 00c9796 commit 3390502
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
1 change: 1 addition & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ APScheduler, see the :doc:`migration section <migration>`.
- Fixed scheduler not resuming job processing when ``max_concurrent_jobs`` had been
reached and then a job was completed, thus making job processing possible again
(PR by MohammadAmin Vahedinia)
- Fixed the shutdown procedure of the Redis event broker

**4.0.0a4**

Expand Down
22 changes: 17 additions & 5 deletions src/apscheduler/eventbrokers/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
from asyncio import CancelledError
from contextlib import AsyncExitStack
from logging import Logger
from typing import Any

import anyio
import attrs
import tenacity
from anyio import move_on_after
from redis import ConnectionError
from redis.asyncio import Redis
from redis.asyncio.client import PubSub
Expand Down Expand Up @@ -35,10 +37,11 @@ class RedisEventBroker(BaseExternalEventBroker):
client: Redis
channel: str = attrs.field(kw_only=True, default="apscheduler")
stop_check_interval: float = attrs.field(kw_only=True, default=1)
_close_client_on_exit: bool = attrs.field(kw_only=True, default=False)
_stopped: bool = attrs.field(init=False, default=True)

@classmethod
def from_url(cls, url: str, **kwargs) -> RedisEventBroker:
def from_url(cls, url: str, **kwargs: Any) -> RedisEventBroker:
"""
Create a new event broker from a URL.
Expand All @@ -49,7 +52,7 @@ def from_url(cls, url: str, **kwargs) -> RedisEventBroker:
"""
pool = ConnectionPool.from_url(url)
client = Redis(connection_pool=pool)
return cls(client, **kwargs)
return cls(client, close_client_on_exit=True, **kwargs)

def _retry(self) -> tenacity.AsyncRetrying:
def after_attempt(retry_state: tenacity.RetryCallState) -> None:
Expand All @@ -69,10 +72,19 @@ def after_attempt(retry_state: tenacity.RetryCallState) -> None:
reraise=True,
)

async def _disconnect(self) -> None:
with move_on_after(5, shield=True):
await self.client.aclose(close_connection_pool=True)

async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None:
await super().start(exit_stack, logger)
pubsub = self.client.pubsub()
# Close the client and its connection pool if this broker was created using
# .from_url()
if self._close_client_on_exit:
exit_stack.push_async_callback(self._disconnect)

pubsub = await exit_stack.enter_async_context(self.client.pubsub())
await pubsub.subscribe(self.channel)
await super().start(exit_stack, logger)

self._stopped = False
exit_stack.callback(setattr, self, "_stopped", True)
Expand Down Expand Up @@ -101,7 +113,7 @@ async def _listen_messages(self, pubsub: PubSub) -> None:
"%s listener crashed", self.__class__.__name__
)

await pubsub.close()
await pubsub.aclose()
raise

async def publish(self, event: Event) -> None:
Expand Down

0 comments on commit 3390502

Please sign in to comment.