Skip to content

Commit

Permalink
fold resize flag into input_block_size
Browse files Browse the repository at this point in the history
  • Loading branch information
untitaker committed Aug 31, 2023
1 parent 54f1c6c commit 5cbe431
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 32 deletions.
36 changes: 20 additions & 16 deletions arroyo/processing/strategies/run_task_with_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
TResult = TypeVar("TResult")
TBatchValue = TypeVar("TBatchValue")

DEFAULT_INPUT_BLOCK_SIZE = 16 * 1024
DEFAULT_OUTPUT_BLOCK_SIZE = 16 * 1024

LOG_THRESHOLD_TIME = 20 # In seconds


Expand Down Expand Up @@ -303,20 +306,19 @@ class RunTaskWithMultiprocessing(
``arroyo.strategies.run_task_with_multiprocessing.batch.input.overflow``
metric is emitted.
If the value is set to `None`, the `input_block_size` is automatically
adjusted to adapt to traffic. Keep in mind that this is an experimental
feature and less battle-tested in production than explicitly setting a
value.
:param output_block_size: Size of the shared memory buffer used to store
results. Like with input data, the batch is implicitly broken up on
overflow, and
``arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow``
metric is incremented.
:param resize_input_blocks: Experimental feature, whether input blocks
should be dynamically resized if they end up being too small for batches.
This can technically cause unbounded memory consumption and it is
recommended to also configure `max_input_block_size` if this option is
used.
:param resize_output_blocks: Experimental feature, same as
`resize_input_blocks` but for the output blocks.
Like with `input_block_size`, the value can be set to `None` to enable
automatic resizing.
:param max_input_block_size: If automatic resizing is enabled, this sets an
upper limit on how large the input blocks can get.
Expand Down Expand Up @@ -426,10 +428,8 @@ def __init__(
num_processes: int,
max_batch_size: int,
max_batch_time: float,
input_block_size: int,
output_block_size: int,
resize_input_blocks: bool = False,
resize_output_blocks: bool = False,
input_block_size: Optional[int] = None,
output_block_size: Optional[int] = None,
max_input_block_size: Optional[int] = None,
max_output_block_size: Optional[int] = None,
initializer: Optional[Callable[[], None]] = None,
Expand All @@ -439,8 +439,8 @@ def __init__(
self.__max_batch_size = max_batch_size
self.__max_batch_time = max_batch_time

self.__resize_input_blocks = resize_input_blocks
self.__resize_output_blocks = resize_output_blocks
self.__resize_input_blocks = input_block_size is None
self.__resize_output_blocks = output_block_size is None
self.__max_input_block_size = max_input_block_size
self.__max_output_block_size = max_output_block_size

Expand All @@ -454,12 +454,16 @@ def __init__(
)

self.__input_blocks = [
self.__shared_memory_manager.SharedMemory(input_block_size)
self.__shared_memory_manager.SharedMemory(
input_block_size or DEFAULT_INPUT_BLOCK_SIZE
)
for _ in range(num_processes)
]

self.__output_blocks = [
self.__shared_memory_manager.SharedMemory(output_block_size)
self.__shared_memory_manager.SharedMemory(
output_block_size or DEFAULT_OUTPUT_BLOCK_SIZE
)
for _ in range(num_processes)
]

Expand Down
8 changes: 4 additions & 4 deletions arroyo/utils/metric_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
# fix.
"arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow",
# Arroyo has decided to re-allocate a block in order to combat input buffer
# overflow. This can be enabled or disabled using `resize_input_blocks`
# setting.
# overflow. This behavior can be disabled by explicitly setting
# `input_block_size` to a non-`None` value in `RunTaskWithMultiprocessing`.
"arroyo.strategies.run_task_with_multiprocessing.batch.input.resize",
# Arroyo has decided to re-allocate a block in order to combat output buffer
# overflow. This can be enabled or disabled using `resize_output_blocks`
# setting.
# overflow. This behavior can be disabled by explicitly setting
# `output_block_size` to a non-`None` value in `RunTaskWithMultiprocessing`.
"arroyo.strategies.run_task_with_multiprocessing.batch.output.resize",
# How many batches are being processed in parallel by multiprocessing.
"arroyo.strategies.run_task_with_multiprocessing.batches_in_progress",
Expand Down
16 changes: 4 additions & 12 deletions tests/processing/strategies/test_run_task_with_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,11 +523,9 @@ def test_input_block_resizing_max_size() -> None:
num_processes=2,
max_batch_size=INPUT_SIZE // 10,
max_batch_time=60,
input_block_size=16000,
input_block_size=None,
output_block_size=16000,
resize_input_blocks=True,
max_input_block_size=16000,
resize_output_blocks=False,
)

with pytest.raises(MessageRejected):
Expand All @@ -553,10 +551,8 @@ def test_input_block_resizing_without_limits() -> None:
num_processes=2,
max_batch_size=INPUT_SIZE // 10,
max_batch_time=60,
input_block_size=16000,
input_block_size=None,
output_block_size=16000,
resize_input_blocks=True,
resize_output_blocks=False,
)

with pytest.raises(MessageRejected):
Expand Down Expand Up @@ -587,9 +583,7 @@ def test_output_block_resizing_max_size() -> None:
max_batch_size=INPUT_SIZE // 10,
max_batch_time=60,
input_block_size=INPUT_SIZE,
output_block_size=16000,
resize_input_blocks=False,
resize_output_blocks=True,
output_block_size=None,
max_output_block_size=16000,
)

Expand All @@ -616,9 +610,7 @@ def test_output_block_resizing_without_limits() -> None:
max_batch_size=INPUT_SIZE // 10,
max_batch_time=60,
input_block_size=INPUT_SIZE,
output_block_size=16000,
resize_input_blocks=False,
resize_output_blocks=True,
output_block_size=None,
)

for _ in range(INPUT_SIZE // 10):
Expand Down

0 comments on commit 5cbe431

Please sign in to comment.