Skip to content

Commit

Permalink
ref: bump sentry-kafka-schemas to 0.1.37 (#5127)
Browse files Browse the repository at this point in the history
  • Loading branch information
getsentry-bot committed Nov 30, 2023
1 parent 853926e commit 00ca464
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 12 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ python-dateutil==2.8.2
python-rapidjson==1.8
redis==4.3.4
sentry-arroyo==2.14.23
sentry-kafka-schemas==0.1.35
sentry-kafka-schemas==0.1.37
sentry-redis-tools==0.1.7
sentry-relay==0.8.27
sentry-sdk==1.28.0
Expand Down
17 changes: 17 additions & 0 deletions rust_snuba/src/processors/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct FromSpanMessage {
#[serde(deserialize_with = "hex_to_u64")]
group_raw: u64,
is_segment: bool,
measurements: Option<BTreeMap<String, FromMeasurementValue>>,
#[serde(deserialize_with = "hex_to_u64")]
parent_span_id: u64,
profile_id: Option<Uuid>,
Expand All @@ -51,6 +52,11 @@ struct FromSpanMessage {
trace_id: Uuid,
}

#[derive(Debug, Default, Deserialize)]
struct FromMeasurementValue {
value: f64,
}

#[derive(Debug, Default, Deserialize)]
struct FromSentryTags {
action: Option<String>,
Expand Down Expand Up @@ -200,6 +206,15 @@ impl TryFrom<FromSpanMessage> for Span {
let tags = from.tags.unwrap_or_default();
let (mut tag_keys, mut tag_values): (Vec<_>, Vec<_>) = tags.into_iter().unzip();

let measurements = from.measurements.unwrap_or_default();
let (measurement_keys, measurement_raw_values): (Vec<_>, Vec<_>) =
measurements.into_iter().unzip();

let mut measurement_values: Vec<f64> = Vec::new();
for measurement in measurement_raw_values {
measurement_values.push(measurement.value);
}

if let Some(http_method) = from.sentry_tags.http_method.clone() {
tag_keys.push("http.method".into());
tag_values.push(http_method);
Expand All @@ -226,6 +241,8 @@ impl TryFrom<FromSpanMessage> for Span {
group,
group_raw: from.group_raw,
is_segment: if from.is_segment { 1 } else { 0 },
measurement_keys,
measurement_values,
module: from.sentry_tags.module.unwrap_or_default(),
op: from.sentry_tags.op.unwrap_or_default(),
parent_span_id: from.parent_span_id,
Expand Down
7 changes: 5 additions & 2 deletions snuba/datasets/processors/spans_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def _process_span_event(
processed["trace_id"] = str(uuid.UUID(span_event["trace_id"]))
processed["span_id"] = int(span_event["span_id"], 16)
processed["segment_id"] = processed["span_id"]
processed["is_segment"] = span_event["is_segment"]
processed["is_segment"] = 1 if span_event["is_segment"] else 0
parent_span_id: Optional[str] = span_event.get("parent_span_id", None)
if parent_span_id:
processed["parent_span_id"] = int(parent_span_id, 16)
Expand Down Expand Up @@ -111,7 +111,7 @@ def _process_span_event(
)

processed["duration"] = max(span_event["duration_ms"], 0)
processed["exclusive_time"] = span_event["exclusive_time_ms"]
processed["exclusive_time"] = float(span_event["exclusive_time_ms"])

@staticmethod
def _process_tags(
Expand Down Expand Up @@ -166,6 +166,9 @@ def _process_sentry_tags(
TODO: For the top level span belonging to a transaction, we do not know how to fill these
values yet. For now lets just set them to their default values.
"""
if "sentry_tags" not in span_event:
return

sentry_tags = span_event["sentry_tags"].copy()
sentry_tag_keys, sentry_tag_values = extract_extra_tags(sentry_tags)
processed["sentry_tags.key"] = sentry_tag_keys
Expand Down
26 changes: 17 additions & 9 deletions tests/datasets/test_spans_processor.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Mapping, Optional, Sequence, Tuple
from typing import Any, Dict, Mapping, Optional, Sequence, Tuple

import pytest
from sentry_kafka_schemas.schema_types.snuba_spans_v1 import SpanEvent
from sentry_kafka_schemas.schema_types.snuba_spans_v1 import (
SpanEvent,
_MeasurementValue,
)
from sentry_relay.consts import SPAN_STATUS_NAME_TO_CODE

from snuba.consumers.types import KafkaMessageMetadata
Expand Down Expand Up @@ -39,6 +42,7 @@ class SpanEventExample:
transaction_name: str
user_id: Optional[str]
user_name: Optional[str]
measurements: Dict[str, _MeasurementValue]

def serialize(self) -> SpanEvent:
return {
Expand Down Expand Up @@ -75,6 +79,7 @@ def serialize(self) -> SpanEvent:
"sentry:user": self.user_id or "",
},
"trace_id": self.trace_id,
"measurements": self.measurements,
}

def build_result(self, meta: KafkaMessageMetadata) -> Sequence[Mapping[str, Any]]:
Expand All @@ -89,14 +94,16 @@ def build_result(self, meta: KafkaMessageMetadata) -> Sequence[Mapping[str, Any]
"is_segment": 0,
"segment_name": self.transaction_name,
"start_timestamp": int(
datetime.utcfromtimestamp(
self.start_timestamp_ms / 1000
datetime.fromtimestamp(
self.start_timestamp_ms / 1000,
tz=timezone.utc,
).timestamp()
),
"start_ms": self.start_timestamp_ms % 1000,
"end_timestamp": int(
datetime.utcfromtimestamp(
(self.start_timestamp_ms + self.duration_ms) / 1000
datetime.fromtimestamp(
(self.start_timestamp_ms + self.duration_ms) / 1000,
tz=timezone.utc,
).timestamp()
),
"end_ms": (self.start_timestamp_ms + self.duration_ms) % 1000,
Expand All @@ -123,8 +130,8 @@ def build_result(self, meta: KafkaMessageMetadata) -> Sequence[Mapping[str, Any]
"transaction.method",
],
"tags.value": ["123", "value1", "123", "True", "GET", "200", "GET"],
"measurements.key": [],
"measurements.value": [],
"measurements.key": ["memory"],
"measurements.value": [1000.0],
"partition": meta.partition,
"offset": meta.offset,
"retention_days": self.retention_days,
Expand Down Expand Up @@ -190,7 +197,7 @@ def compare_types_and_values(dict1: Any, dict2: Any) -> bool:
@pytest.mark.redis_db
class TestSpansProcessor:
@staticmethod
def __get_timestamps() -> Tuple[float, float]:
def __get_timestamps() -> Tuple[float, float, float]:
timestamp = datetime.now(tz=timezone.utc) - timedelta(seconds=1000)
start_timestamp = timestamp - timedelta(seconds=10)
received = timestamp + timedelta(seconds=1)
Expand All @@ -208,6 +215,7 @@ def __get_span_event(self) -> SpanEventExample:
group_raw="deadbeefdeadbeef",
http_method="POST",
http_referer="tagstore.something",
measurements={"memory": {"value": 1000.0}},
module="http",
op="navigation",
organization_id=69,
Expand Down

0 comments on commit 00ca464

Please sign in to comment.