From f68f0d0477b8b8460f14b41e8135ca9acb35b238 Mon Sep 17 00:00:00 2001 From: Andrew Liu <159852527+aliu39@users.noreply.github.com> Date: Fri, 20 Sep 2024 15:15:08 -0700 Subject: [PATCH] perf(replay): reduce postgres queries in recording consumer using get_from_cache (#77365) @cmanallen pointed out the queries we make in _handle_breadcrumb, used to check feature flags and project options, could overload Postgres as we scale. To fix this we can do the query at the top-level and use `get_from_cache` --------- Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com> --- .../replays/consumers/recording_buffered.py | 4 +- .../replays/usecases/ingest/__init__.py | 3 +- .../replays/usecases/ingest/dom_index.py | 64 ++++++++++------- .../replays/unit/test_ingest_dom_index.py | 72 +++++++++++-------- 4 files changed, 86 insertions(+), 57 deletions(-) diff --git a/src/sentry/replays/consumers/recording_buffered.py b/src/sentry/replays/consumers/recording_buffered.py index 2d04e75629762..05021dbe51f73 100644 --- a/src/sentry/replays/consumers/recording_buffered.py +++ b/src/sentry/replays/consumers/recording_buffered.py @@ -56,6 +56,7 @@ from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording from sentry.conf.types.kafka_definition import Topic, get_topic_codec +from sentry.models.project import Project from sentry.replays.lib.storage import ( RecordingSegmentStorageMeta, make_recording_filename, @@ -318,8 +319,9 @@ def process_message(buffer: RecordingBuffer, message: bytes) -> None: else None ) + project = Project.objects.get_from_cache(id=decoded_message["project_id"]) replay_actions = parse_replay_actions( - decoded_message["project_id"], + project, decoded_message["replay_id"], decoded_message["retention_days"], parsed_recording_data, diff --git a/src/sentry/replays/usecases/ingest/__init__.py b/src/sentry/replays/usecases/ingest/__init__.py index 80beeade6d0bb..137e832b4df85 100644 --- a/src/sentry/replays/usecases/ingest/__init__.py +++ b/src/sentry/replays/usecases/ingest/__init__.py @@ -262,9 +262,10 @@ def recording_post_processor( op="replays.usecases.ingest.parse_and_emit_replay_actions", description="parse_and_emit_replay_actions", ): + project = Project.objects.get_from_cache(id=message.project_id) parse_and_emit_replay_actions( retention_days=message.retention_days, - project_id=message.project_id, + project=project, replay_id=message.replay_id, segment_data=parsed_segment_data, replay_event=parsed_replay_event, diff --git a/src/sentry/replays/usecases/ingest/dom_index.py b/src/sentry/replays/usecases/ingest/dom_index.py index aa915d3059110..e211891d9c179 100644 --- a/src/sentry/replays/usecases/ingest/dom_index.py +++ b/src/sentry/replays/usecases/ingest/dom_index.py @@ -62,7 +62,7 @@ class ReplayActionsEvent(TypedDict): def parse_and_emit_replay_actions( - project_id: int, + project: Project, replay_id: str, retention_days: int, segment_data: list[dict[str, Any]], @@ -70,7 +70,7 @@ def parse_and_emit_replay_actions( ) -> None: with metrics.timer("replays.usecases.ingest.dom_index.parse_and_emit_replay_actions"): message = parse_replay_actions( - project_id, replay_id, retention_days, segment_data, replay_event + project, replay_id, retention_days, segment_data, replay_event ) if message is not None: emit_replay_actions(message) @@ -82,19 +82,19 @@ def emit_replay_actions(action: ReplayActionsEvent) -> None: def parse_replay_actions( - project_id: int, + project: Project, replay_id: str, retention_days: int, segment_data: list[dict[str, Any]], replay_event: dict[str, Any] | None, ) -> ReplayActionsEvent | None: """Parse RRWeb payload to ReplayActionsEvent.""" - actions = get_user_actions(project_id, replay_id, segment_data, replay_event) + actions = get_user_actions(project, replay_id, segment_data, replay_event) if len(actions) == 0: return None payload = create_replay_actions_payload(replay_id, actions) - return create_replay_actions_event(replay_id, project_id, retention_days, payload) + return create_replay_actions_event(replay_id, project.id, retention_days, payload) def create_replay_actions_event( @@ -153,7 +153,7 @@ def log_canvas_size( def get_user_actions( - project_id: int, + project: Project, replay_id: str, events: list[dict[str, Any]], replay_event: dict[str, Any] | None, @@ -177,6 +177,10 @@ def get_user_actions( "textContent": "Helloworld!" } """ + # Feature flag and project option queries + should_report_rage = _should_report_rage_click_issue(project) + should_report_hydration = _should_report_hydration_error_issue(project) + result: list[ReplayActionsEventPayloadClick] = [] for event in _iter_custom_events(events): if len(result) == 20: @@ -185,7 +189,14 @@ def get_user_actions( tag = event.get("data", {}).get("tag") if tag == "breadcrumb": - click = _handle_breadcrumb(event, project_id, replay_id, replay_event) + click = _handle_breadcrumb( + event, + project, + replay_id, + replay_event, + should_report_rage_click_issue=should_report_rage, + should_report_hydration_error_issue=should_report_hydration, + ) if click is not None: result.append(click) # look for request / response breadcrumbs and report metrics on them @@ -193,7 +204,7 @@ def get_user_actions( _handle_resource_metric_event(event) # log the SDK options sent from the SDK 1/500 times if tag == "options" and random.randint(0, 499) < 1: - _handle_options_logging_event(project_id, replay_id, event) + _handle_options_logging_event(project.id, replay_id, event) # log large dom mutation breadcrumb events 1/100 times payload = event.get("data", {}).get("payload", {}) @@ -203,7 +214,7 @@ def get_user_actions( and payload.get("category") == "replay.mutations" and random.randint(0, 500) < 1 ): - _handle_mutations_event(project_id, replay_id, event) + _handle_mutations_event(project.id, replay_id, event) return result @@ -287,12 +298,10 @@ def _parse_classes(classes: str) -> list[str]: return list(filter(lambda n: n != "", classes.split(" ")))[:10] -def _should_report_hydration_error_issue(project_id: int) -> bool: - project = Project.objects.get(id=project_id) +def _should_report_hydration_error_issue(project: Project) -> bool: """ - The feature is controlled by Sentry admins for release of the feature, - while the project option is controlled by the project owner, and is a - permanent setting + Checks the feature that's controlled by Sentry admins for release of the feature, + and the permanent project option, controlled by the project owner. """ return features.has( "organizations:session-replay-hydration-error-issue-creation", @@ -300,8 +309,10 @@ def _should_report_hydration_error_issue(project_id: int) -> bool: ) and project.get_option("sentry:replay_hydration_error_issues") -def _should_report_rage_click_issue(project_id: int) -> bool: - project = Project.objects.get(id=project_id) +def _should_report_rage_click_issue(project: Project) -> bool: + """ + Checks the project option, controlled by a project owner. + """ return project.get_option("sentry:replay_rage_click_issues") @@ -374,7 +385,12 @@ def _handle_mutations_event(project_id: int, replay_id: str, event: dict[str, An def _handle_breadcrumb( - event: dict[str, Any], project_id: int, replay_id: str, replay_event: dict[str, Any] | None + event: dict[str, Any], + project: Project, + replay_id: str, + replay_event: dict[str, Any] | None, + should_report_rage_click_issue=False, + should_report_hydration_error_issue=False, ) -> ReplayActionsEventPayloadClick | None: click = None @@ -399,15 +415,15 @@ def _handle_breadcrumb( payload["data"].get("clickCount", 0) or payload["data"].get("clickcount", 0) ) >= 5 click = create_click_event( - payload, replay_id, is_dead=True, is_rage=is_rage, project_id=project_id + payload, replay_id, is_dead=True, is_rage=is_rage, project_id=project.id ) if click is not None: if is_rage: metrics.incr("replay.rage_click_detected") - if _should_report_rage_click_issue(project_id): + if should_report_rage_click_issue: if replay_event is not None: report_rage_click_issue_with_replay_event( - project_id, + project.id, replay_id, payload["timestamp"], payload["message"], @@ -418,7 +434,7 @@ def _handle_breadcrumb( ) # Log the event for tracking. log = event["data"].get("payload", {}).copy() - log["project_id"] = project_id + log["project_id"] = project.id log["replay_id"] = replay_id log["dom_tree"] = log.pop("message") @@ -426,16 +442,16 @@ def _handle_breadcrumb( elif category == "ui.click": click = create_click_event( - payload, replay_id, is_dead=False, is_rage=False, project_id=project_id + payload, replay_id, is_dead=False, is_rage=False, project_id=project.id ) if click is not None: return click elif category == "replay.hydrate-error": metrics.incr("replay.hydration_error_breadcrumb") - if replay_event is not None and _should_report_hydration_error_issue(project_id): + if replay_event is not None and should_report_hydration_error_issue: report_hydration_error_issue_with_replay_event( - project_id, + project.id, replay_id, payload["timestamp"], payload.get("data", {}).get("url"), diff --git a/tests/sentry/replays/unit/test_ingest_dom_index.py b/tests/sentry/replays/unit/test_ingest_dom_index.py index f93d57e566a89..34eaa548fafb3 100644 --- a/tests/sentry/replays/unit/test_ingest_dom_index.py +++ b/tests/sentry/replays/unit/test_ingest_dom_index.py @@ -3,9 +3,11 @@ import uuid from typing import Any from unittest import mock +from unittest.mock import Mock import pytest +from sentry.models.project import Project from sentry.replays.testutils import mock_replay_event from sentry.replays.usecases.ingest.dom_index import ( _get_testid, @@ -27,7 +29,15 @@ def patch_rage_click_issue_with_replay_event(): yield m -def test_get_user_actions(): +@pytest.fixture(autouse=True) +def mock_project() -> Project: + """Has id=1. Use for unit tests so we can skip @django_db""" + proj = Mock(spec=Project) + proj.id = 1 + return proj + + +def test_get_user_actions(mock_project): """Test "get_user_actions" function.""" events = [ { @@ -63,7 +73,7 @@ def test_get_user_actions(): } ] - user_actions = get_user_actions(1, uuid.uuid4().hex, events, None) + user_actions = get_user_actions(mock_project, uuid.uuid4().hex, events, None) assert len(user_actions) == 1 assert user_actions[0]["node_id"] == 1 assert user_actions[0]["tag"] == "div" @@ -82,7 +92,7 @@ def test_get_user_actions(): assert len(user_actions[0]["event_hash"]) == 36 -def test_get_user_actions_str_payload(): +def test_get_user_actions_str_payload(mock_project): """Test "get_user_actions" function.""" events = [ { @@ -95,11 +105,11 @@ def test_get_user_actions_str_payload(): } ] - user_actions = get_user_actions(1, uuid.uuid4().hex, events, None) + user_actions = get_user_actions(mock_project, uuid.uuid4().hex, events, None) assert len(user_actions) == 0 -def test_get_user_actions_missing_node(): +def test_get_user_actions_missing_node(mock_project): """Test "get_user_actions" function.""" events = [ { @@ -117,11 +127,11 @@ def test_get_user_actions_missing_node(): } ] - user_actions = get_user_actions(1, uuid.uuid4().hex, events, None) + user_actions = get_user_actions(mock_project, uuid.uuid4().hex, events, None) assert len(user_actions) == 0 -def test_get_user_actions_performance_spans(): +def test_get_user_actions_performance_spans(mock_project): """Test that "get_user_actions" doesn't error when collecting rsrc metrics, on various formats of performanceSpan""" # payloads are not realistic examples - only include the fields necessary for testing # TODO: does not test if metrics.distribution() is called downstream, with correct param types. @@ -193,10 +203,10 @@ def test_get_user_actions_performance_spans(): }, }, ] - get_user_actions(1, uuid.uuid4().hex, events, None) + get_user_actions(mock_project, uuid.uuid4().hex, events, None) -def test_parse_replay_actions(): +def test_parse_replay_actions(mock_project): events = [ { "type": 5, @@ -231,7 +241,7 @@ def test_parse_replay_actions(): }, } ] - replay_actions = parse_replay_actions(1, "1", 30, events, None) + replay_actions = parse_replay_actions(mock_project, "1", 30, events, None) assert replay_actions is not None assert replay_actions["type"] == "replay_event" @@ -375,7 +385,7 @@ def test_parse_replay_dead_click_actions(patch_rage_click_issue_with_replay_even ] default_project.update_option("sentry:replay_rage_click_issues", True) - replay_actions = parse_replay_actions(default_project.id, "1", 30, events, mock_replay_event()) + replay_actions = parse_replay_actions(default_project, "1", 30, events, mock_replay_event()) assert patch_rage_click_issue_with_replay_event.call_count == 2 assert replay_actions is not None assert replay_actions["type"] == "replay_event" @@ -528,7 +538,7 @@ def test_rage_click_issue_creation_no_component_name( ] default_project.update_option("sentry:replay_rage_click_issues", True) - parse_replay_actions(default_project.id, "1", 30, events, mock_replay_event()) + parse_replay_actions(default_project, "1", 30, events, mock_replay_event()) # test that 2 rage click issues are still created assert patch_rage_click_issue_with_replay_event.call_count == 2 @@ -575,7 +585,7 @@ def test_parse_replay_click_actions_not_dead( } ] - replay_actions = parse_replay_actions(default_project.id, "1", 30, events, None) + replay_actions = parse_replay_actions(default_project, "1", 30, events, None) assert patch_rage_click_issue_with_replay_event.delay.call_count == 0 assert replay_actions is None @@ -619,7 +629,7 @@ def test_parse_replay_rage_click_actions(default_project): }, } ] - replay_actions = parse_replay_actions(default_project.id, "1", 30, events, None) + replay_actions = parse_replay_actions(default_project, "1", 30, events, None) assert replay_actions is not None assert replay_actions["type"] == "replay_event" @@ -659,7 +669,7 @@ def test_encode_as_uuid(): assert isinstance(uuid.UUID(a), uuid.UUID) -def test_parse_request_response_latest(): +def test_parse_request_response_latest(mock_project): events = [ { "type": 5, @@ -698,14 +708,14 @@ def test_parse_request_response_latest(): } ] with mock.patch("sentry.utils.metrics.distribution") as timing: - parse_replay_actions(1, "1", 30, events, None) + parse_replay_actions(mock_project, "1", 30, events, None) assert timing.call_args_list == [ mock.call("replays.usecases.ingest.request_body_size", 2949, unit="byte"), mock.call("replays.usecases.ingest.response_body_size", 94, unit="byte"), ] -def test_parse_request_response_no_info(): +def test_parse_request_response_no_info(mock_project): events = [ { "type": 5, @@ -726,11 +736,11 @@ def test_parse_request_response_no_info(): }, }, ] - parse_replay_actions(1, "1", 30, events, None) + parse_replay_actions(mock_project, "1", 30, events, None) # just make sure we don't raise -def test_parse_request_response_old_format_request_only(): +def test_parse_request_response_old_format_request_only(mock_project): events = [ { "type": 5, @@ -753,13 +763,13 @@ def test_parse_request_response_old_format_request_only(): }, ] with mock.patch("sentry.utils.metrics.distribution") as timing: - parse_replay_actions(1, "1", 30, events, None) + parse_replay_actions(mock_project, "1", 30, events, None) assert timing.call_args_list == [ mock.call("replays.usecases.ingest.request_body_size", 1002, unit="byte"), ] -def test_parse_request_response_old_format_response_only(): +def test_parse_request_response_old_format_response_only(mock_project): events = [ { "type": 5, @@ -781,13 +791,13 @@ def test_parse_request_response_old_format_response_only(): }, ] with mock.patch("sentry.utils.metrics.distribution") as timing: - parse_replay_actions(1, "1", 30, events, None) + parse_replay_actions(mock_project, "1", 30, events, None) assert timing.call_args_list == [ mock.call("replays.usecases.ingest.response_body_size", 1002, unit="byte"), ] -def test_parse_request_response_old_format_request_and_response(): +def test_parse_request_response_old_format_request_and_response(mock_project): events = [ { "type": 5, @@ -810,7 +820,7 @@ def test_parse_request_response_old_format_request_and_response(): }, ] with mock.patch("sentry.utils.metrics.distribution") as timing: - parse_replay_actions(1, "1", 30, events, None) + parse_replay_actions(mock_project, "1", 30, events, None) assert timing.call_args_list == [ mock.call("replays.usecases.ingest.request_body_size", 1002, unit="byte"), mock.call("replays.usecases.ingest.response_body_size", 8001, unit="byte"), @@ -930,7 +940,7 @@ def test_parse_replay_rage_clicks_with_replay_event( ] default_project.update_option("sentry:replay_rage_click_issues", True) - replay_actions = parse_replay_actions(default_project.id, "1", 30, events, mock_replay_event()) + replay_actions = parse_replay_actions(default_project, "1", 30, events, mock_replay_event()) assert patch_rage_click_issue_with_replay_event.call_count == 2 assert replay_actions is not None assert replay_actions["type"] == "replay_event" @@ -941,7 +951,7 @@ def test_parse_replay_rage_clicks_with_replay_event( assert isinstance(replay_actions["payload"], list) -def test_log_sdk_options(): +def test_log_sdk_options(mock_project): events: list[dict[str, Any]] = [ { "data": { @@ -973,11 +983,11 @@ def test_log_sdk_options(): mock.patch("random.randint") as randint, ): randint.return_value = 0 - parse_replay_actions(1, "1", 30, events, None) + parse_replay_actions(mock_project, "1", 30, events, None) assert logger.info.call_args_list == [mock.call("sentry.replays.slow_click", extra=log)] -def test_log_large_dom_mutations(): +def test_log_large_dom_mutations(mock_project): events: list[dict[str, Any]] = [ { "type": 5, @@ -1003,7 +1013,7 @@ def test_log_large_dom_mutations(): mock.patch("random.randint") as randint, ): randint.return_value = 0 - parse_replay_actions(1, "1", 30, events, None) + parse_replay_actions(mock_project, "1", 30, events, None) assert logger.info.call_args_list == [mock.call("Large DOM Mutations List:", extra=log)] @@ -1080,7 +1090,7 @@ def test_log_canvas_size(): log_canvas_size(1, 1, "a", []) -def test_emit_click_negative_node_id(): +def test_emit_click_negative_node_id(mock_project): """Test "get_user_actions" function.""" events = [ { @@ -1116,5 +1126,5 @@ def test_emit_click_negative_node_id(): } ] - user_actions = get_user_actions(1, uuid.uuid4().hex, events, None) + user_actions = get_user_actions(mock_project, uuid.uuid4().hex, events, None) assert len(user_actions) == 0