feat: A small step towards schema enforcement (#4405)
Devserver now enforces the schema for the events topic
lynnagara authored Jun 23, 2023
1 parent 197a772 commit ce84708
Showing 8 changed files with 27 additions and 7 deletions.
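In short: the consumer CLI gains an --enforce-schema flag, ConsumerBuilder stores it and passes it through to process_message, and devserver turns it on for the errors consumer. When the flag is set, a message that fails validation against the topic schema is rejected (the validation error is re-raised) instead of only being logged.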
snuba/cli/consumer.py (9 additions, 0 deletions)
@@ -116,6 +116,13 @@
type=int,
)
@click.option("--join-timeout", type=int, help="Join timeout in seconds.", default=5)
@click.option(
"--enforce-schema",
type=bool,
is_flag=True,
default=False,
help="Enforce schema on the raw events topic.",
)
@click.option(
"--profile-path", type=click.Path(dir_okay=True, file_okay=False, exists=True)
)
@@ -144,6 +151,7 @@ def consumer(
input_block_size: Optional[int],
output_block_size: Optional[int],
join_timeout: int = 5,
enforce_schema: bool = False,
log_level: Optional[str] = None,
profile_path: Optional[str] = None,
max_poll_interval_ms: Optional[int] = None,
@@ -201,6 +209,7 @@ def consumer(
slice_id=slice_id,
join_timeout=join_timeout,
max_poll_interval_ms=max_poll_interval_ms,
enforce_schema=enforce_schema,
)

consumer = consumer_builder.build_base_consumer()
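The new option is an ordinary click boolean flag. Below is a minimal, standalone sketch (not the Snuba CLI itself; the command body is illustrative) of how such a flag parses; with is_flag=True, the explicit type=bool in the diff above appears redundant, though harmless.

import click

@click.command()
@click.option(
    "--enforce-schema",
    is_flag=True,
    default=False,
    help="Enforce schema on the raw events topic.",
)
def consumer(enforce_schema: bool) -> None:
    # Passing --enforce-schema on the command line yields True; omitting it yields False.
    click.echo(f"enforce_schema={enforce_schema}")

if __name__ == "__main__":
    consumer()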
snuba/cli/devserver.py (1 addition, 0 deletions)
@@ -81,6 +81,7 @@ def devserver(*, bootstrap: bool, workers: bool) -> None:
"--no-strict-offset-reset",
"--log-level=debug",
"--storage=errors",
"--enforce-schema",
],
),
(
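This is the change the commit title describes: devserver starts the errors consumer with the new flag, so the local events topic is now schema-enforced. A rough reconstruction of the argument list is sketched below, assuming devserver invokes the snuba consumer subcommand; only the options visible in this hunk are shown, the rest are elided.

# Hypothetical, abridged reconstruction of the devserver command for the errors consumer.
errors_consumer_args = [
    "snuba",
    "consumer",
    "--no-strict-offset-reset",
    "--log-level=debug",
    "--storage=errors",
    "--enforce-schema",  # new: events failing schema validation are rejected, not just logged
]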
snuba/cli/dlq_consumer.py (1 addition, 0 deletions)
@@ -169,6 +169,7 @@ def handler(signum: int, frame: Any) -> None:
metrics=metrics,
slice_id=instruction.slice_id,
join_timeout=None,
enforce_schema=False,
)

consumer = consumer_builder.build_dlq_consumer(instruction)
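The DLQ consumer hard-codes enforce_schema=False, presumably so that messages being replayed from the dead letter queue are not rejected a second time on schema grounds.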
snuba/consumers/consumer.py (3 additions, 2 deletions)
@@ -523,6 +523,7 @@ def process_message(
processor: MessageProcessor,
consumer_group: str,
snuba_logical_topic: SnubaTopic,
enforce_schema: bool,
message: Message[KafkaPayload],
) -> Union[None, BytesInsertBatch, ReplacementBatch]:
local_metrics = MetricsWrapper(
@@ -573,6 +574,8 @@
_LAST_INVALID_MESSAGE[snuba_logical_topic.name] = start
sentry_sdk.set_tag("invalid_message_schema", "true")
logger.warning(err, exc_info=True)
if enforce_schema:
raise

# TODO: this is not the most efficient place to emit a metric, but
# as long as should_validate is behind a sample rate it should be
@@ -604,8 +607,6 @@
value = message.value
raise InvalidMessage(value.partition, value.offset) from err

return None

if isinstance(result, InsertBatch):
return BytesInsertBatch(
[json_row_encoder.encode(row) for row in result.rows],
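This is the behavioural core of the commit: schema-validation failures were previously logged and tagged but otherwise ignored; with enforce_schema=True the exception now propagates so the message is rejected. A simplified sketch of that control flow follows, using a stand-in validator rather than the real per-topic schema check.

import logging

logger = logging.getLogger(__name__)

class SchemaValidationError(Exception):
    pass

def validate(payload: bytes) -> None:
    # Stand-in check; the real code validates the decoded payload against the
    # schema registered for the logical topic.
    if not payload.strip().startswith(b"{"):
        raise SchemaValidationError("payload does not look like a JSON object")

def process_message(payload: bytes, enforce_schema: bool) -> None:
    try:
        validate(payload)
    except SchemaValidationError as err:
        # Before this commit the error was only logged; with --enforce-schema it
        # propagates and the message is treated as invalid.
        logger.warning(err, exc_info=True)
        if enforce_schema:
            raise
    # ...normal processing of the message continues here...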
snuba/consumers/consumer_builder.py (3 additions, 0 deletions)
@@ -71,6 +71,7 @@ def __init__(
metrics: MetricsBackend,
slice_id: Optional[int],
join_timeout: Optional[float],
enforce_schema: bool,
profile_path: Optional[str] = None,
max_poll_interval_ms: Optional[int] = None,
) -> None:
@@ -83,6 +84,7 @@ def __init__(
self.__consumer_config = consumer_config
self.__kafka_params = kafka_params
self.consumer_group = kafka_params.group_id
self.__enforce_schema = enforce_schema

broker_config = build_kafka_consumer_configuration(
self.__consumer_config.raw_topic.broker_config,
@@ -213,6 +215,7 @@ def build_streaming_strategy_factory(
processor,
self.consumer_group,
logical_topic,
self.__enforce_schema,
),
collector=build_batch_writer(
table_writer,
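The builder stores the flag and binds it, along with the processor, consumer group, and logical topic, into the message-processing callable via functools.partial. A toy sketch of that binding pattern is below; every argument is a placeholder, not a real Snuba type.

import functools

def process_message(processor, consumer_group, logical_topic, enforce_schema, message):
    print(processor, consumer_group, logical_topic, enforce_schema, message)

bound = functools.partial(
    process_message,
    "errors_processor",   # stands in for the storage's MessageProcessor
    "snuba-consumers",    # consumer group
    "events",             # logical topic
    True,                 # self.__enforce_schema
)

# The streaming strategy only has to supply the message on each call.
bound(b'{"some": "payload"}')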
snuba/web/views.py (1 addition, 0 deletions)
@@ -644,6 +644,7 @@ def commit(
stream_loader.get_processor(),
"consumer_grouup",
stream_loader.get_default_topic_spec().topic,
False,
),
build_batch_writer(table_writer, metrics=metrics),
max_batch_size=1,
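The HTTP test-insert path passes the new positional argument as a bare False, so schema enforcement stays off there. (The misspelled "consumer_grouup" is pre-existing context in the source, not something introduced by this change.)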
tests/consumers/test_consumer_builder.py (4 additions, 3 deletions)
@@ -18,7 +18,7 @@
from snuba.datasets.storages.storage_key import StorageKey
from snuba.utils.metrics.backends.abstract import MetricsBackend
from snuba.utils.metrics.wrapper import MetricsWrapper
from tests.fixtures import get_raw_event
from tests.fixtures import get_raw_error_message
from tests.test_consumer import get_row_count

test_storage_key = StorageKey("errors")
@@ -61,6 +61,7 @@
),
slice_id=None,
join_timeout=5,
enforce_schema=True,
)

optional_consumer_config = resolve_consumer_config(
@@ -104,6 +105,7 @@
),
slice_id=None,
join_timeout=5,
enforce_schema=True,
)


@@ -160,8 +162,7 @@ def test_run_processing_strategy() -> None:
strategy_factory = consumer_builder.build_streaming_strategy_factory()
strategy = strategy_factory.create_with_partitions(commit, partitions)

raw_message = get_raw_event()
json_string = json.dumps([2, "insert", raw_message, []])
json_string = json.dumps(get_raw_error_message())

message = Message(
BrokerValue(
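The builder tests now construct ConsumerBuilder with enforce_schema=True and swap the get_raw_event fixture for get_raw_error_message, which presumably returns the full [version, action, event, extras] insert message rather than just the raw event, so the payload passes schema validation.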
tests/test_consumer.py (5 additions, 2 deletions)
@@ -31,13 +31,16 @@
from snuba.utils.streams.topics import Topic as SnubaTopic
from tests.assertions import assert_changes
from tests.backends.metrics import TestingMetricsBackend, Timing
from tests.fixtures import get_raw_error_message


def test_streaming_consumer_strategy() -> None:
messages = (
Message(
BrokerValue(
KafkaPayload(None, b"{}", []),
KafkaPayload(
None, json.dumps(get_raw_error_message()).encode("utf-8"), []
),
Partition(Topic("events"), 0),
i,
datetime.now(),
@@ -72,7 +75,7 @@ def write_step() -> ProcessedMessageBatchWriter:
factory = KafkaConsumerStrategyFactory(
None,
functools.partial(
process_message, processor, "consumer_group", SnubaTopic.EVENTS
process_message, processor, "consumer_group", SnubaTopic.EVENTS, True
),
write_step,
max_batch_size=10,
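Since this test now calls process_message with enforce_schema set to True, the payload can no longer be an empty JSON object; it must be a schema-valid message. A small sketch of building such a payload follows, where raw_error_message stands in for whatever tests.fixtures.get_raw_error_message returns and mirrors the [version, action, event, extras] shape the old test built by hand.

import json
from datetime import datetime

from arroyo.backends.kafka import KafkaPayload
from arroyo.types import BrokerValue, Message, Partition, Topic

raw_error_message = [2, "insert", {"event_id": "0" * 32}, []]  # stand-in fixture value
message = Message(
    BrokerValue(
        KafkaPayload(None, json.dumps(raw_error_message).encode("utf-8"), []),
        Partition(Topic("events"), 0),
        0,
        datetime.now(),
    )
)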
