diff --git a/Cargo.lock b/Cargo.lock
index 205363ea1c..3a5f7316af 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1313,9 +1313,9 @@ dependencies = [
 
 [[package]]
 name = "equivalent"
-version = "1.0.0"
+version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "88bffebc5d80432c9b140ee17875ff173a8ab62faad5b257da912bd2f6c1c0a1"
+checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
 
 [[package]]
 name = "erased-serde"
@@ -3204,6 +3204,17 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "priority-queue"
+version = "2.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "70c501afe3a2e25c9bd219aa56ec1e04cdb3fcdd763055be268778c13fa82c1f"
+dependencies = [
+ "autocfg",
+ "equivalent",
+ "indexmap 2.2.5",
+]
+
 [[package]]
 name = "proc-macro-crate"
 version = "1.3.0"
@@ -3870,6 +3881,7 @@ dependencies = [
  "hashbrown 0.14.3",
  "insta",
  "itertools",
+ "priority-queue",
  "rand",
  "relay-base-schema",
  "relay-cardinality",
diff --git a/Cargo.toml b/Cargo.toml
index 742e8ad4bb..572976825a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -123,6 +123,7 @@ pest = "2.1.3"
 pest_derive = "2.1.0"
 pin-project-lite = "0.2.12"
 pretty-hex = "0.3.0"
+priority-queue = "2.0.3"
 proc-macro2 = "1.0.8"
 psl = "2.1.33"
 quote = "1.0.2"
diff --git a/relay-metrics/Cargo.toml b/relay-metrics/Cargo.toml
index 016f969deb..fd5881b277 100644
--- a/relay-metrics/Cargo.toml
+++ b/relay-metrics/Cargo.toml
@@ -22,6 +22,7 @@ fnv = { workspace = true }
 hash32 = { workspace = true }
 hashbrown = { workspace = true }
 itertools = { workspace = true }
+priority-queue = { workspace = true }
 relay-base-schema = { workspace = true }
 relay-cardinality = { workspace = true }
 relay-cogs = { workspace = true }
diff --git a/relay-metrics/src/aggregator.rs b/relay-metrics/src/aggregator.rs
index 6406405c57..0b42044da5 100644
--- a/relay-metrics/src/aggregator.rs
+++ b/relay-metrics/src/aggregator.rs
@@ -7,6 +7,7 @@ use std::time::Duration;
 use std::{fmt, mem};
 
 use fnv::FnvHasher;
+use priority_queue::PriorityQueue;
 use relay_base_schema::project::ProjectKey;
 use relay_common::time::UnixTimestamp;
 use serde::{Deserialize, Serialize};
@@ -16,7 +17,7 @@ use tokio::time::Instant;
 use crate::bucket::{Bucket, BucketValue};
 use crate::protocol::{self, MetricNamespace, MetricResourceIdentifier};
 use crate::statsd::{MetricCounters, MetricGauges, MetricHistograms, MetricSets, MetricTimers};
-use crate::{BucketMetadata, MetricName};
+use crate::{BucketMetadata, FiniteF64, MetricName};
 
 use hashbrown::{hash_map::Entry, HashMap};
 
@@ -531,7 +532,7 @@ fn get_flush_time(
 pub struct Aggregator {
     name: String,
     config: AggregatorConfig,
-    buckets: HashMap<BucketKey, QueuedBucket>,
+    buckets: PriorityQueue<BucketKey, QueuedBucket>,
     cost_tracker: CostTracker,
     reference_time: Instant,
 }
@@ -547,7 +548,7 @@ impl Aggregator {
         Self {
             name,
             config,
-            buckets: HashMap::new(),
+            buckets: Default::default(),
             cost_tracker: CostTracker::default(),
             reference_time: Instant::now(),
         }
@@ -624,10 +625,12 @@ impl Aggregator {
         let bucket_interval = self.config.bucket_interval;
         let cost_tracker = &mut self.cost_tracker;
 
-        for (key, entry) in self
-            .buckets
-            .extract_if(|_, entry| force || entry.elapsed(now))
-        {
+        while let Some((_, entry)) = self.buckets.peek() {
+            if !entry.elapsed(now) && !force {
+                break;
+            }
+
+            let (key, entry) = self.buckets.pop().expect("pop after peek");
             cost_tracker.subtract_cost(key.project_key, key.cost());
             cost_tracker.subtract_cost(key.project_key, entry.value.cost());
 
@@ -637,17 +640,19 @@
             *bucket_count += 1;
             *item_count += entry.value.len();
 
+            let partition = self.config.flush_partitions.map(|p| key.partition_key(p));
+
             let bucket = Bucket {
                 timestamp: key.timestamp,
                 width: bucket_interval,
-                name: key.metric_name.clone(),
+                name: key.metric_name,
                 value: entry.value,
-                tags: key.tags.clone(),
+                tags: key.tags,
                 metadata: entry.metadata,
             };
 
             partitions
-                .entry(self.config.flush_partitions.map(|p| key.partition_key(p)))
+                .entry(partition)
                 .or_insert_with(HashMap::new)
                 .entry(key.project_key)
                 .or_insert_with(Vec::new)
@@ -697,7 +702,7 @@ impl Aggregator {
     pub fn merge(
         &mut self,
         project_key: ProjectKey,
-        bucket: Bucket,
+        mut bucket: Bucket,
         max_total_bucket_bytes: Option<usize>,
     ) -> Result<(), AggregateMetricsError> {
         let timestamp = self.get_bucket_timestamp(bucket.timestamp, bucket.width)?;
@@ -742,34 +747,48 @@
             self.config.max_project_key_bucket_bytes,
         )?;
 
-        let added_cost;
-        match self.buckets.entry(key) {
-            Entry::Occupied(mut entry) => {
-                relay_statsd::metric!(
-                    counter(MetricCounters::MergeHit) += 1,
-                    aggregator = &self.name,
-                    namespace = entry.key().namespace().as_str(),
-                );
+        let mut added_cost = 0;
 
-                added_cost = entry.get_mut().merge(bucket.value, bucket.metadata)?;
-            }
-            Entry::Vacant(entry) => {
-                relay_statsd::metric!(
-                    counter(MetricCounters::MergeMiss) += 1,
-                    aggregator = &self.name,
-                    namespace = entry.key().namespace().as_str(),
-                );
-                relay_statsd::metric!(
-                    set(MetricSets::UniqueBucketsCreated) = entry.key().hash64() as i64, // 2-complement
-                    aggregator = &self.name,
-                    namespace = entry.key().namespace().as_str(),
-                );
+        let mut error = None;
+        let updated = self.buckets.change_priority_by(&key, |value| {
+            relay_statsd::metric!(
+                counter(MetricCounters::MergeHit) += 1,
+                aggregator = &self.name,
+                namespace = key.namespace().as_str(),
+            );
 
-                let flush_at = get_flush_time(&self.config, self.reference_time, entry.key());
-                let value = bucket.value;
-                added_cost = entry.key().cost() + value.cost();
-                entry.insert(QueuedBucket::new(flush_at, value, bucket.metadata));
+            let bv = std::mem::replace(
+                &mut bucket.value,
+                BucketValue::Counter(FiniteF64::default()),
+            );
+            match value.merge(bv, bucket.metadata) {
+                Ok(ac) => added_cost = ac,
+                Err(err) => error = Some(err),
             }
+        });
+
+        if let Some(error) = error {
+            return Err(error.into());
+        }
+
+        if !updated {
+            relay_statsd::metric!(
+                counter(MetricCounters::MergeMiss) += 1,
+                aggregator = &self.name,
+                namespace = key.namespace().as_str(),
+            );
+            relay_statsd::metric!(
+                set(MetricSets::UniqueBucketsCreated) = key.hash64() as i64, // 2-complement
+                aggregator = &self.name,
+                namespace = key.namespace().as_str(),
+            );
+
+            let flush_at = get_flush_time(&self.config, self.reference_time, &key);
+            let value = bucket.value;
+            added_cost = key.cost() + value.cost();
+
+            self.buckets
+                .push(key, QueuedBucket::new(flush_at, value, bucket.metadata));
         }
 
         self.cost_tracker.add_cost(project_key, added_cost);
@@ -1607,7 +1626,11 @@ mod tests {
             .merge(project_key, bucket3.clone(), None)
             .unwrap();
 
-        let buckets_metadata: Vec<_> = aggregator.buckets.values().map(|v| &v.metadata).collect();
+        let buckets_metadata: Vec<_> = aggregator
+            .buckets
+            .iter()
+            .map(|(_, v)| &v.metadata)
+            .collect();
         insta::assert_debug_snapshot!(buckets_metadata, @r###"
         [
            BucketMetadata {
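Note (editor, not part of the patch): the core of this change is replacing the aggregator's `HashMap` with a `PriorityQueue` ordered by flush deadline, so `pop_flush_buckets` can `peek`/`pop` only the buckets that are due instead of scanning every entry, and `merge` can update an existing bucket in place via `change_priority_by`. The sketch below is a minimal, self-contained illustration of that queue discipline against the `priority-queue` 2.x API. The `FlushAt` wrapper, the string keys, and the deadlines are hypothetical stand-ins for `QueuedBucket` and `BucketKey`, not code from this diff; it assumes, as the diff implies, that the priority's `Ord` is reversed so the earliest deadline surfaces first on the crate's max-heap.

```rust
use std::cmp::Ordering;
use std::time::{Duration, Instant};

use priority_queue::PriorityQueue;

/// Hypothetical stand-in for `QueuedBucket`'s ordering: `PriorityQueue` is a
/// max-heap, so reversing `Ord` makes the earliest flush deadline the
/// "largest" priority and thus the first element popped.
#[derive(Debug, PartialEq, Eq)]
struct FlushAt(Instant);

impl Ord for FlushAt {
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.cmp(&other.0).reverse()
    }
}

impl PartialOrd for FlushAt {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    // Keys play the role of `BucketKey`; priorities the role of `QueuedBucket`.
    let mut buckets: PriorityQueue<&str, FlushAt> = PriorityQueue::new();
    let now = Instant::now();

    buckets.push("bucket-a", FlushAt(now + Duration::from_secs(20)));
    buckets.push("bucket-b", FlushAt(now + Duration::from_secs(5)));

    // Merge path: update an existing entry in place by key, as the diff does
    // with `change_priority_by`. A return of `false` means the key was absent
    // and the caller should `push` a fresh entry instead.
    let updated = buckets.change_priority_by(&"bucket-b", |_flush_at| {
        // relay merges the bucket value here and leaves the deadline alone
    });
    assert!(updated);

    // Flush path: inspect only the front of the queue and stop at the first
    // entry that is not yet due, rather than visiting every bucket the way
    // `extract_if` did.
    let deadline = now + Duration::from_secs(10);
    while let Some((_, FlushAt(at))) = buckets.peek() {
        if *at > deadline {
            break;
        }
        let (key, _) = buckets.pop().expect("pop after peek");
        println!("flushing {key}"); // only "bucket-b" is due by `deadline`
    }

    assert_eq!(buckets.len(), 1); // "bucket-a" remains queued
}
```

With this layout, a flush cycle touches only the k due buckets at O(log n) each instead of walking all n entries; the trade-off is that `merge` now pays a hash lookup plus a possible heap sift whenever a bucket's priority changes.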