feat(spans): Extract spans from transactions (#2350)
Extract spans from transactions and publish them on a new Kafka topic
`ingest-spans`. Protected by a feature flag.

Notes:

* Spans are extracted only in processing relays. If an SDK or downstream
Relay provides `Span` items in an envelope, they are deleted
unconditionally.
* The extraction happens after all regular transaction processing, for
example metrics extraction and dynamic sampling. If we want to split the
sampling logic in the future, we will have to move the extraction to an
earlier stage.
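
For illustration, a minimal sketch of what extraction yields per transaction span, using the envelope types touched later in this commit (the span payload shown here is made up, not taken from the change):

```rust
// Each span of a processed transaction is re-emitted as a standalone envelope
// item of type `span`, carrying the span serialized as JSON.
let mut item = Item::new(ItemType::Span);
item.set_payload(
    ContentType::Json,
    r#"{"op":"db.query","span_id":"fa90fdead5f74052","trace_id":"4c79f60c11214eb38604f4ae0781bfb2"}"#,
);
assert_eq!(item.ty(), &ItemType::Span);
```
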
jjbayer committed Aug 1, 2023
1 parent f942199 commit a333ee2
Showing 12 changed files with 198 additions and 7 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,11 @@

- Limit environment names on check-ins to 64 chars. ([#2309](https://github.com/getsentry/relay/pull/2309))

**Internal**:

- Feature-flagged extraction & publishing of spans from transactions. ([#2350](https://github.com/getsentry/relay/pull/2350))


## 23.7.1

**Bug Fixes**:
14 changes: 11 additions & 3 deletions relay-config/src/config.rs
@@ -542,6 +542,8 @@ struct Limits {
max_api_chunk_upload_size: ByteSize,
/// The maximum payload size for a profile
max_profile_size: ByteSize,
/// The maximum payload size for a span.
max_span_size: ByteSize,
/// The maximum payload size for a compressed replay.
max_replay_compressed_size: ByteSize,
/// The maximum payload size for an uncompressed replay.
@@ -582,6 +584,7 @@ impl Default for Limits {
max_api_file_upload_size: ByteSize::mebibytes(40),
max_api_chunk_upload_size: ByteSize::mebibytes(100),
max_profile_size: ByteSize::mebibytes(50),
max_span_size: ByteSize::mebibytes(1),
max_replay_compressed_size: ByteSize::mebibytes(10),
max_replay_uncompressed_size: ByteSize::mebibytes(100),
max_replay_message_size: ByteSize::mebibytes(15),
@@ -1790,22 +1793,27 @@ impl Config {
self.values.limits.max_attachment_size.as_bytes()
}

/// Returns the maxmium combined size of attachments or payloads containing attachments
/// Returns the maximum combined size of attachments or payloads containing attachments
/// (minidump, unreal, standalone attachments) in bytes.
pub fn max_attachments_size(&self) -> usize {
self.values.limits.max_attachments_size.as_bytes()
}

/// Returns the maxmium combined size of client reports in bytes.
/// Returns the maximum combined size of client reports in bytes.
pub fn max_client_reports_size(&self) -> usize {
self.values.limits.max_client_reports_size.as_bytes()
}

/// Returns the maxmium payload size of a monitor check-in in bytes.
/// Returns the maximum payload size of a monitor check-in in bytes.
pub fn max_check_in_size(&self) -> usize {
self.values.limits.max_check_in_size.as_bytes()
}

/// Returns the maximum payload size of a span in bytes.
pub fn max_span_size(&self) -> usize {
self.values.limits.max_span_size.as_bytes()
}

/// Returns the maximum size of an envelope payload in bytes.
///
/// Individual item size limits still apply.
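
A quick sketch of the new limit's effect (assuming `Config::default()`, as used elsewhere in Relay's tests; operators can override the value under the `limits` section of the configuration):

```rust
// Spans get a dedicated payload cap, defaulting to 1 MiB.
let config = relay_config::Config::default();
assert_eq!(config.max_span_size(), 1024 * 1024);
```
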
5 changes: 4 additions & 1 deletion relay-dynamic-config/src/feature.rs
@@ -19,6 +19,10 @@ pub enum Feature {
/// Enables metric extraction from spans.
#[serde(rename = "projects:span-metrics-extraction")]
SpanMetricsExtraction,
/// Extract spans from transactions and convert them to standalone spans.
#[serde(rename = "projects:extract-standalone-spans")]
ExtractStandaloneSpans,

/// Deprecated, still forwarded for older downstream Relays.
#[serde(rename = "organizations:transaction-name-mark-scrubbed-as-sanitized")]
Deprecated1,
@@ -28,7 +32,6 @@
/// Deprecated, still forwarded for older downstream Relays.
#[serde(rename = "organizations:profiling")]
Deprecated3,

/// Forward compatibility.
#[serde(other)]
Unknown,
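
A hedged sketch of how the flag travels in project configs, based only on the `serde` rename above (it assumes `Feature` derives `Deserialize` and `PartialEq`, as its use in `has_feature` suggests):

```rust
use relay_dynamic_config::Feature;

// The project config carries the flag under its serde name.
let feature: Feature = serde_json::from_str(r#""projects:extract-standalone-spans""#).unwrap();
assert_eq!(feature, Feature::ExtractStandaloneSpans);
```
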
9 changes: 8 additions & 1 deletion relay-kafka/src/config.rs
@@ -41,14 +41,16 @@ pub enum KafkaTopic {
ReplayRecordings,
/// Monitor check-ins.
Monitors,
/// Standalone spans without a transaction.
Spans,
}

impl KafkaTopic {
/// Returns an iterator over the variants of [`KafkaTopic`].
/// It will have to be adjusted if new variants are added.
pub fn iter() -> std::slice::Iter<'static, Self> {
use KafkaTopic::*;
static TOPICS: [KafkaTopic; 12] = [
static TOPICS: [KafkaTopic; 13] = [
Events,
Attachments,
Transactions,
@@ -61,6 +63,7 @@
ReplayEvents,
ReplayRecordings,
Monitors,
Spans,
];
TOPICS.iter()
}
@@ -98,6 +101,8 @@ pub struct TopicAssignments {
pub replay_recordings: TopicAssignment,
/// Monitor check-ins.
pub monitors: TopicAssignment,
/// Standalone spans without a transaction.
pub spans: TopicAssignment,
}

impl TopicAssignments {
@@ -117,6 +122,7 @@
KafkaTopic::ReplayEvents => &self.replay_events,
KafkaTopic::ReplayRecordings => &self.replay_recordings,
KafkaTopic::Monitors => &self.monitors,
KafkaTopic::Spans => &self.spans,
}
}
}
@@ -137,6 +143,7 @@
replay_events: "ingest-replay-events".to_owned().into(),
replay_recordings: "ingest-replay-recordings".to_owned().into(),
monitors: "ingest-monitors".to_owned().into(),
spans: "ingest-spans".to_owned().into(),
}
}
}
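
A small sketch of the resulting default wiring, using only items visible in this diff (the `spans` assignment can still be overridden in the topic configuration like any other):

```rust
// The new assignment defaults to the "ingest-spans" topic, and the iterator now
// yields 13 variants, so exhaustive matches over `KafkaTopic` must cover `Spans`.
let topics = TopicAssignments::default();
let _spans_assignment = &topics.spans;
assert_eq!(KafkaTopic::iter().count(), 13);
```
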
41 changes: 41 additions & 0 deletions relay-server/src/actors/processor.rs
@@ -1630,6 +1630,7 @@ impl EnvelopeProcessorService {
ItemType::ReplayEvent => false,
ItemType::ReplayRecording => false,
ItemType::CheckIn => false,
ItemType::Span => false,

// Without knowing more, `Unknown` items are allowed to be repeated
ItemType::Unknown(_) => false,
@@ -2237,6 +2238,43 @@ impl EnvelopeProcessorService {
Ok(())
}

#[cfg(feature = "processing")]
fn extract_spans(&self, state: &mut ProcessEnvelopeState) {
// For now, drop any spans submitted by the SDK.
state.managed_envelope.retain_items(|item| match item.ty() {
ItemType::Span => ItemAction::DropSilently,
_ => ItemAction::Keep,
});

// Only extract spans from transactions (not errors).
if state.event_type() != Some(EventType::Transaction) {
return;
};

// Check feature flag.
if !state
.project_state
.has_feature(Feature::ExtractStandaloneSpans)
{
return;
};

// Extract.
let Some(spans) = state.event.value().and_then(|e| e.spans.value()) else { return };
for span in spans {
let span = match span.to_json() {
Ok(span) => span,
Err(e) => {
relay_log::error!(error = &e as &dyn Error, "Failed to serialize span");
continue;
}
};
let mut item = Item::new(ItemType::Span);
item.set_payload(ContentType::Json, span);
state.managed_envelope.envelope_mut().add_item(item);
}
}

/// Computes the sampling decision on the incoming event
fn run_dynamic_sampling(&self, state: &mut ProcessEnvelopeState) {
// Running dynamic sampling involves either:
@@ -2438,6 +2476,9 @@ impl EnvelopeProcessorService {
if state.has_event() {
self.scrub_event(state)?;
self.serialize_event(state)?;
if_processing!({
self.extract_spans(state);
});
}

self.scrub_attachments(state);
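
For reference, a rough illustration of the JSON that `span.to_json()` yields for a single transaction span (field names follow the event protocol's span format; all values here are invented):

```rust
// Illustrative payload of one extracted span item.
let payload = serde_json::json!({
    "op": "db.query",
    "description": "SELECT * FROM users",
    "span_id": "fa90fdead5f74052",
    "parent_span_id": "af90fdead5f74053",
    "trace_id": "4c79f60c11214eb38604f4ae0781bfb2",
    "start_timestamp": 1690000000.0,
    "timestamp": 1690000000.5
});
```
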
53 changes: 53 additions & 0 deletions relay-server/src/actors/store.rs
@@ -202,6 +202,12 @@ impl StoreService {
retention,
item,
)?,
ItemType::Span => self.produce_span(
scoping.organization_id,
scoping.project_id,
start_time,
item,
)?,
_ => {}
}
}
@@ -732,6 +738,40 @@ impl StoreService {

Ok(())
}

fn produce_span(
&self,
organization_id: u64,
project_id: ProjectId,
start_time: Instant,
item: &Item,
) -> Result<(), StoreError> {
// Bit unfortunate that we need to parse again here, but it's the same for sessions.
let span: serde_json::Value = match serde_json::from_slice(&item.payload()) {
Ok(span) => span,
Err(error) => {
relay_log::error!(
error = &error as &dyn std::error::Error,
"failed to parse span"
);
return Ok(());
}
};
let message = KafkaMessage::Span(SpanKafkaMessage {
project_id,
start_time: UnixTimestamp::from_instant(start_time).as_secs(),
span,
});

self.produce(KafkaTopic::Spans, organization_id, message)?;

metric!(
counter(RelayCounters::ProcessingMessageProduced) += 1,
event_type = "span"
);

Ok(())
}
}

impl Service for StoreService {
Expand Down Expand Up @@ -990,6 +1030,16 @@ struct CheckInKafkaMessage {
retention_days: u16,
}

#[derive(Debug, Serialize)]
struct SpanKafkaMessage {
/// Raw span data.
span: serde_json::Value,
/// Time at which the span was received by Relay.
start_time: u64,
/// The project id for the current span.
project_id: ProjectId,
}

/// An enum over all possible ingest messages.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
@@ -1010,6 +1060,7 @@ enum KafkaMessage {
ReplayEvent(ReplayEventKafkaMessage),
ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage),
CheckIn(CheckInKafkaMessage),
Span(SpanKafkaMessage),
}

impl Message for KafkaMessage {
@@ -1025,6 +1076,7 @@
KafkaMessage::ReplayEvent(_) => "replay_event",
KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
KafkaMessage::CheckIn(_) => "check_in",
KafkaMessage::Span(_) => "span",
}
}

@@ -1041,6 +1093,7 @@
Self::ReplayEvent(message) => message.replay_id.0,
Self::ReplayRecordingNotChunked(_message) => Uuid::nil(), // Ensure random partitioning.
Self::CheckIn(_message) => Uuid::nil(),
Self::Span(_) => Uuid::nil(), // random partitioning
};

if uuid.is_nil() {
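
Given the internally tagged `KafkaMessage` enum, a message built by `produce_span` should serialize roughly as follows (a sketch; the inner `span` object and all values are illustrative):

```rust
// Approximate wire format of one message on the `ingest-spans` topic.
let expected = serde_json::json!({
    "type": "span",
    "project_id": 42,
    "start_time": 1690000000,
    "span": {
        "op": "db.query",
        "span_id": "fa90fdead5f74052",
        "trace_id": "4c79f60c11214eb38604f4ae0781bfb2"
    }
});
```
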
9 changes: 8 additions & 1 deletion relay-server/src/envelope.rs
@@ -109,6 +109,8 @@ pub enum ItemType {
ReplayRecording,
/// Monitor check-in encoded as JSON.
CheckIn,
/// A standalone span.
Span,
/// A new item type that is yet unknown by this version of Relay.
///
/// By default, items of this type are forwarded without modification. Processing Relays and
@@ -151,6 +153,7 @@ impl fmt::Display for ItemType {
Self::ReplayEvent => write!(f, "replay_event"),
Self::ReplayRecording => write!(f, "replay_recording"),
Self::CheckIn => write!(f, "check_in"),
Self::Span => write!(f, "span"),
Self::Unknown(s) => s.fmt(f),
}
}
@@ -178,6 +181,7 @@ impl std::str::FromStr for ItemType {
"replay_event" => Self::ReplayEvent,
"replay_recording" => Self::ReplayRecording,
"check_in" => Self::CheckIn,
"span" => Self::Span,
other => Self::Unknown(other.to_owned()),
})
}
@@ -559,6 +563,7 @@ impl Item {
ItemType::ClientReport => None,
ItemType::CheckIn => Some(DataCategory::Monitor),
ItemType::Unknown(_) => None,
ItemType::Span => None, // No outcomes, for now
}
}

@@ -722,7 +727,8 @@ impl Item {
| ItemType::ReplayEvent
| ItemType::ReplayRecording
| ItemType::Profile
| ItemType::CheckIn => false,
| ItemType::CheckIn
| ItemType::Span => false,

// The unknown item type can observe any behavior, most likely there are going to be no
// item types added that create events.
@@ -752,6 +758,7 @@
ItemType::ReplayRecording => false,
ItemType::Profile => true,
ItemType::CheckIn => false,
ItemType::Span => false,

// Since this Relay cannot interpret the semantics of this item, it does not know
// whether it requires an event or not. Depending on the strategy, this can cause two
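
The new item type round-trips through its string form via the `Display` and `FromStr` arms added above; a minimal sketch:

```rust
// "span" parses to the new variant and formats back to the same string.
let parsed: ItemType = "span".parse().unwrap();
assert_eq!(parsed, ItemType::Span);
assert_eq!(parsed.to_string(), "span");
```
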
1 change: 1 addition & 0 deletions relay-server/src/utils/rate_limits.rs
@@ -108,6 +108,7 @@ fn infer_event_category(item: &Item) -> Option<DataCategory> {
ItemType::ReplayRecording => None,
ItemType::ClientReport => None,
ItemType::CheckIn => None,
ItemType::Span => None,
ItemType::Unknown(_) => None,
}
}
5 changes: 5 additions & 0 deletions relay-server/src/utils/sizes.rs
@@ -63,6 +63,11 @@ pub fn check_envelope_size_limits(config: &Config, envelope: &Envelope) -> bool
ItemType::UserReport => (),
ItemType::Metrics => (),
ItemType::MetricBuckets => (),
ItemType::Span => {
if item.len() > config.max_span_size() {
return false;
}
}
ItemType::Unknown(_) => (),
}
}
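
The intent of the added arm, restated as a hedged, hypothetical helper (`check_envelope_size_limits` itself walks all items as shown above):

```rust
// Returns true if this span item alone would push the envelope over the limit.
fn span_exceeds_limit(item: &Item, config: &Config) -> bool {
    item.ty() == &ItemType::Span && item.len() > config.max_span_size()
}
```
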
1 change: 1 addition & 0 deletions tests/integration/conftest.py
@@ -27,6 +27,7 @@
metrics_consumer,
replay_events_consumer,
monitors_consumer,
spans_consumer,
)

