diff --git a/CHANGELOG.md b/CHANGELOG.md index e2f4244c97..c7e35126c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ **Bug Fixes**: - Add automatic PII scrubbing to `logentry.params`. ([#2956](https://github.com/getsentry/relay/pull/2956)) +- Avoid producing `null` values in metric data. These values were the result of Infinity or NaN values extracted from event data. The values are now discarded during extraction. ([#2958](https://github.com/getsentry/relay/pull/2958)) ## 24.1.0 diff --git a/relay-metrics/benches/benches.rs b/relay-metrics/benches/benches.rs index 4a009c664f..78f9f5e055 100644 --- a/relay-metrics/benches/benches.rs +++ b/relay-metrics/benches/benches.rs @@ -6,7 +6,7 @@ use relay_base_schema::project::ProjectKey; use relay_common::time::UnixTimestamp; use relay_metrics::{ aggregator::{Aggregator, AggregatorConfig}, - Bucket, BucketValue, DistributionValue, + Bucket, BucketValue, DistributionValue, FiniteF64, }; /// Struct representing a testcase for which insert + flush are timed. 
@@ -65,7 +65,7 @@ fn bench_insert_and_flush(c: &mut Criterion) { timestamp: UnixTimestamp::now(), width: 0, name: "c:transactions/foo@none".to_owned(), - value: BucketValue::counter(42.), + value: BucketValue::counter(42.into()), tags: BTreeMap::new(), }; @@ -163,8 +163,9 @@ fn bench_distribution(c: &mut Criterion) { for size in [1, 10, 100, 1000, 10_000, 100_000, 1_000_000] { let values = std::iter::from_fn(|| Some(rand::random())) + .filter_map(FiniteF64::new) .take(size as usize) - .collect::>(); + .collect::>(); group.throughput(criterion::Throughput::Elements(size)); group.bench_with_input(BenchmarkId::from_parameter(size), &values, |b, values| { diff --git a/relay-metrics/src/aggregator.rs b/relay-metrics/src/aggregator.rs index 9939c15324..da6da380db 100644 --- a/relay-metrics/src/aggregator.rs +++ b/relay-metrics/src/aggregator.rs @@ -529,7 +529,7 @@ impl Aggregator { self.buckets.retain(|key, entry| { if force || entry.elapsed() { // Take the value and leave a placeholder behind. It'll be removed right after. 
- let value = mem::replace(&mut entry.value, BucketValue::Counter(0.0)); + let value = mem::replace(&mut entry.value, BucketValue::counter(0.into())); cost_tracker.subtract_cost(key.project_key, key.cost()); cost_tracker.subtract_cost(key.project_key, value.cost()); @@ -887,7 +887,7 @@ mod tests { timestamp: UnixTimestamp::from_secs(999994711), width: 0, name: "c:transactions/foo".to_owned(), - value: BucketValue::counter(42.), + value: BucketValue::counter(42.into()), tags: BTreeMap::new(), } } @@ -901,7 +901,7 @@ mod tests { let bucket1 = some_bucket(); let mut bucket2 = bucket1.clone(); - bucket2.value = BucketValue::counter(43.); + bucket2.value = BucketValue::counter(43.into()); aggregator.merge(project_key, bucket1, None).unwrap(); aggregator.merge(project_key, bucket2, None).unwrap(); @@ -935,20 +935,20 @@ mod tests { let expected_bucket_value_size = 48; let expected_set_entry_size = 4; - let counter = BucketValue::Counter(123.0); + let counter = BucketValue::Counter(123.into()); assert_eq!(counter.cost(), expected_bucket_value_size); let set = BucketValue::Set([1, 2, 3, 4, 5].into()); assert_eq!( set.cost(), expected_bucket_value_size + 5 * expected_set_entry_size ); - let distribution = BucketValue::Distribution(dist![1., 2., 3.]); + let distribution = BucketValue::Distribution(dist![1, 2, 3]); assert_eq!(distribution.cost(), expected_bucket_value_size + 3 * 8); let gauge = BucketValue::Gauge(GaugeValue { - last: 43., - min: 42., - max: 43., - sum: 85., + last: 43.into(), + min: 42.into(), + max: 43.into(), + sum: 85.into(), count: 2, }); assert_eq!(gauge.cost(), expected_bucket_value_size); @@ -1131,7 +1131,7 @@ mod tests { timestamp: UnixTimestamp::from_secs(999994711), width: 0, name: "c:transactions/foo@none".to_owned(), - value: BucketValue::counter(42.), + value: BucketValue::counter(42.into()), tags: BTreeMap::new(), }; let bucket_key = BucketKey { @@ -1144,10 +1144,14 @@ mod tests { for (metric_name, metric_value, expected_added_cost) in [ ( 
"c:transactions/foo@none", - BucketValue::counter(42.), + BucketValue::counter(42.into()), fixed_cost, ), - ("c:transactions/foo@none", BucketValue::counter(42.), 0), // counters have constant size + ( + "c:transactions/foo@none", + BucketValue::counter(42.into()), + 0, + ), // counters have constant size ( "s:transactions/foo@none", BucketValue::set(123), @@ -1157,17 +1161,25 @@ mod tests { ("s:transactions/foo@none", BucketValue::set(456), 4), // Different element in set -> +4 ( "d:transactions/foo@none", - BucketValue::distribution(1.0), + BucketValue::distribution(1.into()), fixed_cost + 8, ), // New bucket + 1 element - ("d:transactions/foo@none", BucketValue::distribution(1.0), 8), // duplicate element - ("d:transactions/foo@none", BucketValue::distribution(2.0), 8), // 1 new element + ( + "d:transactions/foo@none", + BucketValue::distribution(1.into()), + 8, + ), // duplicate element + ( + "d:transactions/foo@none", + BucketValue::distribution(2.into()), + 8, + ), // 1 new element ( "g:transactions/foo@none", - BucketValue::gauge(0.3), + BucketValue::gauge(3.into()), fixed_cost, ), // New bucket - ("g:transactions/foo@none", BucketValue::gauge(0.2), 0), // gauge has constant size + ("g:transactions/foo@none", BucketValue::gauge(2.into()), 0), // gauge has constant size ] { let mut bucket = bucket.clone(); bucket.value = metric_value; @@ -1374,7 +1386,7 @@ mod tests { timestamp: UnixTimestamp::from_secs(999994711), width: 0, name: "c:transactions/foo".to_owned(), - value: BucketValue::counter(42.), + value: BucketValue::counter(42.into()), tags: BTreeMap::new(), }; @@ -1404,7 +1416,7 @@ mod tests { timestamp: UnixTimestamp::from_secs(999994711), width: 0, name: "c:transactions/foo".to_owned(), - value: BucketValue::counter(42.), + value: BucketValue::counter(42.into()), tags: BTreeMap::new(), }; diff --git a/relay-metrics/src/aggregatorservice.rs b/relay-metrics/src/aggregatorservice.rs index 09747eb0cb..079cf031df 100644 --- 
a/relay-metrics/src/aggregatorservice.rs +++ b/relay-metrics/src/aggregatorservice.rs @@ -457,7 +457,7 @@ mod tests { timestamp: UnixTimestamp::from_secs(999994711), width: 0, name: "c:transactions/foo".to_owned(), - value: BucketValue::counter(42.), + value: BucketValue::counter(42.into()), tags: BTreeMap::new(), } } diff --git a/relay-metrics/src/bucket.rs b/relay-metrics/src/bucket.rs index 7727b2a6aa..e24b9c2b2b 100644 --- a/relay-metrics/src/bucket.rs +++ b/relay-metrics/src/bucket.rs @@ -13,7 +13,7 @@ use crate::protocol::{ self, hash_set_value, CounterType, DistributionType, GaugeType, MetricResourceIdentifier, MetricType, SetType, }; -use crate::{MetricNamespace, ParseMetricError}; +use crate::{FiniteF64, MetricNamespace, ParseMetricError}; const VALUE_SEPARATOR: char = ':'; @@ -51,7 +51,7 @@ impl GaugeValue { self.last = value; self.min = self.min.min(value); self.max = self.max.max(value); - self.sum += value; + self.sum = self.sum.saturating_add(value); self.count += 1; } @@ -60,17 +60,13 @@ impl GaugeValue { self.last = other.last; self.min = self.min.min(other.min); self.max = self.max.max(other.max); - self.sum += other.sum; + self.sum = self.sum.saturating_add(other.sum); self.count += other.count; } /// Returns the average of all values reported in this bucket. - pub fn avg(&self) -> GaugeType { - if self.count > 0 { - self.sum / (self.count as GaugeType) - } else { - 0.0 - } + pub fn avg(&self) -> Option<GaugeType> { + self.sum / FiniteF64::new(self.count as f64)?
} } @@ -85,9 +81,9 @@ impl GaugeValue { /// ``` /// use relay_metrics::dist; /// -/// let mut dist = dist![1.0, 1.0, 1.0, 2.0]; -/// dist.push(5.0); -/// dist.extend(std::iter::repeat(3.0).take(7)); +/// let mut dist = dist![1, 1, 1, 2]; +/// dist.push(5.into()); +/// dist.extend(std::iter::repeat(3.into()).take(7)); /// ``` /// /// Logically, this distribution is equivalent to this visualization: @@ -117,12 +113,12 @@ pub use smallvec::smallvec as _smallvec; /// # Example /// /// ``` -/// let dist = relay_metrics::dist![1.0, 2.0]; +/// let dist = relay_metrics::dist![1, 2]; /// ``` #[macro_export] macro_rules! dist { ($($x:expr),*$(,)*) => { - $crate::_smallvec!($($x),*) as $crate::DistributionValue + $crate::_smallvec!($($crate::DistributionType::from($x)),*) as $crate::DistributionValue }; } @@ -340,7 +336,7 @@ impl BucketValue { /// values are of the same variant. Otherwise, this returns `Err(other)`. pub fn merge(&mut self, other: Self) -> Result<(), Self> { match (self, other) { - (Self::Counter(slf), Self::Counter(other)) => *slf += other, + (Self::Counter(slf), Self::Counter(other)) => *slf = slf.saturating_add(other), (Self::Distribution(slf), Self::Distribution(other)) => slf.extend_from_slice(&other), (Self::Set(slf), Self::Set(other)) => slf.extend(other), (Self::Gauge(slf), Self::Gauge(other)) => slf.merge(other), @@ -355,7 +351,7 @@ impl BucketValue { fn parse_counter(string: &str) -> Option { let mut sum = CounterType::default(); for component in string.split(VALUE_SEPARATOR) { - sum += component.parse::().ok()?; + sum = sum.saturating_add(component.parse().ok()?); } Some(sum) } @@ -803,18 +799,16 @@ mod tests { #[test] fn test_bucket_value_merge_counter() { - let mut value = BucketValue::Counter(42.); - value.merge(BucketValue::Counter(43.)).unwrap(); - assert_eq!(value, BucketValue::Counter(85.)); + let mut value = BucketValue::Counter(42.into()); + value.merge(BucketValue::Counter(43.into())).unwrap(); + assert_eq!(value, 
BucketValue::Counter(85.into())); } #[test] fn test_bucket_value_merge_distribution() { - let mut value = BucketValue::Distribution(dist![1., 2., 3.]); - value - .merge(BucketValue::Distribution(dist![2., 4.])) - .unwrap(); - assert_eq!(value, BucketValue::Distribution(dist![1., 2., 3., 2., 4.])); + let mut value = BucketValue::Distribution(dist![1, 2, 3]); + value.merge(BucketValue::Distribution(dist![2, 4])).unwrap(); + assert_eq!(value, BucketValue::Distribution(dist![1, 2, 3, 2, 4])); } #[test] @@ -826,16 +820,16 @@ mod tests { #[test] fn test_bucket_value_merge_gauge() { - let mut value = BucketValue::Gauge(GaugeValue::single(42.)); - value.merge(BucketValue::gauge(43.)).unwrap(); + let mut value = BucketValue::Gauge(GaugeValue::single(42.into())); + value.merge(BucketValue::gauge(43.into())).unwrap(); assert_eq!( value, BucketValue::Gauge(GaugeValue { - last: 43., - min: 42., - max: 43., - sum: 85., + last: 43.into(), + min: 42.into(), + max: 43.into(), + sum: 85.into(), count: 2, }) ); @@ -872,7 +866,7 @@ mod tests { let s = "transactions/foo:42:17:21|c"; let timestamp = UnixTimestamp::from_secs(4711); let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap(); - assert_eq!(metric.value, BucketValue::Counter(80.0)); + assert_eq!(metric.value, BucketValue::Counter(80.into())); } #[test] @@ -902,7 +896,11 @@ mod tests { let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap(); assert_eq!( metric.value, - BucketValue::Distribution(dist![17.5, 21.9, 42.7]) + BucketValue::Distribution(dist![ + FiniteF64::new(17.5).unwrap(), + FiniteF64::new(21.9).unwrap(), + FiniteF64::new(42.7).unwrap() + ]) ); } @@ -911,7 +909,10 @@ mod tests { let s = "transactions/foo:17.5|h"; // common alias for distribution let timestamp = UnixTimestamp::from_secs(4711); let metric = Bucket::parse(s.as_bytes(), timestamp).unwrap(); - assert_eq!(metric.value, BucketValue::Distribution(dist![17.5])); + assert_eq!( + metric.value, + 
BucketValue::Distribution(dist![FiniteF64::new(17.5).unwrap()]) + ) } #[test] diff --git a/relay-metrics/src/finite.rs b/relay-metrics/src/finite.rs new file mode 100644 index 0000000000..0915de57e3 --- /dev/null +++ b/relay-metrics/src/finite.rs @@ -0,0 +1,373 @@ +use std::cmp::Ordering; +use std::error::Error; +use std::hash::{Hash, Hasher}; +use std::num::ParseFloatError; +use std::str::FromStr; +use std::{fmt, ops}; + +use serde::{Deserialize, Serialize}; + +/// A finite 64-bit floating point type. +/// +/// This is a restricted version of [`f64`] that does not allow NaN or infinity. +#[derive(Clone, Copy, Default, PartialEq, Deserialize, Serialize)] +#[serde(try_from = "f64")] +#[repr(transparent)] +pub struct FiniteF64(f64); + +impl FiniteF64 { + /// Largest finite value. + pub const MAX: Self = Self(f64::MAX); + /// Smallest finite value. + pub const MIN: Self = Self(f64::MIN); + /// Machine epsilon value. + pub const EPSILON: Self = Self(f64::EPSILON); + + /// Creates a finite float without checking whether the value is finite. This results in + /// undefined behavior if the value is non-finite. + /// + /// # Safety + /// + /// The value must not be NaN or infinite. + #[must_use] + #[inline] + pub const unsafe fn new_unchecked(value: f64) -> Self { + Self(value) + } + + /// Creates a finite float if the value is finite. + #[must_use] + #[inline] + pub fn new(value: f64) -> Option<Self> { + if value.is_finite() { + Some(Self(value)) + } else { + None + } + } + + /// Returns the plain [`f64`]. + #[inline] + pub const fn to_f64(self) -> f64 { + self.0 + } + + /// Computes the absolute value of self. + pub fn abs(self) -> Self { + Self(self.0.abs()) + } + + /// Returns the maximum of two numbers. + pub fn max(self, other: Self) -> Self { + Self(self.0.max(other.0)) + } + + /// Returns the minimum of two numbers.
+ pub fn min(self, other: Self) -> Self { + Self(self.0.min(other.0)) + } + + /// Adds two numbers, saturating at the maximum and minimum representable values. + pub fn saturating_add(self, other: Self) -> Self { + Self((self.0 + other.0).clamp(f64::MIN, f64::MAX)) + } + + /// Subtracts two numbers, saturating at the maximum and minimum representable values. + pub fn saturating_sub(self, other: Self) -> Self { + Self((self.0 - other.0).clamp(f64::MIN, f64::MAX)) + } + + /// Multiplies two numbers, saturating at the maximum and minimum representable values. + pub fn saturating_mul(self, other: Self) -> Self { + Self((self.0 * other.0).clamp(f64::MIN, f64::MAX)) + } + + // NB: There is no saturating_div, since 0/0 is NaN, which is not finite. +} + +impl Eq for FiniteF64 {} + +impl PartialOrd for FiniteF64 { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for FiniteF64 { + fn cmp(&self, other: &Self) -> Ordering { + // Safety: NaN and infinity cannot be constructed from a finite f64. + self.0.partial_cmp(&other.0).unwrap_or(Ordering::Less) + } +} + +impl Hash for FiniteF64 { + fn hash<H: Hasher>(&self, state: &mut H) { + // Safety: NaN and infinity cannot be constructed from a finite f64.
+ self.0.to_bits().hash(state) + } +} + +impl fmt::Debug for FiniteF64 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl fmt::Display for FiniteF64 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl ops::Add for FiniteF64 { + type Output = Option<Self>; + + fn add(self, other: Self) -> Option<Self> { + Self::new(self.0 + other.0) + } +} + +impl ops::Sub for FiniteF64 { + type Output = Option<Self>; + + fn sub(self, other: Self) -> Option<Self> { + Self::new(self.0 - other.0) + } +} + +impl ops::Mul for FiniteF64 { + type Output = Option<Self>; + + fn mul(self, other: Self) -> Option<Self> { + Self::new(self.0 * other.0) + } +} + +impl ops::Div for FiniteF64 { + type Output = Option<Self>; + + fn div(self, other: Self) -> Option<Self> { + Self::new(self.0 / other.0) + } +} + +impl ops::Rem for FiniteF64 { + type Output = Option<Self>; + + fn rem(self, other: Self) -> Option<Self> { + Self::new(self.0 % other.0) + } +} + +/// Error type returned when conversion to [`FiniteF64`] fails.
+#[derive(Debug, thiserror::Error)] +#[error("float is nan or infinite")] +pub struct TryFromFloatError; + +impl TryFrom<f64> for FiniteF64 { + type Error = TryFromFloatError; + + fn try_from(value: f64) -> Result<Self, Self::Error> { + Self::new(value).ok_or(TryFromFloatError) + } +} + +impl TryFrom<f32> for FiniteF64 { + type Error = TryFromFloatError; + + fn try_from(value: f32) -> Result<Self, Self::Error> { + f64::from(value).try_into() + } +} + +impl From<i8> for FiniteF64 { + fn from(value: i8) -> Self { + unsafe { Self::new_unchecked(value.into()) } + } +} + +impl From<i16> for FiniteF64 { + fn from(value: i16) -> Self { + unsafe { Self::new_unchecked(value.into()) } + } +} + +impl From<i32> for FiniteF64 { + fn from(value: i32) -> Self { + unsafe { Self::new_unchecked(value.into()) } + } +} + +impl From<u8> for FiniteF64 { + fn from(value: u8) -> Self { + unsafe { Self::new_unchecked(value.into()) } + } +} + +impl From<u16> for FiniteF64 { + fn from(value: u16) -> Self { + unsafe { Self::new_unchecked(value.into()) } + } +} + +impl From<u32> for FiniteF64 { + fn from(value: u32) -> Self { + unsafe { Self::new_unchecked(value.into()) } + } +} + +impl From<FiniteF64> for f64 { + fn from(value: FiniteF64) -> Self { + value.to_f64() + } +} + +#[derive(Debug)] +enum ParseFiniteFloatErrorKind { + Invalid(ParseFloatError), + NonFinite(TryFromFloatError), +} + +/// Error type returned when parsing [`FiniteF64`] fails.
+#[derive(Debug)] +pub struct ParseFiniteFloatError(ParseFiniteFloatErrorKind); + +impl fmt::Display for ParseFiniteFloatError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.0 { + ParseFiniteFloatErrorKind::Invalid(err) => err.fmt(f), + ParseFiniteFloatErrorKind::NonFinite(err) => err.fmt(f), + } + } +} + +impl Error for ParseFiniteFloatError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match &self.0 { + ParseFiniteFloatErrorKind::Invalid(err) => Some(err), + ParseFiniteFloatErrorKind::NonFinite(err) => Some(err), + } + } +} + +impl From<ParseFloatError> for ParseFiniteFloatError { + fn from(err: ParseFloatError) -> Self { + Self(ParseFiniteFloatErrorKind::Invalid(err)) + } +} + +impl From<TryFromFloatError> for ParseFiniteFloatError { + fn from(err: TryFromFloatError) -> Self { + Self(ParseFiniteFloatErrorKind::NonFinite(err)) + } +} + +impl FromStr for FiniteF64 { + type Err = ParseFiniteFloatError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + Ok(s.parse::<f64>()?.try_into()?) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_new() { + assert_eq!(FiniteF64::new(0.0), Some(FiniteF64(0.0))); + assert_eq!(FiniteF64::new(1.0), Some(FiniteF64(1.0))); + assert_eq!(FiniteF64::new(-1.0), Some(FiniteF64(-1.0))); + assert_eq!(FiniteF64::new(f64::MIN), Some(FiniteF64(f64::MIN))); + assert_eq!(FiniteF64::new(f64::MAX), Some(FiniteF64(f64::MAX))); + assert_eq!(FiniteF64::new(f64::NAN), None); + assert_eq!(FiniteF64::new(f64::INFINITY), None); + assert_eq!(FiniteF64::new(f64::NEG_INFINITY), None); + } + + #[test] + fn test_arithmetics() { + assert_eq!(FiniteF64(1.0) + FiniteF64(1.0), Some(FiniteF64(2.0))); + assert_eq!(FiniteF64(f64::MAX) + FiniteF64(f64::MAX), None); + assert_eq!(FiniteF64(f64::MIN) + FiniteF64(f64::MIN), None); + + assert_eq!(FiniteF64(1.0) - FiniteF64(1.0), Some(FiniteF64(0.0))); + assert_eq!( + FiniteF64(f64::MAX) - FiniteF64(f64::MAX), + Some(FiniteF64(0.0)) + ); + assert_eq!( + FiniteF64(f64::MIN) - FiniteF64(f64::MIN),
Some(FiniteF64(0.0)) + ); + + assert_eq!(FiniteF64(2.0) * FiniteF64(2.0), Some(FiniteF64(4.0))); + assert_eq!(FiniteF64(f64::MAX) * FiniteF64(f64::MAX), None); + assert_eq!(FiniteF64(f64::MIN) * FiniteF64(f64::MIN), None); + + assert_eq!(FiniteF64(2.0) / FiniteF64(2.0), Some(FiniteF64(1.0))); + assert_eq!(FiniteF64(2.0) / FiniteF64(0.0), None); // Infinity + assert_eq!(FiniteF64(-2.0) / FiniteF64(0.0), None); // -Infinity + assert_eq!(FiniteF64(0.0) / FiniteF64(0.0), None); // NaN + } + + #[test] + fn test_saturating_add() { + assert_eq!( + FiniteF64(1.0).saturating_add(FiniteF64(1.0)), + FiniteF64(2.0) + ); + assert_eq!( + FiniteF64(f64::MAX).saturating_add(FiniteF64(1.0)), + FiniteF64(f64::MAX) + ); + assert_eq!( + FiniteF64(f64::MIN).saturating_add(FiniteF64(-1.0)), + FiniteF64(f64::MIN) + ); + } + + #[test] + fn test_saturating_sub() { + assert_eq!( + FiniteF64(1.0).saturating_sub(FiniteF64(1.0)), + FiniteF64(0.0) + ); + assert_eq!( + FiniteF64(f64::MAX).saturating_sub(FiniteF64(-1.0)), + FiniteF64(f64::MAX) + ); + assert_eq!( + FiniteF64(f64::MIN).saturating_sub(FiniteF64(1.0)), + FiniteF64(f64::MIN) + ); + } + + #[test] + fn test_saturating_mul() { + assert_eq!( + FiniteF64::from(2).saturating_mul(FiniteF64::from(2)), + FiniteF64::from(4) + ); + assert_eq!( + FiniteF64(f64::MAX).saturating_mul(FiniteF64::from(2)), + FiniteF64(f64::MAX) + ); + assert_eq!( + FiniteF64(f64::MIN).saturating_mul(FiniteF64::from(2)), + FiniteF64(f64::MIN) + ); + } + + #[test] + fn test_parse() { + assert_eq!("0".parse::<FiniteF64>().unwrap(), FiniteF64(0.0)); + assert_eq!("0.0".parse::<FiniteF64>().unwrap(), FiniteF64(0.0)); + + assert!("bla".parse::<FiniteF64>().is_err()); + assert!("inf".parse::<FiniteF64>().is_err()); + assert!("-inf".parse::<FiniteF64>().is_err()); + assert!("NaN".parse::<FiniteF64>().is_err()); + } +} diff --git a/relay-metrics/src/lib.rs b/relay-metrics/src/lib.rs index 8b602c8a33..b3c8851c91 100644 --- a/relay-metrics/src/lib.rs +++ b/relay-metrics/src/lib.rs @@ -73,6 +73,7 @@ pub mod meta; mod aggregatorservice; mod 
bucket; +mod finite; mod protocol; mod router; mod statsd; @@ -80,6 +81,7 @@ mod view; pub use aggregatorservice::*; pub use bucket::*; +pub use finite::*; #[cfg(feature = "redis")] pub use meta::RedisMetricMetaStore; pub use meta::{MetaAggregator, MetricMeta}; diff --git a/relay-metrics/src/protocol.rs b/relay-metrics/src/protocol.rs index 42012a2930..433b6a514e 100644 --- a/relay-metrics/src/protocol.rs +++ b/relay-metrics/src/protocol.rs @@ -10,17 +10,19 @@ pub use relay_base_schema::metrics::{ #[doc(inline)] pub use relay_common::time::UnixTimestamp; +use crate::FiniteF64; + /// Type used for Counter metric -pub type CounterType = f64; +pub type CounterType = FiniteF64; /// Type of distribution entries -pub type DistributionType = f64; +pub type DistributionType = FiniteF64; /// Type used for set elements in Set metric pub type SetType = u32; /// Type used for Gauge entries -pub type GaugeType = f64; +pub type GaugeType = FiniteF64; /// Validates a tag key. /// diff --git a/relay-metrics/src/view.rs b/relay-metrics/src/view.rs index a0b88a7ea2..3072901eaf 100644 --- a/relay-metrics/src/view.rs +++ b/relay-metrics/src/view.rs @@ -729,16 +729,19 @@ mod tests { assert_eq!(view.len(), 3); assert_eq!( view.value(), - BucketViewValue::Distribution(&[1.0, 2.0, 3.0]) + BucketViewValue::Distribution(&[1.into(), 2.into(), 3.into()]) ); let view = BucketView::new(&bucket).select(1..3).unwrap(); assert_eq!(view.len(), 2); - assert_eq!(view.value(), BucketViewValue::Distribution(&[2.0, 3.0])); + assert_eq!( + view.value(), + BucketViewValue::Distribution(&[2.into(), 3.into()]) + ); let view = BucketView::new(&bucket).select(1..5).unwrap(); assert_eq!(view.len(), 4); assert_eq!( view.value(), - BucketViewValue::Distribution(&[2.0, 3.0, 5.0, 5.0]) + BucketViewValue::Distribution(&[2.into(), 3.into(), 5.into(), 5.into()]) ); } @@ -786,10 +789,10 @@ mod tests { assert_eq!( view.value(), BucketViewValue::Gauge(GaugeValue { - last: 25.0, - min: 17.0, - max: 42.0, - sum: 220.0, + 
last: 25.into(), + min: 17.into(), + max: 42.into(), + sum: 220.into(), count: 85 }) ); diff --git a/relay-server/src/actors/processor.rs b/relay-server/src/actors/processor.rs index 7421125dc2..4626ead5c4 100644 --- a/relay-server/src/actors/processor.rs +++ b/relay-server/src/actors/processor.rs @@ -2688,7 +2688,7 @@ mod tests { let derived_value = { let name = "foobar".to_string(); - let value = 5.0; // Arbitrary value. + let value = 5.into(); // Arbitrary value. let unit = MetricUnit::Duration(DurationUnit::default()); let tags = TransactionMeasurementTags { measurement_rating: None, diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs index e978475f26..5b29f955b2 100644 --- a/relay-server/src/actors/project.rs +++ b/relay-server/src/actors/project.rs @@ -1287,7 +1287,7 @@ mod tests { Bucket { name: "d:transactions/foo".to_string(), width: 0, - value: BucketValue::counter(1.0), + value: BucketValue::counter(1.into()), timestamp: UnixTimestamp::now(), tags: Default::default(), } @@ -1387,7 +1387,7 @@ mod tests { fn create_transaction_bucket() -> Bucket { Bucket { name: "d:transactions/foo".to_string(), - value: BucketValue::Counter(1.0), + value: BucketValue::Counter(1.into()), timestamp: UnixTimestamp::now(), tags: Default::default(), width: 10, diff --git a/relay-server/src/metrics_extraction/generic.rs b/relay-server/src/metrics_extraction/generic.rs index baa5350549..c9e71f3a51 100644 --- a/relay-server/src/metrics_extraction/generic.rs +++ b/relay-server/src/metrics_extraction/generic.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use relay_common::time::UnixTimestamp; use relay_dynamic_config::{MetricExtractionConfig, TagMapping, TagSource, TagSpec}; -use relay_metrics::{Bucket, BucketValue, MetricResourceIdentifier, MetricType}; +use relay_metrics::{Bucket, BucketValue, FiniteF64, MetricResourceIdentifier, MetricType}; use relay_protocol::{Getter, Val}; use relay_quotas::DataCategory; @@ -129,14 +129,16 @@ fn 
read_metric_value( ) -> Option { Some(match ty { MetricType::Counter => BucketValue::counter(match field { - Some(field) => instance.get_value(field)?.as_f64()?, - None => 1.0, + Some(field) => FiniteF64::new(instance.get_value(field)?.as_f64()?)?, + None => 1.into(), }), MetricType::Distribution => { - BucketValue::distribution(instance.get_value(field?)?.as_f64()?) + BucketValue::distribution(FiniteF64::new(instance.get_value(field?)?.as_f64()?)?) } MetricType::Set => BucketValue::set_from_str(instance.get_value(field?)?.as_str()?), - MetricType::Gauge => BucketValue::gauge(instance.get_value(field?)?.as_f64()?), + MetricType::Gauge => { + BucketValue::gauge(FiniteF64::new(instance.get_value(field?)?.as_f64()?)?) + } }) } @@ -425,4 +427,68 @@ mod tests { ] "###); } + + #[test] + fn skip_nonfinite_float() { + let event_json = json!({ + "type": "transaction", + "timestamp": 1597976302.0, + "measurements": { + "valid": {"value": 1.0}, + "invalid": {"value": 0.0}, + } + }); + let mut event = Event::from_value(event_json.into()); + + // Patch event.measurements.test.value to NAN + event + .value_mut() + .as_mut() + .unwrap() + .measurements + .value_mut() + .as_mut() + .unwrap() + .get_mut("invalid") + .unwrap() + .value_mut() + .as_mut() + .unwrap() + .value + .set_value(Some(f64::NAN)); + + let config_json = json!({ + "version": 1, + "metrics": [ + { + "category": "transaction", + "mri": "d:transactions/measurements.valid@none", + "field": "event.measurements.valid.value", + }, + { + "category": "transaction", + "mri": "d:transactions/measurements.invalid@none", + "field": "event.measurements.invalid.value", + } + ] + }); + let config = serde_json::from_value(config_json).unwrap(); + + let metrics = extract_metrics(event.value().unwrap(), &config); + insta::assert_debug_snapshot!(metrics, @r###" + [ + Bucket { + timestamp: UnixTimestamp(1597976302), + width: 0, + name: "d:transactions/measurements.valid@none", + value: Distribution( + [ + 1.0, + ], + ), + tags: {}, 
+ }, + ] + "###); + } } diff --git a/relay-server/src/metrics_extraction/sessions/mod.rs b/relay-server/src/metrics_extraction/sessions/mod.rs index a666c04b53..60333abbc6 100644 --- a/relay-server/src/metrics_extraction/sessions/mod.rs +++ b/relay-server/src/metrics_extraction/sessions/mod.rs @@ -53,7 +53,7 @@ pub fn extract_session_metrics( if session.total_count() > 0 { target.push( SessionMetric::Session { - counter: session.total_count() as f64, + counter: session.total_count().into(), tags: SessionSessionTags { status: "init".to_string(), common_tags: common_tags.clone(), @@ -73,7 +73,7 @@ pub fn extract_session_metrics( .into_metric(timestamp), SessionErrored::Aggregated(count) => SessionMetric::Session { - counter: count as f64, + counter: count.into(), tags: SessionSessionTags { status: "errored_preaggr".to_string(), common_tags: common_tags.clone(), @@ -117,7 +117,7 @@ pub fn extract_session_metrics( if session.abnormal_count() > 0 { target.push( SessionMetric::Session { - counter: session.abnormal_count() as f64, + counter: session.abnormal_count().into(), tags: SessionSessionTags { status: "abnormal".to_string(), common_tags: common_tags.clone(), @@ -151,7 +151,7 @@ pub fn extract_session_metrics( if session.crashed_count() > 0 { target.push( SessionMetric::Session { - counter: session.crashed_count() as f64, + counter: session.crashed_count().into(), tags: SessionSessionTags { status: "crashed".to_string(), common_tags: common_tags.clone(), diff --git a/relay-server/src/metrics_extraction/transactions/mod.rs b/relay-server/src/metrics_extraction/transactions/mod.rs index 0162051e1b..3e1ca320ad 100644 --- a/relay-server/src/metrics_extraction/transactions/mod.rs +++ b/relay-server/src/metrics_extraction/transactions/mod.rs @@ -7,7 +7,7 @@ use relay_event_normalization::utils as normalize_utils; use relay_event_schema::protocol::{ AsPair, BrowserContext, Event, OsContext, TraceContext, TransactionSource, }; -use relay_metrics::{Bucket, DurationUnit}; 
+use relay_metrics::{Bucket, DurationUnit, FiniteF64}; use crate::metrics_extraction::generic; use crate::metrics_extraction::transactions::types::{ @@ -285,14 +285,12 @@ impl TransactionExtractor<'_> { .collect(); if let Some(measurements) = event.measurements.value() { for (name, annotated) in measurements.iter() { - let measurement = match annotated.value() { - Some(m) => m, - None => continue, + let Some(measurement) = annotated.value() else { + continue; }; - let value = match measurement.value.value() { - Some(value) => *value, - None => continue, + let Some(value) = measurement.value.value().and_then(|v| FiniteF64::new(*v)) else { + continue; }; // We treat a measurement as "performance score" if its name is the name of another @@ -304,7 +302,7 @@ impl TransactionExtractor<'_> { .map_or(false, |suffix| measurement_names.contains(suffix)); let measurement_tags = TransactionMeasurementTags { - measurement_rating: get_measurement_rating(name, value), + measurement_rating: get_measurement_rating(name, value.to_f64()), universal_tags: if is_performance_score { CommonTags( tags.0 @@ -341,15 +339,14 @@ impl TransactionExtractor<'_> { continue; } - let measurement = match annotated.value() { - Some(m) => m, - None => continue, + let Some(m) = annotated.value() else { + continue; }; - let value = match measurement.value.value() { - Some(value) => *value, - None => continue, + let Some(value) = m.value.value().and_then(|v| FiniteF64::new(*v)) else { + continue; }; + metrics.project_metrics.push( TransactionMetric::Breakdown { name: format!("{breakdown}.{measurement_name}"), @@ -375,33 +372,35 @@ impl TransactionExtractor<'_> { // Duration let duration = relay_common::time::chrono_to_positive_millis(end - start); - let has_profile = if self.config.version >= 3 { - false - } else { - self.has_profile - }; - - metrics.project_metrics.push( - TransactionMetric::Duration { - unit: DurationUnit::MilliSecond, - value: duration, - tags: TransactionDurationTags { - 
has_profile, - universal_tags: tags.clone(), - }, - } - .into_metric(timestamp), - ); - - // Lower cardinality duration - metrics.project_metrics.push( - TransactionMetric::DurationLight { - unit: DurationUnit::MilliSecond, - value: duration, - tags: light_tags, - } - .into_metric(timestamp), - ); + if let Some(duration) = FiniteF64::new(duration) { + let has_profile = if self.config.version >= 3 { + false + } else { + self.has_profile + }; + + metrics.project_metrics.push( + TransactionMetric::Duration { + unit: DurationUnit::MilliSecond, + value: duration, + tags: TransactionDurationTags { + has_profile, + universal_tags: tags.clone(), + }, + } + .into_metric(timestamp), + ); + + // Lower cardinality duration + metrics.project_metrics.push( + TransactionMetric::DurationLight { + unit: DurationUnit::MilliSecond, + value: duration, + tags: light_tags, + } + .into_metric(timestamp), + ); + } let root_counter_tags = { let mut universal_tags = CommonTags(BTreeMap::default()); @@ -410,12 +409,14 @@ impl TransactionExtractor<'_> { .0 .insert(CommonTag::Transaction, transaction_from_dsc.to_string()); } + let decision = if self.sampling_result.should_keep() { + "keep" + } else { + "drop" + }; + TransactionCPRTags { - decision: if self.sampling_result.should_keep() { - "keep".to_owned() - } else { - "drop".to_owned() - }, + decision: decision.to_owned(), universal_tags, } }; @@ -1104,7 +1105,10 @@ mod tests { .unwrap(); assert_eq!(duration_metric.name, "d:transactions/duration@millisecond"); - assert_eq!(duration_metric.value, BucketValue::distribution(59000.0)); + assert_eq!( + duration_metric.value, + BucketValue::distribution(59000.into()) + ); assert_eq!(duration_metric.tags.len(), 4); assert_eq!(duration_metric.tags["release"], "1.2.3"); diff --git a/relay-server/src/metrics_extraction/transactions/types.rs b/relay-server/src/metrics_extraction/transactions/types.rs index 0639758fe9..bba0b1e5b1 100644 --- a/relay-server/src/metrics_extraction/transactions/types.rs +++ 
b/relay-server/src/metrics_extraction/transactions/types.rs @@ -78,13 +78,13 @@ impl IntoMetric for TransactionMetric { ), Self::Usage { tags } => ( Cow::Borrowed("usage"), - BucketValue::counter(1.0), + BucketValue::counter(1.into()), MetricUnit::None, tags.into(), ), Self::CountPerRootProject { tags } => ( Cow::Borrowed("count_per_root_project"), - BucketValue::counter(1.0), + BucketValue::counter(1.into()), MetricUnit::None, tags.into(), ), diff --git a/relay-server/src/utils/metrics_rate_limits.rs b/relay-server/src/utils/metrics_rate_limits.rs index 0d8bb90718..dcacccad11 100644 --- a/relay-server/src/utils/metrics_rate_limits.rs +++ b/relay-server/src/utils/metrics_rate_limits.rs @@ -60,7 +60,7 @@ fn count_metric_bucket(metric: BucketView<'_>, mode: ExtractionMode) -> Option c as usize, + BucketViewValue::Counter(c) if usage && mri.name == "usage" => c.to_f64() as usize, BucketViewValue::Distribution(d) if !usage && mri.name == "duration" => d.len(), _ => 0, }; @@ -344,7 +344,7 @@ mod tests { width: 0, name: "d:transactions/duration@millisecond".to_string(), tags: Default::default(), - value: BucketValue::distribution(123.0), + value: BucketValue::distribution(123.into()), }, Bucket { // transaction with profile @@ -352,7 +352,7 @@ mod tests { width: 0, name: "d:transactions/duration@millisecond".to_string(), tags: [("has_profile".to_string(), "true".to_string())].into(), - value: BucketValue::distribution(456.0), + value: BucketValue::distribution(456.into()), }, Bucket { // transaction without profile @@ -360,7 +360,7 @@ mod tests { width: 0, name: "c:transactions/usage@none".to_string(), tags: Default::default(), - value: BucketValue::counter(1.0), + value: BucketValue::counter(1.into()), }, Bucket { // transaction with profile @@ -368,7 +368,7 @@ mod tests { width: 0, name: "c:transactions/usage@none".to_string(), tags: [("has_profile".to_string(), "true".to_string())].into(), - value: BucketValue::counter(1.0), + value: BucketValue::counter(1.into()), }, 
Bucket { // unrelated metric @@ -376,7 +376,7 @@ mod tests { width: 0, name: "something_else".to_string(), tags: [("has_profile".to_string(), "true".to_string())].into(), - value: BucketValue::distribution(123.0), + value: BucketValue::distribution(123.into()), }, ]; let quotas = vec![Quota { @@ -432,7 +432,7 @@ mod tests { width: 0, name: "d:transactions/duration@millisecond".to_string(), tags: Default::default(), - value: BucketValue::distribution(123.0), + value: BucketValue::distribution(123.into()), }, Bucket { // transaction with profile @@ -440,7 +440,7 @@ mod tests { width: 0, name: "d:transactions/duration@millisecond".to_string(), tags: [("has_profile".to_string(), "true".to_string())].into(), - value: BucketValue::distribution(456.0), + value: BucketValue::distribution(456.into()), }, Bucket { // transaction without profile @@ -448,7 +448,7 @@ mod tests { width: 0, name: "c:transactions/usage@none".to_string(), tags: Default::default(), - value: BucketValue::counter(1.0), + value: BucketValue::counter(1.into()), }, Bucket { // transaction with profile @@ -456,7 +456,7 @@ mod tests { width: 0, name: "c:transactions/usage@none".to_string(), tags: [("has_profile".to_string(), "true".to_string())].into(), - value: BucketValue::counter(1.0), + value: BucketValue::counter(1.into()), }, Bucket { // unrelated metric @@ -464,7 +464,7 @@ mod tests { width: 0, name: "something_else".to_string(), tags: [("has_profile".to_string(), "true".to_string())].into(), - value: BucketValue::distribution(123.0), + value: BucketValue::distribution(123.into()), }, ]; let quotas = vec![Quota {