diff --git a/CHANGELOG.md b/CHANGELOG.md index 7312fc1403..0ab7e54cb0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ - Properly handle AI metrics from the Python SDK's `@ai_track` decorator. ([#3539](https://github.com/getsentry/relay/pull/3539)) - Mitigate occasional slowness and timeouts of the healthcheck endpoint. The endpoint will now respond promptly an unhealthy state. ([#3567](https://github.com/getsentry/relay/pull/3567)) +**Features**: + +- Apple trace-based sampling rules to standalone spans. ([#3476](https://github.com/getsentry/relay/pull/3476)) + **Internal**: - Add metrics extraction config to global config. ([#3490](https://github.com/getsentry/relay/pull/3490), [#3504](https://github.com/getsentry/relay/pull/3504)) diff --git a/relay-sampling/src/dsc.rs b/relay-sampling/src/dsc.rs index c503777ff6..6f839221e8 100644 --- a/relay-sampling/src/dsc.rs +++ b/relay-sampling/src/dsc.rs @@ -33,11 +33,11 @@ pub struct DynamicSamplingContext { /// The environment. #[serde(default)] pub environment: Option, - /// The name of the transaction extracted from the `transaction` field in the starting - /// transaction. + /// In the transaction-based model, this is the name of the transaction extracted from the `transaction` + /// field in the starting transaction and set on transaction start, or via `scope.transaction`. /// - /// Set on transaction start, or via `scope.transaction`. - #[serde(default)] + /// In the spans-only model, this is the segment name for the segment that started the trace. + #[serde(default, alias = "segment_name")] pub transaction: Option, /// The sample rate with which this trace was sampled in the client. This is a float between /// `0.0` and `1.0`. diff --git a/relay-sampling/src/evaluation.rs b/relay-sampling/src/evaluation.rs index a559097a21..8f29bae309 100644 --- a/relay-sampling/src/evaluation.rs +++ b/relay-sampling/src/evaluation.rs @@ -154,20 +154,24 @@ pub struct SamplingEvaluator<'a> { } impl<'a> SamplingEvaluator<'a> { - /// Constructor for [`SamplingEvaluator`]. - pub fn new(now: DateTime) -> Self { + /// Constructs an evaluator with reservoir sampling. + pub fn new_with_reservoir(now: DateTime, reservoir: &'a ReservoirEvaluator<'a>) -> Self { Self { now, rule_ids: vec![], factor: 1.0, - reservoir: None, + reservoir: Some(reservoir), } } - /// Sets a [`ReservoirEvaluator`]. - pub fn set_reservoir(mut self, reservoir: &'a ReservoirEvaluator) -> Self { - self.reservoir = Some(reservoir); - self + /// Constructs an evaluator without reservoir sampling. + pub fn new(now: DateTime) -> Self { + Self { + now, + rule_ids: vec![], + factor: 1.0, + reservoir: None, + } } /// Attempts to find a match for sampling rules using `ControlFlow`. @@ -533,19 +537,19 @@ mod tests { // shares state among multiple evaluator instances. let reservoir = mock_reservoir_evaluator(vec![]); - let evaluator = SamplingEvaluator::new(Utc::now()).set_reservoir(&reservoir); + let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir); let matched_rules = get_matched_rules(&evaluator.match_rules(Uuid::default(), &dsc, rules.iter())); // Reservoir rule overrides 0 and 2. assert_eq!(&matched_rules, &[1]); - let evaluator = SamplingEvaluator::new(Utc::now()).set_reservoir(&reservoir); + let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir); let matched_rules = get_matched_rules(&evaluator.match_rules(Uuid::default(), &dsc, rules.iter())); // Reservoir rule overrides 0 and 2. assert_eq!(&matched_rules, &[1]); - let evaluator = SamplingEvaluator::new(Utc::now()).set_reservoir(&reservoir); + let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir); let matched_rules = get_matched_rules(&evaluator.match_rules(Uuid::default(), &dsc, rules.iter())); // Reservoir rule reached its limit, rule 0 and 2 are now matched instead. @@ -783,7 +787,7 @@ mod tests { let mut rule = mocked_sampling_rule(); let reservoir = ReservoirEvaluator::new(ReservoirCounters::default()); - let mut eval = SamplingEvaluator::new(Utc::now()).set_reservoir(&reservoir); + let mut eval = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir); rule.sampling_value = SamplingValue::SampleRate { value: 1.0 }; assert_eq!(eval.try_compute_sample_rate(&rule), Some(1.0)); diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index a9c7b427ee..b5df980737 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -167,9 +167,29 @@ macro_rules! processing_group { /// Should be used only with groups which are responsible for processing envelopes with events. pub trait EventProcessing {} +/// A trait for processing groups that can be dynamically sampled. +pub trait Sampling { + /// Whether dynamic sampling should run under the given project's conditions. + fn supports_sampling(project_state: &ProjectState) -> bool; + + /// Whether reservoir sampling applies to this processing group (a.k.a. data type). + fn supports_reservoir_sampling() -> bool; +} + processing_group!(TransactionGroup, Transaction); impl EventProcessing for TransactionGroup {} +impl Sampling for TransactionGroup { + fn supports_sampling(project_state: &ProjectState) -> bool { + // For transactions, we require transaction metrics to be enabled before sampling. + matches!(&project_state.config.transaction_metrics, Some(ErrorBoundary::Ok(c)) if c.is_enabled()) + } + + fn supports_reservoir_sampling() -> bool { + true + } +} + processing_group!(ErrorGroup, Error); impl EventProcessing for ErrorGroup {} @@ -179,6 +199,18 @@ processing_group!(ClientReportGroup, ClientReport); processing_group!(ReplayGroup, Replay); processing_group!(CheckInGroup, CheckIn); processing_group!(SpanGroup, Span); + +impl Sampling for SpanGroup { + fn supports_sampling(project_state: &ProjectState) -> bool { + // If no metrics could be extracted, do not sample anything. + matches!(&project_state.config().metric_extraction, ErrorBoundary::Ok(c) if c.is_supported()) + } + + fn supports_reservoir_sampling() -> bool { + false + } +} + processing_group!(ProfileChunkGroup, ProfileChunk); processing_group!(MetricsGroup, Metrics); processing_group!(ForwardUnknownGroup, ForwardUnknown); @@ -1303,12 +1335,6 @@ impl EnvelopeProcessorService { } }; - if let Some(sampling_state) = state.sampling_project_state.clone() { - state - .envelope_mut() - .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules); - } - let request_meta = state.managed_envelope.envelope().meta(); let client_ipaddr = request_meta.client_addr().map(IpAddr::from); @@ -1650,7 +1676,7 @@ impl EnvelopeProcessorService { fn process_envelope( &self, - managed_envelope: ManagedEnvelope, + mut managed_envelope: ManagedEnvelope, project_id: ProjectId, project_state: Arc, sampling_project_state: Option>, @@ -1660,6 +1686,15 @@ impl EnvelopeProcessorService { // from the contents of the envelope. let group = managed_envelope.group(); + // Pre-process the envelope headers. + if let Some(sampling_state) = sampling_project_state.as_ref() { + // Both transactions and standalone span envelopes need a normalized DSC header + // to make sampling rules based on the segment/transaction name work correctly. + managed_envelope + .envelope_mut() + .parametrize_dsc_transaction(&sampling_state.config.tx_name_rules); + } + macro_rules! run { ($fn:ident) => {{ let managed_envelope = managed_envelope.try_into()?; diff --git a/relay-server/src/services/processor/dynamic_sampling.rs b/relay-server/src/services/processor/dynamic_sampling.rs index d0c2c53018..8f712ad034 100644 --- a/relay-server/src/services/processor/dynamic_sampling.rs +++ b/relay-server/src/services/processor/dynamic_sampling.rs @@ -16,7 +16,7 @@ use relay_statsd::metric; use crate::envelope::ItemType; use crate::services::outcome::Outcome; use crate::services::processor::{ - profile, EventProcessing, ProcessEnvelopeState, TransactionGroup, + profile, EventProcessing, ProcessEnvelopeState, Sampling, TransactionGroup, }; use crate::statsd::RelayCounters; use crate::utils::{self, sample, ItemAction, SamplingResult}; @@ -62,43 +62,35 @@ pub fn ensure_dsc(state: &mut ProcessEnvelopeState) { } /// Computes the sampling decision on the incoming event -pub fn run(state: &mut ProcessEnvelopeState, config: &Config) -> SamplingResult { - // Running dynamic sampling involves either: - // - Tagging whether an incoming error has a sampled trace connected to it. - // - Computing the actual sampling decision on an incoming transaction. - match state.event_type().unwrap_or_default() { - EventType::Default | EventType::Error => { - tag_error_with_sampling_decision(state, config); - SamplingResult::Pending - } - EventType::Transaction => { - match state.project_state.config.transaction_metrics { - Some(ErrorBoundary::Ok(ref c)) if c.is_enabled() => (), - _ => return SamplingResult::Pending, - } +pub fn run(state: &mut ProcessEnvelopeState, config: &Config) -> SamplingResult +where + Group: Sampling, +{ + if !Group::supports_sampling(&state.project_state) { + return SamplingResult::Pending; + } - let sampling_config = match state.project_state.config.sampling { - Some(ErrorBoundary::Ok(ref config)) if !config.unsupported() => Some(config), - _ => None, - }; + let sampling_config = match state.project_state.config.sampling { + Some(ErrorBoundary::Ok(ref config)) if !config.unsupported() => Some(config), + _ => None, + }; - let root_state = state.sampling_project_state.as_ref(); - let root_config = match root_state.and_then(|s| s.config.sampling.as_ref()) { - Some(ErrorBoundary::Ok(ref config)) if !config.unsupported() => Some(config), - _ => None, - }; + let root_state = state.sampling_project_state.as_ref(); + let root_config = match root_state.and_then(|s| s.config.sampling.as_ref()) { + Some(ErrorBoundary::Ok(ref config)) if !config.unsupported() => Some(config), + _ => None, + }; - compute_sampling_decision( - config.processing_enabled(), - &state.reservoir, - sampling_config, - state.event.value(), - root_config, - state.envelope().dsc(), - ) - } - _ => SamplingResult::Pending, - } + let reservoir = Group::supports_reservoir_sampling().then_some(&state.reservoir); + + compute_sampling_decision( + config.processing_enabled(), + reservoir, + sampling_config, + state.event.value(), + root_config, + state.envelope().dsc(), + ) } /// Apply the dynamic sampling decision from `compute_sampling_decision`. @@ -140,10 +132,10 @@ pub fn sample_envelope_items( } } -/// Computes the sampling decision on the incoming transaction. +/// Computes the sampling decision on the incoming envelope. fn compute_sampling_decision( processing_enabled: bool, - reservoir: &ReservoirEvaluator, + reservoir: Option<&ReservoirEvaluator>, sampling_config: Option<&SamplingConfig>, event: Option<&Event>, root_sampling_config: Option<&SamplingConfig>, @@ -165,7 +157,10 @@ fn compute_sampling_decision( } } - let mut evaluator = SamplingEvaluator::new(Utc::now()).set_reservoir(reservoir); + let mut evaluator = match reservoir { + Some(reservoir) => SamplingEvaluator::new_with_reservoir(Utc::now(), reservoir), + None => SamplingEvaluator::new(Utc::now()), + }; if let (Some(event), Some(sampling_state)) = (event, sampling_config) { if let Some(seed) = event.id.value().map(|id| id.0) { @@ -265,22 +260,24 @@ fn forward_unsampled_profiles( #[cfg(test)] mod tests { - use std::collections::BTreeMap; use std::sync::Arc; + use bytes::Bytes; use relay_base_schema::project::{ProjectId, ProjectKey}; + use relay_dynamic_config::{MetricExtractionConfig, TransactionMetricsConfig}; use relay_event_schema::protocol::{EventId, LenientString}; use relay_protocol::RuleCondition; use relay_sampling::config::{ DecayingFunction, RuleId, SamplingRule, SamplingValue, TimeRange, }; use relay_sampling::evaluation::{ReservoirCounters, SamplingMatch}; + use relay_system::Addr; use uuid::Uuid; use crate::envelope::{ContentType, Envelope, Item}; use crate::extractors::RequestMeta; - use crate::services::processor::{ProcessEnvelope, ProcessingGroup}; + use crate::services::processor::{ProcessEnvelope, ProcessingGroup, SpanGroup}; use crate::services::project::ProjectState; use crate::testutils::{ self, create_test_processor, new_envelope, state_with_rule_and_condition, @@ -320,13 +317,12 @@ mod tests { let processor = create_test_processor(Default::default()); let (outcome_aggregator, test_store) = testutils::processor_services(); + let mut envelopes = ProcessingGroup::split_envelope(*envelope); + assert_eq!(envelopes.len(), 1); + let (group, envelope) = envelopes.pop().unwrap(); + let message = ProcessEnvelope { - envelope: ManagedEnvelope::standalone( - envelope, - outcome_aggregator, - test_store, - ProcessingGroup::Transaction, - ), + envelope: ManagedEnvelope::standalone(envelope, outcome_aggregator, test_store, group), project_state: Arc::new(ProjectState::allowed()), sampling_project_state, reservoir_counters: ReservoirCounters::default(), @@ -410,7 +406,7 @@ mod tests { // pipeline. let res = compute_sampling_decision( false, - &dummy_reservoir(), + None, Some(&sampling_config), Some(&event), None, @@ -457,7 +453,7 @@ mod tests { .into(); } - ProcessEnvelopeState { + ProcessEnvelopeState:: { event: Annotated::from(event), metrics: Default::default(), sample_rates: None, @@ -623,7 +619,7 @@ mod tests { let res = compute_sampling_decision( false, - &dummy_reservoir(), + None, Some(&sampling_config), Some(&event), None, @@ -661,7 +657,7 @@ mod tests { // Unsupported rule should result in no match if processing is not enabled. let res = compute_sampling_decision( false, - &dummy_reservoir(), + None, Some(&sampling_config), Some(&event), None, @@ -670,14 +666,8 @@ mod tests { assert!(res.is_no_match()); // Match if processing is enabled. - let res = compute_sampling_decision( - true, - &dummy_reservoir(), - Some(&sampling_config), - Some(&event), - None, - None, - ); + let res = + compute_sampling_decision(true, None, Some(&sampling_config), Some(&event), None, None); assert!(res.is_match()); } @@ -710,15 +700,93 @@ mod tests { ..SamplingConfig::new() }; - let res = compute_sampling_decision( - false, - &dummy_reservoir(), - None, - None, - Some(&sampling_config), - Some(&dsc), - ); + let res = + compute_sampling_decision(false, None, None, None, Some(&sampling_config), Some(&dsc)); assert_eq!(get_sampling_match(res).sample_rate(), 0.2); } + + fn run_with_reservoir_rule(processing_group: ProcessingGroup) -> SamplingResult + where + G: Sampling + TryFrom, + { + let bytes = Bytes::from( + r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42","trace":{"trace_id":"89143b0763095bd9c9955e8175d1fb23","public_key":"e12d836b15bb49d7bbf99e64295d995b"}}"#, + ); + let envelope = Envelope::parse_bytes(bytes).unwrap(); + let mut state = ProcessEnvelopeState:: { + event: Annotated::new(Event::default()), + event_metrics_extracted: false, + spans_extracted: false, + metrics: Default::default(), + sample_rates: Default::default(), + extracted_metrics: Default::default(), + project_state: { + let mut state = ProjectState::allowed(); + state.config.transaction_metrics = + Some(ErrorBoundary::Ok(TransactionMetricsConfig { + version: 1, + ..Default::default() + })); + Arc::new(state) + }, + sampling_project_state: { + let mut state = ProjectState::allowed(); + state.config.metric_extraction = + ErrorBoundary::Ok(MetricExtractionConfig::default()); + state.config.sampling = Some(ErrorBoundary::Ok(SamplingConfig { + version: 2, + rules: vec![ + // Set up a reservoir (only used for transactions): + SamplingRule { + condition: RuleCondition::all(), + sampling_value: SamplingValue::Reservoir { limit: 100 }, + ty: RuleType::Trace, + id: RuleId(1), + time_range: Default::default(), + decaying_fn: Default::default(), + }, + // Reject everything that does not go into the reservoir: + SamplingRule { + condition: RuleCondition::all(), + sampling_value: SamplingValue::SampleRate { value: 0.0 }, + ty: RuleType::Trace, + id: RuleId(2), + time_range: Default::default(), + decaying_fn: Default::default(), + }, + ], + rules_v2: vec![], + })); + Some(Arc::new(state)) + }, + project_id: ProjectId::new(1), + managed_envelope: ManagedEnvelope::standalone( + envelope, + Addr::dummy(), + Addr::dummy(), + processing_group, + ) + .try_into() + .unwrap(), + profile_id: None, + reservoir: dummy_reservoir(), + }; + + run(&mut state, &Config::default()) + } + + #[test] + fn test_reservoir_applied_for_transactions() { + let result = run_with_reservoir_rule::(ProcessingGroup::Transaction); + // Default sampling rate is 0.0, but transaction is retained because of reservoir: + assert!(result.should_keep()); + } + + #[test] + fn test_reservoir_not_applied_for_spans() { + let result = run_with_reservoir_rule::(ProcessingGroup::Span); + // Default sampling rate is 0.0, and the reservoir does not apply to spans: + assert!(result.should_drop()); + } } diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index 3779a96b76..84f6d6c5b7 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -28,11 +28,11 @@ use crate::metrics_extraction::generic::extract_metrics; use crate::services::outcome::{DiscardReason, Outcome}; use crate::services::processor::span::extract_transaction_span; use crate::services::processor::{ - Addrs, ProcessEnvelope, ProcessEnvelopeState, ProcessingError, ProcessingGroup, SpanGroup, - TransactionGroup, + dynamic_sampling, Addrs, ProcessEnvelope, ProcessEnvelopeState, ProcessingError, + ProcessingGroup, SpanGroup, TransactionGroup, }; use crate::statsd::{RelayCounters, RelayHistograms}; -use crate::utils::{sample, BufferGuard, ItemAction}; +use crate::utils::{sample, BufferGuard, ItemAction, SamplingResult}; use relay_event_normalization::span::ai::extract_ai_measurements; use thiserror::Error; @@ -49,13 +49,22 @@ pub fn process( ) { use relay_event_normalization::RemoveOtherProcessor; + // We only implement trace-based sampling rules for now, which can be computed + // once for all spans in the envelope. + let sampling_outcome = match dynamic_sampling::run(state, &config) { + SamplingResult::Match(sampling_match) if sampling_match.should_drop() => Some( + Outcome::FilteredSampling(sampling_match.into_matched_rules()), + ), + _ => None, + }; + let span_metrics_extraction_config = match state.project_state.config.metric_extraction { ErrorBoundary::Ok(ref config) if config.is_enabled() => Some(config), _ => None, }; let ai_model_costs_config = global_config.ai_model_costs.clone().ok(); let normalize_span_config = get_normalize_span_config( - config, + Arc::clone(&config), state.managed_envelope.received_at(), global_config.measurements.as_ref(), state.project_state.config().measurements.as_ref(), @@ -138,14 +147,18 @@ pub fn process( item.set_metrics_extracted(true); } - // TODO: dynamic sampling + if let Some(sampling_outcome) = &sampling_outcome { + relay_log::trace!( + "Dropping span because of sampling rule {}", + sampling_outcome + ); + return ItemAction::Drop(sampling_outcome.clone()); + } if let Err(e) = scrub(&mut annotated_span, &state.project_state.config) { relay_log::error!("failed to scrub span: {e}"); } - // TODO: rate limiting - // Remove additional fields. process_value( &mut annotated_span, @@ -567,7 +580,6 @@ fn normalize( Ok(()) } -#[cfg(feature = "processing")] fn scrub( annotated_span: &mut Annotated, project_config: &ProjectConfig, diff --git a/relay-server/src/utils/dynamic_sampling.rs b/relay-server/src/utils/dynamic_sampling.rs index f5cf868a9c..f3816b6eec 100644 --- a/relay-server/src/utils/dynamic_sampling.rs +++ b/relay-server/src/utils/dynamic_sampling.rs @@ -76,7 +76,6 @@ pub fn is_trace_fully_sampled( return Some(false); } - // TODO(tor): pass correct now timestamp let evaluator = SamplingEvaluator::new(Utc::now()); let rules = root_project_config.filter_rules(RuleType::Trace); @@ -94,8 +93,12 @@ pub fn get_sampling_key(envelope: &Envelope) -> Option { // If the envelope item is not of type transaction or event, we will not return a sampling key // because it doesn't make sense to load the root project state if we don't perform trace // sampling. - envelope - .get_item_by(|item| item.ty() == &ItemType::Transaction || item.ty() == &ItemType::Event)?; + envelope.get_item_by(|item| { + matches!( + item.ty(), + ItemType::Transaction | ItemType::Event | ItemType::Span + ) + })?; envelope.dsc().map(|dsc| dsc.public_key) } diff --git a/tests/integration/test_dynamic_sampling.py b/tests/integration/test_dynamic_sampling.py index 05af8204a2..d1826e9417 100644 --- a/tests/integration/test_dynamic_sampling.py +++ b/tests/integration/test_dynamic_sampling.py @@ -176,25 +176,6 @@ def _add_trace_info( trace_info["sampled"] = sampled -def _create_event_envelope( - public_key, client_sample_rate=None, trace_id=None, event_id=None, transaction=None -): - envelope = Envelope() - event, trace_id, event_id = _create_event_item( - trace_id=trace_id, event_id=event_id, transaction=transaction - ) - envelope.add_event(event) - _add_trace_info( - envelope, - trace_id=trace_id, - public_key=public_key, - client_sample_rate=client_sample_rate, - transaction=transaction, - ) - - return envelope, trace_id, event_id - - def _create_transaction_envelope( public_key, client_sample_rate=None, diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index 6109e0737a..eb2afeb94a 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -1594,3 +1594,99 @@ def test_span_extraction_with_tags( assert transaction_span["tags"] == expected_tags spans_consumer.assert_empty() + + +@pytest.mark.parametrize("sample_rate", [0.0, 1.0]) +def test_dynamic_sampling( + mini_sentry, + relay_with_processing, + spans_consumer, + outcomes_consumer, + sample_rate, +): + spans_consumer = spans_consumer() + outcomes_consumer = outcomes_consumer() + + project_id = 42 + project_config = mini_sentry.add_basic_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:standalone-span-ingestion", + ] + project_config["config"]["transactionMetrics"] = {"version": 1} + + sampling_config = mini_sentry.add_basic_project_config(43) + sampling_public_key = sampling_config["publicKeys"][0]["publicKey"] + sampling_config["config"]["txNameRules"] = [ + { + "pattern": "/auth/login/*/**", + "expiry": "3022-11-30T00:00:00.000000Z", + "redaction": {"method": "replace", "substitution": "*"}, + } + ] + sampling_config["config"]["sampling"] = { + "version": 2, + "rules": [ + { + "id": 1, + "samplingValue": {"type": "sampleRate", "value": sample_rate}, + "type": "trace", + "condition": { + "op": "and", + "inner": [ + { + "op": "eq", + "name": "trace.transaction", + "value": "/auth/login/*", + "options": { + "ignoreCase": True, + }, + } + ], + }, + }, + ], + } + + relay = relay_with_processing( + options={ + "aggregator": { + "bucket_interval": 1, + "initial_delay": 0, + "debounce_delay": 0, + "max_secs_in_past": 2**64 - 1, + } + } + ) + + duration = timedelta(milliseconds=500) + end = datetime.now(timezone.utc) - timedelta(seconds=1) + start = end - duration + + # 1 - Send OTel span and sentry span via envelope + envelope = envelope_with_spans(start, end) + envelope.headers["trace"] = { + "public_key": sampling_public_key, + "trace_id": "89143b0763095bd9c9955e8175d1fb23", + "segment_name": "/auth/login/my_user_name", + } + + relay.send_envelope(project_id, envelope) + + def summarize_outcomes(outcomes): + counter = Counter() + for outcome in outcomes: + counter[(outcome["category"], outcome["outcome"])] += outcome["quantity"] + return counter + + if sample_rate == 1.0: + spans = list(spans_consumer.get_spans(timeout=10, max_attempts=4)) + assert len(spans) == 4 + outcomes = outcomes_consumer.get_outcomes(timeout=0.1) + assert summarize_outcomes(outcomes) == {(16, 0): 4} # SpanIndexed, Accepted + else: + outcomes = outcomes_consumer.get_outcomes(timeout=10) + assert summarize_outcomes(outcomes) == {(12, 1): 4} # Span, Filtered + assert {o["reason"] for o in outcomes} == {"Sampled:1"} + + spans_consumer.assert_empty() + outcomes_consumer.assert_empty()