Skip to content

Commit

Permalink
ODSC-47777/Custom Operator Integration (#348)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrDzurb authored Sep 26, 2023
2 parents 02c1f00 + d0669f5 commit 1d7d13a
Show file tree
Hide file tree
Showing 27 changed files with 1,299 additions and 633 deletions.
1 change: 0 additions & 1 deletion ads/common/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,6 @@ def __enter__(self):
"""
self.previous_state = copy.deepcopy(AuthState())
set_auth(**self.kwargs)
return default_signer()

def __exit__(self, exc_type, exc_val, exc_tb):
"""
Expand Down
6 changes: 3 additions & 3 deletions ads/common/object_storage_details.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*--

# Copyright (c) 2021, 2022 Oracle and/or its affiliates.
# Copyright (c) 2021, 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

import json
Expand All @@ -15,7 +15,7 @@
from ads.common import oci_client


class InvalidObjectStoragePath(Exception): # pragma: no cover
class InvalidObjectStoragePath(Exception): # pragma: no cover
"""Invalid Object Storage Path."""

pass
Expand Down Expand Up @@ -137,4 +137,4 @@ def is_oci_path(uri: str = None) -> bool:
"""
if not uri:
return False
return uri.startswith("oci://")
return uri.lower().startswith("oci://")
6 changes: 5 additions & 1 deletion ads/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1324,11 +1324,15 @@ def copy_file(
If a destination file exists and `force_overwrite` set to `False`.
"""
chunk_size = chunk_size or DEFAULT_BUFFER_SIZE
auth = auth or authutil.default_signer()

if not os.path.basename(uri_dst):
uri_dst = os.path.join(uri_dst, os.path.basename(uri_src))
src_path_scheme = urlparse(uri_src).scheme or "file"

auth = auth or {}
if src_path_scheme.lower() == "oci" and not auth:
auth = authutil.default_signer()

src_file_system = fsspec.filesystem(src_path_scheme, **auth)

if not fsspec.filesystem(src_path_scheme, **auth).exists(uri_src):
Expand Down
42 changes: 37 additions & 5 deletions ads/opctl/backend/ads_dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from ads.opctl.constants import OPERATOR_MODULE_PATH
from ads.opctl.decorator.common import print_watch_command
from ads.opctl.operator.common.const import ENV_OPERATOR_ARGS
from ads.opctl.operator.common.operator_loader import OperatorInfo, OperatorLoader

REQUIRED_FIELDS = [
"compartment_id",
Expand Down Expand Up @@ -214,10 +215,36 @@ def watch(self):


class DataFlowOperatorBackend(DataFlowBackend):
"""Backend class to run operator on Data Flow Applications."""
"""
Backend class to run an operator on a Data Flow Application.
Attributes
----------
runtime_config: (Dict)
The runtime config for the operator.
operator_config: (Dict)
The operator specification config.
operator_type: str
The type of the operator.
operator_version: str
The version of the operator.
job: Job
The Data Science Job.
"""

def __init__(self, config: Dict, operator_info: OperatorInfo = None) -> None:
"""
Instantiates the operator backend.
def __init__(self, config: Dict) -> None:
super().__init__(config=config)
Parameters
----------
config: (Dict)
The configuration file containing operator's specification details and execution section.
operator_info: (OperatorInfo, optional)
The operator's detailed information extracted from the operator.__init__ file.
Will be extracted from the operator type if not provided.
"""
super().__init__(config=config or {})

self.job = None

Expand All @@ -231,13 +258,15 @@ def __init__(self, config: Dict) -> None:
}
self.operator_type = self.operator_config.get("type", "unknown")
self.operator_version = self.operator_config.get("version", "unknown")
self.operator_info = operator_info

def _adjust_common_information(self):
"""Adjusts common information of the application."""

if self.job.name.lower().startswith("{job"):
self.job.with_name(
f"job_{self.operator_type.lower()}" f"_{self.operator_version.lower()}"
f"job_{self.operator_info.name.lower()}"
f"_{self.operator_version.lower()}"
)
self.job.runtime.with_maximum_runtime_in_minutes(
self.config["execution"].get("max_wait_time", 1200)
Expand All @@ -247,7 +276,7 @@ def _adjust_common_information(self):

# prepare run.py file to run the operator
script_file = os.path.join(
temp_dir, f"{self.operator_type}_{int(time.time())}_run.py"
temp_dir, f"{self.operator_info.name}_{int(time.time())}_run.py"
)

operator_module = f"{OPERATOR_MODULE_PATH}.{self.operator_type}"
Expand Down Expand Up @@ -291,6 +320,9 @@ def run(self, **kwargs: Dict) -> Union[Dict, None]:
"""
Runs the operator on the Data Flow service.
"""
if not self.operator_info:
self.operator_info = OperatorLoader.from_uri(self.operator_type).load()

self.job = Job.from_dict(self.runtime_config).build()

# adjust job's common information
Expand Down
65 changes: 48 additions & 17 deletions ads/opctl/backend/ads_ml_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

import copy
import importlib
import inspect
import json
import os
import shlex
Expand All @@ -31,12 +29,13 @@
from ads.opctl import logger
from ads.opctl.backend.base import Backend, RuntimeFactory
from ads.opctl.config.resolver import ConfigResolver
from ads.opctl.constants import DEFAULT_IMAGE_SCRIPT_DIR, OPERATOR_MODULE_PATH
from ads.opctl.constants import DEFAULT_IMAGE_SCRIPT_DIR
from ads.opctl.decorator.common import print_watch_command
from ads.opctl.distributed.common.cluster_config_helper import (
ClusterConfigToJobSpecConverter,
)
from ads.opctl.operator.common.const import ENV_OPERATOR_ARGS
from ads.opctl.operator.common.operator_loader import OperatorInfo, OperatorLoader

REQUIRED_FIELDS = [
"project_id",
Expand Down Expand Up @@ -528,10 +527,41 @@ def run(self, cluster_info, dry_run=False) -> None:


class MLJobOperatorBackend(MLJobBackend):
"""Backend class to run operator on Data Science Jobs."""
"""
Backend class to run an operator on Data Science Jobs.
Two scenarios are currently supported:
* Running operator within container runtime.
* Running operator within python runtime.
Attributes
----------
runtime_config: (Dict)
The runtime config for the operator.
operator_config: (Dict)
The operator specification config.
operator_type: str
The type of the operator.
operator_version: str
The version of the operator.
operator_info: OperatorInfo
The detailed information about the operator.
job: Job
The Data Science Job.
"""

def __init__(self, config: Dict, operator_info: OperatorInfo = None) -> None:
"""
Instantiates the operator backend.
def __init__(self, config: Dict) -> None:
super().__init__(config=config)
Parameters
----------
config: (Dict)
The configuration file containing operator's specification details and execution section.
operator_info: (OperatorInfo, optional)
The operator's detailed information extracted from the operator.__init__ file.
Will be extracted from the operator type if not provided.
"""
super().__init__(config=config or {})

self.job = None

Expand All @@ -552,12 +582,15 @@ def __init__(self, config: Dict) -> None:
PythonRuntime().type: self._adjust_python_runtime,
}

self.operator_info = operator_info

def _adjust_common_information(self):
"""Adjusts common information of the job."""

if self.job.name.lower().startswith("{job"):
self.job.with_name(
f"job_{self.operator_type.lower()}" f"_{self.operator_version.lower()}"
f"job_{self.operator_info.name.lower()}"
f"_{self.operator_version.lower()}"
)
self.job.runtime.with_maximum_runtime_in_minutes(
self.config["execution"].get("max_wait_time", 1200)
Expand All @@ -571,7 +604,7 @@ def _adjust_container_runtime(self):
[
"python3",
"-m",
f"{self.operator_type}",
f"{self.operator_info.name}",
]
)
self.job.runtime.with_environment_variable(
Expand All @@ -591,20 +624,15 @@ def _adjust_python_runtime(self):

# prepare run.sh file to run the operator's code
script_file = os.path.join(
temp_dir, f"{self.operator_type}_{int(time.time())}_run.sh"
temp_dir, f"{self.operator_info.name}_{int(time.time())}_run.sh"
)
with open(script_file, "w") as fp:
fp.write(f"python3 -m {self.operator_type}")
fp.write(f"python3 -m {self.operator_info.name}")

# copy the operator's source code to the temporary folder
operator_source_folder = os.path.dirname(
inspect.getfile(
importlib.import_module(f"{OPERATOR_MODULE_PATH}.{self.operator_type}")
)
)
shutil.copytree(
operator_source_folder.rstrip("/"),
os.path.join(temp_dir, self.operator_type),
self.operator_info.path.rstrip("/"),
os.path.join(temp_dir, self.operator_info.name),
dirs_exist_ok=True,
)

Expand All @@ -628,6 +656,9 @@ def run(self, **kwargs: Dict) -> Union[Dict, None]:
"""
Runs the operator on the Data Science Jobs.
"""
if not self.operator_info:
self.operator_info = OperatorLoader.from_uri(self.operator_type).load()

self.job = Job.from_dict(self.runtime_config).build()

# adjust job's common information
Expand Down
51 changes: 40 additions & 11 deletions ads/opctl/backend/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,12 @@
DEFAULT_NOTEBOOK_SESSION_SPARK_CONF_DIR,
ML_JOB_GPU_IMAGE,
ML_JOB_IMAGE,
OPERATOR_MODULE_PATH,
)
from ads.opctl.distributed.cmds import load_ini, local_run
from ads.opctl.model.cmds import _download_model
from ads.opctl.operator import __operators__
from ads.opctl.operator.common.const import ENV_OPERATOR_ARGS
from ads.opctl.operator.common.errors import OperatorNotFoundError
from ads.opctl.operator.common.operator_loader import OperatorInfo, OperatorLoader
from ads.opctl.operator.runtime import const as operator_runtime_const
from ads.opctl.operator.runtime import runtime as operator_runtime
from ads.opctl.spark.cmds import (
Expand Down Expand Up @@ -830,9 +829,38 @@ def _get_conda_info_from_runtime(artifact_dir):


class LocalOperatorBackend(Backend):
"""Class representing local operator backend."""
"""
The local operator backend to execute an operator in the local environment.
Two scenarios are currently supported:
* Running operator within local conda environment.
* Running operator within local container.
Attributes
----------
runtime_config: (Dict)
The runtime config for the operator.
operator_config: (Dict)
The operator specification config.
operator_type: str
The type of the operator.
operator_info: OperatorInfo
The detailed information about the operator.
"""

def __init__(
self, config: Optional[Dict], operator_info: OperatorInfo = None
) -> None:
"""
Instantiates the operator backend.
def __init__(self, config: Optional[Dict]) -> None:
Parameters
----------
config: (Dict)
The configuration file containing operator's specification details and execution section.
operator_info: (OperatorInfo, optional)
The operator's detailed information extracted from the operator.__init__ file.
Will be extracted from the operator type if not provided.
"""
super().__init__(config=config or {})

self.runtime_config = self.config.get("runtime", {})
Expand All @@ -850,6 +878,8 @@ def __init__(self, config: Optional[Dict]) -> None:
operator_runtime.PythonRuntime.type: self._run_with_python,
}

self.operator_info = operator_info

def _run_with_python(self) -> int:
"""Runs the operator within a local python environment.
Expand All @@ -865,15 +895,14 @@ def _run_with_python(self) -> int:
)

# run operator
operator_module = f"{OPERATOR_MODULE_PATH}.{self.operator_type}"
operator_spec = json.dumps(self.operator_config)
sys.argv = [operator_module, "--spec", operator_spec]
sys.argv = [self.operator_info.name, "--spec", operator_spec]

print(f"{'*' * 50} Runtime Config {'*' * 50}")
print(runtime.to_yaml())

try:
runpy.run_module(operator_module, run_name="__main__")
runpy.run_module(self.operator_info.name, run_name="__main__")
except SystemExit as exception:
return exception.code
else:
Expand Down Expand Up @@ -915,7 +944,7 @@ def _run_with_container(self) -> int:
image=runtime.spec.image,
bind_volumes=bind_volumes,
env_vars=env_vars,
command=f"'python3 -m {self.operator_type}'",
command=f"'python3 -m {self.operator_info.name}'",
)

def run(self, **kwargs: Dict) -> Dict:
Expand All @@ -925,15 +954,15 @@ def run(self, **kwargs: Dict) -> Dict:
runtime_type = self.runtime_config.get(
"type", operator_runtime.OPERATOR_LOCAL_RUNTIME_TYPE.PYTHON
)

if runtime_type not in self._RUNTIME_RUN_MAP:
raise RuntimeError(
f"Not supported runtime - {runtime_type} for local backend. "
f"Supported values: {self._RUNTIME_RUN_MAP.keys()}"
)

# check if operator exists
if self.operator_type not in __operators__:
raise OperatorNotFoundError(self.operator_type)
if not self.operator_info:
self.operator_info = OperatorLoader.from_uri(self.operator_type).load()

# run operator with provided runtime
exit_code = self._RUNTIME_RUN_MAP.get(runtime_type, lambda: None)(**kwargs)
Expand Down
Loading

0 comments on commit 1d7d13a

Please sign in to comment.