Skip to content

Commit

Permalink
Merge pull request #11 from diegoferigo/refactor
Browse files Browse the repository at this point in the history
Enhance and refactor URIs resolving logic
  • Loading branch information
traversaro authored Feb 29, 2024
2 parents c7670f0 + e6ef710 commit 3ffc433
Show file tree
Hide file tree
Showing 4 changed files with 336 additions and 53 deletions.
12 changes: 11 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:

- name: Run the Python tests
shell: bash -l {0}
run: pytest
run: pytest --capture=no

# This test requires the conda-forge::icub-models,
# robotology::ergocub-software and
Expand All @@ -53,3 +53,13 @@ jobs:
resolve-robotics-uri-py package://ergoCub/robots/ergoCubSN000/model.urdf
resolve-robotics-uri-py package://moveit_resources_panda_description/urdf/panda.urdf
! resolve-robotics-uri-py package://this/file/does/not/exist
- name: Check command line helper (Ubuntu and macOS)
if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macos')
shell: bash -l {0}
run: |
mkdir -p /tmp/folder/ && touch "/tmp/folder/file with spaces.txt"
mkdir -p /tmp/folder/ && touch "/tmp/folder/file_without_spaces.txt"
resolve-robotics-uri-py "file:///tmp/folder/file_without_spaces.txt"
resolve-robotics-uri-py "file:///tmp/folder/file with spaces.txt"
resolve-robotics-uri-py "file:/tmp/folder/file_without_spaces.txt"
resolve-robotics-uri-py "file:/tmp/folder/file with spaces.txt"
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,4 @@ all =

[tool:pytest]
addopts = -rsxX -v --strict-markers
testpaths = tests
testpaths = test
213 changes: 165 additions & 48 deletions src/resolve_robotics_uri_py/resolve_robotics_uri_py.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,80 +2,197 @@
import os
import pathlib
import sys
from typing import List
import warnings
from typing import Iterable

# =====================
# URI resolving helpers
# =====================

# Supported URI schemes
SupportedSchemes = {"file", "package", "model"}

# Environment variables in the search path.
#
# * https://github.com/robotology/idyntree/issues/291
# * https://github.com/gazebosim/sdformat/issues/1234
#
# AMENT_PREFIX_PATH is the only "special" as we need to add
# "share" after each value, see https://github.com/stack-of-tasks/pinocchio/issues/1520
#
# This list specify the origin of each env variable:
#
# * AMENT_PREFIX_PATH: Used in ROS2
# * GAZEBO_MODEL_PATH: Used in Gazebo Classic
# * GZ_SIM_RESOURCE_PATH: Used in Gazebo Sim >= 7
# * IGN_GAZEBO_RESOURCE_PATH: Used in Ignition Gazebo <= 7
# * ROS_PACKAGE_PATH: Used in ROS1
# * SDF_PATH: Used in sdformat
#
SupportedEnvVars = {
"AMENT_PREFIX_PATH",
"GAZEBO_MODEL_PATH",
"GZ_SIM_RESOURCE_PATH",
"IGN_GAZEBO_RESOURCE_PATH",
"ROS_PACKAGE_PATH",
"SDF_PATH",
}


# Function inspired from https://github.com/ami-iit/robot-log-visualizer/pull/51
def get_search_paths_from_envs(env_list):
return [
def get_search_paths_from_envs(env_list: Iterable[str]) -> list[pathlib.Path]:
# Read the searched paths from all the environment variables
search_paths = [
pathlib.Path(f) if (env != "AMENT_PREFIX_PATH") else pathlib.Path(f) / "share"
for env in env_list if os.getenv(env) is not None
for env in env_list
if os.getenv(env) is not None
for f in os.getenv(env).split(os.pathsep)
]

def pathlist_list_to_string(path_list):
return ' '.join(str(path) for path in path_list)
# Resolve and remove duplicate paths
search_paths = list({path.resolve() for path in search_paths})

# Keep only existing paths
existing_search_paths = [path for path in search_paths if path.is_dir()]

# Notify the user of non-existing paths
if len(set(search_paths) - set(existing_search_paths)) > 0:
msg = "resolve-robotics-uri-py: Ignoring non-existing paths from env vars: {}."
warnings.warn(
msg.format(
pathlist_list_to_string(set(search_paths) - set(existing_search_paths))
)
)

return existing_search_paths


def pathlist_list_to_string(path_list: Iterable[str | pathlib.Path]) -> str:
return " ".join(str(path) for path in path_list)


# ===================
# URI resolving logic
# ===================


def resolve_robotics_uri(uri: str) -> pathlib.Path:
# List of environment variables to consider, see:
# * https://github.com/robotology/idyntree/issues/291
# * https://github.com/gazebosim/sdformat/issues/1234
# AMENT_PREFIX_PATH is the only "special" as we need to add
# "share" after each value, see https://github.com/stack-of-tasks/pinocchio/issues/1520
# This list specify the origin of each env variable:
# * GAZEBO_MODEL_PATH: Used in Gazebo Classic
# * ROS_PACKAGE_PATH: Used in ROS1
# * AMENT_PREFIX_PATH: Used in ROS2
# * SDF_PATH: Used in sdformat
# * IGN_GAZEBO_RESOURCE_PATH: Used in Ignition Gazebo <= 7
# * GZ_SIM_RESOURCE_PATH: Used in Gazebo Sim >= 7
env_list = ["GAZEBO_MODEL_PATH", "ROS_PACKAGE_PATH", "AMENT_PREFIX_PATH", "SDF_PATH", "IGN_GAZEBO_RESOURCE_PATH", "GZ_SIM_RESOURCE_PATH"]

# Preliminary step: if there is no scheme, we just consider this a path and we return it as it is
if "://" not in uri:
return pathlib.Path(uri)
"""
Resolve a robotics URI to an absolute filename.
Args:
uri: The URI to resolve.
Returns:
The absolute filename corresponding to the URI.
Raises:
FileNotFoundError: If no file corresponding to the URI is found.
"""

# If the URI has no scheme, use by default file:// which maps the resolved input
# path to a URI with empty authority
if not any(uri.startswith(scheme) for scheme in SupportedSchemes):
uri = f"file://{pathlib.Path(uri).resolve()}"

# ================================================
# Process file:/ separately from the other schemes
# ================================================

# This is the file URI scheme as per RFC8089:
# https://datatracker.ietf.org/doc/html/rfc8089

if uri.startswith("file:"):
# Strip the scheme from the URI
uri = uri.replace(f"file://", "")
uri = uri.replace(f"file:", "")

# Create the file path, resolving symlinks and '..'
uri_file_path = pathlib.Path(uri).resolve()

# Check that the file exists
if not uri_file_path.is_file():
msg = "resolve-robotics-uri-py: No file corresponding to URI '{}' found"
raise FileNotFoundError(msg.format(uri))

return uri_file_path.resolve()

# =========================
# Process the other schemes
# =========================

# Get scheme from URI
from urllib.parse import urlparse

# Parse the URI
parsed_uri = urlparse(uri)

# We only support at the moment:
# file:// scheme: to pass a file path directly
# package:// : ROS-style package URI
# model:// : SDF-style model URI
if parsed_uri.scheme not in ["file", "package", "model"]:
raise FileNotFoundError(f"Passed URI \"{uri}\" use non-supported scheme {parsed_uri.scheme}")
# We only support the following URI schemes at the moment:
#
# * file:/ to pass an absolute file path directly
# * model:// SDF-style model URI
# * package:// ROS-style package URI
#
if parsed_uri.scheme not in SupportedSchemes:
msg = "resolve-robotics-uri-py: Passed URI '{}' use non-supported scheme '{}'"
raise FileNotFoundError(msg.format(uri, parsed_uri.scheme))

# Strip the scheme from the URI
uri_path = uri
uri_path = uri_path.replace(f"{parsed_uri.scheme}://", "")

# List of matching resources found
model_filenames = []

if parsed_uri.scheme == "file":
model_filenames.append(uri.replace("file:/", ""))
# Search the resource in the path from the env variables
for folder in set(get_search_paths_from_envs(SupportedEnvVars)):

# Join the folder from environment variable and the URI path
candidate_file_name = folder / uri_path

# Expand or resolve the file path (symlinks and ..)
candidate_file_name = candidate_file_name.resolve()

if not candidate_file_name.is_file():
continue

if parsed_uri.scheme == "package" or parsed_uri.scheme == "model":
uri_path = uri.replace(f"{parsed_uri.scheme}://","")
for folder in get_search_paths_from_envs(env_list):
candidate_file_name = folder / pathlib.Path(uri_path)
if (candidate_file_name.is_file()):
if candidate_file_name not in model_filenames:
model_filenames.append(candidate_file_name)
# Skip if the file is already in the list
if candidate_file_name not in model_filenames:
model_filenames.append(candidate_file_name)

if model_filenames:
if (len(model_filenames) > 1):
warnings.warn(f"resolve-robotics-uri-py: Multiple files ({pathlist_list_to_string(model_filenames)}) found for uri \"{uri}\", returning the first one.")
return pathlib.Path(model_filenames[0])
if len(model_filenames) == 0:
msg = "resolve-robotics-uri-py: No file corresponding to URI '{}' found"
raise FileNotFoundError(msg.format(uri))

if len(model_filenames) > 1:
msg = "resolve-robotics-uri-py: "
msg += "Multiple files ({}) found for URI '{}', returning the first one."
warnings.warn(msg.format(pathlist_list_to_string(model_filenames), uri))

if len(model_filenames) >= 1:
assert model_filenames[0].exists()
return pathlib.Path(model_filenames[0]).resolve()

# If no file was found raise error
raise FileNotFoundError(f"resolve-robotics-uri-py: No file corresponding to uri \"{uri}\" found")

def main():
parser = argparse.ArgumentParser(description="Utility resolve a robotics URI (file://, model://, package://) to an absolute filename.")
parser.add_argument("uri", metavar="uri", type=str, help="URI to resolve")
parser = argparse.ArgumentParser(
description="Utility resolve a robotics URI ({}) to an absolute filename.".format(
", ".join(f"{scheme}://" for scheme in SupportedSchemes)
)
)
parser.add_argument("uri", metavar="URI", type=str, help="URI to resolve")

args = parser.parse_args()
result = resolve_robotics_uri(args.uri)

print(result)
try:
result = resolve_robotics_uri(args.uri)
except FileNotFoundError as e:
print(e, file=sys.stderr)
sys.exit(1)

print(result, file=sys.stdout)
sys.exit(0)


if __name__ == "__main__":
main()
Loading

0 comments on commit 3ffc433

Please sign in to comment.