From 0b34a1434abb5c408442e7899ad5454a7c5a1586 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Sat, 30 Nov 2024 15:31:28 +0100 Subject: [PATCH] Handle accept errors inside Listener::accept --- axum/src/serve.rs | 81 +++++++++++++++++++++++------------------------ 1 file changed, 39 insertions(+), 42 deletions(-) diff --git a/axum/src/serve.rs b/axum/src/serve.rs index 30715455af..d137a6c5b7 100644 --- a/axum/src/serve.rs +++ b/axum/src/serve.rs @@ -33,8 +33,11 @@ pub trait Listener: Send + 'static { /// The listener's address type. type Addr: Send; - /// Accept a new incoming connection to this listener - fn accept(&mut self) -> impl Future> + Send; + /// Accept a new incoming connection to this listener. + /// + /// If the underlying accept call can return an error, this function must + /// take care of logging and retrying. + fn accept(&mut self) -> impl Future + Send; /// Returns the local address that this listener is bound to. fn local_addr(&self) -> io::Result; @@ -44,9 +47,13 @@ impl Listener for TcpListener { type Io = TcpStream; type Addr = std::net::SocketAddr; - #[inline] - async fn accept(&mut self) -> io::Result<(Self::Io, Self::Addr)> { - Self::accept(self).await + async fn accept(&mut self) -> (Self::Io, Self::Addr) { + loop { + match Self::accept(self).await { + Ok(tup) => return tup, + Err(e) => handle_accept_error(e).await, + } + } } #[inline] @@ -60,9 +67,13 @@ impl Listener for tokio::net::UnixListener { type Io = tokio::net::UnixStream; type Addr = tokio::net::unix::SocketAddr; - #[inline] - async fn accept(&mut self) -> io::Result<(Self::Io, Self::Addr)> { - Self::accept(self).await + async fn accept(&mut self) -> (Self::Io, Self::Addr) { + loop { + match Self::accept(self).await { + Ok(tup) => return tup, + Err(e) => handle_accept_error(e).await, + } + } } #[inline] @@ -400,12 +411,7 @@ where loop { let (io, remote_addr) = tokio::select! { - conn = accept(&mut listener) => { - match conn { - Some(conn) => conn, - None => continue, - } - } + conn = listener.accept() => conn, _ = signal_tx.closed() => { trace!("signal received, not accepting new connections"); break; @@ -497,33 +503,24 @@ fn is_connection_error(e: &io::Error) -> bool { ) } -async fn accept(listener: &mut L) -> Option<(L::Io, L::Addr)> -where - L: Listener, -{ - match listener.accept().await { - Ok(conn) => Some(conn), - Err(e) => { - if is_connection_error(&e) { - return None; - } - - // [From `hyper::Server` in 0.14](https://github.com/hyperium/hyper/blob/v0.14.27/src/server/tcp.rs#L186) - // - // > A possible scenario is that the process has hit the max open files - // > allowed, and so trying to accept a new connection will fail with - // > `EMFILE`. In some cases, it's preferable to just wait for some time, if - // > the application will likely close some files (or connections), and try - // > to accept the connection again. If this option is `true`, the error - // > will be logged at the `error` level, since it is still a big deal, - // > and then the listener will sleep for 1 second. - // - // hyper allowed customizing this but axum does not. - error!("accept error: {e}"); - tokio::time::sleep(Duration::from_secs(1)).await; - None - } +async fn handle_accept_error(e: io::Error) { + if is_connection_error(&e) { + return; } + + // [From `hyper::Server` in 0.14](https://github.com/hyperium/hyper/blob/v0.14.27/src/server/tcp.rs#L186) + // + // > A possible scenario is that the process has hit the max open files + // > allowed, and so trying to accept a new connection will fail with + // > `EMFILE`. In some cases, it's preferable to just wait for some time, if + // > the application will likely close some files (or connections), and try + // > to accept the connection again. If this option is `true`, the error + // > will be logged at the `error` level, since it is still a big deal, + // > and then the listener will sleep for 1 second. + // + // hyper allowed customizing this but axum does not. + error!("accept error: {e}"); + tokio::time::sleep(Duration::from_secs(1)).await; } /// An incoming stream. @@ -757,9 +754,9 @@ mod tests { type Io = T; type Addr = (); - async fn accept(&mut self) -> io::Result<(Self::Io, Self::Addr)> { + async fn accept(&mut self) -> (Self::Io, Self::Addr) { match self.0.take() { - Some(server) => Ok((server, ())), + Some(server) => (server, ()), None => std::future::pending().await, } }