diff --git a/crates/bevy_asset/src/server/info.rs b/crates/bevy_asset/src/server/info.rs index 98d92702077c7..b63c139f326a2 100644 --- a/crates/bevy_asset/src/server/info.rs +++ b/crates/bevy_asset/src/server/info.rs @@ -5,6 +5,7 @@ use crate::{ UntypedAssetId, UntypedHandle, }; use bevy_ecs::world::World; +use bevy_tasks::Task; use bevy_utils::tracing::warn; use bevy_utils::{Entry, HashMap, HashSet, TypeIdMap}; use crossbeam_channel::Sender; @@ -76,6 +77,7 @@ pub(crate) struct AssetInfos { pub(crate) dependency_loaded_event_sender: TypeIdMap, pub(crate) dependency_failed_event_sender: TypeIdMap, AssetLoadError)>, + pub(crate) pending_tasks: HashMap>, } impl std::fmt::Debug for AssetInfos { @@ -364,6 +366,7 @@ impl AssetInfos { &mut self.path_to_id, &mut self.loader_dependants, &mut self.living_labeled_assets, + &mut self.pending_tasks, self.watching_for_changes, id, ) @@ -587,6 +590,11 @@ impl AssetInfos { } pub(crate) fn process_asset_fail(&mut self, failed_id: UntypedAssetId, error: AssetLoadError) { + // Check whether the handle has been dropped since the asset was loaded. + if !self.infos.contains_key(&failed_id) { + return; + } + let (dependants_waiting_on_load, dependants_waiting_on_rec_load) = { let Some(info) = self.get_mut(failed_id) else { // The asset was already dropped. @@ -648,6 +656,7 @@ impl AssetInfos { path_to_id: &mut HashMap, TypeIdMap>, loader_dependants: &mut HashMap, HashSet>>, living_labeled_assets: &mut HashMap, HashSet>>, + pending_tasks: &mut HashMap>, watching_for_changes: bool, id: UntypedAssetId, ) -> bool { @@ -662,6 +671,8 @@ impl AssetInfos { return false; } + pending_tasks.remove(&id); + let type_id = entry.key().type_id(); let info = entry.remove(); @@ -704,6 +715,7 @@ impl AssetInfos { &mut self.path_to_id, &mut self.loader_dependants, &mut self.living_labeled_assets, + &mut self.pending_tasks, self.watching_for_changes, id.untyped(provider.type_id), ); diff --git a/crates/bevy_asset/src/server/mod.rs b/crates/bevy_asset/src/server/mod.rs index b2ea0466f68f0..ef72d2b404295 100644 --- a/crates/bevy_asset/src/server/mod.rs +++ b/crates/bevy_asset/src/server/mod.rs @@ -368,7 +368,8 @@ impl AssetServer { guard: G, ) -> Handle { let path = path.into().into_owned(); - let (handle, should_load) = self.data.infos.write().get_or_create_path_handle::( + let mut infos = self.data.infos.write(); + let (handle, should_load) = infos.get_or_create_path_handle::( path.clone(), HandleLoadingMode::Request, meta_transform, @@ -377,14 +378,18 @@ impl AssetServer { if should_load { let owned_handle = Some(handle.clone().untyped()); let server = self.clone(); - IoTaskPool::get() - .spawn(async move { - if let Err(err) = server.load_internal(owned_handle, path, false, None).await { - error!("{}", err); - } - drop(guard); - }) - .detach(); + let task = IoTaskPool::get().spawn(async move { + if let Err(err) = server.load_internal(owned_handle, path, false, None).await { + error!("{}", err); + } + drop(guard); + }); + + #[cfg(not(any(target_arch = "wasm32", not(feature = "multi_threaded"))))] + infos.pending_tasks.insert(handle.id().untyped(), task); + + #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))] + task.detach(); } handle @@ -414,44 +419,47 @@ impl AssetServer { CowArc::Owned(format!("{source}--{UNTYPED_SOURCE_SUFFIX}").into()) } }); - let (handle, should_load) = self - .data - .infos - .write() - .get_or_create_path_handle::( - path.clone().with_source(untyped_source), - HandleLoadingMode::Request, - meta_transform, - ); + let mut infos = self.data.infos.write(); + let (handle, should_load) = infos.get_or_create_path_handle::( + path.clone().with_source(untyped_source), + HandleLoadingMode::Request, + meta_transform, + ); if !should_load { return handle; } let id = handle.id().untyped(); + let owned_handle = Some(handle.clone().untyped()); let server = self.clone(); - IoTaskPool::get() - .spawn(async move { - let path_clone = path.clone(); - match server.load_untyped_async(path).await { - Ok(handle) => server.send_asset_event(InternalAssetEvent::Loaded { + let task = IoTaskPool::get().spawn(async move { + let path_clone = path.clone(); + match server.load_internal(owned_handle, path, false, None).await { + Ok(handle) => server.send_asset_event(InternalAssetEvent::Loaded { + id, + loaded_asset: LoadedAsset::new_with_dependencies( + LoadedUntypedAsset { handle }, + None, + ) + .into(), + }), + Err(err) => { + error!("{err}"); + server.send_asset_event(InternalAssetEvent::Failed { id, - loaded_asset: LoadedAsset::new_with_dependencies( - LoadedUntypedAsset { handle }, - None, - ) - .into(), - }), - Err(err) => { - error!("{err}"); - server.send_asset_event(InternalAssetEvent::Failed { - id, - path: path_clone, - error: err, - }); - } + path: path_clone, + error: err, + }); } - }) - .detach(); + } + }); + + #[cfg(not(any(target_arch = "wasm32", not(feature = "multi_threaded"))))] + infos.pending_tasks.insert(handle.id().untyped(), task); + + #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))] + task.detach(); + handle } @@ -488,7 +496,7 @@ impl AssetServer { /// avoid looking up `should_load` twice, but it means you _must_ be sure a load is necessary when calling this function with [`Some`]. async fn load_internal<'a>( &self, - input_handle: Option, + mut input_handle: Option, path: AssetPath<'a>, force: bool, meta_transform: Option, @@ -512,6 +520,13 @@ impl AssetServer { } })?; + if let Some(meta_transform) = input_handle.as_ref().and_then(|h| h.meta_transform()) { + (*meta_transform)(&mut *meta); + } + // downgrade the input handle so we don't keep the asset alive just because we're loading it + // note we can't just pass a weak handle in, as only strong handles contain the asset meta transform + input_handle = input_handle.map(|h| h.clone_weak()); + // This contains Some(UntypedHandle), if it was retrievable // If it is None, that is because it was _not_ retrievable, due to // 1. The handle was not already passed in for this path, meaning we can't just use that @@ -580,10 +595,6 @@ impl AssetServer { (handle.clone().unwrap(), path.clone()) }; - if let Some(meta_transform) = base_handle.meta_transform() { - (*meta_transform)(&mut *meta); - } - match self .load_with_meta_loader_and_reader(&base_path, meta, &*loader, &mut *reader, true, false) .await @@ -721,40 +732,42 @@ impl AssetServer { &self, future: impl Future> + Send + 'static, ) -> Handle { - let handle = self - .data - .infos - .write() - .create_loading_handle_untyped(TypeId::of::(), std::any::type_name::()); + let mut infos = self.data.infos.write(); + let handle = + infos.create_loading_handle_untyped(TypeId::of::(), std::any::type_name::()); let id = handle.id(); let event_sender = self.data.asset_event_sender.clone(); - IoTaskPool::get() - .spawn(async move { - match future.await { - Ok(asset) => { - let loaded_asset = LoadedAsset::new_with_dependencies(asset, None).into(); - event_sender - .send(InternalAssetEvent::Loaded { id, loaded_asset }) - .unwrap(); - } - Err(error) => { - let error = AddAsyncError { - error: Arc::new(error), - }; - error!("{error}"); - event_sender - .send(InternalAssetEvent::Failed { - id, - path: Default::default(), - error: AssetLoadError::AddAsyncError(error), - }) - .unwrap(); - } + let task = IoTaskPool::get().spawn(async move { + match future.await { + Ok(asset) => { + let loaded_asset = LoadedAsset::new_with_dependencies(asset, None).into(); + event_sender + .send(InternalAssetEvent::Loaded { id, loaded_asset }) + .unwrap(); } - }) - .detach(); + Err(error) => { + let error = AddAsyncError { + error: Arc::new(error), + }; + error!("{error}"); + event_sender + .send(InternalAssetEvent::Failed { + id, + path: Default::default(), + error: AssetLoadError::AddAsyncError(error), + }) + .unwrap(); + } + } + }); + + #[cfg(not(any(target_arch = "wasm32", not(feature = "multi_threaded"))))] + infos.pending_tasks.insert(id, task); + + #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))] + task.detach(); handle.typed_debug_checked() } @@ -1312,6 +1325,11 @@ pub fn handle_internal_asset_events(world: &mut World) { info!("Reloading {path} because it has changed"); server.reload(path); } + + #[cfg(not(any(target_arch = "wasm32", not(feature = "multi_threaded"))))] + infos + .pending_tasks + .retain(|_, load_task| !load_task.is_finished()); }); }