From 0a7037e7c05f673d85bc7e8d44ca318fca33e61d Mon Sep 17 00:00:00 2001 From: David Herberth Date: Wed, 8 May 2024 14:39:42 +0200 Subject: [PATCH] ref(metrics): Make buckets view generic over the contained buckets (#3569) The idea is that `BucketsView` now works on everything that contains buckets `AsRef<[Bucket]>`, this allows us to transfer bucket views in channels and especially it allows us to send `BucketsView>` which we will need to track outcomes from an upstream request callback. --- relay-metrics/src/view.rs | 259 +++++++++++-------------- relay-server/src/services/processor.rs | 2 +- 2 files changed, 117 insertions(+), 144 deletions(-) diff --git a/relay-metrics/src/view.rs b/relay-metrics/src/view.rs index 8aab8bfdd6..c9a75d0030 100644 --- a/relay-metrics/src/view.rs +++ b/relay-metrics/src/view.rs @@ -59,9 +59,10 @@ struct Index { /// /// Using the above example, iterating over `View 1` yields the buckets: /// `[C:1], [C:12], [D:0, 1, 2, 3]`. -pub struct BucketsView<'a> { - /// Source slice of buckets. - inner: &'a [Bucket], +#[derive(Clone, Copy)] +pub struct BucketsView { + /// The contained buckets. + inner: T, /// Start index. /// /// - Slice index indicates bucket. @@ -74,9 +75,13 @@ pub struct BucketsView<'a> { end: Index, } -impl<'a> BucketsView<'a> { +impl BucketsView +where + T: AsRef<[Bucket]>, +{ /// Creates a new buckets view containing all data from the slice. - pub fn new(buckets: &'a [Bucket]) -> Self { + pub fn new(buckets: T) -> Self { + let len = buckets.as_ref().len(); Self { inner: buckets, start: Index { @@ -84,7 +89,7 @@ impl<'a> BucketsView<'a> { bucket: 0, }, end: Index { - slice: buckets.len(), + slice: len, bucket: 0, }, } @@ -104,39 +109,66 @@ impl<'a> BucketsView<'a> { self.len() == 0 } + /// Returns the same bucket view as a bucket view over a slice. + pub fn as_slice(&self) -> BucketsView<&[Bucket]> { + BucketsView { + inner: self.inner.as_ref(), + start: self.start, + end: self.end, + } + } + /// Iterator over all buckets in the view. - pub fn iter(&self) -> BucketsViewIter<'a> { - BucketsViewIter::new(self.inner, self.start, self.end) + pub fn iter(&self) -> BucketsViewIter<'_> { + BucketsViewIter::new(self.inner.as_ref(), self.start, self.end) } /// Iterator which slices the source view into segments with an approximate size of `size_in_bytes`. - pub fn by_size(&self, size_in_bytes: usize) -> BucketsViewBySizeIter<'a> { + pub fn by_size(self, size_in_bytes: usize) -> BucketsViewBySizeIter { BucketsViewBySizeIter::new(self.inner, self.start, self.end, size_in_bytes) } } -impl<'a> fmt::Debug for BucketsView<'a> { +impl<'a> From<&'a [Bucket]> for BucketsView<&'a [Bucket]> { + fn from(value: &'a [Bucket]) -> Self { + Self::new(value) + } +} + +impl<'a> From<&'a Vec> for BucketsView<&'a [Bucket]> { + fn from(value: &'a Vec) -> Self { + Self::new(value.as_slice()) + } +} + +impl fmt::Debug for BucketsView +where + T: AsRef<[Bucket]>, +{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let contents = self.iter().collect::>(); f.debug_tuple("BucketsView").field(&contents).finish() } } -impl<'a> IntoIterator for BucketsView<'a> { +impl<'a> IntoIterator for BucketsView<&'a [Bucket]> { type Item = BucketView<'a>; type IntoIter = BucketsViewIter<'a>; fn into_iter(self) -> Self::IntoIter { - self.iter() + BucketsViewIter::new(self.inner, self.start, self.end) } } -impl<'a> IntoIterator for &'_ BucketsView<'a> { +impl<'a, T> IntoIterator for &'a BucketsView +where + T: AsRef<[Bucket]>, +{ type Item = BucketView<'a>; type IntoIter = BucketsViewIter<'a>; fn into_iter(self) -> Self::IntoIter { - self.iter() + BucketsViewIter::new(self.inner.as_ref(), self.start, self.end) } } @@ -212,9 +244,9 @@ impl<'a> Iterator for BucketsViewIter<'a> { /// Iterator slicing a [`BucketsView`] into smaller views constrained by a given size in bytes. /// // See [`estimate_size`] for how the size of a bucket is calculated. -pub struct BucketsViewBySizeIter<'a> { +pub struct BucketsViewBySizeIter { /// Source slice. - inner: &'a [Bucket], + inner: T, /// Current position in the slice. current: Index, /// Terminal position. @@ -223,11 +255,11 @@ pub struct BucketsViewBySizeIter<'a> { max_size_bytes: usize, } -impl<'a> BucketsViewBySizeIter<'a> { +impl BucketsViewBySizeIter { /// Creates a new iterator. /// /// Start and end must be valid indices or iterator may end early. - fn new(inner: &'a [Bucket], start: Index, end: Index, max_size_bytes: usize) -> Self { + fn new(inner: T, start: Index, end: Index, max_size_bytes: usize) -> Self { Self { inner, end, @@ -237,8 +269,12 @@ impl<'a> BucketsViewBySizeIter<'a> { } } -impl<'a> Iterator for BucketsViewBySizeIter<'a> { - type Item = BucketsView<'a>; +impl Iterator for BucketsViewBySizeIter +where + T: AsRef<[Bucket]>, + T: Clone, +{ + type Item = BucketsView; fn next(&mut self) -> Option { let start = self.current; @@ -252,14 +288,15 @@ impl<'a> Iterator for BucketsViewBySizeIter<'a> { break; } + let inner = self.inner.as_ref(); // Select next potential bucket, // this should never overflow because `end` will never go past the slice and // we just validated that current is constrained by end. debug_assert!( - self.current.slice < self.inner.len(), + self.current.slice < inner.len(), "invariant violated, iterator pointing past the slice" ); - let bucket = self.inner.get(self.current.slice)?; + let bucket = inner.get(self.current.slice)?; // Selection should never fail, because either we select the entire range, // or we previously already split the bucket, which means this range is good. @@ -303,14 +340,17 @@ impl<'a> Iterator for BucketsViewBySizeIter<'a> { // Current is the current for the next batch now, // which means, current is the end for this batch. Some(BucketsView { - inner: self.inner, + inner: self.inner.clone(), start, end: self.current, }) } } -impl<'a> Serialize for BucketsView<'a> { +impl Serialize for BucketsView +where + T: AsRef<[Bucket]>, +{ fn serialize(&self, serializer: S) -> Result where S: serde::Serializer, @@ -720,6 +760,8 @@ fn split_at(bucket: &BucketView<'_>, max_size: usize, min_split_size: usize) -> #[cfg(test)] mod tests { + use std::sync::Arc; + use insta::assert_json_snapshot; use super::*; @@ -833,10 +875,19 @@ mod tests { assert!(BucketView::new(&bucket).select(5..6).is_none()); } + fn buckets(s: &[u8]) -> T + where + T: FromIterator, + { + let timestamp = UnixTimestamp::from_secs(5000); + Bucket::parse_all(s, timestamp) + .collect::>() + .unwrap() + } + #[test] fn test_buckets_view_empty() { - let buckets = Vec::new(); - let view = BucketsView::new(&buckets); + let view = BucketsView::new(Vec::new()); assert_eq!(view.len(), 0); assert!(view.is_empty()); let partials = view.iter().collect::>(); @@ -845,18 +896,9 @@ mod tests { #[test] fn test_buckets_view_iter_full() { - let b = br#" -b0:1|c -b1:12|c -b2:1:2:3:5:5|d -b3:42:75|s"#; - - let timestamp = UnixTimestamp::from_secs(5000); - let buckets = Bucket::parse_all(b, timestamp) - .collect::, _>>() - .unwrap(); + let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s"); - let view = BucketsView::new(&buckets); + let view = BucketsView::from(&buckets); assert_eq!(view.len(), 4); assert!(!view.is_empty()); let partials = view.iter().collect::>(); @@ -873,16 +915,7 @@ b3:42:75|s"#; #[test] fn test_buckets_view_iter_partial_end() { - let b = br#" -b0:1|c -b1:12|c -b2:1:2:3:5:5|d -b3:42:75|s"#; - - let timestamp = UnixTimestamp::from_secs(5000); - let buckets = Bucket::parse_all(b, timestamp) - .collect::, _>>() - .unwrap(); + let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s"); let mut view = BucketsView::new(&buckets); view.end.slice = 2; @@ -902,18 +935,9 @@ b3:42:75|s"#; #[test] fn test_buckets_view_iter_partial_start() { - let b = br#" -b0:1|c -b1:12|c -b2:1:2:3:5:5|d -b3:42:75|s"#; - - let timestamp = UnixTimestamp::from_secs(5000); - let buckets = Bucket::parse_all(b, timestamp) - .collect::, _>>() - .unwrap(); + let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s"); - let mut view = BucketsView::new(&buckets); + let mut view = BucketsView::new(buckets); view.start.slice = 2; view.start.bucket = 3; assert_eq!(view.len(), 2); @@ -929,18 +953,9 @@ b3:42:75|s"#; #[test] fn test_buckets_view_iter_partial_start_and_end() { - let b = br#" -b0:1|c -b1:12|c -b2:1:2:3:5:5|d -b3:42:75|s"#; - - let timestamp = UnixTimestamp::from_secs(5000); - let buckets = Bucket::parse_all(b, timestamp) - .collect::, _>>() - .unwrap(); + let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s"); - let mut view = BucketsView::new(&buckets); + let mut view = BucketsView::from(&buckets); view.start.slice = 2; view.start.bucket = 1; view.end.slice = 3; @@ -958,18 +973,26 @@ b3:42:75|s"#; #[test] fn test_buckets_view_by_size_small() { - let b = br#" -b0:1|c -b1:12|c -b2:1:2:3:5:5|d -b3:42:75|s"#; + let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s"); - let timestamp = UnixTimestamp::from_secs(5000); - let buckets = Bucket::parse_all(b, timestamp) - .collect::, _>>() - .unwrap(); + let view = BucketsView::from(&buckets); + let partials = view + .by_size(100) + .map(|bv| { + let len: usize = bv.iter().map(|b| b.len()).sum(); + let size: usize = bv.iter().map(|b| b.estimated_size()).sum(); + (len, size) + }) + .collect::>(); + + assert_eq!(partials, vec![(1, 74), (1, 74), (4, 98), (1, 74), (2, 82),]); + } + + #[test] + fn test_buckets_view_by_size_small_as_arc() { + let buckets: Arc<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s"); - let view = BucketsView::new(&buckets); + let view = BucketsView::new(buckets); let partials = view .by_size(100) .map(|bv| { @@ -984,18 +1007,9 @@ b3:42:75|s"#; #[test] fn test_buckets_view_by_size_one_split() { - let b = br#" -b0:1|c -b1:12|c -b2:1:2:3:5:5|d -b3:42:75|s"#; - - let timestamp = UnixTimestamp::from_secs(5000); - let buckets = Bucket::parse_all(b, timestamp) - .collect::, _>>() - .unwrap(); + let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s"); - let view = BucketsView::new(&buckets); + let view = BucketsView::from(&buckets); let partials = view .by_size(250) .map(|bv| { @@ -1010,18 +1024,9 @@ b3:42:75|s"#; #[test] fn test_buckets_view_by_size_no_split() { - let b = br#" -b0:1|c -b1:12|c -b2:1:2:3:5:5|d -b3:42:75|s"#; + let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s"); - let timestamp = UnixTimestamp::from_secs(5000); - let buckets = Bucket::parse_all(b, timestamp) - .collect::, _>>() - .unwrap(); - - let view = BucketsView::new(&buckets); + let view = BucketsView::from(&buckets); let partials = view .by_size(500) .map(|bv| { @@ -1036,18 +1041,9 @@ b3:42:75|s"#; #[test] fn test_buckets_view_by_size_no_too_small_no_bucket_fits() { - let b = br#" -b0:1|c -b1:12|c -b2:1:2:3:5:5|d -b3:42:75|s"#; - - let timestamp = UnixTimestamp::from_secs(5000); - let buckets = Bucket::parse_all(b, timestamp) - .collect::, _>>() - .unwrap(); + let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c\nb2:1:2:3:5:5|d\nb3:42:75|s"); - let view = BucketsView::new(&buckets); + let view = BucketsView::from(&buckets); let partials = view .by_size(50) // Too small, a bucket requires at least 74 bytes .count(); @@ -1057,15 +1053,9 @@ b3:42:75|s"#; #[test] fn test_buckets_view_by_size_do_not_split_gauge() { - let b = br#" -transactions/foo:25:17:42:220:85|g"#; + let buckets: Vec<_> = buckets(b"transactions/foo:25:17:42:220:85|g"); - let timestamp = UnixTimestamp::from_secs(5000); - let buckets = Bucket::parse_all(b, timestamp) - .collect::, _>>() - .unwrap(); - - let view = BucketsView::new(&buckets); + let view = BucketsView::from(&buckets); // 100 is too small to fit the gauge, but it is big enough to fit half a gauage, // make sure the gauge does not actually get split. let partials = view.by_size(100).count(); @@ -1075,38 +1065,21 @@ transactions/foo:25:17:42:220:85|g"#; #[test] fn test_buckets_view_serialize_full() { - let b = br#" -b0:1|c -b1:12|c|#foo,bar:baz -b2:1:2:3:5:5|d|#foo,bar:baz -b3:42:75|s -transactions/foo:25:17:42:220:85|g"#; - - let timestamp = UnixTimestamp::from_secs(5000); - let buckets = Bucket::parse_all(b, timestamp) - .collect::, _>>() - .unwrap(); + let buckets: Vec<_> = buckets(b"b0:1|c\nb1:12|c|#foo,bar:baz\nb2:1:2:3:5:5|d|#foo,bar:baz b3:42:75|s\ntransactions/foo:25:17:42:220:85|g"); assert_eq!( - serde_json::to_string(&BucketsView::new(&buckets)).unwrap(), + serde_json::to_string(&BucketsView::from(&buckets)).unwrap(), serde_json::to_string(&buckets).unwrap() ); } #[test] fn test_buckets_view_serialize_partial() { - let b = br#" -b1:12|c|#foo,bar:baz -b2:1:2:3:5:5|d|#foo,bar:baz -b3:42:75|s -b4:25:17:42:220:85|g"#; - - let timestamp = UnixTimestamp::from_secs(5000); - let buckets = Bucket::parse_all(b, timestamp) - .collect::, _>>() - .unwrap(); + let buckets: Arc<[_]> = buckets( + b"b1:12|c|#foo,bar:baz\nb2:1:2:3:5:5|d|#foo,bar:baz\nb3:42:75|s\nb4:25:17:42:220:85|g", + ); - let view = BucketsView::new(&buckets); + let view = BucketsView::new(buckets); // This creates 4 separate views, spanning 1-2, 2-3, 3, 4. // 4 is too big to fit into a view together with the remainder of 3. let partials = view.by_size(178).collect::>(); diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 2e50312209..816bca00da 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -2217,7 +2217,7 @@ impl EnvelopeProcessorService { rate_limiter: &RedisRateLimiter, ) -> Vec { let batch_size = self.inner.config.metrics_max_batch_size_bytes(); - let batched_bucket_iter = BucketsView::new(&buckets).by_size(batch_size).flatten(); + let batched_bucket_iter = BucketsView::from(&buckets).by_size(batch_size).flatten(); let quantities = utils::extract_metric_quantities(batched_bucket_iter, mode); // Check with redis if the throughput limit has been exceeded, while also updating