Skip to content

Commit

Permalink
feat(metrics): Add configurable flush interval for metrics aggregator (
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Jun 12, 2024
1 parent a24dda8 commit 4700bd5
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions relay-metrics/src/aggregatorservice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ use crate::aggregator::{self, AggregatorConfig, FlushBatching};
use crate::bucket::Bucket;
use crate::statsd::{MetricCounters, MetricHistograms, MetricTimers};

/// Interval for the flush cycle of the [`AggregatorService`].
const FLUSH_INTERVAL: Duration = Duration::from_millis(100);

/// Parameters used by the [`AggregatorService`].
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
Expand Down Expand Up @@ -111,6 +108,12 @@ pub struct AggregatorServiceConfig {
/// partition, effectively allowing all the elements of that partition to be flushed together.
#[serde(alias = "shift_key")]
pub flush_batching: FlushBatching,

/// The flushing interval in milliseconds that determines how often the aggregator is polled for
/// flushing new buckets.
///
/// Defaults to `100` milliseconds.
pub flush_interval_ms: u64,
}

impl Default for AggregatorServiceConfig {
Expand All @@ -129,6 +132,7 @@ impl Default for AggregatorServiceConfig {
max_flush_bytes: 5_000_000, // 5 MB
flush_partitions: None,
flush_batching: FlushBatching::Project,
flush_interval_ms: 100, // 100 milliseconds
}
}
}
Expand Down Expand Up @@ -245,6 +249,7 @@ pub struct AggregatorService {
state: AggregatorState,
receiver: Option<Recipient<FlushBuckets, NoResponse>>,
max_total_bucket_bytes: Option<usize>,
flush_interval_ms: u64,
}

impl AggregatorService {
Expand All @@ -270,6 +275,7 @@ impl AggregatorService {
state: AggregatorState::Running,
max_total_bucket_bytes: config.max_total_bucket_bytes,
aggregator: aggregator::Aggregator::named(name, AggregatorConfig::from(&config)),
flush_interval_ms: config.flush_interval_ms,
}
}

Expand Down Expand Up @@ -368,7 +374,7 @@ impl Service for AggregatorService {

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(FLUSH_INTERVAL);
let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms));
let mut shutdown = Controller::shutdown_handle();

// Note that currently this loop never exits and will run till the tokio runtime shuts
Expand Down

0 comments on commit 4700bd5

Please sign in to comment.