perf: Arroyo 2.15.0 and reusable multiprocessing pools #5170

Merged · 5 commits · Dec 7, 2023
requirements.txt (2 changes: 1 addition & 1 deletion)

@@ -22,7 +22,7 @@ pytest-watch==4.2.0
 python-dateutil==2.8.2
 python-rapidjson==1.8
 redis==4.3.4
-sentry-arroyo==2.14.25
+sentry-arroyo==2.15.0
 sentry-kafka-schemas==0.1.37
 sentry-redis-tools==0.1.7
 sentry-relay==0.8.27
snuba/consumers/consumer.py (7 changes: 4 additions & 3 deletions)

@@ -57,6 +57,7 @@
 from snuba.clickhouse.http import JSONRow, JSONRowEncoder, ValuesRowEncoder
 from snuba.consumers.schemas import _NOOP_CODEC, get_json_codec
 from snuba.consumers.types import KafkaMessageMetadata
+from snuba.consumers.utils import get_reusable_multiprocessing_pool
 from snuba.datasets.storage import WritableTableStorage
 from snuba.datasets.storages.factory import get_writable_storage
 from snuba.datasets.storages.storage_key import StorageKey

@@ -389,7 +390,6 @@ def build_batch_writer(
     commit_log_config: Optional[CommitLogConfig] = None,
     slice_id: Optional[int] = None,
 ) -> Callable[[], ProcessedMessageBatchWriter]:
-
     assert not (replacements_producer is None) ^ (replacements_topic is None)
     supports_replacements = replacements_producer is not None
@@ -839,12 +839,13 @@ def flush_batch(
         inner_strategy = RunTaskWithMultiprocessing(
             transform_function,
             collect,
-            self.__processes,
             max_batch_size=self.__max_batch_size,
             max_batch_time=self.__max_batch_time,
+            pool=get_reusable_multiprocessing_pool(
+                self.__processes, self.__initialize_parallel_transform
+            ),
             input_block_size=self.__input_block_size,
             output_block_size=self.__output_block_size,
-            initializer=self.__initialize_parallel_transform,
         )

         return RunTask(
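For context, Arroyo 2.15.0 inverts pool ownership: rather than handing RunTaskWithMultiprocessing a process count and an initializer= and letting the strategy fork (and later tear down) its own workers, the caller now constructs a MultiprocessingPool and passes it in via pool=, so the workers can outlive any single strategy instance. Below is a minimal runnable sketch of that call shape, not Snuba code; the double transform, the Noop sink, and the batch settings are illustrative stand-ins.

from typing import Optional

from arroyo.processing.strategies.abstract import ProcessingStrategy
from arroyo.processing.strategies.run_task_with_multiprocessing import (
    MultiprocessingPool,
    RunTaskWithMultiprocessing,
)
from arroyo.types import Message


def double(message: Message[int]) -> int:
    # Applied to each message inside a worker process.
    return message.payload * 2


class Noop(ProcessingStrategy[int]):
    # Minimal downstream step that discards results (illustrative only).
    def submit(self, message: Message[int]) -> None:
        pass

    def poll(self) -> None:
        pass

    def join(self, timeout: Optional[float] = None) -> None:
        pass

    def close(self) -> None:
        pass

    def terminate(self) -> None:
        pass


if __name__ == "__main__":
    # The pool (and its worker processes) is created once, up front.
    pool = MultiprocessingPool(2, None)  # num_processes, initializer

    # A strategy instance borrows the pool instead of owning it...
    strategy = RunTaskWithMultiprocessing(
        double,
        Noop(),
        max_batch_size=100,
        max_batch_time=1.0,
        pool=pool,
    )
    strategy.close()
    strategy.join()

    # ...so a replacement instance (e.g. after a consumer rebalance)
    # reuses the same workers instead of re-forking them.
    strategy = RunTaskWithMultiprocessing(
        double, Noop(), max_batch_size=100, max_batch_time=1.0, pool=pool
    )
    strategy.close()
    strategy.join()

    pool.close()  # only now are the worker processes shut down

The PR builds on this by memoizing pool construction (see snuba/consumers/utils.py below), so the same pool is returned every time a strategy is rebuilt within one process.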
snuba/consumers/strategy_factory.py (6 changes: 4 additions & 2 deletions)

@@ -18,6 +18,7 @@

 from snuba.consumers.consumer import BytesInsertBatch, ProcessedMessageBatchWriter
 from snuba.consumers.dlq import ExitAfterNMessages
+from snuba.consumers.utils import get_reusable_multiprocessing_pool
 from snuba.processor import ReplacementBatch

 ProcessedMessage = Union[None, BytesInsertBatch, ReplacementBatch]

@@ -151,12 +152,13 @@ def flush_batch(
         strategy = RunTaskWithMultiprocessing(
             transform_function,
             collect,
-            self.__processes,
             max_batch_size=self.__max_batch_size,
             max_batch_time=self.__max_batch_time,
+            pool=get_reusable_multiprocessing_pool(
+                self.__processes, self.__initialize_parallel_transform
+            ),
             input_block_size=self.__input_block_size,
             output_block_size=self.__output_block_size,
-            initializer=self.__initialize_parallel_transform,
         )

         if self.__prefilter is not None:
snuba/consumers/utils.py (20 changes: 20 additions & 0 deletions)

@@ -1,6 +1,12 @@
+import atexit
 import logging
 import time
+from functools import lru_cache
+from typing import Callable, Optional

+from arroyo.processing.strategies.run_task_with_multiprocessing import (
+    MultiprocessingPool,
+)
 from confluent_kafka import KafkaException
 from confluent_kafka.admin import AdminClient
@@ -45,3 +51,17 @@
     total_partition_count = len(topic_metadata.partitions.keys())
     logger.info(f"Total of {total_partition_count} partition(s) for {topic.value}.")
     return total_partition_count
+
+
+@lru_cache(maxsize=None)
+def get_reusable_multiprocessing_pool(
+    num_processes: int, initializer: Optional[Callable[[], None]]
+) -> MultiprocessingPool:
+    pool = MultiprocessingPool(num_processes, initializer)
+
+    def shutdown() -> None:
+        logger.info("Shutting down multiprocessing pool")
+        pool.close()
+
+    atexit.register(shutdown)
+    return pool
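A small usage sketch for the helper above (the initialize callable is a hypothetical stand-in for Snuba's real initializers): because of lru_cache, repeated calls with the same arguments return the very same pool, which is what lets a strategy factory "create" its pool on every strategy rebuild without re-forking workers.

from snuba.consumers.utils import get_reusable_multiprocessing_pool


def initialize() -> None:
    pass


if __name__ == "__main__":
    # Same arguments -> the same memoized pool object.
    first = get_reusable_multiprocessing_pool(2, initialize)
    second = get_reusable_multiprocessing_pool(2, initialize)
    assert first is second

    # A different configuration gets its own pool; every pool created here
    # is closed by its registered atexit hook when the process exits.
    other = get_reusable_multiprocessing_pool(4, initialize)
    assert other is not first

One subtlety worth noting: the initializer is part of the cache key, so callers must pass an identical (or equal) callable each call to actually hit the cache; a fresh lambda per call would defeat the memoization.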