diff --git a/relay-metrics/src/aggregator.rs b/relay-metrics/src/aggregator.rs index 0719b868983..b0aab5fec4e 100644 --- a/relay-metrics/src/aggregator.rs +++ b/relay-metrics/src/aggregator.rs @@ -1,7 +1,6 @@ //! Core functionality of metrics aggregation. use std::collections::BTreeMap; -use std::error::Error; use std::hash::{Hash, Hasher}; use std::time::Duration; use std::{fmt, mem}; @@ -23,20 +22,7 @@ use hashbrown::HashMap; /// Any error that may occur during aggregation. #[derive(Debug, Error, PartialEq)] -#[error("failed to aggregate metrics: {kind}")] -pub struct AggregateMetricsError { - kind: AggregateMetricsErrorKind, -} - -impl From for AggregateMetricsError { - fn from(kind: AggregateMetricsErrorKind) -> Self { - AggregateMetricsError { kind } - } -} - -#[derive(Debug, Error, PartialEq)] -#[allow(clippy::enum_variant_names)] -enum AggregateMetricsErrorKind { +pub enum AggregateMetricsError { /// A metric bucket had invalid characters in the metric name. #[error("found invalid characters: {0}")] InvalidCharacters(MetricName), @@ -336,12 +322,12 @@ impl QueuedBucket { &mut self, value: BucketValue, metadata: BucketMetadata, - ) -> Result { + ) -> Result { let cost_before = self.value.cost(); self.value .merge(value) - .map_err(|_| AggregateMetricsErrorKind::InvalidTypes)?; + .map_err(|_| AggregateMetricsError::InvalidTypes)?; self.metadata.merge(metadata); Ok(self.value.cost().saturating_sub(cost_before)) @@ -398,7 +384,7 @@ impl CostTracker { relay_log::configure_scope(|scope| { scope.set_extra("bucket.project_key", project_key.as_str().to_owned().into()); }); - return Err(AggregateMetricsErrorKind::TotalLimitExceeded.into()); + return Err(AggregateMetricsError::TotalLimitExceeded); } if let Some(max_project_cost) = max_project_cost { @@ -411,7 +397,7 @@ impl CostTracker { relay_log::configure_scope(|scope| { scope.set_extra("bucket.project_key", project_key.as_str().to_owned().into()); }); - return 
Err(AggregateMetricsErrorKind::ProjectLimitExceeded.into()); + return Err(AggregateMetricsError::ProjectLimitExceeded); } } @@ -704,7 +690,7 @@ impl Aggregator { aggregator = &self.name, ); - return Err(AggregateMetricsErrorKind::InvalidTimestamp(timestamp).into()); + return Err(AggregateMetricsError::InvalidTimestamp(timestamp)); } Ok(bucket_ts) @@ -783,7 +769,7 @@ impl Aggregator { }); if let Some(error) = error { - return Err(error.into()); + return Err(error); } if !updated { @@ -811,31 +797,6 @@ impl Aggregator { Ok(()) } - - /// Merges all given `buckets` into this aggregator. - /// - /// Buckets that do not exist yet will be created. - pub fn merge_all( - &mut self, - project_key: ProjectKey, - buckets: impl IntoIterator, - max_total_bucket_bytes: Option, - ) { - for bucket in buckets.into_iter() { - if let Err(error) = self.merge(project_key, bucket, max_total_bucket_bytes) { - match &error.kind { - // Ignore invalid timestamp errors. - AggregateMetricsErrorKind::InvalidTimestamp(_) => {} - _other => { - relay_log::error!( - tags.aggregator = self.name, - bucket.error = &error as &dyn Error - ); - } - } - } - } - } } impl fmt::Debug for Aggregator { @@ -874,7 +835,7 @@ fn validate_metric_name( aggregator_config.max_name_length, key.metric_name ); - return Err(AggregateMetricsErrorKind::InvalidStringLength(key.metric_name).into()); + return Err(AggregateMetricsError::InvalidStringLength(key.metric_name)); } normalize_metric_name(&mut key)?; @@ -887,16 +848,16 @@ fn normalize_metric_name(key: &mut BucketKey) -> Result<(), AggregateMetricsErro Ok(mri) => { if matches!(mri.namespace, MetricNamespace::Unsupported) { relay_log::debug!("invalid metric namespace {:?}", &key.metric_name); - return Err(AggregateMetricsErrorKind::UnsupportedNamespace(mri.namespace).into()); + return Err(AggregateMetricsError::UnsupportedNamespace(mri.namespace)); } mri.to_string().into() } Err(_) => { relay_log::debug!("invalid metric name {:?}", &key.metric_name); - return Err( - 
AggregateMetricsErrorKind::InvalidCharacters(key.metric_name.clone()).into(), - ); + return Err(AggregateMetricsError::InvalidCharacters( + key.metric_name.clone(), + )); } }; @@ -1406,9 +1367,8 @@ mod tests { assert!(matches!( aggregator .get_bucket_timestamp(UnixTimestamp::from_secs(u64::MAX), 2) - .unwrap_err() - .kind, - AggregateMetricsErrorKind::InvalidTimestamp(_) + .unwrap_err(), + AggregateMetricsError::InvalidTimestamp(_) )); } @@ -1534,9 +1494,7 @@ mod tests { assert!(matches!( validation.unwrap_err(), - AggregateMetricsError { - kind: AggregateMetricsErrorKind::InvalidStringLength(_) - } + AggregateMetricsError::InvalidStringLength(_), )); let short_metric_long_tag_key = BucketKey { @@ -1598,11 +1556,8 @@ mod tests { .unwrap(); assert_eq!( - aggregator - .merge(project_key, bucket, Some(1)) - .unwrap_err() - .kind, - AggregateMetricsErrorKind::TotalLimitExceeded + aggregator.merge(project_key, bucket, Some(1)).unwrap_err(), + AggregateMetricsError::TotalLimitExceeded ); } @@ -1627,11 +1582,8 @@ mod tests { aggregator.merge(project_key, bucket.clone(), None).unwrap(); assert_eq!( - aggregator - .merge(project_key, bucket, None) - .unwrap_err() - .kind, - AggregateMetricsErrorKind::ProjectLimitExceeded + aggregator.merge(project_key, bucket, None).unwrap_err(), + AggregateMetricsError::ProjectLimitExceeded ); } diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 571ab646e8d..0770449bd39 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -190,8 +190,9 @@ impl ServiceState { config.default_aggregator_config().clone(), config.secondary_aggregator_configs().clone(), Some(project_cache.clone().recipient()), - ) - .start(); + ); + let aggregator_handle = aggregator.handle(); + let aggregator = aggregator.start(); let metric_stats = MetricStats::new( config.clone(), @@ -268,7 +269,7 @@ impl ServiceState { let health_check = HealthCheckService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), 
config.clone()), - aggregator.clone(), + aggregator_handle, upstream_relay.clone(), project_cache.clone(), ) diff --git a/relay-server/src/services/health_check.rs b/relay-server/src/services/health_check.rs index a32750297c9..2899fc14b39 100644 --- a/relay-server/src/services/health_check.rs +++ b/relay-server/src/services/health_check.rs @@ -6,7 +6,7 @@ use std::future::Future; use tokio::sync::watch; use tokio::time::{timeout, Instant}; -use crate::services::metrics::{AcceptsMetrics, Aggregator}; +use crate::services::metrics::RouterHandle; use crate::services::project_cache::{ProjectCache, SpoolHealth}; use crate::services::upstream::{IsAuthenticated, UpstreamRelay}; use crate::statsd::RelayTimers; @@ -84,7 +84,7 @@ impl StatusUpdate { pub struct HealthCheckService { config: Arc, memory_checker: MemoryChecker, - aggregator: Addr, + aggregator: RouterHandle, upstream_relay: Addr, project_cache: Addr, } @@ -96,7 +96,7 @@ impl HealthCheckService { pub fn new( config: Arc, memory_checker: MemoryChecker, - aggregator: Addr, + aggregator: RouterHandle, upstream_relay: Addr, project_cache: Addr, ) -> Self { @@ -147,10 +147,7 @@ impl HealthCheckService { } async fn aggregator_probe(&self) -> Status { - self.aggregator - .send(AcceptsMetrics) - .await - .map_or(Status::Unhealthy, Status::from) + Status::from(self.aggregator.can_accept_metrics()) } async fn spool_health_probe(&self) -> Status { diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs index 6d916d73f0b..6cff6ce9f9c 100644 --- a/relay-server/src/services/metrics/aggregator.rs +++ b/relay-server/src/services/metrics/aggregator.rs @@ -1,8 +1,11 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::time::Duration; use hashbrown::HashMap; use relay_base_schema::project::ProjectKey; use relay_config::AggregatorServiceConfig; +use relay_metrics::aggregator::AggregateMetricsError; use relay_metrics::{aggregator, Bucket}; use 
relay_system::{ AsyncResponse, Controller, FromMessage, Interface, NoResponse, Recipient, Sender, Service, @@ -106,6 +109,7 @@ pub struct AggregatorService { receiver: Option>, max_total_bucket_bytes: Option, flush_interval_ms: u64, + can_accept_metrics: Arc, } impl AggregatorService { @@ -132,6 +136,13 @@ impl AggregatorService { max_total_bucket_bytes: config.max_total_bucket_bytes, aggregator: aggregator::Aggregator::named(name, config.aggregator), flush_interval_ms: config.flush_interval_ms, + can_accept_metrics: Arc::new(AtomicBool::new(true)), + } + } + + pub fn handle(&self) -> AggregatorHandle { + AggregatorHandle { + can_accept_metrics: Arc::clone(&self.can_accept_metrics), } } @@ -152,6 +163,13 @@ impl AggregatorService { let force_flush = matches!(&self.state, AggregatorState::ShuttingDown); let partitions = self.aggregator.pop_flush_buckets(force_flush); + self.can_accept_metrics.store( + !self + .aggregator + .totals_cost_exceeded(self.max_total_bucket_bytes), + Ordering::Relaxed, + ); + if partitions.is_empty() { return; } @@ -196,8 +214,41 @@ impl AggregatorService { project_key, buckets, } = msg; - self.aggregator - .merge_all(project_key, buckets, self.max_total_bucket_bytes); + + for bucket in buckets.into_iter() { + match self + .aggregator + .merge(project_key, bucket, self.max_total_bucket_bytes) + { + // Ignore invalid timestamp errors. 
+ Err(AggregateMetricsError::InvalidTimestamp(_)) => {} + Err(AggregateMetricsError::TotalLimitExceeded) => { + relay_log::error!( + tags.aggregator = self.aggregator.name(), + "aggregator limit exceeded" + ); + self.can_accept_metrics.store(false, Ordering::Relaxed); + break; + } + Err(AggregateMetricsError::ProjectLimitExceeded) => { + relay_log::error!( + tags.aggregator = self.aggregator.name(), + tags.project_key = project_key.as_str(), + "project metrics limit exceeded for project {project_key}" + ); + break; + } + Err(error) => { + relay_log::error!( + tags.aggregator = self.aggregator.name(), + tags.project_key = project_key.as_str(), + bucket.error = &error as &dyn std::error::Error, + "failed to aggregate metric bucket" + ); + } + Ok(()) => {} + }; + } } fn handle_message(&mut self, message: Aggregator) { @@ -283,6 +334,19 @@ impl MergeBuckets { } } +/// Provides sync access to the state of the [`AggregatorService`]. +#[derive(Debug, Clone)] +pub struct AggregatorHandle { + can_accept_metrics: Arc<AtomicBool>, +} + +impl AggregatorHandle { + /// Returns `true` if the aggregator can still accept metrics. 
+ pub fn can_accept_metrics(&self) -> bool { + self.can_accept_metrics.load(Ordering::Relaxed) + } +} + #[cfg(test)] mod tests { use std::collections::BTreeMap; diff --git a/relay-server/src/services/metrics/router.rs b/relay-server/src/services/metrics/router.rs index 6b7de838f32..172b5312bef 100644 --- a/relay-server/src/services/metrics/router.rs +++ b/relay-server/src/services/metrics/router.rs @@ -4,12 +4,13 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; +use relay_config::aggregator::Condition; use relay_config::{AggregatorServiceConfig, ScopedAggregatorConfig}; use relay_metrics::MetricNamespace; use relay_system::{Addr, NoResponse, Recipient, Service}; use crate::services::metrics::{ - AcceptsMetrics, Aggregator, AggregatorService, FlushBuckets, MergeBuckets, + AcceptsMetrics, Aggregator, AggregatorHandle, AggregatorService, FlushBuckets, MergeBuckets, }; use crate::statsd::RelayTimers; use crate::utils; @@ -21,9 +22,8 @@ use crate::utils; /// [`Condition`](relay_config::aggregator::Condition). /// If no condition matches, the metric/bucket is routed to the `default_aggregator`. 
pub struct RouterService { - default_config: AggregatorServiceConfig, - secondary_configs: Vec, - receiver: Option>, + default: AggregatorService, + secondary: Vec<(AggregatorService, Condition)>, } impl RouterService { @@ -33,11 +33,24 @@ impl RouterService { secondary_configs: Vec, receiver: Option>, ) -> Self { - Self { - default_config, - secondary_configs, - receiver, + let mut secondary = Vec::new(); + + for c in secondary_configs { + let service = AggregatorService::named(c.name, c.config, receiver.clone()); + secondary.push((service, c.condition)); + } + + let default = AggregatorService::new(default_config, receiver); + Self { default, secondary } + } + + pub fn handle(&self) -> RouterHandle { + let mut handles = vec![self.default.handle()]; + for (aggregator, _) in &self.secondary { + handles.push(aggregator.handle()); } + + RouterHandle(handles) } } @@ -75,30 +88,22 @@ struct StartedRouter { impl StartedRouter { fn start(router: RouterService) -> Self { - let RouterService { - default_config, - secondary_configs, - receiver, - } = router; + let RouterService { default, secondary } = router; - let secondary = secondary_configs + let secondary = secondary .into_iter() - .map(|c| { - let addr = AggregatorService::named(c.name, c.config, receiver.clone()).start(); - (c.condition, addr) - }) - .map(|(cond, agg)| { + .map(|(aggregator, condition)| { let namespaces: Vec<_> = MetricNamespace::all() .into_iter() - .filter(|&namespace| cond.matches(Some(namespace))) + .filter(|&namespace| condition.matches(Some(namespace))) .collect(); - (agg, namespaces) + (aggregator.start(), namespaces) }) .collect(); Self { - default: AggregatorService::new(default_config, receiver).start(), + default: default.start(), secondary, } } @@ -160,3 +165,14 @@ impl StartedRouter { } } } + +/// Provides sync access to the state of the [`RouterService`]. 
+#[derive(Clone, Debug)] +pub struct RouterHandle(Vec<AggregatorHandle>); + +impl RouterHandle { + /// Returns `true` if all the aggregators can still accept metrics. + pub fn can_accept_metrics(&self) -> bool { + self.0.iter().all(|ah| ah.can_accept_metrics()) + } +}