diff --git a/relay-metrics/src/aggregator.rs b/relay-metrics/src/aggregator.rs index 47914771b8..6406405c57 100644 --- a/relay-metrics/src/aggregator.rs +++ b/relay-metrics/src/aggregator.rs @@ -897,6 +897,7 @@ fn validate_metric_tags(mut key: BucketKey, aggregator_config: &AggregatorConfig #[cfg(test)] mod tests { + use insta::assert_debug_snapshot; use similar_asserts::assert_eq; use super::*; @@ -1249,6 +1250,84 @@ mod tests { assert_eq!(aggregator.cost_tracker.total_cost, 0); } + #[tokio::test] + async fn test_aggregator_flush() { + // Make sure that the right cost is added / subtracted + let mut aggregator: Aggregator = Aggregator::new(AggregatorConfig { + bucket_interval: 10, + ..test_config() + }); + let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); + + let now = UnixTimestamp::now(); + tokio::time::pause(); + + for i in 0..3u32 { + for (name, offset) in [("foo", 30), ("bar", 15)] { + let timestamp = now + Duration::from_secs(60 * u64::from(i) + offset); + let bucket = Bucket { + timestamp, + width: 0, + name: format!("c:transactions/{name}_{i}@none").into(), + value: BucketValue::counter(i.into()), + tags: BTreeMap::new(), + metadata: BucketMetadata::new(timestamp), + }; + + aggregator.merge(project_key, bucket, None).unwrap(); + } + } + + let mut flush_buckets = || { + let mut result = Vec::new(); + for (partition, v) in aggregator.pop_flush_buckets(false) { + assert!(partition.is_none()); + for (pk, buckets) in v { + assert_eq!(pk, project_key); + for bucket in buckets { + result.push(bucket.name.to_string()); + } + } + } + result.sort(); + result + }; + + assert!(flush_buckets().is_empty()); + + tokio::time::advance(Duration::from_secs(60)).await; + assert_debug_snapshot!(flush_buckets(), @r###" + [ + "c:transactions/bar_0@none", + "c:transactions/foo_0@none", + ] + "###); + assert!(flush_buckets().is_empty()); + + tokio::time::advance(Duration::from_secs(60)).await; + assert_debug_snapshot!(flush_buckets(), @r###" + [ + "c:transactions/bar_1@none", + "c:transactions/foo_1@none", + ] + "###); + assert!(flush_buckets().is_empty()); + + tokio::time::advance(Duration::from_secs(60)).await; + assert_debug_snapshot!(flush_buckets(), @r###" + [ + "c:transactions/bar_2@none", + "c:transactions/foo_2@none", + ] + "###); + assert!(flush_buckets().is_empty()); + + tokio::time::advance(Duration::from_secs(3600)).await; + assert!(flush_buckets().is_empty()); + + assert!(aggregator.into_buckets().is_empty()); + } + #[test] fn test_get_bucket_timestamp_overflow() { let config = AggregatorConfig {