From 08b6fb264550a2fd18644bd00cf731a7b5aa3db2 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Mon, 24 Jul 2023 11:35:29 -0700 Subject: [PATCH] ref: Remove manual collection of librdkafka stats (#4542) Arroyo does this automatically now for all consumers since https://github.com/getsentry/arroyo/pull/252. We don't need to duplicate this code in Snuba --- snuba/cli/multistorage_consumer.py | 21 -------------------- snuba/subscriptions/executor_consumer.py | 22 --------------------- snuba/subscriptions/scheduler_consumer.py | 24 ----------------------- 3 files changed, 67 deletions(-) diff --git a/snuba/cli/multistorage_consumer.py b/snuba/cli/multistorage_consumer.py index 52be90cc54..422a6658ca 100644 --- a/snuba/cli/multistorage_consumer.py +++ b/snuba/cli/multistorage_consumer.py @@ -2,7 +2,6 @@ from typing import Any, Optional, Sequence import click -import rapidjson from arroyo import Topic, configure_metrics from arroyo.backends.kafka import ( KafkaConsumer, @@ -31,7 +30,6 @@ ) from snuba.datasets.storages.storage_key import StorageKey from snuba.environment import setup_logging, setup_sentry -from snuba.state import get_config from snuba.utils.metrics.backends.abstract import MetricsBackend from snuba.utils.metrics.wrapper import MetricsWrapper from snuba.utils.streams.metrics_adapter import StreamMetricsAdapter @@ -213,25 +211,6 @@ def multistorage_consumer( "consumer", tags=metrics_tags, ) - # Collect metrics from librdkafka if we have stats_collection_freq_ms set - # for the consumer group, or use the default. - stats_collection_frequency_ms = get_config( - f"stats_collection_freq_ms_{consumer_group}", - get_config("stats_collection_freq_ms", 0), - ) - - if stats_collection_frequency_ms and stats_collection_frequency_ms > 0: - - def stats_callback(stats_json: str) -> None: - stats = rapidjson.loads(stats_json) - metrics.gauge("librdkafka.total_queue_size", stats.get("replyq", 0)) - - configuration.update( - { - "statistics.interval.ms": stats_collection_frequency_ms, - "stats_cb": stats_callback, - } - ) consumer = KafkaConsumer(configuration) diff --git a/snuba/subscriptions/executor_consumer.py b/snuba/subscriptions/executor_consumer.py index 325787d29f..376bfad2c5 100644 --- a/snuba/subscriptions/executor_consumer.py +++ b/snuba/subscriptions/executor_consumer.py @@ -8,7 +8,6 @@ from datetime import datetime from typing import Deque, Mapping, Optional, Sequence, Tuple -import rapidjson from arroyo import Message, Partition, Topic from arroyo.backends.abstract import Producer from arroyo.backends.kafka import KafkaConsumer, KafkaPayload @@ -27,7 +26,6 @@ from snuba.datasets.table_storage import KafkaTopicSpec from snuba.reader import Result from snuba.request import Request -from snuba.state import get_config from snuba.subscriptions.codecs import ( SubscriptionScheduledTaskEncoder, SubscriptionTaskResultEncoder, @@ -136,26 +134,6 @@ def get_topics_for_entity( total_partition_count = get_partition_count(SnubaTopic(physical_scheduled_topic)) - # Collect metrics from librdkafka if we have stats_collection_freq_ms set - # for the consumer group, or use the default. - stats_collection_frequency_ms = get_config( - f"stats_collection_freq_ms_{consumer_group}", - get_config("stats_collection_freq_ms", 0), - ) - - if stats_collection_frequency_ms and stats_collection_frequency_ms > 0: - - def stats_callback(stats_json: str) -> None: - stats = rapidjson.loads(stats_json) - metrics.gauge("librdkafka.total_queue_size", stats.get("replyq", 0)) - - consumer_configuration.update( - { - "statistics.interval.ms": stats_collection_frequency_ms, - "stats_cb": stats_callback, - } - ) - return StreamProcessor( KafkaConsumer(consumer_configuration), Topic(physical_scheduled_topic), diff --git a/snuba/subscriptions/scheduler_consumer.py b/snuba/subscriptions/scheduler_consumer.py index 4f2bbbdf9c..43d08fa7f4 100644 --- a/snuba/subscriptions/scheduler_consumer.py +++ b/snuba/subscriptions/scheduler_consumer.py @@ -2,7 +2,6 @@ from datetime import datetime, timedelta from typing import Callable, Mapping, MutableMapping, NamedTuple, Optional, Sequence -import rapidjson from arroyo.backends.abstract import Consumer, Producer from arroyo.backends.kafka import KafkaConsumer, KafkaPayload from arroyo.backends.kafka.commit import CommitCodec @@ -17,7 +16,6 @@ from snuba.datasets.entities.factory import get_entity from snuba.datasets.table_storage import KafkaTopicSpec from snuba.redis import RedisClientKey, get_redis_client -from snuba.state import get_config from snuba.subscriptions.data import PartitionId from snuba.subscriptions.scheduler import SubscriptionScheduler from snuba.subscriptions.scheduler_processing_strategy import ( @@ -295,28 +293,6 @@ def __build_tick_consumer(self) -> CommitLogTickConsumer: strict_offset_reset=self.__strict_offset_reset, ) - # Collect metrics from librdkafka if we have stats_collection_freq_ms set - # for the consumer group, or use the default. - stats_collection_frequency_ms = get_config( - f"stats_collection_freq_ms_{self.__consumer_group}", - get_config("stats_collection_freq_ms", 0), - ) - - if stats_collection_frequency_ms and stats_collection_frequency_ms > 0: - - def stats_callback(stats_json: str) -> None: - stats = rapidjson.loads(stats_json) - self.__metrics.gauge( - "librdkafka.total_queue_size", stats.get("replyq", 0) - ) - - consumer_configuration.update( - { - "statistics.interval.ms": stats_collection_frequency_ms, - "stats_cb": stats_callback, - } - ) - return CommitLogTickConsumer( KafkaConsumer(consumer_configuration), followed_consumer_group=self.__followed_consumer_group,