Skip to content

Commit

Permalink
ref(server): Remove sampling_result from state (#3226)
Browse files Browse the repository at this point in the history
Make `sampling_result` a plain old return value, to prevent bugs like
the one fixed in #3186.

ref: getsentry/team-ingest#299
  • Loading branch information
jjbayer committed Mar 6, 2024
1 parent 5f2e1e8 commit 69c8a18
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 23 deletions.
16 changes: 6 additions & 10 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,11 +539,6 @@ struct ProcessEnvelopeState<'a, Group> {
/// resulting item.
sample_rates: Option<Value>,

/// The result of a dynamic sampling operation on this envelope.
///
/// The event will be kept if there's either no match, or there's a match and it was sampled.
sampling_result: SamplingResult,

/// Metrics extracted from items in the envelope.
///
/// Relay can extract metrics for sessions and transactions, which is controlled by
Expand Down Expand Up @@ -1025,7 +1020,6 @@ impl EnvelopeProcessorService {
event_metrics_extracted: false,
metrics: Metrics::default(),
sample_rates: None,
sampling_result: SamplingResult::Pending,
extracted_metrics: Default::default(),
project_state,
sampling_project_state,
Expand Down Expand Up @@ -1102,6 +1096,7 @@ impl EnvelopeProcessorService {
fn extract_metrics(
&self,
state: &mut ProcessEnvelopeState<TransactionGroup>,
sampling_result: &SamplingResult,
) -> Result<(), ProcessingError> {
// NOTE: This function requires a `metric_extraction` in the project config. Legacy configs
// will upsert this configuration from transaction and conditional tagging fields, even if
Expand Down Expand Up @@ -1140,7 +1135,7 @@ impl EnvelopeProcessorService {
config: tx_config,
generic_tags: config.map(|c| c.tags.as_slice()).unwrap_or_default(),
transaction_from_dsc,
sampling_result: &state.sampling_result,
sampling_result,
has_profile: state.profile_id.is_some(),
};

Expand Down Expand Up @@ -1318,18 +1313,19 @@ impl EnvelopeProcessorService {
let mut sampling_should_drop = false;

if self.inner.config.processing_enabled() || supported_generic_filters {
dynamic_sampling::run(state, &self.inner.config);
let sampling_result = dynamic_sampling::run(state, &self.inner.config);
// Remember sampling decision, before it is reset in `dynamic_sampling::sample_envelope_items`.
sampling_should_drop = state.sampling_result.should_drop();
sampling_should_drop = sampling_result.should_drop();

// We avoid extracting metrics if we are not sampling the event while in non-processing
// relays, in order to synchronize rate limits on indexed and processed transactions.
if self.inner.config.processing_enabled() || sampling_should_drop {
self.extract_metrics(state)?;
self.extract_metrics(state, &sampling_result)?;
}

dynamic_sampling::sample_envelope_items(
state,
sampling_result,
&self.inner.config,
&self.inner.global_config.current(),
);
Expand Down
27 changes: 14 additions & 13 deletions relay-server/src/services/processor/dynamic_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,19 @@ pub fn normalize(state: &mut ProcessEnvelopeState<TransactionGroup>) {
}

/// Computes the sampling decision on the incoming event
pub fn run(state: &mut ProcessEnvelopeState<TransactionGroup>, config: &Config) {
pub fn run(state: &mut ProcessEnvelopeState<TransactionGroup>, config: &Config) -> SamplingResult {
// Running dynamic sampling involves either:
// - Tagging whether an incoming error has a sampled trace connected to it.
// - Computing the actual sampling decision on an incoming transaction.
match state.event_type().unwrap_or_default() {
EventType::Default | EventType::Error => {
tag_error_with_sampling_decision(state, config);
SamplingResult::Pending
}
EventType::Transaction => {
match state.project_state.config.transaction_metrics {
Some(ErrorBoundary::Ok(ref c)) if c.is_enabled() => (),
_ => return,
_ => return SamplingResult::Pending,
}

let sampling_config = match state.project_state.config.sampling {
Expand All @@ -87,26 +88,27 @@ pub fn run(state: &mut ProcessEnvelopeState<TransactionGroup>, config: &Config)
_ => None,
};

state.sampling_result = compute_sampling_decision(
compute_sampling_decision(
config.processing_enabled(),
&state.reservoir,
sampling_config,
state.event.value(),
root_config,
state.envelope().dsc(),
);
)
}
_ => {}
_ => SamplingResult::Pending,
}
}

/// Apply the dynamic sampling decision from `compute_sampling_decision`.
pub fn sample_envelope_items(
state: &mut ProcessEnvelopeState<TransactionGroup>,
sampling_result: SamplingResult,
config: &Config,
global_config: &GlobalConfig,
) {
if let SamplingResult::Match(sampling_match) = std::mem::take(&mut state.sampling_result) {
if let SamplingResult::Match(sampling_match) = sampling_result {
// We assume that sampling is only supposed to work on transactions.
let Some(event) = state.event.value() else {
return;
Expand Down Expand Up @@ -479,7 +481,6 @@ mod tests {
event: Annotated::from(event),
metrics: Default::default(),
sample_rates: None,
sampling_result: SamplingResult::Pending,
extracted_metrics: Default::default(),
project_state: Arc::new(project_state),
sampling_project_state: None,
Expand All @@ -501,18 +502,18 @@ mod tests {

// None represents no TransactionMetricsConfig, DS will not be run
let mut state = get_state(None);
run(&mut state, &config);
assert!(state.sampling_result.should_keep());
let sampling_result = run(&mut state, &config);
assert!(sampling_result.should_keep());

// Current version is 1, so it won't run DS if it's outdated
let mut state = get_state(Some(0));
run(&mut state, &config);
assert!(state.sampling_result.should_keep());
let sampling_result = run(&mut state, &config);
assert!(sampling_result.should_keep());

// Dynamic sampling is run, as the transactionmetrics version is up to date.
let mut state = get_state(Some(1));
run(&mut state, &config);
assert!(state.sampling_result.should_drop());
let sampling_result = run(&mut state, &config);
assert!(sampling_result.should_drop());
}

fn project_state_with_single_rule(sample_rate: f64) -> ProjectState {
Expand Down

0 comments on commit 69c8a18

Please sign in to comment.