diff --git a/crates/bevy_remote/src/builtin_methods.rs b/crates/bevy_remote/src/builtin_methods.rs index f2e958189205e..08458da1c02cf 100644 --- a/crates/bevy_remote/src/builtin_methods.rs +++ b/crates/bevy_remote/src/builtin_methods.rs @@ -6,9 +6,11 @@ use anyhow::{anyhow, Result as AnyhowResult}; use bevy_ecs::{ component::ComponentId, entity::Entity, + event::EventCursor, query::QueryBuilder, reflect::{AppTypeRegistry, ReflectComponent}, - system::In, + removal_detection::RemovedComponentEntity, + system::{In, Local}, world::{EntityRef, EntityWorldMut, FilteredEntityRef, World}, }; use bevy_hierarchy::BuildChildren as _; @@ -46,6 +48,12 @@ pub const BRP_REPARENT_METHOD: &str = "bevy/reparent"; /// The method path for a `bevy/list` request. pub const BRP_LIST_METHOD: &str = "bevy/list"; +/// The method path for a `bevy/get+watch` request. +pub const BRP_GET_AND_WATCH_METHOD: &str = "bevy/get+watch"; + +/// The method path for a `bevy/list+watch` request. +pub const BRP_LIST_AND_WATCH_METHOD: &str = "bevy/list+watch"; + /// `bevy/get`: Retrieves one or more components from the entity with the given /// ID. /// @@ -248,9 +256,41 @@ pub enum BrpGetResponse { Strict(HashMap), } +/// A single response from a `bevy/get+watch` request. +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(untagged)] +pub enum BrpGetWatchingResponse { + /// The non-strict response that reports errors separately without failing the entire request. + Lenient { + /// A map of successful components with their values that were added or changes in the last + /// tick. + components: HashMap, + /// An array of components that were been removed in the last tick. + removed: Vec, + /// A map of unsuccessful components with their errors. + errors: HashMap, + }, + /// The strict response that will fail if any components are not present or aren't + /// reflect-able. + Strict { + /// A map of successful components with their values that were added or changes in the last + /// tick. + components: HashMap, + /// An array of components that were been removed in the last tick. + removed: Vec, + }, +} + /// The response to a `bevy/list` request. pub type BrpListResponse = Vec; +/// A single response from a `bevy/list+watch` request. +#[derive(Debug, Default, Serialize, Deserialize, Clone)] +pub struct BrpListWatchingResponse { + added: Vec, + removed: Vec, +} + /// The response to a `bevy/query` request. pub type BrpQueryResponse = Vec; @@ -301,6 +341,117 @@ pub fn process_remote_get_request(In(params): In>, world: &World) let type_registry = app_type_registry.read(); let entity_ref = get_entity(world, entity)?; + let response = + reflect_components_to_response(components, strict, entity, entity_ref, &type_registry)?; + serde_json::to_value(response).map_err(BrpError::internal) +} + +/// Handles a `bevy/get+watch` request coming from a client. +pub fn process_remote_get_watching_request( + In(params): In>, + world: &World, + mut removal_cursors: Local>>, +) -> BrpResult> { + let BrpGetParams { + entity, + components, + strict, + } = parse_some(params)?; + + let app_type_registry = world.resource::(); + let type_registry = app_type_registry.read(); + let entity_ref = get_entity(world, entity)?; + + let mut changed = Vec::new(); + let mut removed = Vec::new(); + let mut errors = HashMap::new(); + + 'component_loop: for component_path in components { + let Ok(type_registration) = + get_component_type_registration(&type_registry, &component_path) + else { + let err = + BrpError::component_error(format!("Unknown component type: `{component_path}`")); + if strict { + return Err(err); + } + errors.insert( + component_path, + serde_json::to_value(err).map_err(BrpError::internal)?, + ); + continue; + }; + let Some(component_id) = world.components().get_id(type_registration.type_id()) else { + let err = BrpError::component_error(format!("Unknown component: `{component_path}`")); + if strict { + return Err(err); + } + errors.insert( + component_path, + serde_json::to_value(err).map_err(BrpError::internal)?, + ); + continue; + }; + + if let Some(ticks) = entity_ref.get_change_ticks_by_id(component_id) { + if ticks.is_changed(world.last_change_tick(), world.read_change_tick()) { + changed.push(component_path); + continue; + } + }; + + let Some(events) = world.removed_components().get(component_id) else { + continue; + }; + let cursor = removal_cursors + .entry(component_id) + .or_insert_with(|| events.get_cursor()); + for event in cursor.read(events) { + if Entity::from(event.clone()) == entity { + removed.push(component_path); + continue 'component_loop; + } + } + } + + if changed.is_empty() && removed.is_empty() { + return Ok(None); + } + + let response = + reflect_components_to_response(changed, strict, entity, entity_ref, &type_registry)?; + + let response = match response { + BrpGetResponse::Lenient { + components, + errors: mut errs, + } => BrpGetWatchingResponse::Lenient { + components, + removed, + errors: { + errs.extend(errors); + errs + }, + }, + BrpGetResponse::Strict(components) => BrpGetWatchingResponse::Strict { + components, + removed, + }, + }; + + Ok(Some( + serde_json::to_value(response).map_err(BrpError::internal)?, + )) +} + +/// Reflect a list of components on an entity into a [`BrpGetResponse`]. +fn reflect_components_to_response( + components: Vec, + strict: bool, + entity: Entity, + entity_ref: EntityRef, + type_registry: &TypeRegistry, +) -> BrpResult { let mut response = if strict { BrpGetResponse::Strict(Default::default()) } else { @@ -311,7 +462,7 @@ pub fn process_remote_get_request(In(params): In>, world: &World) }; for component_path in components { - match handle_get_component(&component_path, entity, entity_ref, &type_registry) { + match reflect_component(&component_path, entity, entity_ref, type_registry) { Ok(serialized_object) => match response { BrpGetResponse::Strict(ref mut components) | BrpGetResponse::Lenient { @@ -330,16 +481,16 @@ pub fn process_remote_get_request(In(params): In>, world: &World) } } - serde_json::to_value(response).map_err(BrpError::internal) + Ok(response) } -/// Handle a single component for [`process_remote_get_request`]. -fn handle_get_component( +/// Reflect a single component on an entity with the given component path. +fn reflect_component( component_path: &str, entity: Entity, entity_ref: EntityRef, type_registry: &TypeRegistry, -) -> Result, BrpError> { +) -> BrpResult> { let reflect_component = get_reflect_component(type_registry, component_path).map_err(BrpError::component_error)?; @@ -596,6 +747,52 @@ pub fn process_remote_list_request(In(params): In>, world: &World) serde_json::to_value(response).map_err(BrpError::internal) } +/// Handles a `bevy/list` request (list all components) coming from a client. +pub fn process_remote_list_watching_request( + In(params): In>, + world: &World, + mut removal_cursors: Local>>, +) -> BrpResult> { + let BrpListParams { entity } = parse_some(params)?; + let entity_ref = get_entity(world, entity)?; + let mut response = BrpListWatchingResponse::default(); + + for component_id in entity_ref.archetype().components() { + let ticks = entity_ref + .get_change_ticks_by_id(component_id) + .ok_or(BrpError::internal("Failed to get ticks"))?; + + if ticks.is_added(world.last_change_tick(), world.read_change_tick()) { + let Some(component_info) = world.components().get_info(component_id) else { + continue; + }; + response.added.push(component_info.name().to_owned()); + } + } + + for (component_id, events) in world.removed_components().iter() { + let cursor = removal_cursors + .entry(*component_id) + .or_insert_with(|| events.get_cursor()); + for event in cursor.read(events) { + if Entity::from(event.clone()) == entity { + let Some(component_info) = world.components().get_info(*component_id) else { + continue; + }; + response.removed.push(component_info.name().to_owned()); + } + } + } + + if response.added.is_empty() && response.removed.is_empty() { + Ok(None) + } else { + Ok(Some( + serde_json::to_value(response).map_err(BrpError::internal)?, + )) + } +} + /// Immutably retrieves an entity from the [`World`], returning an error if the /// entity isn't present. fn get_entity(world: &World, entity: Entity) -> Result, BrpError> { diff --git a/crates/bevy_remote/src/http.rs b/crates/bevy_remote/src/http.rs index 7c4af5ecb03cf..1b725ffab36a5 100644 --- a/crates/bevy_remote/src/http.rs +++ b/crates/bevy_remote/src/http.rs @@ -8,26 +8,32 @@ #![cfg(not(target_family = "wasm"))] -use crate::{error_codes, BrpBatch, BrpError, BrpMessage, BrpRequest, BrpResponse, BrpSender}; +use crate::{ + error_codes, BrpBatch, BrpError, BrpMessage, BrpRequest, BrpResponse, BrpResult, BrpSender, +}; use anyhow::Result as AnyhowResult; -use async_channel::Sender; +use async_channel::{Receiver, Sender}; use async_io::Async; use bevy_app::{App, Plugin, Startup}; use bevy_ecs::system::{Res, Resource}; -use bevy_tasks::IoTaskPool; +use bevy_tasks::{futures_lite::StreamExt, IoTaskPool}; use core::net::{IpAddr, Ipv4Addr}; +use core::{ + convert::Infallible, + pin::Pin, + task::{Context, Poll}, +}; use http_body_util::{BodyExt as _, Full}; use hyper::header::{HeaderName, HeaderValue}; use hyper::{ - body::{Bytes, Incoming}, + body::{Body, Bytes, Frame, Incoming}, server::conn::http1, service, Request, Response, }; use serde_json::Value; use smol_hyper::rt::{FuturesIo, SmolTimer}; use std::collections::HashMap; -use std::net::TcpListener; -use std::net::TcpStream; +use std::net::{TcpListener, TcpStream}; /// The default port that Bevy will listen on. /// @@ -259,22 +265,41 @@ async fn process_request_batch( request: Request, request_sender: &Sender, headers: &Headers, -) -> AnyhowResult>> { +) -> AnyhowResult> { let batch_bytes = request.into_body().collect().await?.to_bytes(); let batch: Result = serde_json::from_slice(&batch_bytes); - let serialized = match batch { + let result = match batch { Ok(BrpBatch::Single(request)) => { - serde_json::to_string(&process_single_request(request, request_sender).await?)? + let response = process_single_request(request, request_sender).await?; + match response { + BrpHttpResponse::Complete(res) => { + BrpHttpResponse::Complete(serde_json::to_string(&res)?) + } + BrpHttpResponse::Stream(stream) => BrpHttpResponse::Stream(stream), + } } Ok(BrpBatch::Batch(requests)) => { let mut responses = Vec::new(); for request in requests { - responses.push(process_single_request(request, request_sender).await?); + let response = process_single_request(request, request_sender).await?; + match response { + BrpHttpResponse::Complete(res) => responses.push(res), + BrpHttpResponse::Stream(BrpStream { id, .. }) => { + responses.push(BrpResponse::new( + id, + Err(BrpError { + code: error_codes::INVALID_REQUEST, + message: "Streaming can not be used in batch requests".to_string(), + data: None, + }), + )); + } + } } - serde_json::to_string(&responses)? + BrpHttpResponse::Complete(serde_json::to_string(&responses)?) } Err(err) => { let err = BrpResponse::new( @@ -286,15 +311,30 @@ async fn process_request_batch( }), ); - serde_json::to_string(&err)? + BrpHttpResponse::Complete(serde_json::to_string(&err)?) } }; - let mut response = Response::new(Full::new(Bytes::from(serialized.as_bytes().to_owned()))); - response.headers_mut().insert( - hyper::header::CONTENT_TYPE, - HeaderValue::from_static("application/json"), - ); + let mut response = match result { + BrpHttpResponse::Complete(serialized) => { + let mut response = Response::new(BrpHttpBody::Complete(Full::new(Bytes::from( + serialized.as_bytes().to_owned(), + )))); + response.headers_mut().insert( + hyper::header::CONTENT_TYPE, + HeaderValue::from_static("application/json"), + ); + response + } + BrpHttpResponse::Stream(stream) => { + let mut response = Response::new(BrpHttpBody::Stream(stream)); + response.headers_mut().insert( + hyper::header::CONTENT_TYPE, + HeaderValue::from_static("text/event-stream"), + ); + response + } + }; for (key, value) in &headers.headers { response.headers_mut().insert(key, value.clone()); } @@ -306,36 +346,38 @@ async fn process_request_batch( async fn process_single_request( request: Value, request_sender: &Sender, -) -> AnyhowResult { +) -> AnyhowResult> { // Reach in and get the request ID early so that we can report it even when parsing fails. let id = request.as_object().and_then(|map| map.get("id")).cloned(); let request: BrpRequest = match serde_json::from_value(request) { Ok(v) => v, Err(err) => { - return Ok(BrpResponse::new( + return Ok(BrpHttpResponse::Complete(BrpResponse::new( id, Err(BrpError { code: error_codes::INVALID_REQUEST, message: err.to_string(), data: None, }), - )); + ))); } }; if request.jsonrpc != "2.0" { - return Ok(BrpResponse::new( + return Ok(BrpHttpResponse::Complete(BrpResponse::new( id, Err(BrpError { code: error_codes::INVALID_REQUEST, message: String::from("JSON-RPC request requires `\"jsonrpc\": \"2.0\"`"), data: None, }), - )); + ))); } - let (result_sender, result_receiver) = async_channel::bounded(1); + let watch = request.method.contains("+watch"); + let size = if watch { 8 } else { 1 }; + let (result_sender, result_receiver) = async_channel::bounded(size); let _ = request_sender .send(BrpMessage { @@ -345,6 +387,74 @@ async fn process_single_request( }) .await; - let result = result_receiver.recv().await?; - Ok(BrpResponse::new(request.id, result)) + if watch { + Ok(BrpHttpResponse::Stream(BrpStream { + id: request.id, + rx: Box::pin(result_receiver), + })) + } else { + let result = result_receiver.recv().await?; + Ok(BrpHttpResponse::Complete(BrpResponse::new( + request.id, result, + ))) + } +} + +struct BrpStream { + id: Option, + rx: Pin>>, +} + +impl Body for BrpStream { + type Data = Bytes; + type Error = Infallible; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + match self.as_mut().rx.poll_next(cx) { + Poll::Ready(result) => match result { + Some(result) => { + let response = BrpResponse::new(self.id.clone(), result); + let serialized = serde_json::to_string(&response).unwrap(); + let bytes = + Bytes::from(format!("data: {serialized}\n\n").as_bytes().to_owned()); + let frame = Frame::data(bytes); + Poll::Ready(Some(Ok(frame))) + } + None => Poll::Ready(None), + }, + Poll::Pending => Poll::Pending, + } + } + + fn is_end_stream(&self) -> bool { + dbg!(self.rx.is_closed()) + } +} + +enum BrpHttpResponse { + Complete(C), + Stream(S), +} + +enum BrpHttpBody { + Complete(Full), + Stream(BrpStream), +} + +impl Body for BrpHttpBody { + type Data = Bytes; + type Error = Infallible; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + match &mut *self.get_mut() { + BrpHttpBody::Complete(body) => Body::poll_frame(Pin::new(body), cx), + BrpHttpBody::Stream(body) => Body::poll_frame(Pin::new(body), cx), + } + } } diff --git a/crates/bevy_remote/src/lib.rs b/crates/bevy_remote/src/lib.rs index 57bc05035346b..e50feb83b4ad3 100644 --- a/crates/bevy_remote/src/lib.rs +++ b/crates/bevy_remote/src/lib.rs @@ -212,6 +212,51 @@ //! //! `result`: An array of fully-qualified type names of components. //! +//! ### bevy/get+watch +//! +//! Watch the values of one or more components from an entity. +//! +//! `params`: +//! - `entity`: The ID of the entity whose components will be fetched. +//! - `components`: An array of [fully-qualified type names] of components to fetch. +//! - `strict` (optional): A flag to enable strict mode which will fail if any one of the +//! components is not present or can not be reflected. Defaults to false. +//! +//! If `strict` is false: +//! +//! `result`: +//! - `components`: A map of components added or changed in the last tick associating each type +//! name to its value on the requested entity. +//! - `removed`: An array of fully-qualified type names of components removed from the entity +//! in the last tick. +//! - `errors`: A map associating each type name with an error if it was not on the entity +//! or could not be reflected. +//! +//! If `strict` is true: +//! +//! `result`: +//! - `components`: A map of components added or changed in the last tick associating each type +//! name to its value on the requested entity. +//! - `removed`: An array of fully-qualified type names of components removed from the entity +//! in the last tick. +//! +//! ### bevy/list+watch +//! +//! Watch all components present on an entity. +//! +//! When `params` is not provided, this lists all registered components. If `params` is provided, +//! this lists only those components present on the provided entity. +//! +//! `params`: +//! - `entity`: The ID of the entity whose components will be listed. +//! +//! `result`: +//! - `added`: An array of fully-qualified type names of components added to the entity in the +//! last tick. +//! - `removed`: An array of fully-qualified type names of components removed from the entity +//! in the last tick. +//! +//! //! ## Custom methods //! //! In addition to the provided methods, the Bevy Remote Protocol can be extended to include custom @@ -260,7 +305,8 @@ use bevy_app::prelude::*; use bevy_derive::{Deref, DerefMut}; use bevy_ecs::{ entity::Entity, - system::{Commands, In, IntoSystem, Resource, System, SystemId}, + schedule::IntoSystemConfigs, + system::{Commands, In, IntoSystem, ResMut, Resource, System, SystemId}, world::World, }; use bevy_utils::{prelude::default, HashMap}; @@ -281,12 +327,7 @@ const CHANNEL_SIZE: usize = 16; /// [crate-level documentation]: crate pub struct RemotePlugin { /// The verbs that the server will recognize and respond to. - methods: RwLock< - Vec<( - String, - Box>, Out = BrpResult>>, - )>, - >, + methods: RwLock>, } impl RemotePlugin { @@ -305,10 +346,24 @@ impl RemotePlugin { name: impl Into, handler: impl IntoSystem>, BrpResult, M>, ) -> Self { - self.methods - .get_mut() - .unwrap() - .push((name.into(), Box::new(IntoSystem::into_system(handler)))); + self.methods.get_mut().unwrap().push(( + name.into(), + RemoteMethodHandler::Instant(Box::new(IntoSystem::into_system(handler))), + )); + self + } + + /// Add a remote method with a watching handler to the plugin using the given `name`. + #[must_use] + pub fn with_watching_method( + mut self, + name: impl Into, + handler: impl IntoSystem>, BrpResult>, M>, + ) -> Self { + self.methods.get_mut().unwrap().push(( + name.into(), + RemoteMethodHandler::Watching(Box::new(IntoSystem::into_system(handler))), + )); self } } @@ -348,40 +403,93 @@ impl Default for RemotePlugin { builtin_methods::BRP_LIST_METHOD, builtin_methods::process_remote_list_request, ) + .with_watching_method( + builtin_methods::BRP_GET_AND_WATCH_METHOD, + builtin_methods::process_remote_get_watching_request, + ) + .with_watching_method( + builtin_methods::BRP_LIST_AND_WATCH_METHOD, + builtin_methods::process_remote_list_watching_request, + ) } } impl Plugin for RemotePlugin { fn build(&self, app: &mut App) { let mut remote_methods = RemoteMethods::new(); + let plugin_methods = &mut *self.methods.write().unwrap(); - for (name, system) in plugin_methods.drain(..) { + for (name, handler) in plugin_methods.drain(..) { remote_methods.insert( name, - app.main_mut().world_mut().register_boxed_system(system), + match handler { + RemoteMethodHandler::Instant(system) => RemoteMethodSystemId::Instant( + app.main_mut().world_mut().register_boxed_system(system), + ), + RemoteMethodHandler::Watching(system) => RemoteMethodSystemId::Watching( + app.main_mut().world_mut().register_boxed_system(system), + ), + }, ); } app.insert_resource(remote_methods) + .init_resource::() .add_systems(PreStartup, setup_mailbox_channel) - .add_systems(Update, process_remote_requests); + .add_systems( + Update, + ( + process_remote_requests, + process_ongoing_watching_requests, + remove_closed_watching_requests, + ) + .chain(), + ); } } -/// The type of a function that implements a remote method (`bevy/get`, `bevy/query`, etc.) +/// A type to hold the allowed types of systems to be used as method handlers. +#[derive(Debug)] +pub enum RemoteMethodHandler { + /// A handler that only runs once and returns one response. + Instant(Box>, Out = BrpResult>>), + /// A handler that watches for changes and response when a change is detected. + Watching(Box>, Out = BrpResult>>>), +} + +/// The [`SystemId`] of a function that implements a remote instant method (`bevy/get`, `bevy/query`, etc.) /// /// The first parameter is the JSON value of the `params`. Typically, an /// implementation will deserialize these as the first thing they do. /// /// The returned JSON value will be returned as the response. Bevy will /// automatically populate the `id` field before sending. -pub type RemoteMethod = SystemId>, BrpResult>; +pub type RemoteInstantMethodSystemId = SystemId>, BrpResult>; + +/// The [`SystemId`] of a function that implements a remote watching method (`bevy/get+watch`, `bevy/list+watch`, etc.) +/// +/// The first parameter is the JSON value of the `params`. Typically, an +/// implementation will deserialize these as the first thing they do. +/// +/// The optional returned JSON value will be sent as a response. If no +/// changes were detected this should be [`None`]. Re-running of this +/// handler is done in the [`RemotePlugin`]. +pub type RemoteWatchingMethodSystemId = SystemId>, BrpResult>>; + +/// The [`SystemId`] of a function that can be used as a remote method. +#[derive(Debug, Clone, Copy)] +pub enum RemoteMethodSystemId { + /// A handler that only runs once and returns one response. + Instant(RemoteInstantMethodSystemId), + /// A handler that watches for changes and response when a change is detected. + Watching(RemoteWatchingMethodSystemId), +} /// Holds all implementations of methods known to the server. /// /// Custom methods can be added to this list using [`RemoteMethods::insert`]. #[derive(Debug, Resource, Default)] -pub struct RemoteMethods(HashMap); +pub struct RemoteMethods(HashMap); impl RemoteMethods { /// Creates a new [`RemoteMethods`] resource with no methods registered in it. @@ -395,17 +503,21 @@ impl RemoteMethods { pub fn insert( &mut self, method_name: impl Into, - handler: RemoteMethod, - ) -> Option { + handler: RemoteMethodSystemId, + ) -> Option { self.0.insert(method_name.into(), handler) } - /// Retrieves a handler by method name. - pub fn get(&self, method_name: &str) -> Option<&RemoteMethod> { - self.0.get(method_name) + /// Get a [`RemoteMethodSystemId`] with its method name. + pub fn get(&self, method: &str) -> Option<&RemoteMethodSystemId> { + self.0.get(method) } } +/// Holds the [`BrpMessage`]'s of all ongoing watching requests along with their handlers. +#[derive(Debug, Resource, Default)] +pub struct RemoteWatchingRequests(Vec<(BrpMessage, RemoteWatchingMethodSystemId)>); + /// A single request from a Bevy Remote Protocol client to the server, /// serialized in JSON. /// @@ -590,7 +702,7 @@ pub mod error_codes { } /// The result of a request. -pub type BrpResult = Result; +pub type BrpResult = Result; /// The requests may occur on their own or in batches. /// Actual parsing is deferred for the sake of proper @@ -652,30 +764,83 @@ fn process_remote_requests(world: &mut World) { while let Ok(message) = world.resource_mut::().try_recv() { // Fetch the handler for the method. If there's no such handler // registered, return an error. - let methods = world.resource::(); - - let Some(handler) = methods.0.get(&message.method) else { + let Some(&handler) = world.resource::().get(&message.method) else { let _ = message.sender.force_send(Err(BrpError { code: error_codes::METHOD_NOT_FOUND, message: format!("Method `{}` not found", message.method), data: None, })); - continue; + return; }; - // Execute the handler, and send the result back to the client. - let result = match world.run_system_with_input(*handler, message.params) { - Ok(result) => result, - Err(error) => { - let _ = message.sender.force_send(Err(BrpError { - code: error_codes::INTERNAL_ERROR, - message: format!("Failed to run method handler: {error}"), - data: None, - })); - continue; + match handler { + RemoteMethodSystemId::Instant(id) => { + let result = match world.run_system_with_input(id, message.params) { + Ok(result) => result, + Err(error) => { + let _ = message.sender.force_send(Err(BrpError { + code: error_codes::INTERNAL_ERROR, + message: format!("Failed to run method handler: {error}"), + data: None, + })); + continue; + } + }; + + let _ = message.sender.force_send(result); + } + RemoteMethodSystemId::Watching(id) => { + world + .resource_mut::() + .0 + .push((message, id)); + } + } + } +} + +/// A system that checks all ongoing watching requests for changes that should be sent +/// and handles it if so. +fn process_ongoing_watching_requests(world: &mut World) { + world.resource_scope::(|world, requests| { + for (message, system_id) in requests.0.iter() { + let handler_result = process_single_ongoing_watching_request(world, message, system_id); + let sender_result = match handler_result { + Ok(Some(value)) => message.sender.try_send(Ok(value)), + Err(err) => message.sender.try_send(Err(err)), + Ok(None) => continue, + }; + + if sender_result.is_err() { + // The [`remove_closed_watching_requests`] system will clean this up. + message.sender.close(); } + } + }); +} + +fn process_single_ongoing_watching_request( + world: &mut World, + message: &BrpMessage, + system_id: &RemoteWatchingMethodSystemId, +) -> BrpResult> { + world + .run_system_with_input(*system_id, message.params.clone()) + .map_err(|error| BrpError { + code: error_codes::INTERNAL_ERROR, + message: format!("Failed to run method handler: {error}"), + data: None, + })? +} + +fn remove_closed_watching_requests(mut requests: ResMut) { + for i in (0..requests.0.len()).rev() { + let Some((message, _)) = requests.0.get(i) else { + unreachable!() }; - let _ = message.sender.force_send(result); + if message.sender.is_closed() { + requests.0.swap_remove(i); + } } } diff --git a/examples/remote/server.rs b/examples/remote/server.rs index 575c8ae13958a..2d6758e2643b6 100644 --- a/examples/remote/server.rs +++ b/examples/remote/server.rs @@ -1,6 +1,8 @@ //! A Bevy app that you can connect to with the BRP and edit. +use bevy::math::ops::cos; use bevy::{ + input::common_conditions::input_just_pressed, prelude::*, remote::{http::RemoteHttpPlugin, RemotePlugin}, }; @@ -12,6 +14,8 @@ fn main() { .add_plugins(RemotePlugin::default()) .add_plugins(RemoteHttpPlugin::default()) .add_systems(Startup, setup) + .add_systems(Update, remove.run_if(input_just_pressed(KeyCode::Space))) + .add_systems(Update, move_cube) .register_type::() .run(); } @@ -52,6 +56,16 @@ fn setup( )); } +fn move_cube(mut query: Query<&mut Transform, With>, time: Res