Skip to content

Commit

Permalink
Use DefaultHasher
Browse files Browse the repository at this point in the history
  • Loading branch information
fraillt committed Nov 13, 2024
1 parent 4dd09d9 commit 8e9d216
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 89 deletions.
1 change: 0 additions & 1 deletion opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ tokio = { workspace = true, features = ["rt", "time"], optional = true }
tokio-stream = { workspace = true, optional = true }
http = { workspace = true, optional = true }
tracing = {workspace = true, optional = true}
rustc-hash = "2.0.0"

[package.metadata.docs.rs]
all-features = true
Expand Down
21 changes: 4 additions & 17 deletions opentelemetry-sdk/src/metrics/internal/hashed.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use std::{
borrow::{Borrow, Cow},
hash::{BuildHasher, Hash, Hasher},
hash::{BuildHasher, DefaultHasher, Hash, Hasher},
ops::Deref,
};

use rustc_hash::*;

/// Hash value only once, works with references and owned types.
pub(crate) struct Hashed<'a, T>
where
Expand All @@ -20,11 +18,10 @@ where
T: ToOwned + Hash + ?Sized,
{
pub(crate) fn from_borrowed(value: &'a T) -> Self {
let mut hasher = FxHasher::default();
value.hash(&mut hasher);
let hash = calc_hash(&value);
Self {
value: Cow::Borrowed(value),
hash: hasher.finish(),
hash,
}
}

Expand All @@ -36,16 +33,6 @@ where
}
}

pub(crate) fn mutate(self, f: impl FnOnce(&mut <T as ToOwned>::Owned)) -> Hashed<'static, T> {
let mut value = self.value.into_owned();
f(&mut value);
let hash = calc_hash(value.borrow());
Hashed {
value: Cow::Owned(value),
hash,
}
}

pub(crate) fn into_owned(self) -> Hashed<'static, T> {
let value = self.value.into_owned();
Hashed {
Expand All @@ -63,7 +50,7 @@ fn calc_hash<T>(value: T) -> u64
where
T: Hash,
{
let mut hasher = FxHasher::default();
let mut hasher = DefaultHasher::default();
value.hash(&mut hasher);
hasher.finish()
}
Expand Down
58 changes: 3 additions & 55 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use hashed::{Hashed, HashedNoOpBuilder};
use once_cell::sync::Lazy;
use opentelemetry::{otel_warn, KeyValue};

use super::sort_and_dedup;

pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Hashed<'static, [KeyValue]>> =
Lazy::new(|| Hashed::from_owned(vec![KeyValue::new("otel.metric.overflow", "true")]));

Expand Down Expand Up @@ -97,11 +99,7 @@ where
}

// Try to retrieve and update the tracker with the attributes sorted.
let sorted_attrs = attributes.clone().mutate(|list| {
// use stable sort
list.sort_by(|a, b| a.key.cmp(&b.key));
dedup_remove_first(list, |a, b| a.key == b.key);
});
let sorted_attrs = Hashed::from_owned(sort_and_dedup(&attributes));
if let Some(tracker) = trackers.get(&sorted_attrs) {
tracker.update(value);
return;
Expand Down Expand Up @@ -198,20 +196,6 @@ where
}
}

fn dedup_remove_first<T>(values: &mut Vec<T>, is_eq: impl Fn(&T, &T) -> bool) {
// we cannot use vec.dedup_by because it will remove last duplicate not first
if values.len() > 1 {
let mut i = values.len() - 1;
while i != 0 {
let is_same = unsafe { is_eq(values.get_unchecked(i - 1), values.get_unchecked(i)) };
if is_same {
values.remove(i - 1);
}
i -= 1;
}
}
}

/// Clear and allocate exactly required amount of space for all attribute-sets
fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
data.clear();
Expand Down Expand Up @@ -415,45 +399,9 @@ impl AtomicallyUpdate<f64> for f64 {

#[cfg(test)]
mod tests {
use std::usize;

use super::*;

fn assert_deduped<const N: usize, const M: usize>(
input: [(i32, bool); N],
expect: [(i32, bool); M],
) {
let mut list: Vec<(i32, bool)> = Vec::from(input);
dedup_remove_first(&mut list, |a, b| a.0 == b.0);
assert_eq!(list, expect);
}

#[test]
fn deduplicate_by_removing_first_element_from_sorted_array() {
assert_deduped([], []);
assert_deduped([(1, true)], [(1, true)]);
assert_deduped([(1, false), (1, false), (1, true)], [(1, true)]);
assert_deduped(
[(1, true), (2, false), (2, false), (2, true)],
[(1, true), (2, true)],
);
assert_deduped(
[(1, true), (1, false), (1, true), (2, true)],
[(1, true), (2, true)],
);
assert_deduped(
[
(1, false),
(1, true),
(2, false),
(2, true),
(3, false),
(3, true),
],
[(1, true), (2, true), (3, true)],
);
}

#[test]
fn can_store_u64_atomic_value() {
let atomic = u64::new_atomic_tracker(0);
Expand Down
35 changes: 19 additions & 16 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,23 +115,27 @@ pub(crate) struct AttributeSet(Vec<KeyValue>, u64);

impl From<&[KeyValue]> for AttributeSet {
fn from(values: &[KeyValue]) -> Self {
let mut seen_keys = HashSet::with_capacity(values.len());
let vec = values
.iter()
.rev()
.filter_map(|kv| {
if seen_keys.insert(kv.key.clone()) {
Some(kv.clone())
} else {
None
}
})
.collect::<Vec<_>>();

AttributeSet::new(vec)
AttributeSet::new(sort_and_dedup(values))
}
}

pub(crate) fn sort_and_dedup(values: &[KeyValue]) -> Vec<KeyValue> {
let mut seen_keys = HashSet::with_capacity(values.len());
let mut vec = values
.iter()
.rev()
.filter_map(|kv| {
if seen_keys.insert(kv.key.clone()) {
Some(kv.clone())
} else {
None
}
})
.collect::<Vec<_>>();
vec.sort_unstable_by(|a, b| a.key.cmp(&b.key));
vec
}

fn calculate_hash(values: &[KeyValue]) -> u64 {
let mut hasher = DefaultHasher::new();
values.iter().fold(&mut hasher, |mut hasher, item| {
Expand All @@ -142,8 +146,7 @@ fn calculate_hash(values: &[KeyValue]) -> u64 {
}

impl AttributeSet {
fn new(mut values: Vec<KeyValue>) -> Self {
values.sort_unstable_by(|a, b| a.key.cmp(&b.key));
fn new(values: Vec<KeyValue>) -> Self {
let hash = calculate_hash(&values);
AttributeSet(values, hash)
}
Expand Down

0 comments on commit 8e9d216

Please sign in to comment.