Skip to content

Commit

Permalink
Fix at_end_of_stream/1 for http read stream
Browse files Browse the repository at this point in the history
  • Loading branch information
aarroyoc committed Jul 6, 2024
1 parent 5f30c95 commit 08a9f7c
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 15 deletions.
4 changes: 2 additions & 2 deletions src/http.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::io::BufRead;
use bytes::{Bytes, buf::Reader};
use std::sync::{Arc, Condvar, Mutex};

use warp::http;
Expand All @@ -19,5 +19,5 @@ pub struct HttpRequestData {
pub headers: http::HeaderMap,
pub path: String,
pub query: String,
pub body: Box<dyn BufRead + Send>,
pub body: Reader<Bytes>,
}
14 changes: 11 additions & 3 deletions src/machine/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::machine::machine_indices::*;
use crate::machine::machine_state::*;
use crate::types::*;

use bytes::Buf;
pub use scryer_modular_bitfield::prelude::*;

use std::cmp::Ordering;
Expand All @@ -22,7 +23,7 @@ use std::fs::{File, OpenOptions};
use std::hash::Hash;
use std::io;
#[cfg(feature = "http")]
use std::io::BufRead;
use bytes::{buf::Reader as BufReader, Bytes};
use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
use std::net::{Shutdown, TcpStream};
use std::ops::{Deref, DerefMut};
Expand Down Expand Up @@ -274,7 +275,7 @@ impl Write for NamedTlsStream {
#[cfg(feature = "http")]
pub struct HttpReadStream {
url: Atom,
body_reader: Box<dyn BufRead>,
body_reader: BufReader<Bytes>,
}

#[cfg(feature = "http")]
Expand Down Expand Up @@ -1115,6 +1116,13 @@ impl Stream {
}
}
}
Stream::HttpRead(stream_layout) => {
if stream_layout.stream.get_ref().body_reader.get_ref().has_remaining() {
AtEndOfStream::Not
} else {
AtEndOfStream::Past
}
}
_ => AtEndOfStream::Not,
}
}
Expand Down Expand Up @@ -1203,7 +1211,7 @@ impl Stream {
#[inline]
pub(crate) fn from_http_stream(
url: Atom,
http_stream: Box<dyn BufRead>,
http_stream: BufReader<Bytes>,
arena: &mut Arena,
) -> Self {
Stream::HttpRead(arena_alloc!(
Expand Down
14 changes: 4 additions & 10 deletions src/machine/system_calls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ use std::env;
use std::ffi::CString;
use std::fs;
use std::hash::{BuildHasher, BuildHasherDefault};
#[cfg(feature = "http")]
use std::io::BufRead;
use std::io::{ErrorKind, Read, Write};
use std::iter::{once, FromIterator};
use std::mem;
Expand Down Expand Up @@ -4352,7 +4350,7 @@ impl Machine {

let mut stream = Stream::from_http_stream(
AtomTable::build_with(&self.machine_st.atom_tbl, &address_string),
Box::new(reader),
reader,
&mut self.machine_st.arena,
);
*stream.options_mut() = StreamOptions::default();
Expand Down Expand Up @@ -4447,11 +4445,7 @@ impl Machine {
let runtime = tokio::runtime::Handle::current();
let _guard = runtime.enter();

fn get_reader(body: impl Buf + Send + 'static) -> Box<dyn BufRead + Send> {
Box::new(body.reader())
}

let serve = warp::body::aggregate()
let serve = warp::body::bytes()
.and(warp::header::optional::<u64>(
warp::http::header::CONTENT_LENGTH.as_str(),
))
Expand All @@ -4462,7 +4456,7 @@ impl Machine {
future::ready(Ok::<(String,), warp::Rejection>(("".to_string(),)))
}))
.map(
move |body,
move |body: bytes::Bytes,
content_length,
method,
headers: warp::http::HeaderMap,
Expand All @@ -4482,7 +4476,7 @@ impl Machine {
headers,
path: path.as_str().to_string(),
query,
body: get_reader(body),
body: body.reader(),
};
let response =
Arc::new((Mutex::new(false), Mutex::new(None), Condvar::new()));
Expand Down

0 comments on commit 08a9f7c

Please sign in to comment.