diff --git a/pebble/pool/base_pool.py b/pebble/pool/base_pool.py index dfa1ec7..ec380c1 100644 --- a/pebble/pool/base_pool.py +++ b/pebble/pool/base_pool.py @@ -16,13 +16,13 @@ import time import logging +import itertools from queue import Queue from enum import IntEnum from threading import RLock from dataclasses import dataclass -from typing import Callable, Optional -from itertools import chain, count, islice +from typing import Any, Callable, Optional from concurrent.futures import Future, TimeoutError from pebble.common import Result, ResultStatus @@ -37,7 +37,7 @@ def __init__(self, max_workers: int, self._context = PoolContext( max_workers, max_tasks, initializer, initargs) self._loops = () - self._task_counter = count() + self._task_counter = itertools.count() def __enter__(self): return self @@ -123,7 +123,7 @@ def __init__(self, max_workers: int, self.task_queue = Queue() self.workers = max_workers - self.task_counter = count() + self.task_counter = itertools.count() self.worker_parameters = Worker(max_tasks, initializer, initargs) @property @@ -213,7 +213,7 @@ def cancel(self) -> bool: class MapResults: def __init__(self, futures: list, timeout: float = None): - self._results = chain.from_iterable( + self._results = itertools.chain.from_iterable( chunk_result(f, timeout) for f in futures) def __iter__(self): @@ -250,20 +250,21 @@ def done_map(_): return map_future -def iter_chunks(chunksize: int, *iterables): +def iter_chunks(iterable: iter, chunksize: int) -> iter: """Iterates over zipped iterables in chunks.""" - iterables = iter(zip(*iterables)) - - while 1: - chunk = tuple(islice(iterables, chunksize)) + try: + yield from itertools.batched(iterable, chunksize) + except AttributeError: # < Python 3.12 + while 1: + chunk = tuple(itertools.islice(iterable, chunksize)) - if not chunk: - return + if not chunk: + return - yield chunk + yield chunk -def chunk_result(future: ProcessFuture, timeout: Optional[float]): +def chunk_result(future: ProcessFuture, timeout: Optional[float]) -> Any: """Returns the results of a processed chunk.""" try: return future.result(timeout=timeout) @@ -271,7 +272,7 @@ def chunk_result(future: ProcessFuture, timeout: Optional[float]): return (error, ) -def run_initializer(initializer: Callable, initargs: list): +def run_initializer(initializer: Callable, initargs: list) -> bool: """Runs the Pool initializer dealing with errors.""" try: initializer(*initargs) diff --git a/pebble/pool/process.py b/pebble/pool/process.py index 7c9b332..f373b83 100644 --- a/pebble/pool/process.py +++ b/pebble/pool/process.py @@ -147,7 +147,7 @@ def map(self, function: Callable, futures = [self.schedule( process_chunk, args=(function, chunk), timeout=timeout) - for chunk in iter_chunks(chunksize, *iterables)] + for chunk in iter_chunks(zip(*iterables), chunksize)] return map_results(ProcessMapFuture(futures), timeout) diff --git a/pebble/pool/thread.py b/pebble/pool/thread.py index fd397a0..32104c3 100644 --- a/pebble/pool/thread.py +++ b/pebble/pool/thread.py @@ -107,7 +107,7 @@ def map(self, function: Callable, *iterables, **kwargs) -> MapFuture: raise ValueError("chunksize must be >= 1") futures = [self.schedule(process_chunk, args=(function, chunk)) - for chunk in iter_chunks(chunksize, *iterables)] + for chunk in iter_chunks(zip(*iterables), chunksize)] return map_results(MapFuture(futures), timeout)