From 1288b4eb6f5dda0c96d2f90485e1188bd50353d8 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Fri, 20 Sep 2024 13:48:46 +0200 Subject: [PATCH] fix(buffer): Replace spool health check --- relay-server/src/service.rs | 2 +- relay-server/src/services/health_check.rs | 20 ++-- relay-server/src/services/project_cache.rs | 15 --- relay-server/src/services/spooler/mod.rs | 103 --------------------- 4 files changed, 13 insertions(+), 127 deletions(-) diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 289014f858..78076bc068 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -280,7 +280,7 @@ impl ServiceState { MemoryChecker::new(memory_stat.clone(), config.clone()), aggregator_handle, upstream_relay.clone(), - project_cache.clone(), + envelope_buffer.clone(), ) .start(); diff --git a/relay-server/src/services/health_check.rs b/relay-server/src/services/health_check.rs index 2899fc14b3..867ddc3330 100644 --- a/relay-server/src/services/health_check.rs +++ b/relay-server/src/services/health_check.rs @@ -6,8 +6,8 @@ use std::future::Future; use tokio::sync::watch; use tokio::time::{timeout, Instant}; +use crate::services::buffer::ObservableEnvelopeBuffer; use crate::services::metrics::RouterHandle; -use crate::services::project_cache::{ProjectCache, SpoolHealth}; use crate::services::upstream::{IsAuthenticated, UpstreamRelay}; use crate::statsd::RelayTimers; use crate::utils::{MemoryCheck, MemoryChecker}; @@ -86,7 +86,7 @@ pub struct HealthCheckService { memory_checker: MemoryChecker, aggregator: RouterHandle, upstream_relay: Addr, - project_cache: Addr, + envelope_buffer: Option, // make non-optional once V1 has been removed } impl HealthCheckService { @@ -98,14 +98,14 @@ impl HealthCheckService { memory_checker: MemoryChecker, aggregator: RouterHandle, upstream_relay: Addr, - project_cache: Addr, + envelope_buffer: Option, ) -> Self { Self { config, memory_checker, aggregator, upstream_relay, - project_cache, + envelope_buffer, } } @@ -151,10 +151,14 @@ impl HealthCheckService { } async fn spool_health_probe(&self) -> Status { - self.project_cache - .send(SpoolHealth) - .await - .map_or(Status::Unhealthy, Status::from) + let has_capacity = self + .envelope_buffer + .as_ref() + .map_or(true, |b| b.has_capacity()); + match has_capacity { + true => Status::Healthy, + false => Status::Unhealthy, + } } async fn probe(&self, name: &'static str, fut: impl Future) -> Status { diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index be595f340a..d5cffdbbec 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -243,10 +243,6 @@ impl UpdateSpoolIndex { } } -/// Checks the status of underlying buffer spool. -#[derive(Debug)] -pub struct SpoolHealth; - /// The current envelopes index fetched from the underlying buffer spool. /// /// This index will be received only once shortly after startup and will trigger refresh for the @@ -293,7 +289,6 @@ pub enum ProjectCache { AddMetricMeta(AddMetricMeta), FlushBuckets(FlushBuckets), UpdateSpoolIndex(UpdateSpoolIndex), - SpoolHealth(Sender), RefreshIndexCache(RefreshIndexCache), HandleDequeuedEnvelope(Box, Sender<()>), UpdateProject(ProjectKey), @@ -312,7 +307,6 @@ impl ProjectCache { Self::AddMetricMeta(_) => "AddMetricMeta", Self::FlushBuckets(_) => "FlushBuckets", Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex", - Self::SpoolHealth(_) => "SpoolHealth", Self::RefreshIndexCache(_) => "RefreshIndexCache", Self::HandleDequeuedEnvelope(_, _) => "HandleDequeuedEnvelope", Self::UpdateProject(_) => "UpdateProject", @@ -413,14 +407,6 @@ impl FromMessage for ProjectCache { } } -impl FromMessage for ProjectCache { - type Response = relay_system::AsyncResponse; - - fn from_message(_message: SpoolHealth, sender: Sender) -> Self { - Self::SpoolHealth(sender) - } -} - impl FromMessage for ProjectCache { type Response = relay_system::AsyncResponse<()>; @@ -1304,7 +1290,6 @@ impl ProjectCacheBroker { ProjectCache::AddMetricMeta(message) => self.handle_add_metric_meta(message), ProjectCache::FlushBuckets(message) => self.handle_flush_buckets(message), ProjectCache::UpdateSpoolIndex(message) => self.handle_buffer_index(message), - ProjectCache::SpoolHealth(sender) => self.handle_spool_health(sender), ProjectCache::RefreshIndexCache(message) => { self.handle_refresh_index_cache(message) } diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs index 00265b0e92..a1bbc3bffe 100644 --- a/relay-server/src/services/spooler/mod.rs +++ b/relay-server/src/services/spooler/mod.rs @@ -1322,7 +1322,6 @@ impl Drop for BufferService { mod tests { use insta::assert_debug_snapshot; use rand::Rng; - use relay_system::AsyncResponse; use relay_test::mock_service; use sqlx::ConnectOptions; use std::str::FromStr; @@ -1330,7 +1329,6 @@ mod tests { use std::time::{Duration, Instant}; use uuid::Uuid; - use crate::services::project_cache::SpoolHealth; use crate::testutils::empty_envelope; use crate::utils::MemoryStat; @@ -1564,107 +1562,6 @@ mod tests { "###); } - pub enum TestHealth { - SpoolHealth(Sender), - } - - impl Interface for TestHealth {} - - impl FromMessage for TestHealth { - type Response = AsyncResponse; - - fn from_message(_message: SpoolHealth, sender: Sender) -> Self { - Self::SpoolHealth(sender) - } - } - - pub struct TestHealthService { - buffer: Addr, - } - - impl TestHealthService { - fn new(buffer: Addr) -> Self { - Self { buffer } - } - } - - impl Service for TestHealthService { - type Interface = TestHealth; - - fn spawn_handler(self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { - loop { - tokio::select! { - Some(TestHealth::SpoolHealth(sender)) = rx.recv() => self.buffer.send(Health(sender)), - else => break, - } - } - }); - } - } - - #[tokio::test] - async fn health_check_fails() { - relay_log::init_test!(); - - let config: Arc<_> = Config::from_json_value(serde_json::json!({ - "spool": { - "envelopes": { - "path": std::env::temp_dir().join(Uuid::new_v4().to_string()), - "max_memory_size": 0, // 0 bytes, to force to spool to disk all the envelopes. - "max_disk_size": 0, - }, - "health": { - "max_memory_percent": 0.0 - } - } - })) - .unwrap() - .into(); - let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); - - let buffer = BufferService::create(memory_checker, services(), config) - .await - .unwrap(); - - let addr = buffer.start(); - - let health_service = TestHealthService::new(addr.clone()).start(); - let healthy = health_service.send(SpoolHealth).await.unwrap(); - assert!(!healthy); - } - - #[tokio::test] - async fn health_check_succeeds() { - relay_log::init_test!(); - - let config: Arc<_> = Config::from_json_value(serde_json::json!({ - "spool": { - "envelopes": { - "path": std::env::temp_dir().join(Uuid::new_v4().to_string()), - "max_memory_size": 0, // 0 bytes, to force to spool to disk all the envelopes. - "max_disk_size": "100KB", - }, - "health": { - "max_memory_percent": 1.0 - } - } - })) - .unwrap() - .into(); - let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); - - let buffer = BufferService::create(memory_checker, services(), config) - .await - .unwrap(); - - let addr = buffer.start(); - - let health_service = TestHealthService::new(addr.clone()).start(); - let healthy = health_service.send(SpoolHealth).await.unwrap(); - assert!(healthy); - } - #[tokio::test] async fn index_restore() { relay_log::init_test!();