From 4700bd5ae989cfe68ea31a05f7dc53556577a46c Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 12 Jun 2024 16:04:44 +0200 Subject: [PATCH] feat(metrics): Add configurable flush interval for metrics aggregator (#3721) --- relay-metrics/src/aggregatorservice.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/relay-metrics/src/aggregatorservice.rs b/relay-metrics/src/aggregatorservice.rs index 6d95dbbe9d..f4b67d977e 100644 --- a/relay-metrics/src/aggregatorservice.rs +++ b/relay-metrics/src/aggregatorservice.rs @@ -12,9 +12,6 @@ use crate::aggregator::{self, AggregatorConfig, FlushBatching}; use crate::bucket::Bucket; use crate::statsd::{MetricCounters, MetricHistograms, MetricTimers}; -/// Interval for the flush cycle of the [`AggregatorService`]. -const FLUSH_INTERVAL: Duration = Duration::from_millis(100); - /// Parameters used by the [`AggregatorService`]. #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(default)] @@ -111,6 +108,12 @@ pub struct AggregatorServiceConfig { /// partition, effectively allowing all the elements of that partition to be flushed together. #[serde(alias = "shift_key")] pub flush_batching: FlushBatching, + + /// The flushing interval in milliseconds that determines how often the aggregator is polled for + /// flushing new buckets. + /// + /// Defaults to `100` milliseconds. + pub flush_interval_ms: u64, } impl Default for AggregatorServiceConfig { @@ -129,6 +132,7 @@ impl Default for AggregatorServiceConfig { max_flush_bytes: 5_000_000, // 5 MB flush_partitions: None, flush_batching: FlushBatching::Project, + flush_interval_ms: 100, // 100 milliseconds } } } @@ -245,6 +249,7 @@ pub struct AggregatorService { state: AggregatorState, receiver: Option>, max_total_bucket_bytes: Option, + flush_interval_ms: u64, } impl AggregatorService { @@ -270,6 +275,7 @@ impl AggregatorService { state: AggregatorState::Running, max_total_bucket_bytes: config.max_total_bucket_bytes, aggregator: aggregator::Aggregator::named(name, AggregatorConfig::from(&config)), + flush_interval_ms: config.flush_interval_ms, } } @@ -368,7 +374,7 @@ impl Service for AggregatorService { fn spawn_handler(mut self, mut rx: relay_system::Receiver) { tokio::spawn(async move { - let mut ticker = tokio::time::interval(FLUSH_INTERVAL); + let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms)); let mut shutdown = Controller::shutdown_handle(); // Note that currently this loop never exits and will run till the tokio runtime shuts