Skip to content

Commit

Permalink
ref: Remove manual collection of librdkafka stats (#4542)
Browse files Browse the repository at this point in the history
Arroyo now does this automatically for all consumers, since
getsentry/arroyo#252, so we no longer need to duplicate this code in Snuba.
  • Loading branch information
lynnagara authored Jul 24, 2023
1 parent ffcfdb9 commit 08b6fb2
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 67 deletions.
21 changes: 0 additions & 21 deletions snuba/cli/multistorage_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from typing import Any, Optional, Sequence

import click
import rapidjson
from arroyo import Topic, configure_metrics
from arroyo.backends.kafka import (
KafkaConsumer,
Expand Down Expand Up @@ -31,7 +30,6 @@
)
from snuba.datasets.storages.storage_key import StorageKey
from snuba.environment import setup_logging, setup_sentry
from snuba.state import get_config
from snuba.utils.metrics.backends.abstract import MetricsBackend
from snuba.utils.metrics.wrapper import MetricsWrapper
from snuba.utils.streams.metrics_adapter import StreamMetricsAdapter
Expand Down Expand Up @@ -213,25 +211,6 @@ def multistorage_consumer(
"consumer",
tags=metrics_tags,
)
# Collect metrics from librdkafka if we have stats_collection_freq_ms set
# for the consumer group, or use the default.
stats_collection_frequency_ms = get_config(
f"stats_collection_freq_ms_{consumer_group}",
get_config("stats_collection_freq_ms", 0),
)

if stats_collection_frequency_ms and stats_collection_frequency_ms > 0:

def stats_callback(stats_json: str) -> None:
stats = rapidjson.loads(stats_json)
metrics.gauge("librdkafka.total_queue_size", stats.get("replyq", 0))

configuration.update(
{
"statistics.interval.ms": stats_collection_frequency_ms,
"stats_cb": stats_callback,
}
)

consumer = KafkaConsumer(configuration)

Expand Down
22 changes: 0 additions & 22 deletions snuba/subscriptions/executor_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from datetime import datetime
from typing import Deque, Mapping, Optional, Sequence, Tuple

import rapidjson
from arroyo import Message, Partition, Topic
from arroyo.backends.abstract import Producer
from arroyo.backends.kafka import KafkaConsumer, KafkaPayload
Expand All @@ -27,7 +26,6 @@
from snuba.datasets.table_storage import KafkaTopicSpec
from snuba.reader import Result
from snuba.request import Request
from snuba.state import get_config
from snuba.subscriptions.codecs import (
SubscriptionScheduledTaskEncoder,
SubscriptionTaskResultEncoder,
Expand Down Expand Up @@ -136,26 +134,6 @@ def get_topics_for_entity(

total_partition_count = get_partition_count(SnubaTopic(physical_scheduled_topic))

# Collect metrics from librdkafka if we have stats_collection_freq_ms set
# for the consumer group, or use the default.
stats_collection_frequency_ms = get_config(
f"stats_collection_freq_ms_{consumer_group}",
get_config("stats_collection_freq_ms", 0),
)

if stats_collection_frequency_ms and stats_collection_frequency_ms > 0:

def stats_callback(stats_json: str) -> None:
stats = rapidjson.loads(stats_json)
metrics.gauge("librdkafka.total_queue_size", stats.get("replyq", 0))

consumer_configuration.update(
{
"statistics.interval.ms": stats_collection_frequency_ms,
"stats_cb": stats_callback,
}
)

return StreamProcessor(
KafkaConsumer(consumer_configuration),
Topic(physical_scheduled_topic),
Expand Down
24 changes: 0 additions & 24 deletions snuba/subscriptions/scheduler_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from datetime import datetime, timedelta
from typing import Callable, Mapping, MutableMapping, NamedTuple, Optional, Sequence

import rapidjson
from arroyo.backends.abstract import Consumer, Producer
from arroyo.backends.kafka import KafkaConsumer, KafkaPayload
from arroyo.backends.kafka.commit import CommitCodec
Expand All @@ -17,7 +16,6 @@
from snuba.datasets.entities.factory import get_entity
from snuba.datasets.table_storage import KafkaTopicSpec
from snuba.redis import RedisClientKey, get_redis_client
from snuba.state import get_config
from snuba.subscriptions.data import PartitionId
from snuba.subscriptions.scheduler import SubscriptionScheduler
from snuba.subscriptions.scheduler_processing_strategy import (
Expand Down Expand Up @@ -295,28 +293,6 @@ def __build_tick_consumer(self) -> CommitLogTickConsumer:
strict_offset_reset=self.__strict_offset_reset,
)

# Collect metrics from librdkafka if we have stats_collection_freq_ms set
# for the consumer group, or use the default.
stats_collection_frequency_ms = get_config(
f"stats_collection_freq_ms_{self.__consumer_group}",
get_config("stats_collection_freq_ms", 0),
)

if stats_collection_frequency_ms and stats_collection_frequency_ms > 0:

def stats_callback(stats_json: str) -> None:
stats = rapidjson.loads(stats_json)
self.__metrics.gauge(
"librdkafka.total_queue_size", stats.get("replyq", 0)
)

consumer_configuration.update(
{
"statistics.interval.ms": stats_collection_frequency_ms,
"stats_cb": stats_callback,
}
)

return CommitLogTickConsumer(
KafkaConsumer(consumer_configuration),
followed_consumer_group=self.__followed_consumer_group,
Expand Down

0 comments on commit 08b6fb2

Please sign in to comment.