feat: Automatically resize blocks if they get too small #270

Merged 6 commits on Aug 31, 2023

110 changes: 99 additions & 11 deletions arroyo/processing/strategies/run_task_with_multiprocessing.py
@@ -37,6 +37,9 @@
TResult = TypeVar("TResult")
TBatchValue = TypeVar("TBatchValue")

DEFAULT_INPUT_BLOCK_SIZE = 16 * 1024
DEFAULT_OUTPUT_BLOCK_SIZE = 16 * 1024

LOG_THRESHOLD_TIME = 20 # In seconds


@@ -303,12 +306,26 @@ class RunTaskWithMultiprocessing(
``arroyo.strategies.run_task_with_multiprocessing.batch.input.overflow``
metric is emitted.

If the value is set to `None`, the `input_block_size` is automatically
adjusted to adapt to traffic. Keep in mind that this is a rather
experimental feature and less productionized than explicitly setting a
value.

:param output_block_size: Size of the shared memory buffer used to store
results. Like with input data, the batch is implicitly broken up on
overflow, and
``arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow``
metric is incremented.

Like with `input_block_size`, the value can be set to `None` to enable
automatic resizing.

:param max_input_block_size: If automatic resizing is enabled, this sets an
upper limit on how large those blocks can get.

:param max_output_block_size: Same as `max_input_block_size` but for output
blocks.

:param initializer: A function to run at the beginning of each subprocess.

Subprocesses are spawned without any of the state of the parent
@@ -411,15 +428,22 @@ def __init__(
num_processes: int,
max_batch_size: int,
max_batch_time: float,
input_block_size: int,
output_block_size: int,
input_block_size: Optional[int] = None,
output_block_size: Optional[int] = None,
max_input_block_size: Optional[int] = None,
max_output_block_size: Optional[int] = None,
initializer: Optional[Callable[[], None]] = None,
) -> None:
self.__transform_function = function
self.__next_step = next_step
self.__max_batch_size = max_batch_size
self.__max_batch_time = max_batch_time

self.__resize_input_blocks = input_block_size is None
self.__resize_output_blocks = output_block_size is None
self.__max_input_block_size = max_input_block_size
self.__max_output_block_size = max_output_block_size

self.__shared_memory_manager = SharedMemoryManager()
self.__shared_memory_manager.start()

@@ -430,12 +454,16 @@ def __init__(
)

self.__input_blocks = [
self.__shared_memory_manager.SharedMemory(input_block_size)
self.__shared_memory_manager.SharedMemory(
input_block_size or DEFAULT_INPUT_BLOCK_SIZE
)
for _ in range(num_processes)
]

self.__output_blocks = [
self.__shared_memory_manager.SharedMemory(output_block_size)
self.__shared_memory_manager.SharedMemory(
output_block_size or DEFAULT_OUTPUT_BLOCK_SIZE
)
for _ in range(num_processes)
]

@@ -454,6 +482,8 @@ def __init__(
]
],
AsyncResult[ParallelRunTaskResult[TResult]],
bool, # was the input block too small?
bool, # was the output block too small?
]
] = deque()
self.__invalid_messages = InvalidMessageState()
@@ -481,7 +511,7 @@ def handle_sigchld(signum: int, frame: Any) -> None:

signal.signal(signal.SIGCHLD, handle_sigchld)

def __submit_batch(self) -> None:
def __submit_batch(self, input_block_too_small: bool) -> None:
assert self.__batch_builder is not None
batch = self.__batch_builder.build()
logger.debug("Submitting %r to %r...", batch, self.__pool)
@@ -492,6 +522,8 @@ def __submit_batch(self) -> None:
parallel_run_task_worker_apply,
(self.__transform_function, batch, self.__output_blocks.pop()),
),
input_block_too_small,
False,
)
)
self.__batches_in_progress.increment()
@@ -547,7 +579,12 @@ def __check_for_results(self, timeout: Optional[float] = None) -> None:
self.__pool_waiting_time = None

def __check_for_results_impl(self, timeout: Optional[float] = None) -> None:
input_batch, async_result = self.__processes[0]
(
input_batch,
async_result,
input_block_too_small,
output_block_too_small,
) = self.__processes[0]

# If this call is being made in a context where it is intended to be
# nonblocking, checking if the result is ready (rather than trying to
@@ -559,6 +596,15 @@ def __check_for_results_impl(self, timeout: Optional[float] = None) -> None:

result = async_result.get(timeout=timeout)

self.__metrics.timing(
"arroyo.strategies.run_task_with_multiprocessing.output_batch.size.msg",
len(result.valid_messages_transformed),
)
self.__metrics.timing(
"arroyo.strategies.run_task_with_multiprocessing.output_batch.size.bytes",
result.valid_messages_transformed.get_content_size(),
)

for idx, message in result.valid_messages_transformed:
if isinstance(message, InvalidMessage):
# For the next invocation of __check_for_results, skip over this message
@@ -604,12 +650,54 @@ def __check_for_results_impl(self, timeout: Optional[float] = None) -> None:
result.next_index_to_process,
),
),
input_block_too_small,
True,
)
return

old_input_block = input_batch.block

if (
input_block_too_small
and self.__resize_input_blocks
and (
self.__max_input_block_size is None
or self.__max_input_block_size > old_input_block.size * 2
)
):
self.__metrics.increment(
"arroyo.strategies.run_task_with_multiprocessing.batch.input.resize"
)
new_input_block = self.__shared_memory_manager.SharedMemory(

Contributor:
Knowing that memory settings for Kubernetes are static (we cannot resize the memory assigned to a pod) and that you cannot exceed the memory allocated (OOMKill), do we have a reason to ever resize our input/output blocks instead of just taking all the available memory at the start? It seems it would be much easier to just statically create memory blocks that use all the available memory of the pod and not try to change anything.

Member Author:
> do we have a reason to ever resize our input/output blocks instead of just taking all the available memory at the start

there's still a ton of stuff outside of arroyo that takes memory, but even if not I don't think it's a good idea to consume significantly more memory than needed. When we deploy a new service we configure k8s limits based on projected memory usage, and later optionally adjust based on average memory usage. If arroyo defaults to consuming all pod memory, we lose insight into how much memory we actually need and how much cost we could save.

Contributor:
> but even if not I don't think it's a good idea to consume significantly more memory than needed

Why? That memory is already allocated and not usable by others anyway.
If you are concerned about not having visibility into the actual usage, why not have specific metrics for that? It seems like an easier, safer system with fewer moving parts and fewer failure modes.

Member Author (@untitaker, Jun 30, 2023):
The goal is to eliminate tuning parameters the user has to tweak to get optimal consumer performance. If we manage to do that, we can start thinking about removing some of those options (as they would no longer be required and would have optimal defaults), and therefore some moving parts in that sense. Specifically, I would aim for these defaults for all consumers:

  • auto_resize=True for both input/output block
  • max_input_block_size/max_output_block_size at either None or 1GB each, or some limit that only a really misbehaving consumer would hit
  • input_block_size = output_block_size = 10MB so we can be somewhat sure the block can hold a single message

An engineer of the product team should not have to think about how much memory their pod is going to consume and tune Arroyo parameters based off of it.

I don't think this is possible at all with a static approach, because it requires the author of the consumer to think about how much memory their pod has (unclear, gets adjusted by ops), how much their regular code consumes per-process (entirely unclear, especially in a shared codebase like sentry where tons of random stuff gets imported at every CLI invocation) and then think about how much of the remainder can be allocated to input/output blocks.

If you are suggesting a static approach that is also zero-config, I don't know how that would work. Does it mean that arroyo determines free memory and allocates it evenly divided for input/output blocks? And is it evenly, or do input blocks get more than output blocks? And what does it do on a dev machine where there's no k8s request/limit per-consumer?

Member Author:
> If you are concerned about not having visibility into the actual usage, why not have specific metrics for that?

I think this is possible but it feels like Java/node heap tuning parameters and I would like to avoid that sort of experience as well.

old_input_block.size * 2

Member:
This seems like it could trigger quite a few resizes depending on the values passed. Is there another way to calculate the shared memory size based on previously seen message sizes?

Member Author:
I think that's a good idea; we can keep track of the batch size and use it as input for reallocating. I'll have to see how to do that. I don't think I can use our existing batch size metrics because they don't emit the right value when batches are split up due to input/output overflow.
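
For illustration only (this is not what the PR implements): the alternative discussed above could look roughly like sizing new blocks from the largest batch observed so far plus some headroom, rather than doubling on every overflow. All names and the headroom factor here are assumptions.

from typing import Optional, Sequence


def suggest_block_size(
    observed_batch_bytes: Sequence[int],
    current_size: int,
    max_size: Optional[int],
    headroom: float = 1.5,
) -> int:
    # Pick a size from the high-water mark of batch sizes seen so far (in
    # bytes), with some headroom, instead of blindly doubling the block.
    if not observed_batch_bytes:
        return current_size
    candidate = int(max(observed_batch_bytes) * headroom)
    # Never shrink below the current block and never exceed the configured cap.
    candidate = max(candidate, current_size)
    if max_size is not None:
        candidate = min(candidate, max_size)
    return candidate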

)
old_input_block.unlink()
else:
new_input_block = old_input_block

old_output_block = result.valid_messages_transformed.block

if (
output_block_too_small
and self.__resize_output_blocks
and (
self.__max_output_block_size is None
or self.__max_output_block_size > old_output_block.size * 2
)
):
self.__metrics.increment(
"arroyo.strategies.run_task_with_multiprocessing.batch.output.resize"
)
new_output_block = self.__shared_memory_manager.SharedMemory(
old_output_block.size * 2
)
old_output_block.unlink()
else:
new_output_block = old_output_block

logger.debug("Completed %r, reclaiming blocks...", input_batch)
self.__input_blocks.append(input_batch.block)
self.__output_blocks.append(result.valid_messages_transformed.block)
self.__input_blocks.append(new_input_block)
self.__output_blocks.append(new_output_block)
self.__batches_in_progress.decrement()

del self.__processes[0]
@@ -621,7 +709,7 @@ def poll(self) -> None:
self.__check_for_results(timeout=0)

if self.__batch_builder is not None and self.__batch_builder.ready():
self.__submit_batch()
self.__submit_batch(False)

def __reset_batch_builder(self) -> None:
try:
@@ -651,7 +739,7 @@ def submit(
self.__metrics.increment(
"arroyo.strategies.run_task_with_multiprocessing.batch.input.overflow"
)
self.__submit_batch()
self.__submit_batch(True)

# This may raise ``MessageRejected`` (if all of the shared memory
# is in use) and create backpressure.
@@ -667,7 +755,7 @@ def close(self) -> None:
self.__closed = True

if self.__batch_builder is not None and len(self.__batch_builder) > 0:
self.__submit_batch()
self.__submit_batch(False)

def terminate(self) -> None:
self.__closed = True
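
To make the new constructor defaults above concrete, here is a minimal usage sketch. The transform function, the downstream next_step, the tuning numbers, and the arroyo.types.Message import are illustrative assumptions for this example, not part of the PR; passing explicit integers for input_block_size/output_block_size keeps the old, fully static behaviour.

from arroyo.processing.strategies.run_task_with_multiprocessing import (
    RunTaskWithMultiprocessing,
)
from arroyo.types import Message  # assumed import path for the message wrapper


def transform(message: Message[bytes]) -> int:
    # Placeholder for the CPU-bound work the subprocesses would perform.
    return len(message.payload)


def build_step(next_step):  # next_step: whatever downstream strategy the pipeline already uses
    return RunTaskWithMultiprocessing(
        function=transform,
        next_step=next_step,
        num_processes=4,
        max_batch_size=1000,
        max_batch_time=1.0,
        # Leaving both block sizes as None (the new default) enables automatic
        # resizing; the max_* parameters cap how far the blocks may grow.
        input_block_size=None,
        output_block_size=None,
        max_input_block_size=64 * 1024 * 1024,
        max_output_block_size=64 * 1024 * 1024,
    )
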
12 changes: 12 additions & 0 deletions arroyo/utils/metric_defs.py
@@ -5,6 +5,10 @@
"arroyo.strategies.run_task_with_multiprocessing.batch.size.msg",
# Number of bytes in a multiprocessing batch
"arroyo.strategies.run_task_with_multiprocessing.batch.size.bytes",
# Number of messages in a multiprocessing batch after the message transformation
"arroyo.strategies.run_task_with_multiprocessing.output_batch.size.msg",
# Number of bytes in a multiprocessing batch after the message transformation
"arroyo.strategies.run_task_with_multiprocessing.output_batch.size.bytes",
# Number of times the consumer is spinning
"arroyo.consumer.run.count",
# How long it took the Reduce step to fill up a batch
@@ -22,6 +26,14 @@
# This can be devastating for throughput. Increase `output_block_size` to
# fix.
"arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow",
# Arroyo has decided to re-allocate a block in order to combat input buffer
# overflow. This behavior can be disabled by explicitly setting
# `input_block_size` to a not-None value in `RunTaskWithMultiprocessing`.
"arroyo.strategies.run_task_with_multiprocessing.batch.input.resize",
# Arroyo has decided to re-allocate a block in order to combat output buffer
# overflow. This behavior can be disabled by explicitly setting
# `output_block_size` to a not-None value in `RunTaskWithMultiprocessing`.
"arroyo.strategies.run_task_with_multiprocessing.batch.output.resize",
# How many batches are being processed in parallel by multiprocessing.
"arroyo.strategies.run_task_with_multiprocessing.batches_in_progress",
# Counter. A subprocess by multiprocessing unexpectedly died.
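
Since the new batch.input.resize / batch.output.resize counters are the main signal that automatic resizing is happening, a log-only metrics sink along these lines can help while experimenting. Only the increment/timing method names appear in this diff; the gauge method and the configure_metrics registration hook are assumptions about how an application wires up Arroyo metrics.

import logging

logger = logging.getLogger(__name__)


class LoggingMetrics:
    # Minimal duck-typed metrics sink that logs only the counters relevant to
    # block overflow and resizing; everything else is dropped.
    _interesting = (
        "arroyo.strategies.run_task_with_multiprocessing.batch.input.overflow",
        "arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow",
        "arroyo.strategies.run_task_with_multiprocessing.batch.input.resize",
        "arroyo.strategies.run_task_with_multiprocessing.batch.output.resize",
    )

    def increment(self, name, value=1, tags=None):
        if name in self._interesting:
            logger.info("%s += %s", name, value)

    def gauge(self, name, value, tags=None):
        pass

    def timing(self, name, value, tags=None):
        pass


# Registration is application-specific; with arroyo it is typically something like:
#   from arroyo.utils.metrics import configure_metrics  # assumed hook
#   configure_metrics(LoggingMetrics())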