fix(buffer): Replace spool health check
jjbayer committed Sep 20, 2024
1 parent d7ac14d commit 1288b4e
Showing 4 changed files with 13 additions and 127 deletions.
2 changes: 1 addition & 1 deletion relay-server/src/service.rs
@@ -280,7 +280,7 @@ impl ServiceState {
MemoryChecker::new(memory_stat.clone(), config.clone()),
aggregator_handle,
upstream_relay.clone(),
- project_cache.clone(),
+ envelope_buffer.clone(),
)
.start();

20 changes: 12 additions & 8 deletions relay-server/src/services/health_check.rs
@@ -6,8 +6,8 @@ use std::future::Future;
use tokio::sync::watch;
use tokio::time::{timeout, Instant};

+ use crate::services::buffer::ObservableEnvelopeBuffer;
use crate::services::metrics::RouterHandle;
- use crate::services::project_cache::{ProjectCache, SpoolHealth};
use crate::services::upstream::{IsAuthenticated, UpstreamRelay};
use crate::statsd::RelayTimers;
use crate::utils::{MemoryCheck, MemoryChecker};
@@ -86,7 +86,7 @@ pub struct HealthCheckService {
memory_checker: MemoryChecker,
aggregator: RouterHandle,
upstream_relay: Addr<UpstreamRelay>,
- project_cache: Addr<ProjectCache>,
+ envelope_buffer: Option<ObservableEnvelopeBuffer>, // make non-optional once V1 has been removed
}

impl HealthCheckService {
@@ -98,14 +98,14 @@ impl HealthCheckService {
memory_checker: MemoryChecker,
aggregator: RouterHandle,
upstream_relay: Addr<UpstreamRelay>,
- project_cache: Addr<ProjectCache>,
+ envelope_buffer: Option<ObservableEnvelopeBuffer>,
) -> Self {
Self {
config,
memory_checker,
aggregator,
upstream_relay,
- project_cache,
+ envelope_buffer,
}
}

@@ -151,10 +151,14 @@ impl HealthCheckService {
}

async fn spool_health_probe(&self) -> Status {
- self.project_cache
-     .send(SpoolHealth)
-     .await
-     .map_or(Status::Unhealthy, Status::from)
+ let has_capacity = self
+     .envelope_buffer
+     .as_ref()
+     .map_or(true, |b| b.has_capacity());
+ match has_capacity {
+     true => Status::Healthy,
+     false => Status::Unhealthy,
+ }
}

async fn probe(&self, name: &'static str, fut: impl Future<Output = Status>) -> Status {
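The substance of the change is in `spool_health_probe`: instead of sending a `SpoolHealth` message to the project cache and awaiting a boolean reply, the health check now reads a capacity flag exposed by `ObservableEnvelopeBuffer`, treating an absent buffer (the `None` case while the V1 spool still exists) as healthy. The following is a minimal sketch of that pull-based pattern, assuming a tokio watch channel as the backing mechanism; `ObservableBuffer`, `Status`, and `spool_health_probe` here are illustrative stand-ins, not Relay's actual types.

// Hedged sketch: a pull-based health probe backed by a tokio watch channel.
// All names are illustrative stand-ins, not Relay's real API.
use tokio::sync::watch;

/// Hypothetical observable handle: the buffer task publishes its capacity
/// state, and the health check only reads the latest value instead of doing
/// a round-trip message to another service.
#[derive(Clone)]
struct ObservableBuffer {
    has_capacity: watch::Receiver<bool>,
}

impl ObservableBuffer {
    fn has_capacity(&self) -> bool {
        *self.has_capacity.borrow()
    }
}

#[derive(Debug, PartialEq)]
enum Status {
    Healthy,
    Unhealthy,
}

/// Mirrors the shape of the new probe: a missing buffer counts as healthy.
fn spool_health_probe(buffer: Option<&ObservableBuffer>) -> Status {
    match buffer.map_or(true, |b| b.has_capacity()) {
        true => Status::Healthy,
        false => Status::Unhealthy,
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(true);
    let buffer = ObservableBuffer { has_capacity: rx };
    assert_eq!(spool_health_probe(Some(&buffer)), Status::Healthy);

    // The buffer side flips the flag when it runs out of memory or disk.
    tx.send(false).unwrap();
    assert_eq!(spool_health_probe(Some(&buffer)), Status::Unhealthy);

    // No V2 buffer configured: the probe defaults to healthy.
    assert_eq!(spool_health_probe(None), Status::Healthy);
}

One consequence of this design is that the health endpoint never blocks on the buffer service: reporting capacity is a cheap read of already-published state.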
15 changes: 0 additions & 15 deletions relay-server/src/services/project_cache.rs
@@ -243,10 +243,6 @@ impl UpdateSpoolIndex {
}
}

- /// Checks the status of underlying buffer spool.
- #[derive(Debug)]
- pub struct SpoolHealth;
-
/// The current envelopes index fetched from the underlying buffer spool.
///
/// This index will be received only once shortly after startup and will trigger refresh for the
@@ -293,7 +289,6 @@ pub enum ProjectCache {
AddMetricMeta(AddMetricMeta),
FlushBuckets(FlushBuckets),
UpdateSpoolIndex(UpdateSpoolIndex),
- SpoolHealth(Sender<bool>),
RefreshIndexCache(RefreshIndexCache),
HandleDequeuedEnvelope(Box<Envelope>, Sender<()>),
UpdateProject(ProjectKey),
@@ -312,7 +307,6 @@ impl ProjectCache {
Self::AddMetricMeta(_) => "AddMetricMeta",
Self::FlushBuckets(_) => "FlushBuckets",
Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex",
- Self::SpoolHealth(_) => "SpoolHealth",
Self::RefreshIndexCache(_) => "RefreshIndexCache",
Self::HandleDequeuedEnvelope(_, _) => "HandleDequeuedEnvelope",
Self::UpdateProject(_) => "UpdateProject",
@@ -413,14 +407,6 @@ impl FromMessage<FlushBuckets> for ProjectCache {
}
}

- impl FromMessage<SpoolHealth> for ProjectCache {
-     type Response = relay_system::AsyncResponse<bool>;
-
-     fn from_message(_message: SpoolHealth, sender: Sender<bool>) -> Self {
-         Self::SpoolHealth(sender)
-     }
- }
-
impl FromMessage<DequeuedEnvelope> for ProjectCache {
type Response = relay_system::AsyncResponse<()>;

@@ -1304,7 +1290,6 @@ impl ProjectCacheBroker {
ProjectCache::AddMetricMeta(message) => self.handle_add_metric_meta(message),
ProjectCache::FlushBuckets(message) => self.handle_flush_buckets(message),
ProjectCache::UpdateSpoolIndex(message) => self.handle_buffer_index(message),
- ProjectCache::SpoolHealth(sender) => self.handle_spool_health(sender),
ProjectCache::RefreshIndexCache(message) => {
self.handle_refresh_index_cache(message)
}
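For contrast, the `SpoolHealth` plumbing deleted above implemented a request/response probe: the health check sent a message carrying a reply channel and waited for the project cache (and ultimately the spooler) to answer. The sketch below is a hypothetical reconstruction of that shape using plain tokio channels rather than the relay_system actor API used in the real code; the names and the timeout value are assumptions for illustration only.

// Hedged sketch: a request/response health probe, roughly the shape of the
// removed SpoolHealth round trip, rebuilt with plain tokio channels.
use tokio::sync::{mpsc, oneshot};
use tokio::time::{timeout, Duration};

enum BufferMsg {
    // The probe sends a one-shot reply channel and waits for the answer.
    SpoolHealth(oneshot::Sender<bool>),
}

async fn buffer_task(mut rx: mpsc::Receiver<BufferMsg>) {
    while let Some(BufferMsg::SpoolHealth(reply)) = rx.recv().await {
        // A busy buffer task answers late; the probe below has to wait for it.
        let _ = reply.send(true);
    }
}

async fn spool_health_probe(tx: &mpsc::Sender<BufferMsg>) -> bool {
    let (reply_tx, reply_rx) = oneshot::channel();
    if tx.send(BufferMsg::SpoolHealth(reply_tx)).await.is_err() {
        return false; // buffer task is gone: report unhealthy
    }
    // The reply only arrives once the buffer task processes the message,
    // so the health endpoint inherits that queueing delay.
    matches!(timeout(Duration::from_millis(100), reply_rx).await, Ok(Ok(true)))
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    tokio::spawn(buffer_task(rx));
    assert!(spool_health_probe(&tx).await);
}

The watch-based flag sketched earlier avoids this round trip entirely, which is one plausible motivation for the replacement.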
103 changes: 0 additions & 103 deletions relay-server/src/services/spooler/mod.rs
@@ -1322,15 +1322,13 @@ impl Drop for BufferService {
mod tests {
use insta::assert_debug_snapshot;
use rand::Rng;
- use relay_system::AsyncResponse;
use relay_test::mock_service;
use sqlx::ConnectOptions;
use std::str::FromStr;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use uuid::Uuid;

- use crate::services::project_cache::SpoolHealth;
use crate::testutils::empty_envelope;
use crate::utils::MemoryStat;

@@ -1564,107 +1562,6 @@ mod tests {
"###);
}

pub enum TestHealth {
SpoolHealth(Sender<bool>),
}

impl Interface for TestHealth {}

impl FromMessage<SpoolHealth> for TestHealth {
type Response = AsyncResponse<bool>;

fn from_message(_message: SpoolHealth, sender: Sender<bool>) -> Self {
Self::SpoolHealth(sender)
}
}

pub struct TestHealthService {
buffer: Addr<Buffer>,
}

impl TestHealthService {
fn new(buffer: Addr<Buffer>) -> Self {
Self { buffer }
}
}

impl Service for TestHealthService {
type Interface = TestHealth;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
loop {
tokio::select! {
Some(TestHealth::SpoolHealth(sender)) = rx.recv() => self.buffer.send(Health(sender)),
else => break,
}
}
});
}
}

#[tokio::test]
async fn health_check_fails() {
relay_log::init_test!();

let config: Arc<_> = Config::from_json_value(serde_json::json!({
"spool": {
"envelopes": {
"path": std::env::temp_dir().join(Uuid::new_v4().to_string()),
"max_memory_size": 0, // 0 bytes, to force to spool to disk all the envelopes.
"max_disk_size": 0,
},
"health": {
"max_memory_percent": 0.0
}
}
}))
.unwrap()
.into();
let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());

let buffer = BufferService::create(memory_checker, services(), config)
.await
.unwrap();

let addr = buffer.start();

let health_service = TestHealthService::new(addr.clone()).start();
let healthy = health_service.send(SpoolHealth).await.unwrap();
assert!(!healthy);
}

#[tokio::test]
async fn health_check_succeeds() {
relay_log::init_test!();

let config: Arc<_> = Config::from_json_value(serde_json::json!({
"spool": {
"envelopes": {
"path": std::env::temp_dir().join(Uuid::new_v4().to_string()),
"max_memory_size": 0, // 0 bytes, to force to spool to disk all the envelopes.
"max_disk_size": "100KB",
},
"health": {
"max_memory_percent": 1.0
}
}
}))
.unwrap()
.into();
let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());

let buffer = BufferService::create(memory_checker, services(), config)
.await
.unwrap();

let addr = buffer.start();

let health_service = TestHealthService::new(addr.clone()).start();
let healthy = health_service.send(SpoolHealth).await.unwrap();
assert!(healthy);
}

#[tokio::test]
async fn index_restore() {
relay_log::init_test!();
