Skip to content

Commit

Permalink
Merge branch 'main' into add_force_flush_log
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomsonTan authored Nov 7, 2024
2 parents 4ed3b73 + c322a50 commit 125af29
Showing 1 changed file with 46 additions and 125 deletions.
171 changes: 46 additions & 125 deletions opentelemetry-sdk/src/metrics/internal/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ use opentelemetry::KeyValue;
use super::ValueMap;
use super::{Aggregator, Number};

struct HistogramTracker<T> {
buckets: Mutex<Buckets<T>>,
}

impl<T> Aggregator for HistogramTracker<T>
impl<T> Aggregator for Mutex<Buckets<T>>
where
T: Number,
{
Expand All @@ -22,27 +18,26 @@ where
type PreComputedValue = (T, usize);

fn update(&self, (value, index): (T, usize)) {
let mut buckets = match self.buckets.lock() {
Ok(guard) => guard,
Err(_) => return,
};
let mut buckets = self.lock().unwrap_or_else(|err| err.into_inner());

buckets.bin(index, value);
buckets.sum(value);
buckets.total += value;
buckets.count += 1;
buckets.counts[index] += 1;
if value < buckets.min {
buckets.min = value;
}
if value > buckets.max {
buckets.max = value
}
}

fn create(count: &usize) -> Self {
HistogramTracker {
buckets: Mutex::new(Buckets::<T>::new(*count)),
}
Mutex::new(Buckets::<T>::new(*count))
}

fn clone_and_reset(&self, count: &usize) -> Self {
let mut current = self.buckets.lock().unwrap_or_else(|err| err.into_inner());
let cloned = replace(current.deref_mut(), Buckets::new(*count));
Self {
buckets: Mutex::new(cloned),
}
let mut current = self.lock().unwrap_or_else(|err| err.into_inner());
Mutex::new(replace(current.deref_mut(), Buckets::new(*count)))
}
}

Expand All @@ -65,62 +60,34 @@ impl<T: Number> Buckets<T> {
..Default::default()
}
}

fn sum(&mut self, value: T) {
self.total += value;
}

fn bin(&mut self, idx: usize, value: T) {
self.counts[idx] += 1;
self.count += 1;
if value < self.min {
self.min = value;
}
if value > self.max {
self.max = value
}
}
}

/// Summarizes a set of measurements as a histogram with explicitly defined
/// buckets.
pub(crate) struct Histogram<T: Number> {
value_map: ValueMap<HistogramTracker<T>>,
value_map: ValueMap<Mutex<Buckets<T>>>,
bounds: Vec<f64>,
record_min_max: bool,
record_sum: bool,
start: Mutex<SystemTime>,
}

impl<T: Number> Histogram<T> {
pub(crate) fn new(boundaries: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
// TODO fix the bug, by first removing NaN and only then getting buckets_count
// once we know the reason for performance degradation
let buckets_count = boundaries.len() + 1;
let mut histogram = Histogram {
pub(crate) fn new(mut bounds: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
bounds.retain(|v| !v.is_nan());
bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
let buckets_count = bounds.len() + 1;
Histogram {
value_map: ValueMap::new(buckets_count),
bounds: boundaries,
bounds,
record_min_max,
record_sum,
start: Mutex::new(SystemTime::now()),
};

histogram.bounds.retain(|v| !v.is_nan());
histogram
.bounds
.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));

histogram
}
}

pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
let f = measurement.into_float();
// Ignore NaN and infinity.
// Only makes sense if T is f64, maybe this could be no-op for other cases?
// TODO: uncomment once we know the reason for performance degradation
// if f.is_infinite() || f.is_nan() {
// return;
// }
// This search will return an index in the range `[0, bounds.len()]`, where
// it will return `bounds.len()` if value is greater than the last element
// of `bounds`. This aligns with the buckets in that the length of buckets
Expand Down Expand Up @@ -156,17 +123,14 @@ impl<T: Number> Histogram<T> {

self.value_map
.collect_and_reset(&mut h.data_points, |attributes, aggr| {
let b = aggr
.buckets
.into_inner()
.unwrap_or_else(|err| err.into_inner());
let b = aggr.into_inner().unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
start_time: prev_start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bucket_counts: b.counts.clone(),
bucket_counts: b.counts,
sum: if self.record_sum {
b.total
} else {
Expand Down Expand Up @@ -214,7 +178,7 @@ impl<T: Number> Histogram<T> {

self.value_map
.collect_readonly(&mut h.data_points, |attributes, aggr| {
let b = aggr.buckets.lock().unwrap_or_else(|err| err.into_inner());
let b = aggr.lock().unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
start_time: prev_start,
Expand Down Expand Up @@ -245,68 +209,25 @@ impl<T: Number> Histogram<T> {
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Verifies that measurements land in the correct explicit buckets.
    /// Boundaries [1.0, 3.0, 6.0] define four buckets covering
    /// (-inf, 1], (1, 3], (3, 6], (6, +inf).
    #[test]
    fn check_buckets_are_selected_correctly() {
        let hist = Histogram::<i64>::new(vec![1.0, 3.0, 6.0], false, false);
        for v in 1..11 {
            hist.measure(v, &[]);
        }
        let (count, dp) = hist.cumulative(None);
        let dp = dp.unwrap();
        let dp = dp.as_any().downcast_ref::<data::Histogram<i64>>().unwrap();
        assert_eq!(count, 1);
        assert_eq!(dp.data_points[0].count, 10);
        assert_eq!(dp.data_points[0].bucket_counts.len(), 4);
        assert_eq!(dp.data_points[0].bucket_counts[0], 1); // 1
        assert_eq!(dp.data_points[0].bucket_counts[1], 2); // 2, 3
        assert_eq!(dp.data_points[0].bucket_counts[2], 3); // 4, 5, 6
        assert_eq!(dp.data_points[0].bucket_counts[3], 4); // 7, 8, 9, 10
    }
}

0 comments on commit 125af29

Please sign in to comment.