Skip to content

Commit

Permalink
ref(metrics): Make metrics aggregator health check sync
Browse files (browse the repository at this point in the history)
  • Loading branch information
Dav1dde committed Aug 5, 2024
1 parent bcbfa7e commit dcfa67d
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 150 deletions.
86 changes: 19 additions & 67 deletions relay-metrics/src/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Core functionality of metrics aggregation.

use std::collections::BTreeMap;
use std::error::Error;
use std::hash::{Hash, Hasher};
use std::time::Duration;
use std::{fmt, mem};
Expand All @@ -23,20 +22,7 @@ use hashbrown::HashMap;

/// Any error that may occur during aggregation.
#[derive(Debug, Error, PartialEq)]
#[error("failed to aggregate metrics: {kind}")]
pub struct AggregateMetricsError {
kind: AggregateMetricsErrorKind,
}

impl From<AggregateMetricsErrorKind> for AggregateMetricsError {
fn from(kind: AggregateMetricsErrorKind) -> Self {
AggregateMetricsError { kind }
}
}

#[derive(Debug, Error, PartialEq)]
#[allow(clippy::enum_variant_names)]
enum AggregateMetricsErrorKind {
pub enum AggregateMetricsError {
/// A metric bucket had invalid characters in the metric name.
#[error("found invalid characters: {0}")]
InvalidCharacters(MetricName),
Expand Down Expand Up @@ -336,12 +322,12 @@ impl QueuedBucket {
&mut self,
value: BucketValue,
metadata: BucketMetadata,
) -> Result<usize, AggregateMetricsErrorKind> {
) -> Result<usize, AggregateMetricsError> {
let cost_before = self.value.cost();

self.value
.merge(value)
.map_err(|_| AggregateMetricsErrorKind::InvalidTypes)?;
.map_err(|_| AggregateMetricsError::InvalidTypes)?;
self.metadata.merge(metadata);

Ok(self.value.cost().saturating_sub(cost_before))
Expand Down Expand Up @@ -398,7 +384,7 @@ impl CostTracker {
relay_log::configure_scope(|scope| {
scope.set_extra("bucket.project_key", project_key.as_str().to_owned().into());
});
return Err(AggregateMetricsErrorKind::TotalLimitExceeded.into());
return Err(AggregateMetricsError::TotalLimitExceeded);
}

if let Some(max_project_cost) = max_project_cost {
Expand All @@ -411,7 +397,7 @@ impl CostTracker {
relay_log::configure_scope(|scope| {
scope.set_extra("bucket.project_key", project_key.as_str().to_owned().into());
});
return Err(AggregateMetricsErrorKind::ProjectLimitExceeded.into());
return Err(AggregateMetricsError::ProjectLimitExceeded);
}
}

Expand Down Expand Up @@ -704,7 +690,7 @@ impl Aggregator {
aggregator = &self.name,
);

return Err(AggregateMetricsErrorKind::InvalidTimestamp(timestamp).into());
return Err(AggregateMetricsError::InvalidTimestamp(timestamp));
}

Ok(bucket_ts)
Expand Down Expand Up @@ -783,7 +769,7 @@ impl Aggregator {
});

if let Some(error) = error {
return Err(error.into());
return Err(error);
}

if !updated {
Expand Down Expand Up @@ -811,31 +797,6 @@ impl Aggregator {

Ok(())
}

/// Merges all given `buckets` into this aggregator.
///
/// Buckets that do not exist yet will be created.
pub fn merge_all(
&mut self,
project_key: ProjectKey,
buckets: impl IntoIterator<Item = Bucket>,
max_total_bucket_bytes: Option<usize>,
) {
for bucket in buckets.into_iter() {
if let Err(error) = self.merge(project_key, bucket, max_total_bucket_bytes) {
match &error.kind {
// Ignore invalid timestamp errors.
AggregateMetricsErrorKind::InvalidTimestamp(_) => {}
_other => {
relay_log::error!(
tags.aggregator = self.name,
bucket.error = &error as &dyn Error
);
}
}
}
}
}
}

impl fmt::Debug for Aggregator {
Expand Down Expand Up @@ -874,7 +835,7 @@ fn validate_metric_name(
aggregator_config.max_name_length,
key.metric_name
);
return Err(AggregateMetricsErrorKind::InvalidStringLength(key.metric_name).into());
return Err(AggregateMetricsError::InvalidStringLength(key.metric_name));
}

normalize_metric_name(&mut key)?;
Expand All @@ -887,16 +848,16 @@ fn normalize_metric_name(key: &mut BucketKey) -> Result<(), AggregateMetricsErro
Ok(mri) => {
if matches!(mri.namespace, MetricNamespace::Unsupported) {
relay_log::debug!("invalid metric namespace {:?}", &key.metric_name);
return Err(AggregateMetricsErrorKind::UnsupportedNamespace(mri.namespace).into());
return Err(AggregateMetricsError::UnsupportedNamespace(mri.namespace));
}

mri.to_string().into()
}
Err(_) => {
relay_log::debug!("invalid metric name {:?}", &key.metric_name);
return Err(
AggregateMetricsErrorKind::InvalidCharacters(key.metric_name.clone()).into(),
);
return Err(AggregateMetricsError::InvalidCharacters(
key.metric_name.clone(),
));
}
};

Expand Down Expand Up @@ -1406,9 +1367,8 @@ mod tests {
assert!(matches!(
aggregator
.get_bucket_timestamp(UnixTimestamp::from_secs(u64::MAX), 2)
.unwrap_err()
.kind,
AggregateMetricsErrorKind::InvalidTimestamp(_)
.unwrap_err(),
AggregateMetricsError::InvalidTimestamp(_)
));
}

Expand Down Expand Up @@ -1534,9 +1494,7 @@ mod tests {

assert!(matches!(
validation.unwrap_err(),
AggregateMetricsError {
kind: AggregateMetricsErrorKind::InvalidStringLength(_)
}
AggregateMetricsError::InvalidStringLength(_),
));

let short_metric_long_tag_key = BucketKey {
Expand Down Expand Up @@ -1598,11 +1556,8 @@ mod tests {
.unwrap();

assert_eq!(
aggregator
.merge(project_key, bucket, Some(1))
.unwrap_err()
.kind,
AggregateMetricsErrorKind::TotalLimitExceeded
aggregator.merge(project_key, bucket, Some(1)).unwrap_err(),
AggregateMetricsError::TotalLimitExceeded
);
}

Expand All @@ -1627,11 +1582,8 @@ mod tests {

aggregator.merge(project_key, bucket.clone(), None).unwrap();
assert_eq!(
aggregator
.merge(project_key, bucket, None)
.unwrap_err()
.kind,
AggregateMetricsErrorKind::ProjectLimitExceeded
aggregator.merge(project_key, bucket, None).unwrap_err(),
AggregateMetricsError::ProjectLimitExceeded
);
}

Expand Down
7 changes: 4 additions & 3 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,9 @@ impl ServiceState {
config.default_aggregator_config().clone(),
config.secondary_aggregator_configs().clone(),
Some(project_cache.clone().recipient()),
)
.start();
);
let aggregator_handle = aggregator.handle();
let aggregator = aggregator.start();

let metric_stats = MetricStats::new(
config.clone(),
Expand Down Expand Up @@ -268,7 +269,7 @@ impl ServiceState {
let health_check = HealthCheckService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
aggregator.clone(),
aggregator_handle,
upstream_relay.clone(),
project_cache.clone(),
)
Expand Down
11 changes: 4 additions & 7 deletions relay-server/src/services/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::future::Future;
use tokio::sync::watch;
use tokio::time::{timeout, Instant};

use crate::services::metrics::{AcceptsMetrics, Aggregator};
use crate::services::metrics::RouterHandle;
use crate::services::project_cache::{ProjectCache, SpoolHealth};
use crate::services::upstream::{IsAuthenticated, UpstreamRelay};
use crate::statsd::RelayTimers;
Expand Down Expand Up @@ -84,7 +84,7 @@ impl StatusUpdate {
pub struct HealthCheckService {
config: Arc<Config>,
memory_checker: MemoryChecker,
aggregator: Addr<Aggregator>,
aggregator: RouterHandle,
upstream_relay: Addr<UpstreamRelay>,
project_cache: Addr<ProjectCache>,
}
Expand All @@ -96,7 +96,7 @@ impl HealthCheckService {
pub fn new(
config: Arc<Config>,
memory_checker: MemoryChecker,
aggregator: Addr<Aggregator>,
aggregator: RouterHandle,
upstream_relay: Addr<UpstreamRelay>,
project_cache: Addr<ProjectCache>,
) -> Self {
Expand Down Expand Up @@ -147,10 +147,7 @@ impl HealthCheckService {
}

async fn aggregator_probe(&self) -> Status {
self.aggregator
.send(AcceptsMetrics)
.await
.map_or(Status::Unhealthy, Status::from)
Status::from(self.aggregator.can_accept_metrics())
}

async fn spool_health_probe(&self) -> Status {
Expand Down
Loading

0 comments on commit dcfa67d

Please sign in to comment.