From 4dd09d9d9c049f5a981466d345b4e416481506cb Mon Sep 17 00:00:00 2001 From: Mindaugas Vinkelis Date: Sat, 9 Nov 2024 22:57:21 +0200 Subject: [PATCH 1/2] Hashed utility for ValueMap to improve hashing performance --- opentelemetry-sdk/Cargo.toml | 1 + .../src/metrics/internal/hashed.rs | 146 ++++++++++++++++++ opentelemetry-sdk/src/metrics/internal/mod.rs | 90 +++++++++-- opentelemetry-sdk/src/metrics/mod.rs | 5 - 4 files changed, 222 insertions(+), 20 deletions(-) create mode 100644 opentelemetry-sdk/src/metrics/internal/hashed.rs diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index 39928fecb3..8a7d648afd 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -29,6 +29,7 @@ tokio = { workspace = true, features = ["rt", "time"], optional = true } tokio-stream = { workspace = true, optional = true } http = { workspace = true, optional = true } tracing = {workspace = true, optional = true} +rustc-hash = "2.0.0" [package.metadata.docs.rs] all-features = true diff --git a/opentelemetry-sdk/src/metrics/internal/hashed.rs b/opentelemetry-sdk/src/metrics/internal/hashed.rs new file mode 100644 index 0000000000..387cc7444a --- /dev/null +++ b/opentelemetry-sdk/src/metrics/internal/hashed.rs @@ -0,0 +1,146 @@ +use std::{ + borrow::{Borrow, Cow}, + hash::{BuildHasher, Hash, Hasher}, + ops::Deref, +}; + +use rustc_hash::*; + +/// Hash value only once, works with references and owned types. +pub(crate) struct Hashed<'a, T> +where + T: ToOwned + ?Sized, +{ + value: Cow<'a, T>, + hash: u64, +} + +impl<'a, T> Hashed<'a, T> +where + T: ToOwned + Hash + ?Sized, +{ + pub(crate) fn from_borrowed(value: &'a T) -> Self { + let mut hasher = FxHasher::default(); + value.hash(&mut hasher); + Self { + value: Cow::Borrowed(value), + hash: hasher.finish(), + } + } + + pub(crate) fn from_owned(value: ::Owned) -> Self { + let hash = calc_hash(value.borrow()); + Self { + value: Cow::Owned(value), + hash, + } + } + + pub(crate) fn mutate(self, f: impl FnOnce(&mut ::Owned)) -> Hashed<'static, T> { + let mut value = self.value.into_owned(); + f(&mut value); + let hash = calc_hash(value.borrow()); + Hashed { + value: Cow::Owned(value), + hash, + } + } + + pub(crate) fn into_owned(self) -> Hashed<'static, T> { + let value = self.value.into_owned(); + Hashed { + value: Cow::Owned(value), + hash: self.hash, + } + } + + pub(crate) fn into_inner_owned(self) -> T::Owned { + self.value.into_owned() + } +} + +fn calc_hash(value: T) -> u64 +where + T: Hash, +{ + let mut hasher = FxHasher::default(); + value.hash(&mut hasher); + hasher.finish() +} + +impl Clone for Hashed<'_, T> +where + T: ToOwned + ?Sized, +{ + fn clone(&self) -> Self { + Self { + value: self.value.clone(), + hash: self.hash, + } + } + + fn clone_from(&mut self, source: &Self) { + self.value.clone_from(&source.value); + self.hash = source.hash; + } +} + +impl Hash for Hashed<'_, T> +where + T: ToOwned + Hash + ?Sized, +{ + fn hash(&self, state: &mut H) { + state.write_u64(self.hash); + } +} + +impl PartialEq for Hashed<'_, T> +where + T: ToOwned + PartialEq + ?Sized, +{ + fn eq(&self, other: &Self) -> bool { + self.value.as_ref() == other.value.as_ref() + } +} + +impl Eq for Hashed<'_, T> where T: ToOwned + Eq + ?Sized {} + +impl Deref for Hashed<'_, T> +where + T: ToOwned + ?Sized, +{ + type Target = T; + + fn deref(&self) -> &Self::Target { + self.value.deref() + } +} + +/// Used to make [`Hashed`] values no-op in [`HashMap`](std::collections::HashMap) or [`HashSet`](std::collections::HashSet). +/// For all other keys types (except for [`u64`]) it will panic. +#[derive(Default, Clone)] +pub(crate) struct HashedNoOpBuilder { + hashed: u64, +} + +impl Hasher for HashedNoOpBuilder { + fn finish(&self) -> u64 { + self.hashed + } + + fn write(&mut self, _bytes: &[u8]) { + panic!("Only works with `Hashed` value") + } + + fn write_u64(&mut self, i: u64) { + self.hashed = i; + } +} + +impl BuildHasher for HashedNoOpBuilder { + type Hasher = HashedNoOpBuilder; + + fn build_hasher(&self) -> Self::Hasher { + HashedNoOpBuilder::default() + } +} diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 8b6136d7ce..e20c615fb6 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -1,5 +1,6 @@ mod aggregate; mod exponential_histogram; +mod hashed; mod histogram; mod last_value; mod precomputed_sum; @@ -15,13 +16,12 @@ use std::sync::{Arc, RwLock}; use aggregate::is_under_cardinality_limit; pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure}; pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE}; +use hashed::{Hashed, HashedNoOpBuilder}; use once_cell::sync::Lazy; use opentelemetry::{otel_warn, KeyValue}; -use crate::metrics::AttributeSet; - -pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = - Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]); +pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = + Lazy::new(|| Hashed::from_owned(vec![KeyValue::new("otel.metric.overflow", "true")])); pub(crate) trait Aggregator { /// A static configuration that is needed in order to initialize aggregator. @@ -52,7 +52,7 @@ where A: Aggregator, { /// Trackers store the values associated with different attribute sets. - trackers: RwLock, Arc>>, + trackers: RwLock, Arc, HashedNoOpBuilder>>, /// Number of different attribute set stored in the `trackers` map. count: AtomicUsize, /// Indicates whether a value with no attributes has been stored. @@ -69,7 +69,7 @@ where { fn new(config: A::InitConfig) -> Self { ValueMap { - trackers: RwLock::new(HashMap::new()), + trackers: RwLock::new(HashMap::default()), has_no_attribute_value: AtomicBool::new(false), no_attribute_tracker: A::create(&config), count: AtomicUsize::new(0), @@ -84,19 +84,25 @@ where return; } + let attributes = Hashed::from_borrowed(attributes); + let Ok(trackers) = self.trackers.read() else { return; }; // Try to retrieve and update the tracker with the attributes in the provided order first - if let Some(tracker) = trackers.get(attributes) { + if let Some(tracker) = trackers.get(&attributes) { tracker.update(value); return; } // Try to retrieve and update the tracker with the attributes sorted. - let sorted_attrs = AttributeSet::from(attributes).into_vec(); - if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { + let sorted_attrs = attributes.clone().mutate(|list| { + // use stable sort + list.sort_by(|a, b| a.key.cmp(&b.key)); + dedup_remove_first(list, |a, b| a.key == b.key); + }); + if let Some(tracker) = trackers.get(&sorted_attrs) { tracker.update(value); return; } @@ -110,20 +116,20 @@ where // Recheck both the provided and sorted orders after acquiring the write lock // in case another thread has pushed an update in the meantime. - if let Some(tracker) = trackers.get(attributes) { + if let Some(tracker) = trackers.get(&attributes) { tracker.update(value); - } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { + } else if let Some(tracker) = trackers.get(&sorted_attrs) { tracker.update(value); } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { let new_tracker = Arc::new(A::create(&self.config)); new_tracker.update(value); // Insert tracker with the attributes in the provided and sorted orders - trackers.insert(attributes.to_vec(), new_tracker.clone()); + trackers.insert(attributes.into_owned(), new_tracker.clone()); trackers.insert(sorted_attrs, new_tracker); self.count.fetch_add(1, Ordering::SeqCst); - } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { + } else if let Some(overflow_value) = trackers.get(&STREAM_OVERFLOW_ATTRIBUTES) { overflow_value.update(value); } else { let new_tracker = A::create(&self.config); @@ -153,7 +159,7 @@ where let mut seen = HashSet::new(); for (attrs, tracker) in trackers.iter() { if seen.insert(Arc::as_ptr(tracker)) { - dest.push(map_fn(attrs.clone(), tracker)); + dest.push(map_fn(attrs.clone().into_inner_owned(), tracker)); } } } @@ -183,8 +189,25 @@ where let mut seen = HashSet::new(); for (attrs, tracker) in trackers.into_iter() { if seen.insert(Arc::as_ptr(&tracker)) { - dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config))); + dest.push(map_fn( + attrs.into_inner_owned(), + tracker.clone_and_reset(&self.config), + )); + } + } + } +} + +fn dedup_remove_first(values: &mut Vec, is_eq: impl Fn(&T, &T) -> bool) { + // we cannot use vec.dedup_by because it will remove last duplicate not first + if values.len() > 1 { + let mut i = values.len() - 1; + while i != 0 { + let is_same = unsafe { is_eq(values.get_unchecked(i - 1), values.get_unchecked(i)) }; + if is_same { + values.remove(i - 1); } + i -= 1; } } } @@ -392,8 +415,45 @@ impl AtomicallyUpdate for f64 { #[cfg(test)] mod tests { + use std::usize; + use super::*; + fn assert_deduped( + input: [(i32, bool); N], + expect: [(i32, bool); M], + ) { + let mut list: Vec<(i32, bool)> = Vec::from(input); + dedup_remove_first(&mut list, |a, b| a.0 == b.0); + assert_eq!(list, expect); + } + + #[test] + fn deduplicate_by_removing_first_element_from_sorted_array() { + assert_deduped([], []); + assert_deduped([(1, true)], [(1, true)]); + assert_deduped([(1, false), (1, false), (1, true)], [(1, true)]); + assert_deduped( + [(1, true), (2, false), (2, false), (2, true)], + [(1, true), (2, true)], + ); + assert_deduped( + [(1, true), (1, false), (1, true), (2, true)], + [(1, true), (2, true)], + ); + assert_deduped( + [ + (1, false), + (1, true), + (2, false), + (2, true), + (3, false), + (3, true), + ], + [(1, true), (2, true), (3, true)], + ); + } + #[test] fn can_store_u64_atomic_value() { let atomic = u64::new_atomic_tracker(0); diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 7db8a63ec2..ca70ccfe24 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -152,11 +152,6 @@ impl AttributeSet { pub(crate) fn iter(&self) -> impl Iterator { self.0.iter().map(|kv| (&kv.key, &kv.value)) } - - /// Returns the underlying Vec of KeyValue pairs - pub(crate) fn into_vec(self) -> Vec { - self.0 - } } impl Hash for AttributeSet { From 8e9d21694c8feb20f2af0bf070e9c94e77ecd455 Mon Sep 17 00:00:00 2001 From: Mindaugas Vinkelis Date: Wed, 13 Nov 2024 07:23:15 +0200 Subject: [PATCH 2/2] Use DefaultHasher --- opentelemetry-sdk/Cargo.toml | 1 - .../src/metrics/internal/hashed.rs | 21 ++----- opentelemetry-sdk/src/metrics/internal/mod.rs | 58 +------------------ opentelemetry-sdk/src/metrics/mod.rs | 35 ++++++----- 4 files changed, 26 insertions(+), 89 deletions(-) diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index 8a7d648afd..39928fecb3 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -29,7 +29,6 @@ tokio = { workspace = true, features = ["rt", "time"], optional = true } tokio-stream = { workspace = true, optional = true } http = { workspace = true, optional = true } tracing = {workspace = true, optional = true} -rustc-hash = "2.0.0" [package.metadata.docs.rs] all-features = true diff --git a/opentelemetry-sdk/src/metrics/internal/hashed.rs b/opentelemetry-sdk/src/metrics/internal/hashed.rs index 387cc7444a..d66d8f898e 100644 --- a/opentelemetry-sdk/src/metrics/internal/hashed.rs +++ b/opentelemetry-sdk/src/metrics/internal/hashed.rs @@ -1,11 +1,9 @@ use std::{ borrow::{Borrow, Cow}, - hash::{BuildHasher, Hash, Hasher}, + hash::{BuildHasher, DefaultHasher, Hash, Hasher}, ops::Deref, }; -use rustc_hash::*; - /// Hash value only once, works with references and owned types. pub(crate) struct Hashed<'a, T> where @@ -20,11 +18,10 @@ where T: ToOwned + Hash + ?Sized, { pub(crate) fn from_borrowed(value: &'a T) -> Self { - let mut hasher = FxHasher::default(); - value.hash(&mut hasher); + let hash = calc_hash(&value); Self { value: Cow::Borrowed(value), - hash: hasher.finish(), + hash, } } @@ -36,16 +33,6 @@ where } } - pub(crate) fn mutate(self, f: impl FnOnce(&mut ::Owned)) -> Hashed<'static, T> { - let mut value = self.value.into_owned(); - f(&mut value); - let hash = calc_hash(value.borrow()); - Hashed { - value: Cow::Owned(value), - hash, - } - } - pub(crate) fn into_owned(self) -> Hashed<'static, T> { let value = self.value.into_owned(); Hashed { @@ -63,7 +50,7 @@ fn calc_hash(value: T) -> u64 where T: Hash, { - let mut hasher = FxHasher::default(); + let mut hasher = DefaultHasher::default(); value.hash(&mut hasher); hasher.finish() } diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index e20c615fb6..6f5ac6acae 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -20,6 +20,8 @@ use hashed::{Hashed, HashedNoOpBuilder}; use once_cell::sync::Lazy; use opentelemetry::{otel_warn, KeyValue}; +use super::sort_and_dedup; + pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = Lazy::new(|| Hashed::from_owned(vec![KeyValue::new("otel.metric.overflow", "true")])); @@ -97,11 +99,7 @@ where } // Try to retrieve and update the tracker with the attributes sorted. - let sorted_attrs = attributes.clone().mutate(|list| { - // use stable sort - list.sort_by(|a, b| a.key.cmp(&b.key)); - dedup_remove_first(list, |a, b| a.key == b.key); - }); + let sorted_attrs = Hashed::from_owned(sort_and_dedup(&attributes)); if let Some(tracker) = trackers.get(&sorted_attrs) { tracker.update(value); return; @@ -198,20 +196,6 @@ where } } -fn dedup_remove_first(values: &mut Vec, is_eq: impl Fn(&T, &T) -> bool) { - // we cannot use vec.dedup_by because it will remove last duplicate not first - if values.len() > 1 { - let mut i = values.len() - 1; - while i != 0 { - let is_same = unsafe { is_eq(values.get_unchecked(i - 1), values.get_unchecked(i)) }; - if is_same { - values.remove(i - 1); - } - i -= 1; - } - } -} - /// Clear and allocate exactly required amount of space for all attribute-sets fn prepare_data(data: &mut Vec, list_len: usize) { data.clear(); @@ -415,45 +399,9 @@ impl AtomicallyUpdate for f64 { #[cfg(test)] mod tests { - use std::usize; use super::*; - fn assert_deduped( - input: [(i32, bool); N], - expect: [(i32, bool); M], - ) { - let mut list: Vec<(i32, bool)> = Vec::from(input); - dedup_remove_first(&mut list, |a, b| a.0 == b.0); - assert_eq!(list, expect); - } - - #[test] - fn deduplicate_by_removing_first_element_from_sorted_array() { - assert_deduped([], []); - assert_deduped([(1, true)], [(1, true)]); - assert_deduped([(1, false), (1, false), (1, true)], [(1, true)]); - assert_deduped( - [(1, true), (2, false), (2, false), (2, true)], - [(1, true), (2, true)], - ); - assert_deduped( - [(1, true), (1, false), (1, true), (2, true)], - [(1, true), (2, true)], - ); - assert_deduped( - [ - (1, false), - (1, true), - (2, false), - (2, true), - (3, false), - (3, true), - ], - [(1, true), (2, true), (3, true)], - ); - } - #[test] fn can_store_u64_atomic_value() { let atomic = u64::new_atomic_tracker(0); diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index ca70ccfe24..54a606243b 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -115,23 +115,27 @@ pub(crate) struct AttributeSet(Vec, u64); impl From<&[KeyValue]> for AttributeSet { fn from(values: &[KeyValue]) -> Self { - let mut seen_keys = HashSet::with_capacity(values.len()); - let vec = values - .iter() - .rev() - .filter_map(|kv| { - if seen_keys.insert(kv.key.clone()) { - Some(kv.clone()) - } else { - None - } - }) - .collect::>(); - - AttributeSet::new(vec) + AttributeSet::new(sort_and_dedup(values)) } } +pub(crate) fn sort_and_dedup(values: &[KeyValue]) -> Vec { + let mut seen_keys = HashSet::with_capacity(values.len()); + let mut vec = values + .iter() + .rev() + .filter_map(|kv| { + if seen_keys.insert(kv.key.clone()) { + Some(kv.clone()) + } else { + None + } + }) + .collect::>(); + vec.sort_unstable_by(|a, b| a.key.cmp(&b.key)); + vec +} + fn calculate_hash(values: &[KeyValue]) -> u64 { let mut hasher = DefaultHasher::new(); values.iter().fold(&mut hasher, |mut hasher, item| { @@ -142,8 +146,7 @@ fn calculate_hash(values: &[KeyValue]) -> u64 { } impl AttributeSet { - fn new(mut values: Vec) -> Self { - values.sort_unstable_by(|a, b| a.key.cmp(&b.key)); + fn new(values: Vec) -> Self { let hash = calculate_hash(&values); AttributeSet(values, hash) }