From 5f8335f3f9d7bcde308a29ecd533a50716cc3724 Mon Sep 17 00:00:00 2001 From: Tor Date: Fri, 29 Sep 2023 11:20:54 +0200 Subject: [PATCH] feat(sampling): Reservoir sampling (#2550) Relay implementation of the reservoir project: https://github.com/getsentry/sentry/issues/54449 Reservoir bias uses a type of `SamplingRule` which will sample all matches until a certain limit has been reached. This limit is tracked both locally on each relay, and with a global synchronized one in redis that processing relays can have access to. The redis counter will update the local counter if it's available. The counters are saved on the `Project` struct, with a `Mutex<BTreeMap<RuleId, i64>>`. When we send an envelope for processing we send its corresponding project counters in the `ProcessEnvelopeState` to the `EnvelopeProcessorService`. There, in the `relay-sampling` crate, we introduce a `ReservoirEvaluator`, which will, when a reservoir rule is matching, check if the rule's limit has been reached or not by using the local counters we sent or, if applicable, the redis global count. The `ReservoirEvaluator` also takes care of updating both redis and the local counter. After the limit is reached, the rule is no longer valid and will be ignored, so that the normal `SampleRate` and `Factor` variants of `SamplingValue` will apply. Sentry is responsible for removing the reservoir rule from the `SamplingConfig` when it has reached its limit. Whenever we receive a new `ProjectConfig`, we remove all the reservoir counters from its project that are no longer in the `DynamicSamplingConfig`. Regarding the use of the mutex: We use try_lock to avoid getting blocked in case the mutex is already in use. There are two reasons it might be blocked. 1. Another thread is handling a reservoir rule from the same project at the same time. 2. We are in the process of removing counters from that same project. 
--- CHANGELOG.md | 1 + Cargo.lock | 2 + relay-kafka/src/lib.rs | 2 +- relay-sampling/Cargo.toml | 6 + relay-sampling/src/config.rs | 96 ++++++--- relay-sampling/src/evaluation.rs | 223 ++++++++++++++++++++- relay-sampling/src/lib.rs | 2 + relay-sampling/src/redis_sampling.rs | 48 +++++ relay-server/Cargo.toml | 1 + relay-server/src/actors/processor.rs | 52 ++++- relay-server/src/actors/project.rs | 27 +++ relay-server/src/actors/project_cache.rs | 3 + relay-server/src/utils/dynamic_sampling.rs | 2 +- 13 files changed, 420 insertions(+), 45 deletions(-) create mode 100644 relay-sampling/src/redis_sampling.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 20a350a81d..adb631c884 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ - Remove filtering for Android events with missing close events. ([#2524](https://github.com/getsentry/relay/pull/2524)) - Exclude more spans fron metrics extraction. ([#2522](https://github.com/getsentry/relay/pull/2522), [#2525](https://github.com/getsentry/relay/pull/2525), [#2545](https://github.com/getsentry/relay/pull/2545)) - Fix hot-loop burning CPU when upstream service is unavailable. ([#2518](https://github.com/getsentry/relay/pull/2518)) +- Introduce reservoir sampling rule. ([#2550](https://github.com/getsentry/relay/pull/2550)) ## 23.9.1 diff --git a/Cargo.lock b/Cargo.lock index c8abcb2e3d..4a2611b74d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3822,6 +3822,7 @@ dependencies = [ name = "relay-sampling" version = "23.9.1" dependencies = [ + "anyhow", "chrono", "insta", "rand", @@ -3830,6 +3831,7 @@ dependencies = [ "relay-common", "relay-log", "relay-protocol", + "relay-redis", "serde", "serde_json", "similar-asserts", diff --git a/relay-kafka/src/lib.rs b/relay-kafka/src/lib.rs index efc9f2d039..20b87fbc93 100644 --- a/relay-kafka/src/lib.rs +++ b/relay-kafka/src/lib.rs @@ -20,7 +20,7 @@ //! //! // build the client //! let kafka_client = builder.build(); -//! +//! //! // send the message //! 
kafka_client.send_message(KafkaTopic::Events, 1u64, &kafka_message).unwrap(); //! ``` diff --git a/relay-sampling/Cargo.toml b/relay-sampling/Cargo.toml index 8f803a7e78..4eb0665081 100644 --- a/relay-sampling/Cargo.toml +++ b/relay-sampling/Cargo.toml @@ -9,7 +9,12 @@ edition = "2021" license-file = "../LICENSE" publish = false +[features] +default = [] +redis = ["dep:anyhow", "relay-redis/impl"] + [dependencies] +anyhow = { workspace = true, optional = true } chrono = { workspace = true } rand = { workspace = true } rand_pcg = "0.3.1" @@ -17,6 +22,7 @@ relay-base-schema = { path = "../relay-base-schema" } relay-common = { path = "../relay-common" } relay-log = { path = "../relay-log" } relay-protocol = { path = "../relay-protocol" } +relay-redis = { path = "../relay-redis", optional = true } serde = { workspace = true } serde_json = { workspace = true } unicase = "2.6.0" diff --git a/relay-sampling/src/config.rs b/relay-sampling/src/config.rs index a07b38ca7d..9addfceb54 100644 --- a/relay-sampling/src/config.rs +++ b/relay-sampling/src/config.rs @@ -6,6 +6,7 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use crate::condition::RuleCondition; +use crate::evaluation::ReservoirEvaluator; use crate::utils; /// Represents the dynamic sampling configuration available to a project. @@ -81,14 +82,28 @@ impl SamplingRule { self.condition.supported() && self.ty != RuleType::Unsupported } - /// Returns the sample rate if the rule is active. - pub fn sample_rate(&self, now: DateTime) -> Option { + /// Returns the updated [`SamplingValue`] if it's valid. + pub fn evaluate( + &self, + now: DateTime, + reservoir: Option<&ReservoirEvaluator>, + ) -> Option { if !self.time_range.contains(now) { // Return None if rule is inactive. 
return None; } - let sampling_base_value = self.sampling_value.value(); + let sampling_base_value = match self.sampling_value { + SamplingValue::SampleRate { value } => value, + SamplingValue::Factor { value } => value, + SamplingValue::Reservoir { limit } => { + return reservoir.and_then(|reservoir| { + reservoir + .evaluate(self.id, limit, self.time_range.end.as_ref()) + .then_some(SamplingValue::Reservoir { limit }) + }); + } + }; let value = match self.decaying_fn { DecayingFunction::Linear { decayed_value } => { @@ -114,6 +129,9 @@ impl SamplingRule { match self.sampling_value { SamplingValue::SampleRate { .. } => Some(SamplingValue::SampleRate { value }), SamplingValue::Factor { .. } => Some(SamplingValue::Factor { value }), + // This should be impossible. + // Todo(tor): refactor so we don't run into this invalid state. + _ => None, } } } @@ -140,18 +158,18 @@ pub enum SamplingValue { /// until a sample rate rule is found. The matched rule's factor will be multiplied with the /// accumulated factors before moving onto the next possible match. Factor { - /// The fator to apply on another matched sample rate. + /// The factor to apply on another matched sample rate. value: f64, }, -} -impl SamplingValue { - pub(crate) fn value(&self) -> f64 { - *match self { - SamplingValue::SampleRate { value } => value, - SamplingValue::Factor { value } => value, - } - } + /// A reservoir limit. + /// + /// A rule with a reservoir limit will be sampled if the rule have been matched fewer times + /// than the limit. + Reservoir { + /// The limit of how many times this rule will be sampled before this rule is invalid. + limit: i64, + }, } /// Defines what a dynamic sampling rule applies to. @@ -174,7 +192,7 @@ pub enum RuleType { /// /// This number must be unique within a Sentry organization, as it is recorded in outcomes and used /// to infer which sampling rule caused data to be dropped. 
-#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct RuleId(pub u32); impl fmt::Display for RuleId { @@ -515,19 +533,19 @@ mod tests { // At the start of the time range, sample rate is equal to the rule's initial sampling value. assert_eq!( - rule.sample_rate(start).unwrap(), + rule.evaluate(start, None).unwrap(), SamplingValue::SampleRate { value: 1.0 } ); // Halfway in the time range, the value is exactly between 1.0 and 0.5. assert_eq!( - rule.sample_rate(halfway).unwrap(), + rule.evaluate(halfway, None).unwrap(), SamplingValue::SampleRate { value: 0.75 } ); // Approaches 0.5 at the end. assert_eq!( - rule.sample_rate(end).unwrap(), + rule.evaluate(end, None).unwrap(), SamplingValue::SampleRate { // It won't go to exactly 0.5 because the time range is end-exclusive. value: 0.5000028935185186 @@ -541,7 +559,7 @@ mod tests { rule }; - assert!(rule_without_start.sample_rate(halfway).is_none()); + assert!(rule_without_start.evaluate(halfway, None).is_none()); let rule_without_end = { let mut rule = rule.clone(); @@ -549,7 +567,7 @@ mod tests { rule }; - assert!(rule_without_end.sample_rate(halfway).is_none()); + assert!(rule_without_end.evaluate(halfway, None).is_none()); } /// If the decayingfunction is set to `Constant` then it shouldn't adjust the sample rate. @@ -571,7 +589,7 @@ mod tests { let halfway = Utc.with_ymd_and_hms(1970, 10, 11, 0, 0, 0).unwrap(); - assert_eq!(rule.sample_rate(halfway), Some(sampling_value)); + assert_eq!(rule.evaluate(halfway, None), Some(sampling_value)); } /// Validates the `sample_rate` method for different time range configurations. 
@@ -597,30 +615,42 @@ mod tests { time_range, decaying_fn: DecayingFunction::Constant, }; - assert!(rule.sample_rate(before_time_range).is_none()); - assert!(rule.sample_rate(during_time_range).is_some()); - assert!(rule.sample_rate(after_time_range).is_none()); + assert!(rule.evaluate(before_time_range, None).is_none()); + assert!(rule.evaluate(during_time_range, None).is_some()); + assert!(rule.evaluate(after_time_range, None).is_none()); // [start..] let mut rule_without_end = rule.clone(); rule_without_end.time_range.end = None; - assert!(rule_without_end.sample_rate(before_time_range).is_none()); - assert!(rule_without_end.sample_rate(during_time_range).is_some()); - assert!(rule_without_end.sample_rate(after_time_range).is_some()); + assert!(rule_without_end.evaluate(before_time_range, None).is_none()); + assert!(rule_without_end.evaluate(during_time_range, None).is_some()); + assert!(rule_without_end.evaluate(after_time_range, None).is_some()); // [..end] let mut rule_without_start = rule.clone(); rule_without_start.time_range.start = None; - assert!(rule_without_start.sample_rate(before_time_range).is_some()); - assert!(rule_without_start.sample_rate(during_time_range).is_some()); - assert!(rule_without_start.sample_rate(after_time_range).is_none()); + assert!(rule_without_start + .evaluate(before_time_range, None) + .is_some()); + assert!(rule_without_start + .evaluate(during_time_range, None) + .is_some()); + assert!(rule_without_start + .evaluate(after_time_range, None) + .is_none()); // [..] 
let mut rule_without_range = rule.clone(); rule_without_range.time_range = TimeRange::default(); - assert!(rule_without_range.sample_rate(before_time_range).is_some()); - assert!(rule_without_range.sample_rate(during_time_range).is_some()); - assert!(rule_without_range.sample_rate(after_time_range).is_some()); + assert!(rule_without_range + .evaluate(before_time_range, None) + .is_some()); + assert!(rule_without_range + .evaluate(during_time_range, None) + .is_some()); + assert!(rule_without_range + .evaluate(after_time_range, None) + .is_some()); } /// You can pass in a SamplingValue of either variant, and it should return the same one if @@ -639,13 +669,13 @@ mod tests { }; matches!( - rule.sample_rate(Utc::now()).unwrap(), + rule.evaluate(Utc::now(), None).unwrap(), SamplingValue::SampleRate { .. } ); rule.sampling_value = SamplingValue::Factor { value: 0.42 }; matches!( - rule.sample_rate(Utc::now()).unwrap(), + rule.evaluate(Utc::now(), None).unwrap(), SamplingValue::Factor { .. } ); } diff --git a/relay-sampling/src/evaluation.rs b/relay-sampling/src/evaluation.rs index 62a5192a63..0d414fa888 100644 --- a/relay-sampling/src/evaluation.rs +++ b/relay-sampling/src/evaluation.rs @@ -1,18 +1,24 @@ //! Evaluation of dynamic sampling rules. +use std::collections::BTreeMap; use std::fmt; use std::num::ParseIntError; use std::ops::ControlFlow; +use std::sync::{Arc, Mutex}; use chrono::{DateTime, Utc}; use rand::distributions::Uniform; use rand::Rng; use rand_pcg::Pcg32; use relay_protocol::Getter; +#[cfg(feature = "redis")] +use relay_redis::RedisPool; use serde::Serialize; use uuid::Uuid; use crate::config::{RuleId, SamplingRule, SamplingValue}; +#[cfg(feature = "redis")] +use crate::redis_sampling::{self, ReservoirRuleKey}; /// Generates a pseudo random number by seeding the generator with the given id. 
/// @@ -24,16 +30,126 @@ fn pseudo_random_from_uuid(id: Uuid) -> f64 { generator.sample(dist) } +/// The amount of matches for each reservoir rule in a given project. +pub type ReservoirCounters = Arc>>; + +/// Utility for evaluating reservoir-based sampling rules. +/// +/// A "reservoir limit" rule samples every match until its limit is reached, after which +/// the rule is disabled. +/// +/// This utility uses a dual-counter system for enforcing this limit: +/// +/// - Local Counter: Each relay instance maintains a local counter to track sampled events. +/// +/// - Redis Counter: For processing relays, a Redis-based counter provides synchronization +/// across multiple relay-instances. When incremented, the Redis counter returns the current global +/// count for the given rule, which is then used to update the local counter. +#[derive(Debug)] +pub struct ReservoirEvaluator<'a> { + counters: ReservoirCounters, + #[cfg(feature = "redis")] + org_id_and_redis_pool: Option<(u64, &'a RedisPool)>, + // Using PhantomData because the lifetimes are behind a feature flag. + _phantom: std::marker::PhantomData<&'a ()>, +} + +impl<'a> ReservoirEvaluator<'a> { + /// Constructor for [`ReservoirEvaluator`]. + pub fn new(counters: ReservoirCounters) -> Self { + Self { + counters, + #[cfg(feature = "redis")] + org_id_and_redis_pool: None, + _phantom: std::marker::PhantomData, + } + } + + /// Sets the Redis pool and organiation ID for the [`ReservoirEvaluator`]. + /// + /// These values are needed to synchronize with Redis. 
+ #[cfg(feature = "redis")] + pub fn set_redis(&mut self, org_id: u64, redis_pool: &'a RedisPool) { + self.org_id_and_redis_pool = Some((org_id, redis_pool)); + } + + #[cfg(feature = "redis")] + fn redis_incr( + &self, + key: &ReservoirRuleKey, + redis_pool: &RedisPool, + rule_expiry: Option<&DateTime>, + ) -> anyhow::Result { + let mut redis_client = redis_pool.client()?; + let mut redis_connection = redis_client.connection()?; + + let val = redis_sampling::increment_redis_reservoir_count(&mut redis_connection, key)?; + redis_sampling::set_redis_expiry(&mut redis_connection, key, rule_expiry)?; + + Ok(val) + } + + /// Evaluates a reservoir rule, returning `true` if it should be sampled. + pub fn incr_local(&self, rule: RuleId, limit: i64) -> bool { + let Ok(mut map_guard) = self.counters.lock() else { + relay_log::error!("failed to lock reservoir counter mutex"); + return false; + }; + + let counter_value = map_guard.entry(rule).or_insert(0); + + if *counter_value < limit { + *counter_value += 1; + true + } else { + false + } + } + + /// Evaluates a reservoir rule, returning `true` if it should be sampled. + pub fn evaluate(&self, rule: RuleId, limit: i64, _rule_expiry: Option<&DateTime>) -> bool { + #[cfg(feature = "redis")] + if let Some((org_id, redis_pool)) = self.org_id_and_redis_pool { + if let Ok(guard) = self.counters.lock() { + if *guard.get(&rule).unwrap_or(&0) > limit { + return false; + } + } + + let key = ReservoirRuleKey::new(org_id, rule); + let redis_count = match self.redis_incr(&key, redis_pool, _rule_expiry) { + Ok(redis_count) => redis_count, + Err(e) => { + relay_log::error!(error = &*e, "failed to increment reservoir rule"); + return false; + } + }; + + if let Ok(mut map_guard) = self.counters.lock() { + // If the rule isn't present, it has just been cleaned up by a project state update. + // In that case, it is no longer relevant so we ignore it. 
+ if let Some(value) = map_guard.get_mut(&rule) { + *value = redis_count.max(*value); + } + } + return redis_count <= limit; + } + + self.incr_local(rule, limit) + } +} + /// State machine for dynamic sampling. #[derive(Debug)] -pub struct SamplingEvaluator { +pub struct SamplingEvaluator<'a> { now: DateTime, rule_ids: Vec, factor: f64, client_sample_rate: Option, + reservoir: Option<&'a ReservoirEvaluator<'a>>, } -impl SamplingEvaluator { +impl<'a> SamplingEvaluator<'a> { /// Constructor for [`SamplingEvaluator`]. pub fn new(now: DateTime) -> Self { Self { @@ -41,9 +157,16 @@ impl SamplingEvaluator { rule_ids: vec![], factor: 1.0, client_sample_rate: None, + reservoir: None, } } + /// Sets a [`ReservoirEvaluator`]. + pub fn set_reservoir(mut self, reservoir: &'a ReservoirEvaluator) -> Self { + self.reservoir = Some(reservoir); + self + } + /// Sets a new client sample rate value. pub fn adjust_client_sample_rate(mut self, client_sample_rate: Option) -> Self { self.client_sample_rate = client_sample_rate; @@ -61,7 +184,7 @@ impl SamplingEvaluator { /// - If this value is returned and there are no more rules to evaluate, it should be interpreted as "no match." /// /// - `ControlFlow::Break`: Indicates that one or more rules have successfully matched. - pub fn match_rules<'a, I, G>( + pub fn match_rules<'b, I, G>( mut self, seed: Uuid, instance: &G, @@ -69,14 +192,14 @@ impl SamplingEvaluator { ) -> ControlFlow where G: Getter, - I: Iterator, + I: Iterator, { for rule in rules { if !rule.condition.matches(instance) { continue; }; - let Some(sampling_value) = rule.sample_rate(self.now) else { + let Some(sampling_value) = rule.evaluate(self.now, self.reservoir) else { continue; }; @@ -93,6 +216,9 @@ impl SamplingEvaluator { self.rule_ids, )); } + SamplingValue::Reservoir { .. 
} => { + return ControlFlow::Break(SamplingMatch::new(1.0, seed, vec![rule.id])); + } } } ControlFlow::Continue(self) @@ -130,6 +256,12 @@ impl SamplingEvaluator { } fn sampling_match(sample_rate: f64, seed: Uuid) -> bool { + if sample_rate == 0.0 { + return false; + } else if sample_rate == 1.0 { + return true; + } + let random_number = pseudo_random_from_uuid(seed); relay_log::trace!( sample_rate, @@ -254,6 +386,18 @@ mod tests { use super::*; + fn mock_reservoir_evaluator(vals: Vec<(u32, i64)>) -> ReservoirEvaluator<'static> { + let mut map = BTreeMap::default(); + + for (rule_id, count) in vals { + map.insert(RuleId(rule_id), count); + } + + let map = Arc::new(Mutex::new(map)); + + ReservoirEvaluator::new(map) + } + /// Helper to extract the sampling match after evaluating rules. fn get_sampling_match(rules: &[SamplingRule], instance: &impl Getter) -> SamplingMatch { match SamplingEvaluator::new(Utc::now()).match_rules( @@ -277,6 +421,16 @@ mod tests { matched_rule_ids == sampling_match.matched_rules } + // Helper method to "unwrap" the sampling match. + fn get_matched_rules( + sampling_evaluator: &ControlFlow, + ) -> Vec { + match sampling_evaluator { + ControlFlow::Continue(_) => panic!("expected a sampling match"), + ControlFlow::Break(m) => m.matched_rules.0.iter().map(|rule_id| rule_id.0).collect(), + } + } + /// Helper function to create a dsc with the provided getter-values set. fn mocked_dsc_with_getter_values( paths_and_values: Vec<(&str, &str)>, @@ -309,6 +463,21 @@ mod tests { dsc } + #[test] + fn test_reservoir_evaluator_limit() { + let evaluator = mock_reservoir_evaluator(vec![(1, 0)]); + + let rule = RuleId(1); + let limit = 3; + + assert!(evaluator.evaluate(rule, limit, None)); + assert!(evaluator.evaluate(rule, limit, None)); + assert!(evaluator.evaluate(rule, limit, None)); + // After 3 samples we have reached the limit, and the following rules are not sampled. 
+ assert!(!evaluator.evaluate(rule, limit, None)); + assert!(!evaluator.evaluate(rule, limit, None)); + } + #[test] fn test_adjust_sample_rate() { // return the same as input if no client sample rate set in the sampling evaluator. @@ -387,6 +556,50 @@ mod tests { vec } + /// Tests that reservoir rules override the other rules. + /// + /// Here all 3 rules are a match. But when the reservoir + /// rule (id = 1) has not yet reached its limit of "2" matches, the + /// previous rule(s) will not be present in the matched rules output. + /// After the limit has been reached, the reservoir rule is ignored + /// and the output is the two other rules (id = 0, id = 2). + #[test] + fn test_reservoir_override() { + let dsc = mocked_dsc_with_getter_values(vec![]); + let rules = simple_sampling_rules(vec![ + (RuleCondition::all(), SamplingValue::Factor { value: 0.5 }), + // The reservoir has a limit of 2, meaning it should be sampled twice + // before it is ignored. + (RuleCondition::all(), SamplingValue::Reservoir { limit: 2 }), + ( + RuleCondition::all(), + SamplingValue::SampleRate { value: 0.5 }, + ), + ]); + + // The reservoir keeps the counter state behind a mutex, which is how it + // shares state among multiple evaluator instances. + let reservoir = mock_reservoir_evaluator(vec![]); + + let evaluator = SamplingEvaluator::new(Utc::now()).set_reservoir(&reservoir); + let matched_rules = + get_matched_rules(&evaluator.match_rules(Uuid::default(), &dsc, rules.iter())); + // Reservoir rule overrides 0 and 2. + assert_eq!(&matched_rules, &[1]); + + let evaluator = SamplingEvaluator::new(Utc::now()).set_reservoir(&reservoir); + let matched_rules = + get_matched_rules(&evaluator.match_rules(Uuid::default(), &dsc, rules.iter())); + // Reservoir rule overrides 0 and 2. 
+ assert_eq!(&matched_rules, &[1]); + + let evaluator = SamplingEvaluator::new(Utc::now()).set_reservoir(&reservoir); + let matched_rules = + get_matched_rules(&evaluator.match_rules(Uuid::default(), &dsc, rules.iter())); + // Reservoir rule reached its limit, rule 0 and 2 are now matched instead. + assert_eq!(&matched_rules, &[0, 2]); + } + /// Checks that rules don't match if the time is outside the time range. #[test] fn test_expired_rules() { diff --git a/relay-sampling/src/lib.rs b/relay-sampling/src/lib.rs index ce777bc4e1..b79edefbd8 100644 --- a/relay-sampling/src/lib.rs +++ b/relay-sampling/src/lib.rs @@ -77,6 +77,8 @@ pub mod condition; pub mod config; pub mod dsc; pub mod evaluation; +#[cfg(feature = "redis")] +mod redis_sampling; mod utils; pub use config::SamplingConfig; diff --git a/relay-sampling/src/redis_sampling.rs b/relay-sampling/src/redis_sampling.rs new file mode 100644 index 0000000000..d56c217dde --- /dev/null +++ b/relay-sampling/src/redis_sampling.rs @@ -0,0 +1,48 @@ +use chrono::{DateTime, Utc}; + +use crate::config::RuleId; + +pub struct ReservoirRuleKey(String); + +impl ReservoirRuleKey { + pub fn new(org_id: u64, rule_id: RuleId) -> Self { + Self(format!("reservoir:{}:{}", org_id, rule_id)) + } + + fn as_str(&self) -> &str { + self.0.as_str() + } +} + +/// Increments the reservoir count for a given rule in redis. +/// +/// - INCR docs: [`https://redis.io/commands/incr/`] +/// - If the counter doesn't exist in redis, a new one will be inserted. +pub fn increment_redis_reservoir_count( + redis_connection: &mut relay_redis::Connection, + key: &ReservoirRuleKey, +) -> anyhow::Result { + let val = relay_redis::redis::cmd("INCR") + .arg(key.as_str()) + .query(redis_connection)?; + + Ok(val) +} + +/// Sets the expiry time for a reservoir rule count. 
+pub fn set_redis_expiry( + redis_connection: &mut relay_redis::Connection, + key: &ReservoirRuleKey, + rule_expiry: Option<&DateTime>, +) -> anyhow::Result<()> { + let now = Utc::now().timestamp(); + let expiry_time = rule_expiry + .map(|rule_expiry| rule_expiry.timestamp() + 60) + .unwrap_or_else(|| now + 86400); + + relay_redis::redis::cmd("EXPIRE") + .arg(key.as_str()) + .arg(expiry_time - now) + .query(redis_connection)?; + Ok(()) +} diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index 1d8f7344fc..28337f2d17 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -28,6 +28,7 @@ processing = [ "relay-kafka/producer", "relay-quotas/redis", "relay-redis/impl", + "relay-sampling/redis", ] [dependencies] diff --git a/relay-server/src/actors/processor.rs b/relay-server/src/actors/processor.rs index b4b5a4014d..8e0d6880d8 100644 --- a/relay-server/src/actors/processor.rs +++ b/relay-server/src/actors/processor.rs @@ -44,7 +44,9 @@ use relay_quotas::{DataCategory, ReasonCode}; use relay_redis::RedisPool; use relay_replays::recording::RecordingScrubber; use relay_sampling::config::{RuleType, SamplingMode}; -use relay_sampling::evaluation::{MatchedRuleIds, SamplingEvaluator}; +use relay_sampling::evaluation::{ + MatchedRuleIds, ReservoirCounters, ReservoirEvaluator, SamplingEvaluator, +}; use relay_sampling::{DynamicSamplingContext, SamplingConfig}; use relay_statsd::metric; use relay_system::{Addr, FromMessage, NoResponse, Service}; @@ -254,7 +256,7 @@ impl ExtractedMetrics { /// A state container for envelope processing. #[derive(Debug)] -struct ProcessEnvelopeState { +struct ProcessEnvelopeState<'a> { /// The extracted event payload. /// /// For Envelopes without event payloads, this contains `Annotated::empty`. If a single item has @@ -307,9 +309,12 @@ struct ProcessEnvelopeState { /// Whether there is a profiling item in the envelope. has_profile: bool, + + /// Reservoir evaluator that we use for dynamic sampling. 
+ reservoir: ReservoirEvaluator<'a>, } -impl ProcessEnvelopeState { +impl<'a> ProcessEnvelopeState<'a> { /// Returns a reference to the contained [`Envelope`]. fn envelope(&self) -> &Envelope { self.managed_envelope.envelope() @@ -425,6 +430,7 @@ pub struct ProcessEnvelope { pub envelope: ManagedEnvelope, pub project_state: Arc, pub sampling_project_state: Option>, + pub reservoir_counters: ReservoirCounters, } /// Parses a list of metrics or metric buckets and pushes them to the project's aggregator. @@ -533,6 +539,8 @@ pub struct EnvelopeProcessorService { struct InnerProcessor { config: Arc, + #[cfg(feature = "processing")] + redis_pool: Option, envelope_manager: Addr, project_cache: Addr, global_config: Addr, @@ -565,6 +573,8 @@ impl EnvelopeProcessorService { }); let inner = InnerProcessor { + #[cfg(feature = "processing")] + redis_pool: _redis.clone(), #[cfg(feature = "processing")] rate_limiter: _redis .map(|pool| RedisRateLimiter::new(pool).max_limit(config.max_rate_limit())), @@ -1319,6 +1329,7 @@ impl EnvelopeProcessorService { envelope: mut managed_envelope, project_state, sampling_project_state, + reservoir_counters, } = message; let envelope = managed_envelope.envelope_mut(); @@ -1352,6 +1363,14 @@ impl EnvelopeProcessorService { // 2. The DSN was moved and the envelope sent to the old project ID. 
envelope.meta_mut().set_project_id(project_id); + #[allow(unused_mut)] + let mut reservoir = ReservoirEvaluator::new(reservoir_counters); + #[cfg(feature = "processing")] + if let Some(redis_pool) = self.inner.redis_pool.as_ref() { + let org_id = managed_envelope.scoping().organization_id; + reservoir.set_redis(org_id, redis_pool); + } + Ok(ProcessEnvelopeState { event: Annotated::empty(), event_metrics_extracted: false, @@ -1364,6 +1383,7 @@ impl EnvelopeProcessorService { project_id, managed_envelope, has_profile: false, + reservoir, }) } @@ -2345,6 +2365,7 @@ impl EnvelopeProcessorService { if config.is_enabled() { state.sampling_result = Self::compute_sampling_decision( self.inner.config.processing_enabled(), + &state.reservoir, state.project_state.config.dynamic_sampling.as_ref(), state.event.value(), state @@ -2364,6 +2385,7 @@ impl EnvelopeProcessorService { /// Computes the sampling decision on the incoming transaction. fn compute_sampling_decision( processing_enabled: bool, + reservoir: &ReservoirEvaluator, sampling_config: Option<&SamplingConfig>, event: Option<&Event>, root_sampling_config: Option<&SamplingConfig>, @@ -2403,8 +2425,9 @@ impl EnvelopeProcessorService { } }; - let mut evaluator = - SamplingEvaluator::new(Utc::now()).adjust_client_sample_rate(adjustment_rate); + let mut evaluator = SamplingEvaluator::new(Utc::now()) + .adjust_client_sample_rate(adjustment_rate) + .set_reservoir(reservoir); if let (Some(event), Some(sampling_state)) = (event, sampling_config) { if let Some(seed) = event.id.value().map(|id| id.0) { @@ -3035,6 +3058,10 @@ mod tests { } } + fn dummy_reservoir() -> ReservoirEvaluator<'static> { + ReservoirEvaluator::new(ReservoirCounters::default()) + } + fn mocked_event(event_type: EventType, transaction: &str, release: &str) -> Event { Event { id: Annotated::new(EventId::new()), @@ -3188,6 +3215,7 @@ mod tests { ), has_profile: false, event_metrics_extracted: false, + reservoir: dummy_reservoir(), } }; @@ -3235,6 +3263,7 @@ 
mod tests { // pipeline. let res = EnvelopeProcessorService::compute_sampling_decision( false, + &dummy_reservoir(), Some(&sampling_config), Some(&event), None, @@ -3378,6 +3407,8 @@ mod tests { upstream_relay, #[cfg(feature = "processing")] rate_limiter: None, + #[cfg(feature = "processing")] + redis_pool: None, geoip_lookup: None, global_config, }; @@ -3417,6 +3448,7 @@ mod tests { envelope: ManagedEnvelope::standalone(envelope, outcome_aggregator, test_store), project_state: Arc::new(ProjectState::allowed()), sampling_project_state: None, + reservoir_counters: ReservoirCounters::default(), }; let envelope_response = processor.process(message).unwrap(); @@ -3438,6 +3470,7 @@ mod tests { envelope: ManagedEnvelope::standalone(envelope, outcome_aggregator, test_store), project_state: Arc::new(ProjectState::allowed()), sampling_project_state, + reservoir_counters: ReservoirCounters::default(), }; let envelope_response = processor.process(message).unwrap(); @@ -3648,6 +3681,7 @@ mod tests { envelope: ManagedEnvelope::standalone(envelope, outcome_aggregator, test_store), project_state: Arc::new(project_state), sampling_project_state: None, + reservoir_counters: ReservoirCounters::default(), }; let envelope_response = processor.process(message).unwrap(); @@ -3718,6 +3752,7 @@ mod tests { envelope: ManagedEnvelope::standalone(envelope, outcome_aggregator, test_store), project_state: Arc::new(ProjectState::allowed()), sampling_project_state: None, + reservoir_counters: ReservoirCounters::default(), }; let envelope_response = processor.process(message).unwrap(); @@ -3766,6 +3801,7 @@ mod tests { envelope: ManagedEnvelope::standalone(envelope, outcome_aggregator, test_store), project_state: Arc::new(ProjectState::allowed()), sampling_project_state: None, + reservoir_counters: ReservoirCounters::default(), }; let envelope_response = processor.process(message).unwrap(); @@ -3822,6 +3858,7 @@ mod tests { envelope: ManagedEnvelope::standalone(envelope, outcome_aggregator, 
test_store), project_state: Arc::new(ProjectState::allowed()), sampling_project_state: None, + reservoir_counters: ReservoirCounters::default(), }; let envelope_response = processor.process(message).unwrap(); @@ -4126,6 +4163,7 @@ mod tests { let res = EnvelopeProcessorService::compute_sampling_decision( false, + &dummy_reservoir(), Some(&sampling_config), Some(&event), None, @@ -4164,6 +4202,7 @@ mod tests { // Unsupported rule should result in no match if processing is not enabled. let res = EnvelopeProcessorService::compute_sampling_decision( false, + &dummy_reservoir(), Some(&sampling_config), Some(&event), None, @@ -4174,6 +4213,7 @@ mod tests { // Match if processing is enabled. let res = EnvelopeProcessorService::compute_sampling_decision( true, + &dummy_reservoir(), Some(&sampling_config), Some(&event), None, @@ -4214,6 +4254,7 @@ mod tests { let res = EnvelopeProcessorService::compute_sampling_decision( false, + &dummy_reservoir(), None, None, Some(&sampling_config), @@ -4226,6 +4267,7 @@ mod tests { let res = EnvelopeProcessorService::compute_sampling_decision( false, + &dummy_reservoir(), None, None, Some(&sampling_config), diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs index 23b3f2dc81..ade1417d4a 100644 --- a/relay-server/src/actors/project.rs +++ b/relay-server/src/actors/project.rs @@ -8,6 +8,7 @@ use relay_dynamic_config::{Feature, LimitedProjectConfig, ProjectConfig}; use relay_filter::matches_any_origin; use relay_metrics::{Aggregator, Bucket, MergeBuckets, MetricNamespace, MetricResourceIdentifier}; use relay_quotas::{Quota, RateLimits, Scoping}; +use relay_sampling::evaluation::ReservoirCounters; use relay_statsd::metric; use relay_system::{Addr, BroadcastChannel}; use serde::{Deserialize, Serialize}; @@ -393,6 +394,7 @@ pub struct Project { state_channel: Option, rate_limits: RateLimits, last_no_cache: Instant, + reservoir_counters: ReservoirCounters, } impl Project { @@ -408,6 +410,7 @@ impl Project { 
state_channel: None, rate_limits: RateLimits::new(), last_no_cache: Instant::now(), + reservoir_counters: Arc::default(), } } @@ -421,6 +424,27 @@ impl Project { } } + /// Returns the [`ReservoirCounters`] for the project. + pub fn reservoir_counters(&self) -> ReservoirCounters { + self.reservoir_counters.clone() + } + + /// If a reservoir rule is no longer in the sampling config, we will remove those counters. + fn remove_expired_reservoir_rules(&self) { + let Some(config) = self + .state + .as_ref() + .and_then(|state| state.config.dynamic_sampling.as_ref()) + else { + return; + }; + + // Using try_lock to not slow down the project cache service. + if let Ok(mut guard) = self.reservoir_counters.try_lock() { + guard.retain(|key, _| config.rules_v2.iter().any(|rule| rule.id == *key)); + } + } + pub fn merge_rate_limits(&mut self, rate_limits: RateLimits) { self.rate_limits.merge(rate_limits); } @@ -742,6 +766,9 @@ impl Project { // Flush all waiting recipients. relay_log::debug!("project state {} updated", self.project_key); channel.inner.send(state); + + // Check if the new sampling config got rid of any reservoir rules we have counters for. + self.remove_expired_reservoir_rules(); } /// Creates `Scoping` for this project if the state is loaded. diff --git a/relay-server/src/actors/project_cache.rs b/relay-server/src/actors/project_cache.rs index 08696e9b4f..616d7400e6 100644 --- a/relay-server/src/actors/project_cache.rs +++ b/relay-server/src/actors/project_cache.rs @@ -680,6 +680,8 @@ impl ProjectCacheBroker { .. 
}) = project.check_envelope(managed_envelope, self.services.outcome_aggregator.clone()) { + let reservoir_counters = project.reservoir_counters(); + let sampling_state = utils::get_sampling_key(managed_envelope.envelope()) .and_then(|key| self.projects.get(&key)) .and_then(|p| p.valid_state()); @@ -688,6 +690,7 @@ impl ProjectCacheBroker { envelope: managed_envelope, project_state: own_project_state.clone(), sampling_project_state: None, + reservoir_counters, }; if let Some(sampling_state) = sampling_state { diff --git a/relay-server/src/utils/dynamic_sampling.rs b/relay-server/src/utils/dynamic_sampling.rs index e127893ab6..f0b4ffaa44 100644 --- a/relay-server/src/utils/dynamic_sampling.rs +++ b/relay-server/src/utils/dynamic_sampling.rs @@ -53,7 +53,7 @@ impl SamplingResult { } } -impl From> for SamplingResult { +impl From>> for SamplingResult { fn from(value: ControlFlow) -> Self { match value { ControlFlow::Break(sampling_match) => Self::Match(sampling_match),