Skip to content

Commit

Permalink
ref: fix typing for sentry.digests.notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
asottile-sentry committed Aug 9, 2024
1 parent b3e4d0f commit f535e97
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 320 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ module = [
"sentry.db.mixin",
"sentry.db.postgres.base",
"sentry.db.router",
"sentry.digests.notifications",
"sentry.discover.endpoints.discover_key_transactions",
"sentry.eventstore.models",
"sentry.features.handler",
Expand Down Expand Up @@ -468,6 +467,7 @@ module = [
"sentry.db.models.manager.*",
"sentry.db.models.paranoia",
"sentry.db.models.utils",
"sentry.digests.notifications",
"sentry.eventstore.reprocessing.redis",
"sentry.eventtypes.error",
"sentry.grouping.component",
Expand Down
236 changes: 104 additions & 132 deletions src/sentry/digests/notifications.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from __future__ import annotations

import functools
import itertools
import logging
from collections import defaultdict
from collections.abc import Mapping, MutableMapping, Sequence
from typing import Any, TypeAlias
from collections.abc import Sequence
from typing import NamedTuple, TypeAlias

from sentry import tsdb
from sentry.digests.types import Notification, Record, RecordWithRuleObjects
Expand All @@ -15,11 +13,16 @@
from sentry.models.rule import Rule
from sentry.notifications.types import ActionTargetType, FallthroughChoiceType
from sentry.tsdb.base import TSDBModel
from sentry.utils.pipeline import Pipeline

logger = logging.getLogger("sentry.digests")

Digest: TypeAlias = dict["Rule", dict["Group", list[RecordWithRuleObjects]]]
Digest: TypeAlias = dict[Rule, dict[Group, list[RecordWithRuleObjects]]]


class DigestInfo(NamedTuple):
    """Built digest plus the per-group counts that were used to order it."""

    # rule -> group -> records included in the digest (see the Digest alias)
    digest: Digest
    # group id -> event count over the records' time window (tsdb get_sums)
    event_counts: dict[int, int]
    # group id -> distinct users affected over the same window
    user_counts: dict[int, int]


def split_key(
Expand Down Expand Up @@ -72,148 +75,117 @@ def event_to_record(
)


def fetch_state(project: Project, records: Sequence[Record]) -> Mapping[str, Any]:
    """Load the groups, rules and tsdb counts referenced by ``records``.

    Records arrive in reverse chronological order, so the query window runs
    from the last record's timestamp (oldest) to the first's (newest).
    NOTE: This doesn't account for any issues that are filtered out later.
    """
    end = records[0].datetime
    start = records[-1].datetime

    group_map = Group.objects.in_bulk(rec.value.event.group_id for rec in records)
    rule_map = Rule.objects.in_bulk(
        itertools.chain.from_iterable(rec.value.rules for rec in records)
    )

    group_ids = list(group_map.keys())
    tenant = {"organization_id": project.organization_id}
    event_counts = tsdb.backend.get_sums(
        TSDBModel.group,
        group_ids,
        start,
        end,
        tenant_ids=tenant,
    )
    user_counts = tsdb.backend.get_distinct_counts_totals(
        TSDBModel.users_affected_by_group,
        group_ids,
        start,
        end,
        tenant_ids=tenant,
    )

    return {
        "project": project,
        "groups": group_map,
        "rules": rule_map,
        "event_counts": event_counts,
        "user_counts": user_counts,
    }


def attach_state(
    project: Project,
    groups: MutableMapping[int, Group],
    rules: Mapping[int, Rule],
    event_counts: Mapping[int, int],
    user_counts: Mapping[int, int],
) -> Mapping[str, Any]:
    """Attach ``project`` and the fetched counters onto the model objects.

    Mutates each Group (project, event_count, user_count) and Rule (project)
    in place, then returns the state mapping consumed downstream.
    """
    for group in groups.values():
        assert group.project_id == project.id, "Group must belong to Project"
        group.project = project
        # Default both counters to zero; the tsdb results below may be sparse.
        group.event_count = 0
        group.user_count = 0

    for rule in rules.values():
        assert rule.project_id == project.id, "Rule must belong to Project"
        rule.project = project

    for group_id, count in event_counts.items():
        groups[group_id].event_count = count

    for group_id, count in user_counts.items():
        groups[group_id].user_count = count

    return {"project": project, "groups": groups, "rules": rules}


def rewrite_record(
    record: Record,
    project: Project,
    groups: Mapping[int, Group],
    rules: Mapping[str, Rule],
) -> RecordWithRuleObjects | None:
    """Resolve a record's rule ids to Rule objects and reattach its Group.

    Returns None when the record references a group id that is missing from
    ``groups``; a record with no group id at all is passed through unchanged.
    """
    event = record.value.event

    # Reattach the group to the event (a group-less event is left as-is).
    if event.group_id is not None:
        group = groups.get(event.group_id)
        if group is None:
            logger.debug("%s could not be associated with a group.", record)
            return None
        event.group = group

    # Drop rule ids that did not resolve to a Rule object.
    resolved = [rules.get(rule_id) for rule_id in record.value.rules]
    return record.with_rules([rule for rule in resolved if rule])


def group_records(groups: Digest, record: RecordWithRuleObjects) -> Digest:
    """Reducer step: file ``record`` under every one of its rules in ``groups``."""
    record_rules = record.value.rules
    if not record_rules:
        logger.debug("%s has no associated rules, and will not be added to any groups.", record)

    target_group = record.value.event.group
    for rule in record_rules:
        groups[rule][target_group].append(record)

    return groups


def sort_group_contents(rules: Digest) -> Digest:
for key, groups in rules.items():
rules[key] = dict(
def _bind_records(
    records: Sequence[Record], groups: dict[int, Group], rules: dict[int, Rule]
) -> list[RecordWithRuleObjects]:
    """Resolve each record's group/rule ids to model objects.

    Records are dropped when they have no group id, their group is not in
    ``groups``, or the group is no longer UNRESOLVED. Rule ids with no
    matching Rule are silently omitted from the bound record.
    """
    bound: list[RecordWithRuleObjects] = []
    for rec in records:
        group_id = rec.value.event.group_id
        if group_id is None:
            continue

        group = groups.get(group_id)
        if group is None:
            logger.debug("%s could not be associated with a group.", rec)
            continue
        if group.get_status() != GroupStatus.UNRESOLVED:
            continue

        # Reattach the resolved Group object onto the event.
        rec.value.event.group = group

        matched_rules = []
        for rule_id in rec.value.rules:
            rule = rules.get(rule_id)
            if rule is not None:
                matched_rules.append(rule)
        bound.append(rec.with_rules(matched_rules))

    return bound


def _group_records(
    records: Sequence[RecordWithRuleObjects], groups: dict[int, Group], rules: dict[int, Rule]
) -> Digest:
    """Arrange bound records into the nested rule -> group -> records mapping."""
    by_rule: Digest = defaultdict(lambda: defaultdict(list))
    for rec in records:
        event_group = rec.value.event.group
        # _bind_records has already attached a Group to every surviving record.
        assert event_group is not None
        for rule in rec.value.rules:
            by_rule[rule][event_group].append(rec)
    return by_rule


def _sort_digest(
    digest: Digest, event_counts: dict[int, int], user_counts: dict[int, int]
) -> Digest:
    """Order the digest for display.

    This span was corrupted by diff interleaving (leftover lines from the
    deleted ``sort_group_contents``/``sort_rule_groups`` mixed into the new
    definition: a duplicate ``key=`` argument, ``return rules``, and a stray
    ``def sort_rule_groups``); reconstructed as the single clean function.
    """
    # sort inner groups dict by (event_count, user_count) descending
    for key, rule_groups in digest.items():
        digest[key] = dict(
            sorted(
                rule_groups.items(),
                # x = (group, records)
                key=lambda x: (event_counts[x[0].id], user_counts[x[0].id]),
                reverse=True,
            )
        )

    # sort outer rules dict by number of groups (descending)
    return dict(
        sorted(
            digest.items(),
            # x = (rule, groups)
            key=lambda x: len(x[1]),
            reverse=True,
        )
    )


def check_group_state(record: Record) -> bool:
return record.value.event.group.get_status() == GroupStatus.UNRESOLVED


def _build_digest_impl(
    records: Sequence[Record],
    groups: dict[int, Group],
    rules: dict[int, Rule],
    event_counts: dict[int, int],
    user_counts: dict[int, int],
) -> Digest:
    """Pure (sans-io) digest construction: bind, group, then sort.

    This span was corrupted by diff interleaving (the deleted ``build_digest``
    signature lines were mixed into the new definition); reconstructed here.
    """
    # sans-io implementation details
    bound_records = _bind_records(records, groups, rules)
    grouped = _group_records(bound_records, groups, rules)
    return _sort_digest(grouped, event_counts=event_counts, user_counts=user_counts)


def build_digest(project: Project, records: Sequence[Record]) -> DigestInfo:
    """Build a DigestInfo for ``project`` from scheduled ``records``.

    Fetches the referenced Group/Rule rows and the tsdb event/user counts,
    then delegates to the pure ``_build_digest_impl``. This span was corrupted
    by diff interleaving (the deleted Pipeline-based body was mixed in);
    reconstructed as the clean new definition.
    """
    if not records:
        return DigestInfo({}, {}, {})

    # This reads a little strange, but remember that records are returned in
    # reverse chronological order, and we query the database in chronological
    # order.
    # NOTE: This doesn't account for any issues that are filtered out later.
    start = records[-1].datetime
    end = records[0].datetime

    groups = Group.objects.in_bulk(record.value.event.group_id for record in records)
    group_ids = list(groups)
    rules = Rule.objects.in_bulk(rule_id for record in records for rule_id in record.value.rules)

    for group in groups.values():
        assert group.project_id == project.id, "Group must belong to Project"
    for rule in rules.values():
        assert rule.project_id == project.id, "Rule must belong to Project"

    tenant_ids = {"organization_id": project.organization_id}
    event_counts = tsdb.backend.get_sums(
        TSDBModel.group,
        group_ids,
        start,
        end,
        tenant_ids=tenant_ids,
    )
    user_counts = tsdb.backend.get_distinct_counts_totals(
        TSDBModel.users_affected_by_group,
        group_ids,
        start,
        end,
        tenant_ids=tenant_ids,
    )

    digest = _build_digest_impl(records, groups, rules, event_counts, user_counts)

    return DigestInfo(digest, event_counts, user_counts)
4 changes: 2 additions & 2 deletions src/sentry/mail/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from sentry import digests
from sentry.digests import get_option_key as get_digest_option_key
from sentry.digests.notifications import Digest, event_to_record, unsplit_key
from sentry.digests.notifications import DigestInfo, event_to_record, unsplit_key
from sentry.integrations.types import ExternalProviders
from sentry.models.options.project_option import ProjectOption
from sentry.models.project import Project
Expand Down Expand Up @@ -146,7 +146,7 @@ def notify(
@staticmethod
def notify_digest(
project: Project,
digest: Digest,
digest: DigestInfo,
target_type: ActionTargetType,
target_identifier: int | None = None,
fallthrough_choice: FallthroughChoiceType | None = None,
Expand Down
20 changes: 12 additions & 8 deletions src/sentry/notifications/notifications/digest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from sentry import analytics, features
from sentry.db.models import Model
from sentry.digests.notifications import Digest
from sentry.digests.notifications import DigestInfo
from sentry.digests.utils import (
get_digest_as_context,
get_participants_by_event,
Expand Down Expand Up @@ -49,7 +49,7 @@ class DigestNotification(ProjectNotification):
def __init__(
self,
project: Project,
digest: Digest,
digest: DigestInfo,
target_type: ActionTargetType,
target_identifier: int | None = None,
fallthrough_choice: FallthroughChoiceType | None = None,
Expand Down Expand Up @@ -106,7 +106,9 @@ def reference(self) -> Model | None:
return self.project

def get_context(self) -> MutableMapping[str, Any]:
rule_details = get_rules(list(self.digest.keys()), self.project.organization, self.project)
rule_details = get_rules(
list(self.digest.digest.keys()), self.project.organization, self.project
)
context = DigestNotification.build_context(
self.digest,
self.project,
Expand All @@ -130,7 +132,7 @@ def get_context(self) -> MutableMapping[str, Any]:

@staticmethod
def build_context(
digest: Digest,
digest: DigestInfo,
project: Project,
organization: Organization,
rule_details: Sequence[NotificationRuleDetails],
Expand All @@ -140,7 +142,9 @@ def build_context(
has_session_replay = features.has("organizations:session-replay", organization)
show_replay_link = features.has("organizations:session-replay-issue-emails", organization)
return {
**get_digest_as_context(digest),
**get_digest_as_context(digest.digest),
"event_counts": digest.event_counts,
"user_counts": digest.user_counts,
"has_alert_integration": has_alert_integration(project),
"project": project,
"slack_link": get_integration_link(organization, "slack"),
Expand All @@ -160,7 +164,7 @@ def get_extra_context(
participants_by_provider_by_event: Mapping[Event, Mapping[ExternalProviders, set[Actor]]],
) -> Mapping[Actor, Mapping[str, Any]]:
personalized_digests = get_personalized_digests(
self.digest, participants_by_provider_by_event
self.digest.digest, participants_by_provider_by_event
)
return {
actor: get_digest_as_context(digest) for actor, digest in personalized_digests.items()
Expand All @@ -176,7 +180,7 @@ def send(self) -> None:
)

participants_by_provider_by_event = get_participants_by_event(
self.digest,
self.digest.digest,
self.project,
self.target_type,
self.target_identifier,
Expand Down Expand Up @@ -230,7 +234,7 @@ def send(self) -> None:

def get_log_params(self, recipient: Actor) -> Mapping[str, Any]:
try:
alert_id = list(self.digest.keys())[0].id
alert_id = list(self.digest.digest.keys())[0].id
except Exception:
alert_id = None

Expand Down
Loading

0 comments on commit f535e97

Please sign in to comment.