Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(subscriptions): Add create subscriptions RPC #6499

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ sqlparse==0.5.0
google-api-python-client==2.88.0
sentry-usage-accountant==0.0.10
freezegun==1.2.2
sentry-protos==0.1.31
sentry-protos==0.1.34
1 change: 1 addition & 0 deletions snuba/datasets/slicing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
should be stored. These do not require individual physical partitions but allow
for repartitioning with less code changes per physical change.
"""

from snuba.clusters.storage_sets import StorageSetKey

SENTRY_LOGICAL_PARTITIONS = 256
Expand Down
31 changes: 27 additions & 4 deletions snuba/subscriptions/codecs.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
import base64
import json
from datetime import datetime
from typing import cast

import rapidjson
from arroyo.backends.kafka import KafkaPayload
from google.protobuf.message import Message as ProtobufMessage
from sentry_kafka_schemas.schema_types import events_subscription_results_v1

from snuba.datasets.entities.entity_key import EntityKey
from snuba.query.exceptions import InvalidQueryException
from snuba.subscriptions.data import (
RPCSubscriptionData,
ScheduledSubscriptionTask,
SnQLSubscriptionData,
Subscription,
SubscriptionData,
SubscriptionIdentifier,
SubscriptionTaskResult,
SubscriptionType,
SubscriptionWithMetadata,
)
from snuba.utils.codecs import Codec, Encoder
Expand All @@ -33,6 +37,9 @@ def decode(self, value: bytes) -> SubscriptionData:
except json.JSONDecodeError:
raise InvalidQueryException("Invalid JSON")

if data.get("subscription_type") == SubscriptionType.RPC.value:
return RPCSubscriptionData.from_dict(data, self.entity_key)

return SnQLSubscriptionData.from_dict(data, self.entity_key)


Expand All @@ -42,11 +49,22 @@ def encode(self, value: SubscriptionTaskResult) -> KafkaPayload:
subscription_id = str(subscription.identifier)
request, result = value.result

if isinstance(request, ProtobufMessage):
original_body = {
"request": base64.b64encode(request.SerializeToString()).decode(
"utf-8"
),
"request_name": request.__class__.__name__,
"request_version": request.__class__.__module__.split(".", 3)[2],
}
else:
original_body = {**request.original_body}

data: events_subscription_results_v1.SubscriptionResult = {
"version": 3,
"payload": {
"subscription_id": subscription_id,
"request": {**request.original_body},
"request": original_body,
"result": {
"data": result["data"],
"meta": result["meta"],
Expand Down Expand Up @@ -98,15 +116,20 @@ def decode(self, value: KafkaPayload) -> ScheduledSubscriptionTask:

entity_key = EntityKey(scheduled_subscription_dict["entity"])

data = scheduled_subscription_dict["task"]["data"]
subscription: SubscriptionData
if data.get("subscription_type") == SubscriptionType.RPC.value:
subscription = RPCSubscriptionData.from_dict(data, entity_key)
else:
subscription = SnQLSubscriptionData.from_dict(data, entity_key)

return ScheduledSubscriptionTask(
datetime.fromisoformat(scheduled_subscription_dict["timestamp"]),
SubscriptionWithMetadata(
entity_key,
Subscription(
SubscriptionIdentifier.from_string(subscription_identifier),
SnQLSubscriptionData.from_dict(
scheduled_subscription_dict["task"]["data"], entity_key
),
subscription,
),
scheduled_subscription_dict["tick_upper_offset"],
),
Expand Down
Loading
Loading