From 221d61de95ee71e00a58da86505b7912f1df5f99 Mon Sep 17 00:00:00 2001 From: lucaw Date: Tue, 1 Oct 2024 13:53:11 -0700 Subject: [PATCH 1/4] Web-based procedural asset hook. --- ai2thor/hooks/procedural_asset_hook.py | 225 +++++++++++++++--- unity/Assets/Scripts/AgentManager.cs | 17 +- .../Assets/Scripts/BaseFPSAgentController.cs | 30 +-- .../TestThirdPartyCameraAndMainCamera.cs | 32 +-- 4 files changed, 227 insertions(+), 77 deletions(-) diff --git a/ai2thor/hooks/procedural_asset_hook.py b/ai2thor/hooks/procedural_asset_hook.py index 171ce5173f..b164552eeb 100644 --- a/ai2thor/hooks/procedural_asset_hook.py +++ b/ai2thor/hooks/procedural_asset_hook.py @@ -6,23 +6,33 @@ controller.step to locally run some local code """ +import concurrent.futures import logging import os -import warnings import pathlib +import tarfile +import warnings +from concurrent.futures import ThreadPoolExecutor +from contextlib import nullcontext +from tempfile import TemporaryDirectory +from typing import Dict, Any, List, TYPE_CHECKING, Sequence + +import requests +import tqdm +from filelock import FileLock -from typing import Dict, Any, List +if TYPE_CHECKING: + from ai2thor.controller import Controller from objathor.asset_conversion.util import ( - get_existing_thor_asset_file_path, create_runtime_asset_file, get_existing_thor_asset_file_path, change_asset_paths, add_default_annotations, - load_existing_thor_asset_file, ) -logger = logging.getLogger(__name__) +logger = logging.getLogger(os.path.basename(__file__)) +logger.setLevel(logging.INFO) EXTENSIONS_LOADABLE_IN_UNITY = { ".json", @@ -47,12 +57,12 @@ def get_all_asset_ids_recursively(objects: List[Dict[str, Any]], asset_ids: List def create_asset( - thor_controller, - asset_id, - asset_directory, + thor_controller: "Controller", + asset_id: str, + asset_directory: str, copy_to_dir=None, verbose=False, - load_file_in_unity=False, + load_file_in_unity=True, extension=None, raise_for_failure=True, fail_if_not_unity_loadable=False, @@ -76,7 +86,7 @@ def create_assets( assets_dir: str, copy_to_dir=None, verbose=False, - load_file_in_unity=False, + load_file_in_unity=True, extension=None, fail_if_not_unity_loadable=False, raise_for_failure=True, @@ -251,10 +261,10 @@ def create_assets_if_not_exist( class ProceduralAssetHookRunner: def __init__( self, - asset_directory, + asset_directory: str, target_dir="processed_models", asset_symlink=True, - load_file_in_unity=False, + load_file_in_unity=True, stop_if_fail=False, asset_limit=-1, extension=None, @@ -320,27 +330,180 @@ def GetHouseFromTemplate(self, action, controller): ) -class ObjaverseAssetHookRunner(object): - def __init__(self): - import objaverse +def download_with_progress_bar(save_path: str, url: str, verbose: bool = False): + os.makedirs(os.path.dirname(save_path), exist_ok=True) + + with open(save_path, "wb") as f: + if verbose: + print(f"Downloading to {save_path}") - self.objaverse_uid_set = set(objaverse.load_uids()) + response = requests.get(url, stream=True) + total_length = response.headers.get("content-length") - def CreateHouse(self, action, controller): - raise NotImplemented("Not yet implemented.") + content_type = response.headers.get("content-type") + if content_type is not None and content_type.startswith("text/html"): + raise ValueError(f"Invalid URL: {url}") - house = action["house"] - asset_ids = list(set(obj["assetId"] for obj in house["objects"])) - evt = controller.step(action="AssetsInDatabase", assetIds=asset_ids) - asset_in_db = evt.metadata["actionReturn"] - assets_not_created = [asset_id for (asset_id, in_db) in asset_in_db.items() if in_db] - not_created_set = set(assets_not_created) - not_objeverse_not_created = not_created_set.difference(self.objaverse_uid_set) - if len(not_created_set): - raise ValueError( - f"Invalid asset ids are not in THOR AssetDatabase or part of objeverse: {not_objeverse_not_created}" + if total_length is None: # no content length header + f.write(response.content) + else: + dl = 0 + total_length = int(total_length) + + with ( + tqdm.tqdm( + total=total_length, + unit="B", + unit_scale=True, + desc=f"Downloading asset {url}", + ) + if verbose + else nullcontext() + ) as pbar: + for data in response.iter_content(chunk_size=4096): + dl += len(data) + f.write(data) + if verbose: + pbar.update(len(data)) + + +def download_missing_asset( + asset_id: str, + asset_directory: str, + base_url: str, + verbose: bool = False, +) -> str: + final_save_dir = os.path.join(asset_directory, asset_id) + + if os.path.exists(final_save_dir): + if any(f"{asset_id}." in p for p in os.listdir(final_save_dir)): + return final_save_dir + else: + print( + f"Directory {final_save_dir} exists but could not find" + f" asset {asset_id} in it. Will attempt to redownload." ) - # TODO when transformed assets are in objaverse download them and create them - # objaverse.load_thor_objects - # create_assets() + url = f"{base_url.strip('/')}/{asset_id}.tar" + + td = TemporaryDirectory() + with td as td_name: + save_path = os.path.join(td_name, f"{asset_id}.tar") + + download_with_progress_bar(save_path=save_path, url=url, verbose=verbose) + + os.makedirs(asset_directory, exist_ok=True) + + # Loop through all the files in the tar file and extract them one by one + # to the asset directory keeping the directory structure + with FileLock(os.path.join(os.path.expanduser("~"), ".ai2thor", "asset_extraction.lock")): + with tarfile.open(save_path, "r") as tar: + # Here we sort the members so that the . file is last to ensure that the object + # file is the last thing to be saved to the final location. We do this because + # we check for the existence of the . file to determine if the asset + # has been successfully downloaded previously and we want to avoid partial downloads. + for member in sorted(tar.getmembers(), key=lambda x: f"{asset_id}." in x.name): + # if "_renders" not in member.name and "success.txt" not in member.name: + tar.extract(member=member, path=asset_directory) + return final_save_dir + + +def wait_for_futures_and_raise_errors( + futures: Sequence[concurrent.futures.Future], +) -> Sequence[Any]: + results = [] + concurrent.futures.wait(futures) + for future in futures: + try: + results.append(future.result()) # This will re-raise any exceptions + except Exception: + raise + return results + + +def download_missing_assets( + asset_ids: Sequence[str], + asset_directory: str, + base_url: str, + verbose: bool = True, + threads: int = 1, +): + if verbose and threads > 1: + print(f"Downloading assets with {threads} threads. Will NOT log progress bars.") + + asset_ids = sorted(set(asset_ids)) + + with ThreadPoolExecutor(max_workers=threads) as executor: + futures = [ + executor.submit( + download_missing_asset, + asset_id=asset_id, + asset_directory=asset_directory, + base_url=base_url, + verbose=verbose and (threads == 1), + ) + for asset_id in asset_ids + ] + wait_for_futures_and_raise_errors(futures) + + +class WebProceduralAssetHookRunner(ProceduralAssetHookRunner): + def __init__( + self, + asset_directory: str, + base_url: str, + target_dir: str, + asset_symlink=True, + load_file_in_unity=True, + stop_if_fail=False, + asset_limit=-1, + extension=None, + verbose=True, + ): + super().__init__( + asset_directory=asset_directory, + target_dir=target_dir, + asset_symlink=asset_symlink, + load_file_in_unity=load_file_in_unity, + stop_if_fail=stop_if_fail, + asset_limit=asset_limit, + extension=extension, + verbose=verbose, + ) + self.base_url = base_url + + def _download_missing_assets(self, controller: "Controller", asset_ids: Sequence[str]): + asset_in_db = controller.step( + action="AssetsInDatabase", assetIds=asset_ids, updateProceduralLRUCache=False + ).metadata["actionReturn"] + assets_not_created = [asset_id for (asset_id, in_db) in asset_in_db.items() if not in_db] + download_missing_assets( + asset_ids=assets_not_created, + asset_directory=self.asset_directory, + base_url=self.base_url, + ) + + def Initialize(self, action, controller): + if self.asset_limit > 0: + return controller.step( + action="DeleteLRUFromProceduralCache", assetLimit=self.asset_limit + ) + + def CreateHouse(self, action: Dict[str, Any], controller: "Controller"): + house = action["house"] + asset_ids = get_all_asset_ids_recursively(house["objects"], []) + self._download_missing_assets(controller=controller, asset_ids=asset_ids) + + return super().CreateHouse(action=action, controller=controller) + + def SpawnAsset(self, action, controller): + self._download_missing_assets(controller=controller, asset_ids=[action["assetId"]]) + + return super().SpawnAsset(action=action, controller=controller) + + def GetHouseFromTemplate(self, action, controller): + template = action["template"] + asset_ids = get_all_asset_ids_recursively([v for (k, v) in template["objects"].items()], []) + self._download_missing_assets(controller=controller, asset_ids=asset_ids) + + super().GetHouseFromTemplate(action=action, controller=controller) diff --git a/unity/Assets/Scripts/AgentManager.cs b/unity/Assets/Scripts/AgentManager.cs index 1b869c9fad..54fa8055f4 100644 --- a/unity/Assets/Scripts/AgentManager.cs +++ b/unity/Assets/Scripts/AgentManager.cs @@ -1310,17 +1310,22 @@ bool shouldRenderImageSynthesis if (camera.transform.parent != null) { cMetadata.parentObjectName = camera.transform.parent.name; - cMetadata.parentRelativeThirdPartyCameraPosition = - camera.transform.localPosition; + cMetadata.parentRelativeThirdPartyCameraPosition = camera + .transform + .localPosition; //get third party camera rotation as quaternion in parent space - cMetadata.parentRelativeThirdPartyCameraRotation = - camera.transform.localEulerAngles; + cMetadata.parentRelativeThirdPartyCameraRotation = camera + .transform + .localEulerAngles; } else { //if not parented, default position and rotation to world coordinate space cMetadata.parentObjectName = ""; cMetadata.parentRelativeThirdPartyCameraPosition = camera.transform.position; - cMetadata.parentRelativeThirdPartyCameraRotation = camera.transform.rotation.eulerAngles; + cMetadata.parentRelativeThirdPartyCameraRotation = camera + .transform + .rotation + .eulerAngles; } //if this camera is part of the agent's hierarchy at all, get agent relative info @@ -1337,7 +1342,7 @@ bool shouldRenderImageSynthesis agentSpaceCameraRotationAsQuaternion.eulerAngles; } else { //if this third party camera is not a child of the agent, we don't need agent-relative coordinates - //Note: We don't default this to world space because in the case of a multi-agent scenario, the agent + //Note: We don't default this to world space because in the case of a multi-agent scenario, the agent //to be relative to is ambiguous and UHHHHH cMetadata.agentRelativeThirdPartyCameraPosition = null; cMetadata.agentRelativeThirdPartyCameraRotation = null; diff --git a/unity/Assets/Scripts/BaseFPSAgentController.cs b/unity/Assets/Scripts/BaseFPSAgentController.cs index 1ad6e89011..af3594c651 100644 --- a/unity/Assets/Scripts/BaseFPSAgentController.cs +++ b/unity/Assets/Scripts/BaseFPSAgentController.cs @@ -7729,21 +7729,21 @@ public ActionFinished CreateRuntimeAsset( procAsset.parentTexturesDir = Path.Combine(dir, id); var assetData = ProceduralTools.CreateAsset( - procAsset.vertices, - procAsset.normals, - procAsset.name, - procAsset.triangles, - procAsset.uvs, - procAsset.albedoTexturePath, - procAsset.metallicSmoothnessTexturePath, - procAsset.normalTexturePath, - procAsset.emissionTexturePath, - procAsset.colliders, - procAsset.physicalProperties, - procAsset.visibilityPoints, - procAsset.annotations ?? annotations, - procAsset.receptacleCandidate, - procAsset.yRotOffset, + vertices: procAsset.vertices, + normals: procAsset.normals, + name: procAsset.name, + triangles: procAsset.triangles, + uvs: procAsset.uvs, + albedoTexturePath: procAsset.albedoTexturePath, + metallicSmoothnessTexturePath: procAsset.metallicSmoothnessTexturePath, + normalTexturePath: procAsset.normalTexturePath, + emissionTexturePath: procAsset.emissionTexturePath, + colliders: procAsset.colliders, + physicalProperties: procAsset.physicalProperties, + visibilityPoints: procAsset.visibilityPoints, + annotations: procAsset.annotations ?? annotations, + receptacleCandidate: procAsset.receptacleCandidate, + yRotOffset: procAsset.yRotOffset, returnObject: true, serializable: serializable, parent: null, diff --git a/unity/Assets/UnitTests/TestThirdPartyCameraAndMainCamera.cs b/unity/Assets/UnitTests/TestThirdPartyCameraAndMainCamera.cs index cd20598ace..d3857f9fad 100644 --- a/unity/Assets/UnitTests/TestThirdPartyCameraAndMainCamera.cs +++ b/unity/Assets/UnitTests/TestThirdPartyCameraAndMainCamera.cs @@ -520,7 +520,7 @@ public IEnumerator TestThirdPartyCameraMetadataReturn() metadata.thirdPartyCameras[0].parentRelativeThirdPartyCameraRotation.z, 30.0000000000f ); - Assert.AreEqual(result, true); + Assert.AreEqual(result, true); Assert.AreEqual(metadata.thirdPartyCameras[0].parentObjectName, ""); action.Clear(); @@ -607,50 +607,32 @@ public IEnumerator TestThirdPartyCameraMetadataReturn() ); Assert.AreEqual(result, true); result = Mathf.Approximately( - metadata - .thirdPartyCameras[0] - .parentRelativeThirdPartyCameraPosition - .x, + metadata.thirdPartyCameras[0].parentRelativeThirdPartyCameraPosition.x, 1.0000000000f ); Assert.AreEqual(result, true); result = Mathf.Approximately( - metadata - .thirdPartyCameras[0] - .parentRelativeThirdPartyCameraPosition - .y, + metadata.thirdPartyCameras[0].parentRelativeThirdPartyCameraPosition.y, 2.0000000000f ); Assert.AreEqual(result, true); result = Mathf.Approximately( - metadata - .thirdPartyCameras[0] - .parentRelativeThirdPartyCameraPosition - .z, + metadata.thirdPartyCameras[0].parentRelativeThirdPartyCameraPosition.z, 3.0000020000f ); Assert.AreEqual(result, true); result = Mathf.Approximately( - metadata - .thirdPartyCameras[0] - .parentRelativeThirdPartyCameraRotation - .x, + metadata.thirdPartyCameras[0].parentRelativeThirdPartyCameraRotation.x, 20.0000000000f ); Assert.AreEqual(result, true); result = Mathf.Approximately( - metadata - .thirdPartyCameras[0] - .parentRelativeThirdPartyCameraRotation - .y, + metadata.thirdPartyCameras[0].parentRelativeThirdPartyCameraRotation.y, 20.0000000000f ); Assert.AreEqual(result, true); result = Mathf.Approximately( - metadata - .thirdPartyCameras[0] - .parentRelativeThirdPartyCameraRotation - .z, + metadata.thirdPartyCameras[0].parentRelativeThirdPartyCameraRotation.z, 20.0000000000f ); Assert.AreEqual(result, true); From 9a013c759b961d2a3d350ae0af23d191f64be77c Mon Sep 17 00:00:00 2001 From: AlvaroHG Date: Thu, 24 Oct 2024 13:56:54 -0700 Subject: [PATCH 2/4] Web assets support --- .../Assets/Scripts/BaseFPSAgentController.cs | 138 ++++++++++-------- 1 file changed, 81 insertions(+), 57 deletions(-) diff --git a/unity/Assets/Scripts/BaseFPSAgentController.cs b/unity/Assets/Scripts/BaseFPSAgentController.cs index af3594c651..5a52738ef4 100644 --- a/unity/Assets/Scripts/BaseFPSAgentController.cs +++ b/unity/Assets/Scripts/BaseFPSAgentController.cs @@ -3,11 +3,11 @@ using System.Collections; using System.Collections.Generic; using System.IO; -using System.IO; using System.IO.Compression; using System.Linq; using System.Linq.Expressions; using System.Reflection; +using System.Threading.Tasks; using MessagePack; using MessagePack.Formatters; using MessagePack.Resolvers; @@ -22,6 +22,7 @@ using UnityStandardAssets.CrossPlatformInput; using UnityStandardAssets.ImageEffects; using UnityStandardAssets.Utility; +using Diagnostics = System.Diagnostics; using Random = UnityEngine.Random; namespace UnityStandardAssets.Characters.FirstPerson { @@ -7602,7 +7603,7 @@ string backTexturePath actionFinished(true); } - public ActionFinished CreateRuntimeAsset(ProceduralAsset asset) { + public ActionFinished CreateRuntimeAsset(ProceduralAsset asset, bool returnObject = false) { var assetDb = GameObject.FindObjectOfType(); if (assetDb.ContainsAssetKey(asset.name)) { return new ActionFinished( @@ -7627,27 +7628,20 @@ public ActionFinished CreateRuntimeAsset(ProceduralAsset asset) { annotations: asset.annotations, receptacleCandidate: asset.receptacleCandidate, yRotOffset: asset.yRotOffset, + returnObject: returnObject, serializable: asset.serializable, parentTexturesDir: asset.parentTexturesDir ); return new ActionFinished { success = true, actionReturn = assetData }; } - public ActionFinished CreateRuntimeAsset( + private static async Task LoadAssetAsync( string id, string dir, string extension = ".msgpack.gz", ObjectAnnotations annotations = null, bool serializable = false ) { - var assetDb = GameObject.FindObjectOfType(); - if (assetDb.ContainsAssetKey(id)) { - return new ActionFinished( - success: false, - errorMessage: $"'{id}' already exists in ProceduralAssetDatabase, trying to create procedural object twice, call `SpawnAsset` instead.", - toEmitState: true - ); - } var validDirs = new List() { Application.persistentDataPath, @@ -7663,20 +7657,14 @@ public ActionFinished CreateRuntimeAsset( extension = !extension.StartsWith(".") ? $".{extension}" : extension; extension = extension.Trim(); if (!supportedExtensions.Contains(extension)) { - return new ActionFinished( - success: false, - errorMessage: $"Unsupported extension `{extension}`. Only supported: {string.Join(", ", supportedExtensions)}", - actionReturn: null + throw new ArgumentException( + $"Unsupported extension `{extension}`. Only supported: {string.Join(", ", supportedExtensions)}" ); } var filename = $"{id}{extension}"; var filepath = Path.GetFullPath(Path.Combine(dir, id, filename)); if (!File.Exists(filepath)) { - return new ActionFinished( - success: false, - actionReturn: null, - errorMessage: $"Asset fiile '{filepath}' does not exist." - ); + throw new FileNotFoundException($"Asset file '{filepath}' does not exist."); } // to support different @@ -7716,43 +7704,46 @@ public ActionFinished CreateRuntimeAsset( ObjectCreationHandling = ObjectCreationHandling.Replace }; var json = reader.ReadToEnd(); - // procAsset = Newtonsoft.Json.JsonConvert.DeserializeObject(reader.ReadToEnd(), serializer); procAsset = JsonConvert.DeserializeObject(json); } else { - return new ActionFinished( - success: false, - errorMessage: $"Unexpected error with extension `{extension}`, filepath: `{filepath}`, compression stages: {string.Join(".", presentStages)}. Only supported: {string.Join(", ", supportedExtensions)}", - actionReturn: null + throw new ArgumentException( + $"Unexpected error with extension `{extension}`, filepath: `{filepath}`, compression stages: {string.Join(".", presentStages)}. Only supported: {string.Join(", ", supportedExtensions)}" ); } procAsset.parentTexturesDir = Path.Combine(dir, id); - var assetData = ProceduralTools.CreateAsset( - vertices: procAsset.vertices, - normals: procAsset.normals, - name: procAsset.name, - triangles: procAsset.triangles, - uvs: procAsset.uvs, - albedoTexturePath: procAsset.albedoTexturePath, - metallicSmoothnessTexturePath: procAsset.metallicSmoothnessTexturePath, - normalTexturePath: procAsset.normalTexturePath, - emissionTexturePath: procAsset.emissionTexturePath, - colliders: procAsset.colliders, - physicalProperties: procAsset.physicalProperties, - visibilityPoints: procAsset.visibilityPoints, - annotations: procAsset.annotations ?? annotations, - receptacleCandidate: procAsset.receptacleCandidate, - yRotOffset: procAsset.yRotOffset, - returnObject: true, - serializable: serializable, - parent: null, - addAnotationComponent: false, - parentTexturesDir: procAsset.parentTexturesDir - ); + return procAsset; + } + + public ActionFinished CreateRuntimeAsset( + string id, + string dir, + string extension = ".msgpack.gz", + ObjectAnnotations annotations = null, + bool serializable = false + ) { + var assetDb = GameObject.FindObjectOfType(); + if (assetDb.ContainsAssetKey(id)) { + return new ActionFinished( + success: false, + errorMessage: $"'{id}' already exists in ProceduralAssetDatabase, trying to create procedural object twice, call `SpawnAsset` instead.", + toEmitState: true + ); + } + + var procAsset = LoadAssetAsync( + id: id, + dir: dir, + extension: extension, + annotations: annotations, + serializable: serializable + ).Result; + procAsset.serializable = serializable; + procAsset.annotations = procAsset.annotations ?? annotations; // Debug.Log($"root is null? {parent == null} - {parent}"); - return new ActionFinished(success: true, actionReturn: assetData); + return CreateRuntimeAsset(asset: procAsset, returnObject: true); } public class UnityLoadableAsset { @@ -7767,18 +7758,51 @@ public ActionFinished CreateRuntimeAssets( List assets, string dir = null ) { - foreach (var asset in assets) { - var actionFinished = CreateRuntimeAsset( - id: asset.id, - dir: dir ?? asset.dir, - extension: asset.extension, - annotations: asset.annotations + try { +#if UNITY_EDITOR + Diagnostics.Stopwatch stopWatch = new Diagnostics.Stopwatch(); + stopWatch.Start(); +#endif + // Load assets in parallel + var loadTasks = assets + .Select(asset => + LoadAssetAsync( + id: asset.id, + dir: dir ?? asset.dir, + extension: asset.extension, + annotations: asset.annotations + ) + ) + .ToList(); + Task.WhenAll(loadTasks).Wait(); + + var loadedAssets = loadTasks.Select(t => t.Result).ToList(); + +#if UNITY_EDITOR + stopWatch.Stop(); + Debug.Log( + $"LoadAssetAsync took {stopWatch.ElapsedMilliseconds} ms, per asset time {stopWatch.ElapsedMilliseconds / assets.Count} ms" ); - if (!actionFinished.success) { - return actionFinished; + stopWatch.Restart(); +#endif + // Create assets serially + foreach (var (asset, procAsset) in assets.Zip(loadedAssets, (a, p) => (a, p))) { + var actionFinished = CreateRuntimeAsset(asset: procAsset); + if (!actionFinished.success) { + return actionFinished; + } } +#if UNITY_EDITOR + stopWatch.Stop(); + Debug.Log( + $"CreateRuntimeAsset loop took {stopWatch.ElapsedMilliseconds} ms, per asset time {stopWatch.ElapsedMilliseconds / assets.Count} ms" + ); +#endif + + return ActionFinished.Success; + } catch (Exception ex) { + return new ActionFinished(success: false, errorMessage: ex.Message); } - return ActionFinished.Success; } public void GetStreamingAssetsPath() { From ab9d47640d55c07cb1f3c0a8a012e5b2fc179c52 Mon Sep 17 00:00:00 2001 From: AlvaroHG Date: Thu, 31 Oct 2024 18:28:57 -0700 Subject: [PATCH 3/4] Last refactors and testing --- ai2thor/hooks/procedural_asset_callback.py | 565 +++++++++++++++++++++ 1 file changed, 565 insertions(+) create mode 100644 ai2thor/hooks/procedural_asset_callback.py diff --git a/ai2thor/hooks/procedural_asset_callback.py b/ai2thor/hooks/procedural_asset_callback.py new file mode 100644 index 0000000000..b729d05a9b --- /dev/null +++ b/ai2thor/hooks/procedural_asset_callback.py @@ -0,0 +1,565 @@ +# Copyright Allen Institute for Artificial Intelligence 2017 +""" +ai2thor.action_hook + +Hook runner with method names that are intercepted before running +controller.step to locally run some local code + +""" +import concurrent.futures +import logging +import os +import pathlib +import tarfile +import warnings +from concurrent.futures import ThreadPoolExecutor +from contextlib import nullcontext +from tempfile import TemporaryDirectory +from typing import Dict, Any, List, TYPE_CHECKING, Sequence + +import requests +import tqdm +from filelock import FileLock + +if TYPE_CHECKING: + from ai2thor.controller import Controller + +from objathor.asset_conversion.util import ( + get_existing_thor_asset_file_path, + create_runtime_asset_file, + get_existing_thor_asset_file_path, + change_asset_paths, + add_default_annotations, + load_existing_thor_asset_file, +) + +from objathor.dataset import load_assets_path, DatasetSaveConfig + +logger = logging.getLogger(os.path.basename(__file__)) +logger.setLevel(logging.INFO) + +EXTENSIONS_LOADABLE_IN_UNITY = { + ".json", + ".json.gz", + ".msgpack", + ".msgpack.gz", +} + + +def get_all_asset_ids_recursively(objects: List[Dict[str, Any]], asset_ids: List[str]) -> List[str]: + """ + Get all asset IDs in a house. + """ + for obj in objects: + asset_ids.append(obj["assetId"]) + if "children" in obj: + get_all_asset_ids_recursively(obj["children"], asset_ids) + assets_set = set(asset_ids) + if "" in assets_set: + assets_set.remove("") + return list(assets_set) + + +def create_asset( + thor_controller: "Controller", + asset_id: str, + asset_directory: str, + copy_to_dir=None, + verbose=False, + load_file_in_unity=True, + extension=None, + raise_for_failure=True, + fail_if_not_unity_loadable=False, +): + return create_assets( + thor_controller=thor_controller, + asset_ids=[asset_id], + assets_dir=asset_directory, + copy_to_dir=copy_to_dir, + verbose=verbose, + load_file_in_unity=load_file_in_unity, + extension=extension, + fail_if_not_unity_loadable=fail_if_not_unity_loadable, + raise_for_failure=raise_for_failure, + ) + + +def create_assets( + thor_controller: Any, + asset_ids: List[str], + assets_dir: str, + copy_to_dir=None, + verbose=False, + load_file_in_unity=True, + extension=None, + fail_if_not_unity_loadable=False, + raise_for_failure=True, +): + copy_to_dir = ( + os.path.join(thor_controller._build.base_dir) if copy_to_dir is None else copy_to_dir + ) + + multi_create_unity_loadable = dict( + action="CreateRuntimeAssets", + assets=[], + dir=copy_to_dir, + raise_for_failure=raise_for_failure, + ) + + create_with_data_actions = [] + errors = [] + + for asset_id in asset_ids: + asset_dir = os.path.join(assets_dir, asset_id) + # Verifies the file exists + asset_path = get_existing_thor_asset_file_path( + out_dir=asset_dir, asset_id=asset_id, force_extension=extension + ) + file_extension = ( + "".join(pathlib.Path(asset_path).suffixes) if extension is None else extension + ) + load_asset_in_unity = load_file_in_unity + + if file_extension not in EXTENSIONS_LOADABLE_IN_UNITY: + load_asset_in_unity = False + if fail_if_not_unity_loadable: + errors.append(asset_path) + continue + + # save_dir = os.path.join(controller._build.base_dir, "processed_models") + os.makedirs(copy_to_dir, exist_ok=True) + + if verbose: + logger.info(f"Copying asset to THOR build dir: {copy_to_dir}.") + + asset = create_runtime_asset_file( + asset_directory=asset_dir, + save_dir=copy_to_dir, + asset_id=asset_id, + load_file_in_unity=load_asset_in_unity, + verbose=verbose, + ) + + if not load_asset_in_unity: + # TODO refactor to this when objathor changes + # asset = load_existing_thor_asset_file( + # out_dir=asset_target_dir, object_name=asset_id, force_extension=file_extension + # ) + asset = change_asset_paths(asset=asset, save_dir=copy_to_dir) + asset = add_default_annotations(asset=asset, asset_directory=asset_dir, verbose=verbose) + create_prefab_action = { + "action": "CreateRuntimeAsset", + "asset": asset, + "raise_for_failure": raise_for_failure, + } + create_with_data_actions.append(create_prefab_action) + else: + asset_args = { + "id": asset_id, + "extension": file_extension, + } + asset_args = add_default_annotations( + asset=asset_args, asset_directory=asset_dir, verbose=verbose + ) + multi_create_unity_loadable["assets"].append(asset_args) + + if fail_if_not_unity_loadable: + error_strs = ", ".join(errors) + extensions = ", ".join(EXTENSIONS_LOADABLE_IN_UNITY) + raise RuntimeError( + f"Set to fail if files are not loadable in unity. Invalid extension of files `{error_strs}` must be of any of extensions {extensions}" + ) + + events = [] + # Slow pass asset data to pipe + if len(create_with_data_actions): + for create_asset_action in create_with_data_actions: + evt = thor_controller.step(**create_asset_action) + events.append(evt) + if verbose: + logger.info(f"Last Action: {thor_controller.last_action['action']}") + + if len(multi_create_unity_loadable): + evt = thor_controller.step(**multi_create_unity_loadable) + events.append(evt) + if verbose: + logger.debug(f"Last Action: {thor_controller.last_action['action']}") + + for evt in events: + if not evt.metadata["lastActionSuccess"]: + logger.error( + f'Error: {evt.metadata["errorMessage"]}' + f"\nLast Action: {thor_controller.last_action['action']}" + f"\nAction success: {evt.metadata['lastActionSuccess']}" + ) + return events + + +def create_assets_if_not_exist( + controller, + asset_ids, + asset_directory, + copy_to_dir, + asset_symlink, # TODO remove + stop_if_fail, + load_file_in_unity, + extension=None, + verbose=False, + raise_for_failure=True, + fail_if_not_unity_loadable=False, +): + evt = controller.step( + action="AssetsInDatabase", assetIds=asset_ids, updateProceduralLRUCache=True + ) + + asset_in_db = evt.metadata["actionReturn"] + assets_not_created = [asset_id for (asset_id, in_db) in asset_in_db.items() if not in_db] + + events = create_assets( + thor_controller=controller, + asset_ids=assets_not_created, + assets_dir=asset_directory, + copy_to_dir=copy_to_dir, + verbose=verbose, + load_file_in_unity=load_file_in_unity, + extension=extension, + fail_if_not_unity_loadable=fail_if_not_unity_loadable, + ) + for evt, i in zip(events, range(len(events))): + if not evt.metadata["lastActionSuccess"]: + # TODO do a better matching of asset_ids and event + asset_id = assets_not_created[i] if i < len(events) else None + asset_path = ( + get_existing_thor_asset_file_path(out_dir=asset_directory, asset_id=asset_id) + if asset_id is not None + else "" + ) + warnings.warn( + f"Could not create asset `{asset_path}`." f"\nError: {evt.metadata['errorMessage']}" + ) + return events[-1] + + # slower + # for asset_id in assets_not_created: + # asset_dir = os.path.abspath(os.path.join(asset_directory, asset_id)) + # # print(f"Create {asset_id}") + # evt = create_asset( + # thor_controller=controller, + # asset_id=asset_id, + # asset_directory=asset_dir, + # copy_to_dir=copy_to_dir, + # verbose=verbose, + # load_file_in_unity=load_file_in_unity, + # extension=None, + # # raise_for_failure=raise_for_failure, + # ) + # if not evt.metadata["lastActionSuccess"]: + # warnings.warn( + # f"Could not create asset `{get_existing_thor_asset_file_path(out_dir=asset_dir, asset_id=asset_id)}`." + # f"\nError: {evt.metadata['errorMessage']}" + # ) + # if stop_if_fail: + # return evt + + +class ProceduralAssetActionCallback: + def __init__( + self, + asset_directory: str, + target_dir="processed_models", + asset_symlink=True, + load_file_in_unity=True, + stop_if_fail=False, + asset_limit=-1, + extension=None, + verbose=True, + ): + self.asset_directory = asset_directory + self.asset_symlink = asset_symlink + self.stop_if_fail = stop_if_fail + self.asset_limit = asset_limit + self.load_file_in_unity = load_file_in_unity + self.target_dir = target_dir + self.extension = extension + self.verbose = verbose + self.last_asset_id_set = set() + + def Initialize(self, action, controller): + if self.asset_limit > 0: + return controller.step( + action="DeleteLRUFromProceduralCache", assetLimit=self.asset_limit + ) + + def CreateHouse(self, action, controller): + house = action["house"] + asset_ids = get_all_asset_ids_recursively(house["objects"], []) + asset_ids_set = set(asset_ids) + if not asset_ids_set.issubset(self.last_asset_id_set): + controller.step(action="DeleteLRUFromProceduralCache", assetLimit=0) + self.last_asset_id_set = set(asset_ids) + return create_assets_if_not_exist( + controller=controller, + asset_ids=asset_ids, + asset_directory=self.asset_directory, + copy_to_dir=os.path.join(controller._build.base_dir, self.target_dir), + asset_symlink=self.asset_symlink, + stop_if_fail=self.stop_if_fail, + load_file_in_unity=self.load_file_in_unity, + extension=self.extension, + verbose=self.verbose, + ) + + def SpawnAsset(self, action, controller): + asset_ids = [action["assetId"]] + return create_assets_if_not_exist( + controller=controller, + asset_ids=asset_ids, + asset_directory=self.asset_directory, + copy_to_dir=os.path.join(controller._build.base_dir, self.target_dir), + asset_symlink=self.asset_symlink, + stop_if_fail=self.stop_if_fail, + load_file_in_unity=self.load_file_in_unity, + extension=self.extension, + verbose=self.verbose, + ) + + def GetHouseFromTemplate(self, action, controller): + template = action["template"] + asset_ids = get_all_asset_ids_recursively([v for (k, v) in template["objects"].items()], []) + return create_assets_if_not_exist( + controller=controller, + asset_ids=asset_ids, + asset_directory=self.asset_directory, + copy_to_dir=os.path.join(controller._build.base_dir, self.target_dir), + asset_symlink=self.asset_symlink, + stop_if_fail=self.stop_if_fail, + load_file_in_unity=self.load_file_in_unity, + extension=self.extension, + verbose=self.verbose, + ) + + +class DownloadObjaverseActionCallback(object): + def __init__( + self, + asset_dataset_version, + asset_download_path, + target_dir="processed_models", + asset_symlink=True, + load_file_in_unity=False, + stop_if_fail=False, + asset_limit=-1, + extension=None, + verbose=True, + ): + self.asset_download_path = asset_download_path + self.asset_symlink = asset_symlink + self.stop_if_fail = stop_if_fail + self.asset_limit = asset_limit + self.load_file_in_unity = load_file_in_unity + self.target_dir = target_dir + self.extension = extension + self.verbose = verbose + self.last_asset_id_set = set() + dsc = DatasetSaveConfig( + VERSION=asset_dataset_version, + BASE_PATH=asset_download_path, + ) + self.asset_path = load_assets_path(dsc) + + def CreateHouse(self, action, controller): + house = action["house"] + asset_ids = get_all_asset_ids_recursively(house["objects"], []) + asset_ids_set = set(asset_ids) + if not asset_ids_set.issubset(self.last_asset_id_set): + controller.step(action="DeleteLRUFromProceduralCache", assetLimit=0) + self.last_asset_id_set = set(asset_ids) + return create_assets_if_not_exist( + controller=controller, + asset_ids=asset_ids, + asset_directory=self.asset_path, + copy_to_dir=os.path.join(controller._build.base_dir, self.target_dir), + asset_symlink=self.asset_symlink, + stop_if_fail=self.stop_if_fail, + load_file_in_unity=self.load_file_in_unity, + extension=self.extension, + verbose=self.verbose, + ) + +def download_with_progress_bar(save_path: str, url: str, verbose: bool = False): + os.makedirs(os.path.dirname(save_path), exist_ok=True) + + with open(save_path, "wb") as f: + if verbose: + print(f"Downloading to {save_path}") + + response = requests.get(url, stream=True) + total_length = response.headers.get("content-length") + + content_type = response.headers.get("content-type") + if content_type is not None and content_type.startswith("text/html"): + raise ValueError(f"Invalid URL: {url}") + + if total_length is None: # no content length header + f.write(response.content) + else: + dl = 0 + total_length = int(total_length) + + with ( + tqdm.tqdm( + total=total_length, + unit="B", + unit_scale=True, + desc=f"Downloading asset {url}", + ) + if verbose + else nullcontext() + ) as pbar: + for data in response.iter_content(chunk_size=4096): + dl += len(data) + f.write(data) + if verbose: + pbar.update(len(data)) + + +def download_missing_asset( + asset_id: str, + asset_directory: str, + base_url: str, + verbose: bool = False, +) -> str: + final_save_dir = os.path.join(asset_directory, asset_id) + + if os.path.exists(final_save_dir): + if any(f"{asset_id}." in p for p in os.listdir(final_save_dir)): + return final_save_dir + else: + print( + f"Directory {final_save_dir} exists but could not find" + f" asset {asset_id} in it. Will attempt to redownload." + ) + + url = f"{base_url.strip('/')}/{asset_id}.tar" + + td = TemporaryDirectory() + with td as td_name: + save_path = os.path.join(td_name, f"{asset_id}.tar") + + download_with_progress_bar(save_path=save_path, url=url, verbose=verbose) + + os.makedirs(asset_directory, exist_ok=True) + + # Loop through all the files in the tar file and extract them one by one + # to the asset directory keeping the directory structure + with FileLock(os.path.join(os.path.expanduser("~"), ".ai2thor", "asset_extraction.lock")): + with tarfile.open(save_path, "r") as tar: + # Here we sort the members so that the . file is last to ensure that the object + # file is the last thing to be saved to the final location. We do this because + # we check for the existence of the . file to determine if the asset + # has been successfully downloaded previously and we want to avoid partial downloads. + for member in sorted(tar.getmembers(), key=lambda x: f"{asset_id}." in x.name): + # if "_renders" not in member.name and "success.txt" not in member.name: + tar.extract(member=member, path=asset_directory) + return final_save_dir + + +def wait_for_futures_and_raise_errors( + futures: Sequence[concurrent.futures.Future], +) -> Sequence[Any]: + results = [] + concurrent.futures.wait(futures) + for future in futures: + try: + results.append(future.result()) # This will re-raise any exceptions + except Exception: + raise + return results + + +def download_missing_assets( + asset_ids: Sequence[str], + asset_directory: str, + base_url: str, + verbose: bool = True, + threads: int = 1, +): + if verbose and threads > 1: + print(f"Downloading assets with {threads} threads. Will NOT log progress bars.") + + asset_ids = sorted(set(asset_ids)) + + with ThreadPoolExecutor(max_workers=threads) as executor: + futures = [ + executor.submit( + download_missing_asset, + asset_id=asset_id, + asset_directory=asset_directory, + base_url=base_url, + verbose=verbose and (threads == 1), + ) + for asset_id in asset_ids + ] + wait_for_futures_and_raise_errors(futures) + + +class WebProceduralAssetActionCallback(ProceduralAssetActionCallback): + def __init__( + self, + asset_directory: str, + base_url: str, + target_dir: str, + asset_symlink=True, + load_file_in_unity=True, + stop_if_fail=False, + asset_limit=-1, + extension=None, + verbose=True, + ): + super().__init__( + asset_directory=asset_directory, + target_dir=target_dir, + asset_symlink=asset_symlink, + load_file_in_unity=load_file_in_unity, + stop_if_fail=stop_if_fail, + asset_limit=asset_limit, + extension=extension, + verbose=verbose, + ) + self.base_url = base_url + + def _download_missing_assets(self, controller: "Controller", asset_ids: Sequence[str]): + asset_in_db = controller.step( + action="AssetsInDatabase", assetIds=asset_ids, updateProceduralLRUCache=False + ).metadata["actionReturn"] + assets_not_created = [asset_id for (asset_id, in_db) in asset_in_db.items() if not in_db] + download_missing_assets( + asset_ids=assets_not_created, + asset_directory=self.asset_directory, + base_url=self.base_url, + ) + + def Initialize(self, action, controller): + if self.asset_limit > 0: + return controller.step( + action="DeleteLRUFromProceduralCache", assetLimit=self.asset_limit + ) + + def CreateHouse(self, action: Dict[str, Any], controller: "Controller"): + house = action["house"] + asset_ids = get_all_asset_ids_recursively(house["objects"], []) + self._download_missing_assets(controller=controller, asset_ids=asset_ids) + + return super().CreateHouse(action=action, controller=controller) + + def SpawnAsset(self, action, controller): + self._download_missing_assets(controller=controller, asset_ids=[action["assetId"]]) + + return super().SpawnAsset(action=action, controller=controller) + + def GetHouseFromTemplate(self, action, controller): + template = action["template"] + asset_ids = get_all_asset_ids_recursively([v for (k, v) in template["objects"].items()], []) + self._download_missing_assets(controller=controller, asset_ids=asset_ids) + + super().GetHouseFromTemplate(action=action, controller=controller) From 9cdbe76098a17292b47df200532745abe90d3b02 Mon Sep 17 00:00:00 2001 From: AlvaroHG Date: Thu, 31 Oct 2024 19:08:24 -0700 Subject: [PATCH 4/4] Format --- ai2thor/hooks/procedural_asset_callback.py | 1 + ai2thor/hooks/procedural_asset_hook.py | 561 --------------------- tasks.py | 6 +- unity/Assets/Scripts/UtilityFunctions.cs | 6 +- 4 files changed, 9 insertions(+), 565 deletions(-) delete mode 100644 ai2thor/hooks/procedural_asset_hook.py diff --git a/ai2thor/hooks/procedural_asset_callback.py b/ai2thor/hooks/procedural_asset_callback.py index b729d05a9b..281112cba6 100644 --- a/ai2thor/hooks/procedural_asset_callback.py +++ b/ai2thor/hooks/procedural_asset_callback.py @@ -386,6 +386,7 @@ def CreateHouse(self, action, controller): verbose=self.verbose, ) + def download_with_progress_bar(save_path: str, url: str, verbose: bool = False): os.makedirs(os.path.dirname(save_path), exist_ok=True) diff --git a/ai2thor/hooks/procedural_asset_hook.py b/ai2thor/hooks/procedural_asset_hook.py deleted file mode 100644 index 16cb68d9bc..0000000000 --- a/ai2thor/hooks/procedural_asset_hook.py +++ /dev/null @@ -1,561 +0,0 @@ -# Copyright Allen Institute for Artificial Intelligence 2017 -""" -ai2thor.action_hook - -Hook runner with method names that are intercepted before running -controller.step to locally run some local code - -""" -import concurrent.futures -import logging -import os -import pathlib -import tarfile -import warnings -from concurrent.futures import ThreadPoolExecutor -from contextlib import nullcontext -from tempfile import TemporaryDirectory -from typing import Dict, Any, List, TYPE_CHECKING, Sequence - -import requests -import tqdm -from filelock import FileLock - -if TYPE_CHECKING: - from ai2thor.controller import Controller - -from objathor.asset_conversion.util import ( - create_runtime_asset_file, - get_existing_thor_asset_file_path, - change_asset_paths, - add_default_annotations, -) - -logger = logging.getLogger(os.path.basename(__file__)) -logger.setLevel(logging.INFO) - -EXTENSIONS_LOADABLE_IN_UNITY = { - ".json", - ".json.gz", - ".msgpack", - ".msgpack.gz", -} - - -def get_all_asset_ids_recursively(objects: List[Dict[str, Any]], asset_ids: List[str]) -> List[str]: - """ - Get all asset IDs in a house. - """ - for obj in objects: - asset_ids.append(obj["assetId"]) - if "children" in obj: - get_all_asset_ids_recursively(obj["children"], asset_ids) - assets_set = set(asset_ids) - if "" in assets_set: - assets_set.remove("") - return list(assets_set) - - -def create_asset( - thor_controller: "Controller", - asset_id: str, - asset_directory: str, - copy_to_dir=None, - verbose=False, - load_file_in_unity=True, - extension=None, - raise_for_failure=True, - fail_if_not_unity_loadable=False, -): - return create_assets( - thor_controller=thor_controller, - asset_ids=[asset_id], - assets_dir=asset_directory, - copy_to_dir=copy_to_dir, - verbose=verbose, - load_file_in_unity=load_file_in_unity, - extension=extension, - fail_if_not_unity_loadable=fail_if_not_unity_loadable, - raise_for_failure=raise_for_failure, - ) - - -def create_assets( - thor_controller: Any, - asset_ids: List[str], - assets_dir: str, - copy_to_dir=None, - verbose=False, - load_file_in_unity=True, - extension=None, - fail_if_not_unity_loadable=False, - raise_for_failure=True, -): - copy_to_dir = ( - os.path.join(thor_controller._build.base_dir) if copy_to_dir is None else copy_to_dir - ) - - multi_create_unity_loadable = dict( - action="CreateRuntimeAssets", - assets=[], - dir=copy_to_dir, - raise_for_failure=raise_for_failure, - ) - - create_with_data_actions = [] - errors = [] - - for asset_id in asset_ids: - asset_dir = os.path.join(assets_dir, asset_id) - # Verifies the file exists - asset_path = get_existing_thor_asset_file_path( - out_dir=asset_dir, asset_id=asset_id, force_extension=extension - ) - file_extension = ( - "".join(pathlib.Path(asset_path).suffixes) if extension is None else extension - ) - load_asset_in_unity = load_file_in_unity - - if file_extension not in EXTENSIONS_LOADABLE_IN_UNITY: - load_asset_in_unity = False - if fail_if_not_unity_loadable: - errors.append(asset_path) - continue - - # save_dir = os.path.join(controller._build.base_dir, "processed_models") - os.makedirs(copy_to_dir, exist_ok=True) - - if verbose: - logger.info(f"Copying asset to THOR build dir: {copy_to_dir}.") - - asset = create_runtime_asset_file( - asset_directory=asset_dir, - save_dir=copy_to_dir, - asset_id=asset_id, - load_file_in_unity=load_asset_in_unity, - verbose=verbose, - ) - - if not load_asset_in_unity: - # TODO refactor to this when objathor changes - # asset = load_existing_thor_asset_file( - # out_dir=asset_target_dir, object_name=asset_id, force_extension=file_extension - # ) - asset = change_asset_paths(asset=asset, save_dir=copy_to_dir) - asset = add_default_annotations(asset=asset, asset_directory=asset_dir, verbose=verbose) - create_prefab_action = { - "action": "CreateRuntimeAsset", - "asset": asset, - "raise_for_failure": raise_for_failure, - } - create_with_data_actions.append(create_prefab_action) - else: - asset_args = { - "id": asset_id, - "extension": file_extension, - } - asset_args = add_default_annotations( - asset=asset_args, asset_directory=asset_dir, verbose=verbose - ) - multi_create_unity_loadable["assets"].append(asset_args) - - if fail_if_not_unity_loadable: - error_strs = ", ".join(errors) - extensions = ", ".join(EXTENSIONS_LOADABLE_IN_UNITY) - raise RuntimeError( - f"Set to fail if files are not loadable in unity. Invalid extension of files `{error_strs}` must be of any of extensions {extensions}" - ) - - events = [] - # Slow pass asset data to pipe - if len(create_with_data_actions): - for create_asset_action in create_with_data_actions: - evt = thor_controller.step(**create_asset_action) - events.append(evt) - if verbose: - logger.info(f"Last Action: {thor_controller.last_action['action']}") - - if len(multi_create_unity_loadable): - evt = thor_controller.step(**multi_create_unity_loadable) - events.append(evt) - if verbose: - logger.debug(f"Last Action: {thor_controller.last_action['action']}") - - for evt in events: - if not evt.metadata["lastActionSuccess"]: - logger.error( - f'Error: {evt.metadata["errorMessage"]}' - f"\nLast Action: {thor_controller.last_action['action']}" - f"\nAction success: {evt.metadata['lastActionSuccess']}" - ) - return events - - -def create_assets_if_not_exist( - controller, - asset_ids, - asset_directory, - copy_to_dir, - asset_symlink, # TODO remove - stop_if_fail, - load_file_in_unity, - extension=None, - verbose=False, - raise_for_failure=True, - fail_if_not_unity_loadable=False, -): - evt = controller.step( - action="AssetsInDatabase", assetIds=asset_ids, updateProceduralLRUCache=True - ) - - asset_in_db = evt.metadata["actionReturn"] - assets_not_created = [asset_id for (asset_id, in_db) in asset_in_db.items() if not in_db] - - events = create_assets( - thor_controller=controller, - asset_ids=assets_not_created, - assets_dir=asset_directory, - copy_to_dir=copy_to_dir, - verbose=verbose, - load_file_in_unity=load_file_in_unity, - extension=extension, - fail_if_not_unity_loadable=fail_if_not_unity_loadable, - ) - for evt, i in zip(events, range(len(events))): - if not evt.metadata["lastActionSuccess"]: - # TODO do a better matching of asset_ids and event - asset_id = assets_not_created[i] if i < len(events) else None - asset_path = ( - get_existing_thor_asset_file_path(out_dir=asset_directory, asset_id=asset_id) - if asset_id is not None - else "" - ) - warnings.warn( - f"Could not create asset `{asset_path}`." f"\nError: {evt.metadata['errorMessage']}" - ) - return events[-1] - - # slower - # for asset_id in assets_not_created: - # asset_dir = os.path.abspath(os.path.join(asset_directory, asset_id)) - # # print(f"Create {asset_id}") - # evt = create_asset( - # thor_controller=controller, - # asset_id=asset_id, - # asset_directory=asset_dir, - # copy_to_dir=copy_to_dir, - # verbose=verbose, - # load_file_in_unity=load_file_in_unity, - # extension=None, - # # raise_for_failure=raise_for_failure, - # ) - # if not evt.metadata["lastActionSuccess"]: - # warnings.warn( - # f"Could not create asset `{get_existing_thor_asset_file_path(out_dir=asset_dir, asset_id=asset_id)}`." - # f"\nError: {evt.metadata['errorMessage']}" - # ) - # if stop_if_fail: - # return evt - - -class ProceduralAssetActionCallback: - def __init__( - self, - asset_directory: str, - target_dir="processed_models", - asset_symlink=True, - load_file_in_unity=True, - stop_if_fail=False, - asset_limit=-1, - extension=None, - verbose=True, - ): - self.asset_directory = asset_directory - self.asset_symlink = asset_symlink - self.stop_if_fail = stop_if_fail - self.asset_limit = asset_limit - self.load_file_in_unity = load_file_in_unity - self.target_dir = target_dir - self.extension = extension - self.verbose = verbose - self.last_asset_id_set = set() - - def Initialize(self, action, controller): - if self.asset_limit > 0: - return controller.step( - action="DeleteLRUFromProceduralCache", assetLimit=self.asset_limit - ) - - def CreateHouse(self, action, controller): - house = action["house"] - asset_ids = get_all_asset_ids_recursively(house["objects"], []) - asset_ids_set = set(asset_ids) - if not asset_ids_set.issubset(self.last_asset_id_set): - controller.step(action="DeleteLRUFromProceduralCache", assetLimit=0) - self.last_asset_id_set = set(asset_ids) - return create_assets_if_not_exist( - controller=controller, - asset_ids=asset_ids, - asset_directory=self.asset_directory, - copy_to_dir=os.path.join(controller._build.base_dir, self.target_dir), - asset_symlink=self.asset_symlink, - stop_if_fail=self.stop_if_fail, - load_file_in_unity=self.load_file_in_unity, - extension=self.extension, - verbose=self.verbose, - ) - - def SpawnAsset(self, action, controller): - asset_ids = [action["assetId"]] - return create_assets_if_not_exist( - controller=controller, - asset_ids=asset_ids, - asset_directory=self.asset_directory, - copy_to_dir=os.path.join(controller._build.base_dir, self.target_dir), - asset_symlink=self.asset_symlink, - stop_if_fail=self.stop_if_fail, - load_file_in_unity=self.load_file_in_unity, - extension=self.extension, - verbose=self.verbose, - ) - - def GetHouseFromTemplate(self, action, controller): - template = action["template"] - asset_ids = get_all_asset_ids_recursively([v for (k, v) in template["objects"].items()], []) - return create_assets_if_not_exist( - controller=controller, - asset_ids=asset_ids, - asset_directory=self.asset_directory, - copy_to_dir=os.path.join(controller._build.base_dir, self.target_dir), - asset_symlink=self.asset_symlink, - stop_if_fail=self.stop_if_fail, - load_file_in_unity=self.load_file_in_unity, - extension=self.extension, - verbose=self.verbose, - ) - - -class DownloadObjaverseActionCallback(object): - def __init__( - self, - asset_dataset_version, - asset_download_path, - target_dir="processed_models", - asset_symlink=True, - load_file_in_unity=False, - stop_if_fail=False, - asset_limit=-1, - extension=None, - verbose=True, - ): - self.asset_download_path = asset_download_path - self.asset_symlink = asset_symlink - self.stop_if_fail = stop_if_fail - self.asset_limit = asset_limit - self.load_file_in_unity = load_file_in_unity - self.target_dir = target_dir - self.extension = extension - self.verbose = verbose - self.last_asset_id_set = set() - dsc = DatasetSaveConfig( - VERSION=asset_dataset_version, - BASE_PATH=asset_download_path, - ) - self.asset_path = load_assets_path(dsc) - - def CreateHouse(self, action, controller): - house = action["house"] - asset_ids = get_all_asset_ids_recursively(house["objects"], []) - asset_ids_set = set(asset_ids) - if not asset_ids_set.issubset(self.last_asset_id_set): - controller.step(action="DeleteLRUFromProceduralCache", assetLimit=0) - self.last_asset_id_set = set(asset_ids) - return create_assets_if_not_exist( - controller=controller, - asset_ids=asset_ids, - asset_directory=self.asset_path, - copy_to_dir=os.path.join(controller._build.base_dir, self.target_dir), - asset_symlink=self.asset_symlink, - stop_if_fail=self.stop_if_fail, - load_file_in_unity=self.load_file_in_unity, - extension=self.extension, - verbose=self.verbose, - ) - -def download_with_progress_bar(save_path: str, url: str, verbose: bool = False): - os.makedirs(os.path.dirname(save_path), exist_ok=True) - - with open(save_path, "wb") as f: - if verbose: - print(f"Downloading to {save_path}") - - response = requests.get(url, stream=True) - total_length = response.headers.get("content-length") - - content_type = response.headers.get("content-type") - if content_type is not None and content_type.startswith("text/html"): - raise ValueError(f"Invalid URL: {url}") - - if total_length is None: # no content length header - f.write(response.content) - else: - dl = 0 - total_length = int(total_length) - - with ( - tqdm.tqdm( - total=total_length, - unit="B", - unit_scale=True, - desc=f"Downloading asset {url}", - ) - if verbose - else nullcontext() - ) as pbar: - for data in response.iter_content(chunk_size=4096): - dl += len(data) - f.write(data) - if verbose: - pbar.update(len(data)) - - -def download_missing_asset( - asset_id: str, - asset_directory: str, - base_url: str, - verbose: bool = False, -) -> str: - final_save_dir = os.path.join(asset_directory, asset_id) - - if os.path.exists(final_save_dir): - if any(f"{asset_id}." in p for p in os.listdir(final_save_dir)): - return final_save_dir - else: - print( - f"Directory {final_save_dir} exists but could not find" - f" asset {asset_id} in it. Will attempt to redownload." - ) - - url = f"{base_url.strip('/')}/{asset_id}.tar" - - td = TemporaryDirectory() - with td as td_name: - save_path = os.path.join(td_name, f"{asset_id}.tar") - - download_with_progress_bar(save_path=save_path, url=url, verbose=verbose) - - os.makedirs(asset_directory, exist_ok=True) - - # Loop through all the files in the tar file and extract them one by one - # to the asset directory keeping the directory structure - with FileLock(os.path.join(os.path.expanduser("~"), ".ai2thor", "asset_extraction.lock")): - with tarfile.open(save_path, "r") as tar: - # Here we sort the members so that the . file is last to ensure that the object - # file is the last thing to be saved to the final location. We do this because - # we check for the existence of the . file to determine if the asset - # has been successfully downloaded previously and we want to avoid partial downloads. - for member in sorted(tar.getmembers(), key=lambda x: f"{asset_id}." in x.name): - # if "_renders" not in member.name and "success.txt" not in member.name: - tar.extract(member=member, path=asset_directory) - return final_save_dir - - -def wait_for_futures_and_raise_errors( - futures: Sequence[concurrent.futures.Future], -) -> Sequence[Any]: - results = [] - concurrent.futures.wait(futures) - for future in futures: - try: - results.append(future.result()) # This will re-raise any exceptions - except Exception: - raise - return results - - -def download_missing_assets( - asset_ids: Sequence[str], - asset_directory: str, - base_url: str, - verbose: bool = True, - threads: int = 1, -): - if verbose and threads > 1: - print(f"Downloading assets with {threads} threads. Will NOT log progress bars.") - - asset_ids = sorted(set(asset_ids)) - - with ThreadPoolExecutor(max_workers=threads) as executor: - futures = [ - executor.submit( - download_missing_asset, - asset_id=asset_id, - asset_directory=asset_directory, - base_url=base_url, - verbose=verbose and (threads == 1), - ) - for asset_id in asset_ids - ] - wait_for_futures_and_raise_errors(futures) - - -class WebProceduralAssetHookRunner(ProceduralAssetHookRunner): - def __init__( - self, - asset_directory: str, - base_url: str, - target_dir: str, - asset_symlink=True, - load_file_in_unity=True, - stop_if_fail=False, - asset_limit=-1, - extension=None, - verbose=True, - ): - super().__init__( - asset_directory=asset_directory, - target_dir=target_dir, - asset_symlink=asset_symlink, - load_file_in_unity=load_file_in_unity, - stop_if_fail=stop_if_fail, - asset_limit=asset_limit, - extension=extension, - verbose=verbose, - ) - self.base_url = base_url - - def _download_missing_assets(self, controller: "Controller", asset_ids: Sequence[str]): - asset_in_db = controller.step( - action="AssetsInDatabase", assetIds=asset_ids, updateProceduralLRUCache=False - ).metadata["actionReturn"] - assets_not_created = [asset_id for (asset_id, in_db) in asset_in_db.items() if not in_db] - download_missing_assets( - asset_ids=assets_not_created, - asset_directory=self.asset_directory, - base_url=self.base_url, - ) - - def Initialize(self, action, controller): - if self.asset_limit > 0: - return controller.step( - action="DeleteLRUFromProceduralCache", assetLimit=self.asset_limit - ) - - def CreateHouse(self, action: Dict[str, Any], controller: "Controller"): - house = action["house"] - asset_ids = get_all_asset_ids_recursively(house["objects"], []) - self._download_missing_assets(controller=controller, asset_ids=asset_ids) - - return super().CreateHouse(action=action, controller=controller) - - def SpawnAsset(self, action, controller): - self._download_missing_assets(controller=controller, asset_ids=[action["assetId"]]) - - return super().SpawnAsset(action=action, controller=controller) - - def GetHouseFromTemplate(self, action, controller): - template = action["template"] - asset_ids = get_all_asset_ids_recursively([v for (k, v) in template["objects"].items()], []) - self._download_missing_assets(controller=controller, asset_ids=asset_ids) - - super().GetHouseFromTemplate(action=action, controller=controller) diff --git a/tasks.py b/tasks.py index 4321fbc4c2..15941c81f0 100644 --- a/tasks.py +++ b/tasks.py @@ -4724,10 +4724,10 @@ def test_create_prefab(ctx, json_path): @task -def procedural_asset_hook_test(ctx, asset_dir, house_path, asset_id=""): +def procedural_asset_callback_test(ctx, asset_dir, house_path, asset_id=""): import json import ai2thor.controller - from ai2thor.hooks.procedural_asset_hook import ProceduralAssetActionCallback + from ai2thor.hooks.procedural_asset_callback import ProceduralAssetActionCallback from objathor.asset_conversion.util import view_asset_in_thor hook_runner = ProceduralAssetActionCallback( @@ -4817,7 +4817,7 @@ def procedural_asset_hook_test(ctx, asset_dir, house_path, asset_id=""): def procedural_asset_cache_test(ctx, asset_dir, house_path, asset_ids="", cache_limit=1): import json import ai2thor.controller - from ai2thor.hooks.procedural_asset_hook import ProceduralAssetActionCallback + from ai2thor.hooks.procedural_asset_callback import ProceduralAssetActionCallback hook_runner = ProceduralAssetActionCallback( asset_directory=asset_dir, asset_symlink=True, verbose=True, asset_limit=1 diff --git a/unity/Assets/Scripts/UtilityFunctions.cs b/unity/Assets/Scripts/UtilityFunctions.cs index 10d14ea27d..c9c0157a62 100644 --- a/unity/Assets/Scripts/UtilityFunctions.cs +++ b/unity/Assets/Scripts/UtilityFunctions.cs @@ -484,7 +484,11 @@ public static List GetLightPropertiesOfScene() { return allOfTheLights; } - public static bool ArePositionsApproximatelyEqual(Vector3 position1, Vector3 position2, float epsilon = Vector3.kEpsilon) { + public static bool ArePositionsApproximatelyEqual( + Vector3 position1, + Vector3 position2, + float epsilon = Vector3.kEpsilon + ) { // Compare each component (x, y, z) of the two positions to see if they are approximately equal via the epsilon value return Mathf.Abs(position1.x - position2.x) < epsilon && Mathf.Abs(position1.y - position2.y) < epsilon