Skip to content

Commit

Permalink
Pass pool_kwargs to the replacement process pool too
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed Nov 24, 2024
1 parent 4584091 commit f3c68df
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 16 deletions.
3 changes: 3 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ UNRELEASED
forking on all platform
- Fixed ``AsyncIOScheduler`` inadvertently creating a defunct event loop at start,
leading to the scheduler not working at all
- Fixed ``ProcessPoolExecutor`` not respecting the passed keyword arguments when a
broken pool was being replaced


3.10.4
------
Expand Down
36 changes: 20 additions & 16 deletions src/apscheduler/executors/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,9 @@ def callback(f):
else:
self._run_job_success(job.id, f.result())

try:
f = self._pool.submit(
run_job, job, job._jobstore_alias, run_times, self._logger.name
)
except BrokenProcessPool:
self._logger.warning(
"Process pool is broken; replacing pool with a fresh instance"
)
self._pool = self._pool.__class__(self._pool._max_workers)
f = self._pool.submit(
run_job, job, job._jobstore_alias, run_times, self._logger.name
)

f = self._pool.submit(
run_job, job, job._jobstore_alias, run_times, self._logger.name
)
f.add_done_callback(callback)

def shutdown(self, wait=True):
Expand Down Expand Up @@ -72,7 +62,21 @@ class ProcessPoolExecutor(BasePoolExecutor):
"""

def __init__(self, max_workers=10, pool_kwargs=None):
pool_kwargs = pool_kwargs or {}
pool_kwargs.setdefault("mp_context", multiprocessing.get_context("spawn"))
pool = concurrent.futures.ProcessPoolExecutor(int(max_workers), **pool_kwargs)
self.pool_kwargs = pool_kwargs or {}
self.pool_kwargs.setdefault("mp_context", multiprocessing.get_context("spawn"))
pool = concurrent.futures.ProcessPoolExecutor(
int(max_workers), **self.pool_kwargs
)
super().__init__(pool)

def _do_submit_job(self, job, run_times):
try:
super()._do_submit_job(job, run_times)
except BrokenProcessPool:
self._logger.warning(
"Process pool is broken; replacing pool with a fresh instance"
)
self._pool = self._pool.__class__(
self._pool._max_workers, **self.pool_kwargs
)
super()._do_submit_job(job, run_times)

0 comments on commit f3c68df

Please sign in to comment.