From 79ba7d0e290deab15981a1088d56a0da67c8211a Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Tue, 30 Jul 2024 10:08:48 -0700 Subject: [PATCH 01/20] WIP --- .../rules/processing/delayed_processing.py | 64 ++++++++++--------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index 68248e037783fe..47408438852f29 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -4,6 +4,7 @@ from collections import defaultdict from collections.abc import Sequence from datetime import datetime, timedelta, timezone +from itertools import islice from typing import Any, DefaultDict, NamedTuple import sentry_sdk @@ -74,13 +75,13 @@ def __repr__(self): return f"" -def fetch_project(project_id: int) -> Project | None: +def fetch_projects(project_ids: list[int]) -> Sequence[Project] | None: try: - return Project.objects.get_from_cache(id=project_id) + return Project.objects.get_many_from_cache(project_ids) except Project.DoesNotExist: logger.info( - "delayed_processing.project_does_not_exist", - extra={"project_id": project_id}, + "delayed_processing.can_not_fetch_projects", + extra={"project_ids": project_ids}, ) return None @@ -463,6 +464,25 @@ def bucket_num_groups(num_groups: int) -> str: return "1" +CHUNK_PAGE_SIZE = 10000 + + +def process_rulegroups_in_batches(project: Project, batch_size=CHUNK_PAGE_SIZE): + rulegroup_to_event_data = fetch_rulegroup_to_event_data(project.id) + + # For small dictionaries, process all at once + if len(rulegroup_to_event_data) < 10: + return apply_delayed(project, rulegroup_to_event_data) + + # For larger dictionaries, process in batches + items = iter(rulegroup_to_event_data.items()) + while True: + batch = dict(islice(items, batch_size)) + if not batch: + break + apply_delayed.delayed(project, batch) + + def process_delayed_alert_conditions() -> None: with metrics.timer("delayed_processing.process_all_conditions.duration"): fetch_time = datetime.now(tz=timezone.utc) @@ -472,8 +492,13 @@ def process_delayed_alert_conditions() -> None: log_str = ", ".join(f"{project_id}: {timestamp}" for project_id, timestamp in project_ids) logger.info("delayed_processing.project_id_list", extra={"project_ids": log_str}) - for project_id, _ in project_ids: - apply_delayed.delay(project_id) + projects = fetch_projects([project_id for project_id, _ in project_ids]) + if not projects: + logger.info("delayed_processing.no_projects", extra={"project_ids": log_str}) + return None + + for project in projects: + process_rulegroups_in_batches(project) buffer.backend.delete_key(PROJECT_ID_BUFFER_LIST_KEY, min=0, max=fetch_time.timestamp()) @@ -487,32 +512,13 @@ def process_delayed_alert_conditions() -> None: time_limit=60, silo_mode=SiloMode.REGION, ) -def apply_delayed(project_id: int, *args: Any, **kwargs: Any) -> None: +def apply_delayed( + project: Project, rulegroup_to_event_data: dict[str, str], *args: Any, **kwargs: Any +) -> None: """ Grab rules, groups, and events from the Redis buffer, evaluate the "slow" conditions in a bulk snuba query, and fire them if they pass """ - project = fetch_project(project_id) - if not project: - # Should we remove the project_id from the redis queue? 
- return - - rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id) - num_groups = len(rulegroup_to_event_data) - num_groups_bucketed = bucket_num_groups(num_groups) - metrics.incr("delayed_processing.num_groups", tags={"num_groups": num_groups_bucketed}) - - if num_groups >= 10000: - logger.error( - "delayed_processing.too_many_groups", - extra={ - "project_id": project_id, - "num_groups": num_groups, - "organization_id": project.organization_id, - }, - ) - # TODO @saponifi3d - Split the processing from here into smaller groups - return - + project_id = project.id rules_to_groups = get_rules_to_groups(rulegroup_to_event_data) alert_rules = fetch_alert_rules(list(rules_to_groups.keys())) condition_groups = get_condition_query_groups(alert_rules, rules_to_groups) From 1ed44ad8e64e36a21b3562b8c486e8bde7d364fd Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Tue, 30 Jul 2024 12:12:11 -0700 Subject: [PATCH 02/20] Add the ability to batch the apply delayed function. - look at the size of the hash - if under the limit - parse the groups as we were - else (over the limit) - break into batches of 10,000. Store new batches in redis. --- src/sentry/buffer/base.py | 6 ++ src/sentry/buffer/redis.py | 7 ++ .../rules/processing/delayed_processing.py | 72 ++++++++++++------- .../processing/test_delayed_processing.py | 20 +++++- 4 files changed, 75 insertions(+), 30 deletions(-) diff --git a/src/sentry/buffer/base.py b/src/sentry/buffer/base.py index 6182667947c5f2..b9e6320ef6c5e6 100644 --- a/src/sentry/buffer/base.py +++ b/src/sentry/buffer/base.py @@ -34,6 +34,7 @@ class Buffer(Service): "push_to_hash", "get_sorted_set", "get_hash", + "get_hash_length", "delete_hash", "delete_key", ) @@ -54,6 +55,11 @@ def get_hash( ) -> dict[str, str]: return {} + def get_hash_length( + self, model: type[models.Model], field: dict[str, models.Model | str | int] + ) -> int: + return 0 + def get_sorted_set(self, key: str, min: float, max: float) -> list[tuple[int, datetime]]: return [] diff --git a/src/sentry/buffer/redis.py b/src/sentry/buffer/redis.py index 27c00b6d778870..739e0944ecace9 100644 --- a/src/sentry/buffer/redis.py +++ b/src/sentry/buffer/redis.py @@ -88,6 +88,7 @@ class RedisOperation(Enum): HASH_ADD = "hset" HASH_GET_ALL = "hgetall" HASH_DELETE = "hdel" + HASH_LENGTH = "hlen" class PendingBuffer: @@ -311,6 +312,12 @@ def get_hash( return decoded_hash + def get_hash_length( + self, model: type[models.Model], field: dict[str, models.Model | str | int] + ) -> int: + key = self._make_key(model, field) + return self._execute_redis_operation(key, RedisOperation.HASH_LENGTH) + def process_batch(self) -> None: try: redis_buffer_registry.callback(BufferHookEvent.FLUSH) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index 47408438852f29..8cdddf0e8c900d 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -75,13 +75,13 @@ def __repr__(self): return f"" -def fetch_projects(project_ids: list[int]) -> Sequence[Project] | None: +def fetch_project(project_id: int) -> Project | None: try: - return Project.objects.get_many_from_cache(project_ids) + return Project.objects.get_from_cache(id=project_id) except Project.DoesNotExist: logger.info( - "delayed_processing.can_not_fetch_projects", - extra={"project_ids": project_ids}, + "delayed_processing.project_does_not_exist", + extra={"project_id": project_id}, ) return None @@ 
-467,20 +467,34 @@ def bucket_num_groups(num_groups: int) -> str: CHUNK_PAGE_SIZE = 10000 -def process_rulegroups_in_batches(project: Project, batch_size=CHUNK_PAGE_SIZE): - rulegroup_to_event_data = fetch_rulegroup_to_event_data(project.id) +def process_rulegroups_in_batches(project_id: int, batch_size=CHUNK_PAGE_SIZE): + event_count = buffer.backend.get_hash_length(Project, {"project_id": project_id}) - # For small dictionaries, process all at once - if len(rulegroup_to_event_data) < 10: - return apply_delayed(project, rulegroup_to_event_data) + if event_count < CHUNK_PAGE_SIZE: + return apply_delayed(project_id) - # For larger dictionaries, process in batches - items = iter(rulegroup_to_event_data.items()) - while True: - batch = dict(islice(items, batch_size)) - if not batch: - break - apply_delayed.delayed(project, batch) + # if the dictionary is large, get the items and chunk them. + rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id) + + with metrics.timer("delayed_processing.process_batch.duration"): + items = iter(rulegroup_to_event_data.items()) + while True: + batch = dict(islice(items, batch_size)) + if not batch: + break + + batch_key = f"{project_id}:{uuid.uuid4()}" + batch_values = json.dumps(batch) + + buffer.backend.push_to_hash( + model=Project, + filters={"project_id": project_id, "batch_key": batch_key}, + field=batch_key, + value=batch_values, + ) + apply_delayed.delayed(project_id, batch_key) + + # TODO destroy processed redis data def process_delayed_alert_conditions() -> None: @@ -492,13 +506,8 @@ def process_delayed_alert_conditions() -> None: log_str = ", ".join(f"{project_id}: {timestamp}" for project_id, timestamp in project_ids) logger.info("delayed_processing.project_id_list", extra={"project_ids": log_str}) - projects = fetch_projects([project_id for project_id, _ in project_ids]) - if not projects: - logger.info("delayed_processing.no_projects", extra={"project_ids": log_str}) - return None - - for project in projects: - process_rulegroups_in_batches(project) + for project_id, _ in project_ids: + process_rulegroups_in_batches(project_id) buffer.backend.delete_key(PROJECT_ID_BUFFER_LIST_KEY, min=0, max=fetch_time.timestamp()) @@ -512,13 +521,22 @@ def process_delayed_alert_conditions() -> None: time_limit=60, silo_mode=SiloMode.REGION, ) -def apply_delayed( - project: Project, rulegroup_to_event_data: dict[str, str], *args: Any, **kwargs: Any -) -> None: +def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **kwargs: Any) -> None: """ Grab rules, groups, and events from the Redis buffer, evaluate the "slow" conditions in a bulk snuba query, and fire them if they pass """ - project_id = project.id + project = fetch_project(project_id) + + if project is None: + return + + if batch_key is None: + rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id) + else: + rulegroup_to_event_data = json.loads( + buffer.backend.get_hash(model=Project, field={"batch_key": batch_key}) + ) + rules_to_groups = get_rules_to_groups(rulegroup_to_event_data) alert_rules = fetch_alert_rules(list(rules_to_groups.keys())) condition_groups = get_condition_query_groups(alert_rules, rules_to_groups) diff --git a/tests/sentry/rules/processing/test_delayed_processing.py b/tests/sentry/rules/processing/test_delayed_processing.py index 2330115404f123..a40d98d116ea52 100644 --- a/tests/sentry/rules/processing/test_delayed_processing.py +++ b/tests/sentry/rules/processing/test_delayed_processing.py @@ -750,8 +750,8 @@ def 
_push_base_events(self) -> None: def tearDown(self): self.mock_redis_buffer.__exit__(None, None, None) - @patch("sentry.rules.processing.delayed_processing.apply_delayed") - def test_fetches_from_buffer_and_executes(self, mock_apply_delayed): + @patch("sentry.rules.processing.delayed_processing.process_rulegroups_in_batches") + def test_fetches_from_buffer_and_executes(self, mock_process_in_batches): self._push_base_events() # To get the correct mapping, we need to return the correct # rulegroup_event mapping based on the project_id input @@ -761,7 +761,7 @@ def test_fetches_from_buffer_and_executes(self, mock_apply_delayed): (self.project, self.rulegroup_event_mapping_one), (self.project_two, self.rulegroup_event_mapping_two), ): - assert mock_apply_delayed.delay.call_count == 2 + assert mock_process_in_batches.call_count == 2 project_ids = buffer.backend.get_sorted_set( PROJECT_ID_BUFFER_LIST_KEY, 0, self.buffer_timestamp @@ -1325,6 +1325,20 @@ def test_apply_delayed_process_count_then_percent(self, safe_execute_callthrough apply_delayed(project_id) self._assert_count_percent_results(safe_execute_callthrough) + def test_process_in_batches(self): + # TODO fill this in + pass + + +class ProcessRuleGroupsInBatchesTest(TestCase): + def test_basic(self): + # TODO write tests for processin in batches + pass + + # todo test for no batches + # todo test for multiple batches + # verify state of redis buffers + class UniqueConditionQueryTest(TestCase): """ From 70cbbb46e817c6ca09c96551aff4b72ca9533103 Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 11:33:13 -0700 Subject: [PATCH 03/20] WIP --- .../rules/processing/delayed_processing.py | 34 +++++++------- .../processing/test_delayed_processing.py | 45 ++++++++++++------- 2 files changed, 47 insertions(+), 32 deletions(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index 8cdddf0e8c900d..7a7c22d3b22b36 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -46,6 +46,7 @@ logger = logging.getLogger("sentry.rules.delayed_processing") EVENT_LIMIT = 100 COMPARISON_INTERVALS_VALUES = {k: v[1] for k, v in COMPARISON_INTERVALS.items()} +CHUNK_BATCH_SIZE = 10000 class UniqueConditionQuery(NamedTuple): @@ -464,14 +465,16 @@ def bucket_num_groups(num_groups: int) -> str: return "1" -CHUNK_PAGE_SIZE = 10000 - - -def process_rulegroups_in_batches(project_id: int, batch_size=CHUNK_PAGE_SIZE): +def process_rulegroups_in_batches(project_id: int, batch_size=CHUNK_BATCH_SIZE): + rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id) event_count = buffer.backend.get_hash_length(Project, {"project_id": project_id}) - if event_count < CHUNK_PAGE_SIZE: - return apply_delayed(project_id) + print("REHYDRAAATTE", batch_size, event_count, project_id, rulegroup_to_event_data) + if event_count < batch_size: + print("SINGLE EXECUTE!") + return apply_delayed.delayed(project_id) + + print("BATCH EXECUTE!") # if the dictionary is large, get the items and chunk them. 
rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id) @@ -484,14 +487,15 @@ def process_rulegroups_in_batches(project_id: int, batch_size=CHUNK_PAGE_SIZE): break batch_key = f"{project_id}:{uuid.uuid4()}" - batch_values = json.dumps(batch) - buffer.backend.push_to_hash( - model=Project, - filters={"project_id": project_id, "batch_key": batch_key}, - field=batch_key, - value=batch_values, - ) + for field, value in batch.items(): + buffer.backend.push_to_hash( + model=Project, + filters={"project_id": batch_key}, + field=field, + value=value, + ) + apply_delayed.delayed(project_id, batch_key) # TODO destroy processed redis data @@ -533,8 +537,8 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k if batch_key is None: rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id) else: - rulegroup_to_event_data = json.loads( - buffer.backend.get_hash(model=Project, field={"batch_key": batch_key}) + rulegroup_to_event_data = buffer.backend.get_hash( + model=Project, field={"project_id": project_id, "batch_key": batch_key} ) rules_to_groups = get_rules_to_groups(rulegroup_to_event_data) diff --git a/tests/sentry/rules/processing/test_delayed_processing.py b/tests/sentry/rules/processing/test_delayed_processing.py index a40d98d116ea52..6cb3146a36c95a 100644 --- a/tests/sentry/rules/processing/test_delayed_processing.py +++ b/tests/sentry/rules/processing/test_delayed_processing.py @@ -34,6 +34,7 @@ get_slow_conditions, parse_rulegroup_to_event_data, process_delayed_alert_conditions, + process_rulegroups_in_batches, ) from sentry.rules.processing.processor import PROJECT_ID_BUFFER_LIST_KEY from sentry.testutils.cases import PerformanceIssueTestCase, RuleTestCase, TestCase @@ -83,6 +84,15 @@ def _callthrough_with_order(*args, **kwargs): @freeze_time(FROZEN_TIME) class CreateEventTestCase(TestCase, BaseEventFrequencyPercentTest): + def push_to_hash(self, project_id, rule_id, group_id, event_id=None, occurrence_id=None): + value = json.dumps({"event_id": event_id, "occurrence_id": occurrence_id}) + buffer.backend.push_to_hash( + model=Project, + filters={"project_id": project_id}, + field=f"{rule_id}:{group_id}", + value=value, + ) + def create_event( self, project_id: int, @@ -643,15 +653,6 @@ def test_parse_rulegroup_invalid_json(self): class ProcessDelayedAlertConditionsTest(CreateEventTestCase, PerformanceIssueTestCase): buffer_timestamp = (FROZEN_TIME + timedelta(seconds=1)).timestamp() - def push_to_hash(self, project_id, rule_id, group_id, event_id=None, occurrence_id=None): - value = json.dumps({"event_id": event_id, "occurrence_id": occurrence_id}) - buffer.backend.push_to_hash( - model=Project, - filters={"project_id": project_id}, - field=f"{rule_id}:{group_id}", - value=value, - ) - def assert_buffer_cleared(self, project_id): rule_group_data = buffer.backend.get_hash(Project, {"project_id": project_id}) assert rule_group_data == {} @@ -1325,17 +1326,27 @@ def test_apply_delayed_process_count_then_percent(self, safe_execute_callthrough apply_delayed(project_id) self._assert_count_percent_results(safe_execute_callthrough) - def test_process_in_batches(self): - # TODO fill this in - pass +class ProcessRuleGroupsInBatchesTest(CreateEventTestCase): + @patch("sentry.rules.processing.delayed_processing.apply_delayed") + def test_no_redis_data(self, mock_apply_delayed): + mock_delayed = Mock() + mock_apply_delayed.delayed = mock_delayed + process_rulegroups_in_batches(self.project.id) -class ProcessRuleGroupsInBatchesTest(TestCase): - def 
test_basic(self): - # TODO write tests for processin in batches - pass + mock_delayed.assert_called_once_with(self.project.id) + + @patch("sentry.rules.processing.delayed_processing.apply_delayed") + def test_basic(self, mock_apply_delayed): + self.rule = self.create_alert_rule() + + mock_delayed = Mock() + mock_apply_delayed.delayed = mock_delayed + self.push_to_hash(self.project.id, self.rule.id, self.group.id) + + process_rulegroups_in_batches(self.project.id) + mock_delayed.assert_called_once_with(self.project.id) - # todo test for no batches # todo test for multiple batches # verify state of redis buffers From 5225441091c2883e31de798bac6068ccb4bbe79b Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 11:45:43 -0700 Subject: [PATCH 04/20] move mock redis buffer into the create event test case, and provide push_to_hash there --- src/sentry/rules/processing/delayed_processing.py | 2 ++ .../rules/processing/test_delayed_processing.py | 13 ++++++++----- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index 7a7c22d3b22b36..e63ebbda8d730b 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -466,7 +466,9 @@ def bucket_num_groups(num_groups: int) -> str: def process_rulegroups_in_batches(project_id: int, batch_size=CHUNK_BATCH_SIZE): + # TODO REMOVE THIS rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id) + event_count = buffer.backend.get_hash_length(Project, {"project_id": project_id}) print("REHYDRAAATTE", batch_size, event_count, project_id, rulegroup_to_event_data) diff --git a/tests/sentry/rules/processing/test_delayed_processing.py b/tests/sentry/rules/processing/test_delayed_processing.py index 6cb3146a36c95a..37a4cb7ae7f45e 100644 --- a/tests/sentry/rules/processing/test_delayed_processing.py +++ b/tests/sentry/rules/processing/test_delayed_processing.py @@ -84,6 +84,14 @@ def _callthrough_with_order(*args, **kwargs): @freeze_time(FROZEN_TIME) class CreateEventTestCase(TestCase, BaseEventFrequencyPercentTest): + def setUp(self): + super().setUp() + self.mock_redis_buffer = mock_redis_buffer() + self.mock_redis_buffer.__enter__() + + def tearDown(self): + self.mock_redis_buffer.__exit__(None, None, None) + def push_to_hash(self, project_id, rule_id, group_id, event_id=None, occurrence_id=None): value = json.dumps({"event_id": event_id, "occurrence_id": occurrence_id}) buffer.backend.push_to_hash( @@ -659,8 +667,6 @@ def assert_buffer_cleared(self, project_id): def setUp(self): super().setUp() - self.mock_redis_buffer = mock_redis_buffer() - self.mock_redis_buffer.__enter__() self.tag_filter = { "id": "sentry.rules.filters.tagged_event.TaggedEventFilter", @@ -748,9 +754,6 @@ def _push_base_events(self) -> None: self.push_to_hash(self.project_two.id, self.rule3.id, self.group3.id, self.event3.event_id) self.push_to_hash(self.project_two.id, self.rule4.id, self.group4.id, self.event4.event_id) - def tearDown(self): - self.mock_redis_buffer.__exit__(None, None, None) - @patch("sentry.rules.processing.delayed_processing.process_rulegroups_in_batches") def test_fetches_from_buffer_and_executes(self, mock_process_in_batches): self._push_base_events() From 8b20402d902fb452d9389bda35930a42fe5e4b27 Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 12:15:22 -0700 
Subject: [PATCH 05/20] Cleanup the code, add some test cases around single / batch execution --- .../rules/processing/delayed_processing.py | 19 ++++---- .../processing/test_delayed_processing.py | 43 +++++++++++++++++-- 2 files changed, 47 insertions(+), 15 deletions(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index e63ebbda8d730b..ad3ee7be90006a 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -466,41 +466,38 @@ def bucket_num_groups(num_groups: int) -> str: def process_rulegroups_in_batches(project_id: int, batch_size=CHUNK_BATCH_SIZE): - # TODO REMOVE THIS - rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id) - event_count = buffer.backend.get_hash_length(Project, {"project_id": project_id}) - print("REHYDRAAATTE", batch_size, event_count, project_id, rulegroup_to_event_data) if event_count < batch_size: - print("SINGLE EXECUTE!") return apply_delayed.delayed(project_id) - print("BATCH EXECUTE!") - # if the dictionary is large, get the items and chunk them. rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id) with metrics.timer("delayed_processing.process_batch.duration"): items = iter(rulegroup_to_event_data.items()) + while True: batch = dict(islice(items, batch_size)) if not batch: break - batch_key = f"{project_id}:{uuid.uuid4()}" + batch_key = str(uuid.uuid4()) for field, value in batch.items(): buffer.backend.push_to_hash( model=Project, - filters={"project_id": batch_key}, + filters={"project_id": project_id, "batch_key": batch_key}, field=field, value=value, ) - apply_delayed.delayed(project_id, batch_key) + # remove the batched items from the project rulegroup_to_event_data + buffer.backend.delete_hash( + model=Project, filters={"project_id": project_id}, fields=list(batch.keys()) + ) - # TODO destroy processed redis data + apply_delayed.delayed(project_id, batch_key) def process_delayed_alert_conditions() -> None: diff --git a/tests/sentry/rules/processing/test_delayed_processing.py b/tests/sentry/rules/processing/test_delayed_processing.py index 37a4cb7ae7f45e..a01dd1f8f2a878 100644 --- a/tests/sentry/rules/processing/test_delayed_processing.py +++ b/tests/sentry/rules/processing/test_delayed_processing.py @@ -1331,6 +1331,15 @@ def test_apply_delayed_process_count_then_percent(self, safe_execute_callthrough class ProcessRuleGroupsInBatchesTest(CreateEventTestCase): + def setUp(self): + super().setUp() + + self.project = self.create_project() + self.group = self.create_group(self.project) + self.group_two = self.create_group(self.project) + self.group_three = self.create_group(self.project) + self.rule = self.create_alert_rule() + @patch("sentry.rules.processing.delayed_processing.apply_delayed") def test_no_redis_data(self, mock_apply_delayed): mock_delayed = Mock() @@ -1341,17 +1350,43 @@ def test_no_redis_data(self, mock_apply_delayed): @patch("sentry.rules.processing.delayed_processing.apply_delayed") def test_basic(self, mock_apply_delayed): - self.rule = self.create_alert_rule() - mock_delayed = Mock() mock_apply_delayed.delayed = mock_delayed + self.push_to_hash(self.project.id, self.rule.id, self.group.id) + self.push_to_hash(self.project.id, self.rule.id, self.group_two.id) + self.push_to_hash(self.project.id, self.rule.id, self.group_three.id) process_rulegroups_in_batches(self.project.id) mock_delayed.assert_called_once_with(self.project.id) - # todo test for multiple batches - # verify 
state of redis buffers + @patch("sentry.rules.processing.delayed_processing.apply_delayed") + def test_batch(self, mock_apply_delayed): + mock_delayed = Mock() + mock_apply_delayed.delayed = mock_delayed + + self.push_to_hash(self.project.id, self.rule.id, self.group.id) + self.push_to_hash(self.project.id, self.rule.id, self.group_two.id) + self.push_to_hash(self.project.id, self.rule.id, self.group_three.id) + + process_rulegroups_in_batches(self.project.id, batch_size=2) + assert mock_delayed.call_count == 2 + + # Validate the batches are created correctly + batch_one_key = mock_delayed.call_args_list[0][0][1] + batch_one = buffer.backend.get_hash( + model=Project, field={"project_id": self.project.id, "batch_key": batch_one_key} + ) + batch_two_key = mock_delayed.call_args_list[1][0][1] + batch_two = buffer.backend.get_hash( + model=Project, field={"project_id": self.project.id, "batch_key": batch_two_key} + ) + + assert len(batch_one) == 2 + assert len(batch_two) == 1 + + # Validate that we've cleared the original data to reduce storage usage + assert not buffer.backend.get_hash(model=Project, field={"project_id": self.project.id}) class UniqueConditionQueryTest(TestCase): From 7aafed8cc14365276e47e2ea606adf5c052e9ea9 Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 12:18:10 -0700 Subject: [PATCH 06/20] raise a not implemented error --- src/sentry/buffer/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/buffer/base.py b/src/sentry/buffer/base.py index b9e6320ef6c5e6..d31c645c932ed6 100644 --- a/src/sentry/buffer/base.py +++ b/src/sentry/buffer/base.py @@ -58,7 +58,7 @@ def get_hash( def get_hash_length( self, model: type[models.Model], field: dict[str, models.Model | str | int] ) -> int: - return 0 + raise NotImplementedError def get_sorted_set(self, key: str, min: float, max: float) -> list[tuple[int, datetime]]: return [] From cdf288f8d626ac241fa50cff083478b01175936c Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 12:35:15 -0700 Subject: [PATCH 07/20] add a docstring tot he process grops, move logic for fetching teh rulegroup data to that function --- .../rules/processing/delayed_processing.py | 33 ++++++++++++++----- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index ad3ee7be90006a..e38d955a553d27 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -12,6 +12,7 @@ from sentry import buffer, nodestore from sentry.buffer.redis import BufferHookEvent, redis_buffer_registry +from sentry.db import models from sentry.eventstore.models import Event, GroupEvent from sentry.issues.issue_occurrence import IssueOccurrence from sentry.models.group import Group @@ -87,8 +88,15 @@ def fetch_project(project_id: int) -> Project | None: return None -def fetch_rulegroup_to_event_data(project_id: int) -> dict[str, str]: - return buffer.backend.get_hash(model=Project, field={"project_id": project_id}) +def fetch_rulegroup_to_event_data(project_id: int, batch_key: str | None = None) -> dict[str, str]: + field: dict[str, models.Model | int | str] = { + "project_id": project_id, + } + + if batch_key: + field["batch_key"] = batch_key + + return buffer.backend.get_hash(model=Project, field=field) def get_rules_to_groups(rulegroup_to_event_data: dict[str, 
str]) -> DefaultDict[int, set[int]]: @@ -466,6 +474,19 @@ def bucket_num_groups(num_groups: int) -> str: def process_rulegroups_in_batches(project_id: int, batch_size=CHUNK_BATCH_SIZE): + """ + This will check the number of rulegroup_to_event_data items in the Redis buffer for a project. + + If the number is larger than the batch size, it will chunk the items and process them in batches. + + The batches are replicated into a new redis hash with a unique filter (a uuid) to identify the batch. + We need to use a UUID because these batches can be created in multiple processes and we need to ensure + uniqueness across all of them for the centralized redis buffer. The batches are stored in redis because + we shouldn't pass complex objects in the celery task arguments, and we can't send a page of data in the + batch becaues redis may not maintain the sort of the hash response. + + In `apply_delayed` will will fetch the batch from redis and process the rules. + """ event_count = buffer.backend.get_hash_length(Project, {"project_id": project_id}) if event_count < batch_size: @@ -533,13 +554,7 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k if project is None: return - if batch_key is None: - rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id) - else: - rulegroup_to_event_data = buffer.backend.get_hash( - model=Project, field={"project_id": project_id, "batch_key": batch_key} - ) - + rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id, batch_key) rules_to_groups = get_rules_to_groups(rulegroup_to_event_data) alert_rules = fetch_alert_rules(list(rules_to_groups.keys())) condition_groups = get_condition_query_groups(alert_rules, rules_to_groups) From d39202ead2c966bd64ed7e81be9e8c53529b5f54 Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 12:37:59 -0700 Subject: [PATCH 08/20] i am the walrus --- src/sentry/rules/processing/delayed_processing.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index e38d955a553d27..c6a0d7384fd1ca 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -498,11 +498,7 @@ def process_rulegroups_in_batches(project_id: int, batch_size=CHUNK_BATCH_SIZE): with metrics.timer("delayed_processing.process_batch.duration"): items = iter(rulegroup_to_event_data.items()) - while True: - batch = dict(islice(items, batch_size)) - if not batch: - break - + while batch := dict(islice(items, batch_size)): batch_key = str(uuid.uuid4()) for field, value in batch.items(): From 9af164b824bf9359be639e57d48a3b77ae38b981 Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 12:40:26 -0700 Subject: [PATCH 09/20] fix comment --- src/sentry/rules/processing/delayed_processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index c6a0d7384fd1ca..3104994d357c6e 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -485,7 +485,7 @@ def process_rulegroups_in_batches(project_id: int, batch_size=CHUNK_BATCH_SIZE): we shouldn't pass complex objects in the celery task arguments, and we can't send a page of data in the batch becaues redis 
may not maintain the sort of the hash response. - In `apply_delayed` will will fetch the batch from redis and process the rules. + `apply_delayed` will fetch the batch from redis and process the rules. """ event_count = buffer.backend.get_hash_length(Project, {"project_id": project_id}) From c94c18ea594065bb79e486ad07910270f065ec79 Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 12:50:51 -0700 Subject: [PATCH 10/20] -1 line changed --- src/sentry/rules/processing/delayed_processing.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index 3104994d357c6e..8c7a4b42af222b 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -492,6 +492,11 @@ def process_rulegroups_in_batches(project_id: int, batch_size=CHUNK_BATCH_SIZE): if event_count < batch_size: return apply_delayed.delayed(project_id) + logger.info( + "delayed_processing.process_large_batch", + extra={"project_id": project_id, "count": event_count}, + ) + # if the dictionary is large, get the items and chunk them. rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id) @@ -547,7 +552,7 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k """ project = fetch_project(project_id) - if project is None: + if not project: return rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id, batch_key) From 41fbcb7b35b60f4a657bfbcafd824119eff97a2b Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 13:09:00 -0700 Subject: [PATCH 11/20] add hmset and use bulk updating to redis --- src/sentry/buffer/base.py | 8 ++++++++ src/sentry/buffer/redis.py | 10 ++++++++++ src/sentry/rules/processing/delayed_processing.py | 12 +++++------- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/sentry/buffer/base.py b/src/sentry/buffer/base.py index d31c645c932ed6..f2ef35ad7440e5 100644 --- a/src/sentry/buffer/base.py +++ b/src/sentry/buffer/base.py @@ -75,6 +75,14 @@ def push_to_hash( ) -> None: return None + def push_to_hash_bulk( + self, + model: type[models.Model], + filters: dict[str, models.Model | str | int], + data: dict[str, str], + ) -> None: + raise NotImplementedError + def delete_hash( self, model: type[models.Model], diff --git a/src/sentry/buffer/redis.py b/src/sentry/buffer/redis.py index 739e0944ecace9..486a4864a97c7d 100644 --- a/src/sentry/buffer/redis.py +++ b/src/sentry/buffer/redis.py @@ -86,6 +86,7 @@ class RedisOperation(Enum): SORTED_SET_GET_RANGE = "zrangebyscore" SORTED_SET_DELETE_RANGE = "zremrangebyscore" HASH_ADD = "hset" + HASH_ADD_BULK = "hmset" HASH_GET_ALL = "hgetall" HASH_DELETE = "hdel" HASH_LENGTH = "hlen" @@ -297,6 +298,15 @@ def push_to_hash( key = self._make_key(model, filters) self._execute_redis_operation(key, RedisOperation.HASH_ADD, field, value) + def push_to_hash_bulk( + self, + model: type[models.Model], + filters: dict[str, models.Model | str | int], + data: dict[str, str], + ) -> None: + key = self._make_key(model, filters) + self._execute_redis_operation(key, RedisOperation.HASH_ADD_BULK, data) + def get_hash( self, model: type[models.Model], field: dict[str, models.Model | str | int] ) -> dict[str, str]: diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index 
8c7a4b42af222b..199990d69c1591 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -506,13 +506,11 @@ def process_rulegroups_in_batches(project_id: int, batch_size=CHUNK_BATCH_SIZE): while batch := dict(islice(items, batch_size)): batch_key = str(uuid.uuid4()) - for field, value in batch.items(): - buffer.backend.push_to_hash( - model=Project, - filters={"project_id": project_id, "batch_key": batch_key}, - field=field, - value=value, - ) + buffer.backend.push_to_hash_bulk( + model=Project, + filters={"project_id": project_id, "batch_key": batch_key}, + data=batch, + ) # remove the batched items from the project rulegroup_to_event_data buffer.backend.delete_hash( From c1a7982cb03fa151b07219053c826cc9fc0c0f61 Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 13:25:08 -0700 Subject: [PATCH 12/20] add tests for the all the new buffer code --- tests/sentry/buffer/test_base.py | 7 +++++++ tests/sentry/buffer/test_redis.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/tests/sentry/buffer/test_base.py b/tests/sentry/buffer/test_base.py index 5f216d8bc2a5a7..c0cd0d241ab9b0 100644 --- a/tests/sentry/buffer/test_base.py +++ b/tests/sentry/buffer/test_base.py @@ -2,6 +2,7 @@ from unittest import mock from django.utils import timezone +from pytest import raises from sentry.buffer.base import Buffer from sentry.db import models @@ -77,3 +78,9 @@ def test_signal_only(self, create_or_update): self.buf.process(Group, columns, filters, {"last_seen": the_date}, signal_only=True) group.refresh_from_db() assert group.times_seen == prev_times_seen + + def test_push_to_hash_bulk(self): + raises(NotImplementedError, self.buf.push_to_hash_bulk, Group, {"id": 1}, {"foo": "bar"}) + + def test_get_hash_length(self): + raises(NotImplementedError, self.buf.get_hash_length, Group, {"id": 1}) diff --git a/tests/sentry/buffer/test_redis.py b/tests/sentry/buffer/test_redis.py index d2da538de87e94..c725ec4e78e16e 100644 --- a/tests/sentry/buffer/test_redis.py +++ b/tests/sentry/buffer/test_redis.py @@ -1,6 +1,7 @@ import datetime import pickle from collections import defaultdict +from collections.abc import Mapping from unittest import mock from unittest.mock import Mock @@ -367,6 +368,36 @@ def test_process_uses_signal_only(self, process): self.buf.process("foo") process.assert_called_once_with(mock.Mock, {"times_seen": 1}, {"pk": 1}, {}, True) + @mock.patch("sentry.buffer.redis.RedisBuffer._make_key", mock.Mock(return_value="foo")) + def test_get_hash_length(self): + client = get_cluster_routing_client(self.buf.cluster, self.buf.is_redis_cluster) + data: Mapping[str | bytes, bytes | float | int | str] = { + "f": '{"pk": ["i","1"]}', + "i+times_seen": "1", + "m": "unittest.mock.Mock", + "s": "1", + } + + client.hmset("foo", data) + buffer_length = self.buf.get_hash_length("foo", field={"bar": 1}) + assert buffer_length == len(data) + + @mock.patch("sentry.buffer.redis.RedisBuffer._make_key", mock.Mock(return_value="foo")) + def test_push_to_hash_bulk(self): + def decode_dict(d): + return {k: v.decode("utf-8") if isinstance(v, bytes) else v for k, v in d.items()} + + client = get_cluster_routing_client(self.buf.cluster, self.buf.is_redis_cluster) + data = { + "f": '{"pk": ["i","1"]}', + "i+times_seen": "1", + "m": "unittest.mock.Mock", + "s": "1", + } + self.buf.push_to_hash_bulk(model=Project, filters={"project_id": 1}, data=data) + result = 
_hgetall_decode_keys(client, "foo", self.buf.is_redis_cluster) + assert decode_dict(result) == data + # @mock.patch("sentry.buffer.redis.RedisBuffer._make_key", mock.Mock(return_value="foo")) # def test_incr_uses_signal_only(self): From ba6b28d5cd04340f963f87176c72881a7293e310 Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 13:26:28 -0700 Subject: [PATCH 13/20] reduce diff --- src/sentry/rules/processing/delayed_processing.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index 199990d69c1591..48b4b6cccd4628 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -549,7 +549,6 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k Grab rules, groups, and events from the Redis buffer, evaluate the "slow" conditions in a bulk snuba query, and fire them if they pass """ project = fetch_project(project_id) - if not project: return From b767878cf5ca0c5b004f7897728eeea18379e3e5 Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 14:44:29 -0700 Subject: [PATCH 14/20] use options automator to store the batch size --- src/sentry/options/defaults.py | 4 ++++ src/sentry/rules/processing/delayed_processing.py | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index cb92d5b4281b6b..10fea5fcac3f66 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -2636,3 +2636,7 @@ default=1, flags=FLAG_AUTOMATOR_MODIFIABLE, ) +register( + "delayed_processing.batch_size", + default=10000, +) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index 48b4b6cccd4628..bb4a1fbb66a6cc 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -10,7 +10,7 @@ import sentry_sdk from django.db.models import OuterRef, Subquery -from sentry import buffer, nodestore +from sentry import buffer, nodestore, options from sentry.buffer.redis import BufferHookEvent, redis_buffer_registry from sentry.db import models from sentry.eventstore.models import Event, GroupEvent @@ -47,7 +47,7 @@ logger = logging.getLogger("sentry.rules.delayed_processing") EVENT_LIMIT = 100 COMPARISON_INTERVALS_VALUES = {k: v[1] for k, v in COMPARISON_INTERVALS.items()} -CHUNK_BATCH_SIZE = 10000 +CHUNK_BATCH_SIZE = options.get("delayed_processing.batch_size") class UniqueConditionQuery(NamedTuple): From 267d71d3ed962078e75676ab53a14844d6e0e9fe Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 14:50:41 -0700 Subject: [PATCH 15/20] move from a module vairable to getting it from the options automator. 
using the test helper instead --- src/sentry/rules/processing/delayed_processing.py | 4 ++-- tests/sentry/rules/processing/test_delayed_processing.py | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index bb4a1fbb66a6cc..52b85e24298ad1 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -47,7 +47,6 @@ logger = logging.getLogger("sentry.rules.delayed_processing") EVENT_LIMIT = 100 COMPARISON_INTERVALS_VALUES = {k: v[1] for k, v in COMPARISON_INTERVALS.items()} -CHUNK_BATCH_SIZE = options.get("delayed_processing.batch_size") class UniqueConditionQuery(NamedTuple): @@ -473,7 +472,7 @@ def bucket_num_groups(num_groups: int) -> str: return "1" -def process_rulegroups_in_batches(project_id: int, batch_size=CHUNK_BATCH_SIZE): +def process_rulegroups_in_batches(project_id: int): """ This will check the number of rulegroup_to_event_data items in the Redis buffer for a project. @@ -487,6 +486,7 @@ def process_rulegroups_in_batches(project_id: int, batch_size=CHUNK_BATCH_SIZE): `apply_delayed` will fetch the batch from redis and process the rules. """ + batch_size = options.get("delayed_processing.batch_size") event_count = buffer.backend.get_hash_length(Project, {"project_id": project_id}) if event_count < batch_size: diff --git a/tests/sentry/rules/processing/test_delayed_processing.py b/tests/sentry/rules/processing/test_delayed_processing.py index a01dd1f8f2a878..36c743ebca7626 100644 --- a/tests/sentry/rules/processing/test_delayed_processing.py +++ b/tests/sentry/rules/processing/test_delayed_processing.py @@ -40,6 +40,7 @@ from sentry.testutils.cases import PerformanceIssueTestCase, RuleTestCase, TestCase from sentry.testutils.factories import EventType from sentry.testutils.helpers.datetime import before_now, freeze_time, iso_format +from sentry.testutils.helpers.options import override_options from sentry.testutils.helpers.redis import mock_redis_buffer from sentry.utils import json from sentry.utils.safe import safe_execute @@ -1360,6 +1361,7 @@ def test_basic(self, mock_apply_delayed): process_rulegroups_in_batches(self.project.id) mock_delayed.assert_called_once_with(self.project.id) + @override_options({"delayed_processing.batch_size": 2}) @patch("sentry.rules.processing.delayed_processing.apply_delayed") def test_batch(self, mock_apply_delayed): mock_delayed = Mock() @@ -1369,7 +1371,7 @@ def test_batch(self, mock_apply_delayed): self.push_to_hash(self.project.id, self.rule.id, self.group_two.id) self.push_to_hash(self.project.id, self.rule.id, self.group_three.id) - process_rulegroups_in_batches(self.project.id, batch_size=2) + process_rulegroups_in_batches(self.project.id) assert mock_delayed.call_count == 2 # Validate the batches are created correctly From c29c5e30d544a910d6e50b0146ba3cb8517a7273 Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 14:53:44 -0700 Subject: [PATCH 16/20] make the comment _better_ --- src/sentry/rules/processing/delayed_processing.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index 52b85e24298ad1..9849b12f09efb5 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -481,8 +481,9 @@ def 
process_rulegroups_in_batches(project_id: int): The batches are replicated into a new redis hash with a unique filter (a uuid) to identify the batch. We need to use a UUID because these batches can be created in multiple processes and we need to ensure uniqueness across all of them for the centralized redis buffer. The batches are stored in redis because - we shouldn't pass complex objects in the celery task arguments, and we can't send a page of data in the - batch becaues redis may not maintain the sort of the hash response. + we shouldn't pass objects that need to be pickled and 10k items could be problematic in the celery tasks + as arguments could be problematic. Finally, we can't use a pagination system on the data because + redis doesn't maintain the sort order of the hash keys. `apply_delayed` will fetch the batch from redis and process the rules. """ From babca173303fc3746e48818cd7607a6ece24f15d Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 14:59:07 -0700 Subject: [PATCH 17/20] make it modifiable --- src/sentry/options/defaults.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index 10fea5fcac3f66..d0a5b5a4af8748 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -2639,4 +2639,5 @@ register( "delayed_processing.batch_size", default=10000, + flags=FLAG_AUTOMATOR_MODIFIABLE, ) From 1cd92fed88b48925176cb6abd847e0f686096e6a Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 15:39:31 -0700 Subject: [PATCH 18/20] update cleanup_redis_buffers to remove batched keys as well --- .../rules/processing/delayed_processing.py | 14 +++-- .../processing/test_delayed_processing.py | 60 +++++++++++++++++++ 2 files changed, 69 insertions(+), 5 deletions(-) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index 9849b12f09efb5..41f69e75572144 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -456,13 +456,17 @@ def fire_rules( safe_execute(callback, groupevent, futures) -def cleanup_redis_buffer(project_id: int, rules_to_groups: DefaultDict[int, set[int]]) -> None: +def cleanup_redis_buffer( + project_id: int, rules_to_groups: DefaultDict[int, set[int]], batch_key: str | None +) -> None: hashes_to_delete = [ f"{rule}:{group}" for rule, groups in rules_to_groups.items() for group in groups ] - buffer.backend.delete_hash( - model=Project, filters={"project_id": project_id}, fields=hashes_to_delete - ) + filters: dict[str, models.Model | str | int] = {"project_id": project_id} + if batch_key: + filters["batch_key"] = batch_key + + buffer.backend.delete_hash(model=Project, filters=filters, fields=hashes_to_delete) def bucket_num_groups(num_groups: int) -> str: @@ -583,7 +587,7 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k with metrics.timer("delayed_processing.fire_rules.duration"): fire_rules(rules_to_fire, parsed_rulegroup_to_event_data, alert_rules, project) - cleanup_redis_buffer(project_id, rules_to_groups) + cleanup_redis_buffer(project_id, rules_to_groups, batch_key) if not redis_buffer_registry.has(BufferHookEvent.FLUSH): diff --git a/tests/sentry/rules/processing/test_delayed_processing.py b/tests/sentry/rules/processing/test_delayed_processing.py index 36c743ebca7626..3a72ec8b5fea74 100644 --- 
a/tests/sentry/rules/processing/test_delayed_processing.py +++ b/tests/sentry/rules/processing/test_delayed_processing.py @@ -25,6 +25,7 @@ apply_delayed, bucket_num_groups, bulk_fetch_events, + cleanup_redis_buffer, generate_unique_queries, get_condition_group_results, get_condition_query_groups, @@ -1417,3 +1418,62 @@ def test_repr(self): repr(condition) == "" ) + + +class CleanupRedisBufferTest(CreateEventTestCase): + def setUp(self): + super().setUp() + + self.project = self.create_project() + self.group = self.create_group(self.project) + self.rule = self.create_alert_rule() + + def test_cleanup_redis(self): + self.push_to_hash(self.project.id, self.rule.id, self.group.id) + rules_to_groups: defaultdict[int, set[int]] = defaultdict(set) + rules_to_groups[self.rule.id].add(self.group.id) + + cleanup_redis_buffer(self.project.id, rules_to_groups, None) + rule_group_data = buffer.backend.get_hash(Project, {"project_id": self.project.id}) + assert rule_group_data == {} + + @override_options({"delayed_processing.batch_size": 2}) + @patch("sentry.rules.processing.delayed_processing.apply_delayed") + def test_batched_cleanup(self, mock_apply_delayed): + mock_delayed = Mock() + mock_apply_delayed.delayed = mock_delayed + group_two = self.create_group(self.project) + group_three = self.create_group(self.project) + + self.push_to_hash(self.project.id, self.rule.id, self.group.id) + self.push_to_hash(self.project.id, self.rule.id, group_two.id) + self.push_to_hash(self.project.id, self.rule.id, group_three.id) + + rules_to_groups: defaultdict[int, set[int]] = defaultdict(set) + rules_to_groups[self.rule.id].add(self.group.id) + rules_to_groups[self.rule.id].add(group_two.id) + rules_to_groups[self.rule.id].add(group_three.id) + + process_rulegroups_in_batches(self.project.id) + batch_one_key = mock_delayed.call_args_list[0][0][1] + batch_two_key = mock_delayed.call_args_list[1][0][1] + + # Verify process_rulegroups_in_batches removed the data from the buffer + rule_group_data = buffer.backend.get_hash(Project, {"project_id": self.project.id}) + assert rule_group_data == {} + + cleanup_redis_buffer(self.project.id, rules_to_groups, batch_one_key) + + # Verify the batch we "executed" is removed + rule_group_data = buffer.backend.get_hash( + Project, {"project_id": self.project.id, "batch_key": batch_one_key} + ) + assert rule_group_data == {} + + # Verify the batch we didn't execute is still in redis + rule_group_data = buffer.backend.get_hash( + Project, {"project_id": self.project.id, "batch_key": batch_two_key} + ) + assert rule_group_data == { + f"{self.rule.id}:{group_three.id}": '{"event_id":null,"occurrence_id":null}', + } From 834445dcbd0eedb18c769afb64bf4eec7181d8c3 Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 15:50:40 -0700 Subject: [PATCH 19/20] add a note about hmset v hset --- src/sentry/buffer/redis.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sentry/buffer/redis.py b/src/sentry/buffer/redis.py index 486a4864a97c7d..bce5263f42789d 100644 --- a/src/sentry/buffer/redis.py +++ b/src/sentry/buffer/redis.py @@ -81,6 +81,7 @@ def callback(self, buffer_hook_event: BufferHookEvent) -> bool: redis_buffer_registry = BufferHookRegistry() +# Note HMSET is not supported after redis 4.0.0, after updating we can use HSET directly. 
class RedisOperation(Enum): SORTED_SET_ADD = "zadd" SORTED_SET_GET_RANGE = "zrangebyscore" From 4bac96988fe5ea60c24f74269cc9c047ff2ccaa3 Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 31 Jul 2024 16:24:30 -0700 Subject: [PATCH 20/20] remove all these extra mocks --- .../processing/test_delayed_processing.py | 20 +++++-------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/tests/sentry/rules/processing/test_delayed_processing.py b/tests/sentry/rules/processing/test_delayed_processing.py index 3a72ec8b5fea74..c049ecf6040e5b 100644 --- a/tests/sentry/rules/processing/test_delayed_processing.py +++ b/tests/sentry/rules/processing/test_delayed_processing.py @@ -1344,30 +1344,22 @@ def setUp(self): @patch("sentry.rules.processing.delayed_processing.apply_delayed") def test_no_redis_data(self, mock_apply_delayed): - mock_delayed = Mock() - mock_apply_delayed.delayed = mock_delayed process_rulegroups_in_batches(self.project.id) - - mock_delayed.assert_called_once_with(self.project.id) + mock_apply_delayed.delayed.assert_called_once_with(self.project.id) @patch("sentry.rules.processing.delayed_processing.apply_delayed") def test_basic(self, mock_apply_delayed): - mock_delayed = Mock() - mock_apply_delayed.delayed = mock_delayed - self.push_to_hash(self.project.id, self.rule.id, self.group.id) self.push_to_hash(self.project.id, self.rule.id, self.group_two.id) self.push_to_hash(self.project.id, self.rule.id, self.group_three.id) process_rulegroups_in_batches(self.project.id) - mock_delayed.assert_called_once_with(self.project.id) + mock_apply_delayed.delayed.assert_called_once_with(self.project.id) @override_options({"delayed_processing.batch_size": 2}) @patch("sentry.rules.processing.delayed_processing.apply_delayed") def test_batch(self, mock_apply_delayed): - mock_delayed = Mock() - mock_apply_delayed.delayed = mock_delayed - + mock_delayed = mock_apply_delayed.delayed self.push_to_hash(self.project.id, self.rule.id, self.group.id) self.push_to_hash(self.project.id, self.rule.id, self.group_two.id) self.push_to_hash(self.project.id, self.rule.id, self.group_three.id) @@ -1440,8 +1432,6 @@ def test_cleanup_redis(self): @override_options({"delayed_processing.batch_size": 2}) @patch("sentry.rules.processing.delayed_processing.apply_delayed") def test_batched_cleanup(self, mock_apply_delayed): - mock_delayed = Mock() - mock_apply_delayed.delayed = mock_delayed group_two = self.create_group(self.project) group_three = self.create_group(self.project) @@ -1455,8 +1445,8 @@ def test_batched_cleanup(self, mock_apply_delayed): rules_to_groups[self.rule.id].add(group_three.id) process_rulegroups_in_batches(self.project.id) - batch_one_key = mock_delayed.call_args_list[0][0][1] - batch_two_key = mock_delayed.call_args_list[1][0][1] + batch_one_key = mock_apply_delayed.delayed.call_args_list[0][0][1] + batch_two_key = mock_apply_delayed.delayed.call_args_list[1][0][1] # Verify process_rulegroups_in_batches removed the data from the buffer rule_group_data = buffer.backend.get_hash(Project, {"project_id": self.project.id})
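
For reference, the sketch below condenses the batching flow that the patch series above converges on into a standalone, runnable form: check the hash length against the delayed_processing.batch_size option (default 10000), and when it is exceeded, copy the rule:group fields into UUID-keyed batch hashes via the bulk write (push_to_hash_bulk / HMSET in the diffs), delete the copied fields from the project hash, and schedule one apply_delayed task per batch_key, which later cleans up its own batch hash. The structure comes from the diffs above; the InMemoryBuffer class, the schedule callback, and the key naming are stand-ins invented for this sketch so it runs on its own — they are not Sentry or redis-py APIs.

import uuid
from itertools import islice

BATCH_SIZE = 3  # stands in for options.get("delayed_processing.batch_size")


class InMemoryBuffer:
    """Dict-backed stand-in for buffer.backend (hlen / hgetall / hmset / hdel)."""

    def __init__(self) -> None:
        self.hashes: dict[str, dict[str, str]] = {}

    def get_hash_length(self, key: str) -> int:
        return len(self.hashes.get(key, {}))

    def get_hash(self, key: str) -> dict[str, str]:
        return dict(self.hashes.get(key, {}))

    def push_to_hash_bulk(self, key: str, data: dict[str, str]) -> None:
        self.hashes.setdefault(key, {}).update(data)

    def delete_hash(self, key: str, fields: list[str]) -> None:
        for field in fields:
            self.hashes.get(key, {}).pop(field, None)


def process_rulegroups_in_batches(buf: InMemoryBuffer, project_id: int, schedule) -> None:
    project_key = f"project:{project_id}"

    # Small hash: hand the whole project to a single task, no batch_key.
    if buf.get_hash_length(project_key) < BATCH_SIZE:
        schedule(project_id, None)
        return

    # Large hash: chunk the items, copy each chunk into its own UUID-keyed
    # batch hash, drop the copied fields from the original hash, and schedule
    # one task per batch_key (apply_delayed.delayed(project_id, batch_key)).
    items = iter(buf.get_hash(project_key).items())
    while batch := dict(islice(items, BATCH_SIZE)):
        batch_key = str(uuid.uuid4())  # unique across processes
        buf.push_to_hash_bulk(f"project:{project_id}:batch:{batch_key}", batch)
        buf.delete_hash(project_key, list(batch.keys()))
        schedule(project_id, batch_key)


if __name__ == "__main__":
    buf = InMemoryBuffer()
    buf.push_to_hash_bulk("project:1", {f"10:{n}": "{}" for n in range(7)})
    process_rulegroups_in_batches(
        buf, 1, lambda pid, key: print("apply_delayed", pid, key)
    )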