diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index 3b235aadb..9cd5fa490 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -25,6 +25,7 @@ APScheduler, see the :doc:`migration section <migration>`.
 - Fixed scheduler not resuming job processing when ``max_concurrent_jobs`` had been
   reached and then a job was completed, thus making job processing possible again
   (PR by MohammadAmin Vahedinia)
+- Fixed the shutdown procedure of the Redis event broker
 
 **4.0.0a4**
 
diff --git a/src/apscheduler/eventbrokers/redis.py b/src/apscheduler/eventbrokers/redis.py
index 8c967736d..8adbdb14f 100644
--- a/src/apscheduler/eventbrokers/redis.py
+++ b/src/apscheduler/eventbrokers/redis.py
@@ -3,10 +3,12 @@
 from asyncio import CancelledError
 from contextlib import AsyncExitStack
 from logging import Logger
+from typing import Any
 
 import anyio
 import attrs
 import tenacity
+from anyio import move_on_after
 from redis import ConnectionError
 from redis.asyncio import Redis
 from redis.asyncio.client import PubSub
@@ -35,10 +37,11 @@ class RedisEventBroker(BaseExternalEventBroker):
     client: Redis
     channel: str = attrs.field(kw_only=True, default="apscheduler")
     stop_check_interval: float = attrs.field(kw_only=True, default=1)
+    _close_client_on_exit: bool = attrs.field(kw_only=True, default=False)
     _stopped: bool = attrs.field(init=False, default=True)
 
     @classmethod
-    def from_url(cls, url: str, **kwargs) -> RedisEventBroker:
+    def from_url(cls, url: str, **kwargs: Any) -> RedisEventBroker:
         """
         Create a new event broker from a URL.
 
@@ -49,7 +52,7 @@ def from_url(cls, url: str, **kwargs) -> RedisEventBroker:
         """
         pool = ConnectionPool.from_url(url)
         client = Redis(connection_pool=pool)
-        return cls(client, **kwargs)
+        return cls(client, close_client_on_exit=True, **kwargs)
 
     def _retry(self) -> tenacity.AsyncRetrying:
         def after_attempt(retry_state: tenacity.RetryCallState) -> None:
@@ -69,10 +72,19 @@ def after_attempt(retry_state: tenacity.RetryCallState) -> None:
             reraise=True,
         )
 
+    async def _disconnect(self) -> None:
+        with move_on_after(5, shield=True):
+            await self.client.aclose(close_connection_pool=True)
+
     async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None:
-        await super().start(exit_stack, logger)
-        pubsub = self.client.pubsub()
+        # Close the client and its connection pool if this broker was created using
+        # .from_url()
+        if self._close_client_on_exit:
+            exit_stack.push_async_callback(self._disconnect)
+
+        pubsub = await exit_stack.enter_async_context(self.client.pubsub())
         await pubsub.subscribe(self.channel)
+        await super().start(exit_stack, logger)
         self._stopped = False
         exit_stack.callback(setattr, self, "_stopped", True)
 
@@ -101,7 +113,7 @@ async def _listen_messages(self, pubsub: PubSub) -> None:
                     "%s listener crashed", self.__class__.__name__
                 )
 
-                await pubsub.close()
+                await pubsub.aclose()
                 raise
 
     async def publish(self, event: Event) -> None:
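
For context, here is a minimal usage sketch (not part of the patch) of the behavior this change targets: a broker created with ``RedisEventBroker.from_url()`` now owns its Redis client, so the client and its connection pool are released when the scheduler exits. The sketch assumes a Redis server at ``redis://localhost:6379`` and the APScheduler 4.x ``AsyncScheduler`` API; adjust names to your setup.

```python
import asyncio

from apscheduler import AsyncScheduler
from apscheduler.eventbrokers.redis import RedisEventBroker


async def main() -> None:
    # from_url() now passes close_client_on_exit=True internally, so the broker
    # registers a shielded disconnect callback on the scheduler's exit stack.
    broker = RedisEventBroker.from_url("redis://localhost:6379")

    async with AsyncScheduler(event_broker=broker):
        # ... add schedules and run jobs as usual ...
        pass
    # Leaving the context now also closes the Redis client and its connection pool,
    # which previously could be left open (the bug this PR fixes).


asyncio.run(main())
```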