diff --git a/CHANGELOG.md b/CHANGELOG.md
index 79063f3d20..76f996dbd9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -29,6 +29,7 @@
 - Adds support for dynamic metric bucket encoding. ([#3137](https://github.com/getsentry/relay/pull/3137))
 - Use statsdproxy to pre-aggregate metrics. ([#2425](https://github.com/getsentry/relay/pull/2425))
 - Add SDK information to spans. ([#3178](https://github.com/getsentry/relay/pull/3178))
+- Drop replay envelopes if any item fails. ([#3201](https://github.com/getsentry/relay/pull/3201))
 - Filter null values from metrics summary tags. ([#3204](https://github.com/getsentry/relay/pull/3204))
 - Emit a usage metric for every span seen. ([#3209](https://github.com/getsentry/relay/pull/3209))
 - Add namespace for profile metrics. ([#3229](https://github.com/getsentry/relay/pull/3229))
diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs
index 6185cdd38b..f88d388a0f 100644
--- a/relay-server/src/envelope.rs
+++ b/relay-server/src/envelope.rs
@@ -1319,6 +1319,11 @@ impl Envelope {
         self.items.retain(f)
     }

+    /// Drops every item in the envelope.
+    pub fn drop_items_silently(&mut self) {
+        self.items.clear()
+    }
+
     /// Serializes this envelope into the given writer.
     pub fn serialize<W>(&self, mut writer: W) -> Result<(), EnvelopeError>
     where
diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs
index 70956e55fb..284bf86cf4 100644
--- a/relay-server/src/services/processor.rs
+++ b/relay-server/src/services/processor.rs
@@ -419,6 +419,9 @@ pub enum ProcessingError {

     #[error("invalid processing group type")]
     InvalidProcessingGroup(#[from] InvalidProcessingGroupType),
+
+    #[error("invalid replay")]
+    InvalidReplay(DiscardReason),
 }

 impl ProcessingError {
@@ -460,6 +463,8 @@ impl ProcessingError {
             Self::MissingProjectId => None,
             Self::EventFiltered(_) => None,
             Self::InvalidProcessingGroup(_) => None,
+
+            Self::InvalidReplay(reason) => Some(Outcome::Invalid(reason)),
         }
     }

diff --git a/relay-server/src/services/processor/replay.rs b/relay-server/src/services/processor/replay.rs
index e5dca9a7bd..3d1a07046d 100644
--- a/relay-server/src/services/processor/replay.rs
+++ b/relay-server/src/services/processor/replay.rs
@@ -17,10 +17,9 @@ use rmp_serde;
 use serde::{Deserialize, Serialize};

 use crate::envelope::{ContentType, ItemType};
-use crate::services::outcome::{DiscardReason, Outcome};
+use crate::services::outcome::DiscardReason;
 use crate::services::processor::{ProcessEnvelopeState, ProcessingError, ReplayGroup};
 use crate::statsd::RelayTimers;
-use crate::utils::ItemAction;

 /// Removes replays if the feature flag is not enabled.
 pub fn process(
@@ -55,64 +54,57 @@ pub fn process(
     let combined_envelope_items =
         project_state.has_feature(Feature::SessionReplayCombinedEnvelopeItems);

-    state.managed_envelope.retain_items(|item| {
-        // If replays aren't enabled or an item was dropped - drop the remainder of the
-        // envelope.
-        if !replays_enabled {
-            return ItemAction::DropSilently;
-        }
+    // If the replay feature is not enabled, drop the items silently.
+    if !replays_enabled {
+        state.managed_envelope.drop_items_silently();
+        return Ok(());
+    }

+    for item in state.managed_envelope.envelope_mut().items_mut() {
         // Set the combined payload header to the value of the combined feature.
         item.set_replay_combined_payload(combined_envelope_items);

         match item.ty() {
             ItemType::ReplayEvent => {
-                match handle_replay_event_item(
+                let replay_event = handle_replay_event_item(
                     item.payload(),
                     &event_id,
                     project_config,
                     client_addr,
                     user_agent,
-                ) {
-                    Err(outcome) => ItemAction::Drop(outcome),
-                    Ok(replay_event) => {
-                        item.set_payload(ContentType::Json, replay_event);
-                        ItemAction::Keep
-                    }
-                }
+                )
+                .map_err(ProcessingError::InvalidReplay)?;
+
+                item.set_payload(ContentType::Json, replay_event);
             }
             ItemType::ReplayRecording => {
-                match handle_replay_recording_item(
+                let replay_recording = handle_replay_recording_item(
                     item.payload(),
                     &event_id,
                     scrubbing_enabled,
                     &mut scrubber,
-                ) {
-                    Err(outcome) => ItemAction::Drop(outcome),
-                    Ok(replay_recording) => {
-                        item.set_payload(ContentType::OctetStream, replay_recording);
-                        ItemAction::Keep
-                    }
-                }
+                )
+                .map_err(ProcessingError::InvalidReplay)?;
+
+                item.set_payload(ContentType::OctetStream, replay_recording);
             }
-            ItemType::ReplayVideo => match handle_replay_video_item(
-                item.payload(),
-                &event_id,
-                project_config,
-                client_addr,
-                user_agent,
-                scrubbing_enabled,
-                &mut scrubber,
-            ) {
-                Err(outcome) => ItemAction::Drop(outcome),
-                Ok(payload) => {
-                    item.set_payload(ContentType::OctetStream, payload);
-                    ItemAction::Keep
-                }
-            },
-            _ => ItemAction::Keep,
+            ItemType::ReplayVideo => {
+                let replay_video = handle_replay_video_item(
+                    item.payload(),
+                    &event_id,
+                    project_config,
+                    client_addr,
+                    user_agent,
+                    scrubbing_enabled,
+                    &mut scrubber,
+                )
+                .map_err(ProcessingError::InvalidReplay)?;
+
+                item.set_payload(ContentType::OctetStream, replay_video);
+            }
+            _ => {}
         }
-    });
+    }

     Ok(())
 }
@@ -125,7 +117,7 @@ fn handle_replay_event_item(
     config: &ProjectConfig,
     client_ip: Option<IpAddr>,
     user_agent: &RawUserAgentInfo<&str>,
-) -> Result<Bytes, Outcome> {
+) -> Result<Bytes, DiscardReason> {
     match process_replay_event(&payload, config, client_ip, user_agent) {
         Ok(replay) => match replay.to_json() {
             Ok(json) => Ok(json.into_bytes().into()),
@@ -144,12 +136,12 @@ fn handle_replay_event_item(
                 ?event_id,
                 "invalid replay event"
             );
-            Err(Outcome::Invalid(match error {
+            Err(match error {
                 ReplayError::NoContent => DiscardReason::InvalidReplayEventNoPayload,
                 ReplayError::CouldNotScrub(_) => DiscardReason::InvalidReplayEventPii,
                 ReplayError::CouldNotParse(_) => DiscardReason::InvalidReplayEvent,
                 ReplayError::InvalidPayload(_) => DiscardReason::InvalidReplayEvent,
-            }))
+            })
         }
     }
 }
@@ -197,7 +189,7 @@ fn handle_replay_recording_item(
     event_id: &Option<EventId>,
     scrubbing_enabled: bool,
     scrubber: &mut RecordingScrubber,
-) -> Result<Bytes, Outcome> {
+) -> Result<Bytes, DiscardReason> {
     // XXX: Processing is there just for data scrubbing. Skip the entire expensive
     // processing step if we do not need to scrub.
     if !scrubbing_enabled || scrubber.is_empty() {
@@ -216,7 +208,7 @@ fn handle_replay_recording_item(
         Ok(recording) => Ok(recording.into()),
         Err(e) => {
             relay_log::warn!("replay-recording-event: {e} {event_id:?}");
-            Err(Outcome::Invalid(DiscardReason::InvalidReplayRecordingEvent))
+            Err(DiscardReason::InvalidReplayRecordingEvent)
         }
     }
 }
@@ -238,7 +230,7 @@ fn handle_replay_video_item(
     user_agent: &RawUserAgentInfo<&str>,
     scrubbing_enabled: bool,
     scrubber: &mut RecordingScrubber,
-) -> Result<Bytes, Outcome> {
+) -> Result<Bytes, DiscardReason> {
     let ReplayVideoEvent {
         replay_event,
         replay_recording,
@@ -247,7 +239,7 @@ fn handle_replay_video_item(
         Ok(result) => result,
         Err(e) => {
             relay_log::warn!("replay-video-event: {e} {event_id:?}");
-            return Err(Outcome::Invalid(DiscardReason::InvalidReplayVideoEvent));
+            return Err(DiscardReason::InvalidReplayVideoEvent);
         }
     };

@@ -261,7 +253,7 @@ fn handle_replay_video_item(

     // Verify the replay-video payload is not empty.
     if replay_video.is_empty() {
-        return Err(Outcome::Invalid(DiscardReason::InvalidReplayVideoEvent));
+        return Err(DiscardReason::InvalidReplayVideoEvent);
     }

     match rmp_serde::to_vec_named(&ReplayVideoEvent {
@@ -270,6 +262,6 @@ fn handle_replay_video_item(
         replay_video,
     }) {
         Ok(payload) => Ok(payload.into()),
-        Err(_) => Err(Outcome::Invalid(DiscardReason::InvalidReplayVideoEvent)),
+        Err(_) => Err(DiscardReason::InvalidReplayVideoEvent),
     }
 }
diff --git a/relay-server/src/utils/managed_envelope.rs b/relay-server/src/utils/managed_envelope.rs
index e4229cb5a5..9a3be7bc1b 100644
--- a/relay-server/src/utils/managed_envelope.rs
+++ b/relay-server/src/utils/managed_envelope.rs
@@ -315,6 +315,11 @@ impl ManagedEnvelope {
         // TODO: once `update` is private, it should be called here.
     }

+    /// Drops every item in the envelope.
+    pub fn drop_items_silently(&mut self) {
+        self.envelope.drop_items_silently();
+    }
+
     /// Record that event metrics have been extracted.
     ///
     /// This is usually done automatically as part of `EnvelopeContext::new` or `update`. However,
@@ -460,13 +465,21 @@ impl ManagedEnvelope {
         // (see: `Self::event_category()`).
         if self.context.summary.secondary_transaction_quantity > 0 {
             self.track_outcome(
-                outcome,
+                outcome.clone(),
                 // Secondary transaction counts are never indexed transactions
                 DataCategory::Transaction,
                 self.context.summary.secondary_transaction_quantity,
             );
         }

+        if self.context.summary.replay_quantity > 0 {
+            self.track_outcome(
+                outcome.clone(),
+                DataCategory::Replay,
+                self.context.summary.replay_quantity,
+            );
+        }
+
         self.finish(RelayCounters::EnvelopeRejected, handling);
     }

diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py
index d0d09698cd..9f91743218 100644
--- a/tests/integration/test_outcome.py
+++ b/tests/integration/test_outcome.py
@@ -2041,3 +2041,69 @@ def send_buckets(n, name, value, ty):
     outcomes = outcomes_consumer.get_outcomes()
     assert len(outcomes) == 1
     assert outcomes[0]["reason"] == global_reason_code
+
+
+def test_replay_outcomes_item_failed(
+    mini_sentry,
+    relay_with_processing,
+    outcomes_consumer,
+    metrics_consumer,
+):
+    """
+    Assert Relay records a single outcome even though both envelope items fail.
+ """ + outcomes_consumer = outcomes_consumer(timeout=2) + metrics_consumer = metrics_consumer() + + project_id = 42 + mini_sentry.add_basic_project_config( + project_id, extra={"config": {"features": ["organizations:session-replay"]}} + ) + + config = { + "outcomes": { + "emit_outcomes": True, + "batch_size": 1, + "batch_interval": 1, + "aggregator": { + "bucket_interval": 1, + "flush_interval": 1, + }, + "source": "pop-relay", + }, + "aggregator": {"bucket_interval": 1, "initial_delay": 0, "debounce_delay": 0}, + } + + upstream = relay_with_processing(config) + + def make_envelope(): + envelope = Envelope(headers=[["event_id", "515539018c9b4260a6f999572f1661ee"]]) + envelope.add_item( + Item(payload=PayloadRef(bytes=b"not valid"), type="replay_event") + ) + envelope.add_item( + Item(payload=PayloadRef(bytes=b"still not valid"), type="replay_recording") + ) + + return envelope + + envelope = make_envelope() + upstream.send_envelope(project_id, envelope) + + outcomes = outcomes_consumer.get_outcomes() + + assert len(outcomes) == 1 + + expected = { + "category": 7, + "event_id": "515539018c9b4260a6f999572f1661ee", + "key_id": 123, + "outcome": 3, + "project_id": 42, + "quantity": 2, + "reason": "invalid_replay", + "remote_addr": "127.0.0.1", + "source": "pop-relay", + } + expected["timestamp"] = outcomes[0]["timestamp"] + assert outcomes[0] == expected