Skip to content

Commit

Permalink
ref(metrics): Make metrics aggregator health check sync (#3894)
Browse files Browse the repository at this point in the history
Makes the health check sync; it occasionally times out due to metric
flushes taking longer than the threshold. This is generally not a
problem, and small, short backlogs in the aggregator are expected (due to
the flushes), so the health checks shouldn't be failing because of it.

The health check itself is still very useful, though: we should not report
ready if the aggregator would reject metrics.
  • Loading branch information
Dav1dde committed Aug 5, 2024
1 parent bcbfa7e commit f35650e
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 151 deletions.
86 changes: 19 additions & 67 deletions relay-metrics/src/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Core functionality of metrics aggregation.

use std::collections::BTreeMap;
use std::error::Error;
use std::hash::{Hash, Hasher};
use std::time::Duration;
use std::{fmt, mem};
Expand All @@ -23,20 +22,7 @@ use hashbrown::HashMap;

/// Any error that may occur during aggregation.
#[derive(Debug, Error, PartialEq)]
#[error("failed to aggregate metrics: {kind}")]
pub struct AggregateMetricsError {
kind: AggregateMetricsErrorKind,
}

impl From<AggregateMetricsErrorKind> for AggregateMetricsError {
fn from(kind: AggregateMetricsErrorKind) -> Self {
AggregateMetricsError { kind }
}
}

#[derive(Debug, Error, PartialEq)]
#[allow(clippy::enum_variant_names)]
enum AggregateMetricsErrorKind {
pub enum AggregateMetricsError {
/// A metric bucket had invalid characters in the metric name.
#[error("found invalid characters: {0}")]
InvalidCharacters(MetricName),
Expand Down Expand Up @@ -336,12 +322,12 @@ impl QueuedBucket {
&mut self,
value: BucketValue,
metadata: BucketMetadata,
) -> Result<usize, AggregateMetricsErrorKind> {
) -> Result<usize, AggregateMetricsError> {
let cost_before = self.value.cost();

self.value
.merge(value)
.map_err(|_| AggregateMetricsErrorKind::InvalidTypes)?;
.map_err(|_| AggregateMetricsError::InvalidTypes)?;
self.metadata.merge(metadata);

Ok(self.value.cost().saturating_sub(cost_before))
Expand Down Expand Up @@ -398,7 +384,7 @@ impl CostTracker {
relay_log::configure_scope(|scope| {
scope.set_extra("bucket.project_key", project_key.as_str().to_owned().into());
});
return Err(AggregateMetricsErrorKind::TotalLimitExceeded.into());
return Err(AggregateMetricsError::TotalLimitExceeded);
}

if let Some(max_project_cost) = max_project_cost {
Expand All @@ -411,7 +397,7 @@ impl CostTracker {
relay_log::configure_scope(|scope| {
scope.set_extra("bucket.project_key", project_key.as_str().to_owned().into());
});
return Err(AggregateMetricsErrorKind::ProjectLimitExceeded.into());
return Err(AggregateMetricsError::ProjectLimitExceeded);
}
}

Expand Down Expand Up @@ -704,7 +690,7 @@ impl Aggregator {
aggregator = &self.name,
);

return Err(AggregateMetricsErrorKind::InvalidTimestamp(timestamp).into());
return Err(AggregateMetricsError::InvalidTimestamp(timestamp));
}

Ok(bucket_ts)
Expand Down Expand Up @@ -783,7 +769,7 @@ impl Aggregator {
});

if let Some(error) = error {
return Err(error.into());
return Err(error);
}

if !updated {
Expand Down Expand Up @@ -811,31 +797,6 @@ impl Aggregator {

Ok(())
}

/// Merges all given `buckets` into this aggregator.
///
/// Buckets that do not exist yet will be created.
pub fn merge_all(
&mut self,
project_key: ProjectKey,
buckets: impl IntoIterator<Item = Bucket>,
max_total_bucket_bytes: Option<usize>,
) {
for bucket in buckets.into_iter() {
if let Err(error) = self.merge(project_key, bucket, max_total_bucket_bytes) {
match &error.kind {
// Ignore invalid timestamp errors.
AggregateMetricsErrorKind::InvalidTimestamp(_) => {}
_other => {
relay_log::error!(
tags.aggregator = self.name,
bucket.error = &error as &dyn Error
);
}
}
}
}
}
}

impl fmt::Debug for Aggregator {
Expand Down Expand Up @@ -874,7 +835,7 @@ fn validate_metric_name(
aggregator_config.max_name_length,
key.metric_name
);
return Err(AggregateMetricsErrorKind::InvalidStringLength(key.metric_name).into());
return Err(AggregateMetricsError::InvalidStringLength(key.metric_name));
}

normalize_metric_name(&mut key)?;
Expand All @@ -887,16 +848,16 @@ fn normalize_metric_name(key: &mut BucketKey) -> Result<(), AggregateMetricsErro
Ok(mri) => {
if matches!(mri.namespace, MetricNamespace::Unsupported) {
relay_log::debug!("invalid metric namespace {:?}", &key.metric_name);
return Err(AggregateMetricsErrorKind::UnsupportedNamespace(mri.namespace).into());
return Err(AggregateMetricsError::UnsupportedNamespace(mri.namespace));
}

mri.to_string().into()
}
Err(_) => {
relay_log::debug!("invalid metric name {:?}", &key.metric_name);
return Err(
AggregateMetricsErrorKind::InvalidCharacters(key.metric_name.clone()).into(),
);
return Err(AggregateMetricsError::InvalidCharacters(
key.metric_name.clone(),
));
}
};

Expand Down Expand Up @@ -1406,9 +1367,8 @@ mod tests {
assert!(matches!(
aggregator
.get_bucket_timestamp(UnixTimestamp::from_secs(u64::MAX), 2)
.unwrap_err()
.kind,
AggregateMetricsErrorKind::InvalidTimestamp(_)
.unwrap_err(),
AggregateMetricsError::InvalidTimestamp(_)
));
}

Expand Down Expand Up @@ -1534,9 +1494,7 @@ mod tests {

assert!(matches!(
validation.unwrap_err(),
AggregateMetricsError {
kind: AggregateMetricsErrorKind::InvalidStringLength(_)
}
AggregateMetricsError::InvalidStringLength(_),
));

let short_metric_long_tag_key = BucketKey {
Expand Down Expand Up @@ -1598,11 +1556,8 @@ mod tests {
.unwrap();

assert_eq!(
aggregator
.merge(project_key, bucket, Some(1))
.unwrap_err()
.kind,
AggregateMetricsErrorKind::TotalLimitExceeded
aggregator.merge(project_key, bucket, Some(1)).unwrap_err(),
AggregateMetricsError::TotalLimitExceeded
);
}

Expand All @@ -1627,11 +1582,8 @@ mod tests {

aggregator.merge(project_key, bucket.clone(), None).unwrap();
assert_eq!(
aggregator
.merge(project_key, bucket, None)
.unwrap_err()
.kind,
AggregateMetricsErrorKind::ProjectLimitExceeded
aggregator.merge(project_key, bucket, None).unwrap_err(),
AggregateMetricsError::ProjectLimitExceeded
);
}

Expand Down
7 changes: 4 additions & 3 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,9 @@ impl ServiceState {
config.default_aggregator_config().clone(),
config.secondary_aggregator_configs().clone(),
Some(project_cache.clone().recipient()),
)
.start();
);
let aggregator_handle = aggregator.handle();
let aggregator = aggregator.start();

let metric_stats = MetricStats::new(
config.clone(),
Expand Down Expand Up @@ -268,7 +269,7 @@ impl ServiceState {
let health_check = HealthCheckService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
aggregator.clone(),
aggregator_handle,
upstream_relay.clone(),
project_cache.clone(),
)
Expand Down
11 changes: 4 additions & 7 deletions relay-server/src/services/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::future::Future;
use tokio::sync::watch;
use tokio::time::{timeout, Instant};

use crate::services::metrics::{AcceptsMetrics, Aggregator};
use crate::services::metrics::RouterHandle;
use crate::services::project_cache::{ProjectCache, SpoolHealth};
use crate::services::upstream::{IsAuthenticated, UpstreamRelay};
use crate::statsd::RelayTimers;
Expand Down Expand Up @@ -84,7 +84,7 @@ impl StatusUpdate {
pub struct HealthCheckService {
config: Arc<Config>,
memory_checker: MemoryChecker,
aggregator: Addr<Aggregator>,
aggregator: RouterHandle,
upstream_relay: Addr<UpstreamRelay>,
project_cache: Addr<ProjectCache>,
}
Expand All @@ -96,7 +96,7 @@ impl HealthCheckService {
pub fn new(
config: Arc<Config>,
memory_checker: MemoryChecker,
aggregator: Addr<Aggregator>,
aggregator: RouterHandle,
upstream_relay: Addr<UpstreamRelay>,
project_cache: Addr<ProjectCache>,
) -> Self {
Expand Down Expand Up @@ -147,10 +147,7 @@ impl HealthCheckService {
}

async fn aggregator_probe(&self) -> Status {
self.aggregator
.send(AcceptsMetrics)
.await
.map_or(Status::Unhealthy, Status::from)
Status::from(self.aggregator.can_accept_metrics())
}

async fn spool_health_probe(&self) -> Status {
Expand Down
Loading

0 comments on commit f35650e

Please sign in to comment.