From 0e08ee9cb074f5a9151f2d2744198f56bc5f07f0 Mon Sep 17 00:00:00 2001 From: Nisanthan Nanthakumar Date: Wed, 13 Sep 2023 08:23:55 -0700 Subject: [PATCH 1/7] ref(escalating-issues): Handle hot shards in auto-transition tasks --- src/sentry/tasks/auto_ongoing_issues.py | 96 +++++++------------------ 1 file changed, 25 insertions(+), 71 deletions(-) diff --git a/src/sentry/tasks/auto_ongoing_issues.py b/src/sentry/tasks/auto_ongoing_issues.py index 0f9a5be1d387ab..c106d0358d3e2f 100644 --- a/src/sentry/tasks/auto_ongoing_issues.py +++ b/src/sentry/tasks/auto_ongoing_issues.py @@ -7,18 +7,9 @@ from django.db.models import Max from sentry_sdk.crons.decorator import monitor -from sentry import features from sentry.conf.server import CELERY_ISSUE_STATES_QUEUE -from sentry.constants import ObjectStatus from sentry.issues.ongoing import bulk_transition_group_to_ongoing -from sentry.models import ( - Group, - GroupHistoryStatus, - GroupStatus, - Organization, - OrganizationStatus, - Project, -) +from sentry.models import Group, GroupHistoryStatus, GroupStatus from sentry.monitoring.queues import backend from sentry.silo import SiloMode from sentry.tasks.base import instrumented_task, retry @@ -56,18 +47,6 @@ def wrapped(*args, **kwargs): return inner(func) -def get_daily_10min_bucket(now: datetime): - """ - If we split a day into 10min buckets, this function - returns the bucket that the given datetime is in. - """ - bucket = now.hour * 6 + now.minute / 10 - if bucket == 0: - bucket = 144 - - return bucket - - @instrumented_task( name="sentry.tasks.schedule_auto_transition_to_ongoing", queue="auto_transition_issue_states", @@ -80,44 +59,28 @@ def get_daily_10min_bucket(now: datetime): @monitor(monitor_slug="schedule_auto_transition_to_ongoing") @log_error_if_queue_has_items def schedule_auto_transition_to_ongoing() -> None: - """ - This func will be instantiated by a cron every 10min. - We create 144 buckets, which comes from the 10min intervals in 24hrs. - We distribute all the orgs evenly in 144 buckets. For a given cron-tick's - 10min interval, we fetch the orgs from that bucket and transition eligible Groups to ongoing - """ now = datetime.now(tz=timezone.utc) - bucket = get_daily_10min_bucket(now) - seven_days_ago = now - timedelta(days=TRANSITION_AFTER_DAYS) - for org in RangeQuerySetWrapper(Organization.objects.filter(status=OrganizationStatus.ACTIVE)): - if features.has("organizations:escalating-issues", org) and org.id % 144 == bucket: - project_ids = list( - Project.objects.filter( - organization_id=org.id, status=ObjectStatus.ACTIVE - ).values_list("id", flat=True) - ) - - auto_transition_issues_new_to_ongoing.delay( - project_ids=project_ids, - first_seen_lte=int(seven_days_ago.timestamp()), - organization_id=org.id, - expires=now + timedelta(hours=1), - ) + auto_transition_issues_new_to_ongoing.delay( + project_ids=[], # TODO remove arg in next PR + first_seen_lte=int(seven_days_ago.timestamp()), + organization_id=None, # TODO remove arg in next PR + expires=now + timedelta(hours=1), + ) - auto_transition_issues_regressed_to_ongoing.delay( - project_ids=project_ids, - date_added_lte=int(seven_days_ago.timestamp()), - expires=now + timedelta(hours=1), - ) + auto_transition_issues_regressed_to_ongoing.delay( + project_ids=[], # TODO(nisanthan): Remove this arg in next PR + date_added_lte=int(seven_days_ago.timestamp()), + expires=now + timedelta(hours=1), + ) - auto_transition_issues_escalating_to_ongoing.delay( - project_ids=project_ids, - date_added_lte=int(seven_days_ago.timestamp()), - expires=now + timedelta(hours=1), - ) + auto_transition_issues_escalating_to_ongoing.delay( + project_ids=[], # TODO(nisanthan): Remove this arg in next PR + date_added_lte=int(seven_days_ago.timestamp()), + expires=now + timedelta(hours=1), + ) @instrumented_task( @@ -133,9 +96,9 @@ def schedule_auto_transition_to_ongoing() -> None: @retry(on=(OperationalError,)) @log_error_if_queue_has_items def auto_transition_issues_new_to_ongoing( - project_ids: List[int], + project_ids: List[int], # TODO remove arg in next PR first_seen_lte: int, - organization_id: int, + organization_id: int, # TODO remove arg in next PR **kwargs, ) -> None: """ @@ -153,7 +116,6 @@ def auto_transition_issues_new_to_ongoing( logger.info( "auto_transition_issues_new_to_ongoing started", extra={ - "organization_id": organization_id, "most_recent_group_first_seen_seven_days_ago": most_recent_group_first_seen_seven_days_ago.id, "first_seen_lte": first_seen_lte, }, @@ -162,12 +124,12 @@ def auto_transition_issues_new_to_ongoing( for new_groups in chunked( RangeQuerySetWrapper( Group.objects.filter( - project_id__in=project_ids, status=GroupStatus.UNRESOLVED, substatus=GroupSubStatus.NEW, - id__lte=most_recent_group_first_seen_seven_days_ago.id, + id__lte=1, ), step=ITERATOR_CHUNK, + limit=ITERATOR_CHUNK * 50, ), ITERATOR_CHUNK, ): @@ -175,7 +137,6 @@ def auto_transition_issues_new_to_ongoing( logger.info( "auto_transition_issues_new_to_ongoing updating group", extra={ - "organization_id": organization_id, "most_recent_group_first_seen_seven_days_ago": most_recent_group_first_seen_seven_days_ago.id, "group_id": group.id, }, @@ -201,20 +162,15 @@ def auto_transition_issues_new_to_ongoing( @retry(on=(OperationalError,)) @log_error_if_queue_has_items def auto_transition_issues_regressed_to_ongoing( - project_ids: List[int], + project_ids: List[int], # TODO(nisanthan): Remove this arg in next PR date_added_lte: int, project_id: Optional[int] = None, # TODO(nisanthan): Remove this arg in next PR **kwargs, ) -> None: - # TODO(nisanthan): Remove this conditional in next PR - if project_id is not None: - project_ids = [project_id] - for groups_with_regressed_history in chunked( RangeQuerySetWrapper( Group.objects.filter( - project_id__in=project_ids, status=GroupStatus.UNRESOLVED, substatus=GroupSubStatus.REGRESSED, grouphistory__status=GroupHistoryStatus.REGRESSED, @@ -224,6 +180,7 @@ def auto_transition_issues_regressed_to_ongoing( recent_regressed_history__lte=datetime.fromtimestamp(date_added_lte, timezone.utc) ), step=ITERATOR_CHUNK, + limit=ITERATOR_CHUNK * 50, ), ITERATOR_CHUNK, ): @@ -249,19 +206,15 @@ def auto_transition_issues_regressed_to_ongoing( @retry(on=(OperationalError,)) @log_error_if_queue_has_items def auto_transition_issues_escalating_to_ongoing( - project_ids: List[int], + project_ids: List[int], # TODO(nisanthan): Remove this arg in next PR date_added_lte: int, project_id: Optional[int] = None, # TODO(nisanthan): Remove this arg in next PR **kwargs, ) -> None: - # TODO(nisanthan): Remove this conditional in next PR - if project_id is not None: - project_ids = [project_id] for new_groups in chunked( RangeQuerySetWrapper( Group.objects.filter( - project_id__in=project_ids, status=GroupStatus.UNRESOLVED, substatus=GroupSubStatus.ESCALATING, grouphistory__status=GroupHistoryStatus.ESCALATING, @@ -271,6 +224,7 @@ def auto_transition_issues_escalating_to_ongoing( recent_escalating_history__lte=datetime.fromtimestamp(date_added_lte, timezone.utc) ), step=ITERATOR_CHUNK, + limit=ITERATOR_CHUNK * 50, ), ITERATOR_CHUNK, ): From b34a5752662a366649aa0576960e62144ff29558 Mon Sep 17 00:00:00 2001 From: Nisanthan Nanthakumar Date: Wed, 13 Sep 2023 08:28:35 -0700 Subject: [PATCH 2/7] run the job every minute --- src/sentry/conf/server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index f7e75f5f726e32..646181a13754ef 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -1131,8 +1131,8 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str: }, "schedule_auto_transition_to_ongoing": { "task": "sentry.tasks.schedule_auto_transition_to_ongoing", - # Run job every 10 minutes - "schedule": crontab(minute="*/10"), + # Run job every minute + "schedule": crontab(minute="*/1"), "options": {"expires": 3600}, }, "github_comment_reactions": { From 62ec35ae83f03712d79eef840096a5339b13fd64 Mon Sep 17 00:00:00 2001 From: Nisanthan Nanthakumar Date: Wed, 13 Sep 2023 14:44:39 -0700 Subject: [PATCH 3/7] trigger child tasks & add comments & and pass group_ids --- src/sentry/issues/ongoing.py | 6 +- src/sentry/issues/update_inbox.py | 2 +- src/sentry/tasks/auto_ongoing_issues.py | 169 +++++++++++++++++------- tests/sentry/issues/test_ongoing.py | 6 +- 4 files changed, 132 insertions(+), 51 deletions(-) diff --git a/src/sentry/issues/ongoing.py b/src/sentry/issues/ongoing.py index 775fbd30e1e63f..953bc6eea6b91a 100644 --- a/src/sentry/issues/ongoing.py +++ b/src/sentry/issues/ongoing.py @@ -10,12 +10,12 @@ def bulk_transition_group_to_ongoing( from_status: int, from_substatus: int, - groups: List[Group], + group_ids: List[int], activity_data: Optional[Mapping[str, Any]] = None, ) -> None: # make sure we don't update the Group when its already updated by conditionally updating the Group groups_to_transistion = Group.objects.filter( - id__in=[group.id for group in groups], status=from_status, substatus=from_substatus + id__in=group_ids, status=from_status, substatus=from_substatus ) Group.objects.update_group_status( @@ -31,7 +31,7 @@ def bulk_transition_group_to_ongoing( group.status = GroupStatus.UNRESOLVED group.substatus = GroupSubStatus.ONGOING - bulk_remove_groups_from_inbox(groups) + bulk_remove_groups_from_inbox(groups_to_transistion) for group in groups_to_transistion: post_save.send_robust( diff --git a/src/sentry/issues/update_inbox.py b/src/sentry/issues/update_inbox.py index 5182e77a8d2b89..ce1689562d6e5f 100644 --- a/src/sentry/issues/update_inbox.py +++ b/src/sentry/issues/update_inbox.py @@ -54,7 +54,7 @@ def update_inbox( bulk_transition_group_to_ongoing( group.status, group.substatus, - [group], + [group.id], activity_data={"manually": True}, ) diff --git a/src/sentry/tasks/auto_ongoing_issues.py b/src/sentry/tasks/auto_ongoing_issues.py index c106d0358d3e2f..66dc8cd64af893 100644 --- a/src/sentry/tasks/auto_ongoing_issues.py +++ b/src/sentry/tasks/auto_ongoing_issues.py @@ -1,7 +1,7 @@ import logging from datetime import datetime, timedelta, timezone from functools import wraps -from typing import List, Optional +from typing import List from django.db import OperationalError from django.db.models import Max @@ -59,25 +59,26 @@ def wrapped(*args, **kwargs): @monitor(monitor_slug="schedule_auto_transition_to_ongoing") @log_error_if_queue_has_items def schedule_auto_transition_to_ongoing() -> None: + """ + Triggered by cronjob every minute. This task will spawn subtasks + that transition Issues to Ongoing according to their specific + criteria. + """ now = datetime.now(tz=timezone.utc) seven_days_ago = now - timedelta(days=TRANSITION_AFTER_DAYS) auto_transition_issues_new_to_ongoing.delay( - project_ids=[], # TODO remove arg in next PR first_seen_lte=int(seven_days_ago.timestamp()), - organization_id=None, # TODO remove arg in next PR expires=now + timedelta(hours=1), ) auto_transition_issues_regressed_to_ongoing.delay( - project_ids=[], # TODO(nisanthan): Remove this arg in next PR date_added_lte=int(seven_days_ago.timestamp()), expires=now + timedelta(hours=1), ) auto_transition_issues_escalating_to_ongoing.delay( - project_ids=[], # TODO(nisanthan): Remove this arg in next PR date_added_lte=int(seven_days_ago.timestamp()), expires=now + timedelta(hours=1), ) @@ -96,14 +97,15 @@ def schedule_auto_transition_to_ongoing() -> None: @retry(on=(OperationalError,)) @log_error_if_queue_has_items def auto_transition_issues_new_to_ongoing( - project_ids: List[int], # TODO remove arg in next PR first_seen_lte: int, - organization_id: int, # TODO remove arg in next PR **kwargs, ) -> None: """ - We will update all NEW Groups to ONGOING that were created before the - most recent Group first seen 7 days ago. + We will update NEW Groups to ONGOING that were created before the + most recent Group first seen 7 days ago. This task will trigger upto + 50 subtasks to complete the update. We don't expect all eligible Groups + to be updated in a single run. However, we expect every instantiation of this task + to chip away at the backlog of Groups and eventually update all the eligible groups. """ most_recent_group_first_seen_seven_days_ago = ( @@ -121,34 +123,51 @@ def auto_transition_issues_new_to_ongoing( }, ) - for new_groups in chunked( + for new_group_ids in chunked( RangeQuerySetWrapper( Group.objects.filter( status=GroupStatus.UNRESOLVED, substatus=GroupSubStatus.NEW, - id__lte=1, - ), + id__lte=most_recent_group_first_seen_seven_days_ago.id, + ).values_list("id", flat=True), step=ITERATOR_CHUNK, limit=ITERATOR_CHUNK * 50, + result_value_getter=lambda item: item, ), ITERATOR_CHUNK, ): - for group in new_groups: - logger.info( - "auto_transition_issues_new_to_ongoing updating group", - extra={ - "most_recent_group_first_seen_seven_days_ago": most_recent_group_first_seen_seven_days_ago.id, - "group_id": group.id, - }, - ) - bulk_transition_group_to_ongoing( - GroupStatus.UNRESOLVED, - GroupSubStatus.NEW, - new_groups, - activity_data={"after_days": TRANSITION_AFTER_DAYS}, + run_auto_transition_issues_new_to_ongoing.delay( + group_ids=new_group_ids, ) +@instrumented_task( + name="sentry.tasks.run_auto_transition_issues_new_to_ongoing", + queue="auto_transition_issue_states", + time_limit=25 * 60, + soft_time_limit=20 * 60, + max_retries=3, + default_retry_delay=60, + acks_late=True, + silo_mode=SiloMode.REGION, +) +@retry(on=(OperationalError,)) +def run_auto_transition_issues_new_to_ongoing( + group_ids: List[int], + **kwargs, +): + """ + Child task of `auto_transition_issues_new_to_ongoing` + to conduct the update of specified Groups to Ongoing. + """ + bulk_transition_group_to_ongoing( + GroupStatus.UNRESOLVED, + GroupSubStatus.NEW, + group_ids, + activity_data={"after_days": TRANSITION_AFTER_DAYS}, + ) + + @instrumented_task( name="sentry.tasks.auto_transition_issues_regressed_to_ongoing", queue="auto_transition_issue_states", @@ -162,13 +181,18 @@ def auto_transition_issues_new_to_ongoing( @retry(on=(OperationalError,)) @log_error_if_queue_has_items def auto_transition_issues_regressed_to_ongoing( - project_ids: List[int], # TODO(nisanthan): Remove this arg in next PR date_added_lte: int, - project_id: Optional[int] = None, # TODO(nisanthan): Remove this arg in next PR **kwargs, ) -> None: + """ + We will update REGRESSED Groups to ONGOING that were created before the + most recent Group first seen 7 days ago. This task will trigger upto + 50 subtasks to complete the update. We don't expect all eligible Groups + to be updated in a single run. However, we expect every instantiation of this task + to chip away at the backlog of Groups and eventually update all the eligible groups. + """ - for groups_with_regressed_history in chunked( + for group_ids_with_regressed_history in chunked( RangeQuerySetWrapper( Group.objects.filter( status=GroupStatus.UNRESOLVED, @@ -178,21 +202,46 @@ def auto_transition_issues_regressed_to_ongoing( .annotate(recent_regressed_history=Max("grouphistory__date_added")) .filter( recent_regressed_history__lte=datetime.fromtimestamp(date_added_lte, timezone.utc) - ), + ) + .values_list("id", flat=True), step=ITERATOR_CHUNK, limit=ITERATOR_CHUNK * 50, + result_value_getter=lambda item: item, ), ITERATOR_CHUNK, ): - - bulk_transition_group_to_ongoing( - GroupStatus.UNRESOLVED, - GroupSubStatus.REGRESSED, - groups_with_regressed_history, - activity_data={"after_days": TRANSITION_AFTER_DAYS}, + run_auto_transition_issues_regressed_to_ongoing.delay( + group_ids=group_ids_with_regressed_history, ) +@instrumented_task( + name="sentry.tasks.run_auto_transition_issues_regressed_to_ongoing", + queue="auto_transition_issue_states", + time_limit=25 * 60, + soft_time_limit=20 * 60, + max_retries=3, + default_retry_delay=60, + acks_late=True, + silo_mode=SiloMode.REGION, +) +@retry(on=(OperationalError,)) +def run_auto_transition_issues_regressed_to_ongoing( + group_ids: List[int], + **kwargs, +) -> None: + """ + Child task of `auto_transition_issues_regressed_to_ongoing` + to conduct the update of specified Groups to Ongoing. + """ + bulk_transition_group_to_ongoing( + GroupStatus.UNRESOLVED, + GroupSubStatus.REGRESSED, + group_ids, + activity_data={"after_days": TRANSITION_AFTER_DAYS}, + ) + + @instrumented_task( name="sentry.tasks.auto_transition_issues_escalating_to_ongoing", queue="auto_transition_issue_states", @@ -206,13 +255,17 @@ def auto_transition_issues_regressed_to_ongoing( @retry(on=(OperationalError,)) @log_error_if_queue_has_items def auto_transition_issues_escalating_to_ongoing( - project_ids: List[int], # TODO(nisanthan): Remove this arg in next PR date_added_lte: int, - project_id: Optional[int] = None, # TODO(nisanthan): Remove this arg in next PR **kwargs, ) -> None: - - for new_groups in chunked( + """ + We will update ESCALATING Groups to ONGOING that were created before the + most recent Group first seen 7 days ago. This task will trigger upto + 50 subtasks to complete the update. We don't expect all eligible Groups + to be updated in a single run. However, we expect every instantiation of this task + to chip away at the backlog of Groups and eventually update all the eligible groups. + """ + for new_group_ids in chunked( RangeQuerySetWrapper( Group.objects.filter( status=GroupStatus.UNRESOLVED, @@ -222,15 +275,41 @@ def auto_transition_issues_escalating_to_ongoing( .annotate(recent_escalating_history=Max("grouphistory__date_added")) .filter( recent_escalating_history__lte=datetime.fromtimestamp(date_added_lte, timezone.utc) - ), + ) + .values_list("id", flat=True), step=ITERATOR_CHUNK, limit=ITERATOR_CHUNK * 50, + result_value_getter=lambda item: item, ), ITERATOR_CHUNK, ): - bulk_transition_group_to_ongoing( - GroupStatus.UNRESOLVED, - GroupSubStatus.ESCALATING, - new_groups, - activity_data={"after_days": TRANSITION_AFTER_DAYS}, + run_auto_transition_issues_escalating_to_ongoing.delay( + group_ids=new_group_ids, ) + + +@instrumented_task( + name="sentry.tasks.run_auto_transition_issues_escalating_to_ongoing", + queue="auto_transition_issue_states", + time_limit=25 * 60, + soft_time_limit=20 * 60, + max_retries=3, + default_retry_delay=60, + acks_late=True, + silo_mode=SiloMode.REGION, +) +@retry(on=(OperationalError,)) +def run_auto_transition_issues_escalating_to_ongoing( + group_ids: List[int], + **kwargs, +) -> None: + """ + Child task of `auto_transition_issues_escalating_to_ongoing` + to conduct the update of specified Groups to Ongoing. + """ + bulk_transition_group_to_ongoing( + GroupStatus.UNRESOLVED, + GroupSubStatus.ESCALATING, + group_ids, + activity_data={"after_days": TRANSITION_AFTER_DAYS}, + ) diff --git a/tests/sentry/issues/test_ongoing.py b/tests/sentry/issues/test_ongoing.py index 99ad0c573d05e0..9a18d512d8cc72 100644 --- a/tests/sentry/issues/test_ongoing.py +++ b/tests/sentry/issues/test_ongoing.py @@ -9,7 +9,7 @@ class TransitionNewToOngoingTest(TestCase): def test_new_to_ongoing(self) -> None: group = self.create_group(status=GroupStatus.UNRESOLVED, substatus=GroupSubStatus.NEW) - bulk_transition_group_to_ongoing(GroupStatus.UNRESOLVED, GroupSubStatus.NEW, [group]) + bulk_transition_group_to_ongoing(GroupStatus.UNRESOLVED, GroupSubStatus.NEW, [group.id]) assert Activity.objects.filter( group=group, type=ActivityType.AUTO_SET_ONGOING.value ).exists() @@ -20,7 +20,9 @@ def test_new_to_ongoing(self) -> None: def test_regressed_to_ongoing(self) -> None: group = self.create_group(status=GroupStatus.UNRESOLVED, substatus=GroupSubStatus.REGRESSED) - bulk_transition_group_to_ongoing(GroupStatus.UNRESOLVED, GroupSubStatus.REGRESSED, [group]) + bulk_transition_group_to_ongoing( + GroupStatus.UNRESOLVED, GroupSubStatus.REGRESSED, [group.id] + ) assert Activity.objects.filter( group=group, type=ActivityType.AUTO_SET_ONGOING.value ).exists() From eaf09f6982685bcde2674304508f0e0f9698237d Mon Sep 17 00:00:00 2001 From: Nisanthan Nanthakumar Date: Thu, 14 Sep 2023 10:30:27 -0700 Subject: [PATCH 4/7] add test --- .../sentry/tasks/test_auto_ongoing_issues.py | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/tests/sentry/tasks/test_auto_ongoing_issues.py b/tests/sentry/tasks/test_auto_ongoing_issues.py index 1a6e2490d54bc4..32184b1827b996 100644 --- a/tests/sentry/tasks/test_auto_ongoing_issues.py +++ b/tests/sentry/tasks/test_auto_ongoing_issues.py @@ -149,6 +149,55 @@ def test_multiple_old_new(self, mock_backend): ).values_list("id", flat=True) ) == {g.id for g in older_groups} + @freeze_time("2023-07-12 18:40:00Z") + @mock.patch("sentry.tasks.auto_ongoing_issues.ITERATOR_CHUNK", new=2) + @mock.patch("sentry.tasks.auto_ongoing_issues.backend") + def test_not_all_groups_get_updated(self, mock_backend): + now = datetime.now(tz=timezone.utc) + project = self.create_project() + groups_count = 110 + for day, hours in [(8, 1) for _ in range(groups_count)]: + group = self.create_group( + project=project, + status=GroupStatus.UNRESOLVED, + substatus=GroupSubStatus.NEW, + ) + first_seen = now - timedelta(days=day, hours=hours) + group.first_seen = first_seen + group.save() + + # before + assert ( + Group.objects.filter( + project_id=project.id, status=GroupStatus.UNRESOLVED, substatus=GroupSubStatus.NEW + ).count() + == groups_count + ) + + mock_backend.get_size.return_value = 0 + + with self.tasks(): + schedule_auto_transition_to_ongoing() + + # after + + assert ( + Group.objects.filter( + project=project, + status=GroupStatus.UNRESOLVED, + substatus=GroupSubStatus.NEW, + ).count() + == 10 + ) + assert ( + Group.objects.filter( + project=project, + status=GroupStatus.UNRESOLVED, + substatus=GroupSubStatus.ONGOING, + ).count() + == 100 + ) + @apply_feature_flag_on_cls("organizations:escalating-issues") class ScheduleAutoRegressedOngoingIssuesTest(TestCase): From ca4d6c01d3366933ff1665199a406b7a04c6e64e Mon Sep 17 00:00:00 2001 From: Nisanthan Nanthakumar Date: Thu, 14 Sep 2023 10:39:07 -0700 Subject: [PATCH 5/7] keep task function args --- src/sentry/tasks/auto_ongoing_issues.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/sentry/tasks/auto_ongoing_issues.py b/src/sentry/tasks/auto_ongoing_issues.py index 66dc8cd64af893..cd3772741fc863 100644 --- a/src/sentry/tasks/auto_ongoing_issues.py +++ b/src/sentry/tasks/auto_ongoing_issues.py @@ -1,7 +1,7 @@ import logging from datetime import datetime, timedelta, timezone from functools import wraps -from typing import List +from typing import List, Optional from django.db import OperationalError from django.db.models import Max @@ -97,7 +97,9 @@ def schedule_auto_transition_to_ongoing() -> None: @retry(on=(OperationalError,)) @log_error_if_queue_has_items def auto_transition_issues_new_to_ongoing( + project_ids: List[int], # TODO(nisanthan): Remove in following PR first_seen_lte: int, + organization_id: int, # TODO(nisanthan): Remove in following PR **kwargs, ) -> None: """ @@ -181,7 +183,9 @@ def run_auto_transition_issues_new_to_ongoing( @retry(on=(OperationalError,)) @log_error_if_queue_has_items def auto_transition_issues_regressed_to_ongoing( + project_ids: List[int], # TODO(nisanthan): Remove this arg in next PR date_added_lte: int, + project_id: Optional[int], # TODO(nisanthan): Remove this arg in next PR **kwargs, ) -> None: """ @@ -255,7 +259,9 @@ def run_auto_transition_issues_regressed_to_ongoing( @retry(on=(OperationalError,)) @log_error_if_queue_has_items def auto_transition_issues_escalating_to_ongoing( + project_ids: List[int], # TODO(nisanthan): Remove this arg in next PR date_added_lte: int, + project_id: Optional[int], # TODO(nisanthan): Remove this arg in next PR **kwargs, ) -> None: """ From 3979c3b4b15090c17faa580eadbbfdafc952efbb Mon Sep 17 00:00:00 2001 From: Nisanthan Nanthakumar Date: Tue, 19 Sep 2023 16:18:07 -0700 Subject: [PATCH 6/7] add analytics & rename subtasks --- src/sentry/tasks/auto_ongoing_issues.py | 180 ++++++++++++++---- .../sentry/tasks/test_auto_ongoing_issues.py | 35 +++- 2 files changed, 173 insertions(+), 42 deletions(-) diff --git a/src/sentry/tasks/auto_ongoing_issues.py b/src/sentry/tasks/auto_ongoing_issues.py index cd3772741fc863..f3fe89ba794b78 100644 --- a/src/sentry/tasks/auto_ongoing_issues.py +++ b/src/sentry/tasks/auto_ongoing_issues.py @@ -1,7 +1,7 @@ import logging from datetime import datetime, timedelta, timezone from functools import wraps -from typing import List, Optional +from typing import List from django.db import OperationalError from django.db.models import Max @@ -14,6 +14,7 @@ from sentry.silo import SiloMode from sentry.tasks.base import instrumented_task, retry from sentry.types.group import GroupSubStatus +from sentry.utils import metrics from sentry.utils.iterators import chunked from sentry.utils.query import RangeQuerySetWrapper @@ -68,24 +69,24 @@ def schedule_auto_transition_to_ongoing() -> None: seven_days_ago = now - timedelta(days=TRANSITION_AFTER_DAYS) - auto_transition_issues_new_to_ongoing.delay( + schedule_auto_transition_issues_new_to_ongoing.delay( first_seen_lte=int(seven_days_ago.timestamp()), expires=now + timedelta(hours=1), ) - auto_transition_issues_regressed_to_ongoing.delay( + schedule_auto_transition_issues_regressed_to_ongoing.delay( date_added_lte=int(seven_days_ago.timestamp()), expires=now + timedelta(hours=1), ) - auto_transition_issues_escalating_to_ongoing.delay( + schedule_auto_transition_issues_escalating_to_ongoing.delay( date_added_lte=int(seven_days_ago.timestamp()), expires=now + timedelta(hours=1), ) @instrumented_task( - name="sentry.tasks.auto_transition_issues_new_to_ongoing", + name="sentry.tasks.schedule_auto_transition_issues_new_to_ongoing", queue="auto_transition_issue_states", time_limit=25 * 60, soft_time_limit=20 * 60, @@ -96,10 +97,8 @@ def schedule_auto_transition_to_ongoing() -> None: ) @retry(on=(OperationalError,)) @log_error_if_queue_has_items -def auto_transition_issues_new_to_ongoing( - project_ids: List[int], # TODO(nisanthan): Remove in following PR +def schedule_auto_transition_issues_new_to_ongoing( first_seen_lte: int, - organization_id: int, # TODO(nisanthan): Remove in following PR **kwargs, ) -> None: """ @@ -110,6 +109,20 @@ def auto_transition_issues_new_to_ongoing( to chip away at the backlog of Groups and eventually update all the eligible groups. """ + last_id = None + total_count = 0 + + def get_last_id(results): + nonlocal last_id + try: + last_id = results[-1] + except IndexError: + last_id = None + + def get_total_count(results): + nonlocal total_count + total_count += len(results) + most_recent_group_first_seen_seven_days_ago = ( Group.objects.filter( first_seen__lte=datetime.fromtimestamp(first_seen_lte, timezone.utc), @@ -125,16 +138,19 @@ def auto_transition_issues_new_to_ongoing( }, ) + base_queryset = Group.objects.filter( + status=GroupStatus.UNRESOLVED, + substatus=GroupSubStatus.NEW, + id__lte=most_recent_group_first_seen_seven_days_ago.id, + ) + for new_group_ids in chunked( RangeQuerySetWrapper( - Group.objects.filter( - status=GroupStatus.UNRESOLVED, - substatus=GroupSubStatus.NEW, - id__lte=most_recent_group_first_seen_seven_days_ago.id, - ).values_list("id", flat=True), + base_queryset._clone().values_list("id", flat=True), step=ITERATOR_CHUNK, limit=ITERATOR_CHUNK * 50, result_value_getter=lambda item: item, + callbacks=[get_last_id, get_total_count], ), ITERATOR_CHUNK, ): @@ -142,6 +158,24 @@ def auto_transition_issues_new_to_ongoing( group_ids=new_group_ids, ) + remaining_groups_queryset = base_queryset._clone() + + if last_id is not None: + remaining_groups_queryset = remaining_groups_queryset.filter(id__gt=last_id) + + remaining_groups = remaining_groups_queryset.count() + + metrics.incr( + "sentry.tasks.schedule_auto_transition_issues_new_to_ongoing.executed", + sample_rate=1.0, + tags={"count": total_count}, + ) + metrics.incr( + "sentry.tasks.schedule_auto_transition_issues_new_to_ongoing.remaining", + sample_rate=1.0, + tags={"count": remaining_groups}, + ) + @instrumented_task( name="sentry.tasks.run_auto_transition_issues_new_to_ongoing", @@ -171,7 +205,7 @@ def run_auto_transition_issues_new_to_ongoing( @instrumented_task( - name="sentry.tasks.auto_transition_issues_regressed_to_ongoing", + name="sentry.tasks.schedule_auto_transition_issues_regressed_to_ongoing", queue="auto_transition_issue_states", time_limit=25 * 60, soft_time_limit=20 * 60, @@ -182,10 +216,8 @@ def run_auto_transition_issues_new_to_ongoing( ) @retry(on=(OperationalError,)) @log_error_if_queue_has_items -def auto_transition_issues_regressed_to_ongoing( - project_ids: List[int], # TODO(nisanthan): Remove this arg in next PR +def schedule_auto_transition_issues_regressed_to_ongoing( date_added_lte: int, - project_id: Optional[int], # TODO(nisanthan): Remove this arg in next PR **kwargs, ) -> None: """ @@ -195,22 +227,37 @@ def auto_transition_issues_regressed_to_ongoing( to be updated in a single run. However, we expect every instantiation of this task to chip away at the backlog of Groups and eventually update all the eligible groups. """ + last_id = None + total_count = 0 + + def get_last_id(results): + nonlocal last_id + try: + last_id = results[-1] + except IndexError: + last_id = None + + def get_total_count(results): + nonlocal total_count + total_count += len(results) + + base_queryset = ( + Group.objects.filter( + status=GroupStatus.UNRESOLVED, + substatus=GroupSubStatus.REGRESSED, + grouphistory__status=GroupHistoryStatus.REGRESSED, + ) + .annotate(recent_regressed_history=Max("grouphistory__date_added")) + .filter(recent_regressed_history__lte=datetime.fromtimestamp(date_added_lte, timezone.utc)) + ) for group_ids_with_regressed_history in chunked( RangeQuerySetWrapper( - Group.objects.filter( - status=GroupStatus.UNRESOLVED, - substatus=GroupSubStatus.REGRESSED, - grouphistory__status=GroupHistoryStatus.REGRESSED, - ) - .annotate(recent_regressed_history=Max("grouphistory__date_added")) - .filter( - recent_regressed_history__lte=datetime.fromtimestamp(date_added_lte, timezone.utc) - ) - .values_list("id", flat=True), + base_queryset._clone().values_list("id", flat=True), step=ITERATOR_CHUNK, limit=ITERATOR_CHUNK * 50, result_value_getter=lambda item: item, + callbacks=[get_last_id, get_total_count], ), ITERATOR_CHUNK, ): @@ -218,6 +265,24 @@ def auto_transition_issues_regressed_to_ongoing( group_ids=group_ids_with_regressed_history, ) + remaining_groups_queryset = base_queryset._clone() + + if last_id is not None: + remaining_groups_queryset = remaining_groups_queryset.filter(id__gt=last_id) + + remaining_groups = remaining_groups_queryset.count() + + metrics.incr( + "sentry.tasks.schedule_auto_transition_issues_regressed_to_ongoing.executed", + sample_rate=1.0, + tags={"count": total_count}, + ) + metrics.incr( + "sentry.tasks.schedule_auto_transition_issues_regressed_to_ongoing.remaining", + sample_rate=1.0, + tags={"count": remaining_groups}, + ) + @instrumented_task( name="sentry.tasks.run_auto_transition_issues_regressed_to_ongoing", @@ -247,7 +312,7 @@ def run_auto_transition_issues_regressed_to_ongoing( @instrumented_task( - name="sentry.tasks.auto_transition_issues_escalating_to_ongoing", + name="sentry.tasks.schedule_auto_transition_issues_escalating_to_ongoing", queue="auto_transition_issue_states", time_limit=25 * 60, soft_time_limit=20 * 60, @@ -258,10 +323,8 @@ def run_auto_transition_issues_regressed_to_ongoing( ) @retry(on=(OperationalError,)) @log_error_if_queue_has_items -def auto_transition_issues_escalating_to_ongoing( - project_ids: List[int], # TODO(nisanthan): Remove this arg in next PR +def schedule_auto_transition_issues_escalating_to_ongoing( date_added_lte: int, - project_id: Optional[int], # TODO(nisanthan): Remove this arg in next PR **kwargs, ) -> None: """ @@ -271,21 +334,38 @@ def auto_transition_issues_escalating_to_ongoing( to be updated in a single run. However, we expect every instantiation of this task to chip away at the backlog of Groups and eventually update all the eligible groups. """ + + last_id = None + total_count = 0 + + def get_last_id(results): + nonlocal last_id + try: + last_id = results[-1] + except IndexError: + last_id = None + + def get_total_count(results): + nonlocal total_count + total_count += len(results) + + base_queryset = ( + Group.objects.filter( + status=GroupStatus.UNRESOLVED, + substatus=GroupSubStatus.ESCALATING, + grouphistory__status=GroupHistoryStatus.ESCALATING, + ) + .annotate(recent_escalating_history=Max("grouphistory__date_added")) + .filter(recent_escalating_history__lte=datetime.fromtimestamp(date_added_lte, timezone.utc)) + ) + for new_group_ids in chunked( RangeQuerySetWrapper( - Group.objects.filter( - status=GroupStatus.UNRESOLVED, - substatus=GroupSubStatus.ESCALATING, - grouphistory__status=GroupHistoryStatus.ESCALATING, - ) - .annotate(recent_escalating_history=Max("grouphistory__date_added")) - .filter( - recent_escalating_history__lte=datetime.fromtimestamp(date_added_lte, timezone.utc) - ) - .values_list("id", flat=True), + base_queryset._clone().values_list("id", flat=True), step=ITERATOR_CHUNK, limit=ITERATOR_CHUNK * 50, result_value_getter=lambda item: item, + callbacks=[get_last_id, get_total_count], ), ITERATOR_CHUNK, ): @@ -293,6 +373,24 @@ def auto_transition_issues_escalating_to_ongoing( group_ids=new_group_ids, ) + remaining_groups_queryset = base_queryset._clone() + + if last_id is not None: + remaining_groups_queryset = remaining_groups_queryset.filter(id__gt=last_id) + + remaining_groups = remaining_groups_queryset.count() + + metrics.incr( + "sentry.tasks.schedule_auto_transition_issues_escalating_to_ongoing.executed", + sample_rate=1.0, + tags={"count": total_count}, + ) + metrics.incr( + "sentry.tasks.schedule_auto_transition_issues_escalating_to_ongoing.remaining", + sample_rate=1.0, + tags={"count": remaining_groups}, + ) + @instrumented_task( name="sentry.tasks.run_auto_transition_issues_escalating_to_ongoing", diff --git a/tests/sentry/tasks/test_auto_ongoing_issues.py b/tests/sentry/tasks/test_auto_ongoing_issues.py index 32184b1827b996..30ca93283b7125 100644 --- a/tests/sentry/tasks/test_auto_ongoing_issues.py +++ b/tests/sentry/tasks/test_auto_ongoing_issues.py @@ -150,9 +150,10 @@ def test_multiple_old_new(self, mock_backend): ) == {g.id for g in older_groups} @freeze_time("2023-07-12 18:40:00Z") + @mock.patch("sentry.utils.metrics.incr") @mock.patch("sentry.tasks.auto_ongoing_issues.ITERATOR_CHUNK", new=2) @mock.patch("sentry.tasks.auto_ongoing_issues.backend") - def test_not_all_groups_get_updated(self, mock_backend): + def test_not_all_groups_get_updated(self, mock_backend, mock_metrics_incr): now = datetime.now(tz=timezone.utc) project = self.create_project() groups_count = 110 @@ -198,6 +199,38 @@ def test_not_all_groups_get_updated(self, mock_backend): == 100 ) + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_new_to_ongoing.executed", + sample_rate=1.0, + tags={"count": 100}, + ) + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_new_to_ongoing.remaining", + sample_rate=1.0, + tags={"count": 10}, + ) + + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_regressed_to_ongoing.executed", + sample_rate=1.0, + tags={"count": 0}, + ) + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_regressed_to_ongoing.remaining", + sample_rate=1.0, + tags={"count": 0}, + ) + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_escalating_to_ongoing.executed", + sample_rate=1.0, + tags={"count": 0}, + ) + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_escalating_to_ongoing.remaining", + sample_rate=1.0, + tags={"count": 0}, + ) + @apply_feature_flag_on_cls("organizations:escalating-issues") class ScheduleAutoRegressedOngoingIssuesTest(TestCase): From 7a6a69d8cb839ff1df3692582ded05c19e3c56d3 Mon Sep 17 00:00:00 2001 From: Nisanthan Nanthakumar Date: Tue, 19 Sep 2023 16:34:21 -0700 Subject: [PATCH 7/7] add test coverage for other tasks --- .../sentry/tasks/test_auto_ongoing_issues.py | 165 +++++++++++++++++- 1 file changed, 160 insertions(+), 5 deletions(-) diff --git a/tests/sentry/tasks/test_auto_ongoing_issues.py b/tests/sentry/tasks/test_auto_ongoing_issues.py index 30ca93283b7125..c51cf1b286f792 100644 --- a/tests/sentry/tasks/test_auto_ongoing_issues.py +++ b/tests/sentry/tasks/test_auto_ongoing_issues.py @@ -157,15 +157,13 @@ def test_not_all_groups_get_updated(self, mock_backend, mock_metrics_incr): now = datetime.now(tz=timezone.utc) project = self.create_project() groups_count = 110 - for day, hours in [(8, 1) for _ in range(groups_count)]: - group = self.create_group( + for day, hours in [(TRANSITION_AFTER_DAYS, 1) for _ in range(groups_count)]: + self.create_group( project=project, status=GroupStatus.UNRESOLVED, substatus=GroupSubStatus.NEW, + first_seen=now - timedelta(days=day, hours=hours), ) - first_seen = now - timedelta(days=day, hours=hours) - group.first_seen = first_seen - group.save() # before assert ( @@ -270,6 +268,84 @@ def test_simple(self, mock_backend): assert GroupHistory.objects.filter(group=group, status=GroupHistoryStatus.ONGOING).exists() + @freeze_time("2023-07-12 18:40:00Z") + @mock.patch("sentry.utils.metrics.incr") + @mock.patch("sentry.tasks.auto_ongoing_issues.ITERATOR_CHUNK", new=2) + @mock.patch("sentry.tasks.auto_ongoing_issues.backend") + def test_not_all_groups_get_updated(self, mock_backend, mock_metrics_incr): + now = datetime.now(tz=timezone.utc) + project = self.create_project() + groups_count = 110 + for day, hours in [(TRANSITION_AFTER_DAYS, 1) for _ in range(groups_count)]: + group = self.create_group( + project=project, + status=GroupStatus.UNRESOLVED, + substatus=GroupSubStatus.REGRESSED, + first_seen=now - timedelta(days=day, hours=hours), + ) + group_inbox = add_group_to_inbox(group, GroupInboxReason.REGRESSION) + group_inbox.date_added = now - timedelta(days=TRANSITION_AFTER_DAYS, hours=1) + group_inbox.save(update_fields=["date_added"]) + group_history = record_group_history( + group, GroupHistoryStatus.REGRESSED, actor=None, release=None + ) + group_history.date_added = now - timedelta(days=TRANSITION_AFTER_DAYS, hours=1) + group_history.save(update_fields=["date_added"]) + + mock_backend.get_size.return_value = 0 + + with self.tasks(): + schedule_auto_transition_to_ongoing() + + assert ( + Group.objects.filter( + project=project, + status=GroupStatus.UNRESOLVED, + substatus=GroupSubStatus.REGRESSED, + ).count() + == 10 + ) + assert ( + Group.objects.filter( + project=project, + status=GroupStatus.UNRESOLVED, + substatus=GroupSubStatus.ONGOING, + ).count() + == 100 + ) + + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_new_to_ongoing.executed", + sample_rate=1.0, + tags={"count": 0}, + ) + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_new_to_ongoing.remaining", + sample_rate=1.0, + tags={"count": 0}, + ) + + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_regressed_to_ongoing.executed", + sample_rate=1.0, + tags={"count": 100}, + ) + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_regressed_to_ongoing.remaining", + sample_rate=1.0, + tags={"count": 10}, + ) + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_escalating_to_ongoing.executed", + sample_rate=1.0, + tags={"count": 0}, + ) + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_escalating_to_ongoing.remaining", + sample_rate=1.0, + tags={"count": 0}, + ) + @apply_feature_flag_on_cls("organizations:escalating-issues") class ScheduleAutoEscalatingOngoingIssuesTest(TestCase): @@ -308,3 +384,82 @@ def test_simple(self, mock_backend): assert set_ongoing_activity.data == {"after_days": 7} assert GroupHistory.objects.filter(group=group, status=GroupHistoryStatus.ONGOING).exists() + + @freeze_time("2023-07-12 18:40:00Z") + @mock.patch("sentry.utils.metrics.incr") + @mock.patch("sentry.tasks.auto_ongoing_issues.ITERATOR_CHUNK", new=2) + @mock.patch("sentry.tasks.auto_ongoing_issues.backend") + def test_not_all_groups_get_updated(self, mock_backend, mock_metrics_incr): + now = datetime.now(tz=timezone.utc) + project = self.create_project() + groups_count = 110 + + for day, hours in [(TRANSITION_AFTER_DAYS, 1) for _ in range(groups_count)]: + group = self.create_group( + project=project, + status=GroupStatus.UNRESOLVED, + substatus=GroupSubStatus.ESCALATING, + first_seen=now - timedelta(days=day, hours=hours), + ) + group_inbox = add_group_to_inbox(group, GroupInboxReason.ESCALATING) + group_inbox.date_added = now - timedelta(days=TRANSITION_AFTER_DAYS, hours=1) + group_inbox.save(update_fields=["date_added"]) + group_history = record_group_history( + group, GroupHistoryStatus.ESCALATING, actor=None, release=None + ) + group_history.date_added = now - timedelta(days=TRANSITION_AFTER_DAYS, hours=1) + group_history.save(update_fields=["date_added"]) + + mock_backend.get_size.return_value = 0 + + with self.tasks(): + schedule_auto_transition_to_ongoing() + + assert ( + Group.objects.filter( + project=project, + status=GroupStatus.UNRESOLVED, + substatus=GroupSubStatus.ESCALATING, + ).count() + == 10 + ) + assert ( + Group.objects.filter( + project=project, + status=GroupStatus.UNRESOLVED, + substatus=GroupSubStatus.ONGOING, + ).count() + == 100 + ) + + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_new_to_ongoing.executed", + sample_rate=1.0, + tags={"count": 0}, + ) + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_new_to_ongoing.remaining", + sample_rate=1.0, + tags={"count": 0}, + ) + + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_regressed_to_ongoing.executed", + sample_rate=1.0, + tags={"count": 0}, + ) + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_regressed_to_ongoing.remaining", + sample_rate=1.0, + tags={"count": 0}, + ) + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_escalating_to_ongoing.executed", + sample_rate=1.0, + tags={"count": 100}, + ) + mock_metrics_incr.assert_any_call( + "sentry.tasks.schedule_auto_transition_issues_escalating_to_ongoing.remaining", + sample_rate=1.0, + tags={"count": 10}, + )