diff --git a/snuba/admin/dead_letter_queue/__init__.py b/snuba/admin/dead_letter_queue/__init__.py
index 69ede0fabb..2cdc46dd9a 100644
--- a/snuba/admin/dead_letter_queue/__init__.py
+++ b/snuba/admin/dead_letter_queue/__init__.py
@@ -1,11 +1,11 @@
 from __future__ import annotations
 
-from typing import MutableSequence, NamedTuple, Optional, Sequence, TypedDict
+from typing import NamedTuple, Optional, Sequence, TypedDict
 
 from snuba import settings
 from snuba.datasets.slicing import is_storage_set_sliced
 from snuba.datasets.storage import WritableTableStorage
-from snuba.datasets.storages.factory import get_all_storage_keys, get_writable_storage
+from snuba.datasets.storages.factory import get_writable_storages
 
 Topic = TypedDict(
     "Topic",
@@ -59,15 +59,3 @@ def get_slices(storage: WritableTableStorage) -> Sequence[Optional[int]]:
         return list(range(settings.SLICED_STORAGE_SETS[storage_set_key.value]))
     else:
         return [None]
-
-
-def get_writable_storages() -> Sequence[WritableTableStorage]:
-    writable_storages: MutableSequence[WritableTableStorage] = []
-    storage_keys = get_all_storage_keys()
-    for storage_key in storage_keys:
-        try:
-            writable_storages.append(get_writable_storage(storage_key))
-        except AssertionError:
-            pass
-
-    return writable_storages
diff --git a/snuba/datasets/storages/factory.py b/snuba/datasets/storages/factory.py
index e510fc2333..bcb89e2750 100644
--- a/snuba/datasets/storages/factory.py
+++ b/snuba/datasets/storages/factory.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 from glob import glob
+from typing import Sequence
 
 import sentry_sdk
 
@@ -82,6 +83,18 @@ def get_writable_storage(storage_key: StorageKey) -> WritableTableStorage:
     return storage
 
 
+def get_writable_storages() -> Sequence[WritableTableStorage]:
+    """
+    Return the storage object for every writable storage key.
+
+    Keys come from ``get_writable_storage_keys`` instead of probing every
+    storage key and swallowing ``AssertionError``: ``assert`` statements are
+    stripped under ``python -O``, so an assertion failure is not a reliable
+    way to filter out storages that are not writable.
+    """
+    return [get_writable_storage(key) for key in get_writable_storage_keys()]
+
+
 def get_cdc_storage(storage_key: StorageKey) -> CdcStorage:
     storage = _storage_factory().get(storage_key)
     assert isinstance(storage, CdcStorage)
diff --git a/tests/consumers/test_schemas.py b/tests/consumers/test_schemas.py
index 9f57f5fb44..1d057a8317 100644
--- a/tests/consumers/test_schemas.py
+++ b/tests/consumers/test_schemas.py
@@ -10,6 +10,7 @@ from snuba.datasets.storages.factory import (
     get_writable_storage,
     get_writable_storage_keys,
+    get_writable_storages,
 )
 from snuba.processor import MessageProcessor, ReplacementBatch
 from snuba.replacers.replacer_processor import (
@@ -74,3 +75,30 @@ def test_all_schemas(case: Case) -> None:
                 action_type=action_type, data=data, metadata=replacement_metadata
             )
         )
+
+
+TEMPORARILY_SKIPPED_TOPICS = [
+    "ingest-sessions",
+    "cdc",
+    "ingest-replay-events",
+    "profiles-call-tree",
+    "processed-profiles",
+    "generic-events",
+]
+
+
+def test_has_kafka_schema() -> None:
+    """
+    Source topics for a writable storage must have schema defined.
+    Temporarily skipped for a few topics where schemas are in progress.
+    """
+    for storage in get_writable_storages():
+        stream_loader = storage.get_table_writer().get_stream_loader()
+        topic_name = stream_loader.get_default_topic_spec().topic.value
+        try:
+            sentry_kafka_schemas.get_codec(topic_name)
+        except sentry_kafka_schemas.SchemaNotFound:
+            if topic_name in TEMPORARILY_SKIPPED_TOPICS:
+                print("Temporarily skipped validation for topic: %s" % topic_name)
+            else:
+                raise