ref(rules): Only fetch necessary data (#74229)
Optimize queries by fetching only the strictly necessary data, and add a
metric that records a rough estimate of how many groups we're processing.
ceorourke authored Jul 16, 2024
1 parent 9274d7d commit c80d024
Showing 4 changed files with 113 additions and 40 deletions.
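Before the per-file diffs, a note on the core pattern: the batch hooks below stop hydrating full Group ORM objects (which also meant lazily loading each group's project just to reach the organization id) and instead fetch only the columns they use via values_list. A minimal pure-Python sketch of the resulting data shape; GroupRow and organization_id_from_rows are illustrative names, not identifiers from the commit:

```python
# Sketch only: models the (id, type, organization_id) tuples produced by
# Group.objects.filter(...).values_list("id", "type", "project__organization_id").
GroupRow = tuple[int, int, int]  # (group_id, group_type_id, organization_id)

def organization_id_from_rows(rows: list[GroupRow]) -> int | None:
    # Mirrors get_organization_id_from_groups below: delayed processing runs
    # per project, so every row in a batch shares one organization and the
    # first row's last column suffices.
    return rows[0][-1] if rows else None

rows: list[GroupRow] = [(101, 1, 42), (102, 1006, 42)]
assert organization_id_from_rows(rows) == 42
```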
src/sentry/rules/conditions/event_frequency.py (81 additions & 39 deletions)
@@ -16,7 +16,7 @@
from sentry import release_health, tsdb
from sentry.eventstore.models import GroupEvent
from sentry.issues.constants import get_issue_tsdb_group_model, get_issue_tsdb_user_group_model
from sentry.issues.grouptype import GroupCategory
from sentry.issues.grouptype import GroupCategory, get_group_type_by_type_id
from sentry.models.group import Group
from sentry.rules import EventState
from sentry.rules.conditions.base import EventCondition, GenericCondition
@@ -326,7 +326,8 @@ def get_snuba_query_result(
self,
tsdb_function: Callable[..., Any],
keys: list[int],
group: Group,
group_id: int,
organization_id: int,
model: TSDBModel,
start: datetime,
end: datetime,
@@ -340,8 +341,8 @@
end=end,
environment_id=environment_id,
use_cache=True,
jitter_value=group.id,
tenant_ids={"organization_id": group.project.organization_id},
jitter_value=group_id,
tenant_ids={"organization_id": organization_id},
referrer_suffix=referrer_suffix,
)
return result
@@ -350,20 +351,22 @@ def get_chunked_result(
self,
tsdb_function: Callable[..., Any],
model: TSDBModel,
groups: list[Group],
group_ids: list[int],
organization_id: int,
start: datetime,
end: datetime,
environment_id: int,
referrer_suffix: str,
) -> dict[int, int]:
batch_totals: dict[int, int] = defaultdict(int)
group = groups[0]
for group_chunk in chunked(groups, SNUBA_LIMIT):
group_id = group_ids[0]
for group_chunk in chunked(group_ids, SNUBA_LIMIT):
result = self.get_snuba_query_result(
tsdb_function=tsdb_function,
model=model,
keys=[group.id for group in group_chunk],
group=group,
keys=[group_id for group_id in group_chunk],
group_id=group_id,
organization_id=organization_id,
start=start,
end=end,
environment_id=environment_id,
@@ -372,6 +375,30 @@ def get_chunked_result(
batch_totals.update(result)
return batch_totals

def get_error_and_generic_group_ids(
self, groups: list[tuple[int, int, int]]
) -> tuple[list[int], list[int]]:
"""
Separate group ids into error group ids and generic group ids
"""
generic_issue_ids = []
error_issue_ids = []

for group in groups:
issue_type = get_group_type_by_type_id(group[1])
if GroupCategory(issue_type.category) == GroupCategory.ERROR:
error_issue_ids.append(group[0])
else:
generic_issue_ids.append(group[0])
return (error_issue_ids, generic_issue_ids)

def get_organization_id_from_groups(self, groups: list[tuple[int, int, int]]) -> int | None:
organization_id = None
if groups:
group = groups[0]
organization_id = group[-1]
return organization_id


class EventFrequencyCondition(BaseEventFrequencyCondition):
id = "sentry.rules.conditions.event_frequency.EventFrequencyCondition"
@@ -383,7 +410,8 @@ def query_hook(
sums: Mapping[int, int] = self.get_snuba_query_result(
tsdb_function=self.tsdb.get_sums,
keys=[event.group_id],
group=event.group,
group_id=event.group.id,
organization_id=event.group.project.organization_id,
model=get_issue_tsdb_group_model(event.group.issue_category),
start=start,
end=end,
@@ -396,27 +424,32 @@ def batch_query_hook(
self, group_ids: set[int], start: datetime, end: datetime, environment_id: int
) -> dict[int, int]:
batch_sums: dict[int, int] = defaultdict(int)
groups = Group.objects.filter(id__in=group_ids)
error_issues = [group for group in groups if group.issue_category == GroupCategory.ERROR]
generic_issues = [group for group in groups if group.issue_category != GroupCategory.ERROR]
groups = Group.objects.filter(id__in=group_ids).values_list(
"id", "type", "project__organization_id"
)
error_issue_ids, generic_issue_ids = self.get_error_and_generic_group_ids(groups)
organization_id = self.get_organization_id_from_groups(groups)

if error_issues:
if error_issue_ids and organization_id:
error_sums = self.get_chunked_result(
tsdb_function=self.tsdb.get_sums,
model=get_issue_tsdb_group_model(error_issues[0].issue_category),
groups=error_issues,
model=get_issue_tsdb_group_model(GroupCategory.ERROR),
group_ids=error_issue_ids,
organization_id=organization_id,
start=start,
end=end,
environment_id=environment_id,
referrer_suffix="batch_alert_event_frequency",
)
batch_sums.update(error_sums)

if generic_issues:
if generic_issue_ids and organization_id:
generic_sums = self.get_chunked_result(
tsdb_function=self.tsdb.get_sums,
model=get_issue_tsdb_group_model(generic_issues[0].issue_category),
groups=generic_issues,
# this isn't necessarily performance, just any non-error category
model=get_issue_tsdb_group_model(GroupCategory.PERFORMANCE),
group_ids=generic_issue_ids,
organization_id=organization_id,
start=start,
end=end,
environment_id=environment_id,
@@ -440,7 +473,8 @@ def query_hook(
totals: Mapping[int, int] = self.get_snuba_query_result(
tsdb_function=self.tsdb.get_distinct_counts_totals,
keys=[event.group_id],
group=event.group,
group_id=event.group.id,
organization_id=event.group.project.organization_id,
model=get_issue_tsdb_user_group_model(event.group.issue_category),
start=start,
end=end,
@@ -453,27 +487,32 @@ def batch_query_hook(
self, group_ids: set[int], start: datetime, end: datetime, environment_id: int
) -> dict[int, int]:
batch_totals: dict[int, int] = defaultdict(int)
groups = Group.objects.filter(id__in=group_ids)
error_issues = [group for group in groups if group.issue_category == GroupCategory.ERROR]
generic_issues = [group for group in groups if group.issue_category != GroupCategory.ERROR]
groups = Group.objects.filter(id__in=group_ids).values_list(
"id", "type", "project__organization_id"
)
error_issue_ids, generic_issue_ids = self.get_error_and_generic_group_ids(groups)
organization_id = self.get_organization_id_from_groups(groups)

if error_issues:
if error_issue_ids and organization_id:
error_totals = self.get_chunked_result(
tsdb_function=self.tsdb.get_distinct_counts_totals,
model=get_issue_tsdb_user_group_model(error_issues[0].issue_category),
groups=error_issues,
model=get_issue_tsdb_user_group_model(GroupCategory.ERROR),
group_ids=error_issue_ids,
organization_id=organization_id,
start=start,
end=end,
environment_id=environment_id,
referrer_suffix="batch_alert_event_uniq_user_frequency",
)
batch_totals.update(error_totals)

if generic_issues:
if generic_issue_ids and organization_id:
generic_totals = self.get_chunked_result(
tsdb_function=self.tsdb.get_distinct_counts_totals,
model=get_issue_tsdb_user_group_model(generic_issues[0].issue_category),
groups=generic_issues,
# this isn't necessarily performance, just any non-error category
model=get_issue_tsdb_user_group_model(GroupCategory.PERFORMANCE),
group_ids=generic_issue_ids,
organization_id=organization_id,
start=start,
end=end,
environment_id=environment_id,
@@ -591,7 +630,8 @@ def query_hook(
issue_count = self.get_snuba_query_result(
tsdb_function=self.tsdb.get_sums,
keys=[event.group_id],
group=event.group,
group_id=event.group.id,
organization_id=event.group.project.organization_id,
model=get_issue_tsdb_group_model(event.group.issue_category),
start=start,
end=end,
@@ -618,21 +658,23 @@ def batch_query_hook(
self, group_ids: set[int], start: datetime, end: datetime, environment_id: int
) -> dict[int, int]:
batch_percents: dict[int, int] = defaultdict(int)
groups = Group.objects.filter(id__in=group_ids)
project_id = groups[0].project.id
session_count_last_hour = self.get_session_count(project_id, environment_id, start, end)
groups = Group.objects.filter(id__in=group_ids).values_list(
"id", "type", "project_id", "project__organization_id"
)
session_count_last_hour = self.get_session_count(groups[0][2], environment_id, start, end)
avg_sessions_in_interval = self.get_session_interval(
session_count_last_hour, self.get_option("interval")
)

if avg_sessions_in_interval:
error_issues = [
group for group in groups if group.issue_category == GroupCategory.ERROR
]
if error_issues:
error_issue_ids, _ = self.get_error_and_generic_group_ids(groups)
organization_id = self.get_organization_id_from_groups(groups)
if error_issue_ids and organization_id:
error_issue_count = self.get_chunked_result(
tsdb_function=self.tsdb.get_sums,
model=get_issue_tsdb_group_model(error_issues[0].issue_category),
groups=error_issues,
model=get_issue_tsdb_group_model(GroupCategory.ERROR),
group_ids=error_issue_ids,
organization_id=organization_id,
start=start,
end=end,
environment_id=environment_id,
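Both batch hooks above split the fetched rows by issue category before querying, because error issues and generic (e.g. performance) issues are stored under different TSDB models. A rough sketch of that split; the type-id-to-category mapping here is a made-up stand-in for get_group_type_by_type_id(...).category:

```python
ERROR = 1        # stand-in for GroupCategory.ERROR
PERFORMANCE = 2  # stand-in for any non-error category

# Hypothetical registry; the real code calls get_group_type_by_type_id(type_id)
# and compares GroupCategory(issue_type.category) to GroupCategory.ERROR.
TYPE_ID_TO_CATEGORY = {1: ERROR, 1006: PERFORMANCE}

def split_by_category(rows: list[tuple[int, int, int]]) -> tuple[list[int], list[int]]:
    # Mirrors get_error_and_generic_group_ids: rows are (id, type, organization_id).
    error_ids: list[int] = []
    generic_ids: list[int] = []
    for group_id, type_id, _org_id in rows:
        if TYPE_ID_TO_CATEGORY.get(type_id) == ERROR:
            error_ids.append(group_id)
        else:
            generic_ids.append(group_id)
    return error_ids, generic_ids

assert split_by_category([(101, 1, 42), (102, 1006, 42)]) == ([101], [102])
```

Note the in-diff comment: passing GroupCategory.PERFORMANCE for the generic branch is just a way to select the non-error TSDB model, not a claim that every generic issue is a performance issue.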
src/sentry/rules/processing/delayed_processing.py (11 additions & 0 deletions)
@@ -1,4 +1,5 @@
import logging
import math
import uuid
from collections import defaultdict
from datetime import datetime, timedelta, timezone
@@ -393,6 +394,13 @@ def get_group_to_groupevent(
return group_to_groupevent


def bucket_num_groups(num_groups: int) -> str:
if num_groups > 1:
magnitude = 10 ** int(math.log10(num_groups))
return f">{magnitude}"
return "1"


def process_delayed_alert_conditions() -> None:
with metrics.timer("delayed_processing.process_all_conditions.duration"):
fetch_time = datetime.now(tz=timezone.utc)
@@ -428,6 +436,9 @@ def apply_delayed(project_id: int, *args: Any, **kwargs: Any) -> None:
rulegroup_to_event_data = buffer.backend.get_hash(
model=Project, field={"project_id": project.id}
)
num_groups = len(rulegroup_to_event_data.keys())
num_groups_bucketed = bucket_num_groups(num_groups)
metrics.incr("delayed_processing.num_groups", tags={"num_groups": num_groups_bucketed})
logger.info(
"delayed_processing.rulegroupeventdata",
extra={"rulegroupdata": rulegroup_to_event_data, "project_id": project_id},
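The new bucket_num_groups helper keeps the metric's tag cardinality bounded by collapsing raw counts into order-of-magnitude buckets. Reproduced from the diff above with a few worked values:

```python
import math

def bucket_num_groups(num_groups: int) -> str:
    # 10 ** int(math.log10(n)) is the largest power of ten <= n, so every
    # count in [10**k, 10**(k+1)) collapses into the same ">10**k" bucket.
    if num_groups > 1:
        magnitude = 10 ** int(math.log10(num_groups))
        return f">{magnitude}"
    return "1"

assert bucket_num_groups(1) == "1"
assert bucket_num_groups(7) == ">1"
assert bucket_num_groups(50) == ">10"
assert bucket_num_groups(1001) == ">1000"
```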
tests/sentry/rules/processing/test_delayed_processing.py (9 additions & 1 deletion)
@@ -16,10 +16,11 @@
EventFrequencyCondition,
EventFrequencyConditionData,
)
from sentry.rules.processing.delayed_processing import ( # bulk_fetch_events; get_group_to_groupevent; parse_rulegroup_to_event_data,
from sentry.rules.processing.delayed_processing import (
DataAndGroups,
UniqueConditionQuery,
apply_delayed,
bucket_num_groups,
generate_unique_queries,
get_condition_group_results,
get_condition_query_groups,
@@ -44,6 +45,13 @@
TEST_RULE_FAST_CONDITION = {"id": "sentry.rules.conditions.every_event.EveryEventCondition"}


def test_bucket_num_groups():
assert bucket_num_groups(1) == "1"
assert bucket_num_groups(50) == ">10"
assert bucket_num_groups(101) == ">100"
assert bucket_num_groups(1001) == ">1000"


def mock_get_condition_group(descending=False):
"""
Mocks get_condition_groups to run with the passed in alert_rules in
Expand Down
tests/snuba/rules/conditions/test_event_frequency.py (12 additions & 0 deletions)
@@ -8,6 +8,7 @@
from django.utils import timezone

from sentry.issues.grouptype import PerformanceNPlusOneGroupType
from sentry.models.group import Group
from sentry.models.project import Project
from sentry.models.rule import Rule
from sentry.rules.conditions.event_frequency import (
@@ -149,6 +150,17 @@ def test_batch_query(self):
)
assert batch_query == {self.event3.group_id: 1}

def test_get_error_and_generic_group_ids(self):
groups = Group.objects.filter(
id__in=[self.event.group_id, self.event2.group_id, self.perf_event.group_id]
).values_list("id", "type", "project__organization_id")
error_issue_ids, generic_issue_ids = self.condition_inst.get_error_and_generic_group_ids(
groups
)
assert self.event.group_id in error_issue_ids
assert self.event2.group_id in error_issue_ids
assert self.perf_event.group_id in generic_issue_ids


class EventUniqueUserFrequencyQueryTest(EventFrequencyQueryTestBase):
rule_cls = EventUniqueUserFrequencyCondition
