feat(sampling): Reservoir sampling (#2550)
Relay implementation of the reservoir project:
getsentry/sentry#54449

Reservoir bias uses a type of `SamplingRule` which samples all
matches until a certain limit has been reached. This limit is tracked
both locally on each Relay and with a globally synchronized counter in
Redis that processing Relays have access to. The Redis counter updates
the local counter if it's available.

The counters are saved on the `Project` struct, in a
`Mutex<BTreeMap<RuleId, i64>>`.
When we send an envelope for processing we send its corresponding
project counters in the `ProcessEnvelopeState` to the
`EnvelopeProcessorService`.

There, in the `dynamic-sampling` crate, we introduce a
`ReservoirEvaluator`, which will, when a reservoir rule matches,
check whether the rule's limit has been reached by using the local
counters we sent or, if applicable, the global Redis count. The
`ReservoirEvaluator` also takes care of updating both Redis and the
local counter.
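
The limit check described above can be sketched roughly as follows. This is a simplified illustration, not the code from this commit: the free function `evaluate`, its `global_count` parameter (standing in for a value read from Redis), and the "adopt the larger count" policy are all assumptions made for the example.

```rust
use std::collections::BTreeMap;

/// Stand-in for Relay's `RuleId` newtype.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct RuleId(pub u32);

/// Hypothetical helper: returns `true` while `rule` has been matched fewer
/// than `limit` times, and counts the match.
///
/// `global_count` stands in for a count fetched from Redis. The assumption
/// here is that the local counter adopts it whenever it is ahead.
pub fn evaluate(
    counters: &mut BTreeMap<RuleId, i64>,
    rule: RuleId,
    limit: i64,
    global_count: Option<i64>,
) -> bool {
    let local = counters.entry(rule).or_insert(0);

    // Synchronize the local counter with the global one, if available.
    if let Some(global) = global_count {
        *local = (*local).max(global);
    }

    if *local < limit {
        *local += 1;
        true
    } else {
        false
    }
}
```

With a limit of 2 and no global count, the first two calls for a rule return `true` and every later call returns `false`.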

After the limit is reached, the rule is no longer valid and will be
ignored, so that the normal `SampleRate` and `Factor` variant of
`SamplingValue` will apply.
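
To illustrate the fallthrough, here is a minimal sketch of how an exhausted reservoir rule could be skipped in favor of the remaining rules. The `effective_sample_rate` helper, the single `reservoir_count` argument, and the choice to sample at a rate of 1.0 while the reservoir is open are assumptions for the example and do not mirror Relay's actual matching code.

```rust
/// Simplified mirror of the three `SamplingValue` variants.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SamplingValue {
    SampleRate { value: f64 },
    Factor { value: f64 },
    Reservoir { limit: i64 },
}

/// Hypothetical helper: walks the matching rules in order and returns the
/// resulting sample rate, or `None` if no rule yields one.
pub fn effective_sample_rate(rules: &[SamplingValue], reservoir_count: i64) -> Option<f64> {
    let mut factor = 1.0;
    for rule in rules {
        match *rule {
            // Reservoir still open: sample everything that matches.
            SamplingValue::Reservoir { limit } if reservoir_count < limit => return Some(1.0),
            // Limit reached: the rule is ignored and later rules apply.
            SamplingValue::Reservoir { .. } => continue,
            // Factors accumulate until a sample rate rule is found.
            SamplingValue::Factor { value } => factor *= value,
            SamplingValue::SampleRate { value } => return Some(factor * value),
        }
    }
    None
}
```

In this sketch, a rule list of `Reservoir { limit: 2 }`, `Factor { value: 2.0 }`, `SampleRate { value: 0.25 }` samples everything while the count is below 2 and falls back to the factor and sample rate afterwards.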

Sentry is responsible for removing the reservoir rule from the
`SamplingConfig` when it has reached its limit.

Whenever we receive a new `ProjectConfig`, we remove all the reservoir
counters from its project that are no longer in the
`DynamicSamplingConfig`.
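
A cleanup step of this kind could look like the sketch below. The function name `remove_stale_counters` and the `BTreeSet` of active rule ids are hypothetical; how Relay actually collects the ids from the new config is not shown here.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Stand-in for Relay's `RuleId` newtype.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct RuleId(pub u32);

/// Hypothetical helper: drops every counter whose rule is no longer part of
/// the reservoir rules in the newly received config.
pub fn remove_stale_counters(
    counters: &mut BTreeMap<RuleId, i64>,
    active_reservoir_rules: &BTreeSet<RuleId>,
) {
    counters.retain(|rule_id, _| active_reservoir_rules.contains(rule_id));
}
```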

Regarding the use of the mutex:
We use `try_lock` to avoid getting blocked in case the mutex is already
in use. There are two reasons it might be locked:

1. Another thread is handling a reservoir rule from the same project at
the same time.
2. We are in the process of removing counters from that same project.
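
The non-blocking access pattern can be sketched with the standard library's `Mutex::try_lock`. The `try_increment` helper and the plain `u32` key are assumptions for the example. What the real code does when the lock is busy is not spelled out above, so here the update is simply skipped.

```rust
use std::collections::BTreeMap;
use std::sync::{Mutex, TryLockError};

/// Hypothetical helper: bumps the counter for `rule_id` without blocking.
/// Returns the new count, or `None` if the lock could not be taken.
pub fn try_increment(counters: &Mutex<BTreeMap<u32, i64>>, rule_id: u32) -> Option<i64> {
    match counters.try_lock() {
        Ok(mut guard) => {
            let count = guard.entry(rule_id).or_insert(0);
            *count += 1;
            Some(*count)
        }
        // Another holder has the lock (for example, a concurrent rule
        // evaluation or a counter cleanup): skip instead of waiting.
        Err(TryLockError::WouldBlock) => None,
        // A previous holder panicked; treated like a busy lock in this sketch.
        Err(TryLockError::Poisoned(_)) => None,
    }
}
```

The trade-off is that a busy lock costs a missed update rather than a stalled thread.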
TBS1996 committed Sep 29, 2023
1 parent 6452adf commit 5f8335f
Showing 13 changed files with 420 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@
- Remove filtering for Android events with missing close events. ([#2524](https://github.com/getsentry/relay/pull/2524))
- Exclude more spans fron metrics extraction. ([#2522](https://github.com/getsentry/relay/pull/2522), [#2525](https://github.com/getsentry/relay/pull/2525), [#2545](https://github.com/getsentry/relay/pull/2545))
- Fix hot-loop burning CPU when upstream service is unavailable. ([#2518](https://github.com/getsentry/relay/pull/2518))
- Introduce reservoir sampling rule. ([#2550](https://github.com/getsentry/relay/pull/2550))

## 23.9.1

2 changes: 2 additions & 0 deletions Cargo.lock

2 changes: 1 addition & 1 deletion relay-kafka/src/lib.rs
@@ -20,7 +20,7 @@
//!
//! // build the client
//! let kafka_client = builder.build();
//!
//!
//! // send the message
//! kafka_client.send_message(KafkaTopic::Events, 1u64, &kafka_message).unwrap();
//! ```
6 changes: 6 additions & 0 deletions relay-sampling/Cargo.toml
@@ -9,14 +9,20 @@ edition = "2021"
license-file = "../LICENSE"
publish = false

[features]
default = []
redis = ["dep:anyhow", "relay-redis/impl"]

[dependencies]
anyhow = { workspace = true, optional = true }
chrono = { workspace = true }
rand = { workspace = true }
rand_pcg = "0.3.1"
relay-base-schema = { path = "../relay-base-schema" }
relay-common = { path = "../relay-common" }
relay-log = { path = "../relay-log" }
relay-protocol = { path = "../relay-protocol" }
relay-redis = { path = "../relay-redis", optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
unicase = "2.6.0"
96 changes: 63 additions & 33 deletions relay-sampling/src/config.rs
@@ -6,6 +6,7 @@ use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

use crate::condition::RuleCondition;
use crate::evaluation::ReservoirEvaluator;
use crate::utils;

/// Represents the dynamic sampling configuration available to a project.
@@ -81,14 +82,28 @@ impl SamplingRule {
self.condition.supported() && self.ty != RuleType::Unsupported
}

/// Returns the sample rate if the rule is active.
pub fn sample_rate(&self, now: DateTime<Utc>) -> Option<SamplingValue> {
/// Returns the updated [`SamplingValue`] if it's valid.
pub fn evaluate(
&self,
now: DateTime<Utc>,
reservoir: Option<&ReservoirEvaluator>,
) -> Option<SamplingValue> {
if !self.time_range.contains(now) {
// Return None if rule is inactive.
return None;
}

let sampling_base_value = self.sampling_value.value();
let sampling_base_value = match self.sampling_value {
SamplingValue::SampleRate { value } => value,
SamplingValue::Factor { value } => value,
SamplingValue::Reservoir { limit } => {
return reservoir.and_then(|reservoir| {
reservoir
.evaluate(self.id, limit, self.time_range.end.as_ref())
.then_some(SamplingValue::Reservoir { limit })
});
}
};

let value = match self.decaying_fn {
DecayingFunction::Linear { decayed_value } => {
@@ -114,6 +129,9 @@ impl SamplingRule {
match self.sampling_value {
SamplingValue::SampleRate { .. } => Some(SamplingValue::SampleRate { value }),
SamplingValue::Factor { .. } => Some(SamplingValue::Factor { value }),
// This should be impossible.
// Todo(tor): refactor so we don't run into this invalid state.
_ => None,
}
}
}
@@ -140,18 +158,18 @@ pub enum SamplingValue {
/// until a sample rate rule is found. The matched rule's factor will be multiplied with the
/// accumulated factors before moving onto the next possible match.
Factor {
/// The fator to apply on another matched sample rate.
/// The factor to apply on another matched sample rate.
value: f64,
},
}

impl SamplingValue {
pub(crate) fn value(&self) -> f64 {
*match self {
SamplingValue::SampleRate { value } => value,
SamplingValue::Factor { value } => value,
}
}
/// A reservoir limit.
///
/// A rule with a reservoir limit will be sampled if the rule have been matched fewer times
/// than the limit.
Reservoir {
/// The limit of how many times this rule will be sampled before this rule is invalid.
limit: i64,
},
}

/// Defines what a dynamic sampling rule applies to.
@@ -174,7 +192,7 @@ pub enum RuleType {
///
/// This number must be unique within a Sentry organization, as it is recorded in outcomes and used
/// to infer which sampling rule caused data to be dropped.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct RuleId(pub u32);

impl fmt::Display for RuleId {
@@ -515,19 +533,19 @@ mod tests {

// At the start of the time range, sample rate is equal to the rule's initial sampling value.
assert_eq!(
rule.sample_rate(start).unwrap(),
rule.evaluate(start, None).unwrap(),
SamplingValue::SampleRate { value: 1.0 }
);

// Halfway in the time range, the value is exactly between 1.0 and 0.5.
assert_eq!(
rule.sample_rate(halfway).unwrap(),
rule.evaluate(halfway, None).unwrap(),
SamplingValue::SampleRate { value: 0.75 }
);

// Approaches 0.5 at the end.
assert_eq!(
rule.sample_rate(end).unwrap(),
rule.evaluate(end, None).unwrap(),
SamplingValue::SampleRate {
// It won't go to exactly 0.5 because the time range is end-exclusive.
value: 0.5000028935185186
@@ -541,15 +559,15 @@ mod tests {
rule
};

assert!(rule_without_start.sample_rate(halfway).is_none());
assert!(rule_without_start.evaluate(halfway, None).is_none());

let rule_without_end = {
let mut rule = rule.clone();
rule.time_range.end = None;
rule
};

assert!(rule_without_end.sample_rate(halfway).is_none());
assert!(rule_without_end.evaluate(halfway, None).is_none());
}

/// If the decayingfunction is set to `Constant` then it shouldn't adjust the sample rate.
@@ -571,7 +589,7 @@ mod tests {

let halfway = Utc.with_ymd_and_hms(1970, 10, 11, 0, 0, 0).unwrap();

assert_eq!(rule.sample_rate(halfway), Some(sampling_value));
assert_eq!(rule.evaluate(halfway, None), Some(sampling_value));
}

/// Validates the `sample_rate` method for different time range configurations.
@@ -597,30 +615,42 @@ mod tests {
time_range,
decaying_fn: DecayingFunction::Constant,
};
assert!(rule.sample_rate(before_time_range).is_none());
assert!(rule.sample_rate(during_time_range).is_some());
assert!(rule.sample_rate(after_time_range).is_none());
assert!(rule.evaluate(before_time_range, None).is_none());
assert!(rule.evaluate(during_time_range, None).is_some());
assert!(rule.evaluate(after_time_range, None).is_none());

// [start..]
let mut rule_without_end = rule.clone();
rule_without_end.time_range.end = None;
assert!(rule_without_end.sample_rate(before_time_range).is_none());
assert!(rule_without_end.sample_rate(during_time_range).is_some());
assert!(rule_without_end.sample_rate(after_time_range).is_some());
assert!(rule_without_end.evaluate(before_time_range, None).is_none());
assert!(rule_without_end.evaluate(during_time_range, None).is_some());
assert!(rule_without_end.evaluate(after_time_range, None).is_some());

// [..end]
let mut rule_without_start = rule.clone();
rule_without_start.time_range.start = None;
assert!(rule_without_start.sample_rate(before_time_range).is_some());
assert!(rule_without_start.sample_rate(during_time_range).is_some());
assert!(rule_without_start.sample_rate(after_time_range).is_none());
assert!(rule_without_start
.evaluate(before_time_range, None)
.is_some());
assert!(rule_without_start
.evaluate(during_time_range, None)
.is_some());
assert!(rule_without_start
.evaluate(after_time_range, None)
.is_none());

// [..]
let mut rule_without_range = rule.clone();
rule_without_range.time_range = TimeRange::default();
assert!(rule_without_range.sample_rate(before_time_range).is_some());
assert!(rule_without_range.sample_rate(during_time_range).is_some());
assert!(rule_without_range.sample_rate(after_time_range).is_some());
assert!(rule_without_range
.evaluate(before_time_range, None)
.is_some());
assert!(rule_without_range
.evaluate(during_time_range, None)
.is_some());
assert!(rule_without_range
.evaluate(after_time_range, None)
.is_some());
}

/// You can pass in a SamplingValue of either variant, and it should return the same one if
@@ -639,13 +669,13 @@ mod tests {
};

matches!(
rule.sample_rate(Utc::now()).unwrap(),
rule.evaluate(Utc::now(), None).unwrap(),
SamplingValue::SampleRate { .. }
);

rule.sampling_value = SamplingValue::Factor { value: 0.42 };
matches!(
rule.sample_rate(Utc::now()).unwrap(),
rule.evaluate(Utc::now(), None).unwrap(),
SamplingValue::Factor { .. }
);
}