
Commit

WIP
saponifi3d committed Jul 31, 2024
1 parent c62baee commit b98edae
Showing 2 changed files with 45 additions and 32 deletions.
34 changes: 19 additions & 15 deletions src/sentry/rules/processing/delayed_processing.py
@@ -46,6 +46,7 @@
logger = logging.getLogger("sentry.rules.delayed_processing")
EVENT_LIMIT = 100
COMPARISON_INTERVALS_VALUES = {k: v[1] for k, v in COMPARISON_INTERVALS.items()}
+CHUNK_BATCH_SIZE = 10000


class UniqueConditionQuery(NamedTuple):
@@ -464,14 +465,16 @@ def bucket_num_groups(num_groups: int) -> str:
return "1"


-CHUNK_PAGE_SIZE = 10000


-def process_rulegroups_in_batches(project_id: int, batch_size=CHUNK_PAGE_SIZE):
+def process_rulegroups_in_batches(project_id: int, batch_size=CHUNK_BATCH_SIZE):
rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id)
event_count = buffer.backend.get_hash_length(Project, {"project_id": project_id})

-if event_count < CHUNK_PAGE_SIZE:
-return apply_delayed(project_id)
+print("REHYDRAAATTE", batch_size, event_count, project_id, rulegroup_to_event_data)
+if event_count < batch_size:
+print("SINGLE EXECUTE!")
+return apply_delayed.delayed(project_id)

+print("BATCH EXECUTE!")

# if the dictionary is large, get the items and chunk them.
rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id)
@@ -484,14 +487,15 @@ def process_rulegroups_in_batches(project_id: int, batch_size=CHUNK_PAGE_SIZE):
break

batch_key = f"{project_id}:{uuid.uuid4()}"
-batch_values = json.dumps(batch)

-buffer.backend.push_to_hash(
-model=Project,
-filters={"project_id": project_id, "batch_key": batch_key},
-field=batch_key,
-value=batch_values,
-)
+for field, value in batch.items():
+buffer.backend.push_to_hash(
+model=Project,
+filters={"project_id": batch_key},
+field=field,
+value=value,
+)

apply_delayed.delayed(project_id, batch_key)

# TODO destroy processed redis data
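
Note: the collapsed lines in this hunk hide the loop construct itself, so here is a minimal, self-contained sketch of the pattern the visible lines imply: split a large rulegroup-to-event-data mapping into fixed-size batches and give each batch its own key. Plain dicts stand in for the Redis-backed buffer hash, and the helper names (iter_batches, split_into_keyed_batches) are illustrative, not code from this commit.

import uuid
from itertools import islice
from typing import Iterator


def iter_batches(data: dict[str, str], batch_size: int) -> Iterator[dict[str, str]]:
    # Yield successive sub-dicts containing at most batch_size items each.
    items = iter(data.items())
    while batch := dict(islice(items, batch_size)):
        yield batch


def split_into_keyed_batches(
    project_id: int, data: dict[str, str], batch_size: int
) -> dict[str, dict[str, str]]:
    # Mirror the re-buffering step above: every batch gets a unique "<project_id>:<uuid>" key.
    return {f"{project_id}:{uuid.uuid4()}": batch for batch in iter_batches(data, batch_size)}


if __name__ == "__main__":
    data = {f"{rule_id}:{group_id}": "{}" for rule_id in (1, 2, 3) for group_id in range(5)}
    batches = split_into_keyed_batches(project_id=1, data=data, batch_size=4)
    assert sum(len(b) for b in batches.values()) == len(data)
    print([len(b) for b in batches.values()])  # [4, 4, 4, 3]

However the real loop is written, fifteen entries with a batch size of four should always come back as four batches that together preserve every entry, which is the kind of invariant the test TODOs below can pin down.
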
@@ -533,8 +537,8 @@ def apply_delayed(project_id: int, batch_key: str | None = None, *args: Any, **k
if batch_key is None:
rulegroup_to_event_data = fetch_rulegroup_to_event_data(project_id)
else:
-rulegroup_to_event_data = json.loads(
-buffer.backend.get_hash(model=Project, field={"batch_key": batch_key})
+rulegroup_to_event_data = buffer.backend.get_hash(
+model=Project, field={"project_id": project_id, "batch_key": batch_key}
)

rules_to_groups = get_rules_to_groups(rulegroup_to_event_data)
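
Note: as diffed, the write side keys each batch hash with filters={"project_id": batch_key} while the read side here asks for {"project_id": project_id, "batch_key": batch_key}, and the two sides have to agree on one layout before a batch can round-trip. Below is a toy, self-contained sketch of one consistent pairing; InMemoryBuffer is a stand-in rather than Sentry's buffer API, and the chosen filter layout is only an assumption.

import json
import uuid


class InMemoryBuffer:
    # Toy replacement for a Redis-backed hash buffer that is keyed by a filters dict.
    def __init__(self) -> None:
        self._hashes: dict[tuple, dict[str, str]] = {}

    def _key(self, filters: dict) -> tuple:
        return tuple(sorted((k, str(v)) for k, v in filters.items()))

    def push_to_hash(self, filters: dict, field: str, value: str) -> None:
        self._hashes.setdefault(self._key(filters), {})[field] = value

    def get_hash(self, filters: dict) -> dict[str, str]:
        return dict(self._hashes.get(self._key(filters), {}))


backend = InMemoryBuffer()
project_id = 1
batch_key = f"{project_id}:{uuid.uuid4()}"
filters = {"project_id": project_id, "batch_key": batch_key}

# Write side: what the batching helper would do for each field in a batch.
backend.push_to_hash(filters, field="123:456", value=json.dumps({"event_id": "abc"}))

# Read side: what apply_delayed would do when handed the same batch_key.
assert backend.get_hash(filters) == {"123:456": json.dumps({"event_id": "abc"})}

Deriving the hash key from the same (project_id, batch_key) pair on both sides would also give the "destroy processed redis data" TODO a single, well-defined hash to delete once apply_delayed finishes.
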
43 changes: 26 additions & 17 deletions tests/sentry/rules/processing/test_delayed_processing.py
@@ -34,6 +34,7 @@
get_slow_conditions,
parse_rulegroup_to_event_data,
process_delayed_alert_conditions,
+process_rulegroups_in_batches,
)
from sentry.rules.processing.processor import PROJECT_ID_BUFFER_LIST_KEY
from sentry.testutils.cases import PerformanceIssueTestCase, RuleTestCase, TestCase
@@ -83,6 +84,15 @@ def _callthrough_with_order(*args, **kwargs):

@freeze_time(FROZEN_TIME)
class CreateEventTestCase(TestCase, BaseEventFrequencyPercentTest):
+def push_to_hash(self, project_id, rule_id, group_id, event_id=None, occurrence_id=None):
+value = json.dumps({"event_id": event_id, "occurrence_id": occurrence_id})
+buffer.backend.push_to_hash(
+model=Project,
+filters={"project_id": project_id},
+field=f"{rule_id}:{group_id}",
+value=value,
+)

def create_event(
self,
project_id: int,
@@ -643,15 +653,6 @@ def test_parse_rulegroup_invalid_json(self):
class ProcessDelayedAlertConditionsTest(CreateEventTestCase, PerformanceIssueTestCase):
buffer_timestamp = (FROZEN_TIME + timedelta(seconds=1)).timestamp()

-def push_to_hash(self, project_id, rule_id, group_id, event_id=None, occurrence_id=None):
-value = json.dumps({"event_id": event_id, "occurrence_id": occurrence_id})
-buffer.backend.push_to_hash(
-model=Project,
-filters={"project_id": project_id},
-field=f"{rule_id}:{group_id}",
-value=value,
-)

def assert_buffer_cleared(self, project_id):
rule_group_data = buffer.backend.get_hash(Project, {"project_id": project_id})
assert rule_group_data == {}
@@ -1325,17 +1326,25 @@ def test_apply_delayed_process_count_then_percent(self, safe_execute_callthrough
apply_delayed(project_id)
self._assert_count_percent_results(safe_execute_callthrough)

-def test_process_in_batches(self):
-# TODO fill this in
-pass

+class ProcessRuleGroupsInBatchesTest(CreateEventTestCase):
+@patch("sentry.rules.processing.delayed_processing.apply_delayed")
+def test_no_redis_data(self, mock_apply_delayed):
+mock_delayed = Mock()
+mock_apply_delayed.delayed = mock_delayed
+process_rulegroups_in_batches(self.project.id)

-class ProcessRuleGroupsInBatchesTest(TestCase):
-def test_basic(self):
-# TODO write tests for processin in batches
-pass
+mock_delayed.assert_called_once_with(self.project.id)

+@patch("sentry.rules.processing.delayed_processing.apply_delayed")
+def test_basic(self, mock_apply_delayed):
+mock_delayed = Mock()
+mock_apply_delayed.delayed = mock_delayed
+self.push_to_hash(self.project.id, self.rule.id, self.group.id)

+process_rulegroups_in_batches(self.project.id)
+mock_delayed.assert_called_once_with(self.project.id)

# todo test for no batches
# todo test for multiple batches
# verify state of redis buffers

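Note: for the remaining test TODOs, one possible shape for the multiple-batch case is sketched below against the fixtures this file already uses (self.project, self.rule, self.group, the push_to_hash helper added above, and the existing Mock/patch imports). Passing batch_size explicitly sidesteps the fact that the CHUNK_BATCH_SIZE default is bound when the function is defined; the extra group fixtures, the test name, and the assertions are assumptions rather than committed code. It would slot into ProcessRuleGroupsInBatchesTest:

    @patch("sentry.rules.processing.delayed_processing.apply_delayed")
    def test_multiple_batches(self, mock_apply_delayed):
        # Sketch only: three rule/group pairs with batch_size=2 should produce two batches,
        # each dispatched to apply_delayed.delayed with its own "<project_id>:<uuid>" key.
        mock_delayed = Mock()
        mock_apply_delayed.delayed = mock_delayed

        group_two = self.create_group(project=self.project)
        group_three = self.create_group(project=self.project)
        for group in (self.group, group_two, group_three):
            self.push_to_hash(self.project.id, self.rule.id, group.id)

        process_rulegroups_in_batches(self.project.id, batch_size=2)

        assert mock_delayed.call_count == 2
        batch_keys = [call.args[1] for call in mock_delayed.call_args_list]
        assert all(key.startswith(f"{self.project.id}:") for key in batch_keys)
        assert len(set(batch_keys)) == 2

Asserting on the dispatched batch keys keeps the test independent of how the chunking loop is implemented; a companion test can then read each batch hash back to cover the "verify state of redis buffers" TODO.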

