Skip to content

Commit

Permalink
Merge pull request #22 from MattiasBuelens/use-web-sys
Browse files Browse the repository at this point in the history
Use web-sys for raw types
  • Loading branch information
MattiasBuelens authored Oct 31, 2023
2 parents c0b3ee7 + f0ac76b commit 365fcc3
Show file tree
Hide file tree
Showing 23 changed files with 290 additions and 432 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

* Stop calling `byobRequest.respond(0)` on cancel ([#16](https://github.com/MattiasBuelens/wasm-streams/pull/16))
***Breaking change:** The system modules (`readable::sys`, `writable::sys` and `transform::sys`) now re-export directly from [the `web-sys` crate](https://docs.rs/web-sys/latest/web_sys/). This should make it easier to use `from_raw()`, `as_raw()` and `into_raw()`. ([#22](https://github.com/MattiasBuelens/wasm-streams/pull/22/))

## v0.3.0 (2022-10-16)

Expand Down
37 changes: 29 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,45 @@ exclude = [
crate-type = ["cdylib", "rlib"]

[dependencies]
js-sys = "^0.3.47"
wasm-bindgen = "0.2.73"
wasm-bindgen-futures = "^0.4.20"
futures-util = { version = "0.3.21", features = ["io", "sink"] }
js-sys = "^0.3.64"
wasm-bindgen = "0.2.87"
wasm-bindgen-futures = "^0.4.37"
futures-util = { version = "^0.3.28", features = ["io", "sink"] }

[dependencies.web-sys]
version = "^0.3.47"
version = "^0.3.64"
features = [
"AbortSignal",
"QueuingStrategy",
"ReadableStream",
"ReadableStreamType",
"ReadableWritablePair",
"ReadableStreamByobReader",
"ReadableStreamReaderMode",
"ReadableStreamReadResult",
"ReadableStreamByobRequest",
"ReadableStreamDefaultReader",
"ReadableByteStreamController",
"ReadableStreamGetReaderOptions",
"ReadableStreamDefaultController",
"StreamPipeOptions",
"TransformStream",
"TransformStreamDefaultController",
"Transformer",
"UnderlyingSink",
"UnderlyingSource",
"WritableStream",
"WritableStreamDefaultController",
"WritableStreamDefaultWriter",
]

[dev-dependencies]
wasm-bindgen-test = "0.3.20"
wasm-bindgen-test = "0.3.37"
tokio = { version = "^1", features = ["macros", "rt"] }
pin-project = "^1.0.6"
pin-project = "^1.1.3"

[dev-dependencies.web-sys]
version = "^0.3.47"
version = "^0.3.64"
features = [
"console",
"AbortSignal",
Expand Down
2 changes: 1 addition & 1 deletion examples/fetch_as_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// Get the response's body as a JS ReadableStream
let raw_body = resp.body().unwrap_throw();
let body = ReadableStream::from_raw(raw_body.dyn_into().unwrap_throw());
let body = ReadableStream::from_raw(raw_body);

// Convert the JS ReadableStream to a Rust stream
let mut stream = body.into_stream();
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
//! [`WritableStream`](crate::WritableStream) and [`TransformStream`](crate::TransformStream).
//! It also supports converting from and into [`Stream`]s and [`Sink`]s from the [futures] crate.
//!
//! [`Stream`]: https://docs.rs/futures/0.3.18/futures/stream/trait.Stream.html
//! [`Sink`]: https://docs.rs/futures/0.3.18/futures/sink/trait.Sink.html
//! [futures]: https://docs.rs/futures/0.3.18/futures/index.html
//! [`Stream`]: https://docs.rs/futures/0.3.28/futures/stream/trait.Stream.html
//! [`Sink`]: https://docs.rs/futures/0.3.28/futures/sink/trait.Sink.html
//! [futures]: https://docs.rs/futures/0.3.28/futures/index.html

pub use readable::ReadableStream;
pub use transform::TransformStream;
Expand Down
37 changes: 27 additions & 10 deletions src/queuing_strategy/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,38 @@
use js_sys::Object;
use wasm_bindgen::prelude::*;

#[wasm_bindgen]
#[derive(Debug)]
pub(crate) struct QueuingStrategy {
high_water_mark: f64,
extern "C" {
#[wasm_bindgen(extends = Object, js_name = QueuingStrategy)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) type QueuingStrategy;

#[wasm_bindgen(method, getter, js_name = highWaterMark)]
pub fn high_water_mark(this: &QueuingStrategy) -> f64;

#[wasm_bindgen(method, setter, js_name = highWaterMark)]
pub fn set_high_water_mark(this: &QueuingStrategy, value: f64);
}

impl QueuingStrategy {
pub fn new(high_water_mark: f64) -> Self {
Self { high_water_mark }
let strategy = Object::new().unchecked_into::<QueuingStrategy>();
strategy.set_high_water_mark(high_water_mark);
strategy
}
}

#[wasm_bindgen]
impl QueuingStrategy {
#[wasm_bindgen(getter, js_name = highWaterMark)]
pub fn high_water_mark(&self) -> f64 {
self.high_water_mark
#[cfg(web_sys_unstable_apis)]
pub fn from_raw(raw: web_sys::QueuingStrategy) -> Self {
raw.unchecked_into()
}

#[cfg(web_sys_unstable_apis)]
pub fn as_raw(&self) -> &web_sys::QueuingStrategy {
self.unchecked_ref()
}

#[cfg(web_sys_unstable_apis)]
pub fn into_raw(self) -> web_sys::QueuingStrategy {
self.unchecked_into()
}
}
50 changes: 27 additions & 23 deletions src/readable/byob_reader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::marker::PhantomData;

use js_sys::Uint8Array;
use wasm_bindgen::{throw_val, JsCast, JsValue};
use js_sys::{Object, Uint8Array};
use wasm_bindgen::{JsCast, JsValue};
use wasm_bindgen_futures::JsFuture;

use crate::util::{checked_cast_to_usize, clamp_to_u32, promise_to_void_future};
Expand All @@ -23,9 +23,14 @@ pub struct ReadableStreamBYOBReader<'stream> {
impl<'stream> ReadableStreamBYOBReader<'stream> {
pub(crate) fn new(stream: &mut ReadableStream) -> Result<Self, js_sys::Error> {
Ok(Self {
raw: stream.as_raw().get_reader_with_options(
sys::ReadableStreamGetReaderOptions::new(sys::ReadableStreamReaderMode::BYOB),
)?,
raw: stream
.as_raw()
.unchecked_ref::<sys::ReadableStreamExt>()
.try_get_reader_with_options(
sys::ReadableStreamGetReaderOptions::new()
.mode(sys::ReadableStreamReaderMode::Byob),
)?
.unchecked_into(),
_stream: PhantomData,
})
}
Expand Down Expand Up @@ -109,20 +114,18 @@ impl<'stream> ReadableStreamBYOBReader<'stream> {
let buffer_len = buffer.byte_length();
// Limit view to destination slice's length.
let dst_len = clamp_to_u32(dst.len());
let view = buffer
.subarray(0, dst_len)
.unchecked_into::<sys::ArrayBufferView>();
let view = buffer.subarray(0, dst_len).unchecked_into::<Object>();
// Read into view. This transfers `buffer.buffer()`.
let promise = self.as_raw().read(&view);
let js_value = JsFuture::from(promise).await?;
let result = sys::ReadableStreamBYOBReadResult::from(js_value);
let filled_view = match result.value() {
Some(view) => view,
None => {
// No new view was returned. The stream must have been canceled.
assert!(result.is_done());
return Ok((0, None));
}
let promise = self.as_raw().read_with_array_buffer_view(&view);
let js_result = JsFuture::from(promise).await?;
let result = sys::ReadableStreamReadResult::from(js_result);
let js_value = result.value();
let filled_view = if js_value.is_undefined() {
// No new view was returned. The stream must have been canceled.
assert!(result.is_done());
return Ok((0, None));
} else {
js_value.unchecked_into::<Uint8Array>()
};
let filled_len = checked_cast_to_usize(filled_view.byte_length());
debug_assert!(filled_len <= dst.len());
Expand Down Expand Up @@ -157,9 +160,7 @@ impl<'stream> ReadableStreamBYOBReader<'stream> {
}

fn release_lock_mut(&mut self) {
self.as_raw()
.release_lock()
.unwrap_or_else(|error| throw_val(error.into()))
self.as_raw().release_lock()
}

/// Try to [release](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
Expand All @@ -175,7 +176,10 @@ impl<'stream> ReadableStreamBYOBReader<'stream> {
/// return an error and leave the reader locked to the stream.
#[inline]
pub fn try_release_lock(self) -> Result<(), (js_sys::Error, Self)> {
self.as_raw().release_lock().map_err(|error| (error, self))
self.as_raw()
.unchecked_ref::<sys::ReadableStreamReaderExt>()
.try_release_lock()
.map_err(|err| (err, self))
}

/// Converts this `ReadableStreamBYOBReader` into an [`AsyncRead`].
Expand All @@ -185,7 +189,7 @@ impl<'stream> ReadableStreamBYOBReader<'stream> {
/// still usable. This allows reading only a few bytes from the `AsyncRead`, while still
/// allowing another reader to read the remaining bytes later on.
///
/// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
/// [`AsyncRead`]: https://docs.rs/futures/0.3.28/futures/io/trait.AsyncRead.html
#[inline]
pub fn into_async_read(self) -> IntoAsyncRead<'stream> {
IntoAsyncRead::new(self, false)
Expand Down
24 changes: 15 additions & 9 deletions src/readable/default_reader.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::marker::PhantomData;

use wasm_bindgen::{throw_val, JsValue};
use wasm_bindgen::JsCast;
use wasm_bindgen::JsValue;
use wasm_bindgen_futures::JsFuture;

use crate::util::promise_to_void_future;
Expand All @@ -22,7 +23,11 @@ pub struct ReadableStreamDefaultReader<'stream> {
impl<'stream> ReadableStreamDefaultReader<'stream> {
pub(crate) fn new(stream: &mut ReadableStream) -> Result<Self, js_sys::Error> {
Ok(Self {
raw: stream.as_raw().get_reader()?,
raw: stream
.as_raw()
.unchecked_ref::<sys::ReadableStreamExt>()
.try_get_reader()?
.unchecked_into(),
_stream: PhantomData,
})
}
Expand Down Expand Up @@ -65,8 +70,8 @@ impl<'stream> ReadableStreamDefaultReader<'stream> {
/// * If the stream encounters an `error`, this returns `Err(error)`.
pub async fn read(&mut self) -> Result<Option<JsValue>, JsValue> {
let promise = self.as_raw().read();
let js_value = JsFuture::from(promise).await?;
let result = sys::ReadableStreamDefaultReadResult::from(js_value);
let js_result = JsFuture::from(promise).await?;
let result = sys::ReadableStreamReadResult::from(js_result);
if result.is_done() {
Ok(None)
} else {
Expand All @@ -91,9 +96,7 @@ impl<'stream> ReadableStreamDefaultReader<'stream> {
}

fn release_lock_mut(&mut self) {
self.as_raw()
.release_lock()
.unwrap_or_else(|error| throw_val(error.into()))
self.as_raw().release_lock()
}

/// Try to [release](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
Expand All @@ -109,7 +112,10 @@ impl<'stream> ReadableStreamDefaultReader<'stream> {
/// return an error and leave the reader locked to the stream.
#[inline]
pub fn try_release_lock(self) -> Result<(), (js_sys::Error, Self)> {
self.as_raw().release_lock().map_err(|error| (error, self))
self.as_raw()
.unchecked_ref::<sys::ReadableStreamReaderExt>()
.try_release_lock()
.map_err(|err| (err, self))
}

/// Converts this `ReadableStreamDefaultReader` into a [`Stream`].
Expand All @@ -119,7 +125,7 @@ impl<'stream> ReadableStreamDefaultReader<'stream> {
/// usable. This allows reading only a few chunks from the `Stream`, while still allowing
/// another reader to read the remaining chunks later on.
///
/// [`Stream`]: https://docs.rs/futures/0.3.18/futures/stream/trait.Stream.html
/// [`Stream`]: https://docs.rs/futures/0.3.28/futures/stream/trait.Stream.html
#[inline]
pub fn into_stream(self) -> IntoStream<'stream> {
IntoStream::new(self, false)
Expand Down
17 changes: 8 additions & 9 deletions src/readable/into_async_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use core::task::{Context, Poll};
use futures_util::io::{AsyncRead, Error};
use futures_util::ready;
use futures_util::FutureExt;
use js_sys::Uint8Array;
use js_sys::{Object, Uint8Array};
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::JsFuture;

use crate::util::{checked_cast_to_usize, clamp_to_u32, js_to_io_error};

use super::sys::{ArrayBufferView, ReadableStreamBYOBReadResult};
use super::sys::ReadableStreamReadResult;
use super::ReadableStreamBYOBReader;

/// An [`AsyncRead`] for the [`into_async_read`](super::ReadableStream::into_async_read) method.
Expand All @@ -20,7 +20,7 @@ use super::ReadableStreamBYOBReader;
/// When this `AsyncRead` is dropped, it also drops its reader which in turn
/// [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
///
/// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
/// [`AsyncRead`]: https://docs.rs/futures/0.3.28/futures/io/trait.AsyncRead.html
#[must_use = "readers do nothing unless polled"]
#[derive(Debug)]
pub struct IntoAsyncRead<'reader> {
Expand Down Expand Up @@ -84,13 +84,12 @@ impl<'reader> AsyncRead for IntoAsyncRead<'reader> {
_ => Uint8Array::new_with_length(buf_len),
};
// Limit to output buffer size
let buffer = buffer
.subarray(0, buf_len)
.unchecked_into::<ArrayBufferView>();
let buffer = buffer.subarray(0, buf_len).unchecked_into::<Object>();
match &self.reader {
Some(reader) => {
// Read into internal buffer and store its future
let fut = JsFuture::from(reader.as_raw().read(&buffer));
let fut =
JsFuture::from(reader.as_raw().read_with_array_buffer_view(&buffer));
self.fut.insert(fut)
}
None => {
Expand All @@ -108,14 +107,14 @@ impl<'reader> AsyncRead for IntoAsyncRead<'reader> {
// Read completed
Poll::Ready(match js_result {
Ok(js_value) => {
let result = ReadableStreamBYOBReadResult::from(js_value);
let result = ReadableStreamReadResult::from(js_value);
if result.is_done() {
// End of stream
self.discard_reader();
Ok(0)
} else {
// Cannot be canceled, so view must exist
let filled_view = result.value().unwrap_throw();
let filled_view = result.value().unchecked_into::<Uint8Array>();
// Copy bytes to output buffer
let filled_len = checked_cast_to_usize(filled_view.byte_length());
debug_assert!(filled_len <= buf.len());
Expand Down
2 changes: 1 addition & 1 deletion src/readable/into_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::ReadableStreamDefaultReader;
/// When this `Stream` is dropped, it also drops its reader which in turn
/// [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
///
/// [`Stream`]: https://docs.rs/futures/0.3.18/futures/stream/trait.Stream.html
/// [`Stream`]: https://docs.rs/futures/0.3.28/futures/stream/trait.Stream.html
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct IntoStream<'reader> {
Expand Down
6 changes: 3 additions & 3 deletions src/readable/into_underlying_byte_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl Inner {
// We set autoAllocateChunkSize, so there should always be a BYOB request.
let request = controller.byob_request().unwrap_throw();
// Resize the buffer to fit the BYOB request.
let request_view = request.view().unwrap_throw();
let request_view = request.view().unwrap_throw().unchecked_into::<Uint8Array>();
let request_len = clamp_to_usize(request_view.byte_length());
if self.buffer.len() < request_len {
self.buffer.resize(request_len, 0);
Expand All @@ -114,7 +114,7 @@ impl Inner {
// The stream has closed, drop it.
self.discard();
controller.close()?;
request.respond(0)?;
request.respond_with_u32(0)?;
}
Ok(bytes_read) => {
// Copy read bytes from buffer to BYOB request view
Expand All @@ -127,7 +127,7 @@ impl Inner {
);
dest.copy_from(&self.buffer[0..bytes_read]);
// Respond to BYOB request
request.respond(bytes_read_u32)?;
request.respond_with_u32(bytes_read_u32)?;
}
Err(err) => {
// The stream encountered an error, drop it.
Expand Down
2 changes: 1 addition & 1 deletion src/readable/into_underlying_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Inner {
// after the stream has closed or encountered an error.
let stream = self.stream.as_mut().unwrap_throw();
match stream.try_next().await {
Ok(Some(chunk)) => controller.enqueue(&chunk)?,
Ok(Some(chunk)) => controller.enqueue_with_chunk(&chunk)?,
Ok(None) => {
// The stream has closed, drop it.
self.stream = None;
Expand Down
Loading

0 comments on commit 365fcc3

Please sign in to comment.