Skip to content

Commit

Permalink
Revert "feat: Multiprocessing experiment (#5177)"
Browse files Browse the repository at this point in the history
This reverts commit 6afc407.

Co-authored-by: lynnagara <1779792+lynnagara@users.noreply.github.com>
  • Loading branch information
getsentry-bot and lynnagara committed Dec 8, 2023
1 parent 6afc407 commit cc3bca3
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 30 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pytest-watch==4.2.0
python-dateutil==2.8.2
python-rapidjson==1.8
redis==4.3.4
sentry-arroyo==2.15.0
sentry-arroyo==2.14.25
sentry-kafka-schemas==0.1.37
sentry-redis-tools==0.1.7
sentry-relay==0.8.27
Expand Down
3 changes: 2 additions & 1 deletion snuba/cli/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
"--output-block-size",
type=int,
)
@click.option("--join-timeout", type=int, help="Join timeout in seconds.", default=20)
@click.option("--join-timeout", type=int, help="Join timeout in seconds.", default=5)
@click.option(
"--enforce-schema",
type=bool,
Expand Down Expand Up @@ -201,6 +201,7 @@ def consumer(
group_instance_id: Optional[str],
skip_write: bool,
) -> None:

setup_logging(log_level)
setup_sentry()

Expand Down
18 changes: 4 additions & 14 deletions snuba/consumers/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@
RunTaskInThreads,
RunTaskWithMultiprocessing,
)
from arroyo.processing.strategies.run_task_with_multiprocessing import (
MultiprocessingPool,
)
from arroyo.types import (
BaseValue,
BrokerValue,
Expand Down Expand Up @@ -392,6 +389,7 @@ def build_batch_writer(
commit_log_config: Optional[CommitLogConfig] = None,
slice_id: Optional[int] = None,
) -> Callable[[], ProcessedMessageBatchWriter]:

assert not (replacements_producer is None) ^ (replacements_topic is None)
supports_replacements = replacements_producer is not None

Expand Down Expand Up @@ -802,11 +800,6 @@ def __init__(
replacements,
slice_id,
)
self.__pool = (
MultiprocessingPool(self.__processes, initialize_parallel_transform)
if self.__processes is not None
else None
)

def create_with_partitions(
self,
Expand Down Expand Up @@ -840,17 +833,18 @@ def flush_batch(
Union[FilteredPayload, MultistorageKafkaPayload]
]

if self.__pool is None:
if self.__processes is None:
inner_strategy = RunTask(transform_function, collect)
else:
inner_strategy = RunTaskWithMultiprocessing(
transform_function,
collect,
self.__processes,
max_batch_size=self.__max_batch_size,
max_batch_time=self.__max_batch_time,
pool=self.__pool,
input_block_size=self.__input_block_size,
output_block_size=self.__output_block_size,
initializer=self.__initialize_parallel_transform,
)

return RunTask(
Expand All @@ -860,7 +854,3 @@ def flush_batch(
inner_strategy,
),
)

def shutdown(self) -> None:
if self.__pool:
self.__pool.close()
17 changes: 3 additions & 14 deletions snuba/consumers/strategy_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@
)
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.processing.strategies.healthcheck import Healthcheck
from arroyo.processing.strategies.run_task_with_multiprocessing import (
MultiprocessingPool,
)
from arroyo.types import BaseValue, Commit, FilteredPayload, Message, Partition

from snuba.consumers.consumer import BytesInsertBatch, ProcessedMessageBatchWriter
Expand Down Expand Up @@ -101,11 +98,6 @@ def __init__(
self.__output_block_size = output_block_size
self.__initialize_parallel_transform = initialize_parallel_transform
self.__health_check_file = health_check_file
self.__pool = (
MultiprocessingPool(self.__processes, self.__initialize_parallel_transform)
if self.__processes
else None
)

def __should_accept(self, message: Message[KafkaPayload]) -> bool:
assert self.__prefilter is not None
Expand Down Expand Up @@ -153,17 +145,18 @@ def flush_batch(
transform_function = self.__process_message

strategy: ProcessingStrategy[Union[FilteredPayload, KafkaPayload]]
if self.__pool is None:
if self.__processes is None:
strategy = RunTask(transform_function, collect)
else:
strategy = RunTaskWithMultiprocessing(
transform_function,
collect,
self.__processes,
max_batch_size=self.__max_batch_size,
max_batch_time=self.__max_batch_time,
pool=self.__pool,
input_block_size=self.__input_block_size,
output_block_size=self.__output_block_size,
initializer=self.__initialize_parallel_transform,
)

if self.__prefilter is not None:
Expand All @@ -180,7 +173,3 @@ def flush_batch(
strategy = Healthcheck(self.__health_check_file, strategy)

return strategy

def shutdown(self) -> None:
if self.__pool:
self.__pool.close()

0 comments on commit cc3bca3

Please sign in to comment.