From 394a30154dfae4dd9af1d10f851fb9a929709101 Mon Sep 17 00:00:00 2001 From: davidjurado Date: Tue, 15 Aug 2023 10:54:02 -0500 Subject: [PATCH] Add mount options from the CLI (#279) New CLI argument `--mount` that is a shortcut to override mount options for `input` parameters. Works with docker and singularity runners. This is useful to ensure (in a convenient way) that input directories and files are not accidently modified by MLCube tasks. --------- Co-authored-by: Sergey Serebryakov --- mlcube/mlcube/__main__.py | 18 +- mlcube/mlcube/config.py | 2 +- mlcube/mlcube/parser.py | 78 ++++- mlcube/mlcube/shell.py | 309 +++++++++++++----- mlcube/mlcube/tests/test_parser.py | 124 ++++--- mlcube/mlcube/tests/test_shell.py | 151 ++++++--- .../mlcube_docker/mlcube_docker/docker_run.py | 3 +- .../mlcube_singularity/singularity_run.py | 5 +- 8 files changed, 489 insertions(+), 201 deletions(-) diff --git a/mlcube/mlcube/__main__.py b/mlcube/mlcube/__main__.py index 696e04a..c539bdb 100644 --- a/mlcube/mlcube/__main__.py +++ b/mlcube/mlcube/__main__.py @@ -16,6 +16,7 @@ UsageExamples, parse_cli_args, ) +from mlcube.config import MountType from mlcube.errors import ExecutionError, IllegalParameterValueError, MLCubeError from mlcube.parser import CliParser from mlcube.shell import Shell @@ -123,6 +124,14 @@ def parser_process(value: str, state: click.parser.ParsingState): default=None, help="CPU options defined during MLCube container execution.", ) +mount_option = click.option( + "--mount", + required=False, + type=click.Choice([MountType.RW, MountType.RO]), + default=None, + help="Mount options for all input parameters. These mount options override any other mount options defined for " + "each input parameters. 
A typical use case is to ensure that inputs are mounted in read-only (ro) mode.", +) @click.group(name="mlcube", add_help_option=False) @@ -279,6 +288,7 @@ def configure(mlcube: t.Optional[str], platform: str, p: t.Tuple[str]) -> None: @gpus_option @memory_option @cpu_option +@mount_option @Options.help @click.pass_context def run( @@ -292,6 +302,7 @@ def run( gpus: str, memory: str, cpu: str, + mount: str, ) -> None: """Run MLCube task(s). @@ -307,9 +318,12 @@ def run( gpus: GPU usage options defined during MLCube container execution. memory: Memory RAM options defined during MLCube container execution. cpu: CPU options defined during MLCube container execution. + mount: Mount (global) options defined for all input parameters in all tasks to be executed. They override any + mount options defined for individual parameters. """ logger.info( - "run input_arg mlcube=%s, platform=%s, task=%s, workspace=%s, network=%s, security=%s, gpus=%s, memory=%s, " + "run input_arg mlcube=%s, platform=%s, task=%s, workspace=%s, network=%s, security=%s, gpus=%s, " + "memory=%s, mount=%s" "cpu=%s", mlcube, platform, @@ -320,6 +334,7 @@ def run( gpus, memory, cpu, + mount, ) runner_cls, mlcube_config = parse_cli_args( unparsed_args=ctx.args, @@ -332,6 +347,7 @@ def run( "gpus": gpus, "memory": memory, "cpu": cpu, + "mount": mount, }, resolve=True, ) diff --git a/mlcube/mlcube/config.py b/mlcube/mlcube/config.py index bd904f6..ac5034d 100644 --- a/mlcube/mlcube/config.py +++ b/mlcube/mlcube/config.py @@ -261,7 +261,7 @@ def check_parameters(parameters: DictConfig, task_cli_args: t.Dict) -> None: ) # for name in parameters.keys(): - # The `_param_name` is anyway there, so check it's not None. + # The `name` is anyway there, so check it's not None. [param_def] = MLCubeConfig.ensure_values_exist(parameters, name, dict) # Deal with the case when value is a string (default value). 
if isinstance(param_def, str): diff --git a/mlcube/mlcube/parser.py b/mlcube/mlcube/parser.py index 669bd08..08aa76c 100644 --- a/mlcube/mlcube/parser.py +++ b/mlcube/mlcube/parser.py @@ -78,7 +78,7 @@ def parse_list_arg( @staticmethod def parse_extra_arg( - unparsed_args: t.List[str], parsed_args: t.Dict[str, t.Optional[str]] + unparsed_args: t.List[str], parsed_args: t.Dict[str, t.Optional[str]] ) -> t.Tuple[DictConfig, t.Dict]: """Parse extra arguments on a command line. @@ -111,31 +111,79 @@ def parse_extra_arg( [arg[2:] for arg in unparsed_args if arg.startswith("-P")] ) - task_args = [arg.split("=") for arg in unparsed_args if not arg.startswith("-P")] + task_args = [ + arg.split("=") for arg in unparsed_args if not arg.startswith("-P") + ] task_args = {arg[0]: arg[1] for arg in task_args} # Parse unparsed arguments - platform: t.Optional[str] = parsed_args.get('platform', None) - if platform in {'docker', 'singularity'}: + platform: t.Optional[str] = parsed_args.get("platform", None) + if platform in {"docker", "singularity"}: runner_run_args = {} - if parsed_args.get('network', None): - runner_run_args["--network"] = parsed_args['network'] - if parsed_args.get('security', None): + if parsed_args.get("network", None): + runner_run_args["--network"] = parsed_args["network"] + if parsed_args.get("security", None): key = "--security-opt" if platform == "docker" else "--security" - runner_run_args[key] = parsed_args['security'] - if parsed_args.get('gpus', None): + runner_run_args[key] = parsed_args["security"] + if parsed_args.get("gpus", None): if platform == "docker": - runner_run_args["--gpus"] = parsed_args['gpus'] + runner_run_args["--gpus"] = parsed_args["gpus"] else: runner_run_args["--nv"] = "" - os.environ['SINGULARITYENV_CUDA_VISIBLE_DEVICES'] = parsed_args['gpus'] - if parsed_args.get('memory', None): + os.environ["SINGULARITYENV_CUDA_VISIBLE_DEVICES"] = parsed_args[ + "gpus" + ] + if parsed_args.get("memory", None): key = "--memory" if platform == 
"docker" else "--vm-ram" - runner_run_args[key] = parsed_args['memory'] - if parsed_args.get('cpu', None): + runner_run_args[key] = parsed_args["memory"] + if parsed_args.get("cpu", None): key = "--cpuset-cpus" if platform == "docker" else "--vm-cpu" - runner_run_args[key] = parsed_args['cpu'] + runner_run_args[key] = parsed_args["cpu"] + runner_run_args["--mount_opts"] = parsed_args["mount"] mlcube_args.merge_with({platform: runner_run_args}) return mlcube_args, task_args + + @staticmethod + def parse_optional_arg( + platform: t.Optional[str], + network_option: t.Optional[str], + security_option: t.Optional[str], + gpus_option: t.Optional[str], + memory_option: t.Optional[str], + cpu_option: t.Optional[str], + mount_option: t.Optional[str], + ) -> t.Tuple[DictConfig, t.Dict]: + """platform: Platform to use to run this MLCube (docker, singularity, gcp, k8s etc). + network_option: Networking options defined during MLCube container execution. + security_option: Security options defined during MLCube container execution. + gpus_option: GPU usage options defined during MLCube container execution. + memory_option: Memory RAM options defined during MLCube container execution. + cpu_option: CPU options defined during MLCube container execution. + mount_option: Mount options for paths. 
+ """ + mlcube_args, opts = {}, {} + + opts["--mount_opts"] = mount_option + if network_option is not None: + opts["--network"] = network_option + + if security_option is not None: + key = "--security-opt" if platform == "docker" else "--security" + opts[key] = security_option + + if gpus_option is not None: + key = "--gpus" if platform == "docker" else "--nv" + opts[key] = gpus_option + + if memory_option is not None: + key = "--memory" if platform == "docker" else "--vm-ram" + opts[key] = memory_option + + if cpu_option is not None: + key = "--cpu-shares" if platform == "docker" else "--vm-cpu" + opts[key] = cpu_option + + mlcube_args[platform] = opts + return mlcube_args, {} diff --git a/mlcube/mlcube/shell.py b/mlcube/mlcube/shell.py index 61ae8ce..8f49301 100644 --- a/mlcube/mlcube/shell.py +++ b/mlcube/mlcube/shell.py @@ -12,13 +12,12 @@ from distutils import dir_util from pathlib import Path -from mlcube.config import (IOType, ParameterType, MountType) -from mlcube.errors import (ConfigurationError, ExecutionError) - from omegaconf import DictConfig +from mlcube.config import IOType, MountType, ParameterType +from mlcube.errors import ConfigurationError, ExecutionError -__all__ = ['Shell'] +__all__ = ["Shell"] logger = logging.getLogger(__name__) @@ -32,9 +31,9 @@ def null() -> str: TODO: In powershell, $null works. Is below the correct implementation? 
""" - if os.name == 'nt': - return 'NUL' - return '/dev/null' + if os.name == "nt": + return "NUL" + return "/dev/null" @staticmethod def parse_exec_status(status: int) -> t.Tuple[int, str]: @@ -48,21 +47,21 @@ def parse_exec_status(status: int) -> t.Tuple[int, str]: https://github.com/mlperf/training_results_v0.5/blob/7238ee7edc18f64f0869923a04de2a92418c6c28/v0.5.0/nvidia/ submission/code/translation/pytorch/cutlass/tools/external/googletest/googletest/test/gtest_test_utils.py#L185 """ - if os.name == 'nt': - exit_code, exit_status = (status, 'exited') + if os.name == "nt": + exit_code, exit_status = (status, "exited") else: if os.WIFEXITED(status): - exit_code, exit_status = (os.WEXITSTATUS(status), 'exited') + exit_code, exit_status = (os.WEXITSTATUS(status), "exited") elif os.WIFSTOPPED(status): - exit_code, exit_status = (-os.WSTOPSIG(status), 'stopped') + exit_code, exit_status = (-os.WSTOPSIG(status), "stopped") elif os.WIFSIGNALED(status): - exit_code, exit_status = (-os.WTERMSIG(status), 'signalled') + exit_code, exit_status = (-os.WTERMSIG(status), "signalled") else: - exit_code, exit_status = (status, 'na') + exit_code, exit_status = (status, "na") return exit_code, exit_status @staticmethod - def run(cmd: t.Union[str, t.List], on_error: str = 'raise') -> int: + def run(cmd: t.Union[str, t.List], on_error: str = "raise") -> int: """Run the `cmd` command in an external process. 
Args: @@ -76,28 +75,37 @@ def run(cmd: t.Union[str, t.List], on_error: str = 'raise') -> int: """ logger.debug("Shell.run input_arg: cmd=%s, on_error=%s)", cmd, on_error) if isinstance(cmd, t.List): - cmd = ' '.join(c for c in (c.strip() for c in cmd) if c) - logger.debug("Shell.run list->str: cmd=\"%s\")", cmd) + cmd = " ".join(c for c in (c.strip() for c in cmd) if c) + logger.debug('Shell.run list->str: cmd="%s")', cmd) - if on_error not in ('raise', 'die', 'ignore'): + if on_error not in ("raise", "die", "ignore"): raise ValueError( f"Unrecognized 'on_error' action ({on_error}). Valid options are ('raise', 'die', 'ignore')." ) status: int = os.system(cmd) exit_code, exit_status = Shell.parse_exec_status(status) - if exit_status == 'na': - logger.warning("Shell.run command (cmd=%s) did not exit properly (status=%d).", cmd, status) + if exit_status == "na": + logger.warning( + "Shell.run command (cmd=%s) did not exit properly (status=%d).", + cmd, + status, + ) - msg = f"Shell.run command='{cmd}' status={status} exit_status={exit_status} exit_code={exit_code} "\ - f"on_error={on_error}" + msg = ( + f"Shell.run command='{cmd}' status={status} exit_status={exit_status} exit_code={exit_code} " + f"on_error={on_error}" + ) if exit_code != 0: logger.error(msg) - if on_error == 'die': + if on_error == "die": sys.exit(exit_code) - if on_error == 'raise': + if on_error == "raise": raise ExecutionError( - 'Failed to execute shell command.', status=exit_status, code=exit_code, cmd=cmd + "Failed to execute shell command.", + status=exit_status, + code=exit_code, + cmd=cmd, ) else: logger.info(msg) @@ -123,8 +131,10 @@ def run_and_capture_output(cmd: t.List[str]) -> t.Tuple[int, str]: exit_code, output = err.returncode, err.output.decode() logger.debug( - "Shell.run_and_capture_output cmd=%s, exit_code=%d, output=\"%s\"", - cmd, exit_code, output.replace("\n", " ") + 'Shell.run_and_capture_output cmd=%s, exit_code=%d, output="%s"', + cmd, + exit_code, + output.replace("\n", 
" "), ) return exit_code, output.strip() @@ -138,12 +148,14 @@ def docker_image_exists(docker: t.Optional[str], image: str) -> bool: Returns: True if image exists, else false. """ - docker = docker or 'docker' - cmd = f'{docker} inspect --type=image {image} > {Shell.null()}' - return Shell.run(cmd, on_error='ignore') == 0 + docker = docker or "docker" + cmd = f"{docker} inspect --type=image {image} > {Shell.null()}" + return Shell.run(cmd, on_error="ignore") == 0 @staticmethod - def ssh(connection_str: str, command: t.Optional[str], on_error: str = 'raise') -> int: + def ssh( + connection_str: str, command: t.Optional[str], on_error: str = "raise" + ) -> int: """Execute a command on a remote host via SSH. Args: @@ -153,10 +165,13 @@ def ssh(connection_str: str, command: t.Optional[str], on_error: str = 'raise') """ if not command: return 0 - return Shell.run(f"ssh -o StrictHostKeyChecking=no {connection_str} '{command}'", on_error=on_error) + return Shell.run( + f"ssh -o StrictHostKeyChecking=no {connection_str} '{command}'", + on_error=on_error, + ) @staticmethod - def rsync_dirs(source: str, dest: str, on_error: str = 'raise') -> int: + def rsync_dirs(source: str, dest: str, on_error: str = "raise") -> int: """Synchronize directories. Args: @@ -179,17 +194,18 @@ def get_host_path(workspace_path: str, path_from_config: str) -> str: """ # Omega conf will resolve any variables defined in MLCube configuration file. We need to take care about `~` # (user home directory) and environment variables. - host_path = Path( - os.path.expandvars(os.path.expanduser(path_from_config)) - ) + host_path = Path(os.path.expandvars(os.path.expanduser(path_from_config))) # According to MLCube contract, relative paths are relative to MLCube workspace directory. 
if not host_path.is_absolute(): host_path = Path(workspace_path) / host_path return host_path.as_posix() @staticmethod - def generate_mounts_and_args(mlcube: DictConfig, task: str, - make_dirs: bool = True) -> t.Tuple[t.Dict, t.List, t.Dict]: + def generate_mounts_and_args( + mlcube: DictConfig, + task: str, + make_dirs: bool = True, + ) -> t.Tuple[t.Dict, t.List, t.Dict]: """Generate mount points, task arguments and mount options for the given task. Args: @@ -204,9 +220,25 @@ def generate_mounts_and_args(mlcube: DictConfig, task: str, - A mapping from host paths to mount options (optional). """ # First task argument is always the task name. - mounts: t.Dict[str, str] = {} # Mapping from host paths to container paths. - args: t.List[str] = [task] # List of arguments for the given task. - mounts_opts: t.Dict[str, str] = {} # Mapping from host paths to mount options (rw/ro). + mounts: t.Dict[str, str] = {} # Mapping from host paths to container paths. + args: t.List[str] = [task] # List of arguments for the given task. + mounts_opts: t.Dict[ + str, str + ] = {} # Mapping from host paths to mount options (rw/ro). + + # By design, when users provide `--mount=rw|ro` on a command line, this applies only to input parameters, + # and overrides anything that's specified in config files. + # TODO: MLCube has the concept of effective configuration. This should move there at some point in time. + mount_opts_for_input_params: t.Optional[str] = None + if isinstance(mlcube.get("runner", None), (DictConfig, dict)): + mount_opts_for_input_params = mlcube.runner.get("--mount_opts", None) + if mount_opts_for_input_params and not MountType.is_valid( + mount_opts_for_input_params + ): + raise ConfigurationError( + "Shell.generate_mounts_and_args invalid global mount options: " + f"--mount_opts={mount_opts_for_input_params}." + ) def _generate(_params: DictConfig, _io: str) -> None: """Process parameters (could be inputs or outputs). 
@@ -220,7 +252,9 @@ def _generate(_params: DictConfig, _io: str) -> None: if not IOType.is_valid(_io): raise ConfigurationError(f"Invalid IO = {_io}") for _param_name, _param_def in _params.items(): - assert isinstance(_param_def, DictConfig), f"Unexpected parameter definition: {_param_def}." + assert isinstance( + _param_def, DictConfig + ), f"Unexpected parameter definition: {_param_def}." if not ParameterType.is_valid(_param_def.type): raise ConfigurationError( f"Invalid task: task={task}, param={_param_name}, type={_param_def.type}. Type is invalid." @@ -232,7 +266,9 @@ def _generate(_params: DictConfig, _io: str) -> None: # and datasets. We also need to be able to resolve `~` (user home directory), as well as environment # variables (BTW, this is probably needs some discussion at some point in time). This environment # variable could be, for instance, `${HOME}`. - _host_path: str = Shell.get_host_path(mlcube.runtime.workspace, _param_def.default) + _host_path: str = Shell.get_host_path( + mlcube.runtime.workspace, _param_def.default + ) if _param_def.type == ParameterType.UNKNOWN: if _io == IOType.OUTPUT: @@ -254,30 +290,58 @@ def _generate(_params: DictConfig, _io: str) -> None: if _param_def.type == ParameterType.DIRECTORY: if make_dirs: os.makedirs(_host_path, exist_ok=True) - mounts[_host_path] = mounts.get(_host_path, f"/mlcube_io{len(mounts)}") - args.append('--{}={}'.format(_param_name, mounts[_host_path])) + mounts[_host_path] = mounts.get( + _host_path, f"/mlcube_io{len(mounts)}" + ) + args.append("--{}={}".format(_param_name, mounts[_host_path])) elif _param_def.type == ParameterType.FILE: _host_path, _file_name = os.path.split(_host_path) if make_dirs: os.makedirs(_host_path, exist_ok=True) - mounts[_host_path] = mounts.get(_host_path, f"/mlcube_io{len(mounts)}") - args.append('--{}={}'.format(_param_name, mounts[_host_path] + '/' + _file_name)) + mounts[_host_path] = mounts.get( + _host_path, f"/mlcube_io{len(mounts)}" + ) + args.append( + 
"--{}={}".format( + _param_name, mounts[_host_path] + "/" + _file_name + ) + ) + + mount_type: t.Optional[str] = _param_def.get("opts", None) + if _io == IOType.INPUT and mount_opts_for_input_params: + logger.debug( + "Shell.generate_mounts_and_args overriding parameter mount options (task=%s, param=%, " + "io=%s, mount_opts=%s) with global mount option (%s) for input parameters.", + task, + _param_name, + _io, + mount_type, + mount_opts_for_input_params, + ) + mount_type = mount_opts_for_input_params - mount_type: t.Optional[str] = _param_def.get('opts', None) if mount_type: - if not MountType.is_valid(_param_def.opts): + if not MountType.is_valid(mount_type): raise ConfigurationError( - f"Invalid mount options: mount={task}, param={_param_name}, opts={_param_def.opts}." + f"Invalid mount options: task={task}, param={_param_name}, opts={mount_type}." ) if mount_type == MountType.RO and _io == IOType.OUTPUT: logger.warning( - "Task's (%s) parameter (%s) is OUTPUT and requested to mount as RO.", task, _param_name + "Task's (%s) parameter (%s) is OUTPUT and requested to mount as RO.", + task, + _param_name, ) - if _host_path in mounts_opts and mounts_opts[_host_path] != mount_type: + if ( + _host_path in mounts_opts + and mounts_opts[_host_path] != mount_type + ): logger.warning( - "Conflicting mount options found. Host path (%s) has already been requested to mount as " - "'%s', but new parameter (%s) requests to mount as '%s'.", - _host_path, mounts_opts[_host_path], _param_name, mount_type + "Conflicting mount options found. Host path (%s) has already been requested to mount " + "as '%s', but new parameter (%s) requests to mount as '%s'.", + _host_path, + mounts_opts[_host_path], + _param_name, + mount_type, ) # Since we can only have `ro`/`rw`, we'll set the mount option to `rw`. 
mount_type = MountType.RW @@ -285,17 +349,23 @@ def _generate(_params: DictConfig, _io: str) -> None: mounts_opts[_host_path] = mount_type logger.info( "Host path (%s) for parameter '%s' will be mounted with '%s' option.", - _host_path, _param_name, mount_type + _host_path, + _param_name, + mount_type, ) - params = mlcube.tasks[task].parameters # Dictionary of input and output parameters for the task. - _generate(params.inputs, IOType.INPUT) # Process input parameters. - _generate(params.outputs, IOType.OUTPUT) # Process output parameters. + params = mlcube.tasks[ + task + ].parameters # Dictionary of input and output parameters for the task. + _generate(params.inputs, IOType.INPUT) # Process input parameters. + _generate(params.outputs, IOType.OUTPUT) # Process output parameters. return mounts, args, mounts_opts @staticmethod - def to_cli_args(args: t.Mapping[str, t.Any], sep: str = '=', parent_arg: t.Optional[str] = None) -> str: + def to_cli_args( + args: t.Mapping[str, t.Any], sep: str = "=", parent_arg: t.Optional[str] = None + ) -> str: """Convert dict to CLI arguments. Args: @@ -303,8 +373,8 @@ def to_cli_args(args: t.Mapping[str, t.Any], sep: str = '=', parent_arg: t.Optio sep: Key-value separator. For build args and environment variables it's '=', for mount points it is ':'. parent_arg: If not None, a parent parameter name for each arg in args, e.g. --build-arg """ - parent_arg = '' if not parent_arg else parent_arg + ' ' - return ' '.join(f'{parent_arg}{k}{sep}{v}' for k, v in args.items()) + parent_arg = "" if not parent_arg else parent_arg + " " + return " ".join(f"{parent_arg}{k}{sep}{v}" for k, v in args.items()) @staticmethod def sync_workspace(target_mlcube: DictConfig, task: str) -> None: @@ -316,6 +386,7 @@ def sync_workspace(target_mlcube: DictConfig, task: str) -> None: refer to the MLCube configuration with default (internal) workspace. task: Task name to be executed. 
""" + def _storage_not_supported(_uri: str) -> str: """Raise an exception if the given URI is not supported. @@ -323,56 +394,107 @@ def _storage_not_supported(_uri: str) -> str: _uri: URI to check. If it starts with `storage:` (yet unsupported schema), raise an exception. """ _uri = _uri.strip() - if _uri.startswith('storage:'): - raise NotImplementedError(f"Storage protocol (uri={_uri}) is not supported yet.") + if _uri.startswith("storage:"): + raise NotImplementedError( + f"Storage protocol (uri={_uri}) is not supported yet." + ) return _uri def _is_inside_workspace(_workspace: str, _artifact: str) -> bool: """Check if artifact is inside this workspace. Workspace directory and artifact must exist.""" - return os.path.commonpath([_workspace]) == os.path.commonpath([_workspace, _artifact]) + return os.path.commonpath([_workspace]) == os.path.commonpath( + [_workspace, _artifact] + ) - def _is_ok(_parameter: str, _kind: str, _workspace: str, _artifact: str, _must_exist: bool) -> bool: + def _is_ok( + _parameter: str, + _kind: str, + _workspace: str, + _artifact: str, + _must_exist: bool, + ) -> bool: """Return true if this artifact needs to be synced.""" if not _is_inside_workspace(_workspace, _artifact): - logger.debug("[sync_workspace] task = %s, parameter = %s, artifact is not inside %s workspace " - "(workspace = %s, uri = %s)", task, _parameter, _kind, _workspace, _artifact) + logger.debug( + "[sync_workspace] task = %s, parameter = %s, artifact is not inside %s workspace " + "(workspace = %s, uri = %s)", + task, + _parameter, + _kind, + _workspace, + _artifact, + ) return False if _must_exist and not os.path.exists(_artifact): - logger.debug("[sync_workspace] task = %s, parameter = %s, artifact does not exist in %s workspace " - "(workspace = %s, uri = %s)", task, _parameter, _kind, _workspace, _artifact) + logger.debug( + "[sync_workspace] task = %s, parameter = %s, artifact does not exist in %s workspace " + "(workspace = %s, uri = %s)", + task, + 
_parameter, + _kind, + _workspace, + _artifact, + ) return False if not _must_exist and os.path.exists(_artifact): - logger.debug("[sync_workspace] task = %s, parameter = %s, artifact exists in %s workspace " - "(workspace = %s, uri = %s)", task, _parameter, _kind, _workspace, _artifact) + logger.debug( + "[sync_workspace] task = %s, parameter = %s, artifact exists in %s workspace " + "(workspace = %s, uri = %s)", + task, + _parameter, + _kind, + _workspace, + _artifact, + ) return False return True def _is_task_output(_target_artifact: str, _input_parameter: str) -> bool: """Check of this artifact is an output of some task.""" for _task_name, _task_def in target_mlcube.tasks.items(): - for _output_param_name, _output_param_def in _task_def.parameters.outputs.items(): - _target_output_artifact: str = \ - Path(target_workspace) / _storage_not_supported(_output_param_def.default) + for ( + _output_param_name, + _output_param_def, + ) in _task_def.parameters.outputs.items(): + _target_output_artifact: str = Path( + target_workspace + ) / _storage_not_supported(_output_param_def.default) # Can't really use `os.path.samefile` here since files may not exist. # if os.path.samefile(_target_artifact, _target_output_artifact): if _target_artifact == _target_output_artifact: - logger.debug("[sync_workspace] task = %s, parameter = %s is an output of task = %s, " - "parameter = %s", task, _input_parameter, _task_name, _output_param_name) + logger.debug( + "[sync_workspace] task = %s, parameter = %s is an output of task = %s, " + "parameter = %s", + task, + _input_parameter, + _task_name, + _output_param_name, + ) return True return False # Check if actual workspace is not internal one (which is default workspace). 
- target_workspace = os.path.abspath(_storage_not_supported(target_mlcube.runtime.workspace)) + target_workspace = os.path.abspath( + _storage_not_supported(target_mlcube.runtime.workspace) + ) os.makedirs(target_workspace, exist_ok=True) - source_workspace = os.path.abspath(Path(target_mlcube.runtime.root) / 'workspace') + source_workspace = os.path.abspath( + Path(target_mlcube.runtime.root) / "workspace" + ) if not os.path.exists(source_workspace): - logger.debug("[sync_workspace] source workspace (%s) does not exist, nothing to sync.", source_workspace) + logger.debug( + "[sync_workspace] source workspace (%s) does not exist, nothing to sync.", + source_workspace, + ) return if os.path.samefile(target_workspace, source_workspace): - logger.debug("[sync_workspace] target workspace (%s) is the same as source workspace (%s).", - target_workspace, source_workspace) + logger.debug( + "[sync_workspace] target workspace (%s) is the same as source workspace (%s).", + target_workspace, + source_workspace, + ) return if task not in target_mlcube.tasks: @@ -392,13 +514,21 @@ def _is_task_output(_target_artifact: str, _input_parameter: str) -> bool: # means that the `storage` section defines some storage labelled as `home`, and MLCube needs to use # ${name} path within that storage. 
- source_uri: str = Path(source_workspace) / _storage_not_supported(input_def.default) + source_uri: str = Path(source_workspace) / _storage_not_supported( + input_def.default + ) - if not _is_ok(input_name, 'source', source_workspace, source_uri, _must_exist=True): + if not _is_ok( + input_name, "source", source_workspace, source_uri, _must_exist=True + ): continue - target_uri: str = Path(target_workspace) / _storage_not_supported(input_def.default) - if not _is_ok(input_name, 'target', target_workspace, target_uri, _must_exist=False): + target_uri: str = Path(target_workspace) / _storage_not_supported( + input_def.default + ) + if not _is_ok( + input_name, "target", target_workspace, target_uri, _must_exist=False + ): continue if _is_task_output(target_uri, input_name): @@ -411,5 +541,10 @@ def _is_task_output(_target_artifact: str, _input_parameter: str) -> bool: dir_util.copy_tree(source_uri, target_uri) else: raise RuntimeError(f"Unknown artifact type ({source_uri}).") - logger.debug("[sync_workspace] task = %s, parameter = %s, source (%s) copied to target (%s).", - task, input_name, source_uri, target_uri) + logger.debug( + "[sync_workspace] task = %s, parameter = %s, source (%s) copied to target (%s).", + task, + input_name, + source_uri, + target_uri, + ) diff --git a/mlcube/mlcube/tests/test_parser.py b/mlcube/mlcube/tests/test_parser.py index 076ca79..49d801f 100644 --- a/mlcube/mlcube/tests/test_parser.py +++ b/mlcube/mlcube/tests/test_parser.py @@ -2,53 +2,71 @@ import typing as t from unittest import TestCase -from mlcube.parser import CliParser, MLCubeDirectory +from omegaconf import DictConfig, OmegaConf -from omegaconf import (DictConfig, OmegaConf) +from mlcube.parser import CliParser, MLCubeDirectory class TestParser(TestCase): def setUp(self) -> None: - if 'SINGULARITYENV_CUDA_VISIBLE_DEVICES' in os.environ: - self._singularityenv_cuda_visible_devices = os.environ['SINGULARITYENV_CUDA_VISIBLE_DEVICES'] + if "SINGULARITYENV_CUDA_VISIBLE_DEVICES" 
in os.environ: + self._singularityenv_cuda_visible_devices = os.environ[ + "SINGULARITYENV_CUDA_VISIBLE_DEVICES" + ] def tearDown(self) -> None: - if hasattr(self, '_singularityenv_cuda_visible_devices'): - os.environ['SINGULARITYENV_CUDA_VISIBLE_DEVICES'] = self._singularityenv_cuda_visible_devices - elif 'SINGULARITYENV_CUDA_VISIBLE_DEVICES' in os.environ: - del os.environ['SINGULARITYENV_CUDA_VISIBLE_DEVICES'] - - def _check_mlcube_directory(self, mlcube: MLCubeDirectory, path: str, file: str) -> None: + if hasattr(self, "_singularityenv_cuda_visible_devices"): + os.environ[ + "SINGULARITYENV_CUDA_VISIBLE_DEVICES" + ] = self._singularityenv_cuda_visible_devices + elif "SINGULARITYENV_CUDA_VISIBLE_DEVICES" in os.environ: + del os.environ["SINGULARITYENV_CUDA_VISIBLE_DEVICES"] + + def _check_mlcube_directory( + self, mlcube: MLCubeDirectory, path: str, file: str + ) -> None: self.assertIsInstance(mlcube, MLCubeDirectory) self.assertEqual(mlcube.path, path) self.assertEqual(mlcube.file, file) def test_mlcube_instances(self) -> None: self._check_mlcube_directory(MLCubeDirectory(), os.getcwd(), "mlcube.yaml") - self._check_mlcube_directory(MLCubeDirectory(os.getcwd()), os.getcwd(), "mlcube.yaml") + self._check_mlcube_directory( + MLCubeDirectory(os.getcwd()), os.getcwd(), "mlcube.yaml" + ) def test_cli_parser(self) -> None: for method_name in ("parse_mlcube_arg", "parse_list_arg", "parse_extra_arg"): self.assertTrue(getattr(CliParser, method_name)) def test_parse_mlcube_arg(self) -> None: - self._check_mlcube_directory(CliParser.parse_mlcube_arg(os.getcwd()), os.getcwd(), "mlcube.yaml") - self._check_mlcube_directory(CliParser.parse_mlcube_arg(None), os.getcwd(), "mlcube.yaml") + self._check_mlcube_directory( + CliParser.parse_mlcube_arg(os.getcwd()), os.getcwd(), "mlcube.yaml" + ) + self._check_mlcube_directory( + CliParser.parse_mlcube_arg(None), os.getcwd(), "mlcube.yaml" + ) def test_parse_list_arg(self) -> None: for arg in ("", None): - 
self.assertListEqual(CliParser.parse_list_arg(arg, 'main'), ['main']) + self.assertListEqual(CliParser.parse_list_arg(arg, "main"), ["main"]) - self.assertListEqual(CliParser.parse_list_arg('download'), ['download']) - self.assertListEqual(CliParser.parse_list_arg('download,train'), ['download', 'train']) + self.assertListEqual(CliParser.parse_list_arg("download"), ["download"]) + self.assertListEqual( + CliParser.parse_list_arg("download,train"), ["download", "train"] + ) def _check_cli_args( - self, - actual_mlcube_args: DictConfig, actual_task_args: t.Dict, - expected_mlcube_args: t.Dict, expected_task_args: t.Dict + self, + actual_mlcube_args: DictConfig, + actual_task_args: t.Dict, + expected_mlcube_args: t.Dict, + expected_task_args: t.Dict, ) -> None: self.assertIsInstance(actual_mlcube_args, DictConfig) - self.assertEqual(OmegaConf.to_container(actual_mlcube_args), expected_mlcube_args) + self.assertEqual( + OmegaConf.to_container(actual_mlcube_args), expected_mlcube_args + ) self.assertIsInstance(actual_task_args, dict) self.assertEqual(actual_task_args, expected_task_args) @@ -59,15 +77,21 @@ def test_parse_extra_args_unparsed(self) -> None: "-Pdocker.image=IMAGE_NAME", "data_config=/configs/data.yaml", "-Pplatform.host_memory_gb=30", - "data_dir=/data/imagenet" + "data_dir=/data/imagenet", ], - parsed_args={} + parsed_args={}, ) self._check_cli_args( actual_mlcube_args=mlcube_args, actual_task_args=task_args, - expected_mlcube_args={'docker': {'image': 'IMAGE_NAME'}, 'platform': {'host_memory_gb': 30}}, - expected_task_args={'data_config': '/configs/data.yaml', 'data_dir': '/data/imagenet'} + expected_mlcube_args={ + "docker": {"image": "IMAGE_NAME"}, + "platform": {"host_memory_gb": 30}, + }, + expected_task_args={ + "data_config": "/configs/data.yaml", + "data_dir": "/data/imagenet", + }, ) def test_parse_extra_args_parsed_docker(self) -> None: @@ -75,22 +99,28 @@ def test_parse_extra_args_parsed_docker(self) -> None: unparsed_args=[], parsed_args={ 
"platform": "docker", - "network": "NETWORK_1", "security": "SECURITY_1", "gpus": "GPUS_1", "memory": "MEMORY_1", "cpu": "CPU_1" - } + "network": "NETWORK_1", + "security": "SECURITY_1", + "gpus": "GPUS_1", + "memory": "MEMORY_1", + "cpu": "CPU_1", + "mount": "MOUNT_1", + }, ) self._check_cli_args( actual_mlcube_args=mlcube_args, actual_task_args=task_args, expected_mlcube_args={ - 'docker': { - '--network': 'NETWORK_1', - '--security-opt': 'SECURITY_1', - '--gpus': 'GPUS_1', - '--memory': 'MEMORY_1', - '--cpuset-cpus': 'CPU_1' + "docker": { + "--network": "NETWORK_1", + "--security-opt": "SECURITY_1", + "--gpus": "GPUS_1", + "--memory": "MEMORY_1", + "--cpuset-cpus": "CPU_1", + "--mount_opts": "MOUNT_1", } }, - expected_task_args={} + expected_task_args={}, ) def test_parse_extra_args_parsed_singularity(self) -> None: @@ -98,22 +128,28 @@ def test_parse_extra_args_parsed_singularity(self) -> None: unparsed_args=[], parsed_args={ "platform": "singularity", - "network": "NETWORK_2", "security": "SECURITY_2", "gpus": "GPUS_2", "memory": "MEMORY_2", "cpu": "CPU_2" - } + "network": "NETWORK_2", + "security": "SECURITY_2", + "gpus": "GPUS_2", + "memory": "MEMORY_2", + "cpu": "CPU_2", + "mount": "MOUNT_2", + }, ) self._check_cli_args( actual_mlcube_args=mlcube_args, actual_task_args=task_args, expected_mlcube_args={ - 'singularity': { - '--network': 'NETWORK_2', - '--security': 'SECURITY_2', - '--nv': '', - '--vm-ram': 'MEMORY_2', - '--vm-cpu': 'CPU_2' + "singularity": { + "--network": "NETWORK_2", + "--security": "SECURITY_2", + "--nv": "", + "--vm-ram": "MEMORY_2", + "--vm-cpu": "CPU_2", + "--mount_opts": "MOUNT_2", } }, - expected_task_args={} + expected_task_args={}, ) - self.assertIn('SINGULARITYENV_CUDA_VISIBLE_DEVICES', os.environ) - self.assertEqual(os.environ['SINGULARITYENV_CUDA_VISIBLE_DEVICES'], 'GPUS_2') + self.assertIn("SINGULARITYENV_CUDA_VISIBLE_DEVICES", os.environ) + self.assertEqual(os.environ["SINGULARITYENV_CUDA_VISIBLE_DEVICES"], "GPUS_2") diff 
--git a/mlcube/mlcube/tests/test_shell.py b/mlcube/mlcube/tests/test_shell.py index 8bbe883..90d95cc 100644 --- a/mlcube/mlcube/tests/test_shell.py +++ b/mlcube/mlcube/tests/test_shell.py @@ -1,45 +1,65 @@ import typing as t from unittest import TestCase +from omegaconf import DictConfig, OmegaConf + +from mlcube.config import MountType from mlcube.errors import ExecutionError from mlcube.shell import Shell -from omegaconf import DictConfig - class TestShell(TestCase): def test_run_01(self) -> None: - for cmd in ('python --version', ['python', '--version']): + for cmd in ("python --version", ["python", "--version"]): for die_on_error in (True, False): - exit_code = Shell.run(cmd, on_error='die') - self.assertEqual(exit_code, 0, f"cmd = {cmd}, die_on_error = {die_on_error}") + exit_code = Shell.run(cmd, on_error="die") + self.assertEqual( + exit_code, 0, f"cmd = {cmd}, die_on_error = {die_on_error}" + ) def test_run_02(self) -> None: cmds = [ 'python -c "print(message)"', 'python -c "import os, signal; os.kill(os.getpid(), signal.SIGUSR1);"', - '8389dfb48c6f4a1aaa16bdda76c1fb11' + "8389dfb48c6f4a1aaa16bdda76c1fb11", ] for cmd in cmds: - exit_code = Shell.run(cmd, on_error='ignore') + exit_code = Shell.run(cmd, on_error="ignore") self.assertGreater(exit_code, 0, f"cmd = {cmd}") def test_run_03(self) -> None: with self.assertRaises(ExecutionError): - _ = Shell.run('python -c "print(message)"', on_error='raise') + _ = Shell.run('python -c "print(message)"', on_error="raise") def test_run_and_capture_output(self) -> None: exit_code, version_str = Shell.run_and_capture_output(["python", "--version"]) - self.assertEqual(exit_code, 0, "Expecting exit code to be zero for `python --version`") - self.assertTrue(version_str.startswith("Python"), "Expecting version string to start with `Python`.") + self.assertEqual( + exit_code, 0, "Expecting exit code to be zero for `python --version`" + ) + self.assertTrue( + version_str.startswith("Python"), + "Expecting version string to 
start with `Python`.", + ) - exit_code, version_str = Shell.run_and_capture_output(["python", "-c" "print(message)"]) - self.assertNotEqual(exit_code, 0, "Expecting exit code to be non zero for `python -c 'print(message)'.`") - self.assertIn("NameError: name 'message' is not defined", version_str, "No expected error.") + exit_code, version_str = Shell.run_and_capture_output( + ["python", "-c" "print(message)"] + ) + self.assertNotEqual( + exit_code, + 0, + "Expecting exit code to be non zero for `python -c 'print(message)'.`", + ) + self.assertIn( + "NameError: name 'message' is not defined", + version_str, + "No expected error.", + ) def test_generate_mount_points(self) -> None: def _call_with_type_check(_task: str) -> t.Tuple[t.Dict, t.List, t.Dict]: - _mounts, _args, _mounts_opts = Shell.generate_mounts_and_args(_mlcube_config, _task, make_dirs=False) + _mounts, _args, _mounts_opts = Shell.generate_mounts_and_args( + _mlcube_config, _task, make_dirs=False + ) self.assertIsInstance(_mounts, dict, "Invalid mounts dictionary") self.assertIsInstance(_args, list, "Invalid args list") @@ -48,59 +68,90 @@ def _call_with_type_check(_task: str) -> t.Tuple[t.Dict, t.List, t.Dict]: return _mounts, _args, _mounts_opts # Test Case 1 - mounts, args, mounts_opts = _call_with_type_check('process') + mounts, args, mounts_opts = _call_with_type_check("process") self.assertDictEqual( mounts, { - '/mlcube/workspace/input': '/mlcube_io0', - '/mlcube/workspace/output': '/mlcube_io1' - } + "/mlcube/workspace/input": "/mlcube_io0", + "/mlcube/workspace/output": "/mlcube_io1", + }, ) - self.assertListEqual(args, ['process', '--input_dir=/mlcube_io0', '--output_dir=/mlcube_io1']) - self.assertDictEqual( - mounts_opts, - {'/mlcube/workspace/output': 'rw'} + self.assertListEqual( + args, ["process", "--input_dir=/mlcube_io0", "--output_dir=/mlcube_io1"] ) + self.assertDictEqual(mounts_opts, {"/mlcube/workspace/output": "rw"}) # Test Case 2 - mounts, args, mounts_opts = 
_call_with_type_check('split')
+        mounts, args, mounts_opts = _call_with_type_check("split")
         self.assertDictEqual(
             mounts,
             {
-                '/mlcube/workspace/input': '/mlcube_io0',
-                '/mlcube/workspace': '/mlcube_io1',
-                '/datasets/my_split': '/mlcube_io2'
-            }
+                "/mlcube/workspace/input": "/mlcube_io0",
+                "/mlcube/workspace": "/mlcube_io1",
+                "/datasets/my_split": "/mlcube_io2",
+            },
         )
         self.assertListEqual(
             args,
-            ['split', '--input_dir=/mlcube_io0', '--config=/mlcube_io1/config.yaml', '--output_dir=/mlcube_io2']
+            [
+                "split",
+                "--input_dir=/mlcube_io0",
+                "--config=/mlcube_io1/config.yaml",
+                "--output_dir=/mlcube_io2",
+            ],
         )
         self.assertDictEqual(
-            mounts_opts,
-            {'/mlcube/workspace': 'ro', '/datasets/my_split': 'rw'}
+            mounts_opts, {"/mlcube/workspace": "ro", "/datasets/my_split": "rw"}
+        )
+
+    def test_generate_mount_points_with_global_mount(self) -> None:
+        _mounts, _args, _mounts_opts = Shell.generate_mounts_and_args(
+            OmegaConf.merge(
+                _mlcube_config.copy(), {"runner": {"--mount_opts": MountType.RO}}
+            ),
+            "process",
+            make_dirs=False,
         )
+        # Two distinct options must be present (ro for input_dir and rw for output_dir)
+        self.assertSetEqual(set(_mounts_opts.values()), {"rw", "ro"})


-_mlcube_config = DictConfig({
-    'runtime': {'workspace': '/mlcube/workspace'},
-    'tasks': {
-        'process': {
-            'parameters': {
-                'inputs': {'input_dir': {'type': 'directory', 'default': 'input'}},
-                'outputs': {'output_dir': {'type': 'directory', 'default': 'output', 'opts': 'rw'}}
-            }
-        },
-        'split': {
-            'parameters': {
-                'inputs': {
-                    'input_dir': {'type': 'directory', 'default': 'input'},
-                    'config': {'type': 'file', 'default': 'config.yaml', 'opts': 'ro'}
-                },
-                'outputs': {
-                    'output_dir': {'type': 'directory', 'default': '/datasets/my_split', 'opts': 'rw'}
+
+_mlcube_config = DictConfig(
+    {
+        "runtime": {"workspace": "/mlcube/workspace"},
+        "tasks": {
+            "process": {
+                "parameters": {
+                    "inputs": {"input_dir": {"type": "directory", "default": "input"}},
+                    "outputs": {
+                        "output_dir": {
+ "type": "directory", + "default": "output", + "opts": "rw", + } + }, + } + }, + "split": { + "parameters": { + "inputs": { + "input_dir": {"type": "directory", "default": "input"}, + "config": { + "type": "file", + "default": "config.yaml", + "opts": "ro", + }, + }, + "outputs": { + "output_dir": { + "type": "directory", + "default": "/datasets/my_split", + "opts": "rw", + } + }, } - } - } + }, + }, } -}) +) diff --git a/runners/mlcube_docker/mlcube_docker/docker_run.py b/runners/mlcube_docker/mlcube_docker/docker_run.py index 1d73bfa..a8f918f 100644 --- a/runners/mlcube_docker/mlcube_docker/docker_run.py +++ b/runners/mlcube_docker/mlcube_docker/docker_run.py @@ -89,6 +89,7 @@ def validate(build_strategy: t.Text) -> None: "--gpus": None, # GPU usage options defined during MLCube container execution. "--memory": None, # RAM options defined during MLCube container execution. "--cpuset-cpus": None, # CPU cores options for Docker. + "--mount_opts": "", # Mount options for Docker volumes. 
} ) @@ -259,7 +260,7 @@ def run(self) -> None: extra_args_list = [ f"{key}={value}" for key, value in self.mlcube.runner.items() - if key.startswith("--") and value is not None + if key.startswith("--") and value is not None and key != "--mount_opts" ] extra_args = " ".join(extra_args_list) if extra_args: diff --git a/runners/mlcube_singularity/mlcube_singularity/singularity_run.py b/runners/mlcube_singularity/mlcube_singularity/singularity_run.py index 139ec38..d93ff50 100644 --- a/runners/mlcube_singularity/mlcube_singularity/singularity_run.py +++ b/runners/mlcube_singularity/mlcube_singularity/singularity_run.py @@ -2,7 +2,7 @@ import typing as t from pathlib import Path -from mlcube_singularity.singularity_client import Client, DockerHubClient, ImageSpec +from mlcube_singularity.singularity_client import Client, DockerHubClient from omegaconf import DictConfig, OmegaConf from mlcube.errors import ConfigurationError, ExecutionError, MLCubeError @@ -34,6 +34,7 @@ class Config(RunnerConfig): "--nv": None, # usage options defined during MLCube container execution. "--vm-ram": None, # RAM options defined during MLCube container execution. "--vm-cpu": None, # CPU options defined during MLCube container execution. + "--mount_opts": "", # Mount options for Singularity volumes. } ) @@ -129,7 +130,7 @@ def _get_extra_args(self) -> str: extra_args = [ f"{key}={value}" for key, value in self.mlcube.runner.items() - if key.startswith("--") and value is not None + if key.startswith("--") and value is not None and key != "--mount_opts" ] return " ".join(extra_args)