ref(metrics): Use a priority queue for metrics aggregator
Dav1dde committed Jul 22, 2024
1 parent 4d5941a commit 977facc
Showing 4 changed files with 76 additions and 39 deletions.
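The change in a nutshell: the aggregator previously kept its buckets in a `HashMap` and scanned the whole map on every flush tick (via `extract_if`). This commit swaps the map for a `PriorityQueue` (from the `priority-queue` crate) ordered by flush deadline, so a flush only peeks at the head of the queue and pops entries that are actually due. A minimal sketch of the pattern — `Key` and the integer deadlines are illustrative stand-ins, not relay's real `BucketKey`/`QueuedBucket` types:

```rust
use std::cmp::Reverse;

use priority_queue::PriorityQueue;

// Illustrative stand-in for relay's BucketKey.
#[derive(Debug, Hash, PartialEq, Eq)]
struct Key(u64);

fn main() {
    // `PriorityQueue` is a max-heap; `Reverse` turns it into
    // "earliest flush deadline first".
    let mut queue: PriorityQueue<Key, Reverse<u64>> = PriorityQueue::new();
    queue.push(Key(1), Reverse(30)); // due at t = 30
    queue.push(Key(2), Reverse(10)); // due at t = 10

    let now = 15;
    // Pop only elapsed entries and stop at the first future deadline,
    // instead of scanning every bucket as `extract_if` did.
    while let Some((_, Reverse(flush_at))) = queue.peek() {
        if *flush_at > now {
            break;
        }
        let (key, _) = queue.pop().expect("pop after peek");
        println!("flushing {key:?}");
    }
}
```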
16 changes: 14 additions & 2 deletions Cargo.lock

Generated lockfile; diff not rendered.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -122,6 +122,7 @@ pest = "2.1.3"
 pest_derive = "2.1.0"
 pin-project-lite = "0.2.12"
 pretty-hex = "0.3.0"
+priority-queue = "2.0.3"
 proc-macro2 = "1.0.8"
 psl = "2.1.33"
 quote = "1.0.2"
1 change: 1 addition & 0 deletions relay-metrics/Cargo.toml
@@ -22,6 +22,7 @@ fnv = { workspace = true }
 hash32 = { workspace = true }
 hashbrown = { workspace = true }
 itertools = { workspace = true }
+priority-queue = { workspace = true }
 relay-base-schema = { workspace = true }
 relay-cardinality = { workspace = true }
 relay-cogs = { workspace = true }
97 changes: 60 additions & 37 deletions relay-metrics/src/aggregator.rs
@@ -7,6 +7,7 @@ use std::time::Duration;
 use std::{fmt, mem};
 
 use fnv::FnvHasher;
+use priority_queue::PriorityQueue;
 use relay_base_schema::project::ProjectKey;
 use relay_common::time::UnixTimestamp;
 use serde::{Deserialize, Serialize};
@@ -16,7 +17,7 @@ use tokio::time::Instant;
 use crate::bucket::{Bucket, BucketValue};
 use crate::protocol::{self, MetricNamespace, MetricResourceIdentifier};
 use crate::statsd::{MetricCounters, MetricGauges, MetricHistograms, MetricSets, MetricTimers};
-use crate::{BucketMetadata, MetricName};
+use crate::{BucketMetadata, FiniteF64, MetricName};
 
 use hashbrown::{hash_map::Entry, HashMap};
@@ -531,7 +532,7 @@ fn get_flush_time(
 pub struct Aggregator {
     name: String,
     config: AggregatorConfig,
-    buckets: HashMap<BucketKey, QueuedBucket>,
+    buckets: PriorityQueue<BucketKey, QueuedBucket>,
     cost_tracker: CostTracker,
     reference_time: Instant,
 }
@@ -547,7 +548,7 @@ impl Aggregator {
         Self {
             name,
             config,
-            buckets: HashMap::new(),
+            buckets: Default::default(),
             cost_tracker: CostTracker::default(),
             reference_time: Instant::now(),
         }
@@ -624,10 +625,12 @@
         let bucket_interval = self.config.bucket_interval;
         let cost_tracker = &mut self.cost_tracker;
 
-        for (key, entry) in self
-            .buckets
-            .extract_if(|_, entry| force || entry.elapsed(now))
-        {
+        while let Some((_, entry)) = self.buckets.peek() {
+            if !entry.elapsed(now) && !force {
+                break;
+            }
+
+            let (key, entry) = self.buckets.pop().expect("pop after peek");
             cost_tracker.subtract_cost(key.project_key, key.cost());
             cost_tracker.subtract_cost(key.project_key, entry.value.cost());
 
@@ -637,17 +640,19 @@
             *bucket_count += 1;
             *item_count += entry.value.len();
 
+            let partition = self.config.flush_partitions.map(|p| key.partition_key(p));
+
             let bucket = Bucket {
                 timestamp: key.timestamp,
                 width: bucket_interval,
-                name: key.metric_name.clone(),
+                name: key.metric_name,
                 value: entry.value,
-                tags: key.tags.clone(),
+                tags: key.tags,
                 metadata: entry.metadata,
             };
 
             partitions
-                .entry(self.config.flush_partitions.map(|p| key.partition_key(p)))
+                .entry(partition)
                 .or_insert_with(HashMap::new)
                 .entry(key.project_key)
                 .or_insert_with(Vec::new)
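Nothing in the rendered diff shows how `QueuedBucket` orders itself inside the queue, but the `peek`/`pop` loop above only works if the bucket with the earliest flush deadline surfaces first. Since `PriorityQueue` is a max-heap, the priority type presumably inverts its comparison on `flush_at`; a sketch of that assumed `Ord` implementation on a stripped-down stand-in (not the actual code from this commit):

```rust
use std::cmp::Ordering;
use std::time::Instant; // the real aggregator uses tokio::time::Instant

// Assumed shape: only the flush deadline participates in the ordering.
struct QueuedBucket {
    flush_at: Instant,
    // value: BucketValue, metadata: BucketMetadata, ...
}

impl PartialEq for QueuedBucket {
    fn eq(&self, other: &Self) -> bool {
        self.flush_at == other.flush_at
    }
}

impl Eq for QueuedBucket {}

impl PartialOrd for QueuedBucket {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for QueuedBucket {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reversed on purpose: an earlier deadline compares as "greater",
        // so it surfaces first in the max-heap.
        other.flush_at.cmp(&self.flush_at)
    }
}
```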
@@ -697,7 +702,7 @@
     pub fn merge(
         &mut self,
         project_key: ProjectKey,
-        bucket: Bucket,
+        mut bucket: Bucket,
         max_total_bucket_bytes: Option<usize>,
     ) -> Result<(), AggregateMetricsError> {
         let timestamp = self.get_bucket_timestamp(bucket.timestamp, bucket.width)?;
@@ -742,34 +747,48 @@
             self.config.max_project_key_bucket_bytes,
         )?;
 
-        let added_cost;
-        match self.buckets.entry(key) {
-            Entry::Occupied(mut entry) => {
-                relay_statsd::metric!(
-                    counter(MetricCounters::MergeHit) += 1,
-                    aggregator = &self.name,
-                    namespace = entry.key().namespace().as_str(),
-                );
-
-                added_cost = entry.get_mut().merge(bucket.value, bucket.metadata)?;
-            }
-            Entry::Vacant(entry) => {
-                relay_statsd::metric!(
-                    counter(MetricCounters::MergeMiss) += 1,
-                    aggregator = &self.name,
-                    namespace = entry.key().namespace().as_str(),
-                );
-                relay_statsd::metric!(
-                    set(MetricSets::UniqueBucketsCreated) = entry.key().hash64() as i64, // 2-complement
-                    aggregator = &self.name,
-                    namespace = entry.key().namespace().as_str(),
-                );
-
-                let flush_at = get_flush_time(&self.config, self.reference_time, entry.key());
-                let value = bucket.value;
-                added_cost = entry.key().cost() + value.cost();
-                entry.insert(QueuedBucket::new(flush_at, value, bucket.metadata));
-            }
-        }
+        let mut added_cost = 0;
+
+        let mut error = None;
+        let updated = self.buckets.change_priority_by(&key, |value| {
+            relay_statsd::metric!(
+                counter(MetricCounters::MergeHit) += 1,
+                aggregator = &self.name,
+                namespace = key.namespace().as_str(),
+            );
+
+            let bv = std::mem::replace(
+                &mut bucket.value,
+                BucketValue::Counter(FiniteF64::default()),
+            );
+            match value.merge(bv, bucket.metadata) {
+                Ok(ac) => added_cost = ac,
+                Err(err) => error = Some(err),
+            }
+        });
+
+        if let Some(error) = error {
+            return Err(error.into());
+        }
+
+        if !updated {
+            relay_statsd::metric!(
+                counter(MetricCounters::MergeMiss) += 1,
+                aggregator = &self.name,
+                namespace = key.namespace().as_str(),
+            );
+            relay_statsd::metric!(
+                set(MetricSets::UniqueBucketsCreated) = key.hash64() as i64, // 2-complement
+                aggregator = &self.name,
+                namespace = key.namespace().as_str(),
+            );
+
+            let flush_at = get_flush_time(&self.config, self.reference_time, &key);
+            let value = bucket.value;
+            added_cost = key.cost() + value.cost();
+
+            self.buckets
+                .push(key, QueuedBucket::new(flush_at, value, bucket.metadata));
+        }
 
         self.cost_tracker.add_cost(project_key, added_cost);
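The merge path now hinges on `PriorityQueue::change_priority_by`, which applies a closure to the priority of an existing item and returns `false` when the key is absent; that boolean drives the merge-or-insert split. Two workarounds follow from the closure's signature (it takes `&mut P` and returns nothing): the merge error is smuggled out through a captured `Option`, and `bucket.value` is swapped out with `std::mem::replace` (leaving a cheap default counter behind), because it cannot be moved into a closure that may never run while `bucket` is still used afterwards. The same pattern on toy types:

```rust
use priority_queue::PriorityQueue;

fn main() {
    let mut queue: PriorityQueue<&'static str, u32> = PriorityQueue::new();
    queue.push("bucket", 10);

    // Merge-or-insert, mirroring `Aggregator::merge` above.
    let mut error: Option<&'static str> = None;
    let updated = queue.change_priority_by(&"bucket", |priority| {
        // The closure cannot return a Result, so failures are captured
        // through `error`, like the Option used in the commit.
        if *priority == u32::MAX {
            error = Some("overflow");
        } else {
            *priority += 5;
        }
    });

    if let Some(err) = error {
        eprintln!("merge failed: {err}");
    } else if !updated {
        // Key not present: insert a fresh entry instead.
        queue.push("bucket", 5);
    }

    assert_eq!(queue.peek(), Some((&"bucket", &15)));
}
```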
@@ -1528,7 +1547,11 @@ mod tests {
             .merge(project_key, bucket3.clone(), None)
             .unwrap();
 
-        let buckets_metadata: Vec<_> = aggregator.buckets.values().map(|v| &v.metadata).collect();
+        let buckets_metadata: Vec<_> = aggregator
+            .buckets
+            .iter()
+            .map(|(_, v)| &v.metadata)
+            .collect();
         insta::assert_debug_snapshot!(buckets_metadata, @r###"
         [
             BucketMetadata {
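The test tweak is mechanical: `PriorityQueue` has no `values()` accessor; its `iter()` yields `(&item, &priority)` pairs in arbitrary order, hence the map over pairs. For illustration:

```rust
use priority_queue::PriorityQueue;

fn main() {
    let mut queue = PriorityQueue::new();
    queue.push("a", 1);
    queue.push("b", 2);

    // `iter()` visits (&item, &priority) pairs in arbitrary order,
    // mirroring the `.iter().map(|(_, v)| ...)` call in the test.
    let mut priorities: Vec<i32> = queue.iter().map(|(_, p)| *p).collect();
    priorities.sort();
    assert_eq!(priorities, vec![1, 2]);
}
```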
