diff --git a/relay-metrics/src/aggregator.rs b/relay-metrics/src/aggregator.rs index 0719b86898..b0aab5fec4 100644 --- a/relay-metrics/src/aggregator.rs +++ b/relay-metrics/src/aggregator.rs @@ -1,7 +1,6 @@ //! Core functionality of metrics aggregation. use std::collections::BTreeMap; -use std::error::Error; use std::hash::{Hash, Hasher}; use std::time::Duration; use std::{fmt, mem}; @@ -23,20 +22,7 @@ use hashbrown::HashMap; /// Any error that may occur during aggregation. #[derive(Debug, Error, PartialEq)] -#[error("failed to aggregate metrics: {kind}")] -pub struct AggregateMetricsError { - kind: AggregateMetricsErrorKind, -} - -impl From for AggregateMetricsError { - fn from(kind: AggregateMetricsErrorKind) -> Self { - AggregateMetricsError { kind } - } -} - -#[derive(Debug, Error, PartialEq)] -#[allow(clippy::enum_variant_names)] -enum AggregateMetricsErrorKind { +pub enum AggregateMetricsError { /// A metric bucket had invalid characters in the metric name. #[error("found invalid characters: {0}")] InvalidCharacters(MetricName), @@ -336,12 +322,12 @@ impl QueuedBucket { &mut self, value: BucketValue, metadata: BucketMetadata, - ) -> Result { + ) -> Result { let cost_before = self.value.cost(); self.value .merge(value) - .map_err(|_| AggregateMetricsErrorKind::InvalidTypes)?; + .map_err(|_| AggregateMetricsError::InvalidTypes)?; self.metadata.merge(metadata); Ok(self.value.cost().saturating_sub(cost_before)) @@ -398,7 +384,7 @@ impl CostTracker { relay_log::configure_scope(|scope| { scope.set_extra("bucket.project_key", project_key.as_str().to_owned().into()); }); - return Err(AggregateMetricsErrorKind::TotalLimitExceeded.into()); + return Err(AggregateMetricsError::TotalLimitExceeded); } if let Some(max_project_cost) = max_project_cost { @@ -411,7 +397,7 @@ impl CostTracker { relay_log::configure_scope(|scope| { scope.set_extra("bucket.project_key", project_key.as_str().to_owned().into()); }); - return 
Err(AggregateMetricsErrorKind::ProjectLimitExceeded.into()); + return Err(AggregateMetricsError::ProjectLimitExceeded); } } @@ -704,7 +690,7 @@ impl Aggregator { aggregator = &self.name, ); - return Err(AggregateMetricsErrorKind::InvalidTimestamp(timestamp).into()); + return Err(AggregateMetricsError::InvalidTimestamp(timestamp)); } Ok(bucket_ts) @@ -783,7 +769,7 @@ impl Aggregator { }); if let Some(error) = error { - return Err(error.into()); + return Err(error); } if !updated { @@ -811,31 +797,6 @@ impl Aggregator { Ok(()) } - - /// Merges all given `buckets` into this aggregator. - /// - /// Buckets that do not exist yet will be created. - pub fn merge_all( - &mut self, - project_key: ProjectKey, - buckets: impl IntoIterator, - max_total_bucket_bytes: Option, - ) { - for bucket in buckets.into_iter() { - if let Err(error) = self.merge(project_key, bucket, max_total_bucket_bytes) { - match &error.kind { - // Ignore invalid timestamp errors. - AggregateMetricsErrorKind::InvalidTimestamp(_) => {} - _other => { - relay_log::error!( - tags.aggregator = self.name, - bucket.error = &error as &dyn Error - ); - } - } - } - } - } } impl fmt::Debug for Aggregator { @@ -874,7 +835,7 @@ fn validate_metric_name( aggregator_config.max_name_length, key.metric_name ); - return Err(AggregateMetricsErrorKind::InvalidStringLength(key.metric_name).into()); + return Err(AggregateMetricsError::InvalidStringLength(key.metric_name)); } normalize_metric_name(&mut key)?; @@ -887,16 +848,16 @@ fn normalize_metric_name(key: &mut BucketKey) -> Result<(), AggregateMetricsErro Ok(mri) => { if matches!(mri.namespace, MetricNamespace::Unsupported) { relay_log::debug!("invalid metric namespace {:?}", &key.metric_name); - return Err(AggregateMetricsErrorKind::UnsupportedNamespace(mri.namespace).into()); + return Err(AggregateMetricsError::UnsupportedNamespace(mri.namespace)); } mri.to_string().into() } Err(_) => { relay_log::debug!("invalid metric name {:?}", &key.metric_name); - return Err( - 
AggregateMetricsErrorKind::InvalidCharacters(key.metric_name.clone()).into(), - ); + return Err(AggregateMetricsError::InvalidCharacters( + key.metric_name.clone(), + )); } }; @@ -1406,9 +1367,8 @@ mod tests { assert!(matches!( aggregator .get_bucket_timestamp(UnixTimestamp::from_secs(u64::MAX), 2) - .unwrap_err() - .kind, - AggregateMetricsErrorKind::InvalidTimestamp(_) + .unwrap_err(), + AggregateMetricsError::InvalidTimestamp(_) )); } @@ -1534,9 +1494,7 @@ mod tests { assert!(matches!( validation.unwrap_err(), - AggregateMetricsError { - kind: AggregateMetricsErrorKind::InvalidStringLength(_) - } + AggregateMetricsError::InvalidStringLength(_), )); let short_metric_long_tag_key = BucketKey { @@ -1598,11 +1556,8 @@ mod tests { .unwrap(); assert_eq!( - aggregator - .merge(project_key, bucket, Some(1)) - .unwrap_err() - .kind, - AggregateMetricsErrorKind::TotalLimitExceeded + aggregator.merge(project_key, bucket, Some(1)).unwrap_err(), + AggregateMetricsError::TotalLimitExceeded ); } @@ -1627,11 +1582,8 @@ mod tests { aggregator.merge(project_key, bucket.clone(), None).unwrap(); assert_eq!( - aggregator - .merge(project_key, bucket, None) - .unwrap_err() - .kind, - AggregateMetricsErrorKind::ProjectLimitExceeded + aggregator.merge(project_key, bucket, None).unwrap_err(), + AggregateMetricsError::ProjectLimitExceeded ); } diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 571ab646e8..0770449bd3 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -190,8 +190,9 @@ impl ServiceState { config.default_aggregator_config().clone(), config.secondary_aggregator_configs().clone(), Some(project_cache.clone().recipient()), - ) - .start(); + ); + let aggregator_handle = aggregator.handle(); + let aggregator = aggregator.start(); let metric_stats = MetricStats::new( config.clone(), @@ -268,7 +269,7 @@ impl ServiceState { let health_check = HealthCheckService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), 
config.clone()), - aggregator.clone(), + aggregator_handle, upstream_relay.clone(), project_cache.clone(), ) diff --git a/relay-server/src/services/health_check.rs b/relay-server/src/services/health_check.rs index a32750297c..2899fc14b3 100644 --- a/relay-server/src/services/health_check.rs +++ b/relay-server/src/services/health_check.rs @@ -6,7 +6,7 @@ use std::future::Future; use tokio::sync::watch; use tokio::time::{timeout, Instant}; -use crate::services::metrics::{AcceptsMetrics, Aggregator}; +use crate::services::metrics::RouterHandle; use crate::services::project_cache::{ProjectCache, SpoolHealth}; use crate::services::upstream::{IsAuthenticated, UpstreamRelay}; use crate::statsd::RelayTimers; @@ -84,7 +84,7 @@ impl StatusUpdate { pub struct HealthCheckService { config: Arc<Config>, memory_checker: MemoryChecker, - aggregator: Addr<Aggregator>, + aggregator: RouterHandle, upstream_relay: Addr<UpstreamRelay>, project_cache: Addr<ProjectCache>, } @@ -96,7 +96,7 @@ impl HealthCheckService { pub fn new( config: Arc<Config>, memory_checker: MemoryChecker, - aggregator: Addr<Aggregator>, + aggregator: RouterHandle, upstream_relay: Addr<UpstreamRelay>, project_cache: Addr<ProjectCache>, ) -> Self { @@ -147,10 +147,7 @@ impl HealthCheckService { } async fn aggregator_probe(&self) -> Status { - self.aggregator - .send(AcceptsMetrics) - .await - .map_or(Status::Unhealthy, Status::from) + Status::from(self.aggregator.can_accept_metrics()) } async fn spool_health_probe(&self) -> Status { diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs index 6d916d73f0..71e3186f3f 100644 --- a/relay-server/src/services/metrics/aggregator.rs +++ b/relay-server/src/services/metrics/aggregator.rs @@ -1,13 +1,13 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::time::Duration; use hashbrown::HashMap; use relay_base_schema::project::ProjectKey; use relay_config::AggregatorServiceConfig; +use relay_metrics::aggregator::AggregateMetricsError; use relay_metrics::{aggregator, Bucket}; -use 
relay_system::{ - AsyncResponse, Controller, FromMessage, Interface, NoResponse, Recipient, Sender, Service, - Shutdown, -}; +use relay_system::{Controller, FromMessage, Interface, NoResponse, Recipient, Service, Shutdown}; use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; @@ -23,21 +23,18 @@ use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; /// Receivers must implement a handler for the [`FlushBuckets`] message. #[derive(Debug)] pub enum Aggregator { - /// The health check message which makes sure that the service can accept the requests now. - AcceptsMetrics(AcceptsMetrics, Sender), /// Merge the buckets. MergeBuckets(MergeBuckets), /// Message is used only for tests to get the current number of buckets in `AggregatorService`. #[cfg(test)] - BucketCountInquiry(BucketCountInquiry, Sender), + BucketCountInquiry(BucketCountInquiry, relay_system::Sender), } impl Aggregator { /// Returns the name of the message variant. pub fn variant(&self) -> &'static str { match self { - Aggregator::AcceptsMetrics(_, _) => "AcceptsMetrics", Aggregator::MergeBuckets(_) => "MergeBuckets", #[cfg(test)] Aggregator::BucketCountInquiry(_, _) => "BucketCountInquiry", @@ -47,13 +44,6 @@ impl Aggregator { impl Interface for Aggregator {} -impl FromMessage for Aggregator { - type Response = AsyncResponse; - fn from_message(message: AcceptsMetrics, sender: Sender) -> Self { - Self::AcceptsMetrics(message, sender) - } -} - impl FromMessage for Aggregator { type Response = NoResponse; fn from_message(message: MergeBuckets, _: ()) -> Self { @@ -63,16 +53,12 @@ impl FromMessage for Aggregator { #[cfg(test)] impl FromMessage for Aggregator { - type Response = AsyncResponse; - fn from_message(message: BucketCountInquiry, sender: Sender) -> Self { + type Response = relay_system::AsyncResponse; + fn from_message(message: BucketCountInquiry, sender: relay_system::Sender) -> Self { Self::BucketCountInquiry(message, sender) } } -/// Check whether the aggregator has not 
(yet) exceeded its total limits. Used for health checks. -#[derive(Debug)] -pub struct AcceptsMetrics; - /// Used only for testing the `AggregatorService`. #[cfg(test)] #[derive(Debug)] @@ -106,6 +92,7 @@ pub struct AggregatorService { receiver: Option<Recipient<FlushBuckets, NoResponse>>, max_total_bucket_bytes: Option<u64>, flush_interval_ms: u64, + can_accept_metrics: Arc<AtomicBool>, } impl AggregatorService { @@ -132,15 +119,14 @@ impl AggregatorService { max_total_bucket_bytes: config.max_total_bucket_bytes, aggregator: aggregator::Aggregator::named(name, config.aggregator), flush_interval_ms: config.flush_interval_ms, + can_accept_metrics: Arc::new(AtomicBool::new(true)), } } - fn handle_accepts_metrics(&self, sender: Sender<bool>) { - let result = !self - .aggregator - .totals_cost_exceeded(self.max_total_bucket_bytes); - - sender.send(result); + pub fn handle(&self) -> AggregatorHandle { + AggregatorHandle { + can_accept_metrics: Arc::clone(&self.can_accept_metrics), + } } /// Sends the [`FlushBuckets`] message to the receiver in the fire and forget fashion. It is up @@ -152,6 +138,13 @@ impl AggregatorService { let force_flush = matches!(&self.state, AggregatorState::ShuttingDown); let partitions = self.aggregator.pop_flush_buckets(force_flush); + self.can_accept_metrics.store( + !self + .aggregator + .totals_cost_exceeded(self.max_total_bucket_bytes), + Ordering::Relaxed, + ); + if partitions.is_empty() { return; } @@ -196,8 +189,41 @@ impl AggregatorService { project_key, buckets, } = msg; - self.aggregator - .merge_all(project_key, buckets, self.max_total_bucket_bytes); + + for bucket in buckets.into_iter() { + match self + .aggregator + .merge(project_key, bucket, self.max_total_bucket_bytes) + { + // Ignore invalid timestamp errors. 
+ Err(AggregateMetricsError::InvalidTimestamp(_)) => {} + Err(AggregateMetricsError::TotalLimitExceeded) => { + relay_log::error!( + tags.aggregator = self.aggregator.name(), + "aggregator limit exceeded" + ); + self.can_accept_metrics.store(false, Ordering::Relaxed); + break; + } + Err(AggregateMetricsError::ProjectLimitExceeded) => { + relay_log::error!( + tags.aggregator = self.aggregator.name(), + tags.project_key = project_key.as_str(), + "project metrics limit exceeded for project {project_key}" + ); + break; + } + Err(error) => { + relay_log::error!( + tags.aggregator = self.aggregator.name(), + tags.project_key = project_key.as_str(), + bucket.error = &error as &dyn std::error::Error, + "failed to aggregate metric bucket" + ); + } + Ok(()) => {} + }; + } } fn handle_message(&mut self, message: Aggregator) { @@ -207,7 +233,6 @@ impl AggregatorService { message = ty, { match message { - Aggregator::AcceptsMetrics(_, sender) => self.handle_accepts_metrics(sender), Aggregator::MergeBuckets(msg) => self.handle_merge_buckets(msg), #[cfg(test)] Aggregator::BucketCountInquiry(_, sender) => { @@ -283,6 +308,19 @@ impl MergeBuckets { } } +/// Provides sync access to the state of the [`AggregatorService`]. +#[derive(Debug, Clone)] +pub struct AggregatorHandle { + can_accept_metrics: Arc, +} + +impl AggregatorHandle { + /// Returns `true` if the aggregator can still accept metrics. + pub fn can_accept_metrics(&self) -> bool { + self.can_accept_metrics.load(Ordering::Relaxed) + } +} + #[cfg(test)] mod tests { use std::collections::BTreeMap; diff --git a/relay-server/src/services/metrics/router.rs b/relay-server/src/services/metrics/router.rs index 6b7de838f3..522eb729f1 100644 --- a/relay-server/src/services/metrics/router.rs +++ b/relay-server/src/services/metrics/router.rs @@ -1,15 +1,13 @@ //! Routing logic for metrics. Metrics from different namespaces may be routed to different aggregators, //! with their own limits, bucket intervals, etc. 
-use futures::stream::FuturesUnordered; -use futures::StreamExt; - +use relay_config::aggregator::Condition; use relay_config::{AggregatorServiceConfig, ScopedAggregatorConfig}; use relay_metrics::MetricNamespace; use relay_system::{Addr, NoResponse, Recipient, Service}; use crate::services::metrics::{ - AcceptsMetrics, Aggregator, AggregatorService, FlushBuckets, MergeBuckets, + Aggregator, AggregatorHandle, AggregatorService, FlushBuckets, MergeBuckets, }; use crate::statsd::RelayTimers; use crate::utils; @@ -17,13 +15,11 @@ use crate::utils; /// Service that routes metrics & metric buckets to the appropriate aggregator. /// /// Each aggregator gets its own configuration. -/// Metrics are routed to the first aggregator which matches the configuration's -/// [`Condition`](relay_config::aggregator::Condition). +/// Metrics are routed to the first aggregator which matches the configuration's [`Condition`]. /// If no condition matches, the metric/bucket is routed to the `default_aggregator`. 
pub struct RouterService { - default_config: AggregatorServiceConfig, - secondary_configs: Vec, - receiver: Option>, + default: AggregatorService, + secondary: Vec<(AggregatorService, Condition)>, } impl RouterService { @@ -33,11 +29,24 @@ impl RouterService { secondary_configs: Vec, receiver: Option>, ) -> Self { - Self { - default_config, - secondary_configs, - receiver, + let mut secondary = Vec::new(); + + for c in secondary_configs { + let service = AggregatorService::named(c.name, c.config, receiver.clone()); + secondary.push((service, c.condition)); } + + let default = AggregatorService::new(default_config, receiver); + Self { default, secondary } + } + + pub fn handle(&self) -> RouterHandle { + let mut handles = vec![self.default.handle()]; + for (aggregator, _) in &self.secondary { + handles.push(aggregator.handle()); + } + + RouterHandle(handles) } } @@ -75,30 +84,22 @@ struct StartedRouter { impl StartedRouter { fn start(router: RouterService) -> Self { - let RouterService { - default_config, - secondary_configs, - receiver, - } = router; + let RouterService { default, secondary } = router; - let secondary = secondary_configs + let secondary = secondary .into_iter() - .map(|c| { - let addr = AggregatorService::named(c.name, c.config, receiver.clone()).start(); - (c.condition, addr) - }) - .map(|(cond, agg)| { + .map(|(aggregator, condition)| { let namespaces: Vec<_> = MetricNamespace::all() .into_iter() - .filter(|&namespace| cond.matches(Some(namespace))) + .filter(|&namespace| condition.matches(Some(namespace))) .collect(); - (agg, namespaces) + (aggregator.start(), namespaces) }) .collect(); Self { - default: AggregatorService::new(default_config, receiver).start(), + default: default.start(), secondary, } } @@ -110,22 +111,6 @@ impl StartedRouter { message = ty, { match message { - Aggregator::AcceptsMetrics(_, sender) => { - let mut requests = self - .secondary - .iter() - .map(|(agg, _)| agg.send(AcceptsMetrics)) - 
.chain(Some(self.default.send(AcceptsMetrics))) - .collect::<FuturesUnordered<_>>(); - - tokio::spawn(async move { - let mut accepts = true; - while let Some(req) = requests.next().await { - accepts &= req.unwrap_or_default(); - } - sender.send(accepts); - }); - } Aggregator::MergeBuckets(msg) => self.handle_merge_buckets(msg), #[cfg(test)] Aggregator::BucketCountInquiry(_, _sender) => (), // not supported @@ -160,3 +145,14 @@ impl StartedRouter { } } } + +/// Provides sync access to the state of the [`RouterService`]. +#[derive(Clone, Debug)] +pub struct RouterHandle(Vec<AggregatorHandle>); + +impl RouterHandle { + /// Returns `true` if all the aggregators can still accept metrics. + pub fn can_accept_metrics(&self) -> bool { + self.0.iter().all(|ah| ah.can_accept_metrics()) + } +}