Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(replay): reduce postgres queries in recording consumer using get_from_cache #77365

Merged
merged 15 commits into from
Sep 20, 2024
Merged
4 changes: 3 additions & 1 deletion src/sentry/replays/consumers/recording_buffered.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording

from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.models.project import Project
from sentry.replays.lib.storage import (
RecordingSegmentStorageMeta,
make_recording_filename,
Expand Down Expand Up @@ -294,8 +295,9 @@ def process_message(buffer: RecordingBuffer, message: bytes) -> None:
else None
)

project = Project.objects.get_from_cache(decoded_message["project_id"])
aliu39 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How much memory will this consume?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the cache? I don't know. Got the inspiration from occurrence_consumer.py

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually already cache the project elsewhere in ingest, so this probably won't add much memory, if any.

replay_actions = parse_replay_actions(
decoded_message["project_id"],
project,
decoded_message["replay_id"],
decoded_message["retention_days"],
parsed_recording_data,
Expand Down
3 changes: 2 additions & 1 deletion src/sentry/replays/usecases/ingest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,10 @@ def recording_post_processor(
op="replays.usecases.ingest.parse_and_emit_replay_actions",
description="parse_and_emit_replay_actions",
):
project = Project.objects.get_from_cache(id=message.project_id)
aliu39 marked this conversation as resolved.
Show resolved Hide resolved
parse_and_emit_replay_actions(
retention_days=message.retention_days,
project_id=message.project_id,
project=project,
replay_id=message.replay_id,
segment_data=parsed_segment_data,
replay_event=parsed_replay_event,
Expand Down
71 changes: 31 additions & 40 deletions src/sentry/replays/usecases/ingest/dom_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ class ReplayActionsEvent(TypedDict):


def parse_and_emit_replay_actions(
project_id: int,
project: Project,
replay_id: str,
retention_days: int,
segment_data: list[dict[str, Any]],
replay_event: dict[str, Any] | None,
) -> None:
with metrics.timer("replays.usecases.ingest.dom_index.parse_and_emit_replay_actions"):
message = parse_replay_actions(
project_id, replay_id, retention_days, segment_data, replay_event
project, replay_id, retention_days, segment_data, replay_event
)
if message is not None:
emit_replay_actions(message)
Expand All @@ -82,19 +82,19 @@ def emit_replay_actions(action: ReplayActionsEvent) -> None:


def parse_replay_actions(
project_id: int,
project: Project,
replay_id: str,
retention_days: int,
segment_data: list[dict[str, Any]],
replay_event: dict[str, Any] | None,
) -> ReplayActionsEvent | None:
"""Parse RRWeb payload to ReplayActionsEvent."""
actions = get_user_actions(project_id, replay_id, segment_data, replay_event)
actions = get_user_actions(project, replay_id, segment_data, replay_event)
if len(actions) == 0:
return None

payload = create_replay_actions_payload(replay_id, actions)
return create_replay_actions_event(replay_id, project_id, retention_days, payload)
return create_replay_actions_event(replay_id, project.id, retention_days, payload)


def create_replay_actions_event(
Expand Down Expand Up @@ -153,7 +153,7 @@ def log_canvas_size(


def get_user_actions(
project_id: int,
project: Project,
replay_id: str,
events: list[dict[str, Any]],
replay_event: dict[str, Any] | None,
Expand Down Expand Up @@ -185,15 +185,15 @@ def get_user_actions(
tag = event.get("data", {}).get("tag")

if tag == "breadcrumb":
click = _handle_breadcrumb(event, project_id, replay_id, replay_event)
click = _handle_breadcrumb(event, project, replay_id, replay_event)
if click is not None:
result.append(click)
# look for request / response breadcrumbs and report metrics on them
if tag == "performanceSpan":
_handle_resource_metric_event(event)
# log the SDK options sent from the SDK 1/500 times
if tag == "options" and random.randint(0, 499) < 1:
_handle_options_logging_event(project_id, replay_id, event)
_handle_options_logging_event(project.id, replay_id, event)
# log large dom mutation breadcrumb events roughly 1/500 times

payload = event.get("data", {}).get("payload", {})
Expand All @@ -203,7 +203,7 @@ def get_user_actions(
and payload.get("category") == "replay.mutations"
and random.randint(0, 500) < 1
):
_handle_mutations_event(project_id, replay_id, event)
_handle_mutations_event(project.id, replay_id, event)

return result

Expand Down Expand Up @@ -287,8 +287,7 @@ def _parse_classes(classes: str) -> list[str]:
return list(filter(lambda n: n != "", classes.split(" ")))[:10]


def _should_report_hydration_error_issue(project_id: int) -> bool:
project = Project.objects.get(id=project_id)
def _should_report_hydration_error_issue(project: Project) -> bool:
"""
The feature is controlled by Sentry admins for release of the feature,
while the project option is controlled by the project owner, and is a
Expand All @@ -300,27 +299,16 @@ def _should_report_hydration_error_issue(project_id: int) -> bool:
) and project.get_option("sentry:replay_hydration_error_issues")


def _should_report_rage_click_issue(project_id: int) -> bool:
project = Project.objects.get(id=project_id)

def _project_has_feature_enabled() -> bool:
"""
Check if the project has the feature flag enabled,
This is controlled by Sentry admins for release of the feature
"""
return features.has(
"organizations:session-replay-rage-click-issue-creation",
project.organization,
)

def _project_has_option_enabled() -> bool:
"""
Check if the project has the option enabled,
This is controlled by the project owner, and is a permanent setting
"""
return project.get_option("sentry:replay_rage_click_issues")

return all([_project_has_feature_enabled(), _project_has_option_enabled()])
def _should_report_rage_click_issue(project: Project) -> bool:
"""
The feature is controlled by Sentry admins for release of the feature,
while the project option is controlled by the project owner, and is a
permanent setting
"""
return features.has(
"organizations:session-replay-rage-click-issue-creation",
cmanallen marked this conversation as resolved.
Show resolved Hide resolved
project.organization,
aliu39 marked this conversation as resolved.
Show resolved Hide resolved
cmanallen marked this conversation as resolved.
Show resolved Hide resolved
) and project.get_option("sentry:replay_rage_click_issues")


def _iter_custom_events(events: list[dict[str, Any]]) -> Generator[dict[str, Any]]:
Expand Down Expand Up @@ -392,7 +380,10 @@ def _handle_mutations_event(project_id: int, replay_id: str, event: dict[str, An


def _handle_breadcrumb(
event: dict[str, Any], project_id: int, replay_id: str, replay_event: dict[str, Any] | None
event: dict[str, Any],
project: Project,
replay_id: str,
replay_event: dict[str, Any] | None,
) -> ReplayActionsEventPayloadClick | None:

click = None
Expand All @@ -417,15 +408,15 @@ def _handle_breadcrumb(
payload["data"].get("clickCount", 0) or payload["data"].get("clickcount", 0)
) >= 5
click = create_click_event(
payload, replay_id, is_dead=True, is_rage=is_rage, project_id=project_id
payload, replay_id, is_dead=True, is_rage=is_rage, project_id=project.id
)
if click is not None:
if is_rage:
metrics.incr("replay.rage_click_detected")
if _should_report_rage_click_issue(project_id):
if _should_report_rage_click_issue(project):
if replay_event is not None:
report_rage_click_issue_with_replay_event(
project_id,
project.id,
replay_id,
payload["timestamp"],
payload["message"],
Expand All @@ -436,24 +427,24 @@ def _handle_breadcrumb(
)
# Log the event for tracking.
log = event["data"].get("payload", {}).copy()
log["project_id"] = project_id
log["project_id"] = project.id
log["replay_id"] = replay_id
log["dom_tree"] = log.pop("message")

return click

elif category == "ui.click":
click = create_click_event(
payload, replay_id, is_dead=False, is_rage=False, project_id=project_id
payload, replay_id, is_dead=False, is_rage=False, project_id=project.id
)
if click is not None:
return click

elif category == "replay.hydrate-error":
metrics.incr("replay.hydration_error_breadcrumb")
if replay_event is not None and _should_report_hydration_error_issue(project_id):
if replay_event is not None and _should_report_hydration_error_issue(project):
report_hydration_error_issue_with_replay_event(
project_id,
project.id,
replay_id,
payload["timestamp"],
payload.get("data", {}).get("url"),
Expand Down
Loading