From ef9754577c49d2dc8fe352694c05164825b48de4 Mon Sep 17 00:00:00 2001 From: Matthieu Devlin Date: Wed, 27 Sep 2023 10:33:33 -0700 Subject: [PATCH] feat(integrations): add support for cluster clients from redis sdk --- sentry_sdk/integrations/redis/__init__.py | 134 ++++++++++++++--- sentry_sdk/integrations/redis/asyncio.py | 29 ++-- .../redis/asyncio/test_redis_asyncio.py | 6 +- tests/integrations/redis/cluster/__init__.py | 3 + .../redis/cluster/test_redis_cluster.py | 127 ++++++++++++++++ .../redis/cluster_asyncio/__init__.py | 3 + .../test_redis_cluster_asyncio.py | 142 ++++++++++++++++++ 7 files changed, 407 insertions(+), 37 deletions(-) create mode 100644 tests/integrations/redis/cluster/__init__.py create mode 100644 tests/integrations/redis/cluster/test_redis_cluster.py create mode 100644 tests/integrations/redis/cluster_asyncio/__init__.py create mode 100644 tests/integrations/redis/cluster_asyncio/test_redis_cluster_asyncio.py diff --git a/sentry_sdk/integrations/redis/__init__.py b/sentry_sdk/integrations/redis/__init__.py index f6c4f186ff..bcda1b5c1f 100644 --- a/sentry_sdk/integrations/redis/__init__.py +++ b/sentry_sdk/integrations/redis/__init__.py @@ -13,7 +13,7 @@ ) if TYPE_CHECKING: - from typing import Any, Dict, Sequence + from typing import Any, Dict, Sequence, Callable from sentry_sdk.tracing import Span _SINGLE_KEY_COMMANDS = frozenset( @@ -83,8 +83,7 @@ def _set_pipeline_data( ): # type: (Span, bool, Any, bool, Sequence[Any]) -> None span.set_tag("redis.is_cluster", is_cluster) - transaction = is_transaction if not is_cluster else False - span.set_tag("redis.transaction", transaction) + span.set_tag("redis.transaction", is_transaction) commands = [] for i, arg in enumerate(command_stack): @@ -118,7 +117,7 @@ def _set_client_data(span, is_cluster, name, *args): span.set_tag("redis.key", args[0]) -def _set_db_data(span, connection_params): +def _set_db_data_on_span(span, connection_params): # type: (Span, Dict[str, Any]) -> None span.set_data(SPANDATA.DB_SYSTEM, "redis") @@ -135,8 +134,34 @@ def _set_db_data(span, connection_params): span.set_data(SPANDATA.SERVER_PORT, port) -def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn): - # type: (Any, bool, Any) -> None +def _set_db_data(span, redis_instance): + # type: (Span, Any) -> None + _set_db_data_on_span(span, redis_instance.connection_pool.connection_kwargs) + + +def _set_cluster_db_data(span, redis_cluster_instance): + # type: (Span, Any) -> None + default_node = redis_cluster_instance.get_default_node() + if default_node: + _set_db_data_on_span( + span, {"host": default_node.host, "port": default_node.port} + ) + + +def _set_async_cluster_db_data(span, async_redis_cluster_instance): + # type: (Span, Any) -> None + default_node = async_redis_cluster_instance.get_default_node() + if default_node and default_node.connection_kwargs: + _set_db_data_on_span(span, default_node.connection_kwargs) + + +def _set_async_cluster_pipeline_db_data(span, async_redis_cluster_pipeline_instance): + # type: (Span, Any) -> None + _set_async_cluster_db_data(span, async_redis_cluster_pipeline_instance._client) + + +def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn, set_db_data_fn): + # type: (Any, bool, Any, Callable[[Span, Any], None]) -> None old_execute = pipeline_cls.execute def sentry_patched_execute(self, *args, **kwargs): @@ -150,12 +175,12 @@ def sentry_patched_execute(self, *args, **kwargs): op=OP.DB_REDIS, description="redis.pipeline.execute" ) as span: with capture_internal_exceptions(): - _set_db_data(span, self.connection_pool.connection_kwargs) + set_db_data_fn(span, self) _set_pipeline_data( span, is_cluster, get_command_args_fn, - self.transaction, + False if is_cluster else self.transaction, self.command_stack, ) @@ -164,8 +189,8 @@ def sentry_patched_execute(self, *args, **kwargs): pipeline_cls.execute = sentry_patched_execute -def patch_redis_client(cls, is_cluster): - # type: (Any, bool) -> None +def patch_redis_client(cls, is_cluster, set_db_data_fn): + # type: (Any, bool, Callable[[Span, Any], None]) -> None """ This function can be used to instrument custom redis client classes or subclasses. @@ -189,7 +214,7 @@ def sentry_patched_execute_command(self, name, *args, **kwargs): description = description[: integration.max_data_size - len("...")] + "..." with hub.start_span(op=OP.DB_REDIS, description=description) as span: - _set_db_data(span, self.connection_pool.connection_kwargs) + set_db_data_fn(span, self) _set_client_data(span, is_cluster, name, *args) return old_execute_command(self, name, *args, **kwargs) @@ -199,14 +224,16 @@ def sentry_patched_execute_command(self, name, *args, **kwargs): def _patch_redis(StrictRedis, client): # noqa: N803 # type: (Any, Any) -> None - patch_redis_client(StrictRedis, is_cluster=False) - patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args) + patch_redis_client(StrictRedis, is_cluster=False, set_db_data_fn=_set_db_data) + patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args, _set_db_data) try: strict_pipeline = client.StrictPipeline except AttributeError: pass else: - patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args) + patch_redis_pipeline( + strict_pipeline, False, _get_redis_command_args, _set_db_data + ) try: import redis.asyncio @@ -218,8 +245,56 @@ def _patch_redis(StrictRedis, client): # noqa: N803 patch_redis_async_pipeline, ) - patch_redis_async_client(redis.asyncio.client.StrictRedis) - patch_redis_async_pipeline(redis.asyncio.client.Pipeline) + patch_redis_async_client( + redis.asyncio.client.StrictRedis, + is_cluster=False, + set_db_data_fn=_set_db_data, + ) + patch_redis_async_pipeline( + redis.asyncio.client.Pipeline, + False, + _get_redis_command_args, + set_db_data_fn=_set_db_data, + ) + + +def _patch_redis_cluster(): + # type: () -> None + """Patches the cluster module on redis SDK (as opposed to rediscluster library)""" + try: + from redis import RedisCluster, cluster + except ImportError: + pass + else: + patch_redis_client(RedisCluster, True, _set_cluster_db_data) + patch_redis_pipeline( + cluster.ClusterPipeline, + True, + _parse_rediscluster_command, + _set_cluster_db_data, + ) + + try: + from redis.asyncio import cluster as async_cluster + except ImportError: + pass + else: + from sentry_sdk.integrations.redis.asyncio import ( + patch_redis_async_client, + patch_redis_async_pipeline, + ) + + patch_redis_async_client( + async_cluster.RedisCluster, + is_cluster=True, + set_db_data_fn=_set_async_cluster_db_data, + ) + patch_redis_async_pipeline( + async_cluster.ClusterPipeline, + True, + _parse_rediscluster_command, + set_db_data_fn=_set_async_cluster_pipeline_db_data, + ) def _patch_rb(): @@ -229,9 +304,15 @@ def _patch_rb(): except ImportError: pass else: - patch_redis_client(rb.clients.FanoutClient, is_cluster=False) - patch_redis_client(rb.clients.MappingClient, is_cluster=False) - patch_redis_client(rb.clients.RoutingClient, is_cluster=False) + patch_redis_client( + rb.clients.FanoutClient, is_cluster=False, set_db_data_fn=_set_db_data + ) + patch_redis_client( + rb.clients.MappingClient, is_cluster=False, set_db_data_fn=_set_db_data + ) + patch_redis_client( + rb.clients.RoutingClient, is_cluster=False, set_db_data_fn=_set_db_data + ) def _patch_rediscluster(): @@ -241,7 +322,9 @@ def _patch_rediscluster(): except ImportError: return - patch_redis_client(rediscluster.RedisCluster, is_cluster=True) + patch_redis_client( + rediscluster.RedisCluster, is_cluster=True, set_db_data_fn=_set_db_data + ) # up to v1.3.6, __version__ attribute is a tuple # from v2.0.0, __version__ is a string and VERSION a tuple @@ -251,11 +334,17 @@ def _patch_rediscluster(): # https://github.com/Grokzen/redis-py-cluster/blob/master/docs/release-notes.rst if (0, 2, 0) < version < (2, 0, 0): pipeline_cls = rediscluster.pipeline.StrictClusterPipeline - patch_redis_client(rediscluster.StrictRedisCluster, is_cluster=True) + patch_redis_client( + rediscluster.StrictRedisCluster, + is_cluster=True, + set_db_data_fn=_set_db_data, + ) else: pipeline_cls = rediscluster.pipeline.ClusterPipeline - patch_redis_pipeline(pipeline_cls, True, _parse_rediscluster_command) + patch_redis_pipeline( + pipeline_cls, True, _parse_rediscluster_command, set_db_data_fn=_set_db_data + ) class RedisIntegration(Integration): @@ -274,6 +363,7 @@ def setup_once(): raise DidNotEnable("Redis client not installed") _patch_redis(StrictRedis, client) + _patch_redis_cluster() _patch_rb() try: diff --git a/sentry_sdk/integrations/redis/asyncio.py b/sentry_sdk/integrations/redis/asyncio.py index 70decdcbd4..3058a87c7a 100644 --- a/sentry_sdk/integrations/redis/asyncio.py +++ b/sentry_sdk/integrations/redis/asyncio.py @@ -4,21 +4,22 @@ from sentry_sdk.consts import OP from sentry_sdk.integrations.redis import ( RedisIntegration, - _get_redis_command_args, _get_span_description, _set_client_data, - _set_db_data, _set_pipeline_data, ) from sentry_sdk._types import TYPE_CHECKING +from sentry_sdk.tracing import Span from sentry_sdk.utils import capture_internal_exceptions if TYPE_CHECKING: - from typing import Any + from typing import Any, Callable -def patch_redis_async_pipeline(pipeline_cls): - # type: (Any) -> None +def patch_redis_async_pipeline( + pipeline_cls, is_cluster, get_command_args_fn, set_db_data_fn +): + # type: (Any, bool, Any, Callable[[Span, Any], None]) -> None old_execute = pipeline_cls.execute async def _sentry_execute(self, *args, **kwargs): @@ -32,13 +33,13 @@ async def _sentry_execute(self, *args, **kwargs): op=OP.DB_REDIS, description="redis.pipeline.execute" ) as span: with capture_internal_exceptions(): - _set_db_data(span, self.connection_pool.connection_kwargs) + set_db_data_fn(span, self) _set_pipeline_data( span, - False, - _get_redis_command_args, - self.is_transaction, - self.command_stack, + is_cluster, + get_command_args_fn, + False if is_cluster else self.is_transaction, + self._command_stack if is_cluster else self.command_stack, ) return await old_execute(self, *args, **kwargs) @@ -46,8 +47,8 @@ async def _sentry_execute(self, *args, **kwargs): pipeline_cls.execute = _sentry_execute -def patch_redis_async_client(cls): - # type: (Any) -> None +def patch_redis_async_client(cls, is_cluster, set_db_data_fn): + # type: (Any, bool, Callable[[Span, Any], None]) -> None old_execute_command = cls.execute_command async def _sentry_execute_command(self, name, *args, **kwargs): @@ -60,8 +61,8 @@ async def _sentry_execute_command(self, name, *args, **kwargs): description = _get_span_description(name, *args) with hub.start_span(op=OP.DB_REDIS, description=description) as span: - _set_db_data(span, self.connection_pool.connection_kwargs) - _set_client_data(span, False, name, *args) + set_db_data_fn(span, self) + _set_client_data(span, is_cluster, name, *args) return await old_execute_command(self, name, *args, **kwargs) diff --git a/tests/integrations/redis/asyncio/test_redis_asyncio.py b/tests/integrations/redis/asyncio/test_redis_asyncio.py index 7233b8f908..016372c19f 100644 --- a/tests/integrations/redis/asyncio/test_redis_asyncio.py +++ b/tests/integrations/redis/asyncio/test_redis_asyncio.py @@ -9,7 +9,11 @@ @pytest.mark.asyncio async def test_async_basic(sentry_init, capture_events): - sentry_init(integrations=[RedisIntegration()]) + sentry_init( + integrations=[RedisIntegration()], + traces_sample_rate=1.0, + send_default_pii=True, + ) events = capture_events() connection = FakeRedis() diff --git a/tests/integrations/redis/cluster/__init__.py b/tests/integrations/redis/cluster/__init__.py new file mode 100644 index 0000000000..008b24295f --- /dev/null +++ b/tests/integrations/redis/cluster/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("redis.cluster") diff --git a/tests/integrations/redis/cluster/test_redis_cluster.py b/tests/integrations/redis/cluster/test_redis_cluster.py new file mode 100644 index 0000000000..95e6627aeb --- /dev/null +++ b/tests/integrations/redis/cluster/test_redis_cluster.py @@ -0,0 +1,127 @@ +import pytest +from sentry_sdk import capture_message +from sentry_sdk.consts import SPANDATA +from sentry_sdk.api import start_transaction +from sentry_sdk.integrations.redis import RedisIntegration + +import redis + + +@pytest.fixture(autouse=True) +def monkeypatch_rediscluster_class(reset_integrations): + pipeline_cls = redis.cluster.ClusterPipeline + redis.cluster.NodesManager.initialize = lambda *_, **__: None + redis.RedisCluster.command = lambda *_: [] + redis.RedisCluster.pipeline = lambda *_, **__: pipeline_cls(None, None) + redis.RedisCluster.get_default_node = lambda *_, **__: redis.cluster.ClusterNode( + "localhost", 6379 + ) + pipeline_cls.execute = lambda *_, **__: None + redis.RedisCluster.execute_command = lambda *_, **__: None + + +def test_rediscluster_breadcrumb(sentry_init, capture_events): + sentry_init(integrations=[RedisIntegration()]) + events = capture_events() + + rc = redis.RedisCluster(host="localhost", port=6379) + rc.get("foobar") + capture_message("hi") + + (event,) = events + (crumb,) = event["breadcrumbs"]["values"] + + assert crumb == { + "category": "redis", + "message": "GET 'foobar'", + "data": { + "db.operation": "GET", + "redis.key": "foobar", + "redis.command": "GET", + "redis.is_cluster": True, + }, + "timestamp": crumb["timestamp"], + "type": "redis", + } + + +@pytest.mark.parametrize( + "send_default_pii, description", + [ + (False, "SET 'bar' [Filtered]"), + (True, "SET 'bar' 1"), + ], +) +def test_rediscluster_basic(sentry_init, capture_events, send_default_pii, description): + sentry_init( + integrations=[RedisIntegration()], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + with start_transaction(): + rc = redis.RedisCluster(host="localhost", port=6379) + rc.set("bar", 1) + + (event,) = events + (span,) = event["spans"] + assert span["op"] == "db.redis" + assert span["description"] == description + assert span["data"] == { + SPANDATA.DB_SYSTEM: "redis", + # ClusterNode converts localhost to 127.0.0.1 + SPANDATA.SERVER_ADDRESS: "127.0.0.1", + SPANDATA.SERVER_PORT: 6379, + } + assert span["tags"] == { + "db.operation": "SET", + "redis.command": "SET", + "redis.is_cluster": True, + "redis.key": "bar", + } + + +@pytest.mark.parametrize( + "send_default_pii, expected_first_ten", + [ + (False, ["GET 'foo'", "SET 'bar' [Filtered]", "SET 'baz' [Filtered]"]), + (True, ["GET 'foo'", "SET 'bar' 1", "SET 'baz' 2"]), + ], +) +def test_rediscluster_pipeline( + sentry_init, capture_events, send_default_pii, expected_first_ten +): + sentry_init( + integrations=[RedisIntegration()], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + rc = redis.RedisCluster(host="localhost", port=6379) + with start_transaction(): + pipeline = rc.pipeline() + pipeline.get("foo") + pipeline.set("bar", 1) + pipeline.set("baz", 2) + pipeline.execute() + + (event,) = events + (span,) = event["spans"] + assert span["op"] == "db.redis" + assert span["description"] == "redis.pipeline.execute" + assert span["data"] == { + "redis.commands": { + "count": 3, + "first_ten": expected_first_ten, + }, + SPANDATA.DB_SYSTEM: "redis", + # ClusterNode converts localhost to 127.0.0.1 + SPANDATA.SERVER_ADDRESS: "127.0.0.1", + SPANDATA.SERVER_PORT: 6379, + } + assert span["tags"] == { + "redis.transaction": False, # For Cluster, this is always False + "redis.is_cluster": True, + } diff --git a/tests/integrations/redis/cluster_asyncio/__init__.py b/tests/integrations/redis/cluster_asyncio/__init__.py new file mode 100644 index 0000000000..663979a4e2 --- /dev/null +++ b/tests/integrations/redis/cluster_asyncio/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("redis.asyncio.cluster") diff --git a/tests/integrations/redis/cluster_asyncio/test_redis_cluster_asyncio.py b/tests/integrations/redis/cluster_asyncio/test_redis_cluster_asyncio.py new file mode 100644 index 0000000000..ad78b79e27 --- /dev/null +++ b/tests/integrations/redis/cluster_asyncio/test_redis_cluster_asyncio.py @@ -0,0 +1,142 @@ +import pytest + +from sentry_sdk import capture_message, start_transaction +from sentry_sdk.consts import SPANDATA +from sentry_sdk.integrations.redis import RedisIntegration + +from redis.asyncio import cluster + + +async def fake_initialize(*_, **__): + return None + + +async def fake_execute_command(*_, **__): + return [] + + +async def fake_execute(*_, **__): + return None + + +@pytest.fixture(autouse=True) +def monkeypatch_rediscluster_asyncio_class(reset_integrations): + pipeline_cls = cluster.ClusterPipeline + cluster.NodesManager.initialize = fake_initialize + cluster.RedisCluster.get_default_node = lambda *_, **__: cluster.ClusterNode( + "localhost", 6379 + ) + cluster.RedisCluster.pipeline = lambda self, *_, **__: pipeline_cls(self) + pipeline_cls.execute = fake_execute + cluster.RedisCluster.execute_command = fake_execute_command + + +@pytest.mark.asyncio +async def test_async_breadcrumb(sentry_init, capture_events): + sentry_init(integrations=[RedisIntegration()]) + events = capture_events() + + connection = cluster.RedisCluster(host="localhost", port=6379) + + await connection.get("foobar") + capture_message("hi") + + (event,) = events + (crumb,) = event["breadcrumbs"]["values"] + + assert crumb == { + "category": "redis", + "message": "GET 'foobar'", + "data": { + "db.operation": "GET", + "redis.key": "foobar", + "redis.command": "GET", + "redis.is_cluster": True, + }, + "timestamp": crumb["timestamp"], + "type": "redis", + } + + +@pytest.mark.parametrize( + "send_default_pii, description", + [ + (False, "SET 'bar' [Filtered]"), + (True, "SET 'bar' 1"), + ], +) +@pytest.mark.asyncio +async def test_async_basic(sentry_init, capture_events, send_default_pii, description): + sentry_init( + integrations=[RedisIntegration()], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + connection = cluster.RedisCluster(host="localhost", port=6379) + with start_transaction(): + await connection.set("bar", 1) + + (event,) = events + (span,) = event["spans"] + assert span["op"] == "db.redis" + assert span["description"] == description + assert span["data"] == { + SPANDATA.DB_SYSTEM: "redis", + # ClusterNode converts localhost to 127.0.0.1 + SPANDATA.SERVER_ADDRESS: "127.0.0.1", + SPANDATA.SERVER_PORT: 6379, + } + assert span["tags"] == { + "redis.is_cluster": True, + "db.operation": "SET", + "redis.command": "SET", + "redis.key": "bar", + } + + +@pytest.mark.parametrize( + "send_default_pii, expected_first_ten", + [ + (False, ["GET 'foo'", "SET 'bar' [Filtered]", "SET 'baz' [Filtered]"]), + (True, ["GET 'foo'", "SET 'bar' 1", "SET 'baz' 2"]), + ], +) +@pytest.mark.asyncio +async def test_async_redis_pipeline( + sentry_init, capture_events, send_default_pii, expected_first_ten +): + sentry_init( + integrations=[RedisIntegration()], + traces_sample_rate=1.0, + send_default_pii=send_default_pii, + ) + events = capture_events() + + connection = cluster.RedisCluster(host="localhost", port=6379) + with start_transaction(): + pipeline = connection.pipeline() + pipeline.get("foo") + pipeline.set("bar", 1) + pipeline.set("baz", 2) + await pipeline.execute() + + (event,) = events + (span,) = event["spans"] + assert span["op"] == "db.redis" + assert span["description"] == "redis.pipeline.execute" + assert span["data"] == { + "redis.commands": { + "count": 3, + "first_ten": expected_first_ten, + }, + SPANDATA.DB_SYSTEM: "redis", + # ClusterNode converts localhost to 127.0.0.1 + SPANDATA.SERVER_ADDRESS: "127.0.0.1", + SPANDATA.SERVER_PORT: 6379, + } + assert span["tags"] == { + "redis.transaction": False, + "redis.is_cluster": True, + }