From f35650e3dd83b687fa9a6af6a5ad7b363e953dd9 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Mon, 5 Aug 2024 14:53:01 +0200 Subject: [PATCH] ref(metrics): Make metrics aggregator health check sync (#3894) Makes the health check sync, it occassionally times out due to metric flushes taking longer than the threshold, this is generally not a problem and small short backlogs in the aggregator are expected (due to the flushes), the health checks shouldn't be failing because of it. The health check itself is very useful though, we should not be ready if the aggregator would reject metrics. --- relay-metrics/src/aggregator.rs | 86 ++++----------- relay-server/src/service.rs | 7 +- relay-server/src/services/health_check.rs | 11 +- .../src/services/metrics/aggregator.rs | 103 ++++++++++++------ relay-server/src/services/metrics/router.rs | 82 +++++++------- tests/integration/test_healthchecks.py | 23 ++++ 6 files changed, 161 insertions(+), 151 deletions(-) diff --git a/relay-metrics/src/aggregator.rs b/relay-metrics/src/aggregator.rs index 0719b86898..b0aab5fec4 100644 --- a/relay-metrics/src/aggregator.rs +++ b/relay-metrics/src/aggregator.rs @@ -1,7 +1,6 @@ //! Core functionality of metrics aggregation. use std::collections::BTreeMap; -use std::error::Error; use std::hash::{Hash, Hasher}; use std::time::Duration; use std::{fmt, mem}; @@ -23,20 +22,7 @@ use hashbrown::HashMap; /// Any error that may occur during aggregation. #[derive(Debug, Error, PartialEq)] -#[error("failed to aggregate metrics: {kind}")] -pub struct AggregateMetricsError { - kind: AggregateMetricsErrorKind, -} - -impl From for AggregateMetricsError { - fn from(kind: AggregateMetricsErrorKind) -> Self { - AggregateMetricsError { kind } - } -} - -#[derive(Debug, Error, PartialEq)] -#[allow(clippy::enum_variant_names)] -enum AggregateMetricsErrorKind { +pub enum AggregateMetricsError { /// A metric bucket had invalid characters in the metric name. #[error("found invalid characters: {0}")] InvalidCharacters(MetricName), @@ -336,12 +322,12 @@ impl QueuedBucket { &mut self, value: BucketValue, metadata: BucketMetadata, - ) -> Result { + ) -> Result { let cost_before = self.value.cost(); self.value .merge(value) - .map_err(|_| AggregateMetricsErrorKind::InvalidTypes)?; + .map_err(|_| AggregateMetricsError::InvalidTypes)?; self.metadata.merge(metadata); Ok(self.value.cost().saturating_sub(cost_before)) @@ -398,7 +384,7 @@ impl CostTracker { relay_log::configure_scope(|scope| { scope.set_extra("bucket.project_key", project_key.as_str().to_owned().into()); }); - return Err(AggregateMetricsErrorKind::TotalLimitExceeded.into()); + return Err(AggregateMetricsError::TotalLimitExceeded); } if let Some(max_project_cost) = max_project_cost { @@ -411,7 +397,7 @@ impl CostTracker { relay_log::configure_scope(|scope| { scope.set_extra("bucket.project_key", project_key.as_str().to_owned().into()); }); - return Err(AggregateMetricsErrorKind::ProjectLimitExceeded.into()); + return Err(AggregateMetricsError::ProjectLimitExceeded); } } @@ -704,7 +690,7 @@ impl Aggregator { aggregator = &self.name, ); - return Err(AggregateMetricsErrorKind::InvalidTimestamp(timestamp).into()); + return Err(AggregateMetricsError::InvalidTimestamp(timestamp)); } Ok(bucket_ts) @@ -783,7 +769,7 @@ impl Aggregator { }); if let Some(error) = error { - return Err(error.into()); + return Err(error); } if !updated { @@ -811,31 +797,6 @@ impl Aggregator { Ok(()) } - - /// Merges all given `buckets` into this aggregator. - /// - /// Buckets that do not exist yet will be created. - pub fn merge_all( - &mut self, - project_key: ProjectKey, - buckets: impl IntoIterator, - max_total_bucket_bytes: Option, - ) { - for bucket in buckets.into_iter() { - if let Err(error) = self.merge(project_key, bucket, max_total_bucket_bytes) { - match &error.kind { - // Ignore invalid timestamp errors. - AggregateMetricsErrorKind::InvalidTimestamp(_) => {} - _other => { - relay_log::error!( - tags.aggregator = self.name, - bucket.error = &error as &dyn Error - ); - } - } - } - } - } } impl fmt::Debug for Aggregator { @@ -874,7 +835,7 @@ fn validate_metric_name( aggregator_config.max_name_length, key.metric_name ); - return Err(AggregateMetricsErrorKind::InvalidStringLength(key.metric_name).into()); + return Err(AggregateMetricsError::InvalidStringLength(key.metric_name)); } normalize_metric_name(&mut key)?; @@ -887,16 +848,16 @@ fn normalize_metric_name(key: &mut BucketKey) -> Result<(), AggregateMetricsErro Ok(mri) => { if matches!(mri.namespace, MetricNamespace::Unsupported) { relay_log::debug!("invalid metric namespace {:?}", &key.metric_name); - return Err(AggregateMetricsErrorKind::UnsupportedNamespace(mri.namespace).into()); + return Err(AggregateMetricsError::UnsupportedNamespace(mri.namespace)); } mri.to_string().into() } Err(_) => { relay_log::debug!("invalid metric name {:?}", &key.metric_name); - return Err( - AggregateMetricsErrorKind::InvalidCharacters(key.metric_name.clone()).into(), - ); + return Err(AggregateMetricsError::InvalidCharacters( + key.metric_name.clone(), + )); } }; @@ -1406,9 +1367,8 @@ mod tests { assert!(matches!( aggregator .get_bucket_timestamp(UnixTimestamp::from_secs(u64::MAX), 2) - .unwrap_err() - .kind, - AggregateMetricsErrorKind::InvalidTimestamp(_) + .unwrap_err(), + AggregateMetricsError::InvalidTimestamp(_) )); } @@ -1534,9 +1494,7 @@ mod tests { assert!(matches!( validation.unwrap_err(), - AggregateMetricsError { - kind: AggregateMetricsErrorKind::InvalidStringLength(_) - } + AggregateMetricsError::InvalidStringLength(_), )); let short_metric_long_tag_key = BucketKey { @@ -1598,11 +1556,8 @@ mod tests { .unwrap(); assert_eq!( - aggregator - .merge(project_key, bucket, Some(1)) - .unwrap_err() - .kind, - AggregateMetricsErrorKind::TotalLimitExceeded + aggregator.merge(project_key, bucket, Some(1)).unwrap_err(), + AggregateMetricsError::TotalLimitExceeded ); } @@ -1627,11 +1582,8 @@ mod tests { aggregator.merge(project_key, bucket.clone(), None).unwrap(); assert_eq!( - aggregator - .merge(project_key, bucket, None) - .unwrap_err() - .kind, - AggregateMetricsErrorKind::ProjectLimitExceeded + aggregator.merge(project_key, bucket, None).unwrap_err(), + AggregateMetricsError::ProjectLimitExceeded ); } diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 571ab646e8..0770449bd3 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -190,8 +190,9 @@ impl ServiceState { config.default_aggregator_config().clone(), config.secondary_aggregator_configs().clone(), Some(project_cache.clone().recipient()), - ) - .start(); + ); + let aggregator_handle = aggregator.handle(); + let aggregator = aggregator.start(); let metric_stats = MetricStats::new( config.clone(), @@ -268,7 +269,7 @@ impl ServiceState { let health_check = HealthCheckService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), - aggregator.clone(), + aggregator_handle, upstream_relay.clone(), project_cache.clone(), ) diff --git a/relay-server/src/services/health_check.rs b/relay-server/src/services/health_check.rs index a32750297c..2899fc14b3 100644 --- a/relay-server/src/services/health_check.rs +++ b/relay-server/src/services/health_check.rs @@ -6,7 +6,7 @@ use std::future::Future; use tokio::sync::watch; use tokio::time::{timeout, Instant}; -use crate::services::metrics::{AcceptsMetrics, Aggregator}; +use crate::services::metrics::RouterHandle; use crate::services::project_cache::{ProjectCache, SpoolHealth}; use crate::services::upstream::{IsAuthenticated, UpstreamRelay}; use crate::statsd::RelayTimers; @@ -84,7 +84,7 @@ impl StatusUpdate { pub struct HealthCheckService { config: Arc, memory_checker: MemoryChecker, - aggregator: Addr, + aggregator: RouterHandle, upstream_relay: Addr, project_cache: Addr, } @@ -96,7 +96,7 @@ impl HealthCheckService { pub fn new( config: Arc, memory_checker: MemoryChecker, - aggregator: Addr, + aggregator: RouterHandle, upstream_relay: Addr, project_cache: Addr, ) -> Self { @@ -147,10 +147,7 @@ impl HealthCheckService { } async fn aggregator_probe(&self) -> Status { - self.aggregator - .send(AcceptsMetrics) - .await - .map_or(Status::Unhealthy, Status::from) + Status::from(self.aggregator.can_accept_metrics()) } async fn spool_health_probe(&self) -> Status { diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs index 6d916d73f0..320c5494ca 100644 --- a/relay-server/src/services/metrics/aggregator.rs +++ b/relay-server/src/services/metrics/aggregator.rs @@ -1,13 +1,13 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::time::Duration; use hashbrown::HashMap; use relay_base_schema::project::ProjectKey; use relay_config::AggregatorServiceConfig; +use relay_metrics::aggregator::AggregateMetricsError; use relay_metrics::{aggregator, Bucket}; -use relay_system::{ - AsyncResponse, Controller, FromMessage, Interface, NoResponse, Recipient, Sender, Service, - Shutdown, -}; +use relay_system::{Controller, FromMessage, Interface, NoResponse, Recipient, Service, Shutdown}; use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; @@ -23,21 +23,18 @@ use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; /// Receivers must implement a handler for the [`FlushBuckets`] message. #[derive(Debug)] pub enum Aggregator { - /// The health check message which makes sure that the service can accept the requests now. - AcceptsMetrics(AcceptsMetrics, Sender), /// Merge the buckets. MergeBuckets(MergeBuckets), /// Message is used only for tests to get the current number of buckets in `AggregatorService`. #[cfg(test)] - BucketCountInquiry(BucketCountInquiry, Sender), + BucketCountInquiry(BucketCountInquiry, relay_system::Sender), } impl Aggregator { /// Returns the name of the message variant. pub fn variant(&self) -> &'static str { match self { - Aggregator::AcceptsMetrics(_, _) => "AcceptsMetrics", Aggregator::MergeBuckets(_) => "MergeBuckets", #[cfg(test)] Aggregator::BucketCountInquiry(_, _) => "BucketCountInquiry", @@ -47,13 +44,6 @@ impl Aggregator { impl Interface for Aggregator {} -impl FromMessage for Aggregator { - type Response = AsyncResponse; - fn from_message(message: AcceptsMetrics, sender: Sender) -> Self { - Self::AcceptsMetrics(message, sender) - } -} - impl FromMessage for Aggregator { type Response = NoResponse; fn from_message(message: MergeBuckets, _: ()) -> Self { @@ -63,16 +53,12 @@ impl FromMessage for Aggregator { #[cfg(test)] impl FromMessage for Aggregator { - type Response = AsyncResponse; - fn from_message(message: BucketCountInquiry, sender: Sender) -> Self { + type Response = relay_system::AsyncResponse; + fn from_message(message: BucketCountInquiry, sender: relay_system::Sender) -> Self { Self::BucketCountInquiry(message, sender) } } -/// Check whether the aggregator has not (yet) exceeded its total limits. Used for health checks. -#[derive(Debug)] -pub struct AcceptsMetrics; - /// Used only for testing the `AggregatorService`. #[cfg(test)] #[derive(Debug)] @@ -106,6 +92,7 @@ pub struct AggregatorService { receiver: Option>, max_total_bucket_bytes: Option, flush_interval_ms: u64, + can_accept_metrics: Arc, } impl AggregatorService { @@ -126,21 +113,23 @@ impl AggregatorService { config: AggregatorServiceConfig, receiver: Option>, ) -> Self { + let aggregator = aggregator::Aggregator::named(name, config.aggregator); Self { receiver, state: AggregatorState::Running, max_total_bucket_bytes: config.max_total_bucket_bytes, - aggregator: aggregator::Aggregator::named(name, config.aggregator), flush_interval_ms: config.flush_interval_ms, + can_accept_metrics: Arc::new(AtomicBool::new( + !aggregator.totals_cost_exceeded(config.max_total_bucket_bytes), + )), + aggregator, } } - fn handle_accepts_metrics(&self, sender: Sender) { - let result = !self - .aggregator - .totals_cost_exceeded(self.max_total_bucket_bytes); - - sender.send(result); + pub fn handle(&self) -> AggregatorHandle { + AggregatorHandle { + can_accept_metrics: Arc::clone(&self.can_accept_metrics), + } } /// Sends the [`FlushBuckets`] message to the receiver in the fire and forget fashion. It is up @@ -152,6 +141,13 @@ impl AggregatorService { let force_flush = matches!(&self.state, AggregatorState::ShuttingDown); let partitions = self.aggregator.pop_flush_buckets(force_flush); + self.can_accept_metrics.store( + !self + .aggregator + .totals_cost_exceeded(self.max_total_bucket_bytes), + Ordering::Relaxed, + ); + if partitions.is_empty() { return; } @@ -196,8 +192,41 @@ impl AggregatorService { project_key, buckets, } = msg; - self.aggregator - .merge_all(project_key, buckets, self.max_total_bucket_bytes); + + for bucket in buckets.into_iter() { + match self + .aggregator + .merge(project_key, bucket, self.max_total_bucket_bytes) + { + // Ignore invalid timestamp errors. + Err(AggregateMetricsError::InvalidTimestamp(_)) => {} + Err(AggregateMetricsError::TotalLimitExceeded) => { + relay_log::error!( + tags.aggregator = self.aggregator.name(), + "aggregator limit exceeded" + ); + self.can_accept_metrics.store(false, Ordering::Relaxed); + break; + } + Err(AggregateMetricsError::ProjectLimitExceeded) => { + relay_log::error!( + tags.aggregator = self.aggregator.name(), + tags.project_key = project_key.as_str(), + "project metrics limit exceeded for project {project_key}" + ); + break; + } + Err(error) => { + relay_log::error!( + tags.aggregator = self.aggregator.name(), + tags.project_key = project_key.as_str(), + bucket.error = &error as &dyn std::error::Error, + "failed to aggregate metric bucket" + ); + } + Ok(()) => {} + }; + } } fn handle_message(&mut self, message: Aggregator) { @@ -207,7 +236,6 @@ impl AggregatorService { message = ty, { match message { - Aggregator::AcceptsMetrics(_, sender) => self.handle_accepts_metrics(sender), Aggregator::MergeBuckets(msg) => self.handle_merge_buckets(msg), #[cfg(test)] Aggregator::BucketCountInquiry(_, sender) => { @@ -283,6 +311,19 @@ impl MergeBuckets { } } +/// Provides sync access to the state of the [`AggregatorService`]. +#[derive(Debug, Clone)] +pub struct AggregatorHandle { + can_accept_metrics: Arc, +} + +impl AggregatorHandle { + /// Returns `true` if the aggregator can still accept metrics. + pub fn can_accept_metrics(&self) -> bool { + self.can_accept_metrics.load(Ordering::Relaxed) + } +} + #[cfg(test)] mod tests { use std::collections::BTreeMap; diff --git a/relay-server/src/services/metrics/router.rs b/relay-server/src/services/metrics/router.rs index 6b7de838f3..522eb729f1 100644 --- a/relay-server/src/services/metrics/router.rs +++ b/relay-server/src/services/metrics/router.rs @@ -1,15 +1,13 @@ //! Routing logic for metrics. Metrics from different namespaces may be routed to different aggregators, //! with their own limits, bucket intervals, etc. -use futures::stream::FuturesUnordered; -use futures::StreamExt; - +use relay_config::aggregator::Condition; use relay_config::{AggregatorServiceConfig, ScopedAggregatorConfig}; use relay_metrics::MetricNamespace; use relay_system::{Addr, NoResponse, Recipient, Service}; use crate::services::metrics::{ - AcceptsMetrics, Aggregator, AggregatorService, FlushBuckets, MergeBuckets, + Aggregator, AggregatorHandle, AggregatorService, FlushBuckets, MergeBuckets, }; use crate::statsd::RelayTimers; use crate::utils; @@ -17,13 +15,11 @@ use crate::utils; /// Service that routes metrics & metric buckets to the appropriate aggregator. /// /// Each aggregator gets its own configuration. -/// Metrics are routed to the first aggregator which matches the configuration's -/// [`Condition`](relay_config::aggregator::Condition). +/// Metrics are routed to the first aggregator which matches the configuration's [`Condition`]. /// If no condition matches, the metric/bucket is routed to the `default_aggregator`. pub struct RouterService { - default_config: AggregatorServiceConfig, - secondary_configs: Vec, - receiver: Option>, + default: AggregatorService, + secondary: Vec<(AggregatorService, Condition)>, } impl RouterService { @@ -33,11 +29,24 @@ impl RouterService { secondary_configs: Vec, receiver: Option>, ) -> Self { - Self { - default_config, - secondary_configs, - receiver, + let mut secondary = Vec::new(); + + for c in secondary_configs { + let service = AggregatorService::named(c.name, c.config, receiver.clone()); + secondary.push((service, c.condition)); } + + let default = AggregatorService::new(default_config, receiver); + Self { default, secondary } + } + + pub fn handle(&self) -> RouterHandle { + let mut handles = vec![self.default.handle()]; + for (aggregator, _) in &self.secondary { + handles.push(aggregator.handle()); + } + + RouterHandle(handles) } } @@ -75,30 +84,22 @@ struct StartedRouter { impl StartedRouter { fn start(router: RouterService) -> Self { - let RouterService { - default_config, - secondary_configs, - receiver, - } = router; + let RouterService { default, secondary } = router; - let secondary = secondary_configs + let secondary = secondary .into_iter() - .map(|c| { - let addr = AggregatorService::named(c.name, c.config, receiver.clone()).start(); - (c.condition, addr) - }) - .map(|(cond, agg)| { + .map(|(aggregator, condition)| { let namespaces: Vec<_> = MetricNamespace::all() .into_iter() - .filter(|&namespace| cond.matches(Some(namespace))) + .filter(|&namespace| condition.matches(Some(namespace))) .collect(); - (agg, namespaces) + (aggregator.start(), namespaces) }) .collect(); Self { - default: AggregatorService::new(default_config, receiver).start(), + default: default.start(), secondary, } } @@ -110,22 +111,6 @@ impl StartedRouter { message = ty, { match message { - Aggregator::AcceptsMetrics(_, sender) => { - let mut requests = self - .secondary - .iter() - .map(|(agg, _)| agg.send(AcceptsMetrics)) - .chain(Some(self.default.send(AcceptsMetrics))) - .collect::>(); - - tokio::spawn(async move { - let mut accepts = true; - while let Some(req) = requests.next().await { - accepts &= req.unwrap_or_default(); - } - sender.send(accepts); - }); - } Aggregator::MergeBuckets(msg) => self.handle_merge_buckets(msg), #[cfg(test)] Aggregator::BucketCountInquiry(_, _sender) => (), // not supported @@ -160,3 +145,14 @@ impl StartedRouter { } } } + +/// Provides sync access to the state of the [`RouterService`]. +#[derive(Clone, Debug)] +pub struct RouterHandle(Vec); + +impl RouterHandle { + /// Returns `true` if all the aggregators can still accept metrics. + pub fn can_accept_metrics(&self) -> bool { + self.0.iter().all(|ah| ah.can_accept_metrics()) + } +} diff --git a/tests/integration/test_healthchecks.py b/tests/integration/test_healthchecks.py index 7a1505b2ba..e2f9ee3ed0 100644 --- a/tests/integration/test_healthchecks.py +++ b/tests/integration/test_healthchecks.py @@ -123,6 +123,29 @@ def test_readiness_depends_on_aggregator_being_full(mini_sentry, relay): assert response.status_code == 503 +def test_readiness_depends_on_aggregator_being_full_after_metrics(mini_sentry, relay): + relay = relay( + mini_sentry, + {"aggregator": {"max_total_bucket_bytes": 1}}, + ) + + metrics_payload = "transactions/foo:42|c\ntransactions/bar:17|c" + relay.send_metrics(42, metrics_payload) + + for _ in range(100): + response = wait_get(relay, "/api/relay/healthcheck/ready/") + print(response, response.status_code) + if response.status_code == 503: + error = str(mini_sentry.test_failures.pop()) + assert "Health check probe 'aggregator'" in error + error = str(mini_sentry.test_failures.pop()) + assert "aggregator limit exceeded" in error + return + time.sleep(0.1) + + assert False, "health check never failed" + + def test_readiness_disk_spool(mini_sentry, relay): try: temp = tempfile.mkdtemp()