diff --git a/src/http.rs b/src/http.rs index cb1f2d9c7..f301ef645 100644 --- a/src/http.rs +++ b/src/http.rs @@ -1,4 +1,4 @@ -use std::io::BufRead; +use bytes::{Bytes, buf::Reader}; use std::sync::{Arc, Condvar, Mutex}; use warp::http; @@ -19,5 +19,5 @@ pub struct HttpRequestData { pub headers: http::HeaderMap, pub path: String, pub query: String, - pub body: Box, + pub body: Reader, } diff --git a/src/machine/streams.rs b/src/machine/streams.rs index be5591cf1..ae1fcf309 100644 --- a/src/machine/streams.rs +++ b/src/machine/streams.rs @@ -12,6 +12,7 @@ use crate::machine::machine_indices::*; use crate::machine::machine_state::*; use crate::types::*; +use bytes::Buf; pub use scryer_modular_bitfield::prelude::*; use std::cmp::Ordering; @@ -22,7 +23,7 @@ use std::fs::{File, OpenOptions}; use std::hash::Hash; use std::io; #[cfg(feature = "http")] -use std::io::BufRead; +use bytes::{buf::Reader as BufReader, Bytes}; use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write}; use std::net::{Shutdown, TcpStream}; use std::ops::{Deref, DerefMut}; @@ -274,7 +275,7 @@ impl Write for NamedTlsStream { #[cfg(feature = "http")] pub struct HttpReadStream { url: Atom, - body_reader: Box, + body_reader: BufReader, } #[cfg(feature = "http")] @@ -1115,6 +1116,13 @@ impl Stream { } } } + Stream::HttpRead(stream_layout) => { + if stream_layout.stream.get_ref().body_reader.get_ref().has_remaining() { + AtEndOfStream::Not + } else { + AtEndOfStream::Past + } + } _ => AtEndOfStream::Not, } } @@ -1203,7 +1211,7 @@ impl Stream { #[inline] pub(crate) fn from_http_stream( url: Atom, - http_stream: Box, + http_stream: BufReader, arena: &mut Arena, ) -> Self { Stream::HttpRead(arena_alloc!( diff --git a/src/machine/system_calls.rs b/src/machine/system_calls.rs index 80b579e79..9b11a6e33 100644 --- a/src/machine/system_calls.rs +++ b/src/machine/system_calls.rs @@ -50,8 +50,6 @@ use std::env; use std::ffi::CString; use std::fs; use std::hash::{BuildHasher, BuildHasherDefault}; -#[cfg(feature = "http")] -use std::io::BufRead; use std::io::{ErrorKind, Read, Write}; use std::iter::{once, FromIterator}; use std::mem; @@ -4352,7 +4350,7 @@ impl Machine { let mut stream = Stream::from_http_stream( AtomTable::build_with(&self.machine_st.atom_tbl, &address_string), - Box::new(reader), + reader, &mut self.machine_st.arena, ); *stream.options_mut() = StreamOptions::default(); @@ -4447,11 +4445,7 @@ impl Machine { let runtime = tokio::runtime::Handle::current(); let _guard = runtime.enter(); - fn get_reader(body: impl Buf + Send + 'static) -> Box { - Box::new(body.reader()) - } - - let serve = warp::body::aggregate() + let serve = warp::body::bytes() .and(warp::header::optional::( warp::http::header::CONTENT_LENGTH.as_str(), )) @@ -4462,7 +4456,7 @@ impl Machine { future::ready(Ok::<(String,), warp::Rejection>(("".to_string(),))) })) .map( - move |body, + move |body: bytes::Bytes, content_length, method, headers: warp::http::HeaderMap, @@ -4482,7 +4476,7 @@ impl Machine { headers, path: path.as_str().to_string(), query, - body: get_reader(body), + body: body.reader(), }; let response = Arc::new((Mutex::new(false), Mutex::new(None), Condvar::new()));