From 0db8ebf7e65fce1ccacb31a80cbf9c625635e12c Mon Sep 17 00:00:00 2001 From: Anton Ilyushenkov Date: Wed, 6 Dec 2023 20:20:49 +0300 Subject: [PATCH] in_pubsub property now indicates boolead flag instead numbers of created channels for Cluster, Pool, RedisConnections --- src/aioredis_cluster/cluster.py | 5 ++- src/aioredis_cluster/connection.py | 14 ++------ src/aioredis_cluster/pool.py | 17 ++++++---- tests/system_tests/test_redis_cluster.py | 6 ++-- .../test_connection_pubsub.py | 33 ++++++++++++++----- 5 files changed, 44 insertions(+), 31 deletions(-) diff --git a/src/aioredis_cluster/cluster.py b/src/aioredis_cluster/cluster.py index 967d7dc..93cb73e 100644 --- a/src/aioredis_cluster/cluster.py +++ b/src/aioredis_cluster/cluster.py @@ -362,7 +362,10 @@ def in_pubsub(self) -> int: Can be tested as bool indicating Pub/Sub mode state. """ - return sum(p.in_pubsub for p in self._pooler.pools()) + for pool in self._pooler.pools(): + if pool.in_pubsub: + return 1 + return 0 @property def pubsub_channels(self) -> Mapping[str, AbcChannel]: diff --git a/src/aioredis_cluster/connection.py b/src/aioredis_cluster/connection.py index 01d278e..bfd33f9 100644 --- a/src/aioredis_cluster/connection.py +++ b/src/aioredis_cluster/connection.py @@ -95,7 +95,6 @@ def __init__(self) -> None: self._sharded: coerced_keys_dict[AbcChannel] = coerced_keys_dict() self._sharded_to_slot: Dict[bytes, int] = {} self._slot_to_sharded: Dict[int, Set[bytes]] = {} - self._pending_unsubscribe: Set[Tuple[PubSubType, bytes]] = set() @property def channels(self) -> Mapping[str, AbcChannel]: @@ -112,14 +111,6 @@ def sharded(self) -> Mapping[str, AbcChannel]: """Returns read-only sharded channels dict.""" return MappingProxyType(self._sharded) - def channel_pending_unsubscribe( - self, - *, - channel_type: PubSubType, - channel_name: bytes, - ) -> None: - self._pending_unsubscribe.add((channel_type, channel_name)) - def channel_subscribe( self, *, @@ -531,9 +522,10 @@ def in_transaction(self) -> bool: def in_pubsub(self) -> int: """Indicates that connection is in PUB/SUB mode. - Provides the number of subscribed channels. + This implementation NOT provides the number of subscribed channels + and provides only boolean flag """ - return self._pubsub_channels_store.channels_total + return int(self._client_in_pubsub) async def select(self, db: int) -> bool: """Change the selected database for the current connection.""" diff --git a/src/aioredis_cluster/pool.py b/src/aioredis_cluster/pool.py index cdf33e8..6bde084 100644 --- a/src/aioredis_cluster/pool.py +++ b/src/aioredis_cluster/pool.py @@ -305,12 +305,15 @@ async def set_readonly(self, value: bool) -> None: @property def in_pubsub(self) -> int: - in_pubsub = 0 - if self._pubsub_conn and not self._pubsub_conn.closed: - in_pubsub += self._pubsub_conn.in_pubsub - if self._sharded_pubsub_conn and not self._sharded_pubsub_conn.closed: - in_pubsub += self._sharded_pubsub_conn.in_pubsub - return in_pubsub + if self._pubsub_conn and not self._pubsub_conn.closed and self._pubsub_conn.in_pubsub: + return 1 + if ( + self._sharded_pubsub_conn + and not self._sharded_pubsub_conn.closed + and self._sharded_pubsub_conn.in_pubsub + ): + return 1 + return 0 @property def pubsub_channels(self) -> Mapping[str, AbcChannel]: @@ -379,7 +382,7 @@ def release(self, conn: AbcConnection) -> None: logger.warning("Connection %r is in transaction, closing it.", conn) conn.close() elif conn.in_pubsub: - logger.warning("Connection %r is in subscribe mode, closing it.", conn) + logger.warning("Connection %r is in PubSub mode, closing it.", conn) conn.close() elif conn._waiters: logger.warning("Connection %r has pending commands, closing it.", conn) diff --git a/tests/system_tests/test_redis_cluster.py b/tests/system_tests/test_redis_cluster.py index 3a97fd4..1942b8c 100644 --- a/tests/system_tests/test_redis_cluster.py +++ b/tests/system_tests/test_redis_cluster.py @@ -156,7 +156,7 @@ async def test_sharded_pubsub(redis_cluster): ch2: Channel = channels[0] assert len(cl1.sharded_pubsub_channels) == 2 - assert cl1.in_pubsub == 2 + assert cl1.in_pubsub == 1 assert len(cl1.channels) == 0 assert len(cl1.patterns) == 0 @@ -173,7 +173,7 @@ async def test_sharded_pubsub(redis_cluster): await cl1.sunsubscribe("channel2") assert len(cl1.sharded_pubsub_channels) == 0 - assert cl1.in_pubsub == 0 + assert cl1.in_pubsub == 1 @pytest.mark.redis_version(gte="7.0.0") @@ -186,7 +186,7 @@ async def test_sharded_pubsub__multiple_subscribe(redis_cluster): ch3: Channel = (await cl1.ssubscribe("channel:{shard_key}:3"))[0] assert len(cl1.sharded_pubsub_channels) == 3 - assert cl1.in_pubsub == 3 + assert cl1.in_pubsub == 1 shard_pool = await cl1.keys_master("{shard_key}") assert len(shard_pool.sharded_pubsub_channels) == 3 diff --git a/tests/unit_tests/aioredis_cluster/test_connection_pubsub.py b/tests/unit_tests/aioredis_cluster/test_connection_pubsub.py index 1340237..ebdc32b 100644 --- a/tests/unit_tests/aioredis_cluster/test_connection_pubsub.py +++ b/tests/unit_tests/aioredis_cluster/test_connection_pubsub.py @@ -80,7 +80,7 @@ async def test_execute__simple_subscribe(add_async_finalizer): assert result_channel == [[b"subscribe", b"chan", 1]] assert result_pattern == [[b"psubscribe", b"chan", 2]] assert result_sharded == [[b"ssubscribe", b"chan", 1]] - assert redis.in_pubsub == 3 + assert redis.in_pubsub == 1 assert redis._client_in_pubsub is True assert redis._server_in_pubsub is True assert len(redis._waiters) == 0 @@ -108,7 +108,10 @@ async def test_execute__simple_unsubscribe(add_async_finalizer): await redis.execute_pubsub("PSUBSCRIBE", "chan") await redis.execute_pubsub("SUBSCRIBE", "chan") - assert redis.in_pubsub == 3 + assert redis.in_pubsub == 1 + assert len(redis.pubsub_channels) == 1 + assert len(redis.pubsub_patterns) == 1 + assert len(redis.sharded_pubsub_channels) == 1 reader.queue.put_nowait([b"unsubscribe", b"chan", 1]) reader.queue.put_nowait([b"punsubscribe", b"chan", 0]) @@ -119,7 +122,10 @@ async def test_execute__simple_unsubscribe(add_async_finalizer): await moment() - assert redis.in_pubsub == 0 + assert redis.in_pubsub == 1 + assert len(redis.pubsub_channels) == 0 + assert len(redis.pubsub_patterns) == 0 + assert len(redis.sharded_pubsub_channels) == 0 assert result_channel == [[b"unsubscribe", b"chan", 1]] assert result_pattern == [[b"punsubscribe", b"chan", 0]] assert result_sharded == [[b"sunsubscribe", b"chan", 0]] @@ -244,7 +250,10 @@ async def test__redis_push_unsubscribe(add_async_finalizer): await redis.execute_pubsub("PSUBSCRIBE", "chan:3", "chan:4") await redis.execute_pubsub("SSUBSCRIBE", "chan:5:{shard}", "chan:6:{shard}") - assert redis.in_pubsub == 6 + assert redis.in_pubsub == 1 + assert len(redis.pubsub_channels) == 2 + assert len(redis.pubsub_patterns) == 2 + assert len(redis.sharded_pubsub_channels) == 2 reader.queue.put_nowait([b"unsubscribe", b"chan:1", 3]) reader.queue.put_nowait([b"unsubscribe", b"chan:2", 2]) @@ -257,7 +266,10 @@ async def test__redis_push_unsubscribe(add_async_finalizer): await moment() - assert redis.in_pubsub == 0 + assert redis.in_pubsub == 1 + assert len(redis.pubsub_channels) == 0 + assert len(redis.pubsub_patterns) == 0 + assert len(redis.sharded_pubsub_channels) == 0 assert redis._reader_task.done() is False @@ -316,7 +328,7 @@ async def test_execute__unexpectable_unsubscribe_and_moved(add_async_finalizer): reader.queue.put_nowait(MovedError("MOVED 10271 127.0.0.1:6379")) await moment() - assert redis.in_pubsub == 0 + assert redis.in_pubsub == 1 assert redis._reader_task.done() is False @@ -355,7 +367,8 @@ async def test_execute__client_unsubscribe_with_server_unsubscribe(add_async_fin assert sub_result1 == [[b"ssubscribe", b"chan:1", 1]] assert sub_result2 == [[b"ssubscribe", b"chan:2", 2]] assert sub_result3 == [[b"ssubscribe", b"chan:3", 3]] - assert redis.in_pubsub == 3 + assert redis.in_pubsub == 1 + assert len(redis.sharded_pubsub_channels) == 3 reader.queue.put_nowait([b"sunsubscribe", b"chan:1", 2]) reader.queue.put_nowait([b"sunsubscribe", b"chan:3", 1]) @@ -370,7 +383,8 @@ async def test_execute__client_unsubscribe_with_server_unsubscribe(add_async_fin await moment() - assert redis.in_pubsub == 0 + assert redis.in_pubsub == 1 + assert len(redis.sharded_pubsub_channels) == 0 assert redis._reader_task is not None assert redis._reader_task.done() is False @@ -497,7 +511,8 @@ async def test_subscribe_and_immediately_unsubscribe(caplog, add_async_finalizer for record in caplog.records: assert "No waiter for received reply" not in record.message, record.message - assert redis.in_pubsub == 0 + assert redis.in_pubsub == 1 + assert len(redis.sharded_pubsub_channels) == 0 assert redis._reader_task is not None assert redis._reader_task.done() is False