From 5cbe4318674da96698cdf7d683dcfd7c9681c94b Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 31 Aug 2023 13:15:16 +0200 Subject: [PATCH] fold resize flag into input_block_size --- .../run_task_with_multiprocessing.py | 36 ++++++++++--------- arroyo/utils/metric_defs.py | 8 ++--- .../test_run_task_with_multiprocessing.py | 16 +++------ 3 files changed, 28 insertions(+), 32 deletions(-) diff --git a/arroyo/processing/strategies/run_task_with_multiprocessing.py b/arroyo/processing/strategies/run_task_with_multiprocessing.py index 09c78cd6..107b6d83 100644 --- a/arroyo/processing/strategies/run_task_with_multiprocessing.py +++ b/arroyo/processing/strategies/run_task_with_multiprocessing.py @@ -37,6 +37,9 @@ TResult = TypeVar("TResult") TBatchValue = TypeVar("TBatchValue") +DEFAULT_INPUT_BLOCK_SIZE = 16 * 1024 +DEFAULT_OUTPUT_BLOCK_SIZE = 16 * 1024 + LOG_THRESHOLD_TIME = 20 # In seconds @@ -303,20 +306,19 @@ class RunTaskWithMultiprocessing( ``arroyo.strategies.run_task_with_multiprocessing.batch.input.overflow`` metric is emitted. + If the value is set to `None`, the `input_block_size` is automatically + adjusted to adapt to traffic. Keep in mind that this is a rather + experimental feature and less productionized than explicitly setting a + value. + :param output_block_size: Size of the shared memory buffer used to store results. Like with input data, the batch is implicitly broken up on overflow, and ``arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow`` metric is incremented. - :param resize_input_blocks: Experimental feature, whether input blocks - should be dynamically resized if they end up being too small for batches. - This can technically cause unbounded memory consumption and it is - recommended to also configure `max_input_block_size` if this option is - used. - - :param resize_output_blocks: Experimental feature, same as - `resize_input_blocks` but for the output blocks. + Like with `input_block_size`, the value can be set to `None` to enable + automatic resizing. :param max_input_block_size: If automatic resizing is enabled, this sets an upper limit on how large those blocks can get. @@ -426,10 +428,8 @@ def __init__( num_processes: int, max_batch_size: int, max_batch_time: float, - input_block_size: int, - output_block_size: int, - resize_input_blocks: bool = False, - resize_output_blocks: bool = False, + input_block_size: Optional[int] = None, + output_block_size: Optional[int] = None, max_input_block_size: Optional[int] = None, max_output_block_size: Optional[int] = None, initializer: Optional[Callable[[], None]] = None, @@ -439,8 +439,8 @@ def __init__( self.__max_batch_size = max_batch_size self.__max_batch_time = max_batch_time - self.__resize_input_blocks = resize_input_blocks - self.__resize_output_blocks = resize_output_blocks + self.__resize_input_blocks = input_block_size is None + self.__resize_output_blocks = output_block_size is None self.__max_input_block_size = max_input_block_size self.__max_output_block_size = max_output_block_size @@ -454,12 +454,16 @@ def __init__( ) self.__input_blocks = [ - self.__shared_memory_manager.SharedMemory(input_block_size) + self.__shared_memory_manager.SharedMemory( + input_block_size or DEFAULT_INPUT_BLOCK_SIZE + ) for _ in range(num_processes) ] self.__output_blocks = [ - self.__shared_memory_manager.SharedMemory(output_block_size) + self.__shared_memory_manager.SharedMemory( + output_block_size or DEFAULT_OUTPUT_BLOCK_SIZE + ) for _ in range(num_processes) ] diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py index 31c0584d..8ef5f5f0 100644 --- a/arroyo/utils/metric_defs.py +++ b/arroyo/utils/metric_defs.py @@ -27,12 +27,12 @@ # fix. "arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow", # Arroyo has decided to re-allocate a block in order to combat input buffer - # overflow. This can be enabled or disabled using `resize_input_blocks` - # setting. + # overflow. This behavior can be disabled by explicitly setting + # `input_block_size` to a not-None value in `RunTaskWithMultiprocessing`. "arroyo.strategies.run_task_with_multiprocessing.batch.input.resize", # Arroyo has decided to re-allocate a block in order to combat output buffer - # overflow. This can be enabled or disabled using `resize_output_blocks` - # setting. + # overflow. This behavior can be disabled by explicitly setting + # `output_block_size` to a not-None value in `RunTaskWithMultiprocessing`. "arroyo.strategies.run_task_with_multiprocessing.batch.output.resize", # How many batches are being processed in parallel by multiprocessing. "arroyo.strategies.run_task_with_multiprocessing.batches_in_progress", diff --git a/tests/processing/strategies/test_run_task_with_multiprocessing.py b/tests/processing/strategies/test_run_task_with_multiprocessing.py index b173a89a..63922a04 100644 --- a/tests/processing/strategies/test_run_task_with_multiprocessing.py +++ b/tests/processing/strategies/test_run_task_with_multiprocessing.py @@ -523,11 +523,9 @@ def test_input_block_resizing_max_size() -> None: num_processes=2, max_batch_size=INPUT_SIZE // 10, max_batch_time=60, - input_block_size=16000, + input_block_size=None, output_block_size=16000, - resize_input_blocks=True, max_input_block_size=16000, - resize_output_blocks=False, ) with pytest.raises(MessageRejected): @@ -553,10 +551,8 @@ def test_input_block_resizing_without_limits() -> None: num_processes=2, max_batch_size=INPUT_SIZE // 10, max_batch_time=60, - input_block_size=16000, + input_block_size=None, output_block_size=16000, - resize_input_blocks=True, - resize_output_blocks=False, ) with pytest.raises(MessageRejected): @@ -587,9 +583,7 @@ def test_output_block_resizing_max_size() -> None: max_batch_size=INPUT_SIZE // 10, max_batch_time=60, input_block_size=INPUT_SIZE, - output_block_size=16000, - resize_input_blocks=False, - resize_output_blocks=True, + output_block_size=None, max_output_block_size=16000, ) @@ -616,9 +610,7 @@ def test_output_block_resizing_without_limits() -> None: max_batch_size=INPUT_SIZE // 10, max_batch_time=60, input_block_size=INPUT_SIZE, - output_block_size=16000, - resize_input_blocks=False, - resize_output_blocks=True, + output_block_size=None, ) for _ in range(INPUT_SIZE // 10):