Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hashed utility for ValueMap to improve hashing performance #2296

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions opentelemetry-sdk/src/metrics/internal/hashed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use std::{
borrow::{Borrow, Cow},
hash::{BuildHasher, DefaultHasher, Hash, Hasher},
ops::Deref,
};

/// Hash value only once, works with references and owned types.
pub(crate) struct Hashed<'a, T>
where
T: ToOwned + ?Sized,
{
value: Cow<'a, T>,
hash: u64,
}

impl<'a, T> Hashed<'a, T>
where
T: ToOwned + Hash + ?Sized,
{
pub(crate) fn from_borrowed(value: &'a T) -> Self {
let hash = calc_hash(&value);
Self {
value: Cow::Borrowed(value),
hash,
}
}

pub(crate) fn from_owned(value: <T as ToOwned>::Owned) -> Self {
let hash = calc_hash(value.borrow());
Self {
value: Cow::Owned(value),
hash,
}
}

pub(crate) fn into_owned(self) -> Hashed<'static, T> {
let value = self.value.into_owned();
Hashed {
value: Cow::Owned(value),
hash: self.hash,
}
}

pub(crate) fn into_inner_owned(self) -> T::Owned {
self.value.into_owned()
}
}

fn calc_hash<T>(value: T) -> u64
where
T: Hash,
{
let mut hasher = DefaultHasher::default();
value.hash(&mut hasher);
hasher.finish()
}

impl<T> Clone for Hashed<'_, T>
where
T: ToOwned + ?Sized,
{
fn clone(&self) -> Self {
Self {
value: self.value.clone(),
hash: self.hash,
}
}

fn clone_from(&mut self, source: &Self) {
self.value.clone_from(&source.value);
self.hash = source.hash;
}
}

impl<T> Hash for Hashed<'_, T>
where
T: ToOwned + Hash + ?Sized,
{
fn hash<H: Hasher>(&self, state: &mut H) {
state.write_u64(self.hash);
}
}

impl<T> PartialEq for Hashed<'_, T>
where
T: ToOwned + PartialEq + ?Sized,
{
fn eq(&self, other: &Self) -> bool {
self.value.as_ref() == other.value.as_ref()
}
}

impl<T> Eq for Hashed<'_, T> where T: ToOwned + Eq + ?Sized {}

impl<T> Deref for Hashed<'_, T>
where
T: ToOwned + ?Sized,
{
type Target = T;

fn deref(&self) -> &Self::Target {
self.value.deref()
}
}

/// Used to make [`Hashed`] values no-op in [`HashMap`](std::collections::HashMap) or [`HashSet`](std::collections::HashSet).
/// For all other keys types (except for [`u64`]) it will panic.
#[derive(Default, Clone)]
pub(crate) struct HashedNoOpBuilder {
hashed: u64,
}

impl Hasher for HashedNoOpBuilder {
fn finish(&self) -> u64 {
self.hashed
}

fn write(&mut self, _bytes: &[u8]) {
panic!("Only works with `Hashed` value")
}

fn write_u64(&mut self, i: u64) {
self.hashed = i;
}
}

impl BuildHasher for HashedNoOpBuilder {
type Hasher = HashedNoOpBuilder;

fn build_hasher(&self) -> Self::Hasher {
HashedNoOpBuilder::default()
}
}
36 changes: 22 additions & 14 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod aggregate;
mod exponential_histogram;
mod hashed;
mod histogram;
mod last_value;
mod precomputed_sum;
Expand All @@ -15,13 +16,14 @@ use std::sync::{Arc, RwLock};
use aggregate::is_under_cardinality_limit;
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
use hashed::{Hashed, HashedNoOpBuilder};
use once_cell::sync::Lazy;
use opentelemetry::{otel_warn, KeyValue};

use crate::metrics::AttributeSet;
use super::sort_and_dedup;

pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Vec<KeyValue>> =
Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]);
pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Hashed<'static, [KeyValue]>> =
Lazy::new(|| Hashed::from_owned(vec![KeyValue::new("otel.metric.overflow", "true")]));

pub(crate) trait Aggregator {
/// A static configuration that is needed in order to initialize aggregator.
Expand Down Expand Up @@ -52,7 +54,7 @@ where
A: Aggregator,
{
/// Trackers store the values associated with different attribute sets.
trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
trackers: RwLock<HashMap<Hashed<'static, [KeyValue]>, Arc<A>, HashedNoOpBuilder>>,
/// Number of different attribute set stored in the `trackers` map.
count: AtomicUsize,
/// Indicates whether a value with no attributes has been stored.
Expand All @@ -69,7 +71,7 @@ where
{
fn new(config: A::InitConfig) -> Self {
ValueMap {
trackers: RwLock::new(HashMap::new()),
trackers: RwLock::new(HashMap::default()),
has_no_attribute_value: AtomicBool::new(false),
no_attribute_tracker: A::create(&config),
count: AtomicUsize::new(0),
Expand All @@ -84,19 +86,21 @@ where
return;
}

let attributes = Hashed::from_borrowed(attributes);

let Ok(trackers) = self.trackers.read() else {
return;
};

// Try to retrieve and update the tracker with the attributes in the provided order first
if let Some(tracker) = trackers.get(attributes) {
if let Some(tracker) = trackers.get(&attributes) {
tracker.update(value);
return;
}

// Try to retrieve and update the tracker with the attributes sorted.
let sorted_attrs = AttributeSet::from(attributes).into_vec();
if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
let sorted_attrs = Hashed::from_owned(sort_and_dedup(&attributes));
if let Some(tracker) = trackers.get(&sorted_attrs) {
tracker.update(value);
return;
}
Expand All @@ -110,20 +114,20 @@ where

// Recheck both the provided and sorted orders after acquiring the write lock
// in case another thread has pushed an update in the meantime.
if let Some(tracker) = trackers.get(attributes) {
if let Some(tracker) = trackers.get(&attributes) {
tracker.update(value);
} else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
} else if let Some(tracker) = trackers.get(&sorted_attrs) {
tracker.update(value);
} else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
let new_tracker = Arc::new(A::create(&self.config));
new_tracker.update(value);

// Insert tracker with the attributes in the provided and sorted orders
trackers.insert(attributes.to_vec(), new_tracker.clone());
trackers.insert(attributes.into_owned(), new_tracker.clone());
trackers.insert(sorted_attrs, new_tracker);

self.count.fetch_add(1, Ordering::SeqCst);
} else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
} else if let Some(overflow_value) = trackers.get(&STREAM_OVERFLOW_ATTRIBUTES) {
overflow_value.update(value);
} else {
let new_tracker = A::create(&self.config);
Expand Down Expand Up @@ -153,7 +157,7 @@ where
let mut seen = HashSet::new();
for (attrs, tracker) in trackers.iter() {
if seen.insert(Arc::as_ptr(tracker)) {
dest.push(map_fn(attrs.clone(), tracker));
dest.push(map_fn(attrs.clone().into_inner_owned(), tracker));
}
}
}
Expand Down Expand Up @@ -183,7 +187,10 @@ where
let mut seen = HashSet::new();
for (attrs, tracker) in trackers.into_iter() {
if seen.insert(Arc::as_ptr(&tracker)) {
dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
dest.push(map_fn(
attrs.into_inner_owned(),
tracker.clone_and_reset(&self.config),
));
}
}
}
Expand Down Expand Up @@ -392,6 +399,7 @@ impl AtomicallyUpdate<f64> for f64 {

#[cfg(test)]
mod tests {

use super::*;

#[test]
Expand Down
40 changes: 19 additions & 21 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,23 +115,27 @@ pub(crate) struct AttributeSet(Vec<KeyValue>, u64);

impl From<&[KeyValue]> for AttributeSet {
fn from(values: &[KeyValue]) -> Self {
let mut seen_keys = HashSet::with_capacity(values.len());
let vec = values
.iter()
.rev()
.filter_map(|kv| {
if seen_keys.insert(kv.key.clone()) {
Some(kv.clone())
} else {
None
}
})
.collect::<Vec<_>>();

AttributeSet::new(vec)
AttributeSet::new(sort_and_dedup(values))
}
}

pub(crate) fn sort_and_dedup(values: &[KeyValue]) -> Vec<KeyValue> {
let mut seen_keys = HashSet::with_capacity(values.len());
let mut vec = values
.iter()
.rev()
.filter_map(|kv| {
if seen_keys.insert(kv.key.clone()) {
Some(kv.clone())
} else {
None
}
})
.collect::<Vec<_>>();
vec.sort_unstable_by(|a, b| a.key.cmp(&b.key));
vec
}

fn calculate_hash(values: &[KeyValue]) -> u64 {
let mut hasher = DefaultHasher::new();
values.iter().fold(&mut hasher, |mut hasher, item| {
Expand All @@ -142,8 +146,7 @@ fn calculate_hash(values: &[KeyValue]) -> u64 {
}

impl AttributeSet {
fn new(mut values: Vec<KeyValue>) -> Self {
values.sort_unstable_by(|a, b| a.key.cmp(&b.key));
fn new(values: Vec<KeyValue>) -> Self {
let hash = calculate_hash(&values);
AttributeSet(values, hash)
}
Expand All @@ -152,11 +155,6 @@ impl AttributeSet {
pub(crate) fn iter(&self) -> impl Iterator<Item = (&Key, &Value)> {
self.0.iter().map(|kv| (&kv.key, &kv.value))
}

/// Returns the underlying Vec of KeyValue pairs
pub(crate) fn into_vec(self) -> Vec<KeyValue> {
self.0
}
}

impl Hash for AttributeSet {
Expand Down
Loading