diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7c58e20..3d4852f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -40,7 +40,7 @@ jobs: - name: Run the Python tests shell: bash -l {0} - run: pytest + run: pytest --capture=no # This test requires the conda-forge::icub-models, # robotology::ergocub-software and @@ -53,3 +53,13 @@ jobs: resolve-robotics-uri-py package://ergoCub/robots/ergoCubSN000/model.urdf resolve-robotics-uri-py package://moveit_resources_panda_description/urdf/panda.urdf ! resolve-robotics-uri-py package://this/file/does/not/exist + - name: Check command line helper (Ubuntu and macOS) + if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macos') + shell: bash -l {0} + run: | + mkdir -p /tmp/folder/ && touch "/tmp/folder/file with spaces.txt" + mkdir -p /tmp/folder/ && touch "/tmp/folder/file_without_spaces.txt" + resolve-robotics-uri-py "file:///tmp/folder/file_without_spaces.txt" + resolve-robotics-uri-py "file:///tmp/folder/file with spaces.txt" + resolve-robotics-uri-py "file:/tmp/folder/file_without_spaces.txt" + resolve-robotics-uri-py "file:/tmp/folder/file with spaces.txt" diff --git a/setup.cfg b/setup.cfg index d849aa1..bbe6bb8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -59,4 +59,4 @@ all = [tool:pytest] addopts = -rsxX -v --strict-markers -testpaths = tests +testpaths = test diff --git a/src/resolve_robotics_uri_py/resolve_robotics_uri_py.py b/src/resolve_robotics_uri_py/resolve_robotics_uri_py.py index 234e1d7..d85d0ab 100644 --- a/src/resolve_robotics_uri_py/resolve_robotics_uri_py.py +++ b/src/resolve_robotics_uri_py/resolve_robotics_uri_py.py @@ -2,80 +2,197 @@ import os import pathlib import sys -from typing import List import warnings +from typing import Iterable + +# ===================== +# URI resolving helpers +# ===================== + +# Supported URI schemes +SupportedSchemes = {"file", "package", "model"} + +# Environment variables in the search path. +# +# * https://github.com/robotology/idyntree/issues/291 +# * https://github.com/gazebosim/sdformat/issues/1234 +# +# AMENT_PREFIX_PATH is the only "special" as we need to add +# "share" after each value, see https://github.com/stack-of-tasks/pinocchio/issues/1520 +# +# This list specify the origin of each env variable: +# +# * AMENT_PREFIX_PATH: Used in ROS2 +# * GAZEBO_MODEL_PATH: Used in Gazebo Classic +# * GZ_SIM_RESOURCE_PATH: Used in Gazebo Sim >= 7 +# * IGN_GAZEBO_RESOURCE_PATH: Used in Ignition Gazebo <= 7 +# * ROS_PACKAGE_PATH: Used in ROS1 +# * SDF_PATH: Used in sdformat +# +SupportedEnvVars = { + "AMENT_PREFIX_PATH", + "GAZEBO_MODEL_PATH", + "GZ_SIM_RESOURCE_PATH", + "IGN_GAZEBO_RESOURCE_PATH", + "ROS_PACKAGE_PATH", + "SDF_PATH", +} + # Function inspired from https://github.com/ami-iit/robot-log-visualizer/pull/51 -def get_search_paths_from_envs(env_list): - return [ +def get_search_paths_from_envs(env_list: Iterable[str]) -> list[pathlib.Path]: + # Read the searched paths from all the environment variables + search_paths = [ pathlib.Path(f) if (env != "AMENT_PREFIX_PATH") else pathlib.Path(f) / "share" - for env in env_list if os.getenv(env) is not None + for env in env_list + if os.getenv(env) is not None for f in os.getenv(env).split(os.pathsep) ] -def pathlist_list_to_string(path_list): - return ' '.join(str(path) for path in path_list) + # Resolve and remove duplicate paths + search_paths = list({path.resolve() for path in search_paths}) + + # Keep only existing paths + existing_search_paths = [path for path in search_paths if path.is_dir()] + + # Notify the user of non-existing paths + if len(set(search_paths) - set(existing_search_paths)) > 0: + msg = "resolve-robotics-uri-py: Ignoring non-existing paths from env vars: {}." + warnings.warn( + msg.format( + pathlist_list_to_string(set(search_paths) - set(existing_search_paths)) + ) + ) + + return existing_search_paths + + +def pathlist_list_to_string(path_list: Iterable[str | pathlib.Path]) -> str: + return " ".join(str(path) for path in path_list) + + +# =================== +# URI resolving logic +# =================== + def resolve_robotics_uri(uri: str) -> pathlib.Path: - # List of environment variables to consider, see: - # * https://github.com/robotology/idyntree/issues/291 - # * https://github.com/gazebosim/sdformat/issues/1234 - # AMENT_PREFIX_PATH is the only "special" as we need to add - # "share" after each value, see https://github.com/stack-of-tasks/pinocchio/issues/1520 - # This list specify the origin of each env variable: - # * GAZEBO_MODEL_PATH: Used in Gazebo Classic - # * ROS_PACKAGE_PATH: Used in ROS1 - # * AMENT_PREFIX_PATH: Used in ROS2 - # * SDF_PATH: Used in sdformat - # * IGN_GAZEBO_RESOURCE_PATH: Used in Ignition Gazebo <= 7 - # * GZ_SIM_RESOURCE_PATH: Used in Gazebo Sim >= 7 - env_list = ["GAZEBO_MODEL_PATH", "ROS_PACKAGE_PATH", "AMENT_PREFIX_PATH", "SDF_PATH", "IGN_GAZEBO_RESOURCE_PATH", "GZ_SIM_RESOURCE_PATH"] - - # Preliminary step: if there is no scheme, we just consider this a path and we return it as it is - if "://" not in uri: - return pathlib.Path(uri) + """ + Resolve a robotics URI to an absolute filename. + + Args: + uri: The URI to resolve. + + Returns: + The absolute filename corresponding to the URI. + + Raises: + FileNotFoundError: If no file corresponding to the URI is found. + """ + + # If the URI has no scheme, use by default file:// which maps the resolved input + # path to a URI with empty authority + if not any(uri.startswith(scheme) for scheme in SupportedSchemes): + uri = f"file://{pathlib.Path(uri).resolve()}" + + # ================================================ + # Process file:/ separately from the other schemes + # ================================================ + + # This is the file URI scheme as per RFC8089: + # https://datatracker.ietf.org/doc/html/rfc8089 + + if uri.startswith("file:"): + # Strip the scheme from the URI + uri = uri.replace(f"file://", "") + uri = uri.replace(f"file:", "") + + # Create the file path, resolving symlinks and '..' + uri_file_path = pathlib.Path(uri).resolve() + + # Check that the file exists + if not uri_file_path.is_file(): + msg = "resolve-robotics-uri-py: No file corresponding to URI '{}' found" + raise FileNotFoundError(msg.format(uri)) + + return uri_file_path.resolve() + + # ========================= + # Process the other schemes + # ========================= # Get scheme from URI from urllib.parse import urlparse + + # Parse the URI parsed_uri = urlparse(uri) - # We only support at the moment: - # file:// scheme: to pass a file path directly - # package:// : ROS-style package URI - # model:// : SDF-style model URI - if parsed_uri.scheme not in ["file", "package", "model"]: - raise FileNotFoundError(f"Passed URI \"{uri}\" use non-supported scheme {parsed_uri.scheme}") + # We only support the following URI schemes at the moment: + # + # * file:/ to pass an absolute file path directly + # * model:// SDF-style model URI + # * package:// ROS-style package URI + # + if parsed_uri.scheme not in SupportedSchemes: + msg = "resolve-robotics-uri-py: Passed URI '{}' use non-supported scheme '{}'" + raise FileNotFoundError(msg.format(uri, parsed_uri.scheme)) + # Strip the scheme from the URI + uri_path = uri + uri_path = uri_path.replace(f"{parsed_uri.scheme}://", "") + + # List of matching resources found model_filenames = [] - if parsed_uri.scheme == "file": - model_filenames.append(uri.replace("file:/", "")) + # Search the resource in the path from the env variables + for folder in set(get_search_paths_from_envs(SupportedEnvVars)): + + # Join the folder from environment variable and the URI path + candidate_file_name = folder / uri_path + + # Expand or resolve the file path (symlinks and ..) + candidate_file_name = candidate_file_name.resolve() + + if not candidate_file_name.is_file(): + continue - if parsed_uri.scheme == "package" or parsed_uri.scheme == "model": - uri_path = uri.replace(f"{parsed_uri.scheme}://","") - for folder in get_search_paths_from_envs(env_list): - candidate_file_name = folder / pathlib.Path(uri_path) - if (candidate_file_name.is_file()): - if candidate_file_name not in model_filenames: - model_filenames.append(candidate_file_name) + # Skip if the file is already in the list + if candidate_file_name not in model_filenames: + model_filenames.append(candidate_file_name) - if model_filenames: - if (len(model_filenames) > 1): - warnings.warn(f"resolve-robotics-uri-py: Multiple files ({pathlist_list_to_string(model_filenames)}) found for uri \"{uri}\", returning the first one.") - return pathlib.Path(model_filenames[0]) + if len(model_filenames) == 0: + msg = "resolve-robotics-uri-py: No file corresponding to URI '{}' found" + raise FileNotFoundError(msg.format(uri)) + + if len(model_filenames) > 1: + msg = "resolve-robotics-uri-py: " + msg += "Multiple files ({}) found for URI '{}', returning the first one." + warnings.warn(msg.format(pathlist_list_to_string(model_filenames), uri)) + + if len(model_filenames) >= 1: + assert model_filenames[0].exists() + return pathlib.Path(model_filenames[0]).resolve() - # If no file was found raise error - raise FileNotFoundError(f"resolve-robotics-uri-py: No file corresponding to uri \"{uri}\" found") def main(): - parser = argparse.ArgumentParser(description="Utility resolve a robotics URI (file://, model://, package://) to an absolute filename.") - parser.add_argument("uri", metavar="uri", type=str, help="URI to resolve") + parser = argparse.ArgumentParser( + description="Utility resolve a robotics URI ({}) to an absolute filename.".format( + ", ".join(f"{scheme}://" for scheme in SupportedSchemes) + ) + ) + parser.add_argument("uri", metavar="URI", type=str, help="URI to resolve") args = parser.parse_args() - result = resolve_robotics_uri(args.uri) - print(result) + try: + result = resolve_robotics_uri(args.uri) + except FileNotFoundError as e: + print(e, file=sys.stderr) + sys.exit(1) + + print(result, file=sys.stdout) sys.exit(0) + if __name__ == "__main__": main() diff --git a/test/test_resolve_robotics_uri_py.py b/test/test_resolve_robotics_uri_py.py index 589ac3b..58f5736 100644 --- a/test/test_resolve_robotics_uri_py.py +++ b/test/test_resolve_robotics_uri_py.py @@ -1,6 +1,162 @@ -import resolve_robotics_uri_py +import contextlib +import os +import pathlib +import sys +import tempfile +from typing import ContextManager + import pytest -def test_non_existing_file(): +import resolve_robotics_uri_py + + +def clear_env_vars(): + for env_var in resolve_robotics_uri_py.resolve_robotics_uri_py.SupportedEnvVars: + _ = os.environ.pop(env_var, None) + + +@contextlib.contextmanager +def export_env_var(name: str, value: str) -> ContextManager[None]: + + os.environ[name] = value + yield + del os.environ[name] + + +# ======================================= +# Test the resolve_robotics_uri functions +# ======================================= + + +@pytest.mark.parametrize("scheme", ["model://", "package://"]) +def test_scoped_uri(scheme: str, env_var_name="GZ_SIM_RESOURCE_PATH"): + + clear_env_vars() + + # Non-existing relative URI path + with pytest.raises(FileNotFoundError): + uri = f"{scheme}this/package/and/file/does/not.exist" + resolve_robotics_uri_py.resolve_robotics_uri(uri) + + # Non-existing absolute URI path with pytest.raises(FileNotFoundError): - resolve_robotics_uri_py.resolve_robotics_uri("package://this/package/and/file/does/not.exist") + uri = f"{scheme}/this/package/and/file/does/not.exist" + resolve_robotics_uri_py.resolve_robotics_uri(uri) + + # Existing URI path not in search dirs + with pytest.raises(FileNotFoundError): + with tempfile.NamedTemporaryFile() as temp: + uri = f"{scheme}{temp.name}" + resolve_robotics_uri_py.resolve_robotics_uri(uri) + + # URI path in top-level search dir + with tempfile.TemporaryDirectory() as temp_dir: + + temp_dir_path = pathlib.Path(temp_dir).resolve() + temp_dir_path.mkdir(exist_ok=True) + top_level = temp_dir_path / "top_level.txt" + top_level.touch(exist_ok=True) + + # Existing relative URI path not in search dirs + with pytest.raises(FileNotFoundError): + uri = f"{scheme}top_level.txt" + resolve_robotics_uri_py.resolve_robotics_uri(uri) + + # Existing absolute URI path in search dirs + with pytest.raises(FileNotFoundError): + with export_env_var(name=env_var_name, value=str(temp_dir_path)): + uri = f"{scheme}/top_level.txt" + resolve_robotics_uri_py.resolve_robotics_uri(uri) + + # Existing relative URI path in search dirs + with export_env_var(name=env_var_name, value=str(temp_dir_path)): + relative_path = "top_level.txt" + uri = f"{scheme}{relative_path}" + path_of_file = resolve_robotics_uri_py.resolve_robotics_uri(uri) + assert path_of_file == path_of_file.resolve() + assert path_of_file == temp_dir_path / relative_path + + # Existing relative URI path in search dirs with multiple paths + with export_env_var( + name=env_var_name, value=f"/another/dir{os.pathsep}{str(temp_dir_path)}" + ): + relative_path = "top_level.txt" + uri = f"{scheme}{relative_path}" + path_of_file = resolve_robotics_uri_py.resolve_robotics_uri(uri) + assert path_of_file == path_of_file.resolve() + assert path_of_file == temp_dir_path / relative_path + + # URI path in sub-level search dir + with tempfile.TemporaryDirectory() as temp_dir: + + temp_dir_path = pathlib.Path(temp_dir).resolve() + level1 = temp_dir_path / "sub" / "level1.txt" + level1.parent.mkdir(exist_ok=True, parents=True) + level1.touch(exist_ok=True) + + # Existing relative URI path not in search dirs + with pytest.raises(FileNotFoundError): + uri = f"{scheme}sub/level1.txt" + resolve_robotics_uri_py.resolve_robotics_uri(uri) + + # Existing absolute URI path in search dirs + with pytest.raises(FileNotFoundError): + with export_env_var(name=env_var_name, value=str(temp_dir_path)): + uri = f"{scheme}/sub/level1.txt" + resolve_robotics_uri_py.resolve_robotics_uri(uri) + + # Existing relative URI path in search dirs + with export_env_var(name=env_var_name, value=str(temp_dir_path)): + relative_path = "sub/level1.txt" + uri = f"{scheme}{relative_path}" + path_of_file = resolve_robotics_uri_py.resolve_robotics_uri(uri) + assert path_of_file == temp_dir_path / relative_path + + # Existing relative URI path in search dirs with multiple paths + with export_env_var( + name=env_var_name, value=f"/another/dir{os.pathsep}{str(temp_dir_path)}" + ): + relative_path = "sub/level1.txt" + uri = f"{scheme}{relative_path}" + path_of_file = resolve_robotics_uri_py.resolve_robotics_uri(uri) + assert path_of_file == temp_dir_path / relative_path + + +def test_scheme_file(): + + clear_env_vars() + + # Non-existing absolute URI path + with pytest.raises(FileNotFoundError): + uri_file = "file://" + "/this/file/does/not.exist" + resolve_robotics_uri_py.resolve_robotics_uri(uri_file) + + # Existing absolute URI path with empty authority + with tempfile.NamedTemporaryFile() as temp: + temp_name = pathlib.Path(temp.name).resolve(strict=True) + uri_file = "file://" + temp.name + path_of_file = resolve_robotics_uri_py.resolve_robotics_uri(uri_file) + assert path_of_file == path_of_file.resolve() + assert path_of_file == temp_name + + # Existing absolute URI path without authority + with tempfile.NamedTemporaryFile() as temp: + temp_name = pathlib.Path(temp.name).resolve(strict=True) + uri_file = "file:" + temp.name + path_of_file = resolve_robotics_uri_py.resolve_robotics_uri(uri_file) + assert path_of_file == path_of_file.resolve() + assert path_of_file == temp_name + + # Fallback to file:// with no scheme + with tempfile.NamedTemporaryFile() as temp: + temp_name = pathlib.Path(temp.name).resolve(strict=True) + uri_file = f"{temp_name}" + path_of_file = resolve_robotics_uri_py.resolve_robotics_uri(uri_file) + assert path_of_file == path_of_file.resolve() + assert path_of_file == temp_name + + # Try to find an existing file (the Python executable) without any file:/ scheme + path_of_python_executable = resolve_robotics_uri_py.resolve_robotics_uri(sys.executable) + assert path_of_python_executable == pathlib.Path(sys.executable) + +