Skip to content

Commit

Permalink
feat(replays): Reject replay envelopes if any item contained within i…
Browse files Browse the repository at this point in the history
…s invalid (#3201)

Closes: #3180

Drop the envelope in its entirety if any component fails validation.
This prevents confusing states where is shown a replay which was never
ingested.

---------

Co-authored-by: Joris Bayer <joris.bayer@sentry.io>
  • Loading branch information
cmanallen and jjbayer authored Mar 8, 2024
1 parent ae75d25 commit 87b23e6
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 51 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- Adds support for dynamic metric bucket encoding. ([#3137](https://github.com/getsentry/relay/pull/3137))
- Use statsdproxy to pre-aggregate metrics. ([#2425](https://github.com/getsentry/relay/pull/2425))
- Add SDK information to spans. ([#3178](https://github.com/getsentry/relay/pull/3178))
- Drop replay envelopes if any item fails. ([#3201](https://github.com/getsentry/relay/pull/3201))
- Filter null values from metrics summary tags. ([#3204](https://github.com/getsentry/relay/pull/3204))
- Emit a usage metric for every span seen. ([#3209](https://github.com/getsentry/relay/pull/3209))
- Add namespace for profile metrics. ([#3229](https://github.com/getsentry/relay/pull/3229))
Expand Down
5 changes: 5 additions & 0 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,11 @@ impl Envelope {
self.items.retain(f)
}

/// Drops every item in the envelope.
pub fn drop_items_silently(&mut self) {
self.items.clear()
}

/// Serializes this envelope into the given writer.
pub fn serialize<W>(&self, mut writer: W) -> Result<(), EnvelopeError>
where
Expand Down
5 changes: 5 additions & 0 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,9 @@ pub enum ProcessingError {

#[error("invalid processing group type")]
InvalidProcessingGroup(#[from] InvalidProcessingGroupType),

#[error("invalid replay")]
InvalidReplay(DiscardReason),
}

impl ProcessingError {
Expand Down Expand Up @@ -460,6 +463,8 @@ impl ProcessingError {
Self::MissingProjectId => None,
Self::EventFiltered(_) => None,
Self::InvalidProcessingGroup(_) => None,

Self::InvalidReplay(reason) => Some(Outcome::Invalid(reason)),
}
}

Expand Down
92 changes: 42 additions & 50 deletions relay-server/src/services/processor/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ use rmp_serde;
use serde::{Deserialize, Serialize};

use crate::envelope::{ContentType, ItemType};
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::outcome::DiscardReason;
use crate::services::processor::{ProcessEnvelopeState, ProcessingError, ReplayGroup};
use crate::statsd::RelayTimers;
use crate::utils::ItemAction;

/// Removes replays if the feature flag is not enabled.
pub fn process(
Expand Down Expand Up @@ -55,64 +54,57 @@ pub fn process(
let combined_envelope_items =
project_state.has_feature(Feature::SessionReplayCombinedEnvelopeItems);

state.managed_envelope.retain_items(|item| {
// If replays aren't enabled or an item was dropped - drop the remainder of the
// envelope.
if !replays_enabled {
return ItemAction::DropSilently;
}
// If the replay feature is not enabled drop the items silenty.
if !replays_enabled {
state.managed_envelope.drop_items_silently();
return Ok(());
}

for item in state.managed_envelope.envelope_mut().items_mut() {
// Set the combined payload header to the value of the combined feature.
item.set_replay_combined_payload(combined_envelope_items);

match item.ty() {
ItemType::ReplayEvent => {
match handle_replay_event_item(
let replay_event = handle_replay_event_item(
item.payload(),
&event_id,
project_config,
client_addr,
user_agent,
) {
Err(outcome) => ItemAction::Drop(outcome),
Ok(replay_event) => {
item.set_payload(ContentType::Json, replay_event);
ItemAction::Keep
}
}
)
.map_err(ProcessingError::InvalidReplay)?;

item.set_payload(ContentType::Json, replay_event);
}
ItemType::ReplayRecording => {
match handle_replay_recording_item(
let replay_recording = handle_replay_recording_item(
item.payload(),
&event_id,
scrubbing_enabled,
&mut scrubber,
) {
Err(outcome) => ItemAction::Drop(outcome),
Ok(replay_recording) => {
item.set_payload(ContentType::OctetStream, replay_recording);
ItemAction::Keep
}
}
)
.map_err(ProcessingError::InvalidReplay)?;

item.set_payload(ContentType::OctetStream, replay_recording);
}
ItemType::ReplayVideo => match handle_replay_video_item(
item.payload(),
&event_id,
project_config,
client_addr,
user_agent,
scrubbing_enabled,
&mut scrubber,
) {
Err(outcome) => ItemAction::Drop(outcome),
Ok(payload) => {
item.set_payload(ContentType::OctetStream, payload);
ItemAction::Keep
}
},
_ => ItemAction::Keep,
ItemType::ReplayVideo => {
let replay_video = handle_replay_video_item(
item.payload(),
&event_id,
project_config,
client_addr,
user_agent,
scrubbing_enabled,
&mut scrubber,
)
.map_err(ProcessingError::InvalidReplay)?;

item.set_payload(ContentType::OctetStream, replay_video);
}
_ => {}
}
});
}

Ok(())
}
Expand All @@ -125,7 +117,7 @@ fn handle_replay_event_item(
config: &ProjectConfig,
client_ip: Option<IpAddr>,
user_agent: &RawUserAgentInfo<&str>,
) -> Result<Bytes, Outcome> {
) -> Result<Bytes, DiscardReason> {
match process_replay_event(&payload, config, client_ip, user_agent) {
Ok(replay) => match replay.to_json() {
Ok(json) => Ok(json.into_bytes().into()),
Expand All @@ -144,12 +136,12 @@ fn handle_replay_event_item(
?event_id,
"invalid replay event"
);
Err(Outcome::Invalid(match error {
Err(match error {
ReplayError::NoContent => DiscardReason::InvalidReplayEventNoPayload,
ReplayError::CouldNotScrub(_) => DiscardReason::InvalidReplayEventPii,
ReplayError::CouldNotParse(_) => DiscardReason::InvalidReplayEvent,
ReplayError::InvalidPayload(_) => DiscardReason::InvalidReplayEvent,
}))
})
}
}
}
Expand Down Expand Up @@ -197,7 +189,7 @@ fn handle_replay_recording_item(
event_id: &Option<EventId>,
scrubbing_enabled: bool,
scrubber: &mut RecordingScrubber,
) -> Result<Bytes, Outcome> {
) -> Result<Bytes, DiscardReason> {
// XXX: Processing is there just for data scrubbing. Skip the entire expensive
// processing step if we do not need to scrub.
if !scrubbing_enabled || scrubber.is_empty() {
Expand All @@ -216,7 +208,7 @@ fn handle_replay_recording_item(
Ok(recording) => Ok(recording.into()),
Err(e) => {
relay_log::warn!("replay-recording-event: {e} {event_id:?}");
Err(Outcome::Invalid(DiscardReason::InvalidReplayRecordingEvent))
Err(DiscardReason::InvalidReplayRecordingEvent)
}
}
}
Expand All @@ -238,7 +230,7 @@ fn handle_replay_video_item(
user_agent: &RawUserAgentInfo<&str>,
scrubbing_enabled: bool,
scrubber: &mut RecordingScrubber,
) -> Result<Bytes, Outcome> {
) -> Result<Bytes, DiscardReason> {
let ReplayVideoEvent {
replay_event,
replay_recording,
Expand All @@ -247,7 +239,7 @@ fn handle_replay_video_item(
Ok(result) => result,
Err(e) => {
relay_log::warn!("replay-video-event: {e} {event_id:?}");
return Err(Outcome::Invalid(DiscardReason::InvalidReplayVideoEvent));
return Err(DiscardReason::InvalidReplayVideoEvent);
}
};

Expand All @@ -261,7 +253,7 @@ fn handle_replay_video_item(

// Verify the replay-video payload is not empty.
if replay_video.is_empty() {
return Err(Outcome::Invalid(DiscardReason::InvalidReplayVideoEvent));
return Err(DiscardReason::InvalidReplayVideoEvent);
}

match rmp_serde::to_vec_named(&ReplayVideoEvent {
Expand All @@ -270,6 +262,6 @@ fn handle_replay_video_item(
replay_video,
}) {
Ok(payload) => Ok(payload.into()),
Err(_) => Err(Outcome::Invalid(DiscardReason::InvalidReplayVideoEvent)),
Err(_) => Err(DiscardReason::InvalidReplayVideoEvent),
}
}
15 changes: 14 additions & 1 deletion relay-server/src/utils/managed_envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ impl ManagedEnvelope {
// TODO: once `update` is private, it should be called here.
}

/// Drops every item in the envelope.
pub fn drop_items_silently(&mut self) {
self.envelope.drop_items_silently();
}

/// Record that event metrics have been extracted.
///
/// This is usually done automatically as part of `EnvelopeContext::new` or `update`. However,
Expand Down Expand Up @@ -460,13 +465,21 @@ impl ManagedEnvelope {
// (see: `Self::event_category()`).
if self.context.summary.secondary_transaction_quantity > 0 {
self.track_outcome(
outcome,
outcome.clone(),
// Secondary transaction counts are never indexed transactions
DataCategory::Transaction,
self.context.summary.secondary_transaction_quantity,
);
}

if self.context.summary.replay_quantity > 0 {
self.track_outcome(
outcome.clone(),
DataCategory::Replay,
self.context.summary.replay_quantity,
);
}

self.finish(RelayCounters::EnvelopeRejected, handling);
}

Expand Down
66 changes: 66 additions & 0 deletions tests/integration/test_outcome.py
Original file line number Diff line number Diff line change
Expand Up @@ -2041,3 +2041,69 @@ def send_buckets(n, name, value, ty):
outcomes = outcomes_consumer.get_outcomes()
assert len(outcomes) == 1
assert outcomes[0]["reason"] == global_reason_code


def test_replay_outcomes_item_failed(
mini_sentry,
relay_with_processing,
outcomes_consumer,
metrics_consumer,
):
"""
Assert Relay records a single outcome even though both envelope items fail.
"""
outcomes_consumer = outcomes_consumer(timeout=2)
metrics_consumer = metrics_consumer()

project_id = 42
mini_sentry.add_basic_project_config(
project_id, extra={"config": {"features": ["organizations:session-replay"]}}
)

config = {
"outcomes": {
"emit_outcomes": True,
"batch_size": 1,
"batch_interval": 1,
"aggregator": {
"bucket_interval": 1,
"flush_interval": 1,
},
"source": "pop-relay",
},
"aggregator": {"bucket_interval": 1, "initial_delay": 0, "debounce_delay": 0},
}

upstream = relay_with_processing(config)

def make_envelope():
envelope = Envelope(headers=[["event_id", "515539018c9b4260a6f999572f1661ee"]])
envelope.add_item(
Item(payload=PayloadRef(bytes=b"not valid"), type="replay_event")
)
envelope.add_item(
Item(payload=PayloadRef(bytes=b"still not valid"), type="replay_recording")
)

return envelope

envelope = make_envelope()
upstream.send_envelope(project_id, envelope)

outcomes = outcomes_consumer.get_outcomes()

assert len(outcomes) == 1

expected = {
"category": 7,
"event_id": "515539018c9b4260a6f999572f1661ee",
"key_id": 123,
"outcome": 3,
"project_id": 42,
"quantity": 2,
"reason": "invalid_replay",
"remote_addr": "127.0.0.1",
"source": "pop-relay",
}
expected["timestamp"] = outcomes[0]["timestamp"]
assert outcomes[0] == expected

0 comments on commit 87b23e6

Please sign in to comment.