Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace futures-util with futures-lite #2469

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion axum-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ __private_docs = ["dep:tower-http"]
[dependencies]
async-trait = "0.1.67"
bytes = "1.0"
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
futures-core = { version = "0.3.0", default-features = false }
futures-lite = { version = "2.0.0", default-features = false }
http = "1.0.0"
http-body = "1.0.0"
http-body-util = "0.1.0"
Expand Down
8 changes: 4 additions & 4 deletions axum-core/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

use crate::{BoxError, Error};
use bytes::Bytes;
use futures_util::stream::Stream;
use futures_util::TryStream;
use futures_core::TryStream;
use futures_lite::prelude::*;
use http_body::{Body as _, Frame};
use http_body_util::BodyExt;
use pin_project_lite::pin_project;
Expand Down Expand Up @@ -147,7 +147,7 @@ impl Stream for BodyDataStream {
#[inline]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match futures_util::ready!(Pin::new(&mut self.inner).poll_frame(cx)?) {
match futures_lite::ready!(Pin::new(&mut self.inner).poll_frame(cx)?) {
Some(frame) => match frame.into_data() {
Ok(data) => return Poll::Ready(Some(Ok(data))),
Err(_frame) => {}
Expand Down Expand Up @@ -202,7 +202,7 @@ where
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let stream = self.project().stream.get_pin_mut();
match futures_util::ready!(stream.try_poll_next(cx)) {
match futures_lite::ready!(stream.try_poll_next(cx)) {
Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
None => Poll::Ready(None),
Expand Down
5 changes: 5 additions & 0 deletions axum-core/src/ext_traits/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
pub(crate) mod request;
pub(crate) mod request_parts;

use core::future::Future;
use core::pin::Pin;

type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

#[cfg(test)]
mod tests {
use std::convert::Infallible;
Expand Down
3 changes: 2 additions & 1 deletion axum-core/src/ext_traits/request.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::body::Body;
use crate::extract::{DefaultBodyLimitKind, FromRequest, FromRequestParts, Request};
use futures_util::future::BoxFuture;

use super::BoxFuture;

mod sealed {
pub trait Sealed {}
Expand Down
3 changes: 2 additions & 1 deletion axum-core/src/ext_traits/request_parts.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::extract::FromRequestParts;
use futures_util::future::BoxFuture;
use http::request::Parts;

use super::BoxFuture;

mod sealed {
pub trait Sealed {}
impl Sealed for http::request::Parts {}
Expand Down
8 changes: 5 additions & 3 deletions axum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ query = ["dep:serde_urlencoded"]
tokio = ["dep:hyper-util", "dep:tokio", "tokio/net", "tokio/rt", "tower/make", "tokio/macros"]
tower-log = ["tower/log"]
tracing = ["dep:tracing", "axum-core/tracing"]
ws = ["dep:hyper", "tokio", "dep:tokio-tungstenite", "dep:sha1", "dep:base64"]
ws = ["dep:hyper", "tokio", "dep:tokio-tungstenite", "dep:sha1", "dep:base64", "dep:futures-sink"]

# Required for intra-doc links to resolve correctly
__private_docs = ["tower/full", "dep:tower-http"]
Expand All @@ -44,7 +44,9 @@ __private_docs = ["tower/full", "dep:tower-http"]
async-trait = "0.1.67"
axum-core = { path = "../axum-core", version = "0.4.3" }
bytes = "1.0"
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
futures-core = { version = "0.3", default-features = false, features = ["alloc"] }
futures-lite = { version = "2.0.0", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3", default-features = false, features = ["alloc"], optional = true }
http = "1.0.0"
http-body = "1.0.0"
http-body-util = "0.1.0"
Expand Down Expand Up @@ -115,6 +117,7 @@ rustversion = "1.0.9"
[dev-dependencies]
anyhow = "1.0"
axum-macros = { path = "../axum-macros", version = "0.4.1", features = ["__private"] }
futures-util = "0.3"
quickcheck = "1.0"
quickcheck_macros = "1.0"
reqwest = { version = "0.11.14", default-features = false, features = ["json", "stream", "multipart"] }
Expand Down Expand Up @@ -197,7 +200,6 @@ allowed = [
# not 1.0
"futures_core",
"futures_sink",
"futures_util",
"tower_layer",
"tower_service",

Expand Down
2 changes: 1 addition & 1 deletion axum/src/extract/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use axum_core::{
response::{IntoResponse, Response},
RequestExt,
};
use futures_util::stream::Stream;
use futures_lite::stream::Stream;
use http::{
header::{HeaderMap, CONTENT_TYPE},
StatusCode,
Expand Down
24 changes: 18 additions & 6 deletions axum/src/extract/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,11 @@ use super::FromRequestParts;
use crate::{body::Bytes, response::Response, Error};
use async_trait::async_trait;
use axum_core::body::Body;
use futures_util::{
sink::{Sink, SinkExt},
use futures_lite::{
future::poll_fn,
stream::{Stream, StreamExt},
};
use futures_sink::Sink;
use http::{
header::{self, HeaderMap, HeaderName, HeaderValue},
request::Parts,
Expand Down Expand Up @@ -158,7 +159,9 @@ impl<F> WebSocketUpgrade<F> {
/// If set to `0` each message will be eagerly written to the underlying stream.
/// It is often more optimal to allow them to buffer a little, hence the default value.
///
/// Note: [`flush`](SinkExt::flush) will always fully write the buffer regardless.
/// Note: [`flush`] will always fully write the buffer regardless.
///
/// [`flush`]: https://docs.rs/futures-util/latest/futures_util/sink/trait.SinkExt.html#method.flush
pub fn write_buffer_size(mut self, size: usize) -> Self {
self.config.write_buffer_size = size;
self
Expand Down Expand Up @@ -470,8 +473,16 @@ impl WebSocket {

/// Send a message.
pub async fn send(&mut self, msg: Message) -> Result<(), Error> {
self.inner
.send(msg.into_tungstenite())
// Start sending the message.
poll_fn(|cx| Pin::new(&mut self.inner).poll_ready(cx))
.await
.map_err(Error::new)?;
Pin::new(&mut self.inner)
.start_send(msg.into_tungstenite())
.map_err(Error::new)?;

// Send the message.
poll_fn(|cx| Pin::new(&mut self.inner).poll_flush(cx))
.await
.map_err(Error::new)
}
Expand All @@ -492,7 +503,7 @@ impl Stream for WebSocket {

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match futures_util::ready!(self.inner.poll_next_unpin(cx)) {
match futures_lite::ready!(self.inner.poll_next(cx)) {
Some(Ok(msg)) => {
if let Some(msg) = Message::from_tungstenite(msg) {
return Poll::Ready(Some(Ok(msg)));
Expand Down Expand Up @@ -834,6 +845,7 @@ mod tests {

use super::*;
use crate::{body::Body, routing::get, test_helpers::spawn_service, Router};
use futures_util::sink::SinkExt;
use http::{Request, Version};
use tokio_tungstenite::tungstenite;
use tower::ServiceExt;
Expand Down
8 changes: 4 additions & 4 deletions axum/src/handler/future.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Handler future types.

use super::MapFuture;
use crate::response::Response;
use axum_core::extract::Request;
use futures_util::future::Map;
use pin_project_lite::pin_project;
use std::{convert::Infallible, future::Future, pin::Pin, task::Context};
use tower::util::Oneshot;
Expand All @@ -11,7 +11,7 @@ use tower_service::Service;
opaque_future! {
/// The response future for [`IntoService`](super::IntoService).
pub type IntoServiceFuture<F> =
Map<
MapFuture<
F,
fn(Response) -> Result<Response, Infallible>,
>;
Expand All @@ -24,7 +24,7 @@ pin_project! {
S: Service<Request>,
{
#[pin]
inner: Map<Oneshot<S, Request>, fn(Result<S::Response, S::Error>) -> Response>,
inner: MapFuture<Oneshot<S, Request>, fn(Result<S::Response, S::Error>) -> Response>,
}
}

Expand All @@ -33,7 +33,7 @@ where
S: Service<Request>,
{
pub(super) fn new(
inner: Map<Oneshot<S, Request>, fn(Result<S::Response, S::Error>) -> Response>,
inner: MapFuture<Oneshot<S, Request>, fn(Result<S::Response, S::Error>) -> Response>,
) -> Self {
Self { inner }
}
Expand Down
45 changes: 40 additions & 5 deletions axum/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,14 @@ use crate::{
response::{IntoResponse, Response},
routing::IntoMakeService,
};
use std::{convert::Infallible, fmt, future::Future, marker::PhantomData, pin::Pin};
use std::{
convert::Infallible,
fmt,
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use tower::ServiceExt;
use tower_layer::Layer;
use tower_service::Service;
Expand Down Expand Up @@ -313,20 +320,18 @@ where
type Future = future::LayeredFuture<L::Service>;

fn call(self, req: Request, state: S) -> Self::Future {
use futures_util::future::{FutureExt, Map};

let svc = self.handler.with_state(state);
let svc = self.layer.layer(svc);

let future: Map<
let future: MapFuture<
_,
fn(
Result<
<L::Service as Service<Request>>::Response,
<L::Service as Service<Request>>::Error,
>,
) -> _,
> = svc.oneshot(req).map(|result| match result {
> = MapFuture::new(svc.oneshot(req), |result| match result {
Ok(res) => res.into_response(),
Err(err) => match err {},
});
Expand Down Expand Up @@ -383,6 +388,36 @@ where
}
}

pin_project_lite::pin_project! {
#[doc(hidden)]
pub struct MapFuture<Fut, F> {
#[pin]
future: Fut,
mapper: F
}
}

impl<T, Fut: Future, F: FnMut(Fut::Output) -> T> MapFuture<Fut, F> {
#[inline]
fn new(future: Fut, mapper: F) -> Self {
Self { future, mapper }
}
}

impl<T, Fut: Future, F: FnMut(Fut::Output) -> T> Future for MapFuture<Fut, F> {
type Output = T;

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
let mut this = self.project();
let poll = this.future.as_mut().poll(cx);
match poll {
Poll::Ready(input) => Poll::Ready((this.mapper)(input)),
Poll::Pending => Poll::Pending,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
6 changes: 2 additions & 4 deletions axum/src/handler/service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::Handler;
use super::{Handler, MapFuture};
use crate::body::{Body, Bytes, HttpBody};
#[cfg(feature = "tokio")]
use crate::extract::connect_info::IntoMakeServiceWithConnectInfo;
Expand Down Expand Up @@ -165,13 +165,11 @@ where
}

fn call(&mut self, req: Request<B>) -> Self::Future {
use futures_util::future::FutureExt;

let req = req.map(Body::new);

let handler = self.handler.clone();
let future = Handler::call(handler, req, self.state.clone());
let future = future.map(Ok as _);
let future = MapFuture::new(future, Ok as _);

super::future::IntoServiceFuture::new(future)
}
Expand Down
4 changes: 2 additions & 2 deletions axum/src/middleware/from_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
extract::FromRequestParts,
response::{IntoResponse, Response},
};
use futures_util::{future::BoxFuture, ready};
use futures_lite::{future::Boxed, ready};
use http::Request;
use pin_project_lite::pin_project;
use std::{
Expand Down Expand Up @@ -254,7 +254,7 @@ pin_project! {
T: Service<Request<B>>,
{
Extracting {
future: BoxFuture<'static, (Request<B>, Result<E, E::Rejection>)>,
future: Boxed<(Request<B>, Result<E, E::Rejection>)>,
},
Call { #[pin] future: T::Future },
}
Expand Down
4 changes: 2 additions & 2 deletions axum/src/middleware/from_fn.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::response::{IntoResponse, Response};
use axum_core::extract::{FromRequest, FromRequestParts, Request};
use futures_util::future::BoxFuture;
use futures_lite::future::Boxed;
use std::{
any::type_name,
convert::Infallible,
Expand Down Expand Up @@ -361,7 +361,7 @@ impl Service<Request> for Next {

/// Response future for [`FromFn`].
pub struct ResponseFuture {
inner: BoxFuture<'static, Response>,
inner: Boxed<Response>,
}

impl Future for ResponseFuture {
Expand Down
4 changes: 2 additions & 2 deletions axum/src/middleware/map_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::body::{Body, Bytes, HttpBody};
use crate::response::{IntoResponse, Response};
use crate::BoxError;
use axum_core::extract::{FromRequest, FromRequestParts};
use futures_util::future::BoxFuture;
use futures_lite::future::Boxed;
use http::Request;
use std::{
any::type_name,
Expand Down Expand Up @@ -336,7 +336,7 @@ where

/// Response future for [`MapRequest`].
pub struct ResponseFuture {
inner: BoxFuture<'static, Response>,
inner: Boxed<Response>,
}

impl Future for ResponseFuture {
Expand Down
4 changes: 2 additions & 2 deletions axum/src/middleware/map_response.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::response::{IntoResponse, Response};
use axum_core::extract::FromRequestParts;
use futures_util::future::BoxFuture;
use futures_lite::future::Boxed;
use http::Request;
use std::{
any::type_name,
Expand Down Expand Up @@ -324,7 +324,7 @@ where

/// Response future for [`MapResponse`].
pub struct ResponseFuture {
inner: BoxFuture<'static, Response>,
inner: Boxed<Response>,
}

impl Future for ResponseFuture {
Expand Down
Loading
Loading