Skip to content

Commit

Permalink
fix(arroyo): Tag all consumer metrics by the smallest partition index (
Browse files Browse the repository at this point in the history
…#5208)

* fix(arroyo): Tag all consumer metrics by the smallest partition index

INC-588 INC-581

This should help us identify pod-specific problems, such as one
partition lagging behind another.

Ideally arroyo should do this, but it's easier here because arroyo has
no concept of global tags.

* fix test

* work around mypy

* fix tests

* fix typing
  • Loading branch information
untitaker committed Dec 14, 2023
1 parent 8618c2c commit 83d53c8
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 6 deletions.
1 change: 1 addition & 0 deletions snuba/cli/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ def consumer(
max_insert_batch_size=max_insert_batch_size,
max_insert_batch_time_ms=max_insert_batch_time_ms,
metrics=metrics,
metrics_tags=metrics_tags,
profile_path=profile_path,
slice_id=slice_id,
join_timeout=join_timeout,
Expand Down
1 change: 1 addition & 0 deletions snuba/cli/dlq_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def handler(signum: int, frame: Any) -> None:
slice_id=instruction.slice_id,
join_timeout=None,
enforce_schema=False,
metrics_tags=metrics_tags,
)

consumer = consumer_builder.build_dlq_consumer(instruction)
Expand Down
6 changes: 5 additions & 1 deletion snuba/consumers/consumer_builder.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import functools
import logging
from dataclasses import dataclass
from typing import Optional
from typing import MutableMapping, Optional

from arroyo.backends.kafka import (
KafkaConsumer,
Expand Down Expand Up @@ -71,6 +71,7 @@ def __init__(
max_insert_batch_size: Optional[int],
max_insert_batch_time_ms: Optional[int],
metrics: MetricsBackend,
metrics_tags: MutableMapping[str, str],
slice_id: Optional[int],
join_timeout: Optional[float],
enforce_schema: bool,
Expand Down Expand Up @@ -136,6 +137,7 @@ def __init__(
self.commit_log_producer = None

self.metrics = metrics
self.metrics_tags = metrics_tags
self.max_batch_size = max_batch_size
self.max_batch_time_ms = max_batch_time_ms
self.max_insert_batch_size = max_insert_batch_size
Expand Down Expand Up @@ -253,6 +255,7 @@ def build_streaming_strategy_factory(
initialize_parallel_transform=setup_sentry,
health_check_file=self.health_check_file,
skip_write=self.__skip_write,
metrics_tags=self.metrics_tags,
)

if self.__profile_path is not None:
Expand Down Expand Up @@ -310,6 +313,7 @@ def build_dlq_strategy_factory(
output_block_size=self.output_block_size,
max_messages_to_process=instruction.max_messages_to_process,
initialize_parallel_transform=setup_sentry,
metrics_tags=self.metrics_tags,
)

return strategy_factory
Expand Down
10 changes: 9 additions & 1 deletion snuba/consumers/strategy_factory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import abstractmethod
from typing import Callable, Mapping, Optional, Protocol, Union
from typing import Callable, Mapping, MutableMapping, Optional, Protocol, Union

from arroyo.backends.kafka import KafkaPayload
from arroyo.commit import ONCE_PER_SECOND
Expand Down Expand Up @@ -70,6 +70,7 @@ def __init__(
output_block_size: Optional[int],
max_insert_batch_size: Optional[int],
max_insert_batch_time: Optional[float],
metrics_tags: MutableMapping[str, str],
skip_write: bool = False,
# Passed in the case of DLQ consumer which exits after a certain number of messages
# is processed
Expand Down Expand Up @@ -106,6 +107,7 @@ def __init__(
if self.__processes
else None
)
self.__metrics_tags = metrics_tags

def __should_accept(self, message: Message[KafkaPayload]) -> bool:
assert self.__prefilter is not None
Expand All @@ -116,6 +118,12 @@ def create_with_partitions(
commit: Commit,
partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:

if partitions:
self.__metrics_tags["min_partition"] = str(min(x.index for x in partitions))
else:
self.__metrics_tags.pop("min_partition", None)

def accumulator(
batch_writer: ProcessedMessageBatchWriter,
message: BaseValue[ProcessedMessage],
Expand Down
1 change: 1 addition & 0 deletions snuba/web/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ def commit(
output_block_size=None,
max_insert_batch_size=None,
max_insert_batch_time=None,
metrics_tags={},
).create_with_partitions(commit, {})
strategy.submit(message)
strategy.close()
Expand Down
5 changes: 3 additions & 2 deletions tests/consumers/test_consumer_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
slice_id=None,
join_timeout=5,
enforce_schema=True,
metrics_tags={},
)

optional_consumer_config = resolve_consumer_config(
Expand Down Expand Up @@ -110,6 +111,7 @@
slice_id=None,
join_timeout=5,
enforce_schema=True,
metrics_tags={},
)


Expand Down Expand Up @@ -162,9 +164,8 @@ def test_run_processing_strategy() -> None:
assert get_row_count(get_writable_storage(StorageKey.ERRORS)) == 0

commit = Mock()
partitions = Mock()
strategy_factory = consumer_builder.build_streaming_strategy_factory()
strategy = strategy_factory.create_with_partitions(commit, partitions)
strategy = strategy_factory.create_with_partitions(commit, {})

json_string = json.dumps(get_raw_error_message())

Expand Down
4 changes: 2 additions & 2 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ def write_step() -> ProcessedMessageBatchWriter:
input_block_size=None,
output_block_size=None,
health_check_file=health_check_file.strpath,
metrics_tags={},
)

commit_function = Mock()
partitions = Mock()
strategy = factory.create_with_partitions(commit_function, partitions)
strategy = factory.create_with_partitions(commit_function, {})

for i in range(3):
strategy.poll()
Expand Down

0 comments on commit 83d53c8

Please sign in to comment.