Skip to content

Commit

Permalink
feat(schemas): Validate all source topics have schemas defined (#4374)
Browse files Browse the repository at this point in the history
There's a few temporary exclusions for schemas which still need to be created.
  • Loading branch information
lynnagara authored Jun 20, 2023
1 parent 2d2fb92 commit 4a3c547
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 14 deletions.
16 changes: 2 additions & 14 deletions snuba/admin/dead_letter_queue/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from __future__ import annotations

from typing import MutableSequence, NamedTuple, Optional, Sequence, TypedDict
from typing import NamedTuple, Optional, Sequence, TypedDict

from snuba import settings
from snuba.datasets.slicing import is_storage_set_sliced
from snuba.datasets.storage import WritableTableStorage
from snuba.datasets.storages.factory import get_all_storage_keys, get_writable_storage
from snuba.datasets.storages.factory import get_writable_storages

Topic = TypedDict(
"Topic",
Expand Down Expand Up @@ -59,15 +59,3 @@ def get_slices(storage: WritableTableStorage) -> Sequence[Optional[int]]:
return list(range(settings.SLICED_STORAGE_SETS[storage_set_key.value]))
else:
return [None]


def get_writable_storages() -> Sequence[WritableTableStorage]:
writable_storages: MutableSequence[WritableTableStorage] = []
storage_keys = get_all_storage_keys()
for storage_key in storage_keys:
try:
writable_storages.append(get_writable_storage(storage_key))
except AssertionError:
pass

return writable_storages
13 changes: 13 additions & 0 deletions snuba/datasets/storages/factory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from glob import glob
from typing import MutableSequence, Sequence

import sentry_sdk

Expand Down Expand Up @@ -82,6 +83,18 @@ def get_writable_storage(storage_key: StorageKey) -> WritableTableStorage:
return storage


def get_writable_storages() -> Sequence[WritableTableStorage]:
writable_storages: MutableSequence[WritableTableStorage] = []
storage_keys = get_all_storage_keys()
for storage_key in storage_keys:
try:
writable_storages.append(get_writable_storage(storage_key))
except AssertionError:
pass

return writable_storages


def get_cdc_storage(storage_key: StorageKey) -> CdcStorage:
storage = _storage_factory().get(storage_key)
assert isinstance(storage, CdcStorage)
Expand Down
28 changes: 28 additions & 0 deletions tests/consumers/test_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from snuba.datasets.storages.factory import (
get_writable_storage,
get_writable_storage_keys,
get_writable_storages,
)
from snuba.processor import MessageProcessor, ReplacementBatch
from snuba.replacers.replacer_processor import (
Expand Down Expand Up @@ -74,3 +75,30 @@ def test_all_schemas(case: Case) -> None:
action_type=action_type, data=data, metadata=replacement_metadata
)
)


TEMPORARILY_SKIPPED_TOPICS = [
"ingest-sessions",
"cdc",
"ingest-replay-events",
"profiles-call-tree",
"processed-profiles",
"generic-events",
]


def test_has_kafka_schema() -> None:
"""
Source topics for a writable storage must have schema defined.
Temporarily skipped for a few topics where schemas are in progress.
"""
for storage in get_writable_storages():
stream_loader = storage.get_table_writer().get_stream_loader()
topic_name = stream_loader.get_default_topic_spec().topic.value
try:
sentry_kafka_schemas.get_codec(topic_name)
except sentry_kafka_schemas.SchemaNotFound:
if topic_name in TEMPORARILY_SKIPPED_TOPICS:
print("Temporarily skipped validation for topic: %s" % topic_name)
else:
raise

0 comments on commit 4a3c547

Please sign in to comment.