Process delayed alert conditions in batches of 10,000 #75302

Merged
merged 20 commits, Jul 31, 2024
14 changes: 14 additions & 0 deletions src/sentry/buffer/base.py
@@ -34,6 +34,7 @@ class Buffer(Service):
"push_to_hash",
"get_sorted_set",
"get_hash",
"get_hash_length",
"delete_hash",
"delete_key",
)
@@ -54,6 +55,11 @@ def get_hash(
) -> dict[str, str]:
return {}

def get_hash_length(
self, model: type[models.Model], field: dict[str, models.Model | str | int]
) -> int:
raise NotImplementedError

def get_sorted_set(self, key: str, min: float, max: float) -> list[tuple[int, datetime]]:
return []

@@ -69,6 +75,14 @@ def push_to_hash(
) -> None:
return None

def push_to_hash_bulk(
self,
model: type[models.Model],
filters: dict[str, models.Model | str | int],
data: dict[str, str],
) -> None:
raise NotImplementedError

def delete_hash(
self,
model: type[models.Model],
18 changes: 18 additions & 0 deletions src/sentry/buffer/redis.py
@@ -81,13 +81,16 @@ def callback(self, buffer_hook_event: BufferHookEvent) -> bool:
redis_buffer_registry = BufferHookRegistry()


# Note: HMSET is deprecated as of Redis 4.0.0; after updating we can use HSET directly.
class RedisOperation(Enum):
SORTED_SET_ADD = "zadd"
SORTED_SET_GET_RANGE = "zrangebyscore"
SORTED_SET_DELETE_RANGE = "zremrangebyscore"
HASH_ADD = "hset"
HASH_ADD_BULK = "hmset"
Contributor

According to https://redis.io/docs/latest/commands/hmset/, this command is deprecated as of Redis 4.0 and can be replaced with hset, which can take multiple key-value pairs.
I'm not sure what version of Redis we use, though.

Contributor Author

Yeah, I had initially tried hset with the mapping syntax and it threw errors, so we'll need to use hmset until then.

Contributor

Ah, OK. Could we add a comment somewhere, then, referencing the future deprecation?
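
For reference, a minimal sketch of the eventual replacement, assuming redis-py 3.5+ (where hset accepts a mapping argument); the key name and data here are illustrative and this is not the buffer's actual call path:

import redis

client = redis.Redis()  # illustrative connection, not Sentry's cluster client
data = {"111:222": '{"event_id": "abc"}', "111:333": '{"event_id": "def"}'}

# Deprecated path backing HASH_ADD_BULK today: one round trip, many fields.
client.hmset("delayed_processing:rulegroup", data)

# Preferred path once the deprecation is acted on: hset with a mapping writes
# the same fields in a single call.
client.hset("delayed_processing:rulegroup", mapping=data)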

HASH_GET_ALL = "hgetall"
HASH_DELETE = "hdel"
HASH_LENGTH = "hlen"


class PendingBuffer:
@@ -296,6 +299,15 @@ def push_to_hash(
key = self._make_key(model, filters)
self._execute_redis_operation(key, RedisOperation.HASH_ADD, field, value)

def push_to_hash_bulk(
self,
model: type[models.Model],
filters: dict[str, models.Model | str | int],
data: dict[str, str],
) -> None:
key = self._make_key(model, filters)
self._execute_redis_operation(key, RedisOperation.HASH_ADD_BULK, data)

def get_hash(
self, model: type[models.Model], field: dict[str, models.Model | str | int]
) -> dict[str, str]:
@@ -311,6 +323,12 @@ def get_hash(

return decoded_hash

def get_hash_length(
self, model: type[models.Model], field: dict[str, models.Model | str | int]
) -> int:
key = self._make_key(model, field)
return self._execute_redis_operation(key, RedisOperation.HASH_LENGTH)

def process_batch(self) -> None:
try:
redis_buffer_registry.callback(BufferHookEvent.FLUSH)
5 changes: 5 additions & 0 deletions src/sentry/options/defaults.py
@@ -2636,3 +2636,8 @@
default=1,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)
register(
"delayed_processing.batch_size",
default=10000,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)
101 changes: 73 additions & 28 deletions src/sentry/rules/processing/delayed_processing.py
@@ -4,13 +4,15 @@
from collections import defaultdict
from collections.abc import Sequence
from datetime import datetime, timedelta, timezone
from itertools import islice
from typing import Any, DefaultDict, NamedTuple

import sentry_sdk
from django.db.models import OuterRef, Subquery

from sentry import buffer, nodestore
from sentry import buffer, nodestore, options
from sentry.buffer.redis import BufferHookEvent, redis_buffer_registry
from sentry.db import models
from sentry.eventstore.models import Event, GroupEvent
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.models.group import Group
@@ -85,8 +87,15 @@ def fetch_project(project_id: int) -> Project | None:
return None


def fetch_rulegroup_to_event_data(project_id: int) -> dict[str, str]:
return buffer.backend.get_hash(model=Project, field={"project_id": project_id})
def fetch_rulegroup_to_event_data(project_id: int, batch_key: str | None = None) -> dict[str, str]:
field: dict[str, models.Model | int | str] = {
Contributor

Where does this models.Model typing come from on the key 🤔

Contributor Author

It's a weird nested definition on the get_hash function call for the field variable.

Ideally I wouldn't type this at all, and mypy would evaluate the field data type as dict[str, int | str], which adheres to dict[str, models.Model | int | str] (at least, that's how TypeScript works). Unfortunately, it was throwing errors, and typing it explicitly threw the same errors. Since the dict does adhere to the type definition with models.Model, I just added it to appease the mypy overlords.

Any recommendations on cleanup or better ways to appease mypy?

Contributor

That makes sense. I would think the implicit typing would work, but I guess not, lol.
mypy is just a mystery sometimes 🔍
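
To illustrate the mypy behavior under discussion (my reading, using hypothetical stand-in names rather than code from the PR): dict is invariant in its value type, so a variable inferred as dict[str, int] is rejected where dict[str, Model | str | int] is expected, while annotating the variable with the wider type up front satisfies the signature.

class Model:  # stand-in for django.db.models.Model, for illustration only
    pass


def get_hash(field: dict[str, Model | str | int]) -> dict[str, str]:
    return {}


narrow = {"project_id": 1}  # inferred as dict[str, int]
get_hash(narrow)  # mypy rejects this: dict is invariant, so dict[str, int] is not accepted

wide: dict[str, Model | str | int] = {"project_id": 1}
wide["batch_key"] = "abc-123"  # str values are allowed under the wider annotation
get_hash(wide)  # type-checks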

"project_id": project_id,
}

if batch_key:
field["batch_key"] = batch_key

return buffer.backend.get_hash(model=Project, field=field)


def get_rules_to_groups(rulegroup_to_event_data: dict[str, str]) -> DefaultDict[int, set[int]]:
@@ -447,13 +456,17 @@ def fire_rules(
safe_execute(callback, groupevent, futures)


def cleanup_redis_buffer(project_id: int, rules_to_groups: DefaultDict[int, set[int]]) -> None:
def cleanup_redis_buffer(
project_id: int, rules_to_groups: DefaultDict[int, set[int]], batch_key: str | None
) -> None:
hashes_to_delete = [
f"{rule}:{group}" for rule, groups in rules_to_groups.items() for group in groups
]
buffer.backend.delete_hash(
model=Project, filters={"project_id": project_id}, fields=hashes_to_delete
)
filters: dict[str, models.Model | str | int] = {"project_id": project_id}
if batch_key:
filters["batch_key"] = batch_key

buffer.backend.delete_hash(model=Project, filters=filters, fields=hashes_to_delete)


def bucket_num_groups(num_groups: int) -> str:
@@ -463,6 +476,55 @@ def bucket_num_groups(num_groups: int) -> str:
return "1"


def process_rulegroups_in_batches(project_id: int):
"""
This will check the number of rulegroup_to_event_data items in the Redis buffer for a project.

If the number is larger than the batch size, it will chunk the items and process them in batches.

The batches are replicated into a new Redis hash with a unique filter (a UUID) to identify the batch.
We need to use a UUID because these batches can be created in multiple processes, and we need to ensure
uniqueness across all of them for the centralized Redis buffer. The batches are stored in Redis because
we shouldn't pass objects that need to be pickled, and passing 10k items as Celery task arguments
could be problematic. Finally, we can't use a pagination system on the data because
Redis doesn't maintain the sort order of the hash keys.

`apply_delayed` will fetch the batch from redis and process the rules.
"""
batch_size = options.get("delayed_processing.batch_size")
event_count = buffer.backend.get_hash_length(Project, {"project_id": project_id})

if event_count < batch_size:
return apply_delayed.delayed(project_id)

logger.info(
"delayed_processing.process_large_batch",
extra={"project_id": project_id, "count": event_count},
)

# if the dictionary is large, get the items and chunk them.
rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id)

with metrics.timer("delayed_processing.process_batch.duration"):
items = iter(rulegroup_to_event_data.items())

while batch := dict(islice(items, batch_size)):
batch_key = str(uuid.uuid4())

buffer.backend.push_to_hash_bulk(
model=Project,
filters={"project_id": project_id, "batch_key": batch_key},
data=batch,
)

# remove the batched items from the project rulegroup_to_event_data
buffer.backend.delete_hash(
model=Project, filters={"project_id": project_id}, fields=list(batch.keys())
)

apply_delayed.delayed(project_id, batch_key)


def process_delayed_alert_conditions() -> None:
with metrics.timer("delayed_processing.process_all_conditions.duration"):
fetch_time = datetime.now(tz=timezone.utc)
@@ -473,7 +535,7 @@ def process_delayed_alert_conditions() -> None:
logger.info("delayed_processing.project_id_list", extra={"project_ids": log_str})

for project_id, _ in project_ids:
apply_delayed.delay(project_id)
process_rulegroups_in_batches(project_id)

buffer.backend.delete_key(PROJECT_ID_BUFFER_LIST_KEY, min=0, max=fetch_time.timestamp())

@@ -487,32 +549,15 @@ def process_delayed_alert_conditions() -> None:
time_limit=60,
silo_mode=SiloMode.REGION,
)
def apply_delayed(project_id: int, *args: Any, **kwargs: Any) -> None:
def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **kwargs: Any) -> None:
"""
Grab rules, groups, and events from the Redis buffer, evaluate the "slow" conditions in a bulk snuba query, and fire them if they pass
"""
project = fetch_project(project_id)
if not project:
# Should we remove the project_id from the redis queue?
return

rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id)
num_groups = len(rulegroup_to_event_data)
num_groups_bucketed = bucket_num_groups(num_groups)
metrics.incr("delayed_processing.num_groups", tags={"num_groups": num_groups_bucketed})

if num_groups >= 10000:
logger.error(
"delayed_processing.too_many_groups",
extra={
"project_id": project_id,
"num_groups": num_groups,
"organization_id": project.organization_id,
},
)
# TODO @saponifi3d - Split the processing from here into smaller groups
return

rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id, batch_key)
rules_to_groups = get_rules_to_groups(rulegroup_to_event_data)
alert_rules = fetch_alert_rules(list(rules_to_groups.keys()))
condition_groups = get_condition_query_groups(alert_rules, rules_to_groups)
@@ -542,7 +587,7 @@ def apply_delayed(project_id: int, *args: Any, **kwargs: Any) -> None:
with metrics.timer("delayed_processing.fire_rules.duration"):
fire_rules(rules_to_fire, parsed_rulegroup_to_event_data, alert_rules, project)

cleanup_redis_buffer(project_id, rules_to_groups)
cleanup_redis_buffer(project_id, rules_to_groups, batch_key)


if not redis_buffer_registry.has(BufferHookEvent.FLUSH):
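As an aside, the chunking idiom used in process_rulegroups_in_batches above can be seen in isolation; this is a minimal, self-contained sketch (the helper name and batch size are illustrative and it does not touch Sentry's buffer backend):

from itertools import islice


def chunk_dict(data: dict[str, str], batch_size: int) -> list[dict[str, str]]:
    """Split a dict into consecutive batches of at most batch_size items."""
    items = iter(data.items())
    batches = []
    # dict(islice(...)) drains up to batch_size pairs per pass; the loop ends
    # when islice yields nothing and the resulting dict is empty (falsy).
    while batch := dict(islice(items, batch_size)):
        batches.append(batch)
    return batches


# Five items split with batch_size=2 yields batches of sizes 2, 2, and 1.
example = {f"rule:{i}": "{}" for i in range(5)}
assert [len(b) for b in chunk_dict(example, 2)] == [2, 2, 1]
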
7 changes: 7 additions & 0 deletions tests/sentry/buffer/test_base.py
@@ -2,6 +2,7 @@
from unittest import mock

from django.utils import timezone
from pytest import raises

from sentry.buffer.base import Buffer
from sentry.db import models
@@ -77,3 +78,9 @@ def test_signal_only(self, create_or_update):
self.buf.process(Group, columns, filters, {"last_seen": the_date}, signal_only=True)
group.refresh_from_db()
assert group.times_seen == prev_times_seen

def test_push_to_hash_bulk(self):
raises(NotImplementedError, self.buf.push_to_hash_bulk, Group, {"id": 1}, {"foo": "bar"})

def test_get_hash_length(self):
raises(NotImplementedError, self.buf.get_hash_length, Group, {"id": 1})
31 changes: 31 additions & 0 deletions tests/sentry/buffer/test_redis.py
@@ -1,6 +1,7 @@
import datetime
import pickle
from collections import defaultdict
from collections.abc import Mapping
from unittest import mock
from unittest.mock import Mock

@@ -367,6 +368,36 @@ def test_process_uses_signal_only(self, process):
self.buf.process("foo")
process.assert_called_once_with(mock.Mock, {"times_seen": 1}, {"pk": 1}, {}, True)

@mock.patch("sentry.buffer.redis.RedisBuffer._make_key", mock.Mock(return_value="foo"))
def test_get_hash_length(self):
client = get_cluster_routing_client(self.buf.cluster, self.buf.is_redis_cluster)
data: Mapping[str | bytes, bytes | float | int | str] = {
"f": '{"pk": ["i","1"]}',
"i+times_seen": "1",
"m": "unittest.mock.Mock",
"s": "1",
}

client.hmset("foo", data)
buffer_length = self.buf.get_hash_length("foo", field={"bar": 1})
assert buffer_length == len(data)

@mock.patch("sentry.buffer.redis.RedisBuffer._make_key", mock.Mock(return_value="foo"))
def test_push_to_hash_bulk(self):
def decode_dict(d):
return {k: v.decode("utf-8") if isinstance(v, bytes) else v for k, v in d.items()}

client = get_cluster_routing_client(self.buf.cluster, self.buf.is_redis_cluster)
data = {
"f": '{"pk": ["i","1"]}',
"i+times_seen": "1",
"m": "unittest.mock.Mock",
"s": "1",
}
self.buf.push_to_hash_bulk(model=Project, filters={"project_id": 1}, data=data)
result = _hgetall_decode_keys(client, "foo", self.buf.is_redis_cluster)
assert decode_dict(result) == data


# @mock.patch("sentry.buffer.redis.RedisBuffer._make_key", mock.Mock(return_value="foo"))
# def test_incr_uses_signal_only(self):
Expand Down