From bd63bb0d62b7693d0089bfb5de1f4c574d4bbeda Mon Sep 17 00:00:00 2001 From: Lyn Date: Fri, 16 Jun 2023 18:28:38 -0700 Subject: [PATCH 1/2] feat(schemas): Validate all source topics have schemas defined There's a few temporary exclusions for schemas which still need to be created. --- snuba/admin/dead_letter_queue/__init__.py | 16 ++---------- snuba/datasets/storages/factory.py | 13 ++++++++++ tests/consumers/test_kafka_schema.py | 31 +++++++++++++++++++++++ 3 files changed, 46 insertions(+), 14 deletions(-) create mode 100644 tests/consumers/test_kafka_schema.py diff --git a/snuba/admin/dead_letter_queue/__init__.py b/snuba/admin/dead_letter_queue/__init__.py index 69ede0fabb..2cdc46dd9a 100644 --- a/snuba/admin/dead_letter_queue/__init__.py +++ b/snuba/admin/dead_letter_queue/__init__.py @@ -1,11 +1,11 @@ from __future__ import annotations -from typing import MutableSequence, NamedTuple, Optional, Sequence, TypedDict +from typing import NamedTuple, Optional, Sequence, TypedDict from snuba import settings from snuba.datasets.slicing import is_storage_set_sliced from snuba.datasets.storage import WritableTableStorage -from snuba.datasets.storages.factory import get_all_storage_keys, get_writable_storage +from snuba.datasets.storages.factory import get_writable_storages Topic = TypedDict( "Topic", @@ -59,15 +59,3 @@ def get_slices(storage: WritableTableStorage) -> Sequence[Optional[int]]: return list(range(settings.SLICED_STORAGE_SETS[storage_set_key.value])) else: return [None] - - -def get_writable_storages() -> Sequence[WritableTableStorage]: - writable_storages: MutableSequence[WritableTableStorage] = [] - storage_keys = get_all_storage_keys() - for storage_key in storage_keys: - try: - writable_storages.append(get_writable_storage(storage_key)) - except AssertionError: - pass - - return writable_storages diff --git a/snuba/datasets/storages/factory.py b/snuba/datasets/storages/factory.py index e510fc2333..bcb89e2750 100644 --- a/snuba/datasets/storages/factory.py +++ b/snuba/datasets/storages/factory.py @@ -1,6 +1,7 @@ from __future__ import annotations from glob import glob +from typing import MutableSequence, Sequence import sentry_sdk @@ -82,6 +83,18 @@ def get_writable_storage(storage_key: StorageKey) -> WritableTableStorage: return storage +def get_writable_storages() -> Sequence[WritableTableStorage]: + writable_storages: MutableSequence[WritableTableStorage] = [] + storage_keys = get_all_storage_keys() + for storage_key in storage_keys: + try: + writable_storages.append(get_writable_storage(storage_key)) + except AssertionError: + pass + + return writable_storages + + def get_cdc_storage(storage_key: StorageKey) -> CdcStorage: storage = _storage_factory().get(storage_key) assert isinstance(storage, CdcStorage) diff --git a/tests/consumers/test_kafka_schema.py b/tests/consumers/test_kafka_schema.py new file mode 100644 index 0000000000..c9033e442e --- /dev/null +++ b/tests/consumers/test_kafka_schema.py @@ -0,0 +1,31 @@ +import sentry_kafka_schemas + +from snuba.datasets.storages.factory import get_writable_storages + +TEMPORARILY_SKIPPED_TOPICS = [ + "ingest-sessions", + "cdc", + "ingest-replay-events", + "profiles-call-tree", + "processed-profiles", + "generic-events", +] + + +def test_has_kafka_schema() -> None: + """ + Source topics for a writable storage must have schema defined. + Temporarily skipped for a few topics where schemas are in progress. + """ + for storage in get_writable_storages(): + stream_loader = storage.get_table_writer().get_stream_loader() + topic_name = stream_loader.get_default_topic_spec().topic.value + try: + codec = sentry_kafka_schemas.get_codec(topic_name) + except sentry_kafka_schemas.SchemaNotFound: + if topic_name in TEMPORARILY_SKIPPED_TOPICS: + print("Temporarily skipped validation for topic: %s" % topic_name) + else: + raise + + assert codec is not None From 4f13a3f0722a120014929861bf228a28b881b7fb Mon Sep 17 00:00:00 2001 From: Lyn Date: Fri, 16 Jun 2023 18:32:24 -0700 Subject: [PATCH 2/2] small refactor --- tests/consumers/test_kafka_schema.py | 31 ---------------------------- tests/consumers/test_schemas.py | 28 +++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 31 deletions(-) delete mode 100644 tests/consumers/test_kafka_schema.py diff --git a/tests/consumers/test_kafka_schema.py b/tests/consumers/test_kafka_schema.py deleted file mode 100644 index c9033e442e..0000000000 --- a/tests/consumers/test_kafka_schema.py +++ /dev/null @@ -1,31 +0,0 @@ -import sentry_kafka_schemas - -from snuba.datasets.storages.factory import get_writable_storages - -TEMPORARILY_SKIPPED_TOPICS = [ - "ingest-sessions", - "cdc", - "ingest-replay-events", - "profiles-call-tree", - "processed-profiles", - "generic-events", -] - - -def test_has_kafka_schema() -> None: - """ - Source topics for a writable storage must have schema defined. - Temporarily skipped for a few topics where schemas are in progress. - """ - for storage in get_writable_storages(): - stream_loader = storage.get_table_writer().get_stream_loader() - topic_name = stream_loader.get_default_topic_spec().topic.value - try: - codec = sentry_kafka_schemas.get_codec(topic_name) - except sentry_kafka_schemas.SchemaNotFound: - if topic_name in TEMPORARILY_SKIPPED_TOPICS: - print("Temporarily skipped validation for topic: %s" % topic_name) - else: - raise - - assert codec is not None diff --git a/tests/consumers/test_schemas.py b/tests/consumers/test_schemas.py index 9f57f5fb44..1d057a8317 100644 --- a/tests/consumers/test_schemas.py +++ b/tests/consumers/test_schemas.py @@ -10,6 +10,7 @@ from snuba.datasets.storages.factory import ( get_writable_storage, get_writable_storage_keys, + get_writable_storages, ) from snuba.processor import MessageProcessor, ReplacementBatch from snuba.replacers.replacer_processor import ( @@ -74,3 +75,30 @@ def test_all_schemas(case: Case) -> None: action_type=action_type, data=data, metadata=replacement_metadata ) ) + + +TEMPORARILY_SKIPPED_TOPICS = [ + "ingest-sessions", + "cdc", + "ingest-replay-events", + "profiles-call-tree", + "processed-profiles", + "generic-events", +] + + +def test_has_kafka_schema() -> None: + """ + Source topics for a writable storage must have schema defined. + Temporarily skipped for a few topics where schemas are in progress. + """ + for storage in get_writable_storages(): + stream_loader = storage.get_table_writer().get_stream_loader() + topic_name = stream_loader.get_default_topic_spec().topic.value + try: + sentry_kafka_schemas.get_codec(topic_name) + except sentry_kafka_schemas.SchemaNotFound: + if topic_name in TEMPORARILY_SKIPPED_TOPICS: + print("Temporarily skipped validation for topic: %s" % topic_name) + else: + raise