diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs index 923a78c56d..9bade220a0 100644 --- a/relay-server/src/lib.rs +++ b/relay-server/src/lib.rs @@ -281,6 +281,7 @@ use std::sync::Arc; use futures::StreamExt; use relay_config::Config; use relay_system::{Controller, Service}; +use tokio::select; use crate::service::ServiceState; use crate::services::server::HttpServer; @@ -302,16 +303,30 @@ pub fn run(config: Config) -> anyhow::Result<()> { // information on all services. main_runtime.block_on(async { Controller::start(config.shutdown_timeout()); - let service = ServiceState::start(config.clone())?; + let (service, mut join_handles) = ServiceState::start(config.clone())?; HttpServer::new(config, service.clone())?.start(); - for x in service.join_handles() {} - // while let Some(res) = service.join_handles().next() { + loop { + select! { + Some(res) = join_handles.next() => { + match res { + Ok(()) => { + relay_log::trace!("Service exited normally."); + } + Err(e) => { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } + } + } + } + _ = Controller::shutdown_handle().finished() => { + break + } + else => break + } + } - // } - - // TODO: await simultaneously - Controller::shutdown_handle().finished().await; anyhow::Ok(()) })?; diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 10bc72fe92..55094bb4df 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -143,7 +143,6 @@ struct StateInner { config: Arc, memory_checker: MemoryChecker, registry: Registry, - join_handles: Vec>, } /// Server state. @@ -154,7 +153,7 @@ pub struct ServiceState { impl ServiceState { /// Starts all services and returns addresses to all of them. - pub fn start(config: Arc) -> Result { + pub fn start(config: Arc) -> Result<(Self, FuturesUnordered>)> { let upstream_relay = UpstreamRelayService::new(config.clone()).start(); let test_store = TestStoreService::new(config.clone()).start(); @@ -311,18 +310,19 @@ impl ServiceState { config: config.clone(), memory_checker: MemoryChecker::new(memory_stat, config.clone()), registry, - join_handles: { - let mut j = Vec::from_iter([processor_handle]); - if let Some((_, handle)) = envelope_buffer { - j.push(handle); - } - j - }, }; - Ok(ServiceState { - inner: Arc::new(state), - }) + let join_handles = FuturesUnordered::from_iter([processor_handle]); + if let Some((_, handle)) = envelope_buffer { + join_handles.push(handle); + }; + + Ok(( + ServiceState { + inner: Arc::new(state), + }, + join_handles, + )) } /// Returns a reference to the Relay configuration. @@ -386,10 +386,6 @@ impl ServiceState { pub fn outcome_aggregator(&self) -> &Addr { &self.inner.registry.outcome_aggregator } - - pub fn join_handles(&self) -> &[JoinHandle<()>] { - self.inner.join_handles.as_slice() - } } fn create_redis_pool( diff --git a/relay-server/src/services/server.rs b/relay-server/src/services/server.rs index a842659603..ec2670fbbd 100644 --- a/relay-server/src/services/server.rs +++ b/relay-server/src/services/server.rs @@ -189,9 +189,9 @@ fn serve(listener: TcpListener, app: App, config: Arc) -> tokio::task::J .keep_alive_timeout(config.keepalive_timeout()); let service = ServiceExt::::into_make_service_with_connect_info::(app); - let server_handle = tokio::spawn(server.serve(service)); + let _server_handle = tokio::spawn(server.serve(service)); - let shutdown_handle = tokio::spawn(async move { + tokio::spawn(async move { let Shutdown { timeout } = Controller::shutdown_handle().notified().await; relay_log::info!("Shutting down HTTP server"); @@ -199,9 +199,7 @@ fn serve(listener: TcpListener, app: App, config: Arc) -> tokio::task::J Some(timeout) => handle.graceful_shutdown(Some(timeout)), None => handle.shutdown(), } - }); - - server_handle // TODO: return both + }) // TODO: return both } /// HTTP server service.