Skip to content

Commit

Permalink
Handle accept errors inside Listener::accept
Browse files Browse the repository at this point in the history
  • Loading branch information
jplatte committed Nov 30, 2024
1 parent 8c030a3 commit 0b34a14
Showing 1 changed file with 39 additions and 42 deletions.
81 changes: 39 additions & 42 deletions axum/src/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ pub trait Listener: Send + 'static {
/// The listener's address type.
type Addr: Send;

/// Accept a new incoming connection to this listener
fn accept(&mut self) -> impl Future<Output = io::Result<(Self::Io, Self::Addr)>> + Send;
/// Accept a new incoming connection to this listener.
///
/// If the underlying accept call can return an error, this function must
/// take care of logging and retrying.
fn accept(&mut self) -> impl Future<Output = (Self::Io, Self::Addr)> + Send;

/// Returns the local address that this listener is bound to.
fn local_addr(&self) -> io::Result<Self::Addr>;
Expand All @@ -44,9 +47,13 @@ impl Listener for TcpListener {
type Io = TcpStream;
type Addr = std::net::SocketAddr;

#[inline]
async fn accept(&mut self) -> io::Result<(Self::Io, Self::Addr)> {
Self::accept(self).await
async fn accept(&mut self) -> (Self::Io, Self::Addr) {
loop {
match Self::accept(self).await {
Ok(tup) => return tup,
Err(e) => handle_accept_error(e).await,
}
}
}

#[inline]
Expand All @@ -60,9 +67,13 @@ impl Listener for tokio::net::UnixListener {
type Io = tokio::net::UnixStream;
type Addr = tokio::net::unix::SocketAddr;

#[inline]
async fn accept(&mut self) -> io::Result<(Self::Io, Self::Addr)> {
Self::accept(self).await
async fn accept(&mut self) -> (Self::Io, Self::Addr) {
loop {
match Self::accept(self).await {
Ok(tup) => return tup,
Err(e) => handle_accept_error(e).await,
}
}
}

#[inline]
Expand Down Expand Up @@ -400,12 +411,7 @@ where

loop {
let (io, remote_addr) = tokio::select! {
conn = accept(&mut listener) => {
match conn {
Some(conn) => conn,
None => continue,
}
}
conn = listener.accept() => conn,
_ = signal_tx.closed() => {
trace!("signal received, not accepting new connections");
break;
Expand Down Expand Up @@ -497,33 +503,24 @@ fn is_connection_error(e: &io::Error) -> bool {
)
}

async fn accept<L>(listener: &mut L) -> Option<(L::Io, L::Addr)>
where
L: Listener,
{
match listener.accept().await {
Ok(conn) => Some(conn),
Err(e) => {
if is_connection_error(&e) {
return None;
}

// [From `hyper::Server` in 0.14](https://github.com/hyperium/hyper/blob/v0.14.27/src/server/tcp.rs#L186)
//
// > A possible scenario is that the process has hit the max open files
// > allowed, and so trying to accept a new connection will fail with
// > `EMFILE`. In some cases, it's preferable to just wait for some time, if
// > the application will likely close some files (or connections), and try
// > to accept the connection again. If this option is `true`, the error
// > will be logged at the `error` level, since it is still a big deal,
// > and then the listener will sleep for 1 second.
//
// hyper allowed customizing this but axum does not.
error!("accept error: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
None
}
async fn handle_accept_error(e: io::Error) {
if is_connection_error(&e) {
return;
}

// [From `hyper::Server` in 0.14](https://github.com/hyperium/hyper/blob/v0.14.27/src/server/tcp.rs#L186)
//
// > A possible scenario is that the process has hit the max open files
// > allowed, and so trying to accept a new connection will fail with
// > `EMFILE`. In some cases, it's preferable to just wait for some time, if
// > the application will likely close some files (or connections), and try
// > to accept the connection again. If this option is `true`, the error
// > will be logged at the `error` level, since it is still a big deal,
// > and then the listener will sleep for 1 second.
//
// hyper allowed customizing this but axum does not.
error!("accept error: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
}

/// An incoming stream.
Expand Down Expand Up @@ -757,9 +754,9 @@ mod tests {
type Io = T;
type Addr = ();

async fn accept(&mut self) -> io::Result<(Self::Io, Self::Addr)> {
async fn accept(&mut self) -> (Self::Io, Self::Addr) {
match self.0.take() {
Some(server) => Ok((server, ())),
Some(server) => (server, ()),
None => std::future::pending().await,
}
}
Expand Down

0 comments on commit 0b34a14

Please sign in to comment.