diff --git a/Cargo.lock b/Cargo.lock index 9c08eda57..e86384a8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -361,6 +361,28 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "async-task" version = "4.7.1" @@ -1024,6 +1046,7 @@ dependencies = [ "console_error_panic_hook", "console_log", "gloo-net", + "gloo-timers", "log", "serde", "wasm-bindgen", @@ -3296,6 +3319,23 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "streams" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-stream", + "console_error_panic_hook", + "console_log", + "futures", + "getrandom", + "gloo-timers", + "log", + "rand", + "web-sys", + "xilem_web", +] + [[package]] name = "strict-num" version = "0.1.1" diff --git a/Cargo.toml b/Cargo.toml index a9fb0b033..b196072bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "xilem_web/web_examples/mathml_svg", "xilem_web/web_examples/raw_dom_access", "xilem_web/web_examples/spawn_tasks", + "xilem_web/web_examples/streams", "xilem_web/web_examples/svgtoy", "xilem_web/web_examples/svgdraw", ] diff --git a/xilem_web/src/concurrent/memoized_await.rs b/xilem_web/src/concurrent/memoized_await.rs index dc63a69f5..dbb540fdc 100644 --- a/xilem_web/src/concurrent/memoized_await.rs +++ b/xilem_web/src/concurrent/memoized_await.rs @@ -1,16 +1,30 @@ // Copyright 2024 the Xilem Authors and the Druid Authors // SPDX-License-Identifier: Apache-2.0 +use std::{fmt, future::Future, marker::PhantomData, rc::Rc}; + +use futures::{Stream, StreamExt}; +use wasm_bindgen::{closure::Closure, JsCast, UnwrapThrowExt}; +use wasm_bindgen_futures::spawn_local; + use crate::{ core::{MessageResult, Mut, NoElement, View, ViewId, ViewMarker, ViewPathTracker}, - DynMessage, OptionalAction, ViewCtx, + DynMessage, MessageThunk, OptionalAction, ViewCtx, }; -use std::{future::Future, marker::PhantomData}; -use wasm_bindgen::{closure::Closure, JsCast, UnwrapThrowExt}; -use wasm_bindgen_futures::spawn_local; -/// Await a future returned by `init_future` invoked with the argument `data`, `callback` is called with the output of the future. `init_future` will be invoked again, when `data` changes. Use [`memoized_await`] for construction of this [`View`] -pub struct MemoizedAwait { +/// Await a future returned by `init_future` invoked with the argument `data`, `callback` is called with the output of the future. +/// `init_future` will be invoked again, when `data` changes. Use [`memoized_await`] for construction of this [`View`] +pub struct MemoizedAwait( + MemoizedFuture, +); + +/// Await a stream returned by `init_stream` invoked with the argument `data`, `callback` is called with the items of the stream. +/// `init_stream` will be invoked again, when `data` changes. Use [`memoized_stream`] for construction of this [`View`] +pub struct MemoizedStream( + MemoizedFuture, +); + +struct MemoizedFuture { init_future: InitFuture, data: Data, callback: Callback, @@ -22,16 +36,16 @@ pub struct MemoizedAwait impl MemoizedAwait where - FOut: std::fmt::Debug + 'static, + FOut: fmt::Debug + 'static, F: Future + 'static, - InitFuture: Fn(&Data) -> F, + InitFuture: Fn(&mut State, &Data) -> F, { /// Debounce the `init_future` function, when `data` updates, /// when `reset_debounce_on_update == false` then this throttles updates each `milliseconds` /// /// The default for this is `0` pub fn debounce_ms(mut self, milliseconds: usize) -> Self { - self.debounce_ms = milliseconds; + self.0.debounce_ms = milliseconds; self } @@ -40,19 +54,67 @@ where /// /// The default for this is `true` pub fn reset_debounce_on_update(mut self, reset: bool) -> Self { - self.reset_debounce_on_update = reset; + self.0.reset_debounce_on_update = reset; self } +} - fn init_future(&self, ctx: &mut ViewCtx, generation: u64) { - ctx.with_id(ViewId::new(generation), |ctx| { - let thunk = ctx.message_thunk(); - let future = (self.init_future)(&self.data); - spawn_local(async move { - thunk.push_message(MemoizedAwaitMessage::::Output(future.await)); - }); - }); +impl + MemoizedStream +where + StreamItem: fmt::Debug + 'static, + F: Stream + 'static, + InitStream: Fn(&mut State, &Data) -> F, +{ + /// Debounce the `init_stream` function, when `data` updates, + /// when `reset_debounce_on_update == false` then this throttles updates each `milliseconds` + /// + /// The default for this is `0` + pub fn debounce_ms(mut self, milliseconds: usize) -> Self { + self.0.debounce_ms = milliseconds; + self } + + /// When `reset` is `true`, everytime `data` updates, the debounce timeout is cleared until `init_stream` is invoked. + /// This is only effective when `debounce > 0` + /// + /// The default for this is `true` + pub fn reset_debounce_on_update(mut self, reset: bool) -> Self { + self.0.reset_debounce_on_update = reset; + self + } +} + +fn init_future( + m: &MemoizedFuture, + thunk: Rc, + state: &mut State, +) where + InitFuture: Fn(&mut State, &Data) -> F + 'static, + FOut: fmt::Debug + 'static, + F: Future + 'static, +{ + let future = (m.init_future)(state, &m.data); + spawn_local(async move { + thunk.push_message(MemoizedFutureMessage::::Output(future.await)); + }); +} + +fn init_stream( + m: &MemoizedFuture, + thunk: Rc, + state: &mut State, +) where + InitStream: Fn(&mut State, &Data) -> F + 'static, + StreamItem: fmt::Debug + 'static, + F: Stream + 'static, +{ + let mut stream = Box::pin((m.init_future)(state, &m.data)); + spawn_local(async move { + while let Some(item) = stream.next().await { + thunk.push_message(MemoizedFutureMessage::::Output(item)); + } + }); } /// Await a future returned by `init_future` invoked with the argument `data`, `callback` is called with the output of the resolved future. `init_future` will be invoked again, when `data` changes. @@ -69,7 +131,7 @@ where /// div(*state), /// memoized_await( /// 10, -/// |count| std::future::ready(*count), +/// |_, count| std::future::ready(*count), /// |state, output| *state = output, /// ) /// ) @@ -84,23 +146,80 @@ where State: 'static, Action: 'static, Data: PartialEq + 'static, - FOut: std::fmt::Debug + 'static, + FOut: fmt::Debug + 'static, F: Future + 'static, - InitFuture: Fn(&Data) -> F + 'static, + InitFuture: Fn(&mut State, &Data) -> F + 'static, OA: OptionalAction + 'static, Callback: Fn(&mut State, FOut) -> OA + 'static, { - MemoizedAwait { + MemoizedAwait(MemoizedFuture { init_future, data, callback, debounce_ms: 0, reset_debounce_on_update: true, phantom: PhantomData, - } + }) +} + +/// Await a stream returned by `init_stream` invoked with the argument `data`, `callback` is called with the items of the stream. +/// `init_stream` will be invoked again, when `data` changes. +/// +/// The behavior of the `init_stream` invocation by changes to `data` can be customized by [`debounce_ms`](`MemoizedStream::debounce_ms`) and [`reset_debounce_on_update`](`MemoizedStream::reset_debounce_on_update`). +/// +/// # Examples +/// +/// ```ignore +/// use gloo_timers::future::TimeoutFuture; +/// use async_stream::stream; +/// use xilem_web::{core::fork, concurrent::memoized_stream, elements::html, interfaces::Element}; +/// +/// fn app_logic(state: &mut Vec) -> impl Element> { +/// fork( +/// html::div(format!("{state:?}")), +/// memoized_stream( +/// 10, +/// |_,n| { +/// let range = 0..*n; +/// stream! { +/// for i in range { +/// TimeoutFuture::new(500).await; +/// yield i; +/// } +/// } +/// }, +/// |state: &mut Vec, item: usize| { +/// state.push(item); +/// } +/// ) +/// ) +/// } +/// ``` +pub fn memoized_stream( + data: Data, + init_future: InitStream, + callback: Callback, +) -> MemoizedStream +where + State: 'static, + Action: 'static, + Data: PartialEq + 'static, + StreamItem: fmt::Debug + 'static, + F: Stream + 'static, + InitStream: Fn(&mut State, &Data) -> F + 'static, + OA: OptionalAction + 'static, + Callback: Fn(&mut State, StreamItem) -> OA + 'static, +{ + MemoizedStream(MemoizedFuture { + init_future, + data, + callback, + debounce_ms: 0, + reset_debounce_on_update: true, + phantom: PhantomData, + }) } -#[derive(Default)] #[allow(unnameable_types)] // reason: Implementation detail, public because of trait visibility rules pub struct MemoizedAwaitState { generation: u64, @@ -109,9 +228,20 @@ pub struct MemoizedAwaitState { schedule_update_fn: Option>, schedule_update_timeout_handle: Option, update: bool, + thunk: Rc, } impl MemoizedAwaitState { + fn new(thunk: MessageThunk) -> Self { + Self { + generation: 0, + schedule_update: false, + schedule_update_fn: None, + schedule_update_timeout_handle: None, + update: false, + thunk: Rc::new(thunk), + } + } fn clear_update_timeout(&mut self) { if let Some(handle) = self.schedule_update_timeout_handle { web_sys::window() @@ -122,16 +252,18 @@ impl MemoizedAwaitState { self.schedule_update_fn = None; } - fn reset_debounce_timeout_and_schedule_update( + fn reset_debounce_timeout_and_schedule_update( &mut self, ctx: &mut ViewCtx, debounce_duration: usize, - ) { + ) where + FOut: fmt::Debug + 'static, + { ctx.with_id(ViewId::new(self.generation), |ctx| { self.clear_update_timeout(); let thunk = ctx.message_thunk(); let schedule_update_fn = Closure::new(move || { - thunk.push_message(MemoizedAwaitMessage::::ScheduleUpdate); + thunk.push_message(MemoizedFutureMessage::::ScheduleUpdate); }); let handle = web_sys::window() .unwrap_throw() @@ -145,26 +277,43 @@ impl MemoizedAwaitState { self.schedule_update = true; }); } + + fn request_init(&mut self, ctx: &mut ViewCtx) + where + FOut: fmt::Debug + 'static, + { + ctx.with_id(ViewId::new(self.generation), |ctx| { + self.thunk = Rc::new(ctx.message_thunk()); + self.thunk + .enqueue_message(MemoizedFutureMessage::::RequestInit); + }); + } } #[derive(Debug)] -enum MemoizedAwaitMessage { +enum MemoizedFutureMessage { Output(Output), ScheduleUpdate, + RequestInit, } impl ViewMarker for MemoizedAwait { } +impl ViewMarker + for MemoizedStream +{ +} + impl View for MemoizedAwait where State: 'static, Action: 'static, OA: OptionalAction + 'static, - InitFuture: Fn(&Data) -> F + 'static, - FOut: std::fmt::Debug + 'static, + InitFuture: Fn(&mut State, &Data) -> F + 'static, + FOut: fmt::Debug + 'static, Data: PartialEq + 'static, F: Future + 'static, CB: Fn(&mut State, FOut) -> OA + 'static, @@ -174,15 +323,54 @@ where type ViewState = MemoizedAwaitState; fn build(&self, ctx: &mut ViewCtx) -> (Self::Element, Self::ViewState) { - let mut state = MemoizedAwaitState::default(); + self.0.build(ctx) + } - if self.debounce_ms > 0 { - state.reset_debounce_timeout_and_schedule_update::(ctx, self.debounce_ms); - } else { - self.init_future(ctx, state.generation); - } + fn rebuild( + &self, + prev: &Self, + view_state: &mut Self::ViewState, + ctx: &mut ViewCtx, + (): Mut, + ) { + self.0.rebuild(&prev.0, view_state, ctx); + } - (NoElement, state) + fn teardown(&self, state: &mut Self::ViewState, _: &mut ViewCtx, (): Mut) { + self.0.teardown(state); + } + + fn message( + &self, + view_state: &mut Self::ViewState, + id_path: &[ViewId], + message: DynMessage, + app_state: &mut State, + ) -> MessageResult { + self.0 + .message(view_state, id_path, message, app_state, init_future) + } +} + +impl + View + for MemoizedStream +where + State: 'static, + Action: 'static, + OA: OptionalAction + 'static, + InitStream: Fn(&mut State, &Data) -> F + 'static, + StreamItem: fmt::Debug + 'static, + Data: PartialEq + 'static, + F: Stream + 'static, + CB: Fn(&mut State, StreamItem) -> OA + 'static, +{ + type Element = NoElement; + + type ViewState = MemoizedAwaitState; + + fn build(&self, ctx: &mut ViewCtx) -> (Self::Element, Self::ViewState) { + self.0.build(ctx) } fn rebuild( @@ -192,6 +380,51 @@ where ctx: &mut ViewCtx, (): Mut, ) { + self.0.rebuild(&prev.0, view_state, ctx); + } + + fn teardown(&self, state: &mut Self::ViewState, _: &mut ViewCtx, (): Mut) { + self.0.teardown(state); + } + + fn message( + &self, + view_state: &mut Self::ViewState, + id_path: &[ViewId], + message: DynMessage, + app_state: &mut State, + ) -> MessageResult { + self.0 + .message(view_state, id_path, message, app_state, init_stream) + } +} + +impl + MemoizedFuture +where + State: 'static, + Action: 'static, + OA: OptionalAction + 'static, + InitFuture: Fn(&mut State, &Data) -> F + 'static, + FOut: fmt::Debug + 'static, + Data: PartialEq + 'static, + F: 'static, + CB: Fn(&mut State, FOut) -> OA + 'static, +{ + fn build(&self, ctx: &mut ViewCtx) -> (NoElement, MemoizedAwaitState) { + let thunk = ctx.message_thunk(); + let mut state = MemoizedAwaitState::new(thunk); + + if self.debounce_ms > 0 { + state.reset_debounce_timeout_and_schedule_update::(ctx, self.debounce_ms); + } else { + state.request_init::(ctx); + } + + (NoElement, state) + } + + fn rebuild(&self, prev: &Self, view_state: &mut MemoizedAwaitState, ctx: &mut ViewCtx) { let debounce_has_changed_and_update_is_scheduled = view_state.schedule_update && (prev.reset_debounce_on_update != self.reset_debounce_on_update || prev.debounce_ms != self.debounce_ms); @@ -221,36 +454,44 @@ where // no debounce view_state.generation += 1; view_state.update = false; - self.init_future(ctx, view_state.generation); + view_state.request_init::(ctx); } } } - fn teardown(&self, state: &mut Self::ViewState, _: &mut ViewCtx, (): Mut) { + fn teardown(&self, state: &mut MemoizedAwaitState) { state.clear_update_timeout(); } - fn message( + fn message( &self, - view_state: &mut Self::ViewState, + view_state: &mut MemoizedAwaitState, id_path: &[ViewId], message: DynMessage, app_state: &mut State, - ) -> MessageResult { + init_future: I, + ) -> MessageResult + where + I: Fn(&Self, Rc, &mut State), + { assert_eq!(id_path.len(), 1); if id_path[0].routing_id() == view_state.generation { match *message.downcast().unwrap_throw() { - MemoizedAwaitMessage::Output(future_output) => { + MemoizedFutureMessage::Output(future_output) => { match (self.callback)(app_state, future_output).action() { Some(action) => MessageResult::Action(action), None => MessageResult::Nop, } } - MemoizedAwaitMessage::ScheduleUpdate => { + MemoizedFutureMessage::ScheduleUpdate => { view_state.update = true; view_state.schedule_update = false; MessageResult::RequestRebuild } + MemoizedFutureMessage::RequestInit => { + init_future(self, Rc::clone(&view_state.thunk), app_state); + MessageResult::RequestRebuild + } } } else { MessageResult::Stale(message) diff --git a/xilem_web/src/concurrent/mod.rs b/xilem_web/src/concurrent/mod.rs index d8708cd7d..d306aa4f1 100644 --- a/xilem_web/src/concurrent/mod.rs +++ b/xilem_web/src/concurrent/mod.rs @@ -10,4 +10,4 @@ mod interval; pub use interval::{interval, Interval}; mod memoized_await; -pub use memoized_await::{memoized_await, MemoizedAwait}; +pub use memoized_await::{memoized_await, memoized_stream, MemoizedAwait, MemoizedStream}; diff --git a/xilem_web/web_examples/fetch/Cargo.toml b/xilem_web/web_examples/fetch/Cargo.toml index a56c22d8c..2e3a3fdf6 100644 --- a/xilem_web/web_examples/fetch/Cargo.toml +++ b/xilem_web/web_examples/fetch/Cargo.toml @@ -13,6 +13,7 @@ workspace = true console_error_panic_hook = "0.1" console_log = "1" gloo-net = { version = "0.6.0", default-features = false, features = ["http", "json", "serde"] } +gloo-timers = { version = "0.3.0", features = ["futures"] } log = "0.4" serde = { version = "1", features = ["derive"] } web-sys = { version = "0.3.69", features = ["Event", "HtmlInputElement"] } diff --git a/xilem_web/web_examples/fetch/src/main.rs b/xilem_web/web_examples/fetch/src/main.rs index d5153702f..aead47c44 100644 --- a/xilem_web/web_examples/fetch/src/main.rs +++ b/xilem_web/web_examples/fetch/src/main.rs @@ -8,6 +8,7 @@ #![allow(clippy::wildcard_imports, reason = "HTML elements are an exception")] use gloo_net::http::Request; +use gloo_timers::future::TimeoutFuture; use serde::{Deserialize, Serialize}; use wasm_bindgen::{JsCast, UnwrapThrowExt}; use xilem_web::{ @@ -56,13 +57,13 @@ impl AppState { FetchState::Finished } else if self.cats_to_fetch >= TOO_MANY_CATS { FetchState::TooMany + } else if self.cats_to_fetch > 0 && self.cats_are_being_fetched { + FetchState::Fetching } else if self.debounce_in_ms > 0 && self.cats_to_fetch > 0 && self.reset_debounce_on_update { FetchState::Debounced } else if self.debounce_in_ms > 0 && self.cats_to_fetch > 0 { FetchState::Throttled - } else if self.cats_to_fetch > 0 && self.cats_are_being_fetched { - FetchState::Fetching } else { FetchState::Initial } @@ -80,6 +81,10 @@ enum FetchState { async fn fetch_cats(count: usize) -> Result, gloo_net::Error> { log::debug!("Fetch {count} cats"); + + // Simulate a delay + TimeoutFuture::new(1_000).await; + if count == 0 { return Ok(Vec::new()); } @@ -119,7 +124,11 @@ fn app_logic(state: &mut AppState) -> impl HtmlDivElement { // or debounced otherwise: // As long as updates are happening within `debounce_in_ms` ms the first closure is not invoked, and a debounce timeout which runs `debounce_in_ms` is reset. state.cats_to_fetch, - |count| fetch_cats(*count), + |state: &mut AppState, count| { + log::debug!("Create new future to fetch"); + state.cats_are_being_fetched = true; + fetch_cats(*count) + }, |state: &mut AppState, cats_result| match cats_result { Ok(cats) => { log::info!("Received {} cats", cats.len()); @@ -182,7 +191,6 @@ fn cat_fetch_controls(state: &AppState) -> impl Element { if !state.cats_are_being_fetched { state.cats.clear(); } - state.cats_are_being_fetched = true; state.cats_to_fetch = input_target(&ev).value().parse().unwrap_or(0); })), )), diff --git a/xilem_web/web_examples/streams/Cargo.toml b/xilem_web/web_examples/streams/Cargo.toml new file mode 100644 index 000000000..c7899287d --- /dev/null +++ b/xilem_web/web_examples/streams/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "streams" +version = "0.1.0" +publish = false +license.workspace = true +edition.workspace = true +rust-version.workspace = true + +[lints] +workspace = true + +[dependencies] +anyhow = "1.0.93" +async-stream = "0.3.6" +console_error_panic_hook = "0.1" +console_log = "1" +futures = "0.3.31" +getrandom = { version = "0.2.15", features = ["js"] } +gloo-timers = { version = "0.3.0", features = ["futures"] } +log = "0.4" +rand = { version = "0.8.5", features = ["small_rng"] } +web-sys = { version = "0.3.72", features = ["KeyboardEvent"] } +xilem_web = { path = "../.." } diff --git a/xilem_web/web_examples/streams/index.html b/xilem_web/web_examples/streams/index.html new file mode 100644 index 000000000..934740173 --- /dev/null +++ b/xilem_web/web_examples/streams/index.html @@ -0,0 +1,7 @@ + + + + + + + diff --git a/xilem_web/web_examples/streams/src/api.rs b/xilem_web/web_examples/streams/src/api.rs new file mode 100644 index 000000000..0589d5211 --- /dev/null +++ b/xilem_web/web_examples/streams/src/api.rs @@ -0,0 +1,98 @@ +// Copyright 2024 the Xilem Authors +// SPDX-License-Identifier: Apache-2.0 + +use std::future::Future; + +use anyhow::anyhow; +use async_stream::stream; +use futures::{channel::oneshot, select, FutureExt, Stream}; +use gloo_timers::future::TimeoutFuture; +use rand::Rng; + +#[derive(Debug)] +pub(crate) enum StreamMessage { + Started(AbortHandle), + SearchResult(anyhow::Result>), + Aborted, + TimedOut, + Finished, +} + +#[derive(Default)] +pub(crate) struct MockConnection; + +impl MockConnection { + pub(crate) fn search(&self, search_term: String) -> impl Stream { + let (shutdown_signal, abort_handle) = ShutdownSignal::new(); + let mut shutdown = shutdown_signal.into_future().fuse(); + stream! { + yield StreamMessage::Started(abort_handle); + loop { + let mut timeout = TimeoutFuture::new(3_000).fuse(); + let mut search_result = Box::pin(simulated_search(&search_term).fuse()); + select! { + () = shutdown => { + yield StreamMessage::Aborted; + break; + } + () = timeout => { + yield StreamMessage::TimedOut; + return; + } + result = search_result => { + let search_more = match &result { + Ok(Some(_)) => true, + Ok(None) | Err(_) => false + }; + yield StreamMessage::SearchResult(result); + if !search_more { + break; + } + } + complete => panic!("stream completed unexpectedly"), + } + } + yield StreamMessage::Finished; + } + } +} + +async fn simulated_search(search_term: &str) -> anyhow::Result> { + let mut rng = rand::thread_rng(); + + let delay_ms = rng.gen_range(5..=600); + TimeoutFuture::new(delay_ms).fuse().await; + let random_value: u8 = rng.gen_range(0..=100); + + match random_value { + 0..=10 => Err(anyhow!("Simulated error")), + 11..=50 => Ok(None), // Nothing found + _ => Ok(Some(format!("Result for '{search_term}'"))), + } +} + +#[derive(Debug)] +pub(crate) struct AbortHandle { + abort_tx: oneshot::Sender<()>, +} + +impl AbortHandle { + pub(crate) fn abort(self) { + let _ = self.abort_tx.send(()); + } +} + +struct ShutdownSignal { + shutdown_rx: oneshot::Receiver<()>, +} + +impl ShutdownSignal { + fn new() -> (Self, AbortHandle) { + let (abort_tx, shutdown_rx) = oneshot::channel(); + (Self { shutdown_rx }, AbortHandle { abort_tx }) + } + + fn into_future(self) -> impl Future { + self.shutdown_rx.map(|_| ()) + } +} diff --git a/xilem_web/web_examples/streams/src/main.rs b/xilem_web/web_examples/streams/src/main.rs new file mode 100644 index 000000000..b62160311 --- /dev/null +++ b/xilem_web/web_examples/streams/src/main.rs @@ -0,0 +1,111 @@ +// Copyright 2024 the Xilem Authors +// SPDX-License-Identifier: Apache-2.0 + +//! This example demonstrates the use of [`memoized_stream`]. + +use std::pin::Pin; + +use futures::{stream, Stream}; +use xilem_web::{ + concurrent::memoized_stream, core::fork, document_body, elements::html, + input_event_target_value, interfaces::Element, App, +}; + +mod api; +use self::api::{AbortHandle, MockConnection, StreamMessage}; + +#[derive(Default)] +struct AppState { + search_term: String, + db: MockConnection, + current_search: Option, +} + +const DEBOUNCE_MILLIS: usize = 500; + +fn app_logic(state: &mut AppState) -> impl Element { + log::debug!("Run app logic"); + + let search_stream = memoized_stream( + state.search_term.clone(), + create_search_stream, + handle_stream_message, + ) + .debounce_ms(DEBOUNCE_MILLIS) + .reset_debounce_on_update(true); + + html::div(fork( + html::div(( + html::input(()) + .on_keyup(on_search_input_keyup) + .attr("value", state.search_term.clone()) + .attr("placeholder", "Type to search"), + html::p(( + "Search is running: ", + if state.current_search.is_some() { + "yes" + } else { + "no" + }, + )), + )), + search_stream, + )) +} + +fn on_search_input_keyup(state: &mut AppState, ev: web_sys::KeyboardEvent) { + if ev.key() == "Escape" { + state.search_term.clear(); + return; + } + if let Some(abort_handle) = state.current_search.take() { + abort_handle.abort(); + }; + state.search_term = input_event_target_value(&ev).expect("input value"); +} + +fn create_search_stream( + state: &mut AppState, + term: &String, +) -> Pin>> { + if term.is_empty() { + Box::pin(stream::empty()) + } else { + Box::pin(state.db.search(term.to_owned())) + } +} + +fn handle_stream_message(state: &mut AppState, message: StreamMessage) { + match message { + StreamMessage::Started(abort_handle) => { + log::debug!("Search stream started"); + debug_assert!( + state.current_search.is_none(), + "The previous search should already have been canceled" + ); + state.current_search = Some(abort_handle); + } + StreamMessage::SearchResult(result) => { + log::debug!("Stream result {result:?}"); + } + StreamMessage::Aborted => { + log::debug!("Stream aborted"); + state.current_search = None; + } + StreamMessage::TimedOut => { + log::debug!("Stream timed out"); + state.current_search = None; + } + StreamMessage::Finished => { + log::debug!("Stream finished"); + state.current_search = None; + } + } +} + +fn main() { + _ = console_log::init_with_level(log::Level::Debug); + console_error_panic_hook::set_once(); + log::info!("Start application"); + App::new(document_body(), AppState::default(), app_logic).run(); +}