
ref(metrics): Use a priority queue for metrics aggregator #3845

Merged 1 commit on Jul 23, 2024
16 changes: 14 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -123,6 +123,7 @@
pest = "2.1.3"
pest_derive = "2.1.0"
pin-project-lite = "0.2.12"
pretty-hex = "0.3.0"
priority-queue = "2.0.3"
Member commented:
Interesting crate, might actually be a good replacement for my homegrown priority map. But do we need it for the aggregator use case? IIUC the std binary heap should be good enough?

Member (author) replied:

We still need keyed access and priority separately. Keyed for the aggregation key, priority for the flush time.
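The distinction the author draws can be illustrated with a std-only sketch (illustrative names, not the `priority-queue` crate's API, which combines both indexes in a single structure): buckets must stay addressable by their aggregation key for merging, while flushing needs them ordered by flush time. A plain `BinaryHeap` provides only the second index.

```rust
use std::collections::{BTreeMap, HashMap};
use std::hash::Hash;

// Illustrative only: a keyed priority queue built from two std maps.
// `by_key` gives O(1) access for merging into an existing bucket;
// `by_priority` keeps entries ordered by flush time for draining.
struct KeyedQueue<K: Ord + Hash + Eq + Clone, V> {
    by_key: HashMap<K, (u64, V)>,
    by_priority: BTreeMap<(u64, K), ()>,
}

impl<K: Ord + Hash + Eq + Clone, V> KeyedQueue<K, V> {
    fn new() -> Self {
        Self { by_key: HashMap::new(), by_priority: BTreeMap::new() }
    }

    fn push(&mut self, key: K, flush_at: u64, value: V) {
        self.by_priority.insert((flush_at, key.clone()), ());
        self.by_key.insert(key, (flush_at, value));
    }

    // Keyed access: merge into an existing bucket without disturbing order.
    fn get_mut(&mut self, key: &K) -> Option<&mut V> {
        self.by_key.get_mut(key).map(|(_, v)| v)
    }

    // Priority access: remove the entry with the earliest flush time.
    fn pop_min(&mut self) -> Option<(K, V)> {
        let ((_, key), ()) = self.by_priority.pop_first()?;
        let (_, value) = self.by_key.remove(&key)?;
        Some((key, value))
    }
}

fn main() {
    let mut q = KeyedQueue::new();
    q.push("k1", 30, vec![1]);
    q.push("k2", 10, vec![2]);
    *q.get_mut(&"k1").unwrap() = vec![1, 1]; // keyed merge
    assert_eq!(q.pop_min(), Some(("k2", vec![2]))); // earliest flush first
    assert_eq!(q.pop_min(), Some(("k1", vec![1, 1])));
}
```

The two-map version pays a `Clone` of the key per entry; a dedicated structure like `priority-queue` avoids that, which is presumably part of the appeal here.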

proc-macro2 = "1.0.8"
psl = "2.1.33"
quote = "1.0.2"
1 change: 1 addition & 0 deletions relay-metrics/Cargo.toml
@@ -22,6 +22,7 @@
fnv = { workspace = true }
hash32 = { workspace = true }
hashbrown = { workspace = true }
itertools = { workspace = true }
priority-queue = { workspace = true }
relay-base-schema = { workspace = true }
relay-cardinality = { workspace = true }
relay-cogs = { workspace = true }
97 changes: 60 additions & 37 deletions relay-metrics/src/aggregator.rs
@@ -7,6 +7,7 @@
use std::time::Duration;
use std::{fmt, mem};

use fnv::FnvHasher;
use priority_queue::PriorityQueue;
use relay_base_schema::project::ProjectKey;
use relay_common::time::UnixTimestamp;
use serde::{Deserialize, Serialize};
@@ -16,7 +17,7 @@
use tokio::time::Instant;
use crate::bucket::{Bucket, BucketValue};
use crate::protocol::{self, MetricNamespace, MetricResourceIdentifier};
use crate::statsd::{MetricCounters, MetricGauges, MetricHistograms, MetricSets, MetricTimers};
use crate::{BucketMetadata, MetricName};
use crate::{BucketMetadata, FiniteF64, MetricName};

use hashbrown::{hash_map::Entry, HashMap};

@@ -531,7 +532,7 @@ fn get_flush_time(
pub struct Aggregator {
name: String,
config: AggregatorConfig,
buckets: HashMap<BucketKey, QueuedBucket>,
buckets: PriorityQueue<BucketKey, QueuedBucket>,
cost_tracker: CostTracker,
reference_time: Instant,
}
@@ -547,7 +548,7 @@ impl Aggregator {
Self {
name,
config,
buckets: HashMap::new(),
buckets: Default::default(),
cost_tracker: CostTracker::default(),
reference_time: Instant::now(),
}
@@ -624,10 +625,12 @@
let bucket_interval = self.config.bucket_interval;
let cost_tracker = &mut self.cost_tracker;

for (key, entry) in self
.buckets
.extract_if(|_, entry| force || entry.elapsed(now))
{
while let Some((_, entry)) = self.buckets.peek() {
if !entry.elapsed(now) && !force {
break;
}

let (key, entry) = self.buckets.pop().expect("pop after peek");
Member commented:

This is awkward, right? For our own priority map I'm considering writing an interface that lets you do let value = queue.peek().remove() instead of queue.pop().

Member (author) replied:

Yeah, it is; I proposed an API here.

What could also be nice is something like in heed, where you have extra functionality on the iterator.
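The peek-then-pop pattern under discussion can be sketched with std types alone (assuming, for illustration, that priorities are plain timestamps kept in a min-heap):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Sketch of the aggregator's flush loop: peek at the earliest entry and
// stop as soon as it is not yet due, otherwise pop it. Because the heap
// is ordered by flush time, no later entry can be due once the front one
// is not — which is what makes the early `break` correct.
fn drain_due(heap: &mut BinaryHeap<Reverse<u64>>, now: u64) -> Vec<u64> {
    let mut due = Vec::new();
    while let Some(Reverse(flush_at)) = heap.peek() {
        if *flush_at > now {
            break;
        }
        // The awkward part under discussion: pop cannot fail here, but the
        // type system does not know that, hence the `expect`.
        let Reverse(flush_at) = heap.pop().expect("pop after peek");
        due.push(flush_at);
    }
    due
}

fn main() {
    let mut heap: BinaryHeap<_> = [5u64, 1, 9, 3].into_iter().map(Reverse).collect();
    assert_eq!(drain_due(&mut heap, 4), vec![1, 3]);
    assert_eq!(heap.len(), 2); // entries not yet due remain queued
}
```

A hypothetical `queue.peek().remove()` entry-style API, as proposed above, would let the borrow obtained by `peek` prove that the subsequent removal cannot fail, eliminating the `expect`.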

cost_tracker.subtract_cost(key.project_key, key.cost());
cost_tracker.subtract_cost(key.project_key, entry.value.cost());

@@ -637,17 +640,19 @@
*bucket_count += 1;
*item_count += entry.value.len();

let partition = self.config.flush_partitions.map(|p| key.partition_key(p));

let bucket = Bucket {
timestamp: key.timestamp,
width: bucket_interval,
name: key.metric_name.clone(),
name: key.metric_name,
value: entry.value,
tags: key.tags.clone(),
tags: key.tags,
metadata: entry.metadata,
};

partitions
.entry(self.config.flush_partitions.map(|p| key.partition_key(p)))
.entry(partition)
.or_insert_with(HashMap::new)
.entry(key.project_key)
.or_insert_with(Vec::new)
@@ -697,7 +702,7 @@
pub fn merge(
&mut self,
project_key: ProjectKey,
bucket: Bucket,
mut bucket: Bucket,
max_total_bucket_bytes: Option<usize>,
) -> Result<(), AggregateMetricsError> {
let timestamp = self.get_bucket_timestamp(bucket.timestamp, bucket.width)?;
@@ -742,34 +747,48 @@
self.config.max_project_key_bucket_bytes,
)?;

let added_cost;
match self.buckets.entry(key) {
Entry::Occupied(mut entry) => {
relay_statsd::metric!(
counter(MetricCounters::MergeHit) += 1,
aggregator = &self.name,
namespace = entry.key().namespace().as_str(),
);
let mut added_cost = 0;

added_cost = entry.get_mut().merge(bucket.value, bucket.metadata)?;
}
Entry::Vacant(entry) => {
relay_statsd::metric!(
counter(MetricCounters::MergeMiss) += 1,
aggregator = &self.name,
namespace = entry.key().namespace().as_str(),
);
relay_statsd::metric!(
set(MetricSets::UniqueBucketsCreated) = entry.key().hash64() as i64, // 2-complement
aggregator = &self.name,
namespace = entry.key().namespace().as_str(),
);
let mut error = None;
let updated = self.buckets.change_priority_by(&key, |value| {
Member commented:

Why do we not use self.buckets.get_mut() here? IIUC, the closure is only called if the element exists, and we don't actually update any priorities.

Member (@Dav1dde, author) replied on Jul 23, 2024:

get_mut only returns the key as mutable, but we need a mutable value, which is also what causes this unfortunate error handling.

It's kinda doubly unfortunate API-wise, because we never change the priority. Maybe it's worth investigating wrapping the value itself in a RefCell so we can use get(), although I'm not sure that makes it better...
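The contortion in the diff can be shown in isolation. Because the closure is a `FnMut`, it cannot move the incoming bucket value out of its environment, so the diff swaps in a cheap placeholder with `mem::replace` and smuggles any merge error out through a captured variable. In this sketch, `change_by` is a hypothetical stand-in for `change_priority_by`, not the crate's actual API:

```rust
use std::mem;

// Hypothetical stand-in for `change_priority_by`: runs the closure at
// most once (if the entry exists) and reports whether it ran.
fn change_by<T>(slot: &mut Option<T>, mut f: impl FnMut(&mut T)) -> bool {
    match slot {
        Some(value) => {
            f(value);
            true
        }
        None => false,
    }
}

fn main() {
    let mut existing = Some(vec![1u32, 2]);
    let mut incoming = vec![3u32, 4]; // non-Copy payload to merge
    let mut error: Option<&str> = None;

    let updated = change_by(&mut existing, |value| {
        // A `FnMut` closure may not move `incoming` out of its capture,
        // so swap in an empty placeholder and take ownership of the
        // payload — mirroring the `mem::replace` in the diff above.
        let payload = mem::take(&mut incoming);
        if payload.is_empty() {
            error = Some("nothing to merge"); // errors smuggled out, as in the PR
            return;
        }
        value.extend(payload);
    });

    assert!(updated);
    assert!(error.is_none());
    assert_eq!(existing.unwrap(), vec![1, 2, 3, 4]);
}
```

If the value lived behind a `RefCell` in the priority slot, as speculated above, a plain `get()` plus `borrow_mut()` would avoid both the closure and the smuggled error, at the cost of runtime borrow checks.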

relay_statsd::metric!(
counter(MetricCounters::MergeHit) += 1,
aggregator = &self.name,
namespace = key.namespace().as_str(),
);

let flush_at = get_flush_time(&self.config, self.reference_time, entry.key());
let value = bucket.value;
added_cost = entry.key().cost() + value.cost();
entry.insert(QueuedBucket::new(flush_at, value, bucket.metadata));
let bv = std::mem::replace(
&mut bucket.value,
BucketValue::Counter(FiniteF64::default()),
);
match value.merge(bv, bucket.metadata) {
Ok(ac) => added_cost = ac,
Err(err) => error = Some(err),
}
});

if let Some(error) = error {
return Err(error.into());
}

if !updated {
relay_statsd::metric!(
counter(MetricCounters::MergeMiss) += 1,
aggregator = &self.name,
namespace = key.namespace().as_str(),
);
relay_statsd::metric!(
set(MetricSets::UniqueBucketsCreated) = key.hash64() as i64, // 2-complement
aggregator = &self.name,
namespace = key.namespace().as_str(),
);

let flush_at = get_flush_time(&self.config, self.reference_time, &key);
let value = bucket.value;
added_cost = key.cost() + value.cost();

self.buckets
.push(key, QueuedBucket::new(flush_at, value, bucket.metadata));
}

self.cost_tracker.add_cost(project_key, added_cost);
@@ -1607,7 +1626,11 @@ mod tests {
.merge(project_key, bucket3.clone(), None)
.unwrap();

let buckets_metadata: Vec<_> = aggregator.buckets.values().map(|v| &v.metadata).collect();
let buckets_metadata: Vec<_> = aggregator
.buckets
.iter()
.map(|(_, v)| &v.metadata)
.collect();
insta::assert_debug_snapshot!(buckets_metadata, @r###"
[
BucketMetadata {