Skip to content

Commit

Permalink
should_filter, remove config args
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Aug 2, 2024
1 parent 915a92e commit d9663f3
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 64 deletions.
30 changes: 11 additions & 19 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -867,11 +867,11 @@ impl<'a, Group> ProcessEnvelopeState<'a, Group> {
self.event = Annotated::empty();
}

/// Returns `true` for managed relays if a feature is disabled.
/// Function for on-off switches that filter specific item types (profiles, spans)
/// based on a feature flag.
///
/// Some envelope items are dropped based on a feature flag,
/// but we want to forward them in proxy mode.
fn feature_disabled_by_upstream(&self, feature: Feature) -> bool {
/// If the project config did not come from the upstream, we keep the items.
fn should_filter(&self, feature: Feature) -> bool {
match self.config.relay_mode() {
RelayMode::Proxy | RelayMode::Static | RelayMode::Capture => false,
RelayMode::Managed => !self.project_state.has_feature(feature),
Expand Down Expand Up @@ -1771,7 +1771,7 @@ impl EnvelopeProcessorService {
matches!(filter_run, FiltersStatus::Ok) || self.inner.config.processing_enabled();

let sampling_result = match run_dynamic_sampling {
true => dynamic_sampling::run(state, &self.inner.config),
true => dynamic_sampling::run(state),
false => SamplingResult::Pending,
};

Expand All @@ -1780,7 +1780,7 @@ impl EnvelopeProcessorService {
// Process profiles before dropping the transaction, if necessary.
// Before metric extraction to make sure the profile count is reflected correctly.
let profile_id = match keep_profiles {
true => profile::process(state, &self.inner.config),
true => profile::process(state),
false => profile_id,
};

Expand Down Expand Up @@ -1808,7 +1808,7 @@ impl EnvelopeProcessorService {

if_processing!(self.inner.config, {
// Process profiles before extracting metrics, to make sure they are removed if they are invalid.
let profile_id = profile::process(state, &self.inner.config);
let profile_id = profile::process(state);
profile::transfer_id(state, profile_id);

// Always extract metrics in processing Relays for sampled items.
Expand All @@ -1818,7 +1818,7 @@ impl EnvelopeProcessorService {
.project_state
.has_feature(Feature::ExtractSpansFromEvent)
{
span::extract_from_event(state, &self.inner.config, &global_config);
span::extract_from_event(state, &global_config);
}

self.enforce_quotas(state)?;
Expand Down Expand Up @@ -1890,11 +1890,7 @@ impl EnvelopeProcessorService {
self.enforce_quotas(state)?;
});

report::process_client_reports(
state,
&self.inner.config,
self.inner.addrs.outcome_aggregator.clone(),
);
report::process_client_reports(state, self.inner.addrs.outcome_aggregator.clone());

Ok(())
}
Expand All @@ -1904,11 +1900,7 @@ impl EnvelopeProcessorService {
&self,
state: &mut ProcessEnvelopeState<ReplayGroup>,
) -> Result<(), ProcessingError> {
replay::process(
state,
&self.inner.config,
&self.inner.global_config.current(),
)?;
replay::process(state, &self.inner.global_config.current())?;
if_processing!(self.inner.config, {
self.enforce_quotas(state)?;
});
Expand Down Expand Up @@ -1939,7 +1931,7 @@ impl EnvelopeProcessorService {
if_processing!(self.inner.config, {
let global_config = self.inner.global_config.current();

span::process(state, self.inner.config.clone(), &global_config);
span::process(state, &global_config);

self.enforce_quotas(state)?;
});
Expand Down
30 changes: 16 additions & 14 deletions relay-server/src/services/processor/dynamic_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub fn ensure_dsc(state: &mut ProcessEnvelopeState<TransactionGroup>) {
}

/// Computes the sampling decision on the incoming event
pub fn run<Group>(state: &mut ProcessEnvelopeState<Group>, config: &Config) -> SamplingResult
pub fn run<Group>(state: &mut ProcessEnvelopeState<Group>) -> SamplingResult
where
Group: Sampling,
{
Expand All @@ -81,7 +81,7 @@ where
let reservoir = Group::supports_reservoir_sampling().then_some(&state.reservoir);

compute_sampling_decision(
config.processing_enabled(),
state.config.processing_enabled(),
reservoir,
sampling_config,
state.event.value(),
Expand Down Expand Up @@ -422,13 +422,15 @@ mod tests {
relay_test::setup();
let (outcome_aggregator, test_store) = testutils::processor_services();

let config = Config::from_json_value(serde_json::json!({
"processing": {
"enabled": true,
"kafka_config": [],
}
}))
.unwrap();
let config = Arc::new(
Config::from_json_value(serde_json::json!({
"processing": {
"enabled": true,
"kafka_config": [],
}
}))
.unwrap(),
);

// Gets a ProcessEnvelopeState, either with or without the metrics_extracted flag toggled.
let get_state = |version: Option<u16>| {
Expand Down Expand Up @@ -466,7 +468,7 @@ mod tests {
Arc::new(GlobalConfig::default()),
envelope.dsc(),
),
config: Arc::new(Config::default()),
config: config.clone(),
project_state,
sampling_project_state: None,
project_id: ProjectId::new(42),
Expand All @@ -487,17 +489,17 @@ mod tests {

// None represents no TransactionMetricsConfig, DS will not be run
let mut state = get_state(None);
let sampling_result = run(&mut state, &config);
let sampling_result = run(&mut state);
assert_eq!(sampling_result.decision(), SamplingDecision::Keep);

// Current version is 3, so it won't run DS if it's outdated
let mut state = get_state(Some(2));
let sampling_result = run(&mut state, &config);
let sampling_result = run(&mut state);
assert_eq!(sampling_result.decision(), SamplingDecision::Keep);

// Dynamic sampling is run, as the transaction metrics version is up to date.
let mut state = get_state(Some(3));
let sampling_result = run(&mut state, &config);
let sampling_result = run(&mut state);
assert_eq!(sampling_result.decision(), SamplingDecision::Drop);
}

Expand Down Expand Up @@ -787,7 +789,7 @@ mod tests {
event_fully_normalized: false,
};

run(&mut state, &Config::default())
run(&mut state)
}

#[test]
Expand Down
12 changes: 4 additions & 8 deletions relay-server/src/services/processor/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ use crate::utils::ItemAction;
///
/// Returns the profile id of the single remaining profile, if there is one.
pub fn filter<G>(state: &mut ProcessEnvelopeState<G>) -> Option<ProfileId> {
let profiling_disabled = state.feature_disabled_by_upstream(Feature::Profiling);
let profiling_disabled = state.should_filter(Feature::Profiling);
let has_transaction = state.event_type() == Some(EventType::Transaction);
let keep_unsampled_profiles =
!state.feature_disabled_by_upstream(Feature::IngestUnsampledProfiles);
let keep_unsampled_profiles = !state.should_filter(Feature::IngestUnsampledProfiles);

let mut profile_id = None;
state.managed_envelope.retain_items(|item| match item.ty() {
Expand Down Expand Up @@ -89,10 +88,7 @@ pub fn transfer_id(
}

/// Processes profiles and set the profile ID in the profile context on the transaction if successful.
pub fn process(
state: &mut ProcessEnvelopeState<TransactionGroup>,
config: &Config,
) -> Option<ProfileId> {
pub fn process(state: &mut ProcessEnvelopeState<TransactionGroup>) -> Option<ProfileId> {
let profiling_enabled = state.project_state.has_feature(Feature::Profiling);
let mut profile_id = None;

Expand All @@ -108,7 +104,7 @@ pub fn process(
return ItemAction::DropSilently;
};

match expand_profile(item, event, config) {
match expand_profile(item, event, &state.config) {
Ok(id) => {
profile_id = Some(id);
ItemAction::Keep
Expand Down
6 changes: 2 additions & 4 deletions relay-server/src/services/processor/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::error::Error;
use std::net::IpAddr;

use bytes::Bytes;
use relay_config::Config;
use relay_dynamic_config::{Feature, GlobalConfig, ProjectConfig};
use relay_event_normalization::replay::{self, ReplayError};
use relay_event_normalization::RawUserAgentInfo;
Expand All @@ -23,19 +22,18 @@ use crate::statsd::RelayTimers;
/// Removes replays if the feature flag is not enabled.
pub fn process(
state: &mut ProcessEnvelopeState<ReplayGroup>,
config: &Config,
global_config: &GlobalConfig,
) -> Result<(), ProcessingError> {
let project_state = &state.project_state;
let replays_disabled = state.feature_disabled_by_upstream(Feature::SessionReplay);
let replays_disabled = state.should_filter(Feature::SessionReplay);
let scrubbing_enabled = project_state.has_feature(Feature::SessionReplayRecordingScrubbing);
let replay_video_disabled = project_state.has_feature(Feature::SessionReplayVideoDisabled);

let meta = state.envelope().meta().clone();
let client_addr = meta.client_addr();
let event_id = state.envelope().event_id();

let limit = config.max_replay_uncompressed_size();
let limit = state.config.max_replay_uncompressed_size();
let project_config = project_state.config();
let datascrubbing_config = project_config
.datascrubbing_settings
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/processor/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::error::Error;

use chrono::{Duration as SignedDuration, Utc};
use relay_common::time::UnixTimestamp;
use relay_config::Config;
use relay_event_normalization::ClockDriftProcessor;
use relay_event_schema::protocol::{ClientReport, UserReport};
use relay_filter::FilterStatKey;
Expand Down Expand Up @@ -41,11 +40,11 @@ pub enum ClientReportField {
/// system.
pub fn process_client_reports(
state: &mut ProcessEnvelopeState<ClientReportGroup>,
config: &Config,
outcome_aggregator: Addr<TrackOutcome>,
) {
// if client outcomes are disabled we leave the client reports unprocessed
// and pass them on.
let config = &state.config;
if !config.emit_outcomes().any() || !config.emit_client_outcomes() {
// if a processing relay has client outcomes disabled we drop them.
if config.processing_enabled() {
Expand Down Expand Up @@ -261,6 +260,7 @@ mod tests {

use std::sync::Arc;

use relay_config::Config;
use relay_event_schema::protocol::EventId;
use relay_sampling::evaluation::ReservoirCounters;

Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/processor/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod processing;
pub use processing::*;

pub fn filter(state: &mut ProcessEnvelopeState<SpanGroup>) {
let disabled = state.feature_disabled_by_upstream(Feature::StandaloneSpanIngestion);
let disabled = state.should_filter(Feature::StandaloneSpanIngestion);
state.managed_envelope.retain_items(|item| {
if item.is_span() && disabled {
relay_log::debug!("dropping span because feature is disabled");
Expand Down
25 changes: 9 additions & 16 deletions relay-server/src/services/processor/span/processing.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Contains the processing-only functionality.

use std::error::Error;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use relay_base_schema::events::EventType;
Expand Down Expand Up @@ -43,23 +42,19 @@ use thiserror::Error;
#[error(transparent)]
struct ValidationError(#[from] anyhow::Error);

pub fn process(
state: &mut ProcessEnvelopeState<SpanGroup>,
config: Arc<Config>,
global_config: &GlobalConfig,
) {
pub fn process(state: &mut ProcessEnvelopeState<SpanGroup>, global_config: &GlobalConfig) {
use relay_event_normalization::RemoveOtherProcessor;

// We only implement trace-based sampling rules for now, which can be computed
// once for all spans in the envelope.
let sampling_result = dynamic_sampling::run(state, &config);
let sampling_result = dynamic_sampling::run(state);

let span_metrics_extraction_config = match state.project_state.config.metric_extraction {
ErrorBoundary::Ok(ref config) if config.is_enabled() => Some(config),
_ => None,
};
let normalize_span_config = NormalizeSpanConfig::new(
&config,
&state.config,
global_config,
state.project_state.config(),
&state.managed_envelope,
Expand Down Expand Up @@ -205,7 +200,6 @@ pub fn process(

pub fn extract_from_event(
state: &mut ProcessEnvelopeState<TransactionGroup>,
config: &Config,
global_config: &GlobalConfig,
) {
// Only extract spans from transactions (not errors).
Expand Down Expand Up @@ -269,7 +263,8 @@ pub fn extract_from_event(

let Some(transaction_span) = extract_transaction_span(
event,
config
state
.config
.aggregator_config_for(MetricNamespace::Spans)
.aggregator
.max_tag_value_length,
Expand Down Expand Up @@ -644,6 +639,7 @@ fn validate(span: &mut Annotated<Span>) -> Result<(), ValidationError> {
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::sync::Arc;

use bytes::Bytes;
use relay_base_schema::project::ProjectId;
Expand Down Expand Up @@ -727,11 +723,10 @@ mod tests {

#[test]
fn extract_sampled_default() {
let config = Config::default();
let global_config = GlobalConfig::default();
assert!(global_config.options.span_extraction_sample_rate.is_none());
let mut state = state();
extract_from_event(&mut state, &config, &global_config);
extract_from_event(&mut state, &global_config);
assert!(
state
.envelope()
Expand All @@ -744,11 +739,10 @@ mod tests {

#[test]
fn extract_sampled_explicit() {
let config = Config::default();
let mut global_config = GlobalConfig::default();
global_config.options.span_extraction_sample_rate = Some(1.0);
let mut state = state();
extract_from_event(&mut state, &config, &global_config);
extract_from_event(&mut state, &global_config);
assert!(
state
.envelope()
Expand All @@ -761,11 +755,10 @@ mod tests {

#[test]
fn extract_sampled_dropped() {
let config = Config::default();
let mut global_config = GlobalConfig::default();
global_config.options.span_extraction_sample_rate = Some(0.0);
let mut state = state();
extract_from_event(&mut state, &config, &global_config);
extract_from_event(&mut state, &global_config);
assert!(
!state
.envelope()
Expand Down

0 comments on commit d9663f3

Please sign in to comment.