Skip to content

Commit

Permalink
fix(metrics): Use a FiniteF64 type in metrics (#2958)
Browse files Browse the repository at this point in the history
Introduces a strictly typed `FiniteF64` type to prevent storing
non-finite floats in metrics. This type requires a runtime check to be
constructed. All existing usage of this type has been updated:

- Metric extraction skips values that cannot be converted to `FiniteF64`
- Counters and gauges use saturating addition (clamped at MAX / MIN)
- Tests use integers as mocks, which support infallible conversion
- The `relay_metrics::dist!` macro now takes values that implement
`Into<FiniteF64>`
  • Loading branch information
jan-auer committed Jan 17, 2024
1 parent aae3e6d commit ba844dc
Show file tree
Hide file tree
Showing 16 changed files with 603 additions and 138 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
**Bug Fixes**:

- Add automatic PII scrubbing to `logentry.params`. ([#2956](https://github.com/getsentry/relay/pull/2956))
- Avoid producing `null` values in metric data. These values were the result of `Infinity` or `NaN` values extracted from event data. The values are now discarded during extraction. ([#2958](https://github.com/getsentry/relay/pull/2958))

## 24.1.0

Expand Down
7 changes: 4 additions & 3 deletions relay-metrics/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use relay_base_schema::project::ProjectKey;
use relay_common::time::UnixTimestamp;
use relay_metrics::{
aggregator::{Aggregator, AggregatorConfig},
Bucket, BucketValue, DistributionValue,
Bucket, BucketValue, DistributionValue, FiniteF64,
};

/// Struct representing a testcase for which insert + flush are timed.
Expand Down Expand Up @@ -65,7 +65,7 @@ fn bench_insert_and_flush(c: &mut Criterion) {
timestamp: UnixTimestamp::now(),
width: 0,
name: "c:transactions/foo@none".to_owned(),
value: BucketValue::counter(42.),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
};

Expand Down Expand Up @@ -163,8 +163,9 @@ fn bench_distribution(c: &mut Criterion) {

for size in [1, 10, 100, 1000, 10_000, 100_000, 1_000_000] {
let values = std::iter::from_fn(|| Some(rand::random()))
.filter_map(FiniteF64::new)
.take(size as usize)
.collect::<Vec<f64>>();
.collect::<Vec<FiniteF64>>();

group.throughput(criterion::Throughput::Elements(size));
group.bench_with_input(BenchmarkId::from_parameter(size), &values, |b, values| {
Expand Down
50 changes: 31 additions & 19 deletions relay-metrics/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ impl Aggregator {
self.buckets.retain(|key, entry| {
if force || entry.elapsed() {
// Take the value and leave a placeholder behind. It'll be removed right after.
let value = mem::replace(&mut entry.value, BucketValue::Counter(0.0));
let value = mem::replace(&mut entry.value, BucketValue::counter(0.into()));
cost_tracker.subtract_cost(key.project_key, key.cost());
cost_tracker.subtract_cost(key.project_key, value.cost());

Expand Down Expand Up @@ -887,7 +887,7 @@ mod tests {
timestamp: UnixTimestamp::from_secs(999994711),
width: 0,
name: "c:transactions/foo".to_owned(),
value: BucketValue::counter(42.),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
}
}
Expand All @@ -901,7 +901,7 @@ mod tests {
let bucket1 = some_bucket();

let mut bucket2 = bucket1.clone();
bucket2.value = BucketValue::counter(43.);
bucket2.value = BucketValue::counter(43.into());
aggregator.merge(project_key, bucket1, None).unwrap();
aggregator.merge(project_key, bucket2, None).unwrap();

Expand Down Expand Up @@ -935,20 +935,20 @@ mod tests {
let expected_bucket_value_size = 48;
let expected_set_entry_size = 4;

let counter = BucketValue::Counter(123.0);
let counter = BucketValue::Counter(123.into());
assert_eq!(counter.cost(), expected_bucket_value_size);
let set = BucketValue::Set([1, 2, 3, 4, 5].into());
assert_eq!(
set.cost(),
expected_bucket_value_size + 5 * expected_set_entry_size
);
let distribution = BucketValue::Distribution(dist![1., 2., 3.]);
let distribution = BucketValue::Distribution(dist![1, 2, 3]);
assert_eq!(distribution.cost(), expected_bucket_value_size + 3 * 8);
let gauge = BucketValue::Gauge(GaugeValue {
last: 43.,
min: 42.,
max: 43.,
sum: 85.,
last: 43.into(),
min: 42.into(),
max: 43.into(),
sum: 85.into(),
count: 2,
});
assert_eq!(gauge.cost(), expected_bucket_value_size);
Expand Down Expand Up @@ -1131,7 +1131,7 @@ mod tests {
timestamp: UnixTimestamp::from_secs(999994711),
width: 0,
name: "c:transactions/foo@none".to_owned(),
value: BucketValue::counter(42.),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
};
let bucket_key = BucketKey {
Expand All @@ -1144,10 +1144,14 @@ mod tests {
for (metric_name, metric_value, expected_added_cost) in [
(
"c:transactions/foo@none",
BucketValue::counter(42.),
BucketValue::counter(42.into()),
fixed_cost,
),
("c:transactions/foo@none", BucketValue::counter(42.), 0), // counters have constant size
(
"c:transactions/foo@none",
BucketValue::counter(42.into()),
0,
), // counters have constant size
(
"s:transactions/foo@none",
BucketValue::set(123),
Expand All @@ -1157,17 +1161,25 @@ mod tests {
("s:transactions/foo@none", BucketValue::set(456), 4), // Different element in set -> +4
(
"d:transactions/foo@none",
BucketValue::distribution(1.0),
BucketValue::distribution(1.into()),
fixed_cost + 8,
), // New bucket + 1 element
("d:transactions/foo@none", BucketValue::distribution(1.0), 8), // duplicate element
("d:transactions/foo@none", BucketValue::distribution(2.0), 8), // 1 new element
(
"d:transactions/foo@none",
BucketValue::distribution(1.into()),
8,
), // duplicate element
(
"d:transactions/foo@none",
BucketValue::distribution(2.into()),
8,
), // 1 new element
(
"g:transactions/foo@none",
BucketValue::gauge(0.3),
BucketValue::gauge(3.into()),
fixed_cost,
), // New bucket
("g:transactions/foo@none", BucketValue::gauge(0.2), 0), // gauge has constant size
("g:transactions/foo@none", BucketValue::gauge(2.into()), 0), // gauge has constant size
] {
let mut bucket = bucket.clone();
bucket.value = metric_value;
Expand Down Expand Up @@ -1374,7 +1386,7 @@ mod tests {
timestamp: UnixTimestamp::from_secs(999994711),
width: 0,
name: "c:transactions/foo".to_owned(),
value: BucketValue::counter(42.),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
};

Expand Down Expand Up @@ -1404,7 +1416,7 @@ mod tests {
timestamp: UnixTimestamp::from_secs(999994711),
width: 0,
name: "c:transactions/foo".to_owned(),
value: BucketValue::counter(42.),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
};

Expand Down
2 changes: 1 addition & 1 deletion relay-metrics/src/aggregatorservice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ mod tests {
timestamp: UnixTimestamp::from_secs(999994711),
width: 0,
name: "c:transactions/foo".to_owned(),
value: BucketValue::counter(42.),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
}
}
Expand Down
67 changes: 34 additions & 33 deletions relay-metrics/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::protocol::{
self, hash_set_value, CounterType, DistributionType, GaugeType, MetricResourceIdentifier,
MetricType, SetType,
};
use crate::{MetricNamespace, ParseMetricError};
use crate::{FiniteF64, MetricNamespace, ParseMetricError};

const VALUE_SEPARATOR: char = ':';

Expand Down Expand Up @@ -51,7 +51,7 @@ impl GaugeValue {
self.last = value;
self.min = self.min.min(value);
self.max = self.max.max(value);
self.sum += value;
self.sum = self.sum.saturating_add(value);
self.count += 1;
}

Expand All @@ -60,17 +60,13 @@ impl GaugeValue {
self.last = other.last;
self.min = self.min.min(other.min);
self.max = self.max.max(other.max);
self.sum += other.sum;
self.sum = self.sum.saturating_add(other.sum);
self.count += other.count;
}

/// Returns the average of all values reported in this bucket.
pub fn avg(&self) -> GaugeType {
if self.count > 0 {
self.sum / (self.count as GaugeType)
} else {
0.0
}
pub fn avg(&self) -> Option<GaugeType> {
self.sum / FiniteF64::new(self.count as f64)?
}
}

Expand All @@ -85,9 +81,9 @@ impl GaugeValue {
/// ```
/// use relay_metrics::dist;
///
/// let mut dist = dist![1.0, 1.0, 1.0, 2.0];
/// dist.push(5.0);
/// dist.extend(std::iter::repeat(3.0).take(7));
/// let mut dist = dist![1, 1, 1, 2];
/// dist.push(5.into());
/// dist.extend(std::iter::repeat(3.into()).take(7));
/// ```
///
/// Logically, this distribution is equivalent to this visualization:
Expand Down Expand Up @@ -117,12 +113,12 @@ pub use smallvec::smallvec as _smallvec;
/// # Example
///
/// ```
/// let dist = relay_metrics::dist![1.0, 2.0];
/// let dist = relay_metrics::dist![1, 2];
/// ```
#[macro_export]
macro_rules! dist {
($($x:expr),*$(,)*) => {
$crate::_smallvec!($($x),*) as $crate::DistributionValue
$crate::_smallvec!($($crate::DistributionType::from($x)),*) as $crate::DistributionValue
};
}

Expand Down Expand Up @@ -340,7 +336,7 @@ impl BucketValue {
/// values are of the same variant. Otherwise, this returns `Err(other)`.
pub fn merge(&mut self, other: Self) -> Result<(), Self> {
match (self, other) {
(Self::Counter(slf), Self::Counter(other)) => *slf += other,
(Self::Counter(slf), Self::Counter(other)) => *slf = slf.saturating_add(other),
(Self::Distribution(slf), Self::Distribution(other)) => slf.extend_from_slice(&other),
(Self::Set(slf), Self::Set(other)) => slf.extend(other),
(Self::Gauge(slf), Self::Gauge(other)) => slf.merge(other),
Expand All @@ -355,7 +351,7 @@ impl BucketValue {
fn parse_counter(string: &str) -> Option<CounterType> {
let mut sum = CounterType::default();
for component in string.split(VALUE_SEPARATOR) {
sum += component.parse::<CounterType>().ok()?;
sum = sum.saturating_add(component.parse().ok()?);
}
Some(sum)
}
Expand Down Expand Up @@ -803,18 +799,16 @@ mod tests {

#[test]
fn test_bucket_value_merge_counter() {
let mut value = BucketValue::Counter(42.);
value.merge(BucketValue::Counter(43.)).unwrap();
assert_eq!(value, BucketValue::Counter(85.));
let mut value = BucketValue::Counter(42.into());
value.merge(BucketValue::Counter(43.into())).unwrap();
assert_eq!(value, BucketValue::Counter(85.into()));
}

#[test]
fn test_bucket_value_merge_distribution() {
let mut value = BucketValue::Distribution(dist![1., 2., 3.]);
value
.merge(BucketValue::Distribution(dist![2., 4.]))
.unwrap();
assert_eq!(value, BucketValue::Distribution(dist![1., 2., 3., 2., 4.]));
let mut value = BucketValue::Distribution(dist![1, 2, 3]);
value.merge(BucketValue::Distribution(dist![2, 4])).unwrap();
assert_eq!(value, BucketValue::Distribution(dist![1, 2, 3, 2, 4]));
}

#[test]
Expand All @@ -826,16 +820,16 @@ mod tests {

#[test]
fn test_bucket_value_merge_gauge() {
let mut value = BucketValue::Gauge(GaugeValue::single(42.));
value.merge(BucketValue::gauge(43.)).unwrap();
let mut value = BucketValue::Gauge(GaugeValue::single(42.into()));
value.merge(BucketValue::gauge(43.into())).unwrap();

assert_eq!(
value,
BucketValue::Gauge(GaugeValue {
last: 43.,
min: 42.,
max: 43.,
sum: 85.,
last: 43.into(),
min: 42.into(),
max: 43.into(),
sum: 85.into(),
count: 2,
})
);
Expand Down Expand Up @@ -872,7 +866,7 @@ mod tests {
let s = "transactions/foo:42:17:21|c";
let timestamp = UnixTimestamp::from_secs(4711);
let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
assert_eq!(metric.value, BucketValue::Counter(80.0));
assert_eq!(metric.value, BucketValue::Counter(80.into()));
}

#[test]
Expand Down Expand Up @@ -902,7 +896,11 @@ mod tests {
let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
assert_eq!(
metric.value,
BucketValue::Distribution(dist![17.5, 21.9, 42.7])
BucketValue::Distribution(dist![
FiniteF64::new(17.5).unwrap(),
FiniteF64::new(21.9).unwrap(),
FiniteF64::new(42.7).unwrap()
])
);
}

Expand All @@ -911,7 +909,10 @@ mod tests {
let s = "transactions/foo:17.5|h"; // common alias for distribution
let timestamp = UnixTimestamp::from_secs(4711);
let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap();
assert_eq!(metric.value, BucketValue::Distribution(dist![17.5]));
assert_eq!(
metric.value,
BucketValue::Distribution(dist![FiniteF64::new(17.5).unwrap()])
);
}

#[test]
Expand Down
Loading

0 comments on commit ba844dc

Please sign in to comment.