Skip to content

Commit

Permalink
fix multiprocessing error propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
azuline committed Oct 25, 2023
1 parent 0bab536 commit 598df92
Showing 1 changed file with 30 additions and 7 deletions.
37 changes: 30 additions & 7 deletions rose/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import re
import sqlite3
import time
import traceback
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import asdict, dataclass
Expand All @@ -23,6 +24,8 @@

logger = logging.getLogger(__name__)

T = TypeVar("T")

CACHE_SCHEMA_PATH = Path(__file__).resolve().parent / "cache.sql"


Expand Down Expand Up @@ -235,23 +238,46 @@ def update_cache_for_releases(
num_proc = max(1, math.ceil(len(release_dirs) // 50))
batch_size = len(release_dirs) // num_proc + 1

manager = multiprocessing.Manager()
# Track the known virtual dirnames for collision calculation. This needs to be shared across
# all processes.
# all processes, because we want to compare against the global set of known virtual dirnames.
manager = multiprocessing.Manager()
known_virtual_dirnames = manager.dict()
# Create a queue to propagate exceptions back up to the parent.
error_queue = manager.Queue()

with multiprocessing.Pool(processes=c.max_proc) as pool:
# At 0, no batch. At 1, 1 batch. At 49, 1 batch. At 50, 1 batch. At 51, 2 batches.
for i in range(0, len(release_dirs), batch_size):
logger.debug(
f"Spawning release cache update process for releases [{i}, {i+batch_size})"
)
pool.apply_async(
_update_cache_for_releases_executor,
(c, release_dirs[i : i + batch_size], force, known_virtual_dirnames),
_update_cache_for_releases_process,
(c, release_dirs[i : i + batch_size], force, known_virtual_dirnames, error_queue),
)
pool.close()
pool.join()

if not error_queue.empty():
etype, tb = error_queue.get()
raise etype(f"Error in cache update subprocess.\n{tb}")


def _update_cache_for_releases_process(
c: Config,
release_dirs: list[Path],
force: bool,
known_virtual_dirnames: dict[str, bool],
error_queue: "multiprocessing.Queue[Any]",
) -> None:
"""General error handling stuff for the cache update subprocess."""
try:
return _update_cache_for_releases_executor(c, release_dirs, force, known_virtual_dirnames)
except Exception as e:
# Use traceback.format_exc() to get the formatted traceback string
tb = traceback.format_exc()
error_queue.put((type(e), tb))


def _update_cache_for_releases_executor(
c: Config,
Expand Down Expand Up @@ -1461,9 +1487,6 @@ def _sanitize_filename(x: str) -> str:
return ILLEGAL_FS_CHARS_REGEX.sub("_", x)


T = TypeVar("T")


def _flatten(xxs: list[list[T]]) -> list[T]:
xs: list[T] = []
for group in xxs:
Expand Down

0 comments on commit 598df92

Please sign in to comment.