ref(escalating-issues): Auto-transition tasks should update up to 500_000 groups per minute #56168
Conversation
Codecov Report
@@ Coverage Diff @@
## master #56168 +/- ##
==========================================
- Coverage 79.99% 78.61% -1.39%
==========================================
Files 5062 5079 +17
Lines 217728 218659 +931
Branches 36856 37014 +158
==========================================
- Hits 174182 171895 -2287
- Misses 38200 41204 +3004
- Partials 5346 5560 +214
@@ -162,20 +124,19 @@ def auto_transition_issues_new_to_ongoing(
    for new_groups in chunked(
        RangeQuerySetWrapper(
            Group.objects.filter(
this query needs the new index from this PR: #56180
# Run job every 10 minutes
"schedule": crontab(minute="*/10"),
# Run job every minute
"schedule": crontab(minute="*/1"),
changing to run every minute.
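For context, the schedule change above lives in a Celery beat configuration. A minimal sketch of what such an entry looks like (the entry name and task path here are illustrative, not Sentry's exact settings):

```python
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    "schedule-auto-transition-issues-new-to-ongoing": {
        "task": "sentry.tasks.schedule_auto_transition_issues_new_to_ongoing",
        # was: crontab(minute="*/10")  -- every 10 minutes
        "schedule": crontab(minute="*/1"),  # now: every minute
    },
}
```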
@NisanthanNanthakumar do you know what the throughput for the worker is? We need to make sure workers can consume 50000 groups in a minute.
This looks very good. My only request for changes has to do with the change of signature for the tasks, since tasks already in the queue would face a conflict with the new signature.
first_seen_lte: int,
organization_id: int,
Changing the signature of a task is a risky change.
You have to write a PR that can handle both the old signature and the new signature (probably by making the call use keyword args).
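The backward-compatible shape the reviewer is describing can be sketched roughly like this (a hypothetical task body, not Sentry's actual code): keep the legacy positional parameters with defaults so messages already enqueued with the old signature still deserialize, while new callers pass only the keyword arg that matters.

```python
from typing import List, Optional

def auto_transition_task(
    project_ids: Optional[List[int]] = None,   # legacy arg, now ignored
    first_seen_lte: Optional[int] = None,      # the only arg the new code needs
    organization_id: Optional[int] = None,     # legacy arg, now ignored
    **kwargs,
) -> int:
    if first_seen_lte is None:
        raise ValueError("first_seen_lte is required")
    # ... the real task would query and bulk-update groups here ...
    return first_seen_lte

# Old-style call, as tasks already in the queue were enqueued (positional):
old_style = auto_transition_task([1, 2], 1694000000, 42)
# New-style call (keyword-only):
new_style = auto_transition_task(first_seen_lte=1694000000)
```

Once no old-style messages remain in the queue, a follow-up PR can drop the legacy parameters entirely.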
@armenzg ahh yea, forgot about this! I will keep this signature for this PR. Once this PR gets deployed, new tasks will be instantiated with only first_seen_lte. Then I'll make a new PR to remove the unnecessary args.
@@ -201,41 +181,67 @@ def auto_transition_issues_new_to_ongoing(
@retry(on=(OperationalError,))
@log_error_if_queue_has_items
def auto_transition_issues_regressed_to_ongoing(
    project_ids: List[int],
Same comment as above.
silo_mode=SiloMode.REGION,
)
@retry(on=(OperationalError,))
def run_auto_transition_issues_escalating_to_ongoing(
Optional feedback: this function and run_auto_transition_issues_regressed_to_ongoing are almost the same, in case you want to join them and pass the status/substatus as parameters.
I want to leave it as separate child tasks for analytics purposes.
@hubertsentry this graph shows the current duration of the auto_transition_issues_new_to_ongoing task. It peaks at 12s for hot shards, but this iteration will limit each child task to a single bulk update of 10_000 groups, so I expect it to be much faster.
Sounds good. Since we have at least 1 worker, and 1 worker should have 2 processes, it might not be too bad.
This looks great! Good test.
date_added_lte=int(seven_days_ago.timestamp()),
expires=now + timedelta(hours=1),
)
schedule_auto_transition_issues_new_to_ongoing.delay(
renamed the child tasks so that I can remove the unnecessary positional arguments.
"most_recent_group_first_seen_seven_days_ago": most_recent_group_first_seen_seven_days_ago.id,
"first_seen_lte": first_seen_lte,
},
)
for new_groups in chunked(
base_queryset = Group.objects.filter(
we will reuse the base queryset for the analytics query.
step=ITERATOR_CHUNK,
limit=ITERATOR_CHUNK * 50,
result_value_getter=lambda item: item,
callbacks=[get_last_id, get_total_count],
use the callback functionality to get the id of the last object across all the potential iterations, and the total count
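The callback pattern being praised here can be illustrated with a self-contained sketch (this is a stand-in for Sentry's RangeQuerySetWrapper, not its actual implementation): an iterator wrapper invokes each callback on every fetched batch, so the caller can accumulate the last seen id and the total count without issuing a second query.

```python
from typing import Callable, Iterable, Iterator, List

def range_wrapper(
    items: Iterable[int],
    step: int,
    callbacks: List[Callable[[List[int]], None]],
) -> Iterator[int]:
    """Yield items in batches of `step`, calling each callback per batch."""
    batch: List[int] = []
    for item in items:
        batch.append(item)
        if len(batch) == step:
            for cb in callbacks:
                cb(batch)
            yield from batch
            batch = []
    if batch:  # final partial batch
        for cb in callbacks:
            cb(batch)
        yield from batch

state = {"last_id": None, "total": 0}

def get_last_id(batch):
    state["last_id"] = batch[-1]

def get_total_count(batch):
    state["total"] += len(batch)

consumed = list(
    range_wrapper(range(1, 26), step=10, callbacks=[get_last_id, get_total_count])
)
# state["last_id"] == 25 and state["total"] == 25 after iteration
```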
This is very cool. Good usage 👍🏻
)

remaining_groups_queryset = base_queryset._clone()
we use _clone() to ensure that we're working with a separate queryset instance
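Why cloning matters for a lazily-built, mutable query object can be shown with a minimal stand-in class (this is not Django's QuerySet; in Django, the public ways to get an independent copy are `.all()` or `.filter()`, with `_clone()` being the private primitive they build on):

```python
import copy

class LazyQuery:
    """Toy lazily-built query: mutating a shared instance would leak
    filters between call sites, so each refinement clones first."""
    def __init__(self, filters=None):
        self.filters = filters or []

    def filter(self, f):
        clone = self._clone()
        clone.filters.append(f)
        return clone

    def _clone(self):
        return copy.deepcopy(self)

base = LazyQuery().filter("status=NEW")
narrowed = base.filter("first_seen<=cutoff")
# base keeps only its own filter; narrowed got an independent copy
```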
Good job on all those tests! 🎉
step=ITERATOR_CHUNK,
limit=ITERATOR_CHUNK * 50,
result_value_getter=lambda item: item,
callbacks=[get_last_id, get_total_count],
This is very cool. Good usage 👍🏻
Suspect Issues
This pull request was deployed and Sentry observed the following issues:
Objective:
The current implementation of the auto-transition tasks leads to spiky memory pressure on RabbitMQ, partly due to the hot shards of big orgs with a lot of groups. The alternative approach is to consistently send x number of messages of groups older than 7 days. There is no need to partition by org or project because all of those groups receive the same status and substatus changes. This iteration sends up to 50 child tasks, with each task updating 10_000 groups. We will increase the number of tasks if the backlog of groups_to_be_updated grows.