Skip to content

Commit

Permalink
drop pending asset loads (#14808)
Browse files Browse the repository at this point in the history
# Objective

when handles for loading assets are dropped, we currently wait until
load is completed before dropping the handle. drop asset-load tasks
immediately

## Solution

- track tasks for loading assets and drop them immediately when all
handles are dropped.
~~- use `join_all` in `gltf_loader.rs` to allow it to yield and be
dropped.~~

doesn't cover all the load apis - for those it doesn't cover the task
will still be detached and will still complete before the result is
discarded.

separated out from #13170
  • Loading branch information
robtfm authored Aug 27, 2024
1 parent 6ddbf97 commit f06cd44
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 73 deletions.
12 changes: 12 additions & 0 deletions crates/bevy_asset/src/server/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
UntypedAssetId, UntypedHandle,
};
use bevy_ecs::world::World;
use bevy_tasks::Task;
use bevy_utils::tracing::warn;
use bevy_utils::{Entry, HashMap, HashSet, TypeIdMap};
use crossbeam_channel::Sender;
Expand Down Expand Up @@ -76,6 +77,7 @@ pub(crate) struct AssetInfos {
pub(crate) dependency_loaded_event_sender: TypeIdMap<fn(&mut World, UntypedAssetId)>,
pub(crate) dependency_failed_event_sender:
TypeIdMap<fn(&mut World, UntypedAssetId, AssetPath<'static>, AssetLoadError)>,
pub(crate) pending_tasks: HashMap<UntypedAssetId, Task<()>>,
}

impl std::fmt::Debug for AssetInfos {
Expand Down Expand Up @@ -364,6 +366,7 @@ impl AssetInfos {
&mut self.path_to_id,
&mut self.loader_dependants,
&mut self.living_labeled_assets,
&mut self.pending_tasks,
self.watching_for_changes,
id,
)
Expand Down Expand Up @@ -587,6 +590,11 @@ impl AssetInfos {
}

pub(crate) fn process_asset_fail(&mut self, failed_id: UntypedAssetId, error: AssetLoadError) {
// Check whether the handle has been dropped since the asset was loaded.
if !self.infos.contains_key(&failed_id) {
return;
}

let (dependants_waiting_on_load, dependants_waiting_on_rec_load) = {
let Some(info) = self.get_mut(failed_id) else {
// The asset was already dropped.
Expand Down Expand Up @@ -648,6 +656,7 @@ impl AssetInfos {
path_to_id: &mut HashMap<AssetPath<'static>, TypeIdMap<UntypedAssetId>>,
loader_dependants: &mut HashMap<AssetPath<'static>, HashSet<AssetPath<'static>>>,
living_labeled_assets: &mut HashMap<AssetPath<'static>, HashSet<Box<str>>>,
pending_tasks: &mut HashMap<UntypedAssetId, Task<()>>,
watching_for_changes: bool,
id: UntypedAssetId,
) -> bool {
Expand All @@ -662,6 +671,8 @@ impl AssetInfos {
return false;
}

pending_tasks.remove(&id);

let type_id = entry.key().type_id();

let info = entry.remove();
Expand Down Expand Up @@ -704,6 +715,7 @@ impl AssetInfos {
&mut self.path_to_id,
&mut self.loader_dependants,
&mut self.living_labeled_assets,
&mut self.pending_tasks,
self.watching_for_changes,
id.untyped(provider.type_id),
);
Expand Down
164 changes: 91 additions & 73 deletions crates/bevy_asset/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ impl AssetServer {
guard: G,
) -> Handle<A> {
let path = path.into().into_owned();
let (handle, should_load) = self.data.infos.write().get_or_create_path_handle::<A>(
let mut infos = self.data.infos.write();
let (handle, should_load) = infos.get_or_create_path_handle::<A>(
path.clone(),
HandleLoadingMode::Request,
meta_transform,
Expand All @@ -377,14 +378,18 @@ impl AssetServer {
if should_load {
let owned_handle = Some(handle.clone().untyped());
let server = self.clone();
IoTaskPool::get()
.spawn(async move {
if let Err(err) = server.load_internal(owned_handle, path, false, None).await {
error!("{}", err);
}
drop(guard);
})
.detach();
let task = IoTaskPool::get().spawn(async move {
if let Err(err) = server.load_internal(owned_handle, path, false, None).await {
error!("{}", err);
}
drop(guard);
});

#[cfg(not(any(target_arch = "wasm32", not(feature = "multi_threaded"))))]
infos.pending_tasks.insert(handle.id().untyped(), task);

#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
task.detach();
}

handle
Expand Down Expand Up @@ -414,44 +419,47 @@ impl AssetServer {
CowArc::Owned(format!("{source}--{UNTYPED_SOURCE_SUFFIX}").into())
}
});
let (handle, should_load) = self
.data
.infos
.write()
.get_or_create_path_handle::<LoadedUntypedAsset>(
path.clone().with_source(untyped_source),
HandleLoadingMode::Request,
meta_transform,
);
let mut infos = self.data.infos.write();
let (handle, should_load) = infos.get_or_create_path_handle::<LoadedUntypedAsset>(
path.clone().with_source(untyped_source),
HandleLoadingMode::Request,
meta_transform,
);
if !should_load {
return handle;
}
let id = handle.id().untyped();
let owned_handle = Some(handle.clone().untyped());

let server = self.clone();
IoTaskPool::get()
.spawn(async move {
let path_clone = path.clone();
match server.load_untyped_async(path).await {
Ok(handle) => server.send_asset_event(InternalAssetEvent::Loaded {
let task = IoTaskPool::get().spawn(async move {
let path_clone = path.clone();
match server.load_internal(owned_handle, path, false, None).await {
Ok(handle) => server.send_asset_event(InternalAssetEvent::Loaded {
id,
loaded_asset: LoadedAsset::new_with_dependencies(
LoadedUntypedAsset { handle },
None,
)
.into(),
}),
Err(err) => {
error!("{err}");
server.send_asset_event(InternalAssetEvent::Failed {
id,
loaded_asset: LoadedAsset::new_with_dependencies(
LoadedUntypedAsset { handle },
None,
)
.into(),
}),
Err(err) => {
error!("{err}");
server.send_asset_event(InternalAssetEvent::Failed {
id,
path: path_clone,
error: err,
});
}
path: path_clone,
error: err,
});
}
})
.detach();
}
});

#[cfg(not(any(target_arch = "wasm32", not(feature = "multi_threaded"))))]
infos.pending_tasks.insert(handle.id().untyped(), task);

#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
task.detach();

handle
}

Expand Down Expand Up @@ -488,7 +496,7 @@ impl AssetServer {
/// avoid looking up `should_load` twice, but it means you _must_ be sure a load is necessary when calling this function with [`Some`].
async fn load_internal<'a>(
&self,
input_handle: Option<UntypedHandle>,
mut input_handle: Option<UntypedHandle>,
path: AssetPath<'a>,
force: bool,
meta_transform: Option<MetaTransform>,
Expand All @@ -512,6 +520,13 @@ impl AssetServer {
}
})?;

if let Some(meta_transform) = input_handle.as_ref().and_then(|h| h.meta_transform()) {
(*meta_transform)(&mut *meta);
}
// downgrade the input handle so we don't keep the asset alive just because we're loading it
// note we can't just pass a weak handle in, as only strong handles contain the asset meta transform
input_handle = input_handle.map(|h| h.clone_weak());

// This contains Some(UntypedHandle), if it was retrievable
// If it is None, that is because it was _not_ retrievable, due to
// 1. The handle was not already passed in for this path, meaning we can't just use that
Expand Down Expand Up @@ -580,10 +595,6 @@ impl AssetServer {
(handle.clone().unwrap(), path.clone())
};

if let Some(meta_transform) = base_handle.meta_transform() {
(*meta_transform)(&mut *meta);
}

match self
.load_with_meta_loader_and_reader(&base_path, meta, &*loader, &mut *reader, true, false)
.await
Expand Down Expand Up @@ -721,40 +732,42 @@ impl AssetServer {
&self,
future: impl Future<Output = Result<A, E>> + Send + 'static,
) -> Handle<A> {
let handle = self
.data
.infos
.write()
.create_loading_handle_untyped(TypeId::of::<A>(), std::any::type_name::<A>());
let mut infos = self.data.infos.write();
let handle =
infos.create_loading_handle_untyped(TypeId::of::<A>(), std::any::type_name::<A>());
let id = handle.id();

let event_sender = self.data.asset_event_sender.clone();

IoTaskPool::get()
.spawn(async move {
match future.await {
Ok(asset) => {
let loaded_asset = LoadedAsset::new_with_dependencies(asset, None).into();
event_sender
.send(InternalAssetEvent::Loaded { id, loaded_asset })
.unwrap();
}
Err(error) => {
let error = AddAsyncError {
error: Arc::new(error),
};
error!("{error}");
event_sender
.send(InternalAssetEvent::Failed {
id,
path: Default::default(),
error: AssetLoadError::AddAsyncError(error),
})
.unwrap();
}
let task = IoTaskPool::get().spawn(async move {
match future.await {
Ok(asset) => {
let loaded_asset = LoadedAsset::new_with_dependencies(asset, None).into();
event_sender
.send(InternalAssetEvent::Loaded { id, loaded_asset })
.unwrap();
}
})
.detach();
Err(error) => {
let error = AddAsyncError {
error: Arc::new(error),
};
error!("{error}");
event_sender
.send(InternalAssetEvent::Failed {
id,
path: Default::default(),
error: AssetLoadError::AddAsyncError(error),
})
.unwrap();
}
}
});

#[cfg(not(any(target_arch = "wasm32", not(feature = "multi_threaded"))))]
infos.pending_tasks.insert(id, task);

#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
task.detach();

handle.typed_debug_checked()
}
Expand Down Expand Up @@ -1312,6 +1325,11 @@ pub fn handle_internal_asset_events(world: &mut World) {
info!("Reloading {path} because it has changed");
server.reload(path);
}

#[cfg(not(any(target_arch = "wasm32", not(feature = "multi_threaded"))))]
infos
.pending_tasks
.retain(|_, load_task| !load_task.is_finished());
});
}

Expand Down

0 comments on commit f06cd44

Please sign in to comment.