Skip to content

Commit

Permalink
test(metrics): Add simple test for metrics aggregator flush
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Jul 23, 2024
1 parent ab8210f commit 62f9470
Showing 1 changed file with 79 additions and 0 deletions.
79 changes: 79 additions & 0 deletions relay-metrics/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,7 @@ fn validate_metric_tags(mut key: BucketKey, aggregator_config: &AggregatorConfig

#[cfg(test)]
mod tests {
use insta::assert_debug_snapshot;
use similar_asserts::assert_eq;

use super::*;
Expand Down Expand Up @@ -1249,6 +1250,84 @@ mod tests {
assert_eq!(aggregator.cost_tracker.total_cost, 0);
}

#[tokio::test]
async fn test_aggregator_flush() {
// Make sure that the right cost is added / subtracted
let mut aggregator: Aggregator = Aggregator::new(AggregatorConfig {
bucket_interval: 10,
..test_config()
});
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

let now = UnixTimestamp::now();
tokio::time::pause();

for i in 0..3u32 {
for (name, offset) in [("foo", 30), ("bar", 15)] {
let timestamp = now + Duration::from_secs(60 * u64::from(i) + offset);
let bucket = Bucket {
timestamp,
width: 0,
name: format!("c:transactions/{name}_{i}@none").into(),
value: BucketValue::counter(i.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(timestamp),
};

aggregator.merge(project_key, bucket, None).unwrap();
}
}

let mut flush_buckets = || {
let mut result = Vec::new();
for (partition, v) in aggregator.pop_flush_buckets(false) {
assert!(partition.is_none());
for (pk, buckets) in v {
assert_eq!(pk, project_key);
for bucket in buckets {
result.push(bucket.name.to_string());
}
}
}
result.sort();
result
};

assert!(flush_buckets().is_empty());

tokio::time::advance(Duration::from_secs(60)).await;
assert_debug_snapshot!(flush_buckets(), @r###"
[
"c:transactions/bar_0@none",
"c:transactions/foo_0@none",
]
"###);
assert!(flush_buckets().is_empty());

tokio::time::advance(Duration::from_secs(60)).await;
assert_debug_snapshot!(flush_buckets(), @r###"
[
"c:transactions/bar_1@none",
"c:transactions/foo_1@none",
]
"###);
assert!(flush_buckets().is_empty());

tokio::time::advance(Duration::from_secs(60)).await;
assert_debug_snapshot!(flush_buckets(), @r###"
[
"c:transactions/bar_2@none",
"c:transactions/foo_2@none",
]
"###);
assert!(flush_buckets().is_empty());

tokio::time::advance(Duration::from_secs(3600)).await;
assert!(flush_buckets().is_empty());

assert!(aggregator.into_buckets().is_empty());
}

#[test]
fn test_get_bucket_timestamp_overflow() {
let config = AggregatorConfig {
Expand Down

0 comments on commit 62f9470

Please sign in to comment.