Revert "Revert "ref: Deprecate TOPIC_PARTITION_COUNTS setting (#5868)""
This reverts commit ed0426b.
lynnagara committed May 20, 2024
1 parent ed0426b commit 7569441
Showing 7 changed files with 59 additions and 16 deletions.
18 changes: 12 additions & 6 deletions snuba/datasets/table_storage.py
@@ -1,6 +1,8 @@
+from functools import cached_property
 from typing import Any, Mapping, Optional, Sequence
 
 from arroyo.backends.kafka import KafkaPayload
+from confluent_kafka.admin import AdminClient, _TopicCollection
 
 from snuba import settings
 from snuba.clickhouse.http import InsertStatement, JSONRow
@@ -22,6 +24,7 @@
 from snuba.subscriptions.utils import SchedulingWatermarkMode
 from snuba.utils.metrics import MetricsBackend
 from snuba.utils.schemas import ReadOnly
+from snuba.utils.streams.configuration_builder import get_default_kafka_configuration
 from snuba.utils.streams.topics import Topic, get_topic_creation_config
 from snuba.writer import BatchWriter
 
@@ -57,13 +60,16 @@ def get_physical_topic_name(self, slice_id: Optional[int] = None) -> str:
 
         return physical_topic
 
-    @property
+    @cached_property
     def partitions_number(self) -> int:
-        return settings.TOPIC_PARTITION_COUNTS.get(self.__topic.value, 1)
-
-    @property
-    def replication_factor(self) -> int:
-        return 1
+        config = get_default_kafka_configuration(self.__topic, None)
+        client = AdminClient(config)
+        topic_name = self.get_physical_topic_name()
+        return len(
+            client.describe_topics(_TopicCollection([topic_name]))[topic_name]
+            .result()
+            .partitions
+        )
 
     @property
     def topic_creation_config(self) -> Mapping[str, str]:
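
With this change the partition count comes from the broker itself rather than the deprecated TOPIC_PARTITION_COUNTS setting, and @cached_property memoizes the result so the admin round-trip happens at most once per KafkaTopicSpec instance. It also means the physical topic must exist before anything reads partitions_number, which is why the tests below now provision their topics up front. A minimal sketch of the same lookup in isolation (the bootstrap address is a hypothetical example; in Snuba the config dict comes from get_default_kafka_configuration()):

    from confluent_kafka.admin import AdminClient, _TopicCollection

    def fetch_partition_count(topic_name: str) -> int:
        # Hypothetical broker address, standing in for get_default_kafka_configuration().
        client = AdminClient({"bootstrap.servers": "127.0.0.1:9092"})
        # describe_topics() returns a mapping of topic name to a Future;
        # .result() blocks until the broker replies with the topic metadata.
        future = client.describe_topics(_TopicCollection([topic_name]))[topic_name]
        return len(future.result().partitions)
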
8 changes: 5 additions & 3 deletions snuba/utils/manage_topics.py
@@ -10,16 +10,18 @@
 logger = logging.getLogger(__name__)
 
 
-def create_topics(client: AdminClient, topics: Sequence[Topic]) -> None:
+def create_topics(
+    client: AdminClient, topics: Sequence[Topic], num_partitions: int = 1
+) -> None:
     topics_to_create = {}
 
     for topic in topics:
         topic_spec = KafkaTopicSpec(topic)
         logger.debug("Adding topic %s to creation list", topic_spec.topic_name)
         topics_to_create[topic_spec.topic_name] = NewTopic(
             topic_spec.topic_name,
-            num_partitions=topic_spec.partitions_number,
-            replication_factor=topic_spec.replication_factor,
+            num_partitions=num_partitions,
+            replication_factor=1,
             config=topic_spec.topic_creation_config,
         )
 
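create_topics no longer reads the partition and replication settings off KafkaTopicSpec (partitions_number would now trigger a broker lookup for a topic that may not exist yet) and instead takes an explicit num_partitions, defaulting to 1, with replication_factor hardcoded to 1. Callers that need more partitions pass the count directly, as the scheduler-consumer test below does:

    # Usage as it appears in the updated tests: two partitions for events,
    # the default single partition for the commit log.
    admin_client = AdminClient(get_default_kafka_configuration())
    create_topics(admin_client, [SnubaTopic.EVENTS], 2)
    create_topics(admin_client, [SnubaTopic.COMMIT_LOG])
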
15 changes: 15 additions & 0 deletions tests/datasets/test_table_storage.py
@@ -1,6 +1,13 @@
+from confluent_kafka.admin import AdminClient
+
 from snuba.datasets.storages.factory import get_writable_storage
 from snuba.datasets.storages.storage_key import StorageKey
 from snuba.datasets.table_storage import KafkaTopicSpec
 from snuba.settings import SLICED_KAFKA_TOPIC_MAP
+from snuba.utils.manage_topics import create_topics
+from snuba.utils.streams.configuration_builder import get_default_kafka_configuration
 from snuba.utils.streams.topics import Topic
+from snuba.utils.streams.topics import Topic as SnubaTopic
 
 
 def test_get_physical_topic_name(monkeypatch) -> None:  # type: ignore
@@ -19,3 +26,11 @@ def test_get_physical_topic_name(monkeypatch) -> None:  # type: ignore
     physical_topic_name = default_topic_spec.get_physical_topic_name(slice_id=2)
 
     assert physical_topic_name == "ingest-replay-events-2"
+
+
+def test_partitions_number() -> None:
+    admin_client = AdminClient(get_default_kafka_configuration())
+    create_topics(admin_client, [SnubaTopic.REPLAYEVENTS])
+
+    topic_spec = KafkaTopicSpec(Topic.REPLAYEVENTS)
+    assert topic_spec.partitions_number == 1
@@ -2,6 +2,7 @@
 from uuid import UUID
 
 import pytest
+from confluent_kafka.admin import AdminClient
 
 from snuba.datasets.entities.entity_key import EntityKey
 from snuba.datasets.entities.factory import get_entity, get_entity_name
@@ -13,7 +14,10 @@
 from snuba.subscriptions.data import PartitionId, SubscriptionData
 from snuba.subscriptions.store import RedisSubscriptionDataStore
 from snuba.subscriptions.subscription import SubscriptionCreator
+from snuba.utils.manage_topics import create_topics
 from snuba.utils.metrics.timer import Timer
+from snuba.utils.streams.configuration_builder import get_default_kafka_configuration
+from snuba.utils.streams.topics import Topic as SnubaTopic
 
 dataset = get_dataset("generic_metrics")
 entity = get_entity(EntityKey.GENERIC_METRICS_SETS)
@@ -50,6 +54,9 @@ def subscription_data_builder() -> SubscriptionData:
 @pytest.mark.clickhouse_db
 @pytest.mark.redis_db
 def test_entity_subscriptions_data() -> None:
+    admin_client = AdminClient(get_default_kafka_configuration())
+    create_topics(admin_client, [SnubaTopic.GENERIC_METRICS])
+
     subscription_data = subscription_data_builder()
 
     subscription_identifier = SubscriptionCreator(dataset, entity_key).create(
6 changes: 3 additions & 3 deletions tests/subscriptions/test_partitioner.py
@@ -1,6 +1,5 @@
 import pytest
 
-from snuba import settings
 from snuba.datasets.entities.entity_key import EntityKey
 from snuba.datasets.entities.factory import get_entity
 from snuba.datasets.table_storage import KafkaTopicSpec
@@ -44,7 +43,8 @@ class TestBuildRequest(BaseSubscriptionTest):
     @pytest.mark.parametrize("subscription", TESTS)
     @pytest.mark.clickhouse_db
     def test(self, subscription: SubscriptionData) -> None:
-        settings.TOPIC_PARTITION_COUNTS = {"events": 64}
-        partitioner = TopicSubscriptionDataPartitioner(KafkaTopicSpec(Topic.EVENTS))
+        kafka_topic_spec = KafkaTopicSpec(Topic.EVENTS)
+        kafka_topic_spec.partitions_number = 64
+        partitioner = TopicSubscriptionDataPartitioner(kafka_topic_spec)
 
         assert partitioner.build_partition_id(subscription) == 18
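
Assigning kafka_topic_spec.partitions_number = 64 works because functools.cached_property is a non-data descriptor: plain attribute assignment lands in the instance __dict__ and shadows the descriptor, so the broker lookup never runs. A self-contained illustration of that mechanism:

    from functools import cached_property

    class Spec:
        @cached_property
        def partitions_number(self) -> int:
            raise RuntimeError("would query Kafka")  # never reached once seeded

    spec = Spec()
    spec.partitions_number = 64  # stored in spec.__dict__, shadowing the descriptor
    assert spec.partitions_number == 64

A plain @property would have raised AttributeError on that assignment; the switch to @cached_property is what makes this test seam possible.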
12 changes: 8 additions & 4 deletions tests/subscriptions/test_scheduler_consumer.py
@@ -39,11 +39,15 @@
 
 @pytest.mark.redis_db
 def test_scheduler_consumer(tmpdir: LocalPath) -> None:
-    settings.TOPIC_PARTITION_COUNTS = {"events": 2}
+    settings.KAFKA_TOPIC_MAP = {
+        "events": "events-test",
+        "snuba-commit-log": "snuba-commit-log-test",
+    }
     importlib.reload(scheduler_consumer)
 
     admin_client = AdminClient(get_default_kafka_configuration())
-    create_topics(admin_client, [SnubaTopic.COMMIT_LOG])
+    create_topics(admin_client, [SnubaTopic.EVENTS], 2)
+    create_topics(admin_client, [SnubaTopic.COMMIT_LOG], 1)
 
     metrics_backend = TestingMetricsBackend()
     entity_name = "events"
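
Instead of overriding TOPIC_PARTITION_COUNTS, the test now pins partition counts at topic-creation time and remaps the logical topics to test-specific physical names through KAFKA_TOPIC_MAP. A hedged sketch of the resolution this relies on (assuming get_physical_topic_name consults KAFKA_TOPIC_MAP, which the commit-log assertion below implies):

    spec = KafkaTopicSpec(SnubaTopic.EVENTS)
    # With the map set above, the logical "events" topic resolves to "events-test".
    assert spec.get_physical_topic_name() == "events-test"
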
@@ -52,7 +56,7 @@ def test_scheduler_consumer(tmpdir: LocalPath) -> None:
     assert storage is not None
     stream_loader = storage.get_table_writer().get_stream_loader()
 
-    commit_log_topic = Topic("snuba-commit-log")
+    commit_log_topic = Topic("snuba-commit-log-test")
 
     mock_scheduler_producer = mock.Mock()
 
@@ -138,7 +142,7 @@ def test_scheduler_consumer(tmpdir: LocalPath) -> None:
     assert (tmpdir / "health.txt").check()
     assert mock_scheduler_producer.produce.call_count == 2
 
-    settings.TOPIC_PARTITION_COUNTS = {}
+    settings.KAFKA_TOPIC_MAP = {}
 
 
 def test_tick_time_shift() -> None:
9 changes: 9 additions & 0 deletions tests/test_api.py
@@ -9,6 +9,7 @@
 
 import pytest
 import simplejson as json
+from confluent_kafka.admin import AdminClient
 from dateutil.parser import parse as parse_datetime
 from sentry_sdk import Client, Hub
 
@@ -23,6 +24,9 @@
 from snuba.processor import InsertBatch, InsertEvent, ReplacementType
 from snuba.redis import RedisClientKey, RedisClientType, get_redis_client
 from snuba.subscriptions.store import RedisSubscriptionDataStore
+from snuba.utils.manage_topics import create_topics
+from snuba.utils.streams.configuration_builder import get_default_kafka_configuration
+from snuba.utils.streams.topics import Topic as SnubaTopic
 from tests.base import BaseApiTest
 from tests.conftest import SnubaSetConfig
 from tests.helpers import write_processed_messages
@@ -2149,6 +2153,9 @@ class TestCreateSubscriptionApi(BaseApiTest):
     entity_key = "events"
 
     def test(self) -> None:
+        admin_client = AdminClient(get_default_kafka_configuration())
+        create_topics(admin_client, [SnubaTopic.EVENTS])
+
         expected_uuid = uuid.uuid1()
 
         with patch("snuba.subscriptions.subscription.uuid1") as uuid4:
@@ -2219,6 +2226,8 @@ def test_selected_entity_is_used(self) -> None:
         Test that ensures that the passed entity is the selected one, not the dataset's default
         entity
         """
+        admin_client = AdminClient(get_default_kafka_configuration())
+        create_topics(admin_client, [SnubaTopic.METRICS])
 
         expected_uuid = uuid.uuid1()
         entity_key = EntityKey.METRICS_COUNTERS