Skip to content

Commit

Permalink
feat(protobuf) Add support for protobuf message topics (#344)
Browse files Browse the repository at this point in the history
For taskworkers (and likely other topics) in the future, we would like
to put protobuf serialized messages into the topic instead of
json/msgpack. Using protobufs will let us have one message schema shared
across kafka, rpc, and application logic.

In order to get access to the protobuf generated code, I've needed to
expand the dependencies of this library. I thought this was the simplest
solution to build and maintain.

Our protos package isn't compatible with python 3.9. 3.9 stopped receiving
bugfix releases in mid-2022 and I don't think we have any applications still
running on 3.9
  • Loading branch information
markstory authored Oct 31, 2024
1 parent 262e8da commit 1abd122
Show file tree
Hide file tree
Showing 17 changed files with 201 additions and 108 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ jobs:
strategy:
max-parallel: 4
matrix:
python: [3.9, "3.10", "3.11", "3.12"]
python: ["3.10", "3.11", "3.12"]
timeout-minutes: 10
steps:
- uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
/examples/monitors-clock-tasks/ @getsentry/crons
/examples/uptime-results/ @getsentry/crons
/examples/uptime-configs/ @getsentry/crons
/examples/taskworker/ @getsentry/hybrid-cloud
/examples/taskworker/ @getsentry/taskworker

# Internal Snuba topics
/topics/snuba-queries.yaml @getsentry/owners-snuba
Expand Down
14 changes: 0 additions & 14 deletions examples/taskworker/1/basic.json

This file was deleted.

2 changes: 2 additions & 0 deletions examples/taskworker/1/basic.protobuf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

abc123teststests.do_things"{"args":[],"kwargs":{}}2����
6 changes: 6 additions & 0 deletions python/generate_python_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ def run(target_folder: str = "python/sentry_kafka_schemas/schema_types/") -> Non
version = schema_meta["version"]
schema_data = sentry_kafka_schemas.sentry_kafka_schemas._get_schema(topic_name, version)

if schema_data["type"] == "protobuf":
print(
f"Skipping generating types for protobuf message {schema_data['schema_filepath']}"
)
continue

schema_tmp_typename_base = f"{topic_name.replace('-', '_')}_v{version}"
schema_tmp_module_name = schema_tmp_typename_base.lower()
if schema_tmp_module_name in already_used_filenames:
Expand Down
1 change: 0 additions & 1 deletion python/mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ python_version = 3.11
[mypy-setuptools]
ignore_missing_imports = True


[mypy-rapidjson]
ignore_missing_imports = True

Expand Down
3 changes: 2 additions & 1 deletion python/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
black==24.3.0
isort==5.13.2
flake8==5.0.4
mypy==0.971
mypy==1.13.0
pytest>=7.2.0
pytest-xdist==3.2.0
fastjsonschema==2.16.3
jsonschema==4.17.3
types-protobuf
1 change: 1 addition & 0 deletions python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ PyYAML>=5.4,<7.0
typing-extensions>=4.0.0
fastjsonschema>=2.16.2
msgpack>=1.0.4
sentry-protos>=0.1.30
44 changes: 44 additions & 0 deletions python/sentry_kafka_schemas/codecs/protobuf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import TypeVar, cast

from google.protobuf.message import Message as ProtoMessage
from sentry_kafka_schemas.codecs import Codec

T = TypeVar("T", bound=ProtoMessage)


class ProtobufCodec(Codec[T]):
    """
    Codec for protobuf-serialized message payloads.

    `resource` should be a dotted module path to the message type
    in a protobuf generated module, e.g.
    ``sentry_protos.sentry.v1.taskworker_pb2.TaskActivation``.
    For us this will most likely be a message in `sentry-protos`.

    Raises ImportError/AttributeError from the constructor when the
    resource path cannot be resolved.
    """

    def __init__(self, resource: str) -> None:
        self._resource = resource
        # Resolve the message class eagerly so a bad resource path
        # fails at construction time rather than on first decode.
        self._message_cls = self._import_resource()

    def _import_resource(self) -> type[T]:
        """Import and return the message class named by ``self._resource``."""
        # importlib.import_module is the supported, documented way to
        # import by dotted name (preferred over calling __import__ directly).
        import importlib

        module_name, class_name = self._resource.rsplit(".", 1)
        module = importlib.import_module(module_name)
        class_type = getattr(module, class_name)
        return cast(type[T], class_type)

    def encode(self, data: T, validate: bool = True) -> bytes:
        # There isn't any validation logic as protobuf
        # does most of the type validation as messages are constructed.
        return data.SerializeToString()

    def decode(self, raw_data: bytes, validate: bool = True) -> T:
        # There isn't any validation logic as protobuf
        # does validation implicitly when deserializing.
        instance = self._message_cls()
        instance.ParseFromString(raw_data)
        return instance

    def validate(self, data: T) -> None:
        # Protobuf automatically validates instances
        # as they are built.
        return None
28 changes: 23 additions & 5 deletions python/sentry_kafka_schemas/sentry_kafka_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
from sentry_kafka_schemas.codecs import Codec
from sentry_kafka_schemas.codecs.json import JsonCodec
from sentry_kafka_schemas.codecs.msgpack import MsgpackCodec
from sentry_kafka_schemas.types import Example, Schema
from sentry_kafka_schemas.codecs.protobuf import ProtobufCodec
from sentry_kafka_schemas.types import Example, Schema, is_protobuf_schema
from typing_extensions import NotRequired
from yaml import safe_load

Expand All @@ -32,7 +33,7 @@ class SchemaNotFound(Exception):

class TopicSchema(TypedDict):
version: int
type: Literal["json", "msgpack"]
type: Literal["json", "msgpack", "protobuf"]
compatibility_mode: Literal["none", "backward"]
resource: str
examples: Sequence[str]
Expand Down Expand Up @@ -96,7 +97,7 @@ def _get_schema(topic: str, version: Optional[int] = None) -> Schema:
If a matching schema can't be found, raise SchemaNotFound.
Only JSON schemas are currently supported.
JSON schemas and protobuf schemas are supported
"""
topic_data = get_topic(topic)

Expand All @@ -114,6 +115,20 @@ def _get_schema(topic: str, version: Optional[int] = None) -> Schema:
if schema_metadata is None:
raise SchemaNotFound("Invalid version")

if schema_metadata["type"] == "protobuf":
proto_schema: Schema = {
"version": schema_metadata["version"],
"type": schema_metadata["type"],
"compatibility_mode": schema_metadata["compatibility_mode"],
"schema": {
"resource": schema_metadata["resource"],
},
"schema_filepath": schema_metadata["resource"],
"examples": schema_metadata["examples"],
}
return proto_schema

# Json and Msgpack
resource_path = Path.joinpath(_SCHEMAS_PATH, schema_metadata["resource"])
with open(resource_path) as f:
json_schema = rapidjson.load(f)
Expand Down Expand Up @@ -144,10 +159,13 @@ def get_codec(topic: str, version: Optional[int] = None) -> Codec[Any]:
raise

rv: Codec[Any]
schema_data = schema["schema"]
if schema["type"] == "json":
rv = JsonCodec(json_schema=schema["schema"])
rv = JsonCodec(json_schema=schema_data)
elif schema["type"] == "msgpack":
rv = MsgpackCodec(json_schema=schema["schema"])
rv = MsgpackCodec(json_schema=schema_data)
elif schema["type"] == "protobuf" and is_protobuf_schema(schema_data):
rv = ProtobufCodec(resource=schema_data["resource"])
else:
raise ValueError(schema["type"])
__TOPIC_TO_CODEC[cache_key] = rv
Expand Down
24 changes: 20 additions & 4 deletions python/sentry_kafka_schemas/types.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import dataclasses
from pathlib import Path
from typing import Any, Literal, Mapping, Sequence, TypedDict, Union
from typing import Any, Literal, Mapping, Sequence, TypedDict, TypeGuard, Union

import msgpack
import rapidjson
Expand All @@ -21,23 +21,37 @@
total=False,
)

class ProtobufSchema(TypedDict, total=False):
    """Schema metadata for a protobuf-backed topic.

    ``resource`` holds the dotted path to the generated message class.
    """

    resource: str

Schema = TypedDict(
"Schema",
{
"version": int,
"type": Literal["json", "msgpack"],
"type": Literal["json", "msgpack", "protobuf"],
"compatibility_mode": Union[Literal["none"], Literal["backward"]],
"schema": Union[JsonSchema],
"schema": Union[JsonSchema, ProtobufSchema],
"schema_filepath": str,
"examples": Sequence[str],
},
)


def is_json_schema(o: object) -> TypeGuard[JsonSchema]:
return isinstance(o, dict) and o.get("$schema") is not None


def is_protobuf_schema(o: object) -> TypeGuard[ProtobufSchema]:
return isinstance(o, dict) and o.get("resource") is not None


@dataclasses.dataclass(frozen=True)
class Example:
path: Path
type: Literal["json", "msgpack"]
type: Literal["json", "msgpack", "protobuf"]

_examples_basepath: Path

Expand All @@ -51,3 +65,5 @@ def load(self) -> Any:
return rapidjson.load(f)
elif self.type == "msgpack":
return msgpack.unpackb(f.read())
elif self.type == "protobuf":
return f.read()
29 changes: 29 additions & 0 deletions python/tests/test_codecs.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import json
import time
from pathlib import Path
from typing import Any, TypedDict

import pytest
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_kafka_schemas.codecs import Codec
from sentry_kafka_schemas.codecs.json import JsonCodec
from sentry_kafka_schemas.codecs.msgpack import MsgpackCodec
from sentry_kafka_schemas.codecs.protobuf import ProtobufCodec
from sentry_protos.sentry.v1.taskworker_pb2 import TaskActivation


class Example(TypedDict):
Expand Down Expand Up @@ -35,3 +39,28 @@ def test_json_codec(codec_cls: Codec[Any]) -> None:
data_intermediate = codec.encode(data, validate=False)
assert isinstance(data_intermediate, bytes)
assert codec.decode(data_intermediate, validate=True) == data


def test_protobuf_codec() -> None:
    """Round-trip a TaskActivation through ProtobufCodec and compare fields."""
    activation = TaskActivation(
        id="abc123",
        taskname="tests.do_things",
        namespace="tests",
        parameters='{"args":[],"kwargs":{}}',
        headers={},
        received_at=Timestamp(seconds=int(time.time())),
    )
    codec: ProtobufCodec[TaskActivation] = ProtobufCodec(
        resource="sentry_protos.sentry.v1.taskworker_pb2.TaskActivation"
    )
    serialized = codec.encode(activation)
    # isinstance is the idiomatic type check; `type(x) == bytes` rejects
    # subclasses and is flagged by linters (E721).
    assert isinstance(serialized, bytes)

    rebuild = codec.decode(serialized)
    assert rebuild
    assert rebuild.id == activation.id
    assert rebuild.taskname == activation.taskname
    assert rebuild.namespace == activation.namespace
    assert rebuild.parameters == activation.parameters
    assert rebuild.headers == activation.headers
    assert rebuild.received_at == activation.received_at
40 changes: 36 additions & 4 deletions python/tests/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest
import rapidjson
from sentry_kafka_schemas import iter_examples
from sentry_kafka_schemas.sentry_kafka_schemas import _get_schema, get_topic, list_topics
from sentry_kafka_schemas.sentry_kafka_schemas import _get_schema, get_codec, get_topic, list_topics
from sentry_kafka_schemas.types import Example


Expand All @@ -17,6 +17,26 @@ def get_all_examples() -> Iterator[Tuple[str, int, Example]]:
yield topic, version, x


def _iter_examples_of_types(schema_types: Tuple[str, ...]) -> Iterator[Tuple[str, int, Example]]:
    """Yield (topic, version, example) for every schema whose type is in *schema_types*.

    Shared driver for the per-format example generators below, so the
    topic/schema iteration logic lives in exactly one place.
    """
    for topic in list_topics():
        for schema_raw in get_topic(topic)["schemas"]:
            if schema_raw["type"] not in schema_types:
                continue
            version = schema_raw["version"]
            for x in iter_examples(topic, version=version):
                yield topic, version, x


def get_protobuf_examples() -> Iterator[Tuple[str, int, Example]]:
    """Examples for protobuf-typed schemas only."""
    return _iter_examples_of_types(("protobuf",))


def get_json_examples() -> Iterator[Tuple[str, int, Example]]:
    """Examples for json/msgpack schemas (both validate against JSON schema)."""
    return _iter_examples_of_types(("json", "msgpack"))


def _get_most_specific_jsonschema_error(e: jsonschema.ValidationError) -> None:
"""
Errors from the jsonschema library are often somewhat vague as the
Expand All @@ -30,12 +50,12 @@ def _get_most_specific_jsonschema_error(e: jsonschema.ValidationError) -> None:

def test_file_extension() -> None:
for example in get_all_examples():
assert example[2].path.name.endswith((".msgpack", ".json"))
assert example[2].path.name.endswith((".msgpack", ".json", ".protobuf"))


@pytest.mark.parametrize("topic,version,example", get_all_examples(), ids=str)
@pytest.mark.parametrize("topic,version,example", get_json_examples(), ids=str)
@pytest.mark.parametrize("jsonschema_library", ["fastjsonschema", "jsonschema", "rapidjson"])
def test_examples(
def test_json_examples(
topic: str,
version: int,
example: Example,
Expand All @@ -56,3 +76,15 @@ def test_examples(
compiled = rapidjson.Validator(rapidjson.dumps(schema))
raw_example_data = rapidjson.dumps(example_data)
compiled(raw_example_data)


@pytest.mark.parametrize("topic,version,example", get_protobuf_examples(), ids=str)
def test_protobuf_examples(
    topic: str,
    version: int,
    example: Example,
) -> None:
    """Every protobuf example file must decode cleanly with its topic codec."""
    payload = example.load()
    decoded = get_codec(topic, version=version).decode(payload)
    assert decoded
Loading

0 comments on commit 1abd122

Please sign in to comment.