Skip to content

Commit

Permalink
in_pubsub property now indicates boolead flag instead numbers of crea…
Browse files Browse the repository at this point in the history
…ted channels for Cluster, Pool, RedisConnections
  • Loading branch information
Anton Ilyushenkov committed Dec 6, 2023
1 parent 5a3e467 commit 0db8ebf
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 31 deletions.
5 changes: 4 additions & 1 deletion src/aioredis_cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,10 @@ def in_pubsub(self) -> int:
Can be tested as bool indicating Pub/Sub mode state.
"""

return sum(p.in_pubsub for p in self._pooler.pools())
for pool in self._pooler.pools():
if pool.in_pubsub:
return 1
return 0

@property
def pubsub_channels(self) -> Mapping[str, AbcChannel]:
Expand Down
14 changes: 3 additions & 11 deletions src/aioredis_cluster/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ def __init__(self) -> None:
self._sharded: coerced_keys_dict[AbcChannel] = coerced_keys_dict()
self._sharded_to_slot: Dict[bytes, int] = {}
self._slot_to_sharded: Dict[int, Set[bytes]] = {}
self._pending_unsubscribe: Set[Tuple[PubSubType, bytes]] = set()

@property
def channels(self) -> Mapping[str, AbcChannel]:
Expand All @@ -112,14 +111,6 @@ def sharded(self) -> Mapping[str, AbcChannel]:
"""Returns read-only sharded channels dict."""
return MappingProxyType(self._sharded)

def channel_pending_unsubscribe(
self,
*,
channel_type: PubSubType,
channel_name: bytes,
) -> None:
self._pending_unsubscribe.add((channel_type, channel_name))

def channel_subscribe(
self,
*,
Expand Down Expand Up @@ -531,9 +522,10 @@ def in_transaction(self) -> bool:
def in_pubsub(self) -> int:
"""Indicates that connection is in PUB/SUB mode.
Provides the number of subscribed channels.
This implementation NOT provides the number of subscribed channels
and provides only boolean flag
"""
return self._pubsub_channels_store.channels_total
return int(self._client_in_pubsub)

async def select(self, db: int) -> bool:
"""Change the selected database for the current connection."""
Expand Down
17 changes: 10 additions & 7 deletions src/aioredis_cluster/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,15 @@ async def set_readonly(self, value: bool) -> None:

@property
def in_pubsub(self) -> int:
in_pubsub = 0
if self._pubsub_conn and not self._pubsub_conn.closed:
in_pubsub += self._pubsub_conn.in_pubsub
if self._sharded_pubsub_conn and not self._sharded_pubsub_conn.closed:
in_pubsub += self._sharded_pubsub_conn.in_pubsub
return in_pubsub
if self._pubsub_conn and not self._pubsub_conn.closed and self._pubsub_conn.in_pubsub:
return 1
if (
self._sharded_pubsub_conn
and not self._sharded_pubsub_conn.closed
and self._sharded_pubsub_conn.in_pubsub
):
return 1
return 0

@property
def pubsub_channels(self) -> Mapping[str, AbcChannel]:
Expand Down Expand Up @@ -379,7 +382,7 @@ def release(self, conn: AbcConnection) -> None:
logger.warning("Connection %r is in transaction, closing it.", conn)
conn.close()
elif conn.in_pubsub:
logger.warning("Connection %r is in subscribe mode, closing it.", conn)
logger.warning("Connection %r is in PubSub mode, closing it.", conn)
conn.close()
elif conn._waiters:
logger.warning("Connection %r has pending commands, closing it.", conn)
Expand Down
6 changes: 3 additions & 3 deletions tests/system_tests/test_redis_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ async def test_sharded_pubsub(redis_cluster):
ch2: Channel = channels[0]

assert len(cl1.sharded_pubsub_channels) == 2
assert cl1.in_pubsub == 2
assert cl1.in_pubsub == 1
assert len(cl1.channels) == 0
assert len(cl1.patterns) == 0

Expand All @@ -173,7 +173,7 @@ async def test_sharded_pubsub(redis_cluster):
await cl1.sunsubscribe("channel2")

assert len(cl1.sharded_pubsub_channels) == 0
assert cl1.in_pubsub == 0
assert cl1.in_pubsub == 1


@pytest.mark.redis_version(gte="7.0.0")
Expand All @@ -186,7 +186,7 @@ async def test_sharded_pubsub__multiple_subscribe(redis_cluster):
ch3: Channel = (await cl1.ssubscribe("channel:{shard_key}:3"))[0]

assert len(cl1.sharded_pubsub_channels) == 3
assert cl1.in_pubsub == 3
assert cl1.in_pubsub == 1

shard_pool = await cl1.keys_master("{shard_key}")
assert len(shard_pool.sharded_pubsub_channels) == 3
Expand Down
33 changes: 24 additions & 9 deletions tests/unit_tests/aioredis_cluster/test_connection_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async def test_execute__simple_subscribe(add_async_finalizer):
assert result_channel == [[b"subscribe", b"chan", 1]]
assert result_pattern == [[b"psubscribe", b"chan", 2]]
assert result_sharded == [[b"ssubscribe", b"chan", 1]]
assert redis.in_pubsub == 3
assert redis.in_pubsub == 1
assert redis._client_in_pubsub is True
assert redis._server_in_pubsub is True
assert len(redis._waiters) == 0
Expand Down Expand Up @@ -108,7 +108,10 @@ async def test_execute__simple_unsubscribe(add_async_finalizer):
await redis.execute_pubsub("PSUBSCRIBE", "chan")
await redis.execute_pubsub("SUBSCRIBE", "chan")

assert redis.in_pubsub == 3
assert redis.in_pubsub == 1
assert len(redis.pubsub_channels) == 1
assert len(redis.pubsub_patterns) == 1
assert len(redis.sharded_pubsub_channels) == 1

reader.queue.put_nowait([b"unsubscribe", b"chan", 1])
reader.queue.put_nowait([b"punsubscribe", b"chan", 0])
Expand All @@ -119,7 +122,10 @@ async def test_execute__simple_unsubscribe(add_async_finalizer):

await moment()

assert redis.in_pubsub == 0
assert redis.in_pubsub == 1
assert len(redis.pubsub_channels) == 0
assert len(redis.pubsub_patterns) == 0
assert len(redis.sharded_pubsub_channels) == 0
assert result_channel == [[b"unsubscribe", b"chan", 1]]
assert result_pattern == [[b"punsubscribe", b"chan", 0]]
assert result_sharded == [[b"sunsubscribe", b"chan", 0]]
Expand Down Expand Up @@ -244,7 +250,10 @@ async def test__redis_push_unsubscribe(add_async_finalizer):
await redis.execute_pubsub("PSUBSCRIBE", "chan:3", "chan:4")
await redis.execute_pubsub("SSUBSCRIBE", "chan:5:{shard}", "chan:6:{shard}")

assert redis.in_pubsub == 6
assert redis.in_pubsub == 1
assert len(redis.pubsub_channels) == 2
assert len(redis.pubsub_patterns) == 2
assert len(redis.sharded_pubsub_channels) == 2

reader.queue.put_nowait([b"unsubscribe", b"chan:1", 3])
reader.queue.put_nowait([b"unsubscribe", b"chan:2", 2])
Expand All @@ -257,7 +266,10 @@ async def test__redis_push_unsubscribe(add_async_finalizer):

await moment()

assert redis.in_pubsub == 0
assert redis.in_pubsub == 1
assert len(redis.pubsub_channels) == 0
assert len(redis.pubsub_patterns) == 0
assert len(redis.sharded_pubsub_channels) == 0

assert redis._reader_task.done() is False

Expand Down Expand Up @@ -316,7 +328,7 @@ async def test_execute__unexpectable_unsubscribe_and_moved(add_async_finalizer):
reader.queue.put_nowait(MovedError("MOVED 10271 127.0.0.1:6379"))
await moment()

assert redis.in_pubsub == 0
assert redis.in_pubsub == 1
assert redis._reader_task.done() is False


Expand Down Expand Up @@ -355,7 +367,8 @@ async def test_execute__client_unsubscribe_with_server_unsubscribe(add_async_fin
assert sub_result1 == [[b"ssubscribe", b"chan:1", 1]]
assert sub_result2 == [[b"ssubscribe", b"chan:2", 2]]
assert sub_result3 == [[b"ssubscribe", b"chan:3", 3]]
assert redis.in_pubsub == 3
assert redis.in_pubsub == 1
assert len(redis.sharded_pubsub_channels) == 3

reader.queue.put_nowait([b"sunsubscribe", b"chan:1", 2])
reader.queue.put_nowait([b"sunsubscribe", b"chan:3", 1])
Expand All @@ -370,7 +383,8 @@ async def test_execute__client_unsubscribe_with_server_unsubscribe(add_async_fin

await moment()

assert redis.in_pubsub == 0
assert redis.in_pubsub == 1
assert len(redis.sharded_pubsub_channels) == 0

assert redis._reader_task is not None
assert redis._reader_task.done() is False
Expand Down Expand Up @@ -497,7 +511,8 @@ async def test_subscribe_and_immediately_unsubscribe(caplog, add_async_finalizer
for record in caplog.records:
assert "No waiter for received reply" not in record.message, record.message

assert redis.in_pubsub == 0
assert redis.in_pubsub == 1
assert len(redis.sharded_pubsub_channels) == 0

assert redis._reader_task is not None
assert redis._reader_task.done() is False

0 comments on commit 0db8ebf

Please sign in to comment.