From 8f639564f2c21f1e105a1466fd9eaae0579cd525 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 29 Jun 2023 07:38:18 +0200 Subject: [PATCH 1/6] feat: Automatically resize blocks if they get too small This is a naive implementation that resizes input/output blocks if they become too small. Blocks are reallocated once the overflowing batch is successfully drained. This will make startup of a consumer slower, but over time hopefully all blocks eventually reach a state where they won't overflow anymore. --- .../run_task_with_multiprocessing.py | 98 ++++++++- arroyo/utils/metric_defs.py | 12 ++ .../test_run_task_with_multiprocessing.py | 188 ++++++++++++++++-- 3 files changed, 279 insertions(+), 19 deletions(-) diff --git a/arroyo/processing/strategies/run_task_with_multiprocessing.py b/arroyo/processing/strategies/run_task_with_multiprocessing.py index fe4ef3f8..09c78cd6 100644 --- a/arroyo/processing/strategies/run_task_with_multiprocessing.py +++ b/arroyo/processing/strategies/run_task_with_multiprocessing.py @@ -309,6 +309,21 @@ class RunTaskWithMultiprocessing( ``arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow`` metric is incremented. + :param resize_input_blocks: Experimental feature, whether input blocks + should be dynamically resized if they end up being too small for batches. + This can technically cause unbounded memory consumption and it is + recommended to also configure `max_input_block_size` if this option is + used. + + :param resize_output_blocks: Experimental feature, same as + `resize_input_blocks` but for the output blocks. + + :param max_input_block_size: If automatic resizing is enabled, this sets an + upper limit on how large those blocks can get. + + :param max_output_block_size: Same as `max_input_block_size` but for output + blocks. + :param initializer: A function to run at the beginning of each subprocess. 
Subprocesses are spawned without any of the state of the parent @@ -413,6 +428,10 @@ def __init__( max_batch_time: float, input_block_size: int, output_block_size: int, + resize_input_blocks: bool = False, + resize_output_blocks: bool = False, + max_input_block_size: Optional[int] = None, + max_output_block_size: Optional[int] = None, initializer: Optional[Callable[[], None]] = None, ) -> None: self.__transform_function = function @@ -420,6 +439,11 @@ def __init__( self.__max_batch_size = max_batch_size self.__max_batch_time = max_batch_time + self.__resize_input_blocks = resize_input_blocks + self.__resize_output_blocks = resize_output_blocks + self.__max_input_block_size = max_input_block_size + self.__max_output_block_size = max_output_block_size + self.__shared_memory_manager = SharedMemoryManager() self.__shared_memory_manager.start() @@ -454,6 +478,8 @@ def __init__( ] ], AsyncResult[ParallelRunTaskResult[TResult]], + bool, # was the input block too small? + bool, # was the output block too small? 
] ] = deque() self.__invalid_messages = InvalidMessageState() @@ -481,7 +507,7 @@ def handle_sigchld(signum: int, frame: Any) -> None: signal.signal(signal.SIGCHLD, handle_sigchld) - def __submit_batch(self) -> None: + def __submit_batch(self, input_block_too_small: bool) -> None: assert self.__batch_builder is not None batch = self.__batch_builder.build() logger.debug("Submitting %r to %r...", batch, self.__pool) @@ -492,6 +518,8 @@ def __submit_batch(self) -> None: parallel_run_task_worker_apply, (self.__transform_function, batch, self.__output_blocks.pop()), ), + input_block_too_small, + False, ) ) self.__batches_in_progress.increment() @@ -547,7 +575,12 @@ def __check_for_results(self, timeout: Optional[float] = None) -> None: self.__pool_waiting_time = None def __check_for_results_impl(self, timeout: Optional[float] = None) -> None: - input_batch, async_result = self.__processes[0] + ( + input_batch, + async_result, + input_block_too_small, + output_block_too_small, + ) = self.__processes[0] # If this call is being made in a context where it is intended to be # nonblocking, checking if the result is ready (rather than trying to @@ -559,6 +592,15 @@ def __check_for_results_impl(self, timeout: Optional[float] = None) -> None: result = async_result.get(timeout=timeout) + self.__metrics.timing( + "arroyo.strategies.run_task_with_multiprocessing.output_batch.size.msg", + len(result.valid_messages_transformed), + ) + self.__metrics.timing( + "arroyo.strategies.run_task_with_multiprocessing.output_batch.size.bytes", + result.valid_messages_transformed.get_content_size(), + ) + for idx, message in result.valid_messages_transformed: if isinstance(message, InvalidMessage): # For the next invocation of __check_for_results, skip over this message @@ -604,12 +646,54 @@ def __check_for_results_impl(self, timeout: Optional[float] = None) -> None: result.next_index_to_process, ), ), + input_block_too_small, + True, ) return + old_input_block = input_batch.block + + if ( + 
input_block_too_small + and self.__resize_input_blocks + and ( + self.__max_input_block_size is None + or self.__max_input_block_size > old_input_block.size * 2 + ) + ): + self.__metrics.increment( + "arroyo.strategies.run_task_with_multiprocessing.batch.input.resize" + ) + new_input_block = self.__shared_memory_manager.SharedMemory( + old_input_block.size * 2 + ) + old_input_block.unlink() + else: + new_input_block = old_input_block + + old_output_block = result.valid_messages_transformed.block + + if ( + output_block_too_small + and self.__resize_output_blocks + and ( + self.__max_output_block_size is None + or self.__max_output_block_size > old_output_block.size * 2 + ) + ): + self.__metrics.increment( + "arroyo.strategies.run_task_with_multiprocessing.batch.output.resize" + ) + new_output_block = self.__shared_memory_manager.SharedMemory( + old_output_block.size * 2 + ) + old_output_block.unlink() + else: + new_output_block = old_output_block + logger.debug("Completed %r, reclaiming blocks...", input_batch) - self.__input_blocks.append(input_batch.block) - self.__output_blocks.append(result.valid_messages_transformed.block) + self.__input_blocks.append(new_input_block) + self.__output_blocks.append(new_output_block) self.__batches_in_progress.decrement() del self.__processes[0] @@ -621,7 +705,7 @@ def poll(self) -> None: self.__check_for_results(timeout=0) if self.__batch_builder is not None and self.__batch_builder.ready(): - self.__submit_batch() + self.__submit_batch(False) def __reset_batch_builder(self) -> None: try: @@ -651,7 +735,7 @@ def submit( self.__metrics.increment( "arroyo.strategies.run_task_with_multiprocessing.batch.input.overflow" ) - self.__submit_batch() + self.__submit_batch(True) # This may raise ``MessageRejected`` (if all of the shared memory # is in use) and create backpressure. 
@@ -667,7 +751,7 @@ def close(self) -> None: self.__closed = True if self.__batch_builder is not None and len(self.__batch_builder) > 0: - self.__submit_batch() + self.__submit_batch(False) def terminate(self) -> None: self.__closed = True diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py index cc7801ef..31c0584d 100644 --- a/arroyo/utils/metric_defs.py +++ b/arroyo/utils/metric_defs.py @@ -5,6 +5,10 @@ "arroyo.strategies.run_task_with_multiprocessing.batch.size.msg", # Number of bytes in a multiprocessing batch "arroyo.strategies.run_task_with_multiprocessing.batch.size.bytes", + # Number of messages in a multiprocessing batch after the message transformation + "arroyo.strategies.run_task_with_multiprocessing.output_batch.size.msg", + # Number of bytes in a multiprocessing batch after the message transformation + "arroyo.strategies.run_task_with_multiprocessing.output_batch.size.bytes", # Number of times the consumer is spinning "arroyo.consumer.run.count", # How long it took the Reduce step to fill up a batch @@ -22,6 +26,14 @@ # This can be devastating for throughput. Increase `output_block_size` to # fix. "arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow", + # Arroyo has decided to re-allocate a block in order to combat input buffer + # overflow. This can be enabled or disabled using `resize_input_blocks` + # setting. + "arroyo.strategies.run_task_with_multiprocessing.batch.input.resize", + # Arroyo has decided to re-allocate a block in order to combat output buffer + # overflow. This can be enabled or disabled using `resize_output_blocks` + # setting. + "arroyo.strategies.run_task_with_multiprocessing.batch.output.resize", # How many batches are being processed in parallel by multiprocessing. "arroyo.strategies.run_task_with_multiprocessing.batches_in_progress", # Counter. A subprocess by multiprocessing unexpectedly died. 
diff --git a/tests/processing/strategies/test_run_task_with_multiprocessing.py b/tests/processing/strategies/test_run_task_with_multiprocessing.py index 07a6aedf..66e5b4b4 100644 --- a/tests/processing/strategies/test_run_task_with_multiprocessing.py +++ b/tests/processing/strategies/test_run_task_with_multiprocessing.py @@ -222,16 +222,46 @@ def test_parallel_transform_step() -> None: lambda: metrics.calls, [], [ + TimingCall( + name="arroyo.strategies.run_task_with_multiprocessing.output_batch.size.msg", + value=2, + tags=None, + ), + TimingCall( + name="arroyo.strategies.run_task_with_multiprocessing.output_batch.size.bytes", + value=16000, + tags=None, + ), IncrementCall( name="arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow", value=1, tags=None, ), + TimingCall( + name="arroyo.strategies.run_task_with_multiprocessing.output_batch.size.msg", + value=1, + tags=None, + ), + TimingCall( + name="arroyo.strategies.run_task_with_multiprocessing.output_batch.size.bytes", + value=16000, + tags=None, + ), GaugeCall( "arroyo.strategies.run_task_with_multiprocessing.batches_in_progress", 1.0, tags=None, ), + TimingCall( + name="arroyo.strategies.run_task_with_multiprocessing.output_batch.size.msg", + value=1, + tags=None, + ), + TimingCall( + name="arroyo.strategies.run_task_with_multiprocessing.output_batch.size.bytes", + value=4000, + tags=None, + ), GaugeCall( "arroyo.strategies.run_task_with_multiprocessing.batches_in_progress", 0.0, @@ -381,14 +411,15 @@ def test_message_rejected_multiple() -> None: value=0, tags=None, ), - IncrementCall( - name="arroyo.strategies.run_task_with_multiprocessing.batch.backpressure", - value=1, + ] + [ + TimingCall( + name="arroyo.strategies.run_task_with_multiprocessing.output_batch.size.msg", + value=2, tags=None, ), - IncrementCall( - name="arroyo.strategies.run_task_with_multiprocessing.batch.backpressure", - value=1, + TimingCall( + 
name="arroyo.strategies.run_task_with_multiprocessing.output_batch.size.bytes", + value=0, tags=None, ), IncrementCall( @@ -396,14 +427,15 @@ def test_message_rejected_multiple() -> None: value=1, tags=None, ), - IncrementCall( - name="arroyo.strategies.run_task_with_multiprocessing.batch.backpressure", - value=1, + ] * 5 + [ + TimingCall( + name="arroyo.strategies.run_task_with_multiprocessing.output_batch.size.msg", + value=2, tags=None, ), - IncrementCall( - name="arroyo.strategies.run_task_with_multiprocessing.batch.backpressure", - value=1, + TimingCall( + name="arroyo.strategies.run_task_with_multiprocessing.output_batch.size.bytes", + value=0, tags=None, ), GaugeCall( @@ -475,3 +507,135 @@ def test_regression_join_timeout_many_messages() -> None: assert 2 < time_taken < 4 assert next_step.submit.call_count > 0 + + +def run_multiply_times_two(x: Message[KafkaPayload]) -> KafkaPayload: + return KafkaPayload(None, x.payload.value * 2, []) + + +def test_input_block_resizing_max_size() -> None: + INPUT_SIZE = 36000 + next_step = Mock() + + strategy = RunTaskWithMultiprocessing( + run_multiply_times_two, + next_step, + num_processes=2, + max_batch_size=INPUT_SIZE // 10, + max_batch_time=60, + input_block_size=1, + output_block_size=1, + resize_input_blocks=True, + max_input_block_size=16000, + resize_output_blocks=False, + ) + + with pytest.raises(MessageRejected): + for _ in range(INPUT_SIZE // 10): + strategy.submit(Message(Value(KafkaPayload(None, b"x" * 10, []), {}))) + + strategy.close() + strategy.join(timeout=3) + + assert not any( + x.name == "arroyo.strategies.run_task_with_multiprocessing.batch.input.resize" + for x in TestingMetricsBackend.calls + ) + + +def test_input_block_resizing_without_limits() -> None: + INPUT_SIZE = 36000 + next_step = Mock() + + strategy = RunTaskWithMultiprocessing( + run_multiply_times_two, + next_step, + num_processes=2, + max_batch_size=INPUT_SIZE // 10, + max_batch_time=60, + input_block_size=1, + output_block_size=1, + 
resize_input_blocks=True, + resize_output_blocks=False, + ) + + with pytest.raises(MessageRejected): + for _ in range(INPUT_SIZE // 10): + strategy.submit(Message(Value(KafkaPayload(None, b"x" * 10, []), {}))) + + strategy.close() + strategy.join(timeout=3) + + assert ( + IncrementCall( + name="arroyo.strategies.run_task_with_multiprocessing.batch.input.resize", + value=1, + tags=None, + ) + in TestingMetricsBackend.calls + ) + + +def test_output_block_resizing_max_size() -> None: + INPUT_SIZE = 72000 + next_step = Mock() + + strategy = RunTaskWithMultiprocessing( + run_multiply_times_two, + next_step, + num_processes=2, + max_batch_size=INPUT_SIZE // 10, + max_batch_time=60, + input_block_size=INPUT_SIZE, + output_block_size=1, + resize_input_blocks=False, + resize_output_blocks=True, + max_output_block_size=16000, + ) + + for _ in range(INPUT_SIZE // 10): + strategy.submit(Message(Value(KafkaPayload(None, b"x" * 10, []), {}))) + + strategy.close() + strategy.join(timeout=3) + + assert not any( + x.name == "arroyo.strategies.run_task_with_multiprocessing.batch.output.resize" + for x in TestingMetricsBackend.calls + ) + + +def test_output_block_resizing_without_limits() -> None: + INPUT_SIZE = 144000 + next_step = Mock() + + strategy = RunTaskWithMultiprocessing( + run_multiply_times_two, + next_step, + num_processes=2, + max_batch_size=INPUT_SIZE // 10, + max_batch_time=60, + input_block_size=INPUT_SIZE, + output_block_size=1, + resize_input_blocks=False, + resize_output_blocks=True, + ) + + for _ in range(INPUT_SIZE // 10): + strategy.submit(Message(Value(KafkaPayload(None, b"x" * 10, []), {}))) + + strategy.close() + strategy.join(timeout=3) + + assert next_step.submit.call_args_list == [ + call(Message(Value(KafkaPayload(None, b"x" * 20, []), {}))), + ] * (INPUT_SIZE // 10) + + assert ( + IncrementCall( + name="arroyo.strategies.run_task_with_multiprocessing.batch.output.resize", + value=1, + tags=None, + ) + in TestingMetricsBackend.calls + ) From 
be26498925390460e053d3a41e2df4aba8797fb9 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 29 Jun 2023 08:55:32 +0200 Subject: [PATCH 2/6] try with larger input block size, because linux --- .../strategies/test_run_task_with_multiprocessing.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/processing/strategies/test_run_task_with_multiprocessing.py b/tests/processing/strategies/test_run_task_with_multiprocessing.py index 66e5b4b4..49839883 100644 --- a/tests/processing/strategies/test_run_task_with_multiprocessing.py +++ b/tests/processing/strategies/test_run_task_with_multiprocessing.py @@ -523,8 +523,8 @@ def test_input_block_resizing_max_size() -> None: num_processes=2, max_batch_size=INPUT_SIZE // 10, max_batch_time=60, - input_block_size=1, - output_block_size=1, + input_block_size=20, + output_block_size=20, resize_input_blocks=True, max_input_block_size=16000, resize_output_blocks=False, @@ -553,8 +553,8 @@ def test_input_block_resizing_without_limits() -> None: num_processes=2, max_batch_size=INPUT_SIZE // 10, max_batch_time=60, - input_block_size=1, - output_block_size=1, + input_block_size=20, + output_block_size=20, resize_input_blocks=True, resize_output_blocks=False, ) @@ -587,7 +587,7 @@ def test_output_block_resizing_max_size() -> None: max_batch_size=INPUT_SIZE // 10, max_batch_time=60, input_block_size=INPUT_SIZE, - output_block_size=1, + output_block_size=20, resize_input_blocks=False, resize_output_blocks=True, max_output_block_size=16000, @@ -616,7 +616,7 @@ def test_output_block_resizing_without_limits() -> None: max_batch_size=INPUT_SIZE // 10, max_batch_time=60, input_block_size=INPUT_SIZE, - output_block_size=1, + output_block_size=20, resize_input_blocks=False, resize_output_blocks=True, ) From 88263e3faf41edac920594d7f2c9a360959d3ec2 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 29 Jun 2023 09:16:10 +0200 Subject: [PATCH 3/6] bump it further --- 
.../strategies/test_run_task_with_multiprocessing.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/processing/strategies/test_run_task_with_multiprocessing.py b/tests/processing/strategies/test_run_task_with_multiprocessing.py index 49839883..322fc310 100644 --- a/tests/processing/strategies/test_run_task_with_multiprocessing.py +++ b/tests/processing/strategies/test_run_task_with_multiprocessing.py @@ -523,8 +523,8 @@ def test_input_block_resizing_max_size() -> None: num_processes=2, max_batch_size=INPUT_SIZE // 10, max_batch_time=60, - input_block_size=20, - output_block_size=20, + input_block_size=16000, + output_block_size=16000, resize_input_blocks=True, max_input_block_size=16000, resize_output_blocks=False, @@ -553,8 +553,8 @@ def test_input_block_resizing_without_limits() -> None: num_processes=2, max_batch_size=INPUT_SIZE // 10, max_batch_time=60, - input_block_size=20, - output_block_size=20, + input_block_size=16000, + output_block_size=16000, resize_input_blocks=True, resize_output_blocks=False, ) From 3c73f4d581b1482e48b1d7b90599975fdd7923f7 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 29 Jun 2023 09:17:11 +0200 Subject: [PATCH 4/6] bump another thing --- .../processing/strategies/test_run_task_with_multiprocessing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/processing/strategies/test_run_task_with_multiprocessing.py b/tests/processing/strategies/test_run_task_with_multiprocessing.py index 322fc310..94a51fe8 100644 --- a/tests/processing/strategies/test_run_task_with_multiprocessing.py +++ b/tests/processing/strategies/test_run_task_with_multiprocessing.py @@ -587,7 +587,7 @@ def test_output_block_resizing_max_size() -> None: max_batch_size=INPUT_SIZE // 10, max_batch_time=60, input_block_size=INPUT_SIZE, - output_block_size=20, + output_block_size=16000, resize_input_blocks=False, resize_output_blocks=True, max_output_block_size=16000, From 
54f1c6c701eb408b50c53bc3e359e09962a566cf Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 29 Jun 2023 09:17:50 +0200 Subject: [PATCH 5/6] ho boy --- .../processing/strategies/test_run_task_with_multiprocessing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/processing/strategies/test_run_task_with_multiprocessing.py b/tests/processing/strategies/test_run_task_with_multiprocessing.py index 94a51fe8..b173a89a 100644 --- a/tests/processing/strategies/test_run_task_with_multiprocessing.py +++ b/tests/processing/strategies/test_run_task_with_multiprocessing.py @@ -616,7 +616,7 @@ def test_output_block_resizing_without_limits() -> None: max_batch_size=INPUT_SIZE // 10, max_batch_time=60, input_block_size=INPUT_SIZE, - output_block_size=20, + output_block_size=16000, resize_input_blocks=False, resize_output_blocks=True, ) From 5cbe4318674da96698cdf7d683dcfd7c9681c94b Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 31 Aug 2023 13:15:16 +0200 Subject: [PATCH 6/6] fold resize flag into input_block_size --- .../run_task_with_multiprocessing.py | 36 ++++++++++--------- arroyo/utils/metric_defs.py | 8 ++--- .../test_run_task_with_multiprocessing.py | 16 +++------ 3 files changed, 28 insertions(+), 32 deletions(-) diff --git a/arroyo/processing/strategies/run_task_with_multiprocessing.py b/arroyo/processing/strategies/run_task_with_multiprocessing.py index 09c78cd6..107b6d83 100644 --- a/arroyo/processing/strategies/run_task_with_multiprocessing.py +++ b/arroyo/processing/strategies/run_task_with_multiprocessing.py @@ -37,6 +37,9 @@ TResult = TypeVar("TResult") TBatchValue = TypeVar("TBatchValue") +DEFAULT_INPUT_BLOCK_SIZE = 16 * 1024 +DEFAULT_OUTPUT_BLOCK_SIZE = 16 * 1024 + LOG_THRESHOLD_TIME = 20 # In seconds @@ -303,20 +306,19 @@ class RunTaskWithMultiprocessing( ``arroyo.strategies.run_task_with_multiprocessing.batch.input.overflow`` metric is emitted. 
+ If the value is set to `None`, the input block size is automatically + adjusted to adapt to traffic. This is an experimental feature, less + battle-tested than an explicit value, and blocks can grow without bound + unless `max_input_block_size` is also set. + :param output_block_size: Size of the shared memory buffer used to store results. Like with input data, the batch is implicitly broken up on overflow, and ``arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow`` metric is incremented. - :param resize_input_blocks: Experimental feature, whether input blocks - should be dynamically resized if they end up being too small for batches. - This can technically cause unbounded memory consumption and it is - recommended to also configure `max_input_block_size` if this option is - used. - - :param resize_output_blocks: Experimental feature, same as - `resize_input_blocks` but for the output blocks. + Like with `input_block_size`, the value can be set to `None` to enable + automatic resizing. :param max_input_block_size: If automatic resizing is enabled, this sets an upper limit on how large those blocks can get. 
@@ -426,10 +428,8 @@ def __init__( num_processes: int, max_batch_size: int, max_batch_time: float, - input_block_size: int, - output_block_size: int, - resize_input_blocks: bool = False, - resize_output_blocks: bool = False, + input_block_size: Optional[int] = None, + output_block_size: Optional[int] = None, max_input_block_size: Optional[int] = None, max_output_block_size: Optional[int] = None, initializer: Optional[Callable[[], None]] = None, @@ -439,8 +439,8 @@ def __init__( self.__max_batch_size = max_batch_size self.__max_batch_time = max_batch_time - self.__resize_input_blocks = resize_input_blocks - self.__resize_output_blocks = resize_output_blocks + self.__resize_input_blocks = input_block_size is None + self.__resize_output_blocks = output_block_size is None self.__max_input_block_size = max_input_block_size self.__max_output_block_size = max_output_block_size @@ -454,12 +454,16 @@ def __init__( ) self.__input_blocks = [ - self.__shared_memory_manager.SharedMemory(input_block_size) + self.__shared_memory_manager.SharedMemory( + input_block_size or DEFAULT_INPUT_BLOCK_SIZE + ) for _ in range(num_processes) ] self.__output_blocks = [ - self.__shared_memory_manager.SharedMemory(output_block_size) + self.__shared_memory_manager.SharedMemory( + output_block_size or DEFAULT_OUTPUT_BLOCK_SIZE + ) for _ in range(num_processes) ] diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py index 31c0584d..8ef5f5f0 100644 --- a/arroyo/utils/metric_defs.py +++ b/arroyo/utils/metric_defs.py @@ -27,12 +27,12 @@ # fix. "arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow", # Arroyo has decided to re-allocate a block in order to combat input buffer - # overflow. This can be enabled or disabled using `resize_input_blocks` - # setting. + # overflow. This behavior can be disabled by explicitly setting + # `input_block_size` to a not-None value in `RunTaskWithMultiprocessing`. 
"arroyo.strategies.run_task_with_multiprocessing.batch.input.resize", # Arroyo has decided to re-allocate a block in order to combat output buffer - # overflow. This can be enabled or disabled using `resize_output_blocks` - # setting. + # overflow. This behavior can be disabled by explicitly setting + # `output_block_size` to a not-None value in `RunTaskWithMultiprocessing`. "arroyo.strategies.run_task_with_multiprocessing.batch.output.resize", # How many batches are being processed in parallel by multiprocessing. "arroyo.strategies.run_task_with_multiprocessing.batches_in_progress", diff --git a/tests/processing/strategies/test_run_task_with_multiprocessing.py b/tests/processing/strategies/test_run_task_with_multiprocessing.py index b173a89a..63922a04 100644 --- a/tests/processing/strategies/test_run_task_with_multiprocessing.py +++ b/tests/processing/strategies/test_run_task_with_multiprocessing.py @@ -523,11 +523,9 @@ def test_input_block_resizing_max_size() -> None: num_processes=2, max_batch_size=INPUT_SIZE // 10, max_batch_time=60, - input_block_size=16000, + input_block_size=None, output_block_size=16000, - resize_input_blocks=True, max_input_block_size=16000, - resize_output_blocks=False, ) with pytest.raises(MessageRejected): @@ -553,10 +551,8 @@ def test_input_block_resizing_without_limits() -> None: num_processes=2, max_batch_size=INPUT_SIZE // 10, max_batch_time=60, - input_block_size=16000, + input_block_size=None, output_block_size=16000, - resize_input_blocks=True, - resize_output_blocks=False, ) with pytest.raises(MessageRejected): @@ -587,9 +583,7 @@ def test_output_block_resizing_max_size() -> None: max_batch_size=INPUT_SIZE // 10, max_batch_time=60, input_block_size=INPUT_SIZE, - output_block_size=16000, - resize_input_blocks=False, - resize_output_blocks=True, + output_block_size=None, max_output_block_size=16000, ) @@ -616,9 +610,7 @@ def test_output_block_resizing_without_limits() -> None: max_batch_size=INPUT_SIZE // 10, max_batch_time=60, 
input_block_size=INPUT_SIZE, - output_block_size=16000, - resize_input_blocks=False, - resize_output_blocks=True, + output_block_size=None, ) for _ in range(INPUT_SIZE // 10):