diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 8f9d53e997..c3137bc9f5 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -539,11 +539,6 @@ struct ProcessEnvelopeState<'a, Group> { /// resulting item. sample_rates: Option, - /// The result of a dynamic sampling operation on this envelope. - /// - /// The event will be kept if there's either no match, or there's a match and it was sampled. - sampling_result: SamplingResult, - /// Metrics extracted from items in the envelope. /// /// Relay can extract metrics for sessions and transactions, which is controlled by @@ -1025,7 +1020,6 @@ impl EnvelopeProcessorService { event_metrics_extracted: false, metrics: Metrics::default(), sample_rates: None, - sampling_result: SamplingResult::Pending, extracted_metrics: Default::default(), project_state, sampling_project_state, @@ -1102,6 +1096,7 @@ impl EnvelopeProcessorService { fn extract_metrics( &self, state: &mut ProcessEnvelopeState, + sampling_result: &SamplingResult, ) -> Result<(), ProcessingError> { // NOTE: This function requires a `metric_extraction` in the project config. Legacy configs // will upsert this configuration from transaction and conditional tagging fields, even if @@ -1140,7 +1135,7 @@ impl EnvelopeProcessorService { config: tx_config, generic_tags: config.map(|c| c.tags.as_slice()).unwrap_or_default(), transaction_from_dsc, - sampling_result: &state.sampling_result, + sampling_result, has_profile: state.profile_id.is_some(), }; @@ -1318,18 +1313,19 @@ impl EnvelopeProcessorService { let mut sampling_should_drop = false; if self.inner.config.processing_enabled() || supported_generic_filters { - dynamic_sampling::run(state, &self.inner.config); + let sampling_result = dynamic_sampling::run(state, &self.inner.config); // Remember sampling decision, before it is reset in `dynamic_sampling::sample_envelope_items`. - sampling_should_drop = state.sampling_result.should_drop(); + sampling_should_drop = sampling_result.should_drop(); // We avoid extracting metrics if we are not sampling the event while in non-processing // relays, in order to synchronize rate limits on indexed and processed transactions. if self.inner.config.processing_enabled() || sampling_should_drop { - self.extract_metrics(state)?; + self.extract_metrics(state, &sampling_result)?; } dynamic_sampling::sample_envelope_items( state, + sampling_result, &self.inner.config, &self.inner.global_config.current(), ); diff --git a/relay-server/src/services/processor/dynamic_sampling.rs b/relay-server/src/services/processor/dynamic_sampling.rs index 53b3549a2b..d696f1fd95 100644 --- a/relay-server/src/services/processor/dynamic_sampling.rs +++ b/relay-server/src/services/processor/dynamic_sampling.rs @@ -62,18 +62,19 @@ pub fn normalize(state: &mut ProcessEnvelopeState) { } /// Computes the sampling decision on the incoming event -pub fn run(state: &mut ProcessEnvelopeState, config: &Config) { +pub fn run(state: &mut ProcessEnvelopeState, config: &Config) -> SamplingResult { // Running dynamic sampling involves either: // - Tagging whether an incoming error has a sampled trace connected to it. // - Computing the actual sampling decision on an incoming transaction. match state.event_type().unwrap_or_default() { EventType::Default | EventType::Error => { tag_error_with_sampling_decision(state, config); + SamplingResult::Pending } EventType::Transaction => { match state.project_state.config.transaction_metrics { Some(ErrorBoundary::Ok(ref c)) if c.is_enabled() => (), - _ => return, + _ => return SamplingResult::Pending, } let sampling_config = match state.project_state.config.sampling { @@ -87,26 +88,27 @@ pub fn run(state: &mut ProcessEnvelopeState, config: &Config) _ => None, }; - state.sampling_result = compute_sampling_decision( + compute_sampling_decision( config.processing_enabled(), &state.reservoir, sampling_config, state.event.value(), root_config, state.envelope().dsc(), - ); + ) } - _ => {} + _ => SamplingResult::Pending, } } /// Apply the dynamic sampling decision from `compute_sampling_decision`. pub fn sample_envelope_items( state: &mut ProcessEnvelopeState, + sampling_result: SamplingResult, config: &Config, global_config: &GlobalConfig, ) { - if let SamplingResult::Match(sampling_match) = std::mem::take(&mut state.sampling_result) { + if let SamplingResult::Match(sampling_match) = sampling_result { // We assume that sampling is only supposed to work on transactions. let Some(event) = state.event.value() else { return; @@ -479,7 +481,6 @@ mod tests { event: Annotated::from(event), metrics: Default::default(), sample_rates: None, - sampling_result: SamplingResult::Pending, extracted_metrics: Default::default(), project_state: Arc::new(project_state), sampling_project_state: None, @@ -501,18 +502,18 @@ mod tests { // None represents no TransactionMetricsConfig, DS will not be run let mut state = get_state(None); - run(&mut state, &config); - assert!(state.sampling_result.should_keep()); + let sampling_result = run(&mut state, &config); + assert!(sampling_result.should_keep()); // Current version is 1, so it won't run DS if it's outdated let mut state = get_state(Some(0)); - run(&mut state, &config); - assert!(state.sampling_result.should_keep()); + let sampling_result = run(&mut state, &config); + assert!(sampling_result.should_keep()); // Dynamic sampling is run, as the transactionmetrics version is up to date. let mut state = get_state(Some(1)); - run(&mut state, &config); - assert!(state.sampling_result.should_drop()); + let sampling_result = run(&mut state, &config); + assert!(sampling_result.should_drop()); } fn project_state_with_single_rule(sample_rate: f64) -> ProjectState {