From f798d600a778037dc3970b5d317c6182a7c17969 Mon Sep 17 00:00:00 2001 From: Matthieu Devlin Date: Wed, 27 Sep 2023 10:33:33 -0700 Subject: [PATCH] feat(integrations): add support for cluster clients from redis sdk --- sentry_sdk/integrations/redis/__init__.py | 39 +++++++- sentry_sdk/integrations/redis/asyncio.py | 19 ++-- tests/integrations/redis/cluster/__init__.py | 3 + .../redis/cluster/test_redis_cluster.py | 84 ++++++++++++++++ .../redis/cluster_asyncio/__init__.py | 3 + .../test_redis_cluster_asyncio.py | 96 +++++++++++++++++++ 6 files changed, 229 insertions(+), 15 deletions(-) create mode 100644 tests/integrations/redis/cluster/__init__.py create mode 100644 tests/integrations/redis/cluster/test_redis_cluster.py create mode 100644 tests/integrations/redis/cluster_asyncio/__init__.py create mode 100644 tests/integrations/redis/cluster_asyncio/test_redis_cluster_asyncio.py diff --git a/sentry_sdk/integrations/redis/__init__.py b/sentry_sdk/integrations/redis/__init__.py index 45409a22d9..face24f625 100644 --- a/sentry_sdk/integrations/redis/__init__.py +++ b/sentry_sdk/integrations/redis/__init__.py @@ -64,8 +64,7 @@ def _set_pipeline_data( ): # type: (Span, bool, Any, bool, Sequence[Any]) -> None span.set_tag("redis.is_cluster", is_cluster) - transaction = is_transaction if not is_cluster else False - span.set_tag("redis.transaction", transaction) + span.set_tag("redis.transaction", is_transaction) commands = [] for i, arg in enumerate(command_stack): @@ -103,7 +102,7 @@ def sentry_patched_execute(self, *args, **kwargs): span, is_cluster, get_command_args_fn, - self.transaction, + False if is_cluster else self.transaction, self.command_stack, ) span.set_data(SPANDATA.DB_SYSTEM, "redis") @@ -144,8 +143,37 @@ def _patch_redis(StrictRedis, client): # noqa: N803 patch_redis_async_pipeline, ) - patch_redis_async_client(redis.asyncio.client.StrictRedis) - patch_redis_async_pipeline(redis.asyncio.client.Pipeline) + patch_redis_async_client(redis.asyncio.client.StrictRedis, is_cluster=False) + patch_redis_async_pipeline( + redis.asyncio.client.Pipeline, False, _get_redis_command_args + ) + + +def _patch_redis_cluster(): + # type: () -> None + """Patches the cluster module on redis SDK (as opposed to rediscluster library)""" + try: + from redis import RedisCluster, cluster + except ImportError: + pass + else: + patch_redis_client(RedisCluster, True) + patch_redis_pipeline(cluster.ClusterPipeline, True, _parse_rediscluster_command) + + try: + from redis.asyncio import cluster as async_cluster + except ImportError: + pass + else: + from sentry_sdk.integrations.redis.asyncio import ( + patch_redis_async_client, + patch_redis_async_pipeline, + ) + + patch_redis_async_client(async_cluster.RedisCluster, is_cluster=True) + patch_redis_async_pipeline( + async_cluster.ClusterPipeline, True, _parse_rediscluster_command + ) def _patch_rb(): @@ -200,6 +228,7 @@ def setup_once(): raise DidNotEnable("Redis client not installed") _patch_redis(StrictRedis, client) + _patch_redis_cluster() _patch_rb() try: diff --git a/sentry_sdk/integrations/redis/asyncio.py b/sentry_sdk/integrations/redis/asyncio.py index d0e4e16a87..f0a61da396 100644 --- a/sentry_sdk/integrations/redis/asyncio.py +++ b/sentry_sdk/integrations/redis/asyncio.py @@ -5,7 +5,6 @@ from sentry_sdk.utils import capture_internal_exceptions from sentry_sdk.integrations.redis import ( RedisIntegration, - _get_redis_command_args, _get_span_description, _set_client_data, _set_pipeline_data, @@ -18,8 +17,8 @@ from typing import Any -def patch_redis_async_pipeline(pipeline_cls): - # type: (Any) -> None +def patch_redis_async_pipeline(pipeline_cls, is_cluster, get_command_args_fn): + # type: (Any, bool, Any) -> None old_execute = pipeline_cls.execute async def _sentry_execute(self, *args, **kwargs): @@ -35,10 +34,10 @@ async def _sentry_execute(self, *args, **kwargs): with capture_internal_exceptions(): _set_pipeline_data( span, - False, - _get_redis_command_args, - self.is_transaction, - self.command_stack, + is_cluster, + get_command_args_fn, + False if is_cluster else self.is_transaction, + self._command_stack if is_cluster else self.command_stack, ) return await old_execute(self, *args, **kwargs) @@ -46,8 +45,8 @@ async def _sentry_execute(self, *args, **kwargs): pipeline_cls.execute = _sentry_execute -def patch_redis_async_client(cls): - # type: (Any) -> None +def patch_redis_async_client(cls, is_cluster): + # type: (Any, bool) -> None old_execute_command = cls.execute_command async def _sentry_execute_command(self, name, *args, **kwargs): @@ -60,7 +59,7 @@ async def _sentry_execute_command(self, name, *args, **kwargs): description = _get_span_description(name, *args) with hub.start_span(op=OP.DB_REDIS, description=description) as span: - _set_client_data(span, False, name, *args) + _set_client_data(span, is_cluster, name, *args) return await old_execute_command(self, name, *args, **kwargs) diff --git a/tests/integrations/redis/cluster/__init__.py b/tests/integrations/redis/cluster/__init__.py new file mode 100644 index 0000000000..008b24295f --- /dev/null +++ b/tests/integrations/redis/cluster/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("redis.cluster") diff --git a/tests/integrations/redis/cluster/test_redis_cluster.py b/tests/integrations/redis/cluster/test_redis_cluster.py new file mode 100644 index 0000000000..95a460eccc --- /dev/null +++ b/tests/integrations/redis/cluster/test_redis_cluster.py @@ -0,0 +1,84 @@ +import pytest +from sentry_sdk import capture_message +from sentry_sdk.consts import SPANDATA +from sentry_sdk.api import start_transaction +from sentry_sdk.integrations.redis import RedisIntegration + +import redis + + +@pytest.fixture(autouse=True) +def monkeypatch_rediscluster_class(reset_integrations): + pipeline_cls = redis.cluster.ClusterPipeline + redis.cluster.NodesManager.initialize = lambda *_, **__: None + redis.RedisCluster.command = lambda *_: [] + redis.RedisCluster.pipeline = lambda *_, **__: pipeline_cls(None, None) + pipeline_cls.execute = lambda *_, **__: None + redis.RedisCluster.execute_command = lambda *_, **__: None + + +def test_rediscluster_basic(sentry_init, capture_events): + sentry_init(integrations=[RedisIntegration()]) + events = capture_events() + + rc = redis.RedisCluster(host="localhost", port=6379) + rc.get("foobar") + capture_message("hi") + + (event,) = events + (crumb,) = event["breadcrumbs"]["values"] + + assert crumb == { + "category": "redis", + "message": "GET 'foobar'", + "data": { + "db.operation": "GET", + "redis.key": "foobar", + "redis.command": "GET", + "redis.is_cluster": True, + }, + "timestamp": crumb["timestamp"], + "type": "redis", + } + + +@pytest.mark.parametrize( + "send_default_pii, expected_first_ten", + [ + (False, ["GET 'foo'", "SET 'bar' [Filtered]", "SET 'baz' [Filtered]"]), + (True, ["GET 'foo'", "SET 'bar' 1", "SET 'baz' 2"]), + ], +) +def test_rediscluster_pipeline( + sentry_init, capture_events, send_default_pii, expected_first_ten +): + sentry_init( + integrations=[RedisIntegration()], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + rc = redis.RedisCluster(host="localhost", port=6379) + with start_transaction(): + pipeline = rc.pipeline() + pipeline.get("foo") + pipeline.set("bar", 1) + pipeline.set("baz", 2) + pipeline.execute() + + (event,) = events + (span,) = event["spans"] + assert span["op"] == "db.redis" + assert span["description"] == "redis.pipeline.execute" + assert span["data"] == { + "redis.commands": { + "count": 3, + "first_ten": expected_first_ten, + }, + SPANDATA.DB_SYSTEM: "redis", + } + assert span["tags"] == { + "redis.transaction": False, # For Cluster, this is always False + "redis.is_cluster": True, + } diff --git a/tests/integrations/redis/cluster_asyncio/__init__.py b/tests/integrations/redis/cluster_asyncio/__init__.py new file mode 100644 index 0000000000..663979a4e2 --- /dev/null +++ b/tests/integrations/redis/cluster_asyncio/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("redis.asyncio.cluster") diff --git a/tests/integrations/redis/cluster_asyncio/test_redis_cluster_asyncio.py b/tests/integrations/redis/cluster_asyncio/test_redis_cluster_asyncio.py new file mode 100644 index 0000000000..d644761184 --- /dev/null +++ b/tests/integrations/redis/cluster_asyncio/test_redis_cluster_asyncio.py @@ -0,0 +1,96 @@ +import pytest + +from sentry_sdk import capture_message, start_transaction +from sentry_sdk.integrations.redis import RedisIntegration + +import redis + + +async def fake_initialize(*_, **__): + return None + + +async def fake_execute_command(*_, **__): + return [] + + +async def fake_execute(*_, **__): + return None + + +@pytest.fixture(autouse=True) +def monkeypatch_rediscluster_asyncio_class(reset_integrations): + pipeline_cls = redis.asyncio.cluster.ClusterPipeline + redis.asyncio.cluster.NodesManager.initialize = fake_initialize + redis.asyncio.RedisCluster.pipeline = lambda self, *_, **__: pipeline_cls(self) + pipeline_cls.execute = fake_execute + redis.asyncio.RedisCluster.execute_command = fake_execute_command + + +@pytest.mark.asyncio +async def test_async_basic(sentry_init, capture_events): + sentry_init(integrations=[RedisIntegration()]) + events = capture_events() + + connection = redis.asyncio.RedisCluster(host="localhost", port=6379) + + await connection.get("foobar") + capture_message("hi") + + (event,) = events + (crumb,) = event["breadcrumbs"]["values"] + + assert crumb == { + "category": "redis", + "message": "GET 'foobar'", + "data": { + "db.operation": "GET", + "redis.key": "foobar", + "redis.command": "GET", + "redis.is_cluster": True, + }, + "timestamp": crumb["timestamp"], + "type": "redis", + } + + +@pytest.mark.parametrize( + "send_default_pii, expected_first_ten", + [ + (False, ["GET 'foo'", "SET 'bar' [Filtered]", "SET 'baz' [Filtered]"]), + (True, ["GET 'foo'", "SET 'bar' 1", "SET 'baz' 2"]), + ], +) +@pytest.mark.asyncio +async def test_async_redis_pipeline( + sentry_init, capture_events, send_default_pii, expected_first_ten +): + sentry_init( + integrations=[RedisIntegration()], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + connection = redis.asyncio.RedisCluster(host="localhost", port=6379) + with start_transaction(): + pipeline = connection.pipeline() + pipeline.get("foo") + pipeline.set("bar", 1) + pipeline.set("baz", 2) + await pipeline.execute() + + (event,) = events + (span,) = event["spans"] + assert span["op"] == "db.redis" + assert span["description"] == "redis.pipeline.execute" + assert span["data"] == { + "redis.commands": { + "count": 3, + "first_ten": expected_first_ten, + } + } + assert span["tags"] == { + "redis.transaction": False, + "redis.is_cluster": True, + }