ref: Deprecate TOPIC_PARTITION_COUNTS setting (#5868)
Snuba should fetch the actual value from the broker on startup rather than relying on the hardcoded TOPIC_PARTITION_COUNTS setting. This ensures it always has the correct values and removes the manual process, and the associated drift, currently involved in keeping this setting up to date in ops.

Depends on getsentry/sentry#70478

This is a prerequisite to getsentry/ops#10392 and getsentry/ops#10451, which will enable cleanup of the topicctl configuration in ops.
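
For reference, a minimal sketch of the broker lookup that replaces the hardcoded setting (standalone illustration, not the Snuba code itself; the bootstrap address and topic name below are placeholders, since the real implementation resolves both through Snuba's Kafka configuration helpers):

from confluent_kafka.admin import AdminClient, _TopicCollection

# Placeholder broker address; Snuba builds this config via get_default_kafka_configuration().
client = AdminClient({"bootstrap.servers": "127.0.0.1:9092"})
topic_name = "ingest-replay-events"  # placeholder physical topic name

# describe_topics() returns a dict of futures keyed by topic name; the resolved
# TopicDescription exposes the partition list, whose length is the partition count.
future = client.describe_topics(_TopicCollection([topic_name]))[topic_name]
partition_count = len(future.result().partitions)
print(partition_count)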
lynnagara authored May 20, 2024
1 parent e79b1ee commit 4aedda7
Showing 7 changed files with 59 additions and 16 deletions.
snuba/datasets/table_storage.py: 12 additions & 6 deletions

@@ -1,6 +1,8 @@
+from functools import cached_property
 from typing import Any, Mapping, Optional, Sequence

 from arroyo.backends.kafka import KafkaPayload
+from confluent_kafka.admin import AdminClient, _TopicCollection

 from snuba import settings
 from snuba.clickhouse.http import InsertStatement, JSONRow
@@ -22,6 +24,7 @@
 from snuba.subscriptions.utils import SchedulingWatermarkMode
 from snuba.utils.metrics import MetricsBackend
 from snuba.utils.schemas import ReadOnly
+from snuba.utils.streams.configuration_builder import get_default_kafka_configuration
 from snuba.utils.streams.topics import Topic, get_topic_creation_config
 from snuba.writer import BatchWriter

@@ -57,13 +60,16 @@ def get_physical_topic_name(self, slice_id: Optional[int] = None) -> str:

         return physical_topic

-    @property
+    @cached_property
     def partitions_number(self) -> int:
-        return settings.TOPIC_PARTITION_COUNTS.get(self.__topic.value, 1)
-
-    @property
-    def replication_factor(self) -> int:
-        return 1
+        config = get_default_kafka_configuration(self.__topic, None)
+        client = AdminClient(config)
+        topic_name = self.get_physical_topic_name()
+        return len(
+            client.describe_topics(_TopicCollection([topic_name]))[topic_name]
+            .result()
+            .partitions
+        )

     @property
     def topic_creation_config(self) -> Mapping[str, str]:
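Because partitions_number is now a functools.cached_property, the AdminClient round-trip happens only on first access and the result is reused for the lifetime of the KafkaTopicSpec instance. A standalone sketch of that caching behaviour (not Snuba code):

from functools import cached_property

class Example:
    lookups = 0

    @cached_property
    def partitions_number(self) -> int:
        # Stand-in for the AdminClient.describe_topics() call above.
        Example.lookups += 1
        return 4

spec = Example()
assert spec.partitions_number == 4
assert spec.partitions_number == 4
assert Example.lookups == 1  # the body ran once; the second read hit the cache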
snuba/utils/manage_topics.py: 5 additions & 3 deletions

@@ -10,16 +10,18 @@
 logger = logging.getLogger(__name__)


-def create_topics(client: AdminClient, topics: Sequence[Topic]) -> None:
+def create_topics(
+    client: AdminClient, topics: Sequence[Topic], num_partitions: int = 1
+) -> None:
     topics_to_create = {}

     for topic in topics:
         topic_spec = KafkaTopicSpec(topic)
         logger.debug("Adding topic %s to creation list", topic_spec.topic_name)
         topics_to_create[topic_spec.topic_name] = NewTopic(
             topic_spec.topic_name,
-            num_partitions=topic_spec.partitions_number,
-            replication_factor=topic_spec.replication_factor,
+            num_partitions=num_partitions,
+            replication_factor=1,
             config=topic_spec.topic_creation_config,
         )

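With the new signature, callers pass the partition count explicitly instead of reading it from the topic spec, and the replication factor is fixed at 1. A usage sketch mirroring the calls added in the tests below:

from confluent_kafka.admin import AdminClient

from snuba.utils.manage_topics import create_topics
from snuba.utils.streams.configuration_builder import get_default_kafka_configuration
from snuba.utils.streams.topics import Topic as SnubaTopic

admin_client = AdminClient(get_default_kafka_configuration())
create_topics(admin_client, [SnubaTopic.EVENTS], 2)      # two partitions
create_topics(admin_client, [SnubaTopic.COMMIT_LOG], 1)  # explicit single partition
create_topics(admin_client, [SnubaTopic.REPLAYEVENTS])   # default num_partitions=1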
tests/datasets/test_table_storage.py: 15 additions & 0 deletions

@@ -1,6 +1,13 @@
+from confluent_kafka.admin import AdminClient
+
 from snuba.datasets.storages.factory import get_writable_storage
 from snuba.datasets.storages.storage_key import StorageKey
 from snuba.datasets.table_storage import KafkaTopicSpec
 from snuba.settings import SLICED_KAFKA_TOPIC_MAP
+from snuba.utils.manage_topics import create_topics
+from snuba.utils.streams.configuration_builder import get_default_kafka_configuration
 from snuba.utils.streams.topics import Topic
+from snuba.utils.streams.topics import Topic as SnubaTopic


 def test_get_physical_topic_name(monkeypatch) -> None:  # type: ignore
@@ -19,3 +26,11 @@ def test_get_physical_topic_name(monkeypatch) -> None:  # type: ignore
     physical_topic_name = default_topic_spec.get_physical_topic_name(slice_id=2)

     assert physical_topic_name == "ingest-replay-events-2"
+
+
+def test_partitions_number() -> None:
+    admin_client = AdminClient(get_default_kafka_configuration())
+    create_topics(admin_client, [SnubaTopic.REPLAYEVENTS])
+
+    topic_spec = KafkaTopicSpec(Topic.REPLAYEVENTS)
+    assert topic_spec.partitions_number == 1
(changed file, path not shown): 7 additions & 0 deletions
@@ -2,6 +2,7 @@
 from uuid import UUID

 import pytest
+from confluent_kafka.admin import AdminClient

 from snuba.datasets.entities.entity_key import EntityKey
 from snuba.datasets.entities.factory import get_entity, get_entity_name
@@ -13,7 +14,10 @@
 from snuba.subscriptions.data import PartitionId, SubscriptionData
 from snuba.subscriptions.store import RedisSubscriptionDataStore
 from snuba.subscriptions.subscription import SubscriptionCreator
+from snuba.utils.manage_topics import create_topics
 from snuba.utils.metrics.timer import Timer
+from snuba.utils.streams.configuration_builder import get_default_kafka_configuration
+from snuba.utils.streams.topics import Topic as SnubaTopic

 dataset = get_dataset("generic_metrics")
 entity = get_entity(EntityKey.GENERIC_METRICS_SETS)
@@ -50,6 +54,9 @@ def subscription_data_builder() -> SubscriptionData:
 @pytest.mark.clickhouse_db
 @pytest.mark.redis_db
 def test_entity_subscriptions_data() -> None:
+    admin_client = AdminClient(get_default_kafka_configuration())
+    create_topics(admin_client, [SnubaTopic.GENERIC_METRICS])
+
     subscription_data = subscription_data_builder()

     subscription_identifier = SubscriptionCreator(dataset, entity_key).create(
tests/subscriptions/test_partitioner.py: 3 additions & 3 deletions

@@ -1,6 +1,5 @@
 import pytest

-from snuba import settings
 from snuba.datasets.entities.entity_key import EntityKey
 from snuba.datasets.entities.factory import get_entity
 from snuba.datasets.table_storage import KafkaTopicSpec
@@ -44,7 +43,8 @@ class TestBuildRequest(BaseSubscriptionTest):
     @pytest.mark.parametrize("subscription", TESTS)
     @pytest.mark.clickhouse_db
     def test(self, subscription: SubscriptionData) -> None:
-        settings.TOPIC_PARTITION_COUNTS = {"events": 64}
-        partitioner = TopicSubscriptionDataPartitioner(KafkaTopicSpec(Topic.EVENTS))
+        kafka_topic_spec = KafkaTopicSpec(Topic.EVENTS)
+        kafka_topic_spec.partitions_number = 64
+        partitioner = TopicSubscriptionDataPartitioner(kafka_topic_spec)

         assert partitioner.build_partition_id(subscription) == 18
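
This test no longer patches settings.TOPIC_PARTITION_COUNTS; it assigns partitions_number directly on the spec instance. Since functools.cached_property is a non-data descriptor, the assignment lands in the instance __dict__ and shadows the property, so no broker lookup is attempted. A standalone sketch of that mechanism (not Snuba code):

from functools import cached_property

class Spec:
    @cached_property
    def partitions_number(self) -> int:
        # Stand-in for the broker lookup; never reached in this sketch.
        raise RuntimeError("no broker available")

spec = Spec()
spec.partitions_number = 64  # stored in spec.__dict__, bypassing the descriptor
assert spec.partitions_number == 64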
tests/subscriptions/test_scheduler_consumer.py: 8 additions & 4 deletions

@@ -39,11 +39,15 @@

 @pytest.mark.redis_db
 def test_scheduler_consumer(tmpdir: LocalPath) -> None:
-    settings.TOPIC_PARTITION_COUNTS = {"events": 2}
+    settings.KAFKA_TOPIC_MAP = {
+        "events": "events-test",
+        "snuba-commit-log": "snuba-commit-log-test",
+    }
     importlib.reload(scheduler_consumer)

     admin_client = AdminClient(get_default_kafka_configuration())
-    create_topics(admin_client, [SnubaTopic.COMMIT_LOG])
+    create_topics(admin_client, [SnubaTopic.EVENTS], 2)
+    create_topics(admin_client, [SnubaTopic.COMMIT_LOG], 1)

     metrics_backend = TestingMetricsBackend()
     entity_name = "events"
@@ -52,7 +56,7 @@ def test_scheduler_consumer(tmpdir: LocalPath) -> None:
     assert storage is not None
     stream_loader = storage.get_table_writer().get_stream_loader()

-    commit_log_topic = Topic("snuba-commit-log")
+    commit_log_topic = Topic("snuba-commit-log-test")

     mock_scheduler_producer = mock.Mock()

@@ -138,7 +142,7 @@ def test_scheduler_consumer(tmpdir: LocalPath) -> None:
     assert (tmpdir / "health.txt").check()
     assert mock_scheduler_producer.produce.call_count == 2

-    settings.TOPIC_PARTITION_COUNTS = {}
+    settings.KAFKA_TOPIC_MAP = {}


 def test_tick_time_shift() -> None:
tests/test_api.py: 9 additions & 0 deletions

@@ -9,6 +9,7 @@

 import pytest
 import simplejson as json
+from confluent_kafka.admin import AdminClient
 from dateutil.parser import parse as parse_datetime
 from sentry_sdk import Client, Hub

@@ -23,6 +24,9 @@
 from snuba.processor import InsertBatch, InsertEvent, ReplacementType
 from snuba.redis import RedisClientKey, RedisClientType, get_redis_client
 from snuba.subscriptions.store import RedisSubscriptionDataStore
+from snuba.utils.manage_topics import create_topics
+from snuba.utils.streams.configuration_builder import get_default_kafka_configuration
+from snuba.utils.streams.topics import Topic as SnubaTopic
 from tests.base import BaseApiTest
 from tests.conftest import SnubaSetConfig
 from tests.helpers import write_processed_messages
@@ -2149,6 +2153,9 @@ class TestCreateSubscriptionApi(BaseApiTest):
     entity_key = "events"

     def test(self) -> None:
+        admin_client = AdminClient(get_default_kafka_configuration())
+        create_topics(admin_client, [SnubaTopic.EVENTS])
+
         expected_uuid = uuid.uuid1()

         with patch("snuba.subscriptions.subscription.uuid1") as uuid4:
@@ -2219,6 +2226,8 @@ def test_selected_entity_is_used(self) -> None:
         """
         Test that ensures that the passed entity is the selected one, not the dataset's default
         entity
         """
+        admin_client = AdminClient(get_default_kafka_configuration())
+        create_topics(admin_client, [SnubaTopic.METRICS])

         expected_uuid = uuid.uuid1()
         entity_key = EntityKey.METRICS_COUNTERS
