From e8644d85adda4e926044b00b49a6c747cbd07679 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Sat, 30 Nov 2024 16:12:45 +0100 Subject: [PATCH] Move serve::Listener into its own module --- axum/src/serve.rs | 113 ++++++------------------------------- axum/src/serve/listener.rs | 92 ++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 97 deletions(-) create mode 100644 axum/src/serve/listener.rs diff --git a/axum/src/serve.rs b/axum/src/serve.rs index c87e2dc1d3..9ebf4048f4 100644 --- a/axum/src/serve.rs +++ b/axum/src/serve.rs @@ -8,7 +8,6 @@ use std::{ io, marker::PhantomData, sync::Arc, - time::Duration, }; use axum_core::{body::Body, extract::Request, response::Response}; @@ -17,70 +16,13 @@ use hyper::body::Incoming; use hyper_util::rt::{TokioExecutor, TokioIo}; #[cfg(any(feature = "http1", feature = "http2"))] use hyper_util::{server::conn::auto::Builder, service::TowerToHyperService}; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - net::{TcpListener, TcpStream}, - sync::watch, -}; +use tokio::{net::TcpListener, sync::watch}; use tower::ServiceExt as _; use tower_service::Service; -/// Types that can listen for connections. -pub trait Listener: Send + 'static { - /// The listener's IO type. - type Io: AsyncRead + AsyncWrite + Unpin + Send + 'static; - - /// The listener's address type. - type Addr: Send; - - /// Accept a new incoming connection to this listener. - /// - /// If the underlying accept call can return an error, this function must - /// take care of logging and retrying. - fn accept(&mut self) -> impl Future + Send; - - /// Returns the local address that this listener is bound to. - fn local_addr(&self) -> io::Result; -} - -impl Listener for TcpListener { - type Io = TcpStream; - type Addr = std::net::SocketAddr; - - async fn accept(&mut self) -> (Self::Io, Self::Addr) { - loop { - match Self::accept(self).await { - Ok(tup) => return tup, - Err(e) => handle_accept_error(e).await, - } - } - } - - #[inline] - fn local_addr(&self) -> io::Result { - Self::local_addr(self) - } -} +mod listener; -#[cfg(unix)] -impl Listener for tokio::net::UnixListener { - type Io = tokio::net::UnixStream; - type Addr = tokio::net::unix::SocketAddr; - - async fn accept(&mut self) -> (Self::Io, Self::Addr) { - loop { - match Self::accept(self).await { - Ok(tup) => return tup, - Err(e) => handle_accept_error(e).await, - } - } - } - - #[inline] - fn local_addr(&self) -> io::Result { - Self::local_addr(self) - } -} +pub use self::listener::Listener; /// Serve the service with the supplied listener. /// @@ -416,35 +358,6 @@ where } } -fn is_connection_error(e: &io::Error) -> bool { - matches!( - e.kind(), - io::ErrorKind::ConnectionRefused - | io::ErrorKind::ConnectionAborted - | io::ErrorKind::ConnectionReset - ) -} - -async fn handle_accept_error(e: io::Error) { - if is_connection_error(&e) { - return; - } - - // [From `hyper::Server` in 0.14](https://github.com/hyperium/hyper/blob/v0.14.27/src/server/tcp.rs#L186) - // - // > A possible scenario is that the process has hit the max open files - // > allowed, and so trying to accept a new connection will fail with - // > `EMFILE`. In some cases, it's preferable to just wait for some time, if - // > the application will likely close some files (or connections), and try - // > to accept the connection again. If this option is `true`, the error - // > will be logged at the `error` level, since it is still a big deal, - // > and then the listener will sleep for 1 second. - // - // hyper allowed customizing this but axum does not. - error!("accept error: {e}"); - tokio::time::sleep(Duration::from_secs(1)).await; -} - /// An incoming stream. /// /// Used with [`serve`] and [`IntoMakeServiceWithConnectInfo`]. @@ -502,10 +415,20 @@ mod private { #[cfg(test)] mod tests { + use std::{ + future::{pending, IntoFuture as _}, + net::{IpAddr, Ipv4Addr}, + }; + + use axum_core::{body::Body, extract::Request}; use http::StatusCode; - use tokio::net::UnixListener; + use hyper_util::rt::TokioIo; + use tokio::{ + io::{self, AsyncRead, AsyncWrite}, + net::{TcpListener, UnixListener}, + }; - use super::*; + use super::{serve, IncomingStream, Listener}; use crate::{ body::to_bytes, extract::connect_info::Connected, @@ -513,10 +436,6 @@ mod tests { routing::get, Router, }; - use std::{ - future::pending, - net::{IpAddr, Ipv4Addr}, - }; #[allow(dead_code, unused_must_use)] async fn if_it_compiles_it_works() { @@ -674,7 +593,7 @@ mod tests { } } - let (client, server) = tokio::io::duplex(1024); + let (client, server) = io::duplex(1024); let listener = ReadyListener(Some(server)); let app = Router::new().route("/", get(|| async { "Hello, World!" })); diff --git a/axum/src/serve/listener.rs b/axum/src/serve/listener.rs new file mode 100644 index 0000000000..0f8754e908 --- /dev/null +++ b/axum/src/serve/listener.rs @@ -0,0 +1,92 @@ +use std::{future::Future, time::Duration}; + +use tokio::{ + io::{self, AsyncRead, AsyncWrite}, + net::{TcpListener, TcpStream}, +}; + +/// Types that can listen for connections. +pub trait Listener: Send + 'static { + /// The listener's IO type. + type Io: AsyncRead + AsyncWrite + Unpin + Send + 'static; + + /// The listener's address type. + type Addr: Send; + + /// Accept a new incoming connection to this listener. + /// + /// If the underlying accept call can return an error, this function must + /// take care of logging and retrying. + fn accept(&mut self) -> impl Future + Send; + + /// Returns the local address that this listener is bound to. + fn local_addr(&self) -> io::Result; +} + +impl Listener for TcpListener { + type Io = TcpStream; + type Addr = std::net::SocketAddr; + + async fn accept(&mut self) -> (Self::Io, Self::Addr) { + loop { + match Self::accept(self).await { + Ok(tup) => return tup, + Err(e) => handle_accept_error(e).await, + } + } + } + + #[inline] + fn local_addr(&self) -> io::Result { + Self::local_addr(self) + } +} + +#[cfg(unix)] +impl Listener for tokio::net::UnixListener { + type Io = tokio::net::UnixStream; + type Addr = tokio::net::unix::SocketAddr; + + async fn accept(&mut self) -> (Self::Io, Self::Addr) { + loop { + match Self::accept(self).await { + Ok(tup) => return tup, + Err(e) => handle_accept_error(e).await, + } + } + } + + #[inline] + fn local_addr(&self) -> io::Result { + Self::local_addr(self) + } +} + +async fn handle_accept_error(e: io::Error) { + if is_connection_error(&e) { + return; + } + + // [From `hyper::Server` in 0.14](https://github.com/hyperium/hyper/blob/v0.14.27/src/server/tcp.rs#L186) + // + // > A possible scenario is that the process has hit the max open files + // > allowed, and so trying to accept a new connection will fail with + // > `EMFILE`. In some cases, it's preferable to just wait for some time, if + // > the application will likely close some files (or connections), and try + // > to accept the connection again. If this option is `true`, the error + // > will be logged at the `error` level, since it is still a big deal, + // > and then the listener will sleep for 1 second. + // + // hyper allowed customizing this but axum does not. + error!("accept error: {e}"); + tokio::time::sleep(Duration::from_secs(1)).await; +} + +fn is_connection_error(e: &io::Error) -> bool { + matches!( + e.kind(), + io::ErrorKind::ConnectionRefused + | io::ErrorKind::ConnectionAborted + | io::ErrorKind::ConnectionReset + ) +}