diff --git a/ads/common/auth.py b/ads/common/auth.py
index 7058484dd..48c1dcfd6 100644
--- a/ads/common/auth.py
+++ b/ads/common/auth.py
@@ -1084,7 +1084,6 @@ def __enter__(self):
         """
         self.previous_state = copy.deepcopy(AuthState())
         set_auth(**self.kwargs)
-        return default_signer()
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         """
diff --git a/ads/common/object_storage_details.py b/ads/common/object_storage_details.py
index 9becf72cb..63642b0f2 100644
--- a/ads/common/object_storage_details.py
+++ b/ads/common/object_storage_details.py
@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*--
 
-# Copyright (c) 2021, 2022 Oracle and/or its affiliates.
+# Copyright (c) 2021, 2023 Oracle and/or its affiliates.
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
 
 import json
@@ -15,7 +15,7 @@
 from ads.common import oci_client
 
 
-class InvalidObjectStoragePath(Exception): # pragma: no cover
+class InvalidObjectStoragePath(Exception):  # pragma: no cover
     """Invalid Object Storage Path."""
 
     pass
@@ -137,4 +137,4 @@ def is_oci_path(uri: str = None) -> bool:
         """
         if not uri:
             return False
-        return uri.startswith("oci://")
+        return uri.lower().startswith("oci://")
diff --git a/ads/common/utils.py b/ads/common/utils.py
index 51dddbe92..920d68801 100644
--- a/ads/common/utils.py
+++ b/ads/common/utils.py
@@ -1324,11 +1324,15 @@ def copy_file(
         If a destination file exists and `force_overwrite` set to `False`.
     """
     chunk_size = chunk_size or DEFAULT_BUFFER_SIZE
-    auth = auth or authutil.default_signer()
 
     if not os.path.basename(uri_dst):
         uri_dst = os.path.join(uri_dst, os.path.basename(uri_src))
     src_path_scheme = urlparse(uri_src).scheme or "file"
+
+    auth = auth or {}
+    if src_path_scheme.lower() == "oci" and not auth:
+        auth = authutil.default_signer()
+
     src_file_system = fsspec.filesystem(src_path_scheme, **auth)
 
     if not fsspec.filesystem(src_path_scheme, **auth).exists(uri_src):
diff --git a/ads/opctl/backend/ads_dataflow.py b/ads/opctl/backend/ads_dataflow.py
index eb9848c14..2d4e76c6c 100644
--- a/ads/opctl/backend/ads_dataflow.py
+++ b/ads/opctl/backend/ads_dataflow.py
@@ -26,6 +26,7 @@
 from ads.opctl.constants import OPERATOR_MODULE_PATH
 from ads.opctl.decorator.common import print_watch_command
 from ads.opctl.operator.common.const import ENV_OPERATOR_ARGS
+from ads.opctl.operator.common.operator_loader import OperatorInfo, OperatorLoader
 
 REQUIRED_FIELDS = [
     "compartment_id",
@@ -214,10 +215,36 @@ def watch(self):
 
 
 class DataFlowOperatorBackend(DataFlowBackend):
-    """Backend class to run operator on Data Flow Applications."""
+    """
+    Backend class to run an operator on a Data Flow Application.
+
+    Attributes
+    ----------
+    runtime_config: (Dict)
+        The runtime config for the operator.
+    operator_config: (Dict)
+        The operator specification config.
+    operator_type: str
+        The type of the operator.
+    operator_version: str
+        The version of the operator.
+    job: Job
+        The Data Science Job.
+    """
+
+    def __init__(self, config: Dict, operator_info: OperatorInfo = None) -> None:
+        """
+        Instantiates the operator backend.
 
-    def __init__(self, config: Dict) -> None:
-        super().__init__(config=config)
+        Parameters
+        ----------
+        config: (Dict)
+            The configuration file containing operator's specification details and execution section.
+        operator_info: (OperatorInfo, optional)
+            The operator's detailed information extracted from the operator.__init__ file.
+            It will be extracted from the operator type if not provided.
+ """ + super().__init__(config=config or {}) self.job = None @@ -231,13 +258,15 @@ def __init__(self, config: Dict) -> None: } self.operator_type = self.operator_config.get("type", "unknown") self.operator_version = self.operator_config.get("version", "unknown") + self.operator_info = operator_info def _adjust_common_information(self): """Adjusts common information of the application.""" if self.job.name.lower().startswith("{job"): self.job.with_name( - f"job_{self.operator_type.lower()}" f"_{self.operator_version.lower()}" + f"job_{self.operator_info.name.lower()}" + f"_{self.operator_version.lower()}" ) self.job.runtime.with_maximum_runtime_in_minutes( self.config["execution"].get("max_wait_time", 1200) @@ -247,7 +276,7 @@ def _adjust_common_information(self): # prepare run.py file to run the operator script_file = os.path.join( - temp_dir, f"{self.operator_type}_{int(time.time())}_run.py" + temp_dir, f"{self.operator_info.name}_{int(time.time())}_run.py" ) operator_module = f"{OPERATOR_MODULE_PATH}.{self.operator_type}" @@ -291,6 +320,9 @@ def run(self, **kwargs: Dict) -> Union[Dict, None]: """ Runs the operator on the Data Flow service. """ + if not self.operator_info: + self.operator_info = OperatorLoader.from_uri(self.operator_type).load() + self.job = Job.from_dict(self.runtime_config).build() # adjust job's common information diff --git a/ads/opctl/backend/ads_ml_job.py b/ads/opctl/backend/ads_ml_job.py index 114c8592d..96177351f 100644 --- a/ads/opctl/backend/ads_ml_job.py +++ b/ads/opctl/backend/ads_ml_job.py @@ -5,8 +5,6 @@ # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ import copy -import importlib -import inspect import json import os import shlex @@ -31,12 +29,13 @@ from ads.opctl import logger from ads.opctl.backend.base import Backend, RuntimeFactory from ads.opctl.config.resolver import ConfigResolver -from ads.opctl.constants import DEFAULT_IMAGE_SCRIPT_DIR, OPERATOR_MODULE_PATH +from ads.opctl.constants import DEFAULT_IMAGE_SCRIPT_DIR from ads.opctl.decorator.common import print_watch_command from ads.opctl.distributed.common.cluster_config_helper import ( ClusterConfigToJobSpecConverter, ) from ads.opctl.operator.common.const import ENV_OPERATOR_ARGS +from ads.opctl.operator.common.operator_loader import OperatorInfo, OperatorLoader REQUIRED_FIELDS = [ "project_id", @@ -528,10 +527,41 @@ def run(self, cluster_info, dry_run=False) -> None: class MLJobOperatorBackend(MLJobBackend): - """Backend class to run operator on Data Science Jobs.""" + """ + Backend class to run operator on Data Science Jobs. + Currently supported two scenarios: + * Running operator within container runtime. + * Running operator within python runtime. + + Attributes + ---------- + runtime_config: (Dict) + The runtime config for the operator. + operator_config: (Dict) + The operator specification config. + operator_type: str + The type of the operator. + operator_version: str + The version of the operator. + operator_info: OperatorInfo + The detailed information about the operator. + job: Job + The Data Science Job. + """ + + def __init__(self, config: Dict, operator_info: OperatorInfo = None) -> None: + """ + Instantiates the operator backend. - def __init__(self, config: Dict) -> None: - super().__init__(config=config) + Parameters + ---------- + config: (Dict) + The configuration file containing operator's specification details and execution section. 
+        operator_info: (OperatorInfo, optional)
+            The operator's detailed information extracted from the operator.__init__ file.
+            It will be extracted from the operator type if not provided.
+        """
+        super().__init__(config=config or {})
 
         self.job = None
 
@@ -552,12 +582,15 @@ def __init__(self, config: Dict) -> None:
             PythonRuntime().type: self._adjust_python_runtime,
         }
 
+        self.operator_info = operator_info
+
     def _adjust_common_information(self):
         """Adjusts common information of the job."""
 
         if self.job.name.lower().startswith("{job"):
             self.job.with_name(
-                f"job_{self.operator_type.lower()}" f"_{self.operator_version.lower()}"
+                f"job_{self.operator_info.name.lower()}"
+                f"_{self.operator_version.lower()}"
             )
         self.job.runtime.with_maximum_runtime_in_minutes(
             self.config["execution"].get("max_wait_time", 1200)
@@ -571,7 +604,7 @@ def _adjust_container_runtime(self):
             [
                 "python3",
                 "-m",
-                f"{self.operator_type}",
+                f"{self.operator_info.name}",
             ]
         )
         self.job.runtime.with_environment_variable(
@@ -591,20 +624,15 @@ def _adjust_python_runtime(self):
 
         # prepare run.sh file to run the operator's code
         script_file = os.path.join(
-            temp_dir, f"{self.operator_type}_{int(time.time())}_run.sh"
+            temp_dir, f"{self.operator_info.name}_{int(time.time())}_run.sh"
         )
 
         with open(script_file, "w") as fp:
-            fp.write(f"python3 -m {self.operator_type}")
+            fp.write(f"python3 -m {self.operator_info.name}")
 
         # copy the operator's source code to the temporary folder
-        operator_source_folder = os.path.dirname(
-            inspect.getfile(
-                importlib.import_module(f"{OPERATOR_MODULE_PATH}.{self.operator_type}")
-            )
-        )
         shutil.copytree(
-            operator_source_folder.rstrip("/"),
-            os.path.join(temp_dir, self.operator_type),
+            self.operator_info.path.rstrip("/"),
+            os.path.join(temp_dir, self.operator_info.name),
             dirs_exist_ok=True,
         )
 
@@ -628,6 +656,9 @@ def run(self, **kwargs: Dict) -> Union[Dict, None]:
         """
         Runs the operator on the Data Science Jobs.
         """
+        if not self.operator_info:
+            self.operator_info = OperatorLoader.from_uri(self.operator_type).load()
+
         self.job = Job.from_dict(self.runtime_config).build()
 
         # adjust job's common information
diff --git a/ads/opctl/backend/local.py b/ads/opctl/backend/local.py
index ee0b7baf0..73b4d3d20 100644
--- a/ads/opctl/backend/local.py
+++ b/ads/opctl/backend/local.py
@@ -39,13 +39,12 @@
     DEFAULT_NOTEBOOK_SESSION_SPARK_CONF_DIR,
     ML_JOB_GPU_IMAGE,
     ML_JOB_IMAGE,
-    OPERATOR_MODULE_PATH,
 )
 from ads.opctl.distributed.cmds import load_ini, local_run
 from ads.opctl.model.cmds import _download_model
 from ads.opctl.operator import __operators__
 from ads.opctl.operator.common.const import ENV_OPERATOR_ARGS
-from ads.opctl.operator.common.errors import OperatorNotFoundError
+from ads.opctl.operator.common.operator_loader import OperatorInfo, OperatorLoader
 from ads.opctl.operator.runtime import const as operator_runtime_const
 from ads.opctl.operator.runtime import runtime as operator_runtime
 from ads.opctl.spark.cmds import (
@@ -830,9 +829,38 @@ def _get_conda_info_from_runtime(artifact_dir):
 
 
 class LocalOperatorBackend(Backend):
-    """Class representing local operator backend."""
+    """
+    The local operator backend to execute an operator in the local environment.
+    Two scenarios are currently supported:
+    * Running the operator within a local conda environment.
+    * Running the operator within a local container.
+
+    Attributes
+    ----------
+    runtime_config: (Dict)
+        The runtime config for the operator.
+    operator_config: (Dict)
+        The operator specification config.
+    operator_type: str
+        The type of the operator.
+    operator_info: OperatorInfo
+        The detailed information about the operator.
+    """
+
+    def __init__(
+        self, config: Optional[Dict], operator_info: OperatorInfo = None
+    ) -> None:
+        """
+        Instantiates the operator backend.
 
-    def __init__(self, config: Optional[Dict]) -> None:
+        Parameters
+        ----------
+        config: (Dict)
+            The configuration file containing operator's specification details and execution section.
+        operator_info: (OperatorInfo, optional)
+            The operator's detailed information extracted from the operator.__init__ file.
+            It will be extracted from the operator type if not provided.
+        """
         super().__init__(config=config or {})
 
         self.runtime_config = self.config.get("runtime", {})
@@ -850,6 +878,8 @@ def __init__(self, config: Optional[Dict]) -> None:
             operator_runtime.PythonRuntime.type: self._run_with_python,
         }
 
+        self.operator_info = operator_info
+
     def _run_with_python(self) -> int:
         """Runs the operator within a local python environment.
 
@@ -865,15 +895,14 @@ def _run_with_python(self) -> int:
         )
 
         # run operator
-        operator_module = f"{OPERATOR_MODULE_PATH}.{self.operator_type}"
         operator_spec = json.dumps(self.operator_config)
-        sys.argv = [operator_module, "--spec", operator_spec]
+        sys.argv = [self.operator_info.name, "--spec", operator_spec]
 
         print(f"{'*' * 50} Runtime Config {'*' * 50}")
         print(runtime.to_yaml())
 
         try:
-            runpy.run_module(operator_module, run_name="__main__")
+            runpy.run_module(self.operator_info.name, run_name="__main__")
         except SystemExit as exception:
             return exception.code
         else:
@@ -915,7 +944,7 @@ def _run_with_container(self) -> int:
             image=runtime.spec.image,
             bind_volumes=bind_volumes,
             env_vars=env_vars,
-            command=f"'python3 -m {self.operator_type}'",
+            command=f"'python3 -m {self.operator_info.name}'",
         )
 
     def run(self, **kwargs: Dict) -> Dict:
         """
         Runs the operator.
         """
         runtime_type = self.runtime_config.get(
             "type", operator_runtime.OPERATOR_LOCAL_RUNTIME_TYPE.PYTHON
         )
+
         if runtime_type not in self._RUNTIME_RUN_MAP:
             raise RuntimeError(
                 f"Unsupported runtime - {runtime_type} for the local backend. 
" f"Supported values: {self._RUNTIME_RUN_MAP.keys()}" ) - # check if operator exists - if self.operator_type not in __operators__: - raise OperatorNotFoundError(self.operator_type) + if not self.operator_info: + self.operator_info = OperatorLoader.from_uri(self.operator_type).load() # run operator with provided runtime exit_code = self._RUNTIME_RUN_MAP.get(runtime_type, lambda: None)(**kwargs) diff --git a/ads/opctl/cli.py b/ads/opctl/cli.py index 744a0dbd6..0623be9e4 100644 --- a/ads/opctl/cli.py +++ b/ads/opctl/cli.py @@ -11,14 +11,16 @@ import fsspec import yaml -import ads.opctl.operator.cli import ads.opctl.conda.cli import ads.opctl.distributed.cli import ads.opctl.model.cli +import ads.opctl.operator.cli import ads.opctl.spark.cli from ads.common import auth as authutil -from ads.common.auth import AuthType, AuthContext +from ads.common.auth import AuthType +from ads.common.object_storage_details import ObjectStorageDetails from ads.opctl.cmds import activate as activate_cmd +from ads.opctl.cmds import apply as apply_cmd from ads.opctl.cmds import cancel as cancel_cmd from ads.opctl.cmds import configure as configure_cmd from ads.opctl.cmds import deactivate as deactivate_cmd @@ -27,17 +29,16 @@ from ads.opctl.cmds import init_vscode as init_vscode_cmd from ads.opctl.cmds import predict as predict_cmd from ads.opctl.cmds import run as run_cmd -from ads.opctl.cmds import apply as apply_cmd from ads.opctl.cmds import run_diagnostics as run_diagnostics_cmd from ads.opctl.cmds import watch as watch_cmd from ads.opctl.config.merger import ConfigMerger -from ads.opctl.config.base import ConfigProcessor from ads.opctl.constants import ( BACKEND_NAME, DEFAULT_MODEL_FOLDER, RESOURCE_TYPE, RUNTIME_TYPE, ) +from ads.opctl.decorator.common import with_auth from ads.opctl.utils import build_image as build_image_cmd from ads.opctl.utils import publish_image as publish_image_cmd from ads.opctl.utils import suppress_traceback @@ -764,11 +765,13 @@ def predict(**kwargs): "-a", help=( "The authentication method to leverage OCI resources. " - "The default value will be taken form the ADS `config.ini` file." + "The default value will be taken from the ADS `config.ini` file. " + "Check the `ads opctl configure --help` command to get details about the `config.ini`." ), type=click.Choice(AuthType.values()), default=None, ) +@with_auth def apply(debug: bool, **kwargs: Dict[str, Any]) -> None: """ Runs the operator with the given specification on the targeted backend. 
@@ -776,27 +779,20 @@ def apply(debug: bool, **kwargs: Dict[str, Any]) -> None: operator_spec = {} backend = kwargs.pop("backend") - p = ConfigProcessor().step(ConfigMerger, **kwargs) - - with AuthContext( - **{ - key: value - for key, value in { - "auth": kwargs["auth"], - "oci_config_location": p.config["execution"]["oci_config"], - "profile": p.config["execution"]["oci_profile"], - }.items() - if value - } - ) as auth: - with fsspec.open(kwargs["file"], "r", **auth) as f: - operator_spec = suppress_traceback(debug)(yaml.safe_load)(f.read()) - - if backend and backend.lower().endswith((".yaml", ".yml")): - with fsspec.open(backend, "r", **auth) as f: - backend = suppress_traceback(debug)(yaml.safe_load)(f.read()) - - suppress_traceback(debug)(apply_cmd)(operator_spec, backend, **kwargs) + auth = {} + if any(ObjectStorageDetails.is_oci_path(uri) for uri in (kwargs["file"], backend)): + auth = authutil.default_signer() + + with fsspec.open(kwargs["file"], "r", **auth) as f: + operator_spec = suppress_traceback(debug)(yaml.safe_load)(f.read()) + + if backend and backend.lower().endswith((".yaml", ".yml")): + with fsspec.open(backend, "r", **auth) as f: + backend = suppress_traceback(debug)(yaml.safe_load)(f.read()) + + suppress_traceback(debug)(apply_cmd)( + config=operator_spec, backend=backend, **kwargs + ) commands.add_command(ads.opctl.conda.cli.commands) diff --git a/ads/opctl/cmds.py b/ads/opctl/cmds.py index 2bd3b7618..dc30d7081 100644 --- a/ads/opctl/cmds.py +++ b/ads/opctl/cmds.py @@ -518,6 +518,12 @@ def configure() -> None: if "CONDA" not in config_parser: config_parser["CONDA"] = {} + oci_auth = click.prompt( + text="Default OCI authentication type:", + type=click.Choice(AuthType.values()), + default=None, + ) + oci_config_path = click.prompt( "OCI config path:", default=config_parser["OCI"].get("oci_config", DEFAULT_OCI_CONFIG_FILE), @@ -533,6 +539,7 @@ def configure() -> None: config_parser["OCI"] = { "oci_config": oci_config_path, "oci_profile": oci_profile, + "auth": oci_auth, } conda_pack_path = click.prompt( "Conda pack install folder:", @@ -859,14 +866,22 @@ def apply(config: Dict, backend: Union[Dict, str] = None, **kwargs) -> None: p = ConfigProcessor(config).step(ConfigMerger, **kwargs) if p.config.get("kind", "").lower() == "operator": - from ads.opctl.operator import OperatorNotFoundError, __operators__ from ads.opctl.operator import cmd as operator_cmd - from ads.opctl.operator.common.utils import OperatorInfo, _operator_info + from ads.opctl.operator.common.operator_loader import ( + OperatorLoader, + OperatorInfo, + ) operator_type = p.config.get("type", "").lower() - if not (operator_type and operator_type in __operators__): - raise OperatorNotFoundError(operator_type or "unknown") + # validation + if not operator_type: + raise ValueError( + f"The `type` attribute must be specified in the operator's config." 
+            )
+
+        # extracting details about the operator
+        operator_info: OperatorInfo = OperatorLoader.from_uri(uri=operator_type).load()
 
         supported_backends = (
             BACKEND_NAME.JOB.value,
@@ -927,13 +942,6 @@ def apply(config: Dict, backend: Union[Dict, str] = None, **kwargs) -> None:
         # generate backend specification in case if it is not provided
         if not backend.get("spec"):
-            # get operator physical location
-            operator_path = os.path.join(
-                os.path.dirname(__file__), "operator", "lowcode", operator_type
-            )
-            # load operator info
-            operator_info: OperatorInfo = _operator_info(path=operator_path)
-
             backends = operator_cmd._init_backend_config(
                 operator_info=operator_info, **kwargs
             )
@@ -956,10 +964,10 @@
                     "The dry run option is not supported for "
                     "the local backend and will be ignored."
                 )
-            LocalOperatorBackend(config=p.config).run()
+            LocalOperatorBackend(config=p.config, operator_info=operator_info).run()
         elif p_backend.config["execution"]["backend"] == BACKEND_NAME.JOB.value:
-            MLJobOperatorBackend(config=p.config).run()
+            MLJobOperatorBackend(config=p.config, operator_info=operator_info).run()
         elif p_backend.config["execution"]["backend"] == BACKEND_NAME.DATAFLOW.value:
-            DataFlowOperatorBackend(config=p.config).run()
+            DataFlowOperatorBackend(config=p.config, operator_info=operator_info).run()
         else:
-            raise RuntimeError("Not supported operator.")
+            raise RuntimeError("Unsupported kind of workload.")
diff --git a/ads/opctl/config/merger.py b/ads/opctl/config/merger.py
index 1eb3bfa87..35ba3a944 100644
--- a/ads/opctl/config/merger.py
+++ b/ads/opctl/config/merger.py
@@ -117,9 +117,13 @@ def _fill_config_with_defaults(self, ads_config_path: str) -> None:
         # set default auth
         if not self.config["execution"].get("auth", None):
             if is_in_notebook_session():
-                self.config["execution"]["auth"] = AuthType.RESOURCE_PRINCIPAL
+                self.config["execution"]["auth"] = (
+                    exec_config.get("auth") or AuthType.RESOURCE_PRINCIPAL
+                )
             else:
-                self.config["execution"]["auth"] = AuthType.API_KEY
+                self.config["execution"]["auth"] = (
+                    exec_config.get("auth") or AuthType.API_KEY
+                )
         # determine profile
         if self.config["execution"]["auth"] == AuthType.RESOURCE_PRINCIPAL:
             profile = self.config["execution"]["auth"].upper()
@@ -173,6 +177,7 @@ def _get_config_from_config_ini(ads_config_folder: str) -> Dict:
             return {
                 "oci_config": parser["OCI"].get("oci_config"),
                 "oci_profile": parser["OCI"].get("oci_profile"),
+                "auth": parser["OCI"].get("auth"),
                 "conda_pack_folder": parser["CONDA"].get("conda_pack_folder"),
                 "conda_pack_os_prefix": parser["CONDA"].get("conda_pack_os_prefix"),
             }
@@ -197,6 +202,8 @@ def _get_service_config(self, oci_profile: str, ads_config_folder: str) -> Dict:
             parser = read_from_ini(os.path.join(ads_config_folder, config_file))
             if oci_profile in parser:
                 return parser[oci_profile]
+            if DEFAULT_PROFILE in parser:
+                return parser[DEFAULT_PROFILE]
         else:
             logger.info(
                 f"{os.path.join(ads_config_folder, config_file)} does not exist. No config loaded."
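Note: with the `merger.py` change above, `ads opctl configure` now persists a default `auth` key in the `[OCI]` section of `~/.ads_ops/config.ini`, and `_fill_config_with_defaults` falls back to it before the environment-based defaults. A minimal sketch of the resulting file (all values illustrative; the bucket/namespace are placeholders, and `auth` must be one of `AuthType.values()`, e.g. `api_key`):

    [OCI]
    oci_config = ~/.oci/config
    oci_profile = DEFAULT
    auth = api_key

    [CONDA]
    conda_pack_folder = ~/conda
    conda_pack_os_prefix = oci://my-bucket@my-namespace/conda_environments
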
diff --git a/ads/opctl/decorator/common.py b/ads/opctl/decorator/common.py index a6668fa34..dd5a3bb02 100644 --- a/ads/opctl/decorator/common.py +++ b/ads/opctl/decorator/common.py @@ -7,13 +7,10 @@ from functools import wraps from typing import Callable, Dict, List -from ads.config import ( - JOB_RUN_OCID, - NB_SESSION_OCID, - PIPELINE_RUN_OCID, - DATAFLOW_RUN_OCID, - MD_OCID, -) +from ads.common.auth import AuthContext +from ads.opctl import logger +from ads.opctl.config.base import ConfigProcessor +from ads.opctl.config.merger import ConfigMerger RUN_ID_FIELD = "run_id" @@ -44,7 +41,7 @@ def wrapper(*args: List, **kwargs: Dict) -> Dict: if result and isinstance(result, Dict) and RUN_ID_FIELD in result: msg_header = ( f"{'*' * 40} To monitor the progress of the task, " - "execute the following command {'*' * 40}" + f"execute the following command {'*' * 40}" ) print(msg_header) print(f"ads opctl watch {result[RUN_ID_FIELD]}") @@ -59,16 +56,12 @@ def validate_environment(func: callable) -> Callable: @wraps(func) def wrapper(*args: List, **kwargs: Dict) -> Dict: - if any( - value - for value in ( - JOB_RUN_OCID, - NB_SESSION_OCID, - PIPELINE_RUN_OCID, - DATAFLOW_RUN_OCID, - MD_OCID, - ) - ): + try: + import docker + + docker.from_env().version() + except Exception as ex: + logger.debug(ex) raise OpctlEnvironmentError() return func(*args, **kwargs) @@ -85,3 +78,26 @@ def _add_options(func): return func return _add_options + + +def with_auth(func: Callable) -> Callable: + """The decorator to add AuthContext to the method.""" + + @wraps(func) + def wrapper(*args, **kwargs) -> Dict: + p = ConfigProcessor().step(ConfigMerger, **kwargs) + + with AuthContext( + **{ + key: value + for key, value in { + "auth": p.config["execution"]["auth"], + "oci_config_location": p.config["execution"]["oci_config"], + "profile": p.config["execution"]["oci_profile"], + }.items() + if value + } + ): + return func(*args, **kwargs) + + return wrapper diff --git a/ads/opctl/docker/merge_dependencies.py b/ads/opctl/docker/merge_dependencies.py deleted file mode 100644 index e275e63ed..000000000 --- a/ads/opctl/docker/merge_dependencies.py +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8; -*- - -# Copyright (c) 2021, 2022 Oracle and/or its affiliates. 
-# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ - -import yaml -import os -import click - - -@click.command() -@click.argument("output_path") -@click.option( - "--source-folder", - "-s", - help="source folder to search for environment yaml", - required=False, - default=None, -) -def merge(output_path, source_folder): - merged = _merge(source_folder) - with open(output_path, "w") as f: - yaml.safe_dump(merged, f) - - -def _merge(source_folder=None): - conda_dependencies = set([]) - pip_dependencies = set([]) - if not source_folder: - source_folder = os.path.join("operators") - for dirpath, dirnames, filenames in os.walk(source_folder): - for fname in filenames: - if fname == "environment.yaml": - env_yaml = os.path.join(dirpath, fname) - print(env_yaml) - with open(env_yaml, "r") as f: - dependencies = yaml.safe_load(f.read())["dependencies"] - for dep in dependencies: - if isinstance(dep, dict) and "pip" in dep: - pip_dependencies.update(dep["pip"]) - else: - conda_dependencies.add(dep) - conda_dependencies.add("pip") - merged_dependencies = { - "dependencies": list(conda_dependencies) + [{"pip": list(pip_dependencies)}] - } - return merged_dependencies - - -if __name__ == "__main__": - merge() diff --git a/ads/opctl/docker/operator/Dockerfile b/ads/opctl/docker/operator/Dockerfile index a70c0dd61..905b101a0 100644 --- a/ads/opctl/docker/operator/Dockerfile +++ b/ads/opctl/docker/operator/Dockerfile @@ -3,7 +3,13 @@ FROM ghcr.io/oracle/oraclelinux8-instantclient:21 as base -RUN rm -rf /var/cache/yum/* && yum clean all && yum install -y gcc make patch vim iproute net-tools git && rm -rf /var/cache/yum/* +RUN \ + rm -rf /var/cache/yum/* && \ + yum install -y gcc make patch vim iproute net-tools git && \ + yum clean all && \ + rm -rf /var/cache/yum/* + +########################### CONDA INSTALLATION ######################################## RUN curl -L https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh >> miniconda.sh RUN bash ./miniconda.sh -b -p /miniconda; rm ./miniconda.sh; ENV PATH="/miniconda/bin:$PATH" @@ -21,17 +27,12 @@ ENV PATH="/miniconda/envs/${CONDA_ENV_NAME}}/bin:$PATH" RUN conda init bash && source ~/.bashrc && conda activate ${CONDA_ENV_NAME} +########################### SETUP WORKDIR ######################################## RUN mkdir ${OPERATOR_DIR} -# COPY ./artifacts/* ${OPERATOR_DIR}/ ENV OPERATOR_DIR=${OPERATOR_DIR} ENV CONDA_ENV_NAME=${CONDA_ENV_NAME} -# RUN if [ -f ${OPERATOR_DIR}/oracle_ads*.whl ]; then \ -# local_whl=$(find ${OPERATOR_DIR} -name "*.whl" -exec basename {} \; | head -n 1 ); \ -# source ~/.bashrc && conda activate ${CONDA_ENV_NAME} && pip install ${OPERATOR_DIR}/$local_whl; \ -# fi - WORKDIR ${OPERATOR_DIR} RUN echo "conda activate $CONDA_ENV_NAME">>/root/.bashrc diff --git a/ads/opctl/docker/operator/Dockerfile.gpu b/ads/opctl/docker/operator/Dockerfile.gpu index 01be15ba2..f74f924dd 100644 --- a/ads/opctl/docker/operator/Dockerfile.gpu +++ b/ads/opctl/docker/operator/Dockerfile.gpu @@ -3,7 +3,57 @@ FROM ghcr.io/oracle/oraclelinux8-instantclient:21 as base -RUN rm -rf /var/cache/yum/* && yum clean all && yum install -y gcc make patch vim iproute net-tools git && rm -rf /var/cache/yum/* +RUN \ + rm -rf /var/cache/yum/* && \ + yum install -y gcc make patch vim iproute net-tools git && \ + yum clean all && \ + rm -rf /var/cache/yum/* + +########################### CUDA INSTALLATION ######################################## +#Reference: 
https://gitlab.com/nvidia/container-images/cuda/-/blob/master/dist/10.1/centos7/runtime/cudnn7/Dockerfile +#Reference: https://gitlab.com/nvidia/container-images/cuda/-/blob/master/dist/10.1/centos7/runtime/Dockerfile +#Reference: https://gitlab.com/nvidia/container-images/cuda/-/blob/master/dist/10.1/centos7/base/Dockerfile + +RUN NVIDIA_GPGKEY_SUM=d0664fbbdb8c32356d45de36c5984617217b2d0bef41b93ccecd326ba3b80c87 && \ +curl -fsSL https://developer.download.nvidia.com/compute/cuda/repos/rhel7/x86_64/D42D0685.pub | sed '/^Version/d' > /etc/pki/rpm-gpg/RPM-GPG-KEY-NVIDIA && \ + echo "$NVIDIA_GPGKEY_SUM /etc/pki/rpm-gpg/RPM-GPG-KEY-NVIDIA" | sha256sum -c --strict - + +COPY cuda.repo /etc/yum.repos.d/cuda.repo + +ENV CUDA_VERSION 10.1.243 + +ENV CUDA_PKG_VERSION 10-1-$CUDA_VERSION-1 +# For libraries in the cuda-compat-* package: https://docs.nvidia.com/cuda/eula/index.html#attachment-a +RUN yum install -y \ +cuda-cudart-$CUDA_PKG_VERSION \ +cuda-compat-10-1 \ +&& \ + ln -s cuda-10.1 /usr/local/cuda && \ + rm -rf /var/cache/yum/* + +# nvidia-docker 1.0 +RUN echo "/usr/local/nvidia/lib" >> /etc/ld.so.conf.d/nvidia.conf && \ + echo "/usr/local/nvidia/lib64" >> /etc/ld.so.conf.d/nvidia.conf + +ENV PATH /usr/local/nvidia/bin:/usr/local/cuda/bin:${PATH} +ENV LD_LIBRARY_PATH /lib64:/usr/local/nvidia/lib:/usr/local/nvidia/lib64 + +# nvidia-container-runtime +ENV NVIDIA_VISIBLE_DEVICES all +ENV NVIDIA_DRIVER_CAPABILITIES compute,utility +ENV NVIDIA_REQUIRE_CUDA "cuda>=10.1 brand=tesla,driver>=384,driver<385 brand=tesla,driver>=396,driver<397 brand=tesla,driver>=410,driver<411" + +ENV CUDNN_VERSION 7.6.5.32 +LABEL com.nvidia.cudnn.version="${CUDNN_VERSION}" + +RUN CUDNN_DOWNLOAD_SUM=7eaec8039a2c30ab0bc758d303588767693def6bf49b22485a2c00bf2e136cb3 && \ + curl -fsSL http://developer.download.nvidia.com/compute/redist/cudnn/v7.6.5/cudnn-10.1-linux-x64-v7.6.5.32.tgz -O && \ + echo "$CUDNN_DOWNLOAD_SUM cudnn-10.1-linux-x64-v7.6.5.32.tgz" | sha256sum -c - && \ + tar --no-same-owner -xzf cudnn-10.1-linux-x64-v7.6.5.32.tgz -C /usr/local --wildcards 'cuda/lib64/libcudnn.so.*' && \ + rm cudnn-10.1-linux-x64-v7.6.5.32.tgz && \ + ldconfig + +########################### CONDA INSTALLATION ######################################## RUN curl -L https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh >> miniconda.sh RUN bash ./miniconda.sh -b -p /miniconda; rm ./miniconda.sh; ENV PATH="/miniconda/bin:$PATH" @@ -21,17 +71,12 @@ ENV PATH="/miniconda/envs/${CONDA_ENV_NAME}}/bin:$PATH" RUN conda init bash && source ~/.bashrc && conda activate ${CONDA_ENV_NAME} +########################### SETUP WORKDIR ######################################## RUN mkdir ${OPERATOR_DIR} -COPY ./artifacts/* ${OPERATOR_DIR}/ ENV OPERATOR_DIR=${OPERATOR_DIR} ENV CONDA_ENV_NAME=${CONDA_ENV_NAME} -RUN if [ -f ${OPERATOR_DIR}/oracle_ads*.whl ]; then \ - local_whl=$(find ${OPERATOR_DIR} -name "*.whl" -exec basename {} \; | head -n 1 ); \ - source ~/.bashrc && conda activate ${CONDA_ENV_NAME} && pip install ${OPERATOR_DIR}/$local_whl; \ - fi - WORKDIR ${OPERATOR_DIR} RUN echo "conda activate $CONDA_ENV_NAME">>/root/.bashrc diff --git a/ads/opctl/docker/operator/cuda.repo b/ads/opctl/docker/operator/cuda.repo new file mode 100644 index 000000000..358420e3a --- /dev/null +++ b/ads/opctl/docker/operator/cuda.repo @@ -0,0 +1,6 @@ +[cuda] +name=cuda +baseurl=http://developer.download.nvidia.com/compute/cuda/repos/rhel7/x86_64 +enabled=1 +gpgcheck=1 +gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-NVIDIA diff --git a/ads/opctl/operator/cli.py 
b/ads/opctl/operator/cli.py index 537d6b934..67a3b3ae1 100644 --- a/ads/opctl/operator/cli.py +++ b/ads/opctl/operator/cli.py @@ -10,10 +10,8 @@ import fsspec import yaml -from ads.common.auth import AuthContext, AuthType -from ads.opctl.decorator.common import click_options -from ads.opctl.config.base import ConfigProcessor -from ads.opctl.config.merger import ConfigMerger +from ads.common.auth import AuthType +from ads.opctl.decorator.common import click_options, with_auth from ads.opctl.utils import suppress_traceback from .__init__ import __operators__ @@ -35,8 +33,9 @@ click.option( "--ads-config", help=( - "The folder where the ADS opctl config located. " - "The default location is: `~/.ads_ops` folder." + "The folder where the ADS `config.ini` located. " + "The default location is: `~/.ads_ops` folder. " + "Check the `ads opctl configure --help` command to get details about the `config.ini`." ), required=False, default=None, @@ -48,13 +47,27 @@ "--name", "-n", help=( - "The name of the service operator. " - f"Available operators: `{'`, `'.join(__operators__)}`." + "The name of the operator. " + f"Available service operators: `{'`, `'.join(__operators__)}`." ), required=True, ), ) +AUTH_TYPE_OPTION = ( + click.option( + "--auth", + "-a", + help=( + "The authentication method to leverage OCI resources. " + "The default value will be taken from the ADS `config.ini` file. " + "Check the `ads opctl configure --help` command to get details about the `config.ini`." + ), + type=click.Choice(AuthType.values()), + default=None, + ), +) + @click.group("operator") def commands(): @@ -70,7 +83,10 @@ def list(debug: bool, **kwargs: Dict[str, Any]) -> None: @commands.command() -@click_options(DEBUG_OPTION + OPERATOR_NAME_OPTION) +@click_options( + DEBUG_OPTION + OPERATOR_NAME_OPTION + ADS_CONFIG_OPTION + AUTH_TYPE_OPTION +) +@with_auth def info(debug: bool, **kwargs: Dict[str, Any]) -> None: """Prints the detailed information about the particular operator.""" suppress_traceback(debug)(cmd_info)(**kwargs) @@ -91,6 +107,7 @@ def info(debug: bool, **kwargs: Dict[str, Any]) -> None: is_flag=True, default=False, ) +@with_auth def init(debug: bool, **kwargs: Dict[str, Any]) -> None: """Generates starter YAML configs for the operator.""" suppress_traceback(debug)(cmd_init)(**kwargs) @@ -113,6 +130,7 @@ def init(debug: bool, **kwargs: Dict[str, Any]) -> None: is_flag=True, default=False, ) +@with_auth def build_image(debug: bool, **kwargs: Dict[str, Any]) -> None: """Creates a new image for the specified operator.""" suppress_traceback(debug)(cmd_build_image)(**kwargs) @@ -127,6 +145,7 @@ def build_image(debug: bool, **kwargs: Dict[str, Any]) -> None: required=False, default=None, ) +@with_auth def publish_image(debug, **kwargs): """Publishes an operator's image to the container registry.""" suppress_traceback(debug)(cmd_publish_image)(**kwargs) @@ -147,44 +166,22 @@ def publish_image(debug, **kwargs): required=False, default=None, ) +@with_auth def create(debug: bool, **kwargs: Dict[str, Any]) -> None: """Creates new operator.""" suppress_traceback(debug)(cmd_create)(**kwargs) @commands.command() -@click_options(DEBUG_OPTION + OPERATOR_NAME_OPTION + ADS_CONFIG_OPTION) +@click_options(DEBUG_OPTION + ADS_CONFIG_OPTION + AUTH_TYPE_OPTION) @click.option( "--file", "-f", help="The path to resource YAML file.", required=True, default=None ) -@click.option( - "--auth", - "-a", - help=( - "The authentication method to leverage OCI resources. " - "The default value will be taken from the ADS `config.ini` file." 
- ), - type=click.Choice(AuthType.values()), - default=None, -) +@with_auth def verify(debug: bool, **kwargs: Dict[str, Any]) -> None: """Verifies the operator config.""" - - p = ConfigProcessor().step(ConfigMerger, **kwargs) - - with AuthContext( - **{ - key: value - for key, value in { - "auth": kwargs["auth"], - "oci_config_location": p.config["execution"]["oci_config"], - "profile": p.config["execution"]["oci_profile"], - }.items() - if value - } - ) as auth: - with fsspec.open(kwargs["file"], "r", **auth) as f: - operator_spec = suppress_traceback(debug)(yaml.safe_load)(f.read()) + with fsspec.open(kwargs["file"], "r", **(kwargs.get("auth", {}) or {})) as f: + operator_spec = suppress_traceback(debug)(yaml.safe_load)(f.read()) suppress_traceback(debug)(cmd_verify)(operator_spec, **kwargs) @@ -208,13 +205,16 @@ def verify(debug: bool, **kwargs: Dict[str, Any]) -> None: is_flag=True, default=False, ) +@with_auth def build_conda(debug: bool, **kwargs: Dict[str, Any]) -> None: """Creates a new conda environment for the specified operator.""" suppress_traceback(debug)(cmd_build_conda)(**kwargs) @commands.command() -@click_options(DEBUG_OPTION + OPERATOR_NAME_OPTION + ADS_CONFIG_OPTION) +@click_options( + DEBUG_OPTION + OPERATOR_NAME_OPTION + ADS_CONFIG_OPTION + AUTH_TYPE_OPTION +) @click.option( "--conda-pack-folder", help=( @@ -232,6 +232,7 @@ def build_conda(debug: bool, **kwargs: Dict[str, Any]) -> None: is_flag=True, default=False, ) +@with_auth def publish_conda(debug: bool, **kwargs: Dict[str, Any]) -> None: """Publishes an operator's conda environment to the Object Storage bucket.""" suppress_traceback(debug)(cmd_publish_conda)(**kwargs) diff --git a/ads/opctl/operator/cmd.py b/ads/opctl/operator/cmd.py index e522a1926..70f68912e 100644 --- a/ads/opctl/operator/cmd.py +++ b/ads/opctl/operator/cmd.py @@ -4,11 +4,11 @@ # Copyright (c) 2023 Oracle and/or its affiliates. 
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ -import importlib -import inspect import os import re +import runpy import shutil +import sys import tempfile from typing import Any, Dict, Union @@ -30,31 +30,24 @@ from ads.opctl.constants import ( BACKEND_NAME, DEFAULT_ADS_CONFIG_FOLDER, - OPERATOR_MODULE_PATH, RESOURCE_TYPE, RUNTIME_TYPE, ) +from ads.opctl.decorator.common import validate_environment from ads.opctl.operator.common.const import ( - PACK_TYPE, OPERATOR_BASE_DOCKER_FILE, OPERATOR_BASE_DOCKER_GPU_FILE, OPERATOR_BASE_GPU_IMAGE, OPERATOR_BASE_IMAGE, + PACK_TYPE, ) -from ads.opctl.operator.common.utils import OperatorInfo, _operator_info -from ads.opctl.decorator.common import validate_environment +from ads.opctl.operator.common.operator_loader import OperatorInfo, OperatorLoader from ads.opctl.utils import publish_image as publish_image_cmd from .__init__ import __operators__ -from .common.errors import ( - OperatorCondaNotFoundError, - OperatorImageNotFoundError, - OperatorNotFoundError, -) -from .common.utils import ( - _build_image, - _operator_info_list, -) +from .common.errors import OperatorCondaNotFoundError, OperatorImageNotFoundError +from .common.operator_loader import _operator_info_list +from .common.utils import _build_image def list() -> None: @@ -102,10 +95,7 @@ def info( from rich.markdown import Markdown console = Console() - - operator_info = _operator_info(name=name) - if not operator_info: - raise OperatorNotFoundError(name) + operator_info = OperatorLoader.from_uri(uri=name).load() console.print( Markdown( @@ -262,24 +252,16 @@ def init( Raises ------ ValueError - If `operator` not specified. + If `name` not specified. OperatorNotFoundError If `operator` not found. """ - # validation if not name: - raise ValueError( - f"The `name` attribute must be specified. Supported values: {__operators__}" - ) + raise ValueError(f"The `name` attribute must be specified.") - if name not in __operators__: - raise OperatorNotFoundError(name) - - # generating operator specification - operator_cmd_module = importlib.import_module(f"{OPERATOR_MODULE_PATH}.{name}.cmd") - importlib.reload(operator_cmd_module) - operator_specification_template = getattr(operator_cmd_module, "init")(**kwargs) + # load operator info + operator_info: OperatorInfo = OperatorLoader.from_uri(uri=name).load() # create TMP folder if one is not provided by user if output: @@ -292,20 +274,30 @@ def init( overwrite = True output = os.path.join(tempfile.TemporaryDirectory().name, "") - # get operator physical location - operator_path = os.path.join(os.path.dirname(__file__), "lowcode", name) - - # load operator info - operator_info: OperatorInfo = _operator_info(path=operator_path) - - # save operator spec YAML - with fsspec.open(os.path.join(output, f"{name}.yaml"), mode="w") as f: - f.write(operator_specification_template) + # generating operator specification + try: + operator_cmd_module = runpy.run_module( + f"{operator_info.name}.cmd", run_name="init" + ) + operator_specification_template = operator_cmd_module.get("init", lambda: "")( + **{**kwargs, **{"type": name}} + ) + if operator_specification_template: + with fsspec.open( + os.path.join(output, f"{operator_info.name}.yaml"), mode="w" + ) as f: + f.write(operator_specification_template) + except Exception as ex: + logger.info( + "The operator's specification was not generated " + f"because it is not supported by the `{operator_info.name}` operator." 
+ ) + logger.debug(ex) # copy README and original schema files into a destination folder for src_file in ("README.md", "schema.yaml", "environment.yaml"): ads_common_utils.copy_file( - uri_src=os.path.join(operator_path, src_file), + uri_src=os.path.join(operator_info.path, src_file), uri_dst=output, force_overwrite=overwrite, ) @@ -328,15 +320,12 @@ def init( @validate_environment def build_image( name: str = None, - source_folder: str = None, gpu: bool = None, rebuild_base_image: bool = None, **kwargs: Dict[str, Any], ) -> None: """ Builds the image for the particular operator. - For the service operators, the name needs to be provided. - For the custom operators, the path (source_folder) to the operator needs to be provided. Parameters ---------- @@ -344,9 +333,6 @@ def build_image( Name of the operator to build the image. gpu: (bool, optional) Whether to build a GPU-enabled Docker image. - source_folder: (str, optional) - The folder containing the operator source code. - Only relevant for custom operators. rebuild_base_image: (optional, bool) If rebuilding both base and operator's images required. kwargs: (Dict, optional). @@ -355,45 +341,17 @@ def build_image( Raises ------ ValueError - If neither `name` nor `source_folder` were provided. - OperatorNotFoundError - If the service operator not found. - FileNotFoundError - If source_folder not exists. + If `name` not specified. """ import docker - operator_image_name = "" - operator_name = name - - if name: - if name not in __operators__: - raise OperatorNotFoundError(name) - source_folder = os.path.dirname( - inspect.getfile(importlib.import_module(f"{OPERATOR_MODULE_PATH}.{name}")) - ) - operator_image_name = operator_image_name or name - logger.info(f"Building Docker image for the `{name}` service operator.") - elif source_folder: - source_folder = os.path.abspath(os.path.expanduser(source_folder)) - if not os.path.isdir(source_folder): - raise FileNotFoundError(f"The path {source_folder} does not exist") - - operator_name = os.path.basename(source_folder.rstrip("/")) - operator_image_name = operator_image_name or operator_name - logger.info( - "Building Docker image for custom operator using source folder: " - f"`{source_folder}`." - ) - else: - raise ValueError( - "No operator name or source folder specified." - "Please provide relevant options." - ) + # validation + if not name: + raise ValueError(f"The `name` attribute must be specified.") - # get operator details stored in operator's init file. - operator_info: OperatorInfo = _operator_info(path=source_folder) - tag = operator_info.version + # load operator info + operator_info: OperatorInfo = OperatorLoader.from_uri(uri=name).load() + logger.info(f"Building Docker image for the `{operator_info.name}` operator.") # checks if GPU base image needs to be used. 
gpu = operator_info.gpu or gpu @@ -405,9 +363,9 @@ def build_image( client = docker.from_env() client.api.inspect_image(base_image_name) if rebuild_base_image: - raise docker.errors.ImageNotFound("Rebuilding base image requested.") + raise docker.errors.ImageNotFound() except docker.errors.ImageNotFound: - logger.info(f"Building base operator image {base_image_name}") + logger.info(f"Building the base operator's image `{base_image_name}`.") base_docker_file = os.path.join( cur_dir, @@ -428,16 +386,16 @@ def build_image( ) with tempfile.TemporaryDirectory() as td: - shutil.copytree(source_folder, os.path.join(td, "operator")) + shutil.copytree(operator_info.path, os.path.join(td, "operator")) run_command = [ f"FROM {base_image_name}", - f"COPY ./operator/ $OPERATOR_DIR/{operator_name}/", + f"COPY ./operator/ $OPERATOR_DIR/{operator_info.name}/", "RUN yum install -y libX11", ] if os.path.exists(os.path.join(td, "operator", "environment.yaml")): run_command.append( - f"RUN mamba env update -f $OPERATOR_DIR/{operator_name}/environment.yaml " + f"RUN mamba env update -f $OPERATOR_DIR/{operator_info.name}/environment.yaml " "--name $CONDA_ENV_NAME && conda clean -afy" ) @@ -447,11 +405,15 @@ def build_image( f.writelines("\n".join(run_command)) result_image_name = _build_image( - dockerfile=custom_docker_file, image_name=operator_image_name, tag=tag + dockerfile=custom_docker_file, + image_name=operator_info.name, + tag=operator_info.version, ) logger.info( - f"The operator image `{result_image_name}` has been successfully built." + f"The operator image `{result_image_name}` has been successfully built. " + "To publish the image to OCI Container Registry run the " + f"`ads opctl operator publish-image -n {result_image_name}` command" ) @@ -469,7 +431,7 @@ def publish_image( Parameters ---------- name: (str, optional) - Operator's name for publishing the image. + The operator or image name for publishing to container registry. registry: str Container registry. ads_config: (str, optional) @@ -480,33 +442,32 @@ def publish_image( Raises ------ ValueError - When operator's name is not provided. - OperatorNotFoundError - If the service operator not found. + If `name` not specified. OperatorImageNotFoundError If the operator's image doesn't exist. """ import docker + # validation if not name: - raise ValueError( - f"The `name` attribute must be specified. Supported values: {__operators__}" - ) - - if name not in __operators__: - raise OperatorNotFoundError(name) + raise ValueError(f"The `name` attribute must be specified.") - # get operator details stored in operator's init file. - operator_info: OperatorInfo = _operator_info(name=name) + client = docker.from_env() + # Check if image with given name exists + image = name try: - image = f"{operator_info.name}:{operator_info.version or 'undefined'}" - # check if the operator's image exists - client = docker.from_env() client.api.inspect_image(image) except docker.errors.ImageNotFound: - raise OperatorImageNotFoundError(operator_info.name) + # load operator info + operator_info: OperatorInfo = OperatorLoader.from_uri(uri=name).load() + try: + image = f"{operator_info.name}:{operator_info.version or 'undefined'}" + # check if the operator's image exists + client.api.inspect_image(image) + except docker.errors.ImageNotFound: + raise OperatorImageNotFoundError(operator_info.name) # extract registry from the ADS config. if not registry: @@ -537,20 +498,32 @@ def verify( kwargs: (Dict, optional). Additional key value arguments. 
""" - operator_type = config.get("type", "unknown") + operator_type = config.get("type") - if operator_type not in __operators__: - raise OperatorNotFoundError(operator_type) + # validation + if not operator_type: + raise ValueError(f"The `type` attribute must be specified.") - operator_module = importlib.import_module( - f"{OPERATOR_MODULE_PATH}.{operator_type}.operator" - ) - operator_module.verify(config, **kwargs) + # load operator info + operator_info: OperatorInfo = OperatorLoader.from_uri(uri=operator_type).load() + + # validate operator + try: + operator_module = runpy.run_module( + f"{operator_info.name}.operator", + run_name="verify", + ) + operator_module.get("verify")(config, **kwargs) + except Exception as ex: + print(ex) + logger.debug(ex) + raise ValueError( + f"The validator is not implemented for the `{operator_info.name}` operator." + ) def build_conda( name: str = None, - source_folder: str = None, conda_pack_folder: str = None, overwrite: bool = False, ads_config: Union[str, None] = None, @@ -565,9 +538,6 @@ def build_conda( ---------- name: str The name of the operator to build conda environment for.. - source_folder: (str, optional) - The folder containing the operator source code. - Only relevant for custom operators. conda_pack_folder: str The destination folder to save the conda environment. By default will be used the path specified in the config file generated @@ -578,43 +548,30 @@ def build_conda( The folder where the ads opctl config located. kwargs: (Dict, optional). Additional key value arguments. + + Returns + ------- + None + + Raises + ------ + ValueError + If `name` not specified. """ - operator_conda_name = name - operator_name = name - - if name: - if name not in __operators__: - raise OperatorNotFoundError(name) - source_folder = os.path.dirname( - inspect.getfile(importlib.import_module(f"{OPERATOR_MODULE_PATH}.{name}")) - ) - operator_conda_name = operator_conda_name or name - logger.info(f"Building conda environment for the `{name}` operator.") - elif source_folder: - source_folder = os.path.abspath(os.path.expanduser(source_folder)) - if not os.path.isdir(source_folder): - raise FileNotFoundError(f"The path {source_folder} does not exist") - - operator_name = os.path.basename(source_folder.rstrip("/")) - operator_conda_name = operator_conda_name or operator_name - logger.info( - "Building conda environment for custom operator using source folder: " - f"`{source_folder}`." - ) - else: - raise ValueError( - "No operator name or source folder specified." - "Please provide relevant options." - ) - # get operator details stored in operator's __init__.py file. - operator_info: OperatorInfo = _operator_info(path=source_folder) + # validation + if not name: + raise ValueError(f"The `name` attribute must be specified.") + + # load operator info + operator_info: OperatorInfo = OperatorLoader.from_uri(uri=name).load() + logger.info(f"Building conda environment for the `{operator_info.name}` operator.") # invoke the conda create command conda_create( - name=name, + name=operator_info.name, version=re.sub("[^0-9.]", "", operator_info.version), - environment_file=os.path.join(source_folder, "environment.yaml"), + environment_file=os.path.join(operator_info.path, "environment.yaml"), conda_pack_folder=conda_pack_folder, gpu=operator_info.gpu, overwrite=overwrite, @@ -651,22 +608,18 @@ def publish_conda( Raises ------ ValueError - When operator's name is not provided. - OperatorNotFoundError - If the service operator not found. + If `name` not specified. 
    OperatorCondaNotFoundError
-        If the operator's image doesn't exist.
+        If the operator's conda environment has not been built yet.
     """
+
+    # validation
     if not name:
-        raise ValueError(
-            f"The `name` attribute must be specified. Supported values: {__operators__}"
-        )
+        raise ValueError(f"The `name` attribute must be specified.")
 
-    if name not in __operators__:
-        raise OperatorNotFoundError(name)
+    # load operator info
+    operator_info: OperatorInfo = OperatorLoader.from_uri(uri=name).load()
 
-    # get operator details stored in operator's init file.
-    operator_info: OperatorInfo = _operator_info(name=name)
     version = re.sub("[^0-9.]", "", operator_info.version)
     slug = f"{operator_info.name}_v{version}".replace(" ", "").replace(".", "_").lower()
diff --git a/ads/opctl/operator/common/errors.py b/ads/opctl/operator/common/errors.py
index 5ab8bf306..bcc1af421 100644
--- a/ads/opctl/operator/common/errors.py
+++ b/ads/opctl/operator/common/errors.py
@@ -10,8 +10,8 @@ class OperatorNotFoundError(Exception):
     def __init__(self, operator: str):
         super().__init__(
-            f"The provided operator: `{operator}` "
-            f"is not found. Available service operators: `{'`, `'.join(__operators__)}`"
+            f"The provided operator: `{operator}` is not found. You can pick one from the "
+            f"registered service operators: `{'`, `'.join(__operators__)}`."
         )
 
 
@@ -20,8 +20,6 @@ def __init__(self, operator: str):
         super().__init__(
             f"The Docker image for the operator: `{operator}` has not been built yet. "
             "Please ensure that you build the image before attempting to publish it. "
-            f"Use the `ads opctl operator build-image --name {operator}` command "
-            "to build the image."
         )
 
 
@@ -30,6 +28,4 @@ def __init__(self, operator: str):
         super().__init__(
             f"The Conda environment for the operator: `{operator}` has not been built yet. "
             "Please ensure that you build the conda environment before attempting to publish it. "
-            f"Use the `ads opctl operator build-conda --name {operator}` "
-            "command to build the conda environment."
         )
diff --git a/ads/opctl/operator/common/operator_loader.py b/ads/opctl/operator/common/operator_loader.py
new file mode 100644
index 000000000..47dfda34c
--- /dev/null
+++ b/ads/opctl/operator/common/operator_loader.py
@@ -0,0 +1,729 @@
+#!/usr/bin/env python
+# -*- coding: utf-8; -*-
+
+# Copyright (c) 2023 Oracle and/or its affiliates.
+# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
+
+import glob
+import importlib
+import inspect
+import os
+import re
+import shutil
+import sys
+import tempfile
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from typing import Any, Dict, List
+from urllib.parse import urlparse
+
+from ads.common import auth as authutil
+from ads.common.decorator.runtime_dependency import runtime_dependency
+from ads.common.utils import copy_from_uri
+from ads.opctl import logger
+from ads.opctl.constants import OPERATOR_MODULE_PATH
+from ads.opctl.operator import __operators__
+
+from .const import ARCH_TYPE, PACK_TYPE
+from .errors import OperatorNotFoundError
+
+LOCAL_SCHEME = "local"
+MAIN_BRANCH = "main"
+
+
+@dataclass
+class OperatorInfo:
+    """Class representing brief information about the operator.
+
+    Attributes
+    ----------
+    name (str)
+        The name of the operator.
+    gpu (bool)
+        Whether the operator supports GPU.
+    short_description (str)
+        A short description of the operator.
+    description (str)
+        A detailed description of the operator.
+    version (str)
+        The version of the operator.
+ conda (str) + The conda environment required to run the operator. + conda_type (str) + The type of conda pack (e.g., PACK_TYPE.CUSTOM). + path (str) + The location of the operator. + keywords (List[str]) + Keywords associated with the operator. + backends (List[str]) + List of supported backends. + + Properties + ---------- + conda_prefix (str) + Generates the conda prefix for the custom conda pack. + """ + + name: str + gpu: bool + short_description: str + description: str + version: str + conda: str + conda_type: str + path: str + keywords: List[str] + backends: List[str] + + @property + def conda_prefix(self) -> str: + """ + Generates conda prefix for the custom conda pack. + + Example + ------- + conda = "forecast_v1" + conda_prefix == "cpu/forecast/1/forecast_v1" + + Returns + ------- + str + The conda prefix for the custom conda pack. + """ + return os.path.join( + f"{ARCH_TYPE.GPU if self.gpu else ARCH_TYPE.CPU}", + self.name, + re.sub("[^0-9.]", "", self.version), + f"{self.name}_{self.version}", + ) + + @classmethod + def from_init(cls, **kwargs: Dict) -> "OperatorInfo": + """ + Instantiates the class from the initial operator details config. + + Parameters + ---------- + **kwargs (Dict) + Keyword arguments containing operator details. + + Returns + ------- + OperatorInfo + An instance of OperatorInfo. + """ + path = kwargs.get("__operator_path__") + operator_readme = None + if path: + readme_file_path = os.path.join(path, "readme.md") + if os.path.exists(readme_file_path): + with open(readme_file_path, "r") as readme_file: + operator_readme = readme_file.read() + + return OperatorInfo( + name=kwargs.get("__type__"), + gpu=kwargs.get("__gpu__", "").lower() == "yes", + description=operator_readme or kwargs.get("__short_description__"), + short_description=kwargs.get("__short_description__"), + version=kwargs.get("__version__"), + conda=kwargs.get("__conda__"), + conda_type=kwargs.get("__conda_type__", PACK_TYPE.CUSTOM), + path=path, + keywords=kwargs.get("__keywords__", []), + backends=kwargs.get("__backends__", []), + ) + + +class Loader(ABC): + """Operator Loader Interface. + + Attributes + ---------- + uri (str) + The operator's location (e.g., local path, HTTP path, OCI path, GIT path). + uri_dst (str) + The local folder where the operator can be downloaded from the remote location. + A temporary folder will be generated if not provided. + auth (Dict, optional) + Default authentication settings. + + Methods + ------- + load (**kwargs) + Downloads the operator's source code to the local folder. + cleanup (**kwargs) + Cleans up all temporary files and folders created during operator loading. + """ + + def __init__(self, uri: str, uri_dst: str = None, auth: Dict = None) -> None: + """ + Instantiates Loader. + + Parameters + ---------- + uri (str) + The operator's location. + uri_dst (str) + The local folder where the operator can be downloaded from the remote location. + A temporary folder will be generated if not provided. + auth (Dict, optional) + Default authentication settings. + """ + self.uri = uri + self.uri_dst = uri_dst + self.auth = auth + + @abstractmethod + def _load(self, **kwargs: Dict) -> OperatorInfo: + """ + Downloads the operator's source code to the local folder. + This method needs to be implemented on the child level. + + Parameters + ------------ + **kwargs (Dict) + Additional optional attributes. + + Returns + ------- + OperatorInfo + Information about the operator. 
+ """ + pass + + def load(self, **kwargs: Dict) -> OperatorInfo: + """ + Downloads the operator's source code to the local folder. + + Parameters + ------------ + **kwargs (Dict) + Additional optional attributes. + + Returns + ------- + OperatorInfo + Information about the operator. + """ + operator_info = self._load(**kwargs) + # Adds the operators path to the system path. + # This will allow to execute the operator via runpy.run_module() + sys.path.insert(0, "/".join(operator_info.path.split("/")[0:-1])) + return operator_info + + def cleanup(self, **kwargs: Dict) -> None: + """ + Cleans up all temporary files and folders created during the loading of the operator. + + Parameters + ------------ + **kwargs (Dict) + Additional optional attributes. + """ + pass + + @classmethod + @abstractmethod + def compatible(cls, uri: str, **kwargs: Dict) -> bool: + """ + Checks if the loader is compatible with the given URI. + + Parameters + ------------ + uri (str) + The operator's location. + **kwargs (Dict) + Additional optional attributes. + Returns + ------- + bool + Whether the loader is compatible with the given URI. + """ + pass + + +class OperatorLoader: + """ + The operator loader class. + Helps to download the operator's source code to the local folder. + + Attributes + ---------- + loader (Loader) + The specific operator's loader. + """ + + def __init__(self, loader: Loader): + """ + Initializes OperatorLoader. + + Parameters + ---------- + loader (Loader) + The particular operator loader. + """ + self.loader = loader + + def load(self, **kwargs: Dict) -> OperatorInfo: + """ + Downloads the operator's source code to the local folder. + + Parameters + ------------ + **kwargs (Dict) + Additional optional attributes. + + Returns + ------- + OperatorInfo + Detailed information about the operator. + """ + return self.loader.load(**kwargs) + + @classmethod + def from_uri( + cls, uri: str, uri_dst: str = None, auth: Dict = None + ) -> "OperatorLoader": + """ + Constructs the operator's loader instance. + + Parameters + ---------- + uri (str) + The operator's location. + uri_dst (str) + The local folder where the operator can be downloaded from the remote location. + A temporary folder will be generated if not provided. + auth (Dict, optional) + Default authentication settings. + + Returns + ------- + OperatorLoader + An instance of OperatorLoader. + """ + if not uri: + raise ValueError("The `uri` attribute must be provided.") + + uri = os.path.expanduser(uri) + + for loader in ( + ServiceOperatorLoader, + LocalOperatorLoader, + GitOperatorLoader, + RemoteOperatorLoader, + ): + if loader.compatible(uri=uri, auth=auth): + return cls(loader=loader(uri=uri, uri_dst=uri_dst, auth=auth)) + + raise ValueError(f"The operator cannot be loaded from the given source: {uri}.") + + +class ServiceOperatorLoader(Loader): + """ + Class to load a service operator. + + Attributes + ---------- + uri (str) + The operator's location (e.g., local path, HTTP path, OCI path, GIT path). + uri_dst (str) + The local folder where the operator can be downloaded from the remote location. + A temporary folder will be generated if not provided. + auth (Dict, optional) + Default authentication settings. + """ + + def _load(self, **kwargs: Dict) -> OperatorInfo: + """ + Loads the service operator info. + + Parameters + ---------- + **kwargs (Dict) + Additional optional attributes. + + Returns + ------- + OperatorInfo + Detailed information about the operator. 
+ """ + return _operator_info(name=self.uri) + + @classmethod + def compatible(cls, uri: str, **kwargs: Dict) -> bool: + """ + Checks if the loader is compatible with the given URI. + + Parameters + ---------- + uri (str) + The operator's location. + **kwargs (Dict) + Additional optional attributes. + + Returns + ------- + bool + Whether the loader is compatible with the given URI. + """ + return uri.lower() in __operators__ + + +class LocalOperatorLoader(Loader): + """ + Class to load a local operator. + + Attributes + ---------- + uri (str) + The operator's location (e.g., local path, HTTP path, OCI path, GIT path). + uri_dst (str) + The local folder where the operator can be downloaded from the remote location. + A temporary folder will be generated if not provided. + auth (Dict, optional) + Default authentication settings. + """ + + def _load(self, **kwargs: Dict) -> OperatorInfo: + """ + Loads the local operator info. + + Parameters + ---------- + **kwargs (Dict) + Additional optional attributes. + + Returns + ------- + OperatorInfo + Detailed information about the operator. + """ + return _operator_info(path=self.uri) + + @classmethod + def compatible(cls, uri: str, **kwargs: Dict) -> bool: + """Checks if the loader is compatible with the given URI. + + Parameters + ---------- + uri (str) + The operator's location. + **kwargs (Dict) + Additional optional attributes. + + Returns + ------- + bool + Whether the loader is compatible with the given URI. + """ + return not urlparse(uri).scheme + + +class RemoteOperatorLoader(Loader): + """ + Class to load an operator from a remote location (OCI Object Storage). + + Attributes + ---------- + uri (str) + The operator's location (e.g., local path, HTTP path, OCI path, GIT path). + uri_dst (str) + The local folder where the operator can be downloaded from the remote location. + A temporary folder will be generated if not provided. + auth (Dict, optional) + Default authentication settings. + """ + + def __init__(self, uri: str, uri_dst: str = None, auth: Dict = None) -> None: + """ + Instantiates Loader. + + Parameters + ---------- + uri (str) + The operator's location. + uri_dst (str) + The local folder where the operator can be downloaded from the remote location. + A temporary folder will be generated if not provided. + auth (Dict, optional) + Default authentication settings. + """ + super().__init__( + uri=uri, uri_dst=uri_dst, auth=auth or authutil.default_signer() + ) + + def _load(self, **kwargs: Dict) -> OperatorInfo: + """Downloads the operator's source code to the local folder. + + Parameters + ---------- + **kwargs (Dict) + Additional optional attributes. + + Returns + ------- + OperatorInfo + Detailed information about the operator. + """ + self.tmp_dir = tempfile.mkdtemp() if not self.uri_dst else None + uri_dst = os.path.join( + (self.uri_dst or self.tmp_dir).rstrip(), + os.path.splitext(os.path.basename(self.uri.rstrip()))[0], + ) + + logger.info(f"Downloading operator from `{self.uri}` to `{uri_dst}`.") + copy_from_uri( + self.uri, uri_dst, force_overwrite=True, auth=self.auth, unpack=True + ) + + return _operator_info(path=uri_dst) + + def cleanup(self, **kwargs: Dict) -> None: + """Cleans up all temporary files and folders created during operator loading. + + Parameters + ---------- + **kwargs (Dict) + Additional optional attributes. 
+ """ + super().cleanup(**kwargs) + try: + shutil.rmtree(self.tmp_dir) + except Exception as ex: + logger.debug(ex) + + @classmethod + def compatible(cls, uri: str, **kwargs: Dict) -> bool: + """Checks if the loader is compatible with the given URI. + + Parameters + ---------- + uri (str) + The operator's location. + **kwargs (Dict) + Additional optional attributes. + Returns + ------- + bool + Whether the loader is compatible with the given URI. + """ + return urlparse(uri).scheme.lower() == "oci" + + +class GitOperatorLoader(Loader): + """ + Class to load an operator from a GIT repository. + Supported URI format: https://github.com/@# + Examples: + - https://github.com/my-operator-repository.git@feature-branch#forecasting + - https://github.com/my-operator-repository#forecasting + - https://github.com/my-operator-repository + + Attributes + ---------- + uri (str) + The operator's location (e.g., local path, HTTP path, OCI path, GIT path). + uri_dst (str) + The local folder where the operator can be downloaded from the remote location. + A temporary folder will be generated if not provided. + auth (Dict, optional) + Default authentication settings. + """ + + @runtime_dependency( + module="git", + err_msg=( + "The `git` library is required. " + "Use `pip install git` to install the `git` library." + ), + ) + def _load(self, **kwargs: Dict) -> OperatorInfo: + """ + Downloads the operator's source code to the local folder. + + Parameters + ---------- + **kwargs (Dict) + Additional optional attributes. + + Returns + ------- + OperatorInfo + Detailed information about the operator. + """ + import git + + self.tmp_dir = tempfile.mkdtemp() if not self.uri_dst else None + uri_dst = self.uri_dst or self.tmp_dir + + uri_dst = os.path.join( + (self.uri_dst or self.tmp_dir).rstrip(), + os.path.splitext(os.path.basename(self.uri.rstrip()))[0], + ) + + logger.info(f"Fetching operator from `{self.uri}` to `{uri_dst}`.") + + # Parse the GitHub URL + parsed_url = urlparse(self.uri) + logger.debug(parsed_url) + + branch = "main" # Default branch + repo_name = parsed_url.path + + if "@" in parsed_url.path: + # Extract the branch if provided in the URL + branch = parsed_url.path.split("@")[1] + repo_name = parsed_url.path.split("@")[0] + + # Construct the repository URL + repo_url = f"https://{parsed_url.netloc}{repo_name}" + logger.debug(repo_url) + + # Clone the GitHub repository to a temporary directory + with tempfile.TemporaryDirectory() as tmp_git_dir: + repo = git.Repo.clone_from(repo_url, tmp_git_dir, branch=branch) + + # Find the folder to download + if parsed_url.fragment: + folder_to_download = parsed_url.fragment + folder_path = os.path.join(tmp_git_dir, folder_to_download) + + if not os.path.exists(folder_path): + raise ValueError( + f"Folder '{folder_to_download}' not found in the repository." + ) + + # Move the folder to the desired local path + for item in glob.glob(os.path.join(folder_path, "**"), recursive=True): + destination_item = os.path.join( + uri_dst, os.path.relpath(item, folder_path) + ) + if os.path.isdir(item): + # If it's a directory, create it in the destination directory + if not os.path.exists(destination_item): + os.makedirs(destination_item) + else: + # If it's a file, move it to the destination directory + shutil.move(item, destination_item) + + # Clean up the temporary directory + repo.close() + + def cleanup(self, **kwargs: Dict) -> None: + """Cleans up all temporary files and folders created during operator loading. 
+def _module_from_file(module_name: str, module_path: str) -> Any:
+    """
+    Loads a module by its location.
+
+    Parameters
+    ----------
+    module_name (str)
+        The name of the module to be imported.
+    module_path (str)
+        The physical path of the module.
+
+    Returns
+    -------
+    The loaded module.
+    """
+    spec = importlib.util.spec_from_file_location(module_name, module_path)
+    module = importlib.util.module_from_spec(spec)
+    spec.loader.exec_module(module)
+    return module
+
+
+def _module_constant_values(module_name: str, module_path: str) -> Dict[str, Any]:
+    """Returns the constant variables defined in a given module.
+
+    Parameters
+    ----------
+    module_name (str)
+        The name of the module to be imported.
+    module_path (str)
+        The physical path of the module.
+
+    Returns
+    -------
+    Dict[str, Any]
+        Map of variable names and their values.
+    """
+    module = _module_from_file(module_name, module_path)
+    return {name: value for name, value in vars(module).items()}
+
+
+def _operator_info(path: str = None, name: str = None) -> OperatorInfo:
+    """
+    Extracts the operator's details from the given path.
+    The operator is expected to have an `__init__.py` file in which all the details are placed.
+
+    Parameters
+    ----------
+    path (str, optional)
+        The path to the operator.
+    name (str, optional)
+        The name of the service operator.
+
+    Returns
+    -------
+    OperatorInfo
+        The operator details.
+    """
+    try:
+        if name:
+            path = os.path.dirname(
+                inspect.getfile(
+                    importlib.import_module(f"{OPERATOR_MODULE_PATH}.{name}")
+                )
+            )
+
+        module_name = os.path.basename(path.rstrip("/"))
+        module_path = f"{path.rstrip('/')}/__init__.py"
+        return OperatorInfo.from_init(
+            **_module_constant_values(module_name, module_path)
+        )
+    except (ModuleNotFoundError, FileNotFoundError) as ex:
+        logger.debug(ex)
+        raise OperatorNotFoundError(name or path)
+
+
+def _operator_info_list() -> List[OperatorInfo]:
+    """Returns the list of registered operators.
+
+    Returns
+    -------
+    List[OperatorInfo]
+        The list of registered operators.
+    """
+    return [_operator_info(name=operator_name) for operator_name in __operators__]
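`_operator_info` expects each operator package to expose its metadata as module-level constants. A hypothetical `__init__.py` it could consume, patterned on the forecast operator's init file changed later in this patch:

```python
# my_operator/__init__.py -- hypothetical example, read via _module_constant_values().
import os

__operator_path__ = os.path.dirname(__file__)  # becomes OperatorInfo.path
__type__ = os.path.basename(__operator_path__.rstrip("/"))  # the operator name
__version__ = "v1"
__conda__ = f"{__type__}_{__version__}"
__gpu__ = "no"  # parsed into OperatorInfo.gpu via .lower() == "yes"
__keywords__ = ["example"]
__backends__ = ["job"]
__short_description__ = "An example operator."
```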
+ """ + + schema: Dict[str, Any] = None + + def generate_example(self, values: Optional[Dict[str, Any]] = None) -> str: + """ + Generate the YAML config based on the YAML schema. + + Properties + ---------- + values: Optional dictionary containing specific values for the attributes. + + Returns + ------- + str + The generated YAML config. + """ + example = self._generate_example(self.schema, values) + return yaml.dump(example) + + def _check_condition( + self, condition: Dict[str, Any], example: Dict[str, Any] + ) -> bool: + """ + Checks if the YAML schema condition fulfils. + This method is used to include conditional fields into the final config. + + Properties + ---------- + condition: Dict[str, Any] + The schema condition. + Example: + In the example below the `owner_name` field has dependency on the `model` field. + The `owner_name` will be included to the final config if only `model` is `prophet`. + owner_name: + type: string + dependencies: {"model":"prophet"} + example: Dict[str, Any] + The config to check if the dependable value presented there. + Returns + ------- + bool + True if the condition fulfils, false otherwise. + """ + for key, value in condition.items(): + if key not in example or example[key] != value: + return False + return True + + def _generate_example( + self, schema: Dict[str, Any], values: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: + """ + Generates the final YAML config. + This is a recursive method, which iterates through the entire schema. + + Properties + ---------- + schema: Dict[str, Any] + The schema to generate the config. + values: Optional[Dict[str, Any]] + The optional values that would be used instead of default values provided in the schema. + + Returns + ------- + Dict + The result config. + """ + example = {} + for key, value in schema.items(): + # only generate values fro required fields + if value.get("required", False) or value.get("dependencies", False): + if not "dependencies" in value or self._check_condition( + value["dependencies"], example + ): + data_type = value.get("type") + + if key in values: + example[key] = values[key] + elif "default" in value: + example[key] = value["default"] + elif data_type == "string": + example[key] = "value" + elif data_type == "number": + example[key] = 1 + elif data_type == "boolean": + example[key] = True + elif data_type == "array": + example[key] = ["item1", "item2"] + elif data_type == "dict": + example[key] = self._generate_example( + schema=value.get("schema", {}), values=values + ) + return example diff --git a/ads/opctl/operator/common/utils.py b/ads/opctl/operator/common/utils.py index febcf9cef..94480fd90 100644 --- a/ads/opctl/operator/common/utils.py +++ b/ads/opctl/operator/common/utils.py @@ -5,13 +5,9 @@ # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ import argparse -import importlib -import inspect import os -import re -from dataclasses import dataclass from string import Template -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Tuple import fsspec import yaml @@ -19,13 +15,9 @@ from yaml import SafeLoader from ads.opctl import logger -from ads.opctl.constants import OPERATOR_MODULE_PATH -from ads.opctl.operator.common.errors import OperatorNotFoundError from ads.opctl.operator import __operators__ from ads.opctl.utils import run_command -from .const import ARCH_TYPE, PACK_TYPE - CONTAINER_NETWORK = "CONTAINER_NETWORK" @@ -35,188 +27,6 @@ class OperatorValidator(Validator): pass 
-@dataclass -class OperatorInfo: - """Class representing short information about the operator. - - Attributes - ---------- - name: str - The name of the operator. - short_description: str - The short description of the operator. - description: str - The detailed description of the operator. - version: str - The version of the operator. - conda: str - The conda environment that have to be used to run the operator. - path: str - The operator location. - """ - - name: str - gpu: bool - short_description: str - description: str - version: str - conda: str - conda_type: str - path: str - keywords: List[str] - backends: List[str] - - @property - def conda_prefix(self) -> str: - """Generates conda prefix for the custom conda pack. - - Example: - conda = "forecast_v1" - conda_prefix == "cpu/forecast/1/forecast_v1" - - Returns - ------- - str - The conda prefix for the custom conda pack. - """ - return os.path.join( - f"{ARCH_TYPE.GPU if self.gpu else ARCH_TYPE.CPU}", - self.name, - re.sub("[^0-9.]", "", self.version), - f"{self.name}_{self.version}", - ) - - @classmethod - def from_init(*args: List, **kwargs: Dict) -> "OperatorInfo": - """Instantiates the class from the initial operator details config.""" - - path = kwargs.get("__operator_path__") - operator_readme = None - if path: - readme_file_path = os.path.join(path, "readme.md") - if os.path.exists(readme_file_path): - with open(readme_file_path, "r") as readme_file: - operator_readme = readme_file.read() - - return OperatorInfo( - name=kwargs.get("__type__"), - gpu=kwargs.get("__gpu__", "").lower() == "yes", - description=operator_readme or kwargs.get("__short_description__"), - short_description=kwargs.get("__short_description__"), - version=kwargs.get("__version__"), - conda=kwargs.get("__conda__"), - conda_type=kwargs.get("__conda_type__", PACK_TYPE.CUSTOM), - path=path, - keywords=kwargs.get("__keywords__", []), - backends=kwargs.get("__backends__", []), - ) - - -@dataclass -class YamlGenerator: - """ - Class for generating the YAML config based on the given YAML schema. - - Attributes - ---------- - schema: Dict - The schema of the template. - """ - - schema: Dict[str, Any] = None - - def generate_example(self, values: Optional[Dict[str, Any]] = None) -> str: - """ - Generate the YAML config based on the YAML schema. - - Properties - ---------- - values: Optional dictionary containing specific values for the attributes. - - Returns - ------- - str - The generated YAML config. - """ - example = self._generate_example(self.schema, values) - return yaml.dump(example) - - def _check_condition( - self, condition: Dict[str, Any], example: Dict[str, Any] - ) -> bool: - """ - Checks if the YAML schema condition fulfils. - This method is used to include conditional fields into the final config. - - Properties - ---------- - condition: Dict[str, Any] - The schema condition. - Example: - In the example below the `owner_name` field has dependency on the `model` field. - The `owner_name` will be included to the final config if only `model` is `prophet`. - owner_name: - type: string - dependencies: {"model":"prophet"} - example: Dict[str, Any] - The config to check if the dependable value presented there. - Returns - ------- - bool - True if the condition fulfils, false otherwise. 
- """ - for key, value in condition.items(): - if key not in example or example[key] != value: - return False - return True - - def _generate_example( - self, schema: Dict[str, Any], values: Optional[Dict[str, Any]] = None - ) -> Dict[str, Any]: - """ - Generates the final YAML config. - This is a recursive method, which iterates through the entire schema. - - Properties - ---------- - schema: Dict[str, Any] - The schema to generate the config. - values: Optional[Dict[str, Any]] - The optional values that would be used instead of default values provided in the schama. - - Returns - ------- - Dict - The result config. - """ - example = {} - for key, value in schema.items(): - # only generate values fro required fields - if value.get("required", False) or value.get("dependencies", False): - if not "dependencies" in value or self._check_condition( - value["dependencies"], example - ): - data_type = value.get("type") - - if key in values: - example[key] = values[key] - elif "default" in value: - example[key] = value["default"] - elif data_type == "string": - example[key] = "value" - elif data_type == "number": - example[key] = 1 - elif data_type == "boolean": - example[key] = True - elif data_type == "array": - example[key] = ["item1", "item2"] - elif data_type == "dict": - example[key] = self._generate_example( - schema=value.get("schema", {}), values=values - ) - return example - - def _build_image( dockerfile: str, image_name: str, @@ -292,70 +102,6 @@ def _build_image( return image_name -def _module_constant_values(module_name: str, module_path: str) -> Dict[str, Any]: - """Returns the list of constant variables from a given module. - - Parameters - ---------- - module_name: str - The name of the module to be imported. - module_path: str - The physical path of the module. - - Returns - ------- - Dict[str, Any] - Map of variable names and their values. - """ - spec = importlib.util.spec_from_file_location(module_name, module_path) - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) - return {name: value for name, value in vars(module).items()} - - -def _operator_info(path: str = None, name: str = None) -> OperatorInfo: - """Extracts operator's details by given path. - The expectation is that operator has init file where the all details placed. - - Parameters - ---------- - path: (str, optional). The path to the operator. - name: (str, optional). The name of the service operator. - - Returns - ------- - OperatorInfo - The operator details. - """ - try: - if name: - path = os.path.dirname( - inspect.getfile( - importlib.import_module(f"{OPERATOR_MODULE_PATH}.{name}") - ) - ) - - module_name = os.path.basename(path.rstrip("/")) - module_path = f"{path.rstrip('/')}/__init__.py" - return OperatorInfo.from_init( - **_module_constant_values(module_name, module_path) - ) - except ModuleNotFoundError as ex: - logger.debug(ex) - raise OperatorNotFoundError(name or path) - - -def _operator_info_list() -> List[OperatorInfo]: - """Returns the list of registered operators. - - Returns - ------- - List[OperatorInfo] - The list of registered operators. 
- """ - return (_operator_info(name=operator_name) for operator_name in __operators__) - - def _extant_file(x: str): """Checks the extension of the file to yaml.""" if not (x.lower().endswith(".yml") or x.lower().endswith(".yaml")): diff --git a/ads/opctl/operator/lowcode/forecast/__init__.py b/ads/opctl/operator/lowcode/forecast/__init__.py index 440acbf7f..518063a50 100644 --- a/ads/opctl/operator/lowcode/forecast/__init__.py +++ b/ads/opctl/operator/lowcode/forecast/__init__.py @@ -6,9 +6,11 @@ import os -__version__ = "v1" +__operator_path__ = os.path.dirname(__file__) -__type__ = "forecast" +__type__ = os.path.basename(__operator_path__.rstrip("/")) + +__version__ = "v1" __conda__ = f"{__type__}_{__version__}" @@ -18,9 +20,8 @@ __keywords__ = ["Prophet", "AutoML", "ARIMA", "RNN", "LSTM"] -__backends__ = ["job", "dataflow"] # job/dataflow +__backends__ = ["job", "dataflow"] # job/dataflow/ -__operator_path__ = os.path.dirname(__file__) __short_description__ = """ Forecasting operator, that leverages historical time series data to generate accurate diff --git a/ads/opctl/operator/lowcode/forecast/cmd.py b/ads/opctl/operator/lowcode/forecast/cmd.py index 8be6eb7d9..85f0619d3 100644 --- a/ads/opctl/operator/lowcode/forecast/cmd.py +++ b/ads/opctl/operator/lowcode/forecast/cmd.py @@ -9,7 +9,8 @@ import click from ads.opctl import logger -from ads.opctl.operator.common.utils import YamlGenerator, _load_yaml_from_uri +from ads.opctl.operator.common.utils import _load_yaml_from_uri +from ads.opctl.operator.common.operator_yaml_generator import YamlGenerator from .const import SupportedModels @@ -23,6 +24,9 @@ def init(**kwargs: Dict) -> str: kwargs: (Dict, optional). Additional key value arguments. + - type: str + The type of the operator. + Returns ------- str @@ -38,4 +42,4 @@ def init(**kwargs: Dict) -> str: return YamlGenerator( schema=_load_yaml_from_uri(__file__.replace("cmd.py", "schema.yaml")) - ).generate_example(values={"model": model_type}) + ).generate_example(values={"model": model_type, "type": kwargs.get("type")}) diff --git a/ads/opctl/operator/lowcode/forecast/environment.yaml b/ads/opctl/operator/lowcode/forecast/environment.yaml index 3d06f2d58..1292558ca 100644 --- a/ads/opctl/operator/lowcode/forecast/environment.yaml +++ b/ads/opctl/operator/lowcode/forecast/environment.yaml @@ -12,7 +12,7 @@ dependencies: - datapane - cerberus - sktime - - autots['additional'] + - autots[additional] - optuna==2.9.0 - oracle-automlx==23.2.3 - "git+https://github.com/oracle/accelerated-data-science.git@feature/forecasting#egg=oracle-ads" diff --git a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py index 2fffb4736..6863ddf60 100644 --- a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py @@ -22,7 +22,7 @@ ) from ads.opctl import logger -from ...forecast.const import DEFAULT_TRIALS +from ..const import DEFAULT_TRIALS from .. import utils from .base_model import ForecastOperatorBaseModel diff --git a/ads/opctl/operator/lowcode/forecast/model/prophet.py b/ads/opctl/operator/lowcode/forecast/model/prophet.py index a02f65fc4..e256837af 100644 --- a/ads/opctl/operator/lowcode/forecast/model/prophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/prophet.py @@ -9,7 +9,7 @@ import pandas as pd from ads.opctl import logger -from ...forecast.const import DEFAULT_TRIALS +from ..const import DEFAULT_TRIALS from .. 
import utils from .base_model import ForecastOperatorBaseModel diff --git a/ads/opctl/operator/lowcode/forecast/operator.py b/ads/opctl/operator/lowcode/forecast/operator.py index cd9ac5dcf..dbc66b362 100644 --- a/ads/opctl/operator/lowcode/forecast/operator.py +++ b/ads/opctl/operator/lowcode/forecast/operator.py @@ -15,7 +15,7 @@ def operate(operator_config: ForecastOperatorConfig) -> None: ForecastOperatorModelFactory.get_model(operator_config).generate_report() -def verify(spec: Dict, **kwargs) -> bool: +def verify(spec: Dict, **kwargs: Dict) -> bool: """Verifies the forecasting operator config.""" operator = ForecastOperatorConfig.from_dict(spec) msg_header = ( diff --git a/ads/opctl/operator/lowcode/forecast/schema.yaml b/ads/opctl/operator/lowcode/forecast/schema.yaml index e49d41be3..c0b62f0a0 100644 --- a/ads/opctl/operator/lowcode/forecast/schema.yaml +++ b/ads/opctl/operator/lowcode/forecast/schema.yaml @@ -17,8 +17,6 @@ version: description: "Operators may change yaml file schemas from version to version, as well as implementation details. Double check the version to ensure compatibility." type: - allowed: - - forecast required: true type: string default: forecast
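Finally, a hedged end-to-end sketch of the new `YamlGenerator`; the schema literal below is a toy stand-in for the real `schema.yaml`, not part of the patch:

```python
from ads.opctl.operator.common.operator_yaml_generator import YamlGenerator

# Toy schema: `type` and `model` are required; `owner_name` is conditional and
# is emitted only when `model` is "prophet" (see _check_condition above).
schema = {
    "type": {"type": "string", "required": True, "default": "forecast"},
    "model": {"type": "string", "required": True, "default": "prophet"},
    "owner_name": {"type": "string", "dependencies": {"model": "prophet"}},
}

print(YamlGenerator(schema=schema).generate_example(values={"model": "prophet"}))
# Expected output (yaml.dump sorts keys alphabetically):
#   model: prophet
#   owner_name: value
#   type: forecast
```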