Skip to content

Commit

Permalink
Move serve::Listener into its own module
Browse files Browse the repository at this point in the history
  • Loading branch information
jplatte committed Nov 30, 2024
1 parent c000243 commit e8644d8
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 97 deletions.
113 changes: 16 additions & 97 deletions axum/src/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::{
io,
marker::PhantomData,
sync::Arc,
time::Duration,
};

use axum_core::{body::Body, extract::Request, response::Response};
Expand All @@ -17,70 +16,13 @@ use hyper::body::Incoming;
use hyper_util::rt::{TokioExecutor, TokioIo};
#[cfg(any(feature = "http1", feature = "http2"))]
use hyper_util::{server::conn::auto::Builder, service::TowerToHyperService};
use tokio::{
io::{AsyncRead, AsyncWrite},
net::{TcpListener, TcpStream},
sync::watch,
};
use tokio::{net::TcpListener, sync::watch};
use tower::ServiceExt as _;
use tower_service::Service;

/// Types that can listen for connections.
pub trait Listener: Send + 'static {
/// The listener's IO type.
type Io: AsyncRead + AsyncWrite + Unpin + Send + 'static;

/// The listener's address type.
type Addr: Send;

/// Accept a new incoming connection to this listener.
///
/// If the underlying accept call can return an error, this function must
/// take care of logging and retrying.
fn accept(&mut self) -> impl Future<Output = (Self::Io, Self::Addr)> + Send;

/// Returns the local address that this listener is bound to.
fn local_addr(&self) -> io::Result<Self::Addr>;
}

impl Listener for TcpListener {
type Io = TcpStream;
type Addr = std::net::SocketAddr;

async fn accept(&mut self) -> (Self::Io, Self::Addr) {
loop {
match Self::accept(self).await {
Ok(tup) => return tup,
Err(e) => handle_accept_error(e).await,
}
}
}

#[inline]
fn local_addr(&self) -> io::Result<Self::Addr> {
Self::local_addr(self)
}
}
mod listener;

#[cfg(unix)]
impl Listener for tokio::net::UnixListener {
type Io = tokio::net::UnixStream;
type Addr = tokio::net::unix::SocketAddr;

async fn accept(&mut self) -> (Self::Io, Self::Addr) {
loop {
match Self::accept(self).await {
Ok(tup) => return tup,
Err(e) => handle_accept_error(e).await,
}
}
}

#[inline]
fn local_addr(&self) -> io::Result<Self::Addr> {
Self::local_addr(self)
}
}
pub use self::listener::Listener;

/// Serve the service with the supplied listener.
///
Expand Down Expand Up @@ -416,35 +358,6 @@ where
}
}

fn is_connection_error(e: &io::Error) -> bool {
matches!(
e.kind(),
io::ErrorKind::ConnectionRefused
| io::ErrorKind::ConnectionAborted
| io::ErrorKind::ConnectionReset
)
}

async fn handle_accept_error(e: io::Error) {
if is_connection_error(&e) {
return;
}

// [From `hyper::Server` in 0.14](https://github.com/hyperium/hyper/blob/v0.14.27/src/server/tcp.rs#L186)
//
// > A possible scenario is that the process has hit the max open files
// > allowed, and so trying to accept a new connection will fail with
// > `EMFILE`. In some cases, it's preferable to just wait for some time, if
// > the application will likely close some files (or connections), and try
// > to accept the connection again. If this option is `true`, the error
// > will be logged at the `error` level, since it is still a big deal,
// > and then the listener will sleep for 1 second.
//
// hyper allowed customizing this but axum does not.
error!("accept error: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
}

/// An incoming stream.
///
/// Used with [`serve`] and [`IntoMakeServiceWithConnectInfo`].
Expand Down Expand Up @@ -502,21 +415,27 @@ mod private {

#[cfg(test)]
mod tests {
use std::{
future::{pending, IntoFuture as _},
net::{IpAddr, Ipv4Addr},
};

use axum_core::{body::Body, extract::Request};
use http::StatusCode;
use tokio::net::UnixListener;
use hyper_util::rt::TokioIo;
use tokio::{
io::{self, AsyncRead, AsyncWrite},
net::{TcpListener, UnixListener},
};

use super::*;
use super::{serve, IncomingStream, Listener};
use crate::{
body::to_bytes,
extract::connect_info::Connected,
handler::{Handler, HandlerWithoutStateExt},
routing::get,
Router,
};
use std::{
future::pending,
net::{IpAddr, Ipv4Addr},
};

#[allow(dead_code, unused_must_use)]
async fn if_it_compiles_it_works() {
Expand Down Expand Up @@ -674,7 +593,7 @@ mod tests {
}
}

let (client, server) = tokio::io::duplex(1024);
let (client, server) = io::duplex(1024);
let listener = ReadyListener(Some(server));

let app = Router::new().route("/", get(|| async { "Hello, World!" }));
Expand Down
92 changes: 92 additions & 0 deletions axum/src/serve/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::{future::Future, time::Duration};

use tokio::{
io::{self, AsyncRead, AsyncWrite},
net::{TcpListener, TcpStream},
};

/// Types that can listen for connections.
pub trait Listener: Send + 'static {
/// The listener's IO type.
type Io: AsyncRead + AsyncWrite + Unpin + Send + 'static;

/// The listener's address type.
type Addr: Send;

/// Accept a new incoming connection to this listener.
///
/// If the underlying accept call can return an error, this function must
/// take care of logging and retrying.
fn accept(&mut self) -> impl Future<Output = (Self::Io, Self::Addr)> + Send;

/// Returns the local address that this listener is bound to.
fn local_addr(&self) -> io::Result<Self::Addr>;
}

impl Listener for TcpListener {
type Io = TcpStream;
type Addr = std::net::SocketAddr;

async fn accept(&mut self) -> (Self::Io, Self::Addr) {
loop {
match Self::accept(self).await {
Ok(tup) => return tup,
Err(e) => handle_accept_error(e).await,
}
}
}

#[inline]
fn local_addr(&self) -> io::Result<Self::Addr> {
Self::local_addr(self)
}
}

#[cfg(unix)]
impl Listener for tokio::net::UnixListener {
type Io = tokio::net::UnixStream;
type Addr = tokio::net::unix::SocketAddr;

async fn accept(&mut self) -> (Self::Io, Self::Addr) {
loop {
match Self::accept(self).await {
Ok(tup) => return tup,
Err(e) => handle_accept_error(e).await,
}
}
}

#[inline]
fn local_addr(&self) -> io::Result<Self::Addr> {
Self::local_addr(self)
}
}

async fn handle_accept_error(e: io::Error) {
if is_connection_error(&e) {
return;
}

// [From `hyper::Server` in 0.14](https://github.com/hyperium/hyper/blob/v0.14.27/src/server/tcp.rs#L186)
//
// > A possible scenario is that the process has hit the max open files
// > allowed, and so trying to accept a new connection will fail with
// > `EMFILE`. In some cases, it's preferable to just wait for some time, if
// > the application will likely close some files (or connections), and try
// > to accept the connection again. If this option is `true`, the error
// > will be logged at the `error` level, since it is still a big deal,
// > and then the listener will sleep for 1 second.
//
// hyper allowed customizing this but axum does not.
error!("accept error: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
}

fn is_connection_error(e: &io::Error) -> bool {
matches!(
e.kind(),
io::ErrorKind::ConnectionRefused
| io::ErrorKind::ConnectionAborted
| io::ErrorKind::ConnectionReset
)
}

0 comments on commit e8644d8

Please sign in to comment.