Skip to content

Commit

Permalink
Revert the revert for #75302 (#75412)
Browse files Browse the repository at this point in the history
# Description
Revert of Revert for: #75302 

`.delayed` vs `.delay` 🤦‍♂️ all pertinent changes are in:
482439a
  • Loading branch information
saponifi3d committed Aug 1, 2024
1 parent 2339fbc commit dc5d329
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 45 deletions.
14 changes: 14 additions & 0 deletions src/sentry/buffer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class Buffer(Service):
"push_to_hash",
"get_sorted_set",
"get_hash",
"get_hash_length",
"delete_hash",
"delete_key",
)
Expand All @@ -54,6 +55,11 @@ def get_hash(
) -> dict[str, str]:
return {}

def get_hash_length(
self, model: type[models.Model], field: dict[str, models.Model | str | int]
) -> int:
raise NotImplementedError

def get_sorted_set(self, key: str, min: float, max: float) -> list[tuple[int, datetime]]:
return []

Expand All @@ -69,6 +75,14 @@ def push_to_hash(
) -> None:
return None

def push_to_hash_bulk(
self,
model: type[models.Model],
filters: dict[str, models.Model | str | int],
data: dict[str, str],
) -> None:
raise NotImplementedError

def delete_hash(
self,
model: type[models.Model],
Expand Down
18 changes: 18 additions & 0 deletions src/sentry/buffer/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,16 @@ def callback(self, buffer_hook_event: BufferHookEvent) -> bool:
redis_buffer_registry = BufferHookRegistry()


# Note HMSET is not supported after redis 4.0.0, after updating we can use HSET directly.
class RedisOperation(Enum):
SORTED_SET_ADD = "zadd"
SORTED_SET_GET_RANGE = "zrangebyscore"
SORTED_SET_DELETE_RANGE = "zremrangebyscore"
HASH_ADD = "hset"
HASH_ADD_BULK = "hmset"
HASH_GET_ALL = "hgetall"
HASH_DELETE = "hdel"
HASH_LENGTH = "hlen"


class PendingBuffer:
Expand Down Expand Up @@ -296,6 +299,15 @@ def push_to_hash(
key = self._make_key(model, filters)
self._execute_redis_operation(key, RedisOperation.HASH_ADD, field, value)

def push_to_hash_bulk(
self,
model: type[models.Model],
filters: dict[str, models.Model | str | int],
data: dict[str, str],
) -> None:
key = self._make_key(model, filters)
self._execute_redis_operation(key, RedisOperation.HASH_ADD_BULK, data)

def get_hash(
self, model: type[models.Model], field: dict[str, models.Model | str | int]
) -> dict[str, str]:
Expand All @@ -311,6 +323,12 @@ def get_hash(

return decoded_hash

def get_hash_length(
self, model: type[models.Model], field: dict[str, models.Model | str | int]
) -> int:
key = self._make_key(model, field)
return self._execute_redis_operation(key, RedisOperation.HASH_LENGTH)

def process_batch(self) -> None:
try:
redis_buffer_registry.callback(BufferHookEvent.FLUSH)
Expand Down
5 changes: 5 additions & 0 deletions src/sentry/options/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -2648,3 +2648,8 @@
default=1,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)
register(
"delayed_processing.batch_size",
default=10000,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)
101 changes: 73 additions & 28 deletions src/sentry/rules/processing/delayed_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
from collections import defaultdict
from collections.abc import Sequence
from datetime import datetime, timedelta, timezone
from itertools import islice
from typing import Any, DefaultDict, NamedTuple

import sentry_sdk
from django.db.models import OuterRef, Subquery

from sentry import buffer, nodestore
from sentry import buffer, nodestore, options
from sentry.buffer.redis import BufferHookEvent, redis_buffer_registry
from sentry.db import models
from sentry.eventstore.models import Event, GroupEvent
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.models.group import Group
Expand Down Expand Up @@ -85,8 +87,15 @@ def fetch_project(project_id: int) -> Project | None:
return None


def fetch_rulegroup_to_event_data(project_id: int) -> dict[str, str]:
return buffer.backend.get_hash(model=Project, field={"project_id": project_id})
def fetch_rulegroup_to_event_data(project_id: int, batch_key: str | None = None) -> dict[str, str]:
field: dict[str, models.Model | int | str] = {
"project_id": project_id,
}

if batch_key:
field["batch_key"] = batch_key

return buffer.backend.get_hash(model=Project, field=field)


def get_rules_to_groups(rulegroup_to_event_data: dict[str, str]) -> DefaultDict[int, set[int]]:
Expand Down Expand Up @@ -447,13 +456,17 @@ def fire_rules(
safe_execute(callback, groupevent, futures)


def cleanup_redis_buffer(project_id: int, rules_to_groups: DefaultDict[int, set[int]]) -> None:
def cleanup_redis_buffer(
project_id: int, rules_to_groups: DefaultDict[int, set[int]], batch_key: str | None
) -> None:
hashes_to_delete = [
f"{rule}:{group}" for rule, groups in rules_to_groups.items() for group in groups
]
buffer.backend.delete_hash(
model=Project, filters={"project_id": project_id}, fields=hashes_to_delete
)
filters: dict[str, models.Model | str | int] = {"project_id": project_id}
if batch_key:
filters["batch_key"] = batch_key

buffer.backend.delete_hash(model=Project, filters=filters, fields=hashes_to_delete)


def bucket_num_groups(num_groups: int) -> str:
Expand All @@ -463,6 +476,55 @@ def bucket_num_groups(num_groups: int) -> str:
return "1"


def process_rulegroups_in_batches(project_id: int):
"""
This will check the number of rulegroup_to_event_data items in the Redis buffer for a project.
If the number is larger than the batch size, it will chunk the items and process them in batches.
The batches are replicated into a new redis hash with a unique filter (a uuid) to identify the batch.
We need to use a UUID because these batches can be created in multiple processes and we need to ensure
uniqueness across all of them for the centralized redis buffer. The batches are stored in redis because
we shouldn't pass objects that need to be pickled and 10k items could be problematic in the celery tasks
as arguments could be problematic. Finally, we can't use a pagination system on the data because
redis doesn't maintain the sort order of the hash keys.
`apply_delayed` will fetch the batch from redis and process the rules.
"""
batch_size = options.get("delayed_processing.batch_size")
event_count = buffer.backend.get_hash_length(Project, {"project_id": project_id})

if event_count < batch_size:
return apply_delayed.delay(project_id)

logger.info(
"delayed_processing.process_large_batch",
extra={"project_id": project_id, "count": event_count},
)

# if the dictionary is large, get the items and chunk them.
rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id)

with metrics.timer("delayed_processing.process_batch.duration"):
items = iter(rulegroup_to_event_data.items())

while batch := dict(islice(items, batch_size)):
batch_key = str(uuid.uuid4())

buffer.backend.push_to_hash_bulk(
model=Project,
filters={"project_id": project_id, "batch_key": batch_key},
data=batch,
)

# remove the batched items from the project rulegroup_to_event_data
buffer.backend.delete_hash(
model=Project, filters={"project_id": project_id}, fields=list(batch.keys())
)

apply_delayed.delay(project_id, batch_key)


def process_delayed_alert_conditions() -> None:
with metrics.timer("delayed_processing.process_all_conditions.duration"):
fetch_time = datetime.now(tz=timezone.utc)
Expand All @@ -473,7 +535,7 @@ def process_delayed_alert_conditions() -> None:
logger.info("delayed_processing.project_id_list", extra={"project_ids": log_str})

for project_id, _ in project_ids:
apply_delayed.delay(project_id)
process_rulegroups_in_batches(project_id)

buffer.backend.delete_key(PROJECT_ID_BUFFER_LIST_KEY, min=0, max=fetch_time.timestamp())

Expand All @@ -487,32 +549,15 @@ def process_delayed_alert_conditions() -> None:
time_limit=60,
silo_mode=SiloMode.REGION,
)
def apply_delayed(project_id: int, *args: Any, **kwargs: Any) -> None:
def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **kwargs: Any) -> None:
"""
Grab rules, groups, and events from the Redis buffer, evaluate the "slow" conditions in a bulk snuba query, and fire them if they pass
"""
project = fetch_project(project_id)
if not project:
# Should we remove the project_id from the redis queue?
return

rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id)
num_groups = len(rulegroup_to_event_data)
num_groups_bucketed = bucket_num_groups(num_groups)
metrics.incr("delayed_processing.num_groups", tags={"num_groups": num_groups_bucketed})

if num_groups >= 10000:
logger.error(
"delayed_processing.too_many_groups",
extra={
"project_id": project_id,
"num_groups": num_groups,
"organization_id": project.organization_id,
},
)
# TODO @saponifi3d - Split the processing from here into smaller groups
return

rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id, batch_key)
rules_to_groups = get_rules_to_groups(rulegroup_to_event_data)
alert_rules = fetch_alert_rules(list(rules_to_groups.keys()))
condition_groups = get_condition_query_groups(alert_rules, rules_to_groups)
Expand Down Expand Up @@ -542,7 +587,7 @@ def apply_delayed(project_id: int, *args: Any, **kwargs: Any) -> None:
with metrics.timer("delayed_processing.fire_rules.duration"):
fire_rules(rules_to_fire, parsed_rulegroup_to_event_data, alert_rules, project)

cleanup_redis_buffer(project_id, rules_to_groups)
cleanup_redis_buffer(project_id, rules_to_groups, batch_key)


if not redis_buffer_registry.has(BufferHookEvent.FLUSH):
Expand Down
7 changes: 7 additions & 0 deletions tests/sentry/buffer/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from unittest import mock

from django.utils import timezone
from pytest import raises

from sentry.buffer.base import Buffer
from sentry.db import models
Expand Down Expand Up @@ -77,3 +78,9 @@ def test_signal_only(self, create_or_update):
self.buf.process(Group, columns, filters, {"last_seen": the_date}, signal_only=True)
group.refresh_from_db()
assert group.times_seen == prev_times_seen

def test_push_to_hash_bulk(self):
raises(NotImplementedError, self.buf.push_to_hash_bulk, Group, {"id": 1}, {"foo": "bar"})

def test_get_hash_length(self):
raises(NotImplementedError, self.buf.get_hash_length, Group, {"id": 1})
31 changes: 31 additions & 0 deletions tests/sentry/buffer/test_redis.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import pickle
from collections import defaultdict
from collections.abc import Mapping
from unittest import mock
from unittest.mock import Mock

Expand Down Expand Up @@ -371,6 +372,36 @@ def test_process_uses_signal_only(self, process):
self.buf.process("foo")
process.assert_called_once_with(mock.Mock, {"times_seen": 1}, {"pk": 1}, {}, True)

@mock.patch("sentry.buffer.redis.RedisBuffer._make_key", mock.Mock(return_value="foo"))
def test_get_hash_length(self):
client = get_cluster_routing_client(self.buf.cluster, self.buf.is_redis_cluster)
data: Mapping[str | bytes, bytes | float | int | str] = {
"f": '{"pk": ["i","1"]}',
"i+times_seen": "1",
"m": "unittest.mock.Mock",
"s": "1",
}

client.hmset("foo", data)
buffer_length = self.buf.get_hash_length("foo", field={"bar": 1})
assert buffer_length == len(data)

@mock.patch("sentry.buffer.redis.RedisBuffer._make_key", mock.Mock(return_value="foo"))
def test_push_to_hash_bulk(self):
def decode_dict(d):
return {k: v.decode("utf-8") if isinstance(v, bytes) else v for k, v in d.items()}

client = get_cluster_routing_client(self.buf.cluster, self.buf.is_redis_cluster)
data = {
"f": '{"pk": ["i","1"]}',
"i+times_seen": "1",
"m": "unittest.mock.Mock",
"s": "1",
}
self.buf.push_to_hash_bulk(model=Project, filters={"project_id": 1}, data=data)
result = _hgetall_decode_keys(client, "foo", self.buf.is_redis_cluster)
assert decode_dict(result) == data


# @mock.patch("sentry.buffer.redis.RedisBuffer._make_key", mock.Mock(return_value="foo"))
# def test_incr_uses_signal_only(self):
Expand Down
Loading

0 comments on commit dc5d329

Please sign in to comment.