feat: Automatically resize blocks if they get too small #270
@@ -309,6 +309,21 @@ class RunTaskWithMultiprocessing(
``arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow``
metric is incremented.

:param resize_input_blocks: Experimental feature, whether input blocks
    should be dynamically resized if they end up being too small for batches.
    This can technically cause unbounded memory consumption and it is
    recommended to also configure `max_input_block_size` if this option is
    used.

:param resize_output_blocks: Experimental feature, same as
    `resize_input_blocks` but for the output blocks.

:param max_input_block_size: If automatic resizing is enabled, this sets an
    upper limit on how large those blocks can get.

:param max_output_block_size: Same as `max_input_block_size` but for output
    blocks.

:param initializer: A function to run at the beginning of each subprocess.

    Subprocesses are spawned without any of the state of the parent
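To make the new knobs concrete, here is a rough usage sketch based only on the parameters visible in this diff. The import path, the `num_processes` argument, and the `process_message`/`next_step` placeholders are assumptions for illustration, not something the PR itself shows:

```python
from arroyo.processing.strategies import RunTaskWithMultiprocessing  # import path assumed


def process_message(message):  # placeholder task function
    return message.payload


strategy = RunTaskWithMultiprocessing(
    function=process_message,
    next_step=next_step,  # placeholder: whatever strategy comes next in the pipeline
    num_processes=8,      # assumed pool-size argument, not shown in this hunk
    max_batch_size=1000,
    max_batch_time=1.0,
    input_block_size=16 * 1024 * 1024,       # start with deliberately small blocks...
    output_block_size=16 * 1024 * 1024,
    resize_input_blocks=True,                # ...and let the strategy double them on overflow
    resize_output_blocks=True,
    max_input_block_size=512 * 1024 * 1024,  # cap growth, as the docstring recommends
    max_output_block_size=512 * 1024 * 1024,
)
```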
@@ -413,13 +428,22 @@ def __init__(
    max_batch_time: float,
    input_block_size: int,
    output_block_size: int,
    resize_input_blocks: bool = False,
    resize_output_blocks: bool = False,
    max_input_block_size: Optional[int] = None,
    max_output_block_size: Optional[int] = None,
    initializer: Optional[Callable[[], None]] = None,
) -> None:
    self.__transform_function = function
    self.__next_step = next_step
    self.__max_batch_size = max_batch_size
    self.__max_batch_time = max_batch_time

    self.__resize_input_blocks = resize_input_blocks
    self.__resize_output_blocks = resize_output_blocks
    self.__max_input_block_size = max_input_block_size
    self.__max_output_block_size = max_output_block_size

    self.__shared_memory_manager = SharedMemoryManager()
    self.__shared_memory_manager.start()
@@ -454,6 +478,8 @@ def __init__(
            ]
        ],
        AsyncResult[ParallelRunTaskResult[TResult]],
        bool,  # was the input block too small?
        bool,  # was the output block too small?
    ]
] = deque()
self.__invalid_messages = InvalidMessageState()
@@ -481,7 +507,7 @@ def handle_sigchld(signum: int, frame: Any) -> None:

signal.signal(signal.SIGCHLD, handle_sigchld)

def __submit_batch(self) -> None:
def __submit_batch(self, input_block_too_small: bool) -> None:
    assert self.__batch_builder is not None
    batch = self.__batch_builder.build()
    logger.debug("Submitting %r to %r...", batch, self.__pool)

@@ -492,6 +518,8 @@ def __submit_batch(self) -> None:
            parallel_run_task_worker_apply,
            (self.__transform_function, batch, self.__output_blocks.pop()),
        ),
        input_block_too_small,
        False,
    )
)
self.__batches_in_progress.increment()
@@ -547,7 +575,12 @@ def __check_for_results(self, timeout: Optional[float] = None) -> None:
    self.__pool_waiting_time = None

def __check_for_results_impl(self, timeout: Optional[float] = None) -> None:
    input_batch, async_result = self.__processes[0]
    (
        input_batch,
        async_result,
        input_block_too_small,
        output_block_too_small,
    ) = self.__processes[0]

    # If this call is being made in a context where it is intended to be
    # nonblocking, checking if the result is ready (rather than trying to
@@ -559,6 +592,15 @@ def __check_for_results_impl(self, timeout: Optional[float] = None) -> None:

result = async_result.get(timeout=timeout)

self.__metrics.timing(
    "arroyo.strategies.run_task_with_multiprocessing.output_batch.size.msg",
    len(result.valid_messages_transformed),
)
self.__metrics.timing(
    "arroyo.strategies.run_task_with_multiprocessing.output_batch.size.bytes",
    result.valid_messages_transformed.get_content_size(),
)

for idx, message in result.valid_messages_transformed:
    if isinstance(message, InvalidMessage):
        # For the next invocation of __check_for_results, skip over this message
@@ -604,12 +646,54 @@ def __check_for_results_impl(self, timeout: Optional[float] = None) -> None:
            result.next_index_to_process,
        ),
    ),
    input_block_too_small,
    True,
)
return

old_input_block = input_batch.block

if (
    input_block_too_small
    and self.__resize_input_blocks
    and (
        self.__max_input_block_size is None
        or self.__max_input_block_size > old_input_block.size * 2
    )
):
    self.__metrics.increment(
        "arroyo.strategies.run_task_with_multiprocessing.batch.input.resize"
    )
    new_input_block = self.__shared_memory_manager.SharedMemory(
        old_input_block.size * 2
    )
    old_input_block.unlink()
else:
    new_input_block = old_input_block

Review discussion on the resize logic:

Knowing that memory settings for Kubernetes are static (we cannot resize the memory assigned to a pod) and that you cannot exceed the memory allocated (OOMKill), do we have a reason to ever resize our input/output blocks instead of just taking all the available memory at the start? It seems it would be much easier to just statically create memory blocks that used all the available memory of the pod and not try to change anything.

There's still a ton of stuff outside of arroyo that takes memory, but even if not, I don't think it's a good idea to consume significantly more memory than needed. When we deploy a new service we configure k8s limits based on projected memory usage, and later optionally adjust based on average memory usage. If arroyo defaults to consuming all pod memory, we lose insight into how much memory we actually need and how much cost we could save.

Why? That memory is already allocated and not usable by others anyway.

The goal is to eliminate tuning parameters the user has to tweak to get optimal consumer performance. If we manage to do that, then we can start thinking of removing some of those options (as they are not required and have optimal defaults) and therefore removing moving parts in that sense. I would be aiming for these defaults for all consumers specifically:

An engineer of the product team should not have to think about how much memory their pod is going to consume and tune Arroyo parameters based off of it. I don't think this is possible at all with a static approach, because it requires the author of the consumer to think about how much memory their pod has (unclear, gets adjusted by ops), how much their regular code consumes per-process (entirely unclear, especially in a shared codebase like sentry where tons of random stuff gets imported at every CLI invocation), and then think about how much of the remainder can be allocated to input/output blocks. If you are suggesting a static approach that is also zero-config, I don't know how that would work. Does it mean that arroyo determines free memory and allocates it evenly divided for input/output blocks? And is it evenly, or do input blocks get more than output blocks? And what does it do on a dev machine where there's no k8s request/limit per-consumer?

I think this is possible but it feels like Java/node heap tuning parameters and I would like to avoid that sort of experience as well.

On the doubling (`old_input_block.size * 2`): This seems like it could trigger quite a few resizes depending on the values passed. Is there another way to calculate the shared memory size based on prior seen message sizes?

I think that's a good idea, we can keep track of the batch size and use it as input for reallocating. I'll have to see how to do that. I don't think I can use our existing batch size metrics because they don't emit the right value when batches are split up due to input/output overflow.
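As a hypothetical follow-up to the thread above (not part of this PR), sizing reallocations from previously seen batches rather than blind doubling might look roughly like this; all names here are made up for illustration:

```python
from collections import deque
from typing import Deque


class BlockSizeTracker:
    """Remember how many bytes recent batches actually used."""

    def __init__(self, history: int = 16) -> None:
        self.__recent_sizes: Deque[int] = deque(maxlen=history)

    def record(self, batch_size_bytes: int) -> None:
        # Called once per completed batch with the bytes it consumed.
        self.__recent_sizes.append(batch_size_bytes)

    def suggest(self, current_block_size: int) -> int:
        # Grow to fit the largest recently seen batch with some headroom,
        # but never shrink below the current block size.
        if not self.__recent_sizes:
            return current_block_size
        needed = int(max(self.__recent_sizes) * 1.5)
        return max(current_block_size, needed)
```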

old_output_block = result.valid_messages_transformed.block

if (
    output_block_too_small
    and self.__resize_output_blocks
    and (
        self.__max_output_block_size is None
        or self.__max_output_block_size > old_output_block.size * 2
    )
):
    self.__metrics.increment(
        "arroyo.strategies.run_task_with_multiprocessing.batch.output.resize"
    )
    new_output_block = self.__shared_memory_manager.SharedMemory(
        old_output_block.size * 2
    )
    old_output_block.unlink()
else:
    new_output_block = old_output_block

logger.debug("Completed %r, reclaiming blocks...", input_batch)
self.__input_blocks.append(input_batch.block)
self.__output_blocks.append(result.valid_messages_transformed.block)
self.__input_blocks.append(new_input_block)
self.__output_blocks.append(new_output_block)
self.__batches_in_progress.decrement()

del self.__processes[0]
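To see the resize rule in isolation: blocks only ever grow, they grow by doubling, and the doubling is skipped once it would reach or exceed the configured maximum (the strict `>` comparison above). A minimal sketch of that decision, with illustrative names not taken from the PR:

```python
from typing import Optional


def next_block_size(
    current_size: int,
    block_too_small: bool,
    resize_enabled: bool,
    max_block_size: Optional[int],
) -> int:
    """Mirror the branch above: return the size the block should have next."""
    if (
        block_too_small
        and resize_enabled
        and (max_block_size is None or max_block_size > current_size * 2)
    ):
        return current_size * 2
    return current_size


# With no cap, a too-small 64 MiB block doubles to 128 MiB:
assert next_block_size(64 * 2**20, True, True, None) == 128 * 2**20
# A 128 MiB cap blocks that same doubling, because 128 MiB is not > 128 MiB:
assert next_block_size(64 * 2**20, True, True, 128 * 2**20) == 64 * 2**20
```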
@@ -621,7 +705,7 @@ def poll(self) -> None:
self.__check_for_results(timeout=0)

if self.__batch_builder is not None and self.__batch_builder.ready():
    self.__submit_batch()
    self.__submit_batch(False)

def __reset_batch_builder(self) -> None:
    try:
@@ -651,7 +735,7 @@ def submit(
self.__metrics.increment(
    "arroyo.strategies.run_task_with_multiprocessing.batch.input.overflow"
)
self.__submit_batch()
self.__submit_batch(True)

# This may raise ``MessageRejected`` (if all of the shared memory
# is in use) and create backpressure.
@@ -667,7 +751,7 @@ def close(self) -> None:
self.__closed = True

if self.__batch_builder is not None and len(self.__batch_builder) > 0:
    self.__submit_batch()
    self.__submit_batch(False)

def terminate(self) -> None:
    self.__closed = True
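A quick way to read the three call sites above (my paraphrase, not text from the PR):

```python
# poll()   -> self.__submit_batch(False)  # batch finished normally (size/time limit reached)
# submit() -> self.__submit_batch(True)   # the input block overflowed mid-batch
# close()  -> self.__submit_batch(False)  # flushing the final partial batch on shutdown
#
# Only the overflow path flags the input block as too small, so only batches
# cut short by input overflow can trigger an input-block resize later in
# __check_for_results_impl.
```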
Review discussion on the new constructor parameters:

With so many extra parameters, this seems like it would be more complicated rather than less to figure out what to pass here.

Yeah, I just want to make this opt-in for now. I think long-term we should give `input_block_size` a default and enable auto-resizing by default. Then we still have a lot of parameters, but people don't need to set any of them. Right now they need to configure `input_block_size`, and we can't give them a default.

If the user knows what the max is supposed to be, what's the rationale for them to start with a smaller number at all, rather than just passing the max into `input_block_size` and `output_block_size`?

I expect most users to not configure a max at all. I just added that because people were concerned about unbounded memory usage, but I would also be fine with not having a max parameter at all. I also believe passing 1GB or 500MB to `max_output_block_size` would be reasonable, but I don't think such a value would be reasonable for `output_block_size`.