feat: Multiprocessing experiment (#5177)
Try #5170 again with a high join timeout.

This PR is a short-term test that should be reverted. We want to check whether there is any correlation between the join timeout and the consumer issues we saw with arroyo 2.15.0 earlier.
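For context: the join timeout bounds how long arroyo waits for a strategy's in-flight work to flush when the strategy is torn down on shutdown or rebalance. Below is a minimal sketch of that semantic, assuming arroyo's public ProcessingStrategy interface; the drain helper is hypothetical, and the 20-second default mirrors this PR's new CLI default.

from arroyo.processing.strategies.abstract import ProcessingStrategy

def drain(strategy: ProcessingStrategy, join_timeout: float = 20.0) -> None:
    # Stop accepting new messages, then give in-flight batches up to
    # `join_timeout` seconds to flush before the strategy is discarded.
    # A timeout that is too short can abandon unflushed work mid-rebalance,
    # which is the correlation this experiment probes.
    strategy.close()
    strategy.join(timeout=join_timeout)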
lynnagara committed Dec 8, 2023
1 parent 1950498 · commit 6afc407
Showing 4 changed files with 30 additions and 10 deletions.
requirements.txt (2 changes: 1 addition & 1 deletion)

@@ -22,7 +22,7 @@ pytest-watch==4.2.0
 python-dateutil==2.8.2
 python-rapidjson==1.8
 redis==4.3.4
-sentry-arroyo==2.14.25
+sentry-arroyo==2.15.0
 sentry-kafka-schemas==0.1.37
 sentry-redis-tools==0.1.7
 sentry-relay==0.8.27
snuba/cli/consumer.py (3 changes: 1 addition & 2 deletions)

@@ -136,7 +136,7 @@
     "--output-block-size",
     type=int,
 )
-@click.option("--join-timeout", type=int, help="Join timeout in seconds.", default=5)
+@click.option("--join-timeout", type=int, help="Join timeout in seconds.", default=20)
 @click.option(
     "--enforce-schema",
     type=bool,
@@ -201,7 +201,6 @@ def consumer(
     group_instance_id: Optional[str],
     skip_write: bool,
 ) -> None:
-
     setup_logging(log_level)
     setup_sentry()
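The only behavioral change here is the default. A minimal sketch of the click behavior relied on, written as a standalone command rather than snuba's actual CLI wiring: omitting the flag now yields 20, and the old value can still be requested explicitly.

import click

@click.command()
@click.option("--join-timeout", type=int, help="Join timeout in seconds.", default=20)
def consumer(join_timeout: int) -> None:
    # Prints 20 when the flag is omitted; pass `--join-timeout 5` for the old value.
    click.echo(f"join timeout: {join_timeout}s")

if __name__ == "__main__":
    consumer()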
snuba/consumers/consumer.py (18 changes: 14 additions & 4 deletions)

@@ -39,6 +39,9 @@
     RunTaskInThreads,
     RunTaskWithMultiprocessing,
 )
+from arroyo.processing.strategies.run_task_with_multiprocessing import (
+    MultiprocessingPool,
+)
 from arroyo.types import (
     BaseValue,
     BrokerValue,
@@ -389,7 +392,6 @@ def build_batch_writer(
     commit_log_config: Optional[CommitLogConfig] = None,
     slice_id: Optional[int] = None,
 ) -> Callable[[], ProcessedMessageBatchWriter]:
-
     assert not (replacements_producer is None) ^ (replacements_topic is None)
     supports_replacements = replacements_producer is not None

@@ -800,6 +802,11 @@ def __init__(
             replacements,
             slice_id,
         )
+        self.__pool = (
+            MultiprocessingPool(self.__processes, initialize_parallel_transform)
+            if self.__processes is not None
+            else None
+        )

     def create_with_partitions(
         self,
@@ -833,18 +840,17 @@ def flush_batch(
             Union[FilteredPayload, MultistorageKafkaPayload]
         ]

-        if self.__processes is None:
+        if self.__pool is None:
             inner_strategy = RunTask(transform_function, collect)
         else:
             inner_strategy = RunTaskWithMultiprocessing(
                 transform_function,
                 collect,
-                self.__processes,
                 max_batch_size=self.__max_batch_size,
                 max_batch_time=self.__max_batch_time,
+                pool=self.__pool,
                 input_block_size=self.__input_block_size,
                 output_block_size=self.__output_block_size,
-                initializer=self.__initialize_parallel_transform,
             )

         return RunTask(
@@ -854,3 +860,7 @@ def flush_batch(
                 inner_strategy,
             ),
         )
+
+    def shutdown(self) -> None:
+        if self.__pool:
+            self.__pool.close()
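The pattern in this file is the arroyo 2.15.0 pool API visible in the hunks above: construct MultiprocessingPool once, pass it to RunTaskWithMultiprocessing via pool= instead of the old positional process count and initializer= arguments, and close it from the pool's owner. A self-contained sketch under those assumptions; the transform and commit callables and all of the numbers are placeholders.

from typing import Mapping

from arroyo.processing.strategies import RunTaskWithMultiprocessing
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.processing.strategies.run_task_with_multiprocessing import (
    MultiprocessingPool,
)
from arroyo.types import Message, Partition

def transform(message: Message[bytes]) -> bytes:
    return message.payload.upper()  # stand-in for real message processing

def commit(offsets: Mapping[Partition, int], force: bool = False) -> None:
    pass  # stand-in for the consumer's offset-commit callback

pool = MultiprocessingPool(4)  # also accepts an initializer, as the diff shows
strategy = RunTaskWithMultiprocessing(
    transform,
    CommitOffsets(commit),
    max_batch_size=1000,
    max_batch_time=1.0,
    pool=pool,
    input_block_size=None,  # None defers shared-memory block sizing to arroyo
    output_block_size=None,
)
# ... feed the strategy, then on shutdown:
strategy.close()
strategy.join(timeout=20.0)
pool.close()  # the pool outlives the strategy, so its owner must close it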
snuba/consumers/strategy_factory.py (17 changes: 14 additions & 3 deletions)

@@ -14,6 +14,9 @@
 )
 from arroyo.processing.strategies.commit import CommitOffsets
 from arroyo.processing.strategies.healthcheck import Healthcheck
+from arroyo.processing.strategies.run_task_with_multiprocessing import (
+    MultiprocessingPool,
+)
 from arroyo.types import BaseValue, Commit, FilteredPayload, Message, Partition

 from snuba.consumers.consumer import BytesInsertBatch, ProcessedMessageBatchWriter
@@ -98,6 +101,11 @@ def __init__(
         self.__output_block_size = output_block_size
         self.__initialize_parallel_transform = initialize_parallel_transform
         self.__health_check_file = health_check_file
+        self.__pool = (
+            MultiprocessingPool(self.__processes, self.__initialize_parallel_transform)
+            if self.__processes
+            else None
+        )

     def __should_accept(self, message: Message[KafkaPayload]) -> bool:
         assert self.__prefilter is not None
@@ -145,18 +153,17 @@ def flush_batch(
         transform_function = self.__process_message

         strategy: ProcessingStrategy[Union[FilteredPayload, KafkaPayload]]
-        if self.__processes is None:
+        if self.__pool is None:
             strategy = RunTask(transform_function, collect)
         else:
             strategy = RunTaskWithMultiprocessing(
                 transform_function,
                 collect,
-                self.__processes,
                 max_batch_size=self.__max_batch_size,
                 max_batch_time=self.__max_batch_time,
+                pool=self.__pool,
                 input_block_size=self.__input_block_size,
                 output_block_size=self.__output_block_size,
-                initializer=self.__initialize_parallel_transform,
             )

         if self.__prefilter is not None:
@@ -173,3 +180,7 @@ def flush_batch(
             strategy = Healthcheck(self.__health_check_file, strategy)

         return strategy
+
+    def shutdown(self) -> None:
+        if self.__pool:
+            self.__pool.close()
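Same ownership pattern as consumer.py: the pool's lifetime is tied to the factory rather than to any single strategy, so worker processes are not re-forked on every rebalance, and shutdown() releases them exactly once (the assumption being that arroyo's stream processor calls the factory's shutdown() when the consumer stops; that call site is not part of this diff). A stripped-down illustration with a hypothetical factory class:

from typing import Optional

from arroyo.processing.strategies.run_task_with_multiprocessing import (
    MultiprocessingPool,
)

class PoolOwningFactory:
    # Illustrative only: one pool for the factory's whole lifetime, reused
    # by every strategy built in create_with_partitions.
    def __init__(self, processes: Optional[int]) -> None:
        self.__pool = MultiprocessingPool(processes) if processes else None

    def shutdown(self) -> None:
        # Called once at the end of the consumer's life; strategies built
        # from this factory no longer own the pool.
        if self.__pool:
            self.__pool.close()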
