Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(metrics): Make metrics aggregator health check sync #3895

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 19 additions & 67 deletions relay-metrics/src/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Core functionality of metrics aggregation.

use std::collections::BTreeMap;
use std::error::Error;
use std::hash::{Hash, Hasher};
use std::time::Duration;
use std::{fmt, mem};
Expand All @@ -23,20 +22,7 @@ use hashbrown::HashMap;

/// Any error that may occur during aggregation.
#[derive(Debug, Error, PartialEq)]
#[error("failed to aggregate metrics: {kind}")]
pub struct AggregateMetricsError {
kind: AggregateMetricsErrorKind,
}

/// Wraps an [`AggregateMetricsErrorKind`] in the [`AggregateMetricsError`] struct.
///
/// This conversion is what allows call sites to build the error from a kind with `.into()`.
impl From<AggregateMetricsErrorKind> for AggregateMetricsError {
// Stores the given kind in the struct's `kind` field; nothing else is added.
fn from(kind: AggregateMetricsErrorKind) -> Self {
AggregateMetricsError { kind }
}
}

#[derive(Debug, Error, PartialEq)]
#[allow(clippy::enum_variant_names)]
enum AggregateMetricsErrorKind {
pub enum AggregateMetricsError {
/// A metric bucket had invalid characters in the metric name.
#[error("found invalid characters: {0}")]
InvalidCharacters(MetricName),
Expand Down Expand Up @@ -336,12 +322,12 @@ impl QueuedBucket {
&mut self,
value: BucketValue,
metadata: BucketMetadata,
) -> Result<usize, AggregateMetricsErrorKind> {
) -> Result<usize, AggregateMetricsError> {
let cost_before = self.value.cost();

self.value
.merge(value)
.map_err(|_| AggregateMetricsErrorKind::InvalidTypes)?;
.map_err(|_| AggregateMetricsError::InvalidTypes)?;
self.metadata.merge(metadata);

Ok(self.value.cost().saturating_sub(cost_before))
Expand Down Expand Up @@ -398,7 +384,7 @@ impl CostTracker {
relay_log::configure_scope(|scope| {
scope.set_extra("bucket.project_key", project_key.as_str().to_owned().into());
});
return Err(AggregateMetricsErrorKind::TotalLimitExceeded.into());
return Err(AggregateMetricsError::TotalLimitExceeded);
}

if let Some(max_project_cost) = max_project_cost {
Expand All @@ -411,7 +397,7 @@ impl CostTracker {
relay_log::configure_scope(|scope| {
scope.set_extra("bucket.project_key", project_key.as_str().to_owned().into());
});
return Err(AggregateMetricsErrorKind::ProjectLimitExceeded.into());
return Err(AggregateMetricsError::ProjectLimitExceeded);
}
}

Expand Down Expand Up @@ -704,7 +690,7 @@ impl Aggregator {
aggregator = &self.name,
);

return Err(AggregateMetricsErrorKind::InvalidTimestamp(timestamp).into());
return Err(AggregateMetricsError::InvalidTimestamp(timestamp));
}

Ok(bucket_ts)
Expand Down Expand Up @@ -783,7 +769,7 @@ impl Aggregator {
});

if let Some(error) = error {
return Err(error.into());
return Err(error);
}

if !updated {
Expand Down Expand Up @@ -811,31 +797,6 @@ impl Aggregator {

Ok(())
}

/// Merges all given `buckets` into this aggregator.
///
/// Buckets that do not exist yet will be created.
///
/// Every bucket is handed to `merge` on its own, together with `project_key` and
/// `max_total_bucket_bytes`. A bucket that fails to merge does not stop the loop, and
/// this method returns nothing to the caller: invalid timestamp errors are dropped
/// silently, while any other error is reported through `relay_log::error!`.
pub fn merge_all(
&mut self,
project_key: ProjectKey,
buckets: impl IntoIterator<Item = Bucket>,
max_total_bucket_bytes: Option<usize>,
) {
for bucket in buckets.into_iter() {
// Failures are handled per bucket so the remaining buckets are still merged.
if let Err(error) = self.merge(project_key, bucket, max_total_bucket_bytes) {
match &error.kind {
// Ignore invalid timestamp errors.
AggregateMetricsErrorKind::InvalidTimestamp(_) => {}
// Every other kind is logged, tagged with this aggregator's name.
_other => {
relay_log::error!(
tags.aggregator = self.name,
bucket.error = &error as &dyn Error
);
}
}
}
}
}
}

impl fmt::Debug for Aggregator {
Expand Down Expand Up @@ -874,7 +835,7 @@ fn validate_metric_name(
aggregator_config.max_name_length,
key.metric_name
);
return Err(AggregateMetricsErrorKind::InvalidStringLength(key.metric_name).into());
return Err(AggregateMetricsError::InvalidStringLength(key.metric_name));
}

normalize_metric_name(&mut key)?;
Expand All @@ -887,16 +848,16 @@ fn normalize_metric_name(key: &mut BucketKey) -> Result<(), AggregateMetricsErro
Ok(mri) => {
if matches!(mri.namespace, MetricNamespace::Unsupported) {
relay_log::debug!("invalid metric namespace {:?}", &key.metric_name);
return Err(AggregateMetricsErrorKind::UnsupportedNamespace(mri.namespace).into());
return Err(AggregateMetricsError::UnsupportedNamespace(mri.namespace));
}

mri.to_string().into()
}
Err(_) => {
relay_log::debug!("invalid metric name {:?}", &key.metric_name);
return Err(
AggregateMetricsErrorKind::InvalidCharacters(key.metric_name.clone()).into(),
);
return Err(AggregateMetricsError::InvalidCharacters(
key.metric_name.clone(),
));
}
};

Expand Down Expand Up @@ -1406,9 +1367,8 @@ mod tests {
assert!(matches!(
aggregator
.get_bucket_timestamp(UnixTimestamp::from_secs(u64::MAX), 2)
.unwrap_err()
.kind,
AggregateMetricsErrorKind::InvalidTimestamp(_)
.unwrap_err(),
AggregateMetricsError::InvalidTimestamp(_)
));
}

Expand Down Expand Up @@ -1534,9 +1494,7 @@ mod tests {

assert!(matches!(
validation.unwrap_err(),
AggregateMetricsError {
kind: AggregateMetricsErrorKind::InvalidStringLength(_)
}
AggregateMetricsError::InvalidStringLength(_),
));

let short_metric_long_tag_key = BucketKey {
Expand Down Expand Up @@ -1598,11 +1556,8 @@ mod tests {
.unwrap();

assert_eq!(
aggregator
.merge(project_key, bucket, Some(1))
.unwrap_err()
.kind,
AggregateMetricsErrorKind::TotalLimitExceeded
aggregator.merge(project_key, bucket, Some(1)).unwrap_err(),
AggregateMetricsError::TotalLimitExceeded
);
}

Expand All @@ -1627,11 +1582,8 @@ mod tests {

aggregator.merge(project_key, bucket.clone(), None).unwrap();
assert_eq!(
aggregator
.merge(project_key, bucket, None)
.unwrap_err()
.kind,
AggregateMetricsErrorKind::ProjectLimitExceeded
aggregator.merge(project_key, bucket, None).unwrap_err(),
AggregateMetricsError::ProjectLimitExceeded
);
}

Expand Down
7 changes: 4 additions & 3 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,9 @@ impl ServiceState {
config.default_aggregator_config().clone(),
config.secondary_aggregator_configs().clone(),
Some(project_cache.clone().recipient()),
)
.start();
);
let aggregator_handle = aggregator.handle();
let aggregator = aggregator.start();

let metric_stats = MetricStats::new(
config.clone(),
Expand Down Expand Up @@ -268,7 +269,7 @@ impl ServiceState {
let health_check = HealthCheckService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
aggregator.clone(),
aggregator_handle,
upstream_relay.clone(),
project_cache.clone(),
)
Expand Down
11 changes: 4 additions & 7 deletions relay-server/src/services/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::future::Future;
use tokio::sync::watch;
use tokio::time::{timeout, Instant};

use crate::services::metrics::{AcceptsMetrics, Aggregator};
use crate::services::metrics::RouterHandle;
use crate::services::project_cache::{ProjectCache, SpoolHealth};
use crate::services::upstream::{IsAuthenticated, UpstreamRelay};
use crate::statsd::RelayTimers;
Expand Down Expand Up @@ -84,7 +84,7 @@ impl StatusUpdate {
pub struct HealthCheckService {
config: Arc<Config>,
memory_checker: MemoryChecker,
aggregator: Addr<Aggregator>,
aggregator: RouterHandle,
upstream_relay: Addr<UpstreamRelay>,
project_cache: Addr<ProjectCache>,
}
Expand All @@ -96,7 +96,7 @@ impl HealthCheckService {
pub fn new(
config: Arc<Config>,
memory_checker: MemoryChecker,
aggregator: Addr<Aggregator>,
aggregator: RouterHandle,
upstream_relay: Addr<UpstreamRelay>,
project_cache: Addr<ProjectCache>,
) -> Self {
Expand Down Expand Up @@ -147,10 +147,7 @@ impl HealthCheckService {
}

async fn aggregator_probe(&self) -> Status {
self.aggregator
.send(AcceptsMetrics)
.await
.map_or(Status::Unhealthy, Status::from)
Status::from(self.aggregator.can_accept_metrics())
}

async fn spool_health_probe(&self) -> Status {
Expand Down
Loading
Loading