Skip to content

Commit

Permalink
resume_unwind
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Sep 11, 2024
1 parent fd0ae7a commit ffbf7dc
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 28 deletions.
29 changes: 22 additions & 7 deletions relay-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ use std::sync::Arc;
use futures::StreamExt;
use relay_config::Config;
use relay_system::{Controller, Service};
use tokio::select;

use crate::service::ServiceState;
use crate::services::server::HttpServer;
Expand All @@ -302,16 +303,30 @@ pub fn run(config: Config) -> anyhow::Result<()> {
// information on all services.
main_runtime.block_on(async {
Controller::start(config.shutdown_timeout());
let service = ServiceState::start(config.clone())?;
let (service, mut join_handles) = ServiceState::start(config.clone())?;
HttpServer::new(config, service.clone())?.start();

for x in service.join_handles() {}
// while let Some(res) = service.join_handles().next() {
loop {
select! {
Some(res) = join_handles.next() => {
match res {
Ok(()) => {
relay_log::trace!("Service exited normally.");
}
Err(e) => {
if e.is_panic() {
std::panic::resume_unwind(e.into_panic());
}
}
}
}
_ = Controller::shutdown_handle().finished() => {
break
}
else => break
}
}

// }

// TODO: await simultaneously
Controller::shutdown_handle().finished().await;
anyhow::Ok(())
})?;

Expand Down
28 changes: 12 additions & 16 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ struct StateInner {
config: Arc<Config>,
memory_checker: MemoryChecker,
registry: Registry,
join_handles: Vec<JoinHandle<()>>,
}

/// Server state.
Expand All @@ -154,7 +153,7 @@ pub struct ServiceState {

impl ServiceState {
/// Starts all services and returns addresses to all of them.
pub fn start(config: Arc<Config>) -> Result<Self> {
pub fn start(config: Arc<Config>) -> Result<(Self, FuturesUnordered<JoinHandle<()>>)> {
let upstream_relay = UpstreamRelayService::new(config.clone()).start();
let test_store = TestStoreService::new(config.clone()).start();

Expand Down Expand Up @@ -311,18 +310,19 @@ impl ServiceState {
config: config.clone(),
memory_checker: MemoryChecker::new(memory_stat, config.clone()),
registry,
join_handles: {
let mut j = Vec::from_iter([processor_handle]);
if let Some((_, handle)) = envelope_buffer {
j.push(handle);
}
j
},
};

Ok(ServiceState {
inner: Arc::new(state),
})
let join_handles = FuturesUnordered::from_iter([processor_handle]);
if let Some((_, handle)) = envelope_buffer {
join_handles.push(handle);
};

Ok((
ServiceState {
inner: Arc::new(state),
},
join_handles,
))
}

/// Returns a reference to the Relay configuration.
Expand Down Expand Up @@ -386,10 +386,6 @@ impl ServiceState {
pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
&self.inner.registry.outcome_aggregator
}

pub fn join_handles(&self) -> &[JoinHandle<()>] {
self.inner.join_handles.as_slice()
}
}

fn create_redis_pool(
Expand Down
8 changes: 3 additions & 5 deletions relay-server/src/services/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,19 +189,17 @@ fn serve(listener: TcpListener, app: App, config: Arc<Config>) -> tokio::task::J
.keep_alive_timeout(config.keepalive_timeout());

let service = ServiceExt::<Request>::into_make_service_with_connect_info::<SocketAddr>(app);
let server_handle = tokio::spawn(server.serve(service));
let _server_handle = tokio::spawn(server.serve(service));

let shutdown_handle = tokio::spawn(async move {
tokio::spawn(async move {
let Shutdown { timeout } = Controller::shutdown_handle().notified().await;
relay_log::info!("Shutting down HTTP server");

match timeout {
Some(timeout) => handle.graceful_shutdown(Some(timeout)),
None => handle.shutdown(),
}
});

server_handle // TODO: return both
}) // TODO: return both
}

/// HTTP server service.
Expand Down

0 comments on commit ffbf7dc

Please sign in to comment.