Skip to content

Commit

Permalink
ref: fix typing for sentry.digests.notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
asottile-sentry committed Jun 7, 2024
1 parent b71f323 commit cadc564
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 212 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ module = [
"sentry.db.models.utils",
"sentry.db.postgres.base",
"sentry.db.router",
"sentry.digests.notifications",
"sentry.discover.endpoints.discover_key_transactions",
"sentry.eventstore.models",
"sentry.features.handler",
Expand Down Expand Up @@ -529,6 +528,7 @@ module = [
"sentry.api.helpers.source_map_helper",
"sentry.buffer.*",
"sentry.build.*",
"sentry.digests.notifications",
"sentry.eventstore.reprocessing.redis",
"sentry.grouping.parameterization",
"sentry.hybridcloud",
Expand Down
11 changes: 9 additions & 2 deletions src/sentry/digests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import datetime as datetime_mod
from collections.abc import MutableMapping
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any, NamedTuple, TypeAlias

from django.conf import settings
Expand All @@ -13,6 +13,7 @@
from .backends.dummy import DummyBackend

if TYPE_CHECKING:
from sentry.eventstore.models import Event
from sentry.models.group import Group
from sentry.models.rule import Rule

Expand All @@ -22,6 +23,12 @@
backend.expose(locals())


class Notification(NamedTuple):
    """A single digest entry pairing an event with the rules that fired for it."""

    # The event being digested; its `.group` is reattached downstream
    # before grouping records by rule.
    event: Event
    # IDs of the alert rules that matched the event (resolved to Rule
    # objects via in_bulk when the digest is built); empty by default.
    rules: Sequence[int] = ()
    # NOTE(review): appears to be an opaque correlation id passed through
    # to the final record unchanged — confirm semantics with callers.
    notification_uuid: str | None = None


class Record(NamedTuple):
key: str
value: Any # TODO: I think this is `Notification` ?
Expand All @@ -39,7 +46,7 @@ class ScheduleEntry(NamedTuple):

OPTIONS = frozenset(("increment_delay", "maximum_delay", "minimum_delay"))

Digest: TypeAlias = MutableMapping["Rule", MutableMapping["Group", list[Record]]]
Digest: TypeAlias = dict["Rule", dict["Group", list[Record]]]


def get_option_key(plugin: str, option: str) -> str:
Expand Down
180 changes: 50 additions & 130 deletions src/sentry/digests/notifications.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,21 @@
from __future__ import annotations

import functools
import itertools
import logging
from collections import defaultdict, namedtuple
from collections.abc import Mapping, MutableMapping, MutableSequence, Sequence
from typing import Any
from collections import defaultdict
from collections.abc import Sequence

from sentry import tsdb
from sentry.digests import Digest, Record
from sentry.digests import Digest, Notification, Record
from sentry.eventstore.models import Event
from sentry.models.group import Group, GroupStatus
from sentry.models.project import Project
from sentry.models.rule import Rule
from sentry.notifications.types import ActionTargetType, FallthroughChoiceType
from sentry.tsdb.base import TSDBModel
from sentry.utils.pipeline import Pipeline

logger = logging.getLogger("sentry.digests")

Notification = namedtuple(
"Notification", "event rules notification_uuid", defaults=(None, None, None)
)


def split_key(
key: str,
Expand Down Expand Up @@ -74,7 +67,13 @@ def event_to_record(
)


def fetch_state(project: Project, records: Sequence[Record]) -> Mapping[str, Any]:
def build_digest(
project: Project,
records: Sequence[Record],
) -> tuple[Digest | None, list[str]]:
if not records:
return None, []

# This reads a little strange, but remember that records are returned in
# reverse chronological order, and we query the database in chronological
# order.
Expand All @@ -83,147 +82,68 @@ def fetch_state(project: Project, records: Sequence[Record]) -> Mapping[str, Any
end = records[0].datetime

groups = Group.objects.in_bulk(record.value.event.group_id for record in records)
tenant_ids = {"organization_id": project.organization_id}
return {
"project": project,
"groups": groups,
"rules": Rule.objects.in_bulk(
itertools.chain.from_iterable(record.value.rules for record in records)
),
"event_counts": tsdb.backend.get_sums(
TSDBModel.group,
list(groups.keys()),
start,
end,
tenant_ids=tenant_ids,
),
"user_counts": tsdb.backend.get_distinct_counts_totals(
TSDBModel.users_affected_by_group,
list(groups.keys()),
start,
end,
tenant_ids=tenant_ids,
),
}


def attach_state(
project: Project,
groups: MutableMapping[int, Group],
rules: Mapping[int, Rule],
event_counts: Mapping[int, int],
user_counts: Mapping[int, int],
) -> Mapping[str, Any]:
for id, group in groups.items():
assert group.project_id == project.id, "Group must belong to Project"
group.project = project
group.event_count = 0
group.user_count = 0
rules = Rule.objects.in_bulk(
itertools.chain.from_iterable(record.value.rules for record in records)
)

for id, rule in rules.items():
for group_id, group in groups.items():
assert group.project_id == project.id, "Group must belong to Project"
for rule_id, rule in rules.items():
assert rule.project_id == project.id, "Rule must belong to Project"
rule.project = project

for id, event_count in event_counts.items():
groups[id].event_count = event_count

for id, user_count in user_counts.items():
groups[id].user_count = user_count

return {"project": project, "groups": groups, "rules": rules}


def rewrite_record(
record: Record,
project: Project,
groups: Mapping[int, Group],
rules: Mapping[str, Rule],
) -> Record | None:
event = record.value.event

# Reattach the group to the event.
group = groups.get(event.group_id)
if group is not None:
event.group = group
else:
logger.debug("%s could not be associated with a group.", record)
return None

return Record(
record.key,
Notification(
event,
[_f for _f in [rules.get(id) for id in record.value.rules] if _f],
record.value.notification_uuid,
),
record.timestamp,
tenant_ids = {"organization_id": project.organization_id}
event_counts = tsdb.backend.get_sums(
TSDBModel.group,
list(groups),
start,
end,
tenant_ids=tenant_ids,
)
user_counts = tsdb.backend.get_distinct_counts_totals(
TSDBModel.users_affected_by_group,
list(groups),
start,
end,
tenant_ids=tenant_ids,
)

grouped: Digest = defaultdict(lambda: defaultdict(list))
for record in records:
# Reattach the group to the event.
group = groups.get(record.value.event.group_id)
if group is not None:
record.value.event.group = group
else:
logger.debug("%s could not be associated with a group.", record)
continue

def group_records(
groups: MutableMapping[str, Mapping[str, MutableSequence[Record]]], record: Record
) -> MutableMapping[str, Mapping[str, MutableSequence[Record]]]:
group = record.value.event.group
rules = record.value.rules
if not rules:
logger.debug("%s has no associated rules, and will not be added to any groups.", record)
if record.value.event.group.get_status() != GroupStatus.UNRESOLVED:
continue

for rule in rules:
groups[rule][group].append(record)
record_rules = [_f for _f in (rules.get(rule_id) for rule_id in record.value.rules) if _f]

return groups
if not record_rules:
logger.debug("%s has no associated rules, and will not be added to any groups.", record)

for rule in record_rules:
grouped[rule][group].append(record)

def sort_group_contents(
rules: MutableMapping[str, Mapping[Group, Sequence[Record]]]
) -> Mapping[str, Mapping[Group, Sequence[Record]]]:
for key, groups in rules.items():
rules[key] = dict(
grouped[key] = dict(
sorted(
groups.items(),
# x = (group, records)
key=lambda x: (x[0].event_count, x[0].user_count),
key=lambda x: (event_counts[x[0].id], user_counts[x[0].id]),
reverse=True,
)
)
return rules


def sort_rule_groups(rules: Mapping[str, Rule]) -> Mapping[str, Rule]:
return dict(
grouped = dict(
sorted(
rules.items(),
# x = (rule, groups)
key=lambda x: len(x[1]),
reverse=True,
)
)


def check_group_state(record: Record) -> bool:
return record.value.event.group.get_status() == GroupStatus.UNRESOLVED


def build_digest(
project: Project,
records: Sequence[Record],
state: Mapping[str, Any] | None = None,
) -> tuple[Digest | None, Sequence[str]]:
if not records:
return None, []

# XXX(hack): Allow generating a mock digest without actually doing any real IO!
state = state or fetch_state(project, records)

pipeline = (
Pipeline()
.map(functools.partial(rewrite_record, **attach_state(**state)))
.filter(bool)
.filter(check_group_state)
.reduce(group_records, lambda sequence: defaultdict(lambda: defaultdict(list)))
.apply(sort_group_contents)
.apply(sort_rule_groups)
)

digest, logs = pipeline(records)
return digest, logs
return grouped, []
79 changes: 0 additions & 79 deletions src/sentry/utils/pipeline.py

This file was deleted.

0 comments on commit cadc564

Please sign in to comment.