Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(schemas): Validate all source topics have schemas defined #4374

Merged
merged 2 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 2 additions & 14 deletions snuba/admin/dead_letter_queue/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from __future__ import annotations

from typing import MutableSequence, NamedTuple, Optional, Sequence, TypedDict
from typing import NamedTuple, Optional, Sequence, TypedDict

from snuba import settings
from snuba.datasets.slicing import is_storage_set_sliced
from snuba.datasets.storage import WritableTableStorage
from snuba.datasets.storages.factory import get_all_storage_keys, get_writable_storage
from snuba.datasets.storages.factory import get_writable_storages

Topic = TypedDict(
"Topic",
Expand Down Expand Up @@ -59,15 +59,3 @@ def get_slices(storage: WritableTableStorage) -> Sequence[Optional[int]]:
return list(range(settings.SLICED_STORAGE_SETS[storage_set_key.value]))
else:
return [None]


def get_writable_storages() -> Sequence[WritableTableStorage]:
writable_storages: MutableSequence[WritableTableStorage] = []
storage_keys = get_all_storage_keys()
for storage_key in storage_keys:
try:
writable_storages.append(get_writable_storage(storage_key))
except AssertionError:
pass

return writable_storages
13 changes: 13 additions & 0 deletions snuba/datasets/storages/factory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from glob import glob
from typing import MutableSequence, Sequence

import sentry_sdk

Expand Down Expand Up @@ -82,6 +83,18 @@ def get_writable_storage(storage_key: StorageKey) -> WritableTableStorage:
return storage


def get_writable_storages() -> Sequence[WritableTableStorage]:
writable_storages: MutableSequence[WritableTableStorage] = []
storage_keys = get_all_storage_keys()
for storage_key in storage_keys:
try:
writable_storages.append(get_writable_storage(storage_key))
except AssertionError:
pass

return writable_storages


def get_cdc_storage(storage_key: StorageKey) -> CdcStorage:
storage = _storage_factory().get(storage_key)
assert isinstance(storage, CdcStorage)
Expand Down
28 changes: 28 additions & 0 deletions tests/consumers/test_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from snuba.datasets.storages.factory import (
get_writable_storage,
get_writable_storage_keys,
get_writable_storages,
)
from snuba.processor import MessageProcessor, ReplacementBatch
from snuba.replacers.replacer_processor import (
Expand Down Expand Up @@ -74,3 +75,30 @@ def test_all_schemas(case: Case) -> None:
action_type=action_type, data=data, metadata=replacement_metadata
)
)


TEMPORARILY_SKIPPED_TOPICS = [
"ingest-sessions",
"cdc",
"ingest-replay-events",
"profiles-call-tree",
"processed-profiles",
"generic-events",
]


def test_has_kafka_schema() -> None:
"""
Source topics for a writable storage must have schema defined.
Temporarily skipped for a few topics where schemas are in progress.
"""
for storage in get_writable_storages():
stream_loader = storage.get_table_writer().get_stream_loader()
topic_name = stream_loader.get_default_topic_spec().topic.value
try:
sentry_kafka_schemas.get_codec(topic_name)
except sentry_kafka_schemas.SchemaNotFound:
if topic_name in TEMPORARILY_SKIPPED_TOPICS:
print("Temporarily skipped validation for topic: %s" % topic_name)
else:
raise