Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(rules): Only fetch necessary data #74229

Merged
merged 12 commits into from
Jul 16, 2024
116 changes: 76 additions & 40 deletions src/sentry/rules/conditions/event_frequency.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from sentry import release_health, tsdb
from sentry.eventstore.models import GroupEvent
from sentry.issues.constants import get_issue_tsdb_group_model, get_issue_tsdb_user_group_model
from sentry.issues.grouptype import GroupCategory
from sentry.issues.grouptype import GroupCategory, get_group_type_by_type_id
from sentry.models.group import Group
from sentry.rules import EventState
from sentry.rules.conditions.base import EventCondition, GenericCondition
Expand Down Expand Up @@ -326,7 +326,8 @@ def get_snuba_query_result(
self,
tsdb_function: Callable[..., Any],
keys: list[int],
group: Group,
group_id: int,
organization_id: int,
model: TSDBModel,
start: datetime,
end: datetime,
Expand All @@ -340,8 +341,8 @@ def get_snuba_query_result(
end=end,
environment_id=environment_id,
use_cache=True,
jitter_value=group.id,
tenant_ids={"organization_id": group.project.organization_id},
jitter_value=group_id,
tenant_ids={"organization_id": organization_id},
referrer_suffix=referrer_suffix,
)
return result
Expand All @@ -350,20 +351,22 @@ def get_chunked_result(
self,
tsdb_function: Callable[..., Any],
model: TSDBModel,
groups: list[Group],
group_ids: list[int],
organization_id: int,
start: datetime,
end: datetime,
environment_id: int,
referrer_suffix: str,
) -> dict[int, int]:
batch_totals: dict[int, int] = defaultdict(int)
group = groups[0]
for group_chunk in chunked(groups, SNUBA_LIMIT):
group_id = group_ids[0]
for group_chunk in chunked(group_ids, SNUBA_LIMIT):
result = self.get_snuba_query_result(
tsdb_function=tsdb_function,
model=model,
keys=[group.id for group in group_chunk],
group=group,
keys=[group_id for group_id in group_chunk],
group_id=group_id,
organization_id=organization_id,
start=start,
end=end,
environment_id=environment_id,
Expand All @@ -372,6 +375,23 @@ def get_chunked_result(
batch_totals.update(result)
return batch_totals

def get_error_and_generic_group_ids(
self, groups: list[tuple[int, int, int]]
) -> tuple[list[int], list[int]]:
"""
Separate group ids into error group ids and generic group ids
"""
generic_issue_ids = []
error_issue_ids = []

for group in groups:
issue_type = get_group_type_by_type_id(group[1])
if GroupCategory(issue_type.category) == GroupCategory.ERROR:
error_issue_ids.append(group[0])
else:
generic_issue_ids.append(group[0])
return (error_issue_ids, generic_issue_ids)


class EventFrequencyCondition(BaseEventFrequencyCondition):
id = "sentry.rules.conditions.event_frequency.EventFrequencyCondition"
Expand All @@ -383,7 +403,8 @@ def query_hook(
sums: Mapping[int, int] = self.get_snuba_query_result(
tsdb_function=self.tsdb.get_sums,
keys=[event.group_id],
group=event.group,
group_id=event.group.id,
organization_id=event.group.project.organization_id,
model=get_issue_tsdb_group_model(event.group.issue_category),
start=start,
end=end,
Expand All @@ -396,27 +417,36 @@ def batch_query_hook(
self, group_ids: set[int], start: datetime, end: datetime, environment_id: int
) -> dict[int, int]:
batch_sums: dict[int, int] = defaultdict(int)
groups = Group.objects.filter(id__in=group_ids)
error_issues = [group for group in groups if group.issue_category == GroupCategory.ERROR]
generic_issues = [group for group in groups if group.issue_category != GroupCategory.ERROR]

if error_issues:
groups = Group.objects.filter(id__in=group_ids).values_list(
"id", "type", "project__organization_id"
)
error_issue_ids, generic_issue_ids = self.get_error_and_generic_group_ids(groups)
organization_id = None
if groups:
group = groups[0]
if len(group) == 3:
organization_id = group[3]
ceorourke marked this conversation as resolved.
Show resolved Hide resolved

if error_issue_ids:
error_sums = self.get_chunked_result(
tsdb_function=self.tsdb.get_sums,
model=get_issue_tsdb_group_model(error_issues[0].issue_category),
groups=error_issues,
model=get_issue_tsdb_group_model(GroupCategory.ERROR),
group_ids=error_issue_ids,
organization_id=organization_id,
start=start,
end=end,
environment_id=environment_id,
referrer_suffix="batch_alert_event_frequency",
)
batch_sums.update(error_sums)

if generic_issues:
if generic_issue_ids:
generic_sums = self.get_chunked_result(
tsdb_function=self.tsdb.get_sums,
model=get_issue_tsdb_group_model(generic_issues[0].issue_category),
groups=generic_issues,
# this isn't necessarily performance, just any non-error category
model=get_issue_tsdb_group_model(GroupCategory.PERFORMANCE),
group_ids=generic_issue_ids,
organization_id=organization_id,
start=start,
end=end,
environment_id=environment_id,
Expand All @@ -440,7 +470,8 @@ def query_hook(
totals: Mapping[int, int] = self.get_snuba_query_result(
tsdb_function=self.tsdb.get_distinct_counts_totals,
keys=[event.group_id],
group=event.group,
group_id=event.group.id,
organization_id=event.group.project.organization_id,
model=get_issue_tsdb_user_group_model(event.group.issue_category),
start=start,
end=end,
Expand All @@ -453,27 +484,31 @@ def batch_query_hook(
self, group_ids: set[int], start: datetime, end: datetime, environment_id: int
) -> dict[int, int]:
batch_totals: dict[int, int] = defaultdict(int)
groups = Group.objects.filter(id__in=group_ids)
error_issues = [group for group in groups if group.issue_category == GroupCategory.ERROR]
generic_issues = [group for group in groups if group.issue_category != GroupCategory.ERROR]
groups = Group.objects.filter(id__in=group_ids).values_list(
"id", "type", "project__organization_id"
)
error_issue_ids, generic_issue_ids = self.get_error_and_generic_group_ids(groups)

if error_issues:
if error_issue_ids:
error_totals = self.get_chunked_result(
tsdb_function=self.tsdb.get_distinct_counts_totals,
model=get_issue_tsdb_user_group_model(error_issues[0].issue_category),
groups=error_issues,
model=get_issue_tsdb_user_group_model(GroupCategory.ERROR),
group_ids=error_issue_ids,
organization_id=groups[0][2],
start=start,
end=end,
environment_id=environment_id,
referrer_suffix="batch_alert_event_uniq_user_frequency",
)
batch_totals.update(error_totals)

if generic_issues:
if generic_issue_ids:
generic_totals = self.get_chunked_result(
tsdb_function=self.tsdb.get_distinct_counts_totals,
model=get_issue_tsdb_user_group_model(generic_issues[0].issue_category),
groups=generic_issues,
# this isn't necessarily performance, just any non-error category
model=get_issue_tsdb_user_group_model(GroupCategory.PERFORMANCE),
group_ids=generic_issue_ids,
organization_id=groups[0][2],
start=start,
end=end,
environment_id=environment_id,
Expand Down Expand Up @@ -591,7 +626,8 @@ def query_hook(
issue_count = self.get_snuba_query_result(
tsdb_function=self.tsdb.get_sums,
keys=[event.group_id],
group=event.group,
group_id=event.group.id,
organization_id=event.group.project.organization_id,
model=get_issue_tsdb_group_model(event.group.issue_category),
start=start,
end=end,
Expand All @@ -618,21 +654,21 @@ def batch_query_hook(
self, group_ids: set[int], start: datetime, end: datetime, environment_id: int
) -> dict[int, int]:
batch_percents: dict[int, int] = defaultdict(int)
groups = Group.objects.filter(id__in=group_ids)
project_id = groups[0].project.id
session_count_last_hour = self.get_session_count(project_id, environment_id, start, end)
groups = Group.objects.filter(id__in=group_ids).values_list(
"id", "type", "project_id", "project__organization_id"
)
session_count_last_hour = self.get_session_count(groups[0][2], environment_id, start, end)
avg_sessions_in_interval = self.get_session_interval(
session_count_last_hour, self.get_option("interval")
)
if avg_sessions_in_interval:
error_issues = [
group for group in groups if group.issue_category == GroupCategory.ERROR
]
if error_issues:
error_issue_ids, _ = self.get_error_and_generic_group_ids(groups)
if error_issue_ids:
error_issue_count = self.get_chunked_result(
tsdb_function=self.tsdb.get_sums,
model=get_issue_tsdb_group_model(error_issues[0].issue_category),
groups=error_issues,
model=get_issue_tsdb_group_model(GroupCategory.ERROR),
group_ids=error_issue_ids,
organization_id=groups[0][3],
start=start,
end=end,
environment_id=environment_id,
Expand Down
11 changes: 11 additions & 0 deletions src/sentry/rules/processing/delayed_processing.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import math
import uuid
from collections import defaultdict
from datetime import datetime, timedelta, timezone
Expand Down Expand Up @@ -393,6 +394,13 @@ def get_group_to_groupevent(
return group_to_groupevent


def bucket_num_groups(num_groups: int) -> str:
if num_groups > 1:
magnitude = 10 ** int(math.log10(num_groups))
return f">{magnitude}"
return "1"


def process_delayed_alert_conditions() -> None:
with metrics.timer("delayed_processing.process_all_conditions.duration"):
fetch_time = datetime.now(tz=timezone.utc)
Expand Down Expand Up @@ -428,6 +436,9 @@ def apply_delayed(project_id: int, *args: Any, **kwargs: Any) -> None:
rulegroup_to_event_data = buffer.backend.get_hash(
model=Project, field={"project_id": project.id}
)
num_groups = len(rulegroup_to_event_data.keys())
num_groups_bucketed = bucket_num_groups(num_groups)
metrics.incr("delayed_processing.num_groups", tags={"num_groups": num_groups_bucketed})
logger.info(
"delayed_processing.rulegroupeventdata",
extra={"rulegroupdata": rulegroup_to_event_data, "project_id": project_id},
Expand Down
10 changes: 9 additions & 1 deletion tests/sentry/rules/processing/test_delayed_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
EventFrequencyCondition,
EventFrequencyConditionData,
)
from sentry.rules.processing.delayed_processing import ( # bulk_fetch_events; get_group_to_groupevent; parse_rulegroup_to_event_data,
from sentry.rules.processing.delayed_processing import (
DataAndGroups,
UniqueConditionQuery,
apply_delayed,
bucket_num_groups,
generate_unique_queries,
get_condition_group_results,
get_condition_query_groups,
Expand All @@ -44,6 +45,13 @@
TEST_RULE_FAST_CONDITION = {"id": "sentry.rules.conditions.every_event.EveryEventCondition"}


def test_bucket_num_groups():
assert bucket_num_groups(1) == "1"
assert bucket_num_groups(50) == ">10"
assert bucket_num_groups(101) == ">100"
assert bucket_num_groups(1001) == ">1000"


def mock_get_condition_group(descending=False):
"""
Mocks get_condition_groups to run with the passed in alert_rules in
Expand Down
12 changes: 12 additions & 0 deletions tests/snuba/rules/conditions/test_event_frequency.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from django.utils import timezone

from sentry.issues.grouptype import PerformanceNPlusOneGroupType
from sentry.models.group import Group
from sentry.models.project import Project
from sentry.models.rule import Rule
from sentry.rules.conditions.event_frequency import (
Expand Down Expand Up @@ -149,6 +150,17 @@ def test_batch_query(self):
)
assert batch_query == {self.event3.group_id: 1}

def test_get_error_and_generic_group_ids(self):
groups = Group.objects.filter(
id__in=[self.event.group_id, self.event2.group_id, self.perf_event.group_id]
).values_list("id", "type", "project__organization_id")
error_issue_ids, generic_issue_ids = self.condition_inst.get_error_and_generic_group_ids(
groups
)
assert self.event.group_id in error_issue_ids
assert self.event2.group_id in error_issue_ids
assert self.perf_event.group_id in generic_issue_ids


class EventUniqueUserFrequencyQueryTest(EventFrequencyQueryTestBase):
rule_cls = EventUniqueUserFrequencyCondition
Expand Down
Loading