
Worker stops if there are "too many" simultaneous jobs #881

Closed
mavahedinia opened this issue Mar 25, 2024 · 2 comments

@mavahedinia (Contributor)

Things to check first

  • I have checked that my issue does not already have a solution in the FAQ

  • I have searched the existing issues and didn't find my bug already reported there

  • I have checked that my bug is still present in the latest release

Version

4.0.0a4

What happened?

For my use case, I need separate scheduler and worker instances, since there may be thousands of jobs to execute in a short window and the system has to scale. Everything was fine and stable until our team ran a stress test to find out where the system breaks. We found that if we throttle the number of jobs submitted to the scheduler, everything works; but if we submit around 1,000 jobs to the scheduler at once, the workers get overwhelmed and their job-processing loop deadlocks, so the workers stop acquiring and executing jobs entirely, without crashing.

I inspected the code and pinpointed the issue. It happens inside the _process_jobs function of the (async) scheduler class (around line 905 as of commit f375b67). On line 938, inside the loop, the worker awaits the wakeup event, which is controlled by the job_added function defined there. That function is called only when a new job is added:

async def job_added(event: Event) -> None:
    if len(self._running_jobs) < self.max_concurrent_jobs:
        wakeup_event.set()
...
self.event_broker.subscribe(job_added, {JobAdded})

Combined with the max_concurrent_jobs constraint enforced on line 927, this means that if there are more than max_concurrent_jobs jobs in the database, the worker acquires them and tries to execute them, i.e., appends them to the queue; but if no newer jobs are scheduled after that, wakeup_event is never set again. The loop is then deadlocked and cannot acquire more jobs, even once the queue has drained.
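To make the failure mode concrete, here is a small self-contained asyncio toy model (not APScheduler code; all names are illustrative) that reproduces the same shape of stall: the loop is only woken by "job added" notifications, so once the initial burst is in the store and the concurrency cap has been reached, nothing ever sets the event again and the remaining jobs are never picked up.

import asyncio

MAX_CONCURRENT = 2

async def main() -> None:
    pending = list(range(10))            # jobs already sitting in the "data store"
    running: set[asyncio.Task] = set()
    wakeup = asyncio.Event()

    def job_added() -> None:
        # Mirrors the job_added subscriber above; it never fires after the
        # initial burst because no new jobs are submitted.
        if len(running) < MAX_CONCURRENT:
            wakeup.set()

    async def run_job(n: int) -> None:
        await asyncio.sleep(0.1)
        # Nothing wakes the loop here -- this is the missing JobReleased
        # notification that the proposed fix adds.

    wakeup.set()                         # the initial burst wakes the loop once
    while pending:
        try:
            await asyncio.wait_for(wakeup.wait(), timeout=2)
        except asyncio.TimeoutError:
            print(f"stalled with {len(pending)} jobs still pending")
            return
        wakeup.clear()
        while pending and len(running) < MAX_CONCURRENT:
            task = asyncio.create_task(run_job(pending.pop()))
            running.add(task)
            task.add_done_callback(running.discard)

asyncio.run(main())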

To fix this, I propose a small structural change:

  1. Rename job_added to check_queue_capacity.
  2. Subscribe that function to both the JobAdded and JobReleased events.

This way, if there are more jobs than the worker can handle at once, the worker is notified again as soon as the queue frees up (a sketch follows below).
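Concretely, the change would look roughly like this (a sketch based on the excerpt above; only the subscriber's name and the set of subscribed events change):

async def check_queue_capacity(event: Event) -> None:
    # Wake the processing loop whenever there is room for more work,
    # whether a job was just added or one was just released.
    if len(self._running_jobs) < self.max_concurrent_jobs:
        wakeup_event.set()
...
self.event_broker.subscribe(check_queue_capacity, {JobAdded, JobReleased})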

How can we reproduce the bug?

Simply schedule around 1,000 jobs, run a couple of workers (fewer than 4), and observe that after a while some of the jobs are never executed unless you restart the workers manually.

@mavahedinia (Contributor, Author)

The proposed solution is available in PR #882.

@agronholm (Owner)

Fixed via 0596db7.
