Skip to content

Commit

Permalink
Watching versions of bevy/get and bevy/list with HTTP SSE (#15608)
Browse files Browse the repository at this point in the history
## Objective

Add a way to stream BRP requests when the data changes.

## Solution

#### BRP Side (reusable for other transports)

Add a new method handler type that returns a optional value. This
handler is run in update and if a value is returned it will be sent on
the message channel. Custom watching handlers can be added with
`RemotePlugin::with_watching_method`.

#### HTTP Side

If a request comes in with `+watch` in the method, it will respond with
`text/event-stream` rather than a single response.

## Testing

I tested with the podman HTTP client. This client has good support for
SSE's if you want to test it too.

## Parts I want some opinions on

- For separating watching methods I chose to add a `+watch` suffix to
the end kind of like `content-type` headers. A get would be
`bevy/get+watch`.
- Should watching methods send an initial response with everything or
only respond when a change happens? Currently the later is what happens.

## Future work

- The `bevy/query` method would also benefit from this but that
condition will be quite complex so I will leave that to later.

---------

Co-authored-by: Zachary Harrold <zac@harrold.com.au>
  • Loading branch information
LiamGallagher737 and bushrat011899 authored Oct 8, 2024
1 parent 21b78b5 commit f1fbb66
Show file tree
Hide file tree
Showing 4 changed files with 555 additions and 69 deletions.
209 changes: 203 additions & 6 deletions crates/bevy_remote/src/builtin_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ use anyhow::{anyhow, Result as AnyhowResult};
use bevy_ecs::{
component::ComponentId,
entity::Entity,
event::EventCursor,
query::QueryBuilder,
reflect::{AppTypeRegistry, ReflectComponent},
system::In,
removal_detection::RemovedComponentEntity,
system::{In, Local},
world::{EntityRef, EntityWorldMut, FilteredEntityRef, World},
};
use bevy_hierarchy::BuildChildren as _;
Expand Down Expand Up @@ -46,6 +48,12 @@ pub const BRP_REPARENT_METHOD: &str = "bevy/reparent";
/// The method path for a `bevy/list` request.
pub const BRP_LIST_METHOD: &str = "bevy/list";

/// The method path for a `bevy/get+watch` request.
pub const BRP_GET_AND_WATCH_METHOD: &str = "bevy/get+watch";

/// The method path for a `bevy/list+watch` request.
pub const BRP_LIST_AND_WATCH_METHOD: &str = "bevy/list+watch";

/// `bevy/get`: Retrieves one or more components from the entity with the given
/// ID.
///
Expand Down Expand Up @@ -248,9 +256,41 @@ pub enum BrpGetResponse {
Strict(HashMap<String, Value>),
}

/// A single response from a `bevy/get+watch` request.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(untagged)]
pub enum BrpGetWatchingResponse {
/// The non-strict response that reports errors separately without failing the entire request.
Lenient {
/// A map of successful components with their values that were added or changes in the last
/// tick.
components: HashMap<String, Value>,
/// An array of components that were been removed in the last tick.
removed: Vec<String>,
/// A map of unsuccessful components with their errors.
errors: HashMap<String, Value>,
},
/// The strict response that will fail if any components are not present or aren't
/// reflect-able.
Strict {
/// A map of successful components with their values that were added or changes in the last
/// tick.
components: HashMap<String, Value>,
/// An array of components that were been removed in the last tick.
removed: Vec<String>,
},
}

/// The response to a `bevy/list` request.
pub type BrpListResponse = Vec<String>;

/// A single response from a `bevy/list+watch` request.
#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct BrpListWatchingResponse {
added: Vec<String>,
removed: Vec<String>,
}

/// The response to a `bevy/query` request.
pub type BrpQueryResponse = Vec<BrpQueryRow>;

Expand Down Expand Up @@ -301,6 +341,117 @@ pub fn process_remote_get_request(In(params): In<Option<Value>>, world: &World)
let type_registry = app_type_registry.read();
let entity_ref = get_entity(world, entity)?;

let response =
reflect_components_to_response(components, strict, entity, entity_ref, &type_registry)?;
serde_json::to_value(response).map_err(BrpError::internal)
}

/// Handles a `bevy/get+watch` request coming from a client.
pub fn process_remote_get_watching_request(
In(params): In<Option<Value>>,
world: &World,
mut removal_cursors: Local<HashMap<ComponentId, EventCursor<RemovedComponentEntity>>>,
) -> BrpResult<Option<Value>> {
let BrpGetParams {
entity,
components,
strict,
} = parse_some(params)?;

let app_type_registry = world.resource::<AppTypeRegistry>();
let type_registry = app_type_registry.read();
let entity_ref = get_entity(world, entity)?;

let mut changed = Vec::new();
let mut removed = Vec::new();
let mut errors = HashMap::new();

'component_loop: for component_path in components {
let Ok(type_registration) =
get_component_type_registration(&type_registry, &component_path)
else {
let err =
BrpError::component_error(format!("Unknown component type: `{component_path}`"));
if strict {
return Err(err);
}
errors.insert(
component_path,
serde_json::to_value(err).map_err(BrpError::internal)?,
);
continue;
};
let Some(component_id) = world.components().get_id(type_registration.type_id()) else {
let err = BrpError::component_error(format!("Unknown component: `{component_path}`"));
if strict {
return Err(err);
}
errors.insert(
component_path,
serde_json::to_value(err).map_err(BrpError::internal)?,
);
continue;
};

if let Some(ticks) = entity_ref.get_change_ticks_by_id(component_id) {
if ticks.is_changed(world.last_change_tick(), world.read_change_tick()) {
changed.push(component_path);
continue;
}
};

let Some(events) = world.removed_components().get(component_id) else {
continue;
};
let cursor = removal_cursors
.entry(component_id)
.or_insert_with(|| events.get_cursor());
for event in cursor.read(events) {
if Entity::from(event.clone()) == entity {
removed.push(component_path);
continue 'component_loop;
}
}
}

if changed.is_empty() && removed.is_empty() {
return Ok(None);
}

let response =
reflect_components_to_response(changed, strict, entity, entity_ref, &type_registry)?;

let response = match response {
BrpGetResponse::Lenient {
components,
errors: mut errs,
} => BrpGetWatchingResponse::Lenient {
components,
removed,
errors: {
errs.extend(errors);
errs
},
},
BrpGetResponse::Strict(components) => BrpGetWatchingResponse::Strict {
components,
removed,
},
};

Ok(Some(
serde_json::to_value(response).map_err(BrpError::internal)?,
))
}

/// Reflect a list of components on an entity into a [`BrpGetResponse`].
fn reflect_components_to_response(
components: Vec<String>,
strict: bool,
entity: Entity,
entity_ref: EntityRef,
type_registry: &TypeRegistry,
) -> BrpResult<BrpGetResponse> {
let mut response = if strict {
BrpGetResponse::Strict(Default::default())
} else {
Expand All @@ -311,7 +462,7 @@ pub fn process_remote_get_request(In(params): In<Option<Value>>, world: &World)
};

for component_path in components {
match handle_get_component(&component_path, entity, entity_ref, &type_registry) {
match reflect_component(&component_path, entity, entity_ref, type_registry) {
Ok(serialized_object) => match response {
BrpGetResponse::Strict(ref mut components)
| BrpGetResponse::Lenient {
Expand All @@ -330,16 +481,16 @@ pub fn process_remote_get_request(In(params): In<Option<Value>>, world: &World)
}
}

serde_json::to_value(response).map_err(BrpError::internal)
Ok(response)
}

/// Handle a single component for [`process_remote_get_request`].
fn handle_get_component(
/// Reflect a single component on an entity with the given component path.
fn reflect_component(
component_path: &str,
entity: Entity,
entity_ref: EntityRef,
type_registry: &TypeRegistry,
) -> Result<Map<String, Value>, BrpError> {
) -> BrpResult<Map<String, Value>> {
let reflect_component =
get_reflect_component(type_registry, component_path).map_err(BrpError::component_error)?;

Expand Down Expand Up @@ -596,6 +747,52 @@ pub fn process_remote_list_request(In(params): In<Option<Value>>, world: &World)
serde_json::to_value(response).map_err(BrpError::internal)
}

/// Handles a `bevy/list` request (list all components) coming from a client.
pub fn process_remote_list_watching_request(
In(params): In<Option<Value>>,
world: &World,
mut removal_cursors: Local<HashMap<ComponentId, EventCursor<RemovedComponentEntity>>>,
) -> BrpResult<Option<Value>> {
let BrpListParams { entity } = parse_some(params)?;
let entity_ref = get_entity(world, entity)?;
let mut response = BrpListWatchingResponse::default();

for component_id in entity_ref.archetype().components() {
let ticks = entity_ref
.get_change_ticks_by_id(component_id)
.ok_or(BrpError::internal("Failed to get ticks"))?;

if ticks.is_added(world.last_change_tick(), world.read_change_tick()) {
let Some(component_info) = world.components().get_info(component_id) else {
continue;
};
response.added.push(component_info.name().to_owned());
}
}

for (component_id, events) in world.removed_components().iter() {
let cursor = removal_cursors
.entry(*component_id)
.or_insert_with(|| events.get_cursor());
for event in cursor.read(events) {
if Entity::from(event.clone()) == entity {
let Some(component_info) = world.components().get_info(*component_id) else {
continue;
};
response.removed.push(component_info.name().to_owned());
}
}
}

if response.added.is_empty() && response.removed.is_empty() {
Ok(None)
} else {
Ok(Some(
serde_json::to_value(response).map_err(BrpError::internal)?,
))
}
}

/// Immutably retrieves an entity from the [`World`], returning an error if the
/// entity isn't present.
fn get_entity(world: &World, entity: Entity) -> Result<EntityRef<'_>, BrpError> {
Expand Down
Loading

0 comments on commit f1fbb66

Please sign in to comment.