Skip to content

Commit

Permalink
ODSC-49155: Operator init method fails when OCI config is not provided. (#394)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrDzurb authored Oct 30, 2023
2 parents a425a7e + bca8a9f commit ebd8c19
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 44 deletions.
14 changes: 9 additions & 5 deletions ads/jobs/builders/infrastructure/dsc_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,8 +741,8 @@ def cancel(self, wait_for_completion: bool = True) -> DataScienceJobRun:
self.client.cancel_job_run(self.id)
if wait_for_completion:
while (
self.lifecycle_state !=
oci.data_science.models.JobRun.LIFECYCLE_STATE_CANCELED
self.lifecycle_state
!= oci.data_science.models.JobRun.LIFECYCLE_STATE_CANCELED
):
self.sync()
time.sleep(SLEEP_INTERVAL)
Expand Down Expand Up @@ -1480,9 +1480,7 @@ def _update_job_infra(self, dsc_job: DSCJob) -> DataScienceJob:
] = JobInfrastructureConfigurationDetails.JOB_INFRASTRUCTURE_TYPE_STANDALONE

if self.storage_mount:
if not hasattr(
oci.data_science.models, "StorageMountConfigurationDetails"
):
if not hasattr(oci.data_science.models, "StorageMountConfigurationDetails"):
raise EnvironmentError(
"Storage mount hasn't been supported in the current OCI SDK installed."
)
Expand All @@ -1494,6 +1492,12 @@ def _update_job_infra(self, dsc_job: DSCJob) -> DataScienceJob:

def build(self) -> DataScienceJob:
    """Finalize the Data Science job configuration.

    Loads default properties on a best-effort basis: a failure to load
    them (e.g. when no OCI config is available) is logged but does not
    abort the build, so the builder can still be used without OCI auth.

    Returns
    -------
    DataScienceJob
        The builder itself (fluent interface).
    """
    # Single guarded call only: an additional unguarded call to
    # load_defaults() before this block would re-raise the very error
    # this handler is meant to tolerate.
    try:
        self.dsc_job.load_defaults()
    except Exception:
        logger.exception("Failed to load default properties.")

    self._update_from_dsc_model(self.dsc_job, overwrite=False)
    return self

Expand Down
35 changes: 28 additions & 7 deletions ads/opctl/operator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,16 @@ def create(debug: bool, **kwargs: Dict[str, Any]) -> None:
@with_auth
def verify(debug: bool, **kwargs: Dict[str, Any]) -> None:
    """Verifies the operator config."""
    config_uri = kwargs["file"]

    # Fetch an OCI signer only when the config lives in Object Storage;
    # local/non-OCI paths need no credentials.
    if ObjectStorageDetails.is_oci_path(config_uri):
        auth = authutil.default_signer()
    else:
        auth = {}

    with fsspec.open(config_uri, "r", **auth) as f:
        operator_spec = suppress_traceback(debug)(yaml.safe_load)(f.read())

    suppress_traceback(debug)(cmd_verify)(operator_spec, **kwargs)
Expand Down Expand Up @@ -305,15 +314,27 @@ def run(debug: bool, **kwargs: Dict[str, Any]) -> None:
operator_spec = {}
backend = kwargs.pop("backend")

auth = {}
if any(ObjectStorageDetails.is_oci_path(uri) for uri in (kwargs["file"], backend)):
auth = authutil.default_signer()

with fsspec.open(kwargs["file"], "r", **auth) as f:
with fsspec.open(
kwargs["file"],
"r",
**(
authutil.default_signer()
if ObjectStorageDetails.is_oci_path(kwargs["file"])
else {}
),
) as f:
operator_spec = suppress_traceback(debug)(yaml.safe_load)(f.read())

if backend and backend.lower().endswith((".yaml", ".yml")):
with fsspec.open(backend, "r", **auth) as f:
with fsspec.open(
backend,
"r",
**(
authutil.default_signer()
if ObjectStorageDetails.is_oci_path(backend)
else {}
),
) as f:
backend = suppress_traceback(debug)(yaml.safe_load)(f.read())

suppress_traceback(debug)(cmd_run)(config=operator_spec, backend=backend, **kwargs)
55 changes: 36 additions & 19 deletions ads/opctl/operator/common/backend_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,13 @@ def backend(
backends = cls._init_backend_config(
operator_info=operator_info, backend_kind=backend_kind, **kwargs
)
backend = backends[cls.BACKEND_RUNTIME_MAP[backend_kind][runtime_type]]

backend = backends.get(cls.BACKEND_RUNTIME_MAP[backend_kind][runtime_type])
if not backend:
raise RuntimeError(
"An error occurred while attempting to load the "
f"configuration for the `{backend_kind}.{runtime_type}` backend."
)

p_backend = ConfigProcessor(
{**backend, **{"execution": {"backend": backend_kind}}}
Expand Down Expand Up @@ -400,24 +406,35 @@ def _init_backend_config(
supported_backends = (backend_kind,)

for resource_type in supported_backends:
for runtime_type_item in RUNTIME_TYPE_MAP.get(resource_type.lower(), []):
runtime_type, runtime_kwargs = next(iter(runtime_type_item.items()))

# get config info from ini files
p = ConfigProcessor(
{**runtime_kwargs, **{"execution": {"backend": resource_type}}}
).step(
ConfigMerger,
ads_config=ads_config or DEFAULT_ADS_CONFIG_FOLDER,
**kwargs,
try:
for runtime_type_item in RUNTIME_TYPE_MAP.get(
resource_type.lower(), []
):
runtime_type, runtime_kwargs = next(iter(runtime_type_item.items()))

# get config info from ini files
p = ConfigProcessor(
{**runtime_kwargs, **{"execution": {"backend": resource_type}}}
).step(
ConfigMerger,
ads_config=ads_config or DEFAULT_ADS_CONFIG_FOLDER,
**kwargs,
)

# generate YAML specification template
result[
(resource_type.lower(), runtime_type.value.lower())
] = yaml.load(
_BackendFactory(p.config).backend.init(
runtime_type=runtime_type.value,
**{**kwargs, **runtime_kwargs},
),
Loader=yaml.FullLoader,
)
except Exception as ex:
logger.warning(
f"Unable to generate the configuration for the `{resource_type}` backend. "
f"{ex}"
)

# generate YAML specification template
result[(resource_type.lower(), runtime_type.value.lower())] = yaml.load(
_BackendFactory(p.config).backend.init(
runtime_type=runtime_type.value,
**{**kwargs, **runtime_kwargs},
),
Loader=yaml.FullLoader,
)
return result
18 changes: 10 additions & 8 deletions ads/opctl/operator/lowcode/forecast/model/base_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
import pandas as pd

from ads.common.auth import default_signer
from ads.common.object_storage_details import ObjectStorageDetails
from ads.opctl import logger

from .. import utils
from ..const import SupportedModels, SupportedMetrics, SUMMARY_METRICS_HORIZON_LIMIT
from ..const import SUMMARY_METRICS_HORIZON_LIMIT, SupportedMetrics, SupportedModels
from ..operator_config import ForecastOperatorConfig, ForecastOperatorSpec
from .transformations import Transformations

Expand Down Expand Up @@ -238,7 +239,6 @@ def _load_data(self):
raw_data = utils._load_data(
filename=self.spec.historical_data.url,
format=self.spec.historical_data.format,
storage_options=default_signer(),
columns=self.spec.historical_data.columns,
)
self.original_user_data = raw_data.copy()
Expand All @@ -249,7 +249,6 @@ def _load_data(self):
additional_data = utils._load_data(
filename=self.spec.additional_data.url,
format=self.spec.additional_data.format,
storage_options=default_signer(),
columns=self.spec.additional_data.columns,
)

Expand All @@ -276,7 +275,6 @@ def _test_evaluate_metrics(
data = utils._load_data(
filename=test_filename,
format=self.spec.test_data.format,
storage_options=default_signer(),
columns=self.spec.test_data.columns,
)

Expand Down Expand Up @@ -392,11 +390,17 @@ def _save_report(
with tempfile.TemporaryDirectory() as temp_dir:
report_local_path = os.path.join(temp_dir, "___report.html")
dp.save_report(report_sections, report_local_path)

report_path = os.path.join(output_dir, self.spec.report_file_name)
with open(report_local_path) as f1:
with fsspec.open(
os.path.join(output_dir, self.spec.report_file_name),
report_path,
"w",
**default_signer(),
**(
default_signer()
if ObjectStorageDetails.is_oci_path(report_path)
else {}
),
) as f2:
f2.write(f1.read())

Expand All @@ -405,7 +409,6 @@ def _save_report(
data=result_df,
filename=os.path.join(output_dir, self.spec.forecast_filename),
format="csv",
storage_options=default_signer(),
)

# metrics csv report
Expand All @@ -414,7 +417,6 @@ def _save_report(
data=metrics_df.rename_axis("metrics").reset_index(),
filename=os.path.join(output_dir, self.spec.metrics_filename),
format="csv",
storage_options=default_signer(),
index=False,
)

Expand Down
18 changes: 13 additions & 5 deletions ads/opctl/operator/lowcode/forecast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

import os
from ads.opctl import logger
from typing import List

import fsspec
import numpy as np
import pandas as pd
Expand All @@ -17,11 +18,13 @@
mean_squared_error,
r2_score,
)
from typing import List
from .const import SupportedMetrics

from ads.common import auth as authutil
from ads.common.object_storage_details import ObjectStorageDetails
from ads.dataset.label_encoder import DataFrameLabelEncoder
from .const import SupportedModels, MAX_COLUMNS_AUTOMLX
from ads.opctl import logger

from .const import MAX_COLUMNS_AUTOMLX, SupportedMetrics, SupportedModels
from .errors import ForecastInputDataError, ForecastSchemaYamlError


Expand Down Expand Up @@ -126,10 +129,15 @@ def _build_metrics_per_horizon(
def _call_pandas_fsspec(pd_fn, filename, storage_options, **kwargs):
    """Invoke a pandas I/O callable, supplying storage options for remote paths.

    Local files are passed straight to ``pd_fn``. For remote URIs, when no
    explicit ``storage_options`` are given, an OCI default signer is used
    only if the target is an OCI Object Storage path.
    """
    if fsspec.utils.get_protocol(filename) == "file":
        # Local filesystem: pandas needs no storage options.
        return pd_fn(filename, **kwargs)

    if not storage_options:
        # No explicit credentials supplied: sign only for OCI paths,
        # otherwise fall back to anonymous/empty options.
        if ObjectStorageDetails.is_oci_path(filename):
            storage_options = authutil.default_signer()
        else:
            storage_options = {}

    return pd_fn(filename, storage_options=storage_options, **kwargs)


def _load_data(filename, format, storage_options, columns, **kwargs):
def _load_data(filename, format, storage_options=None, columns=None, **kwargs):
if not format:
_, format = os.path.splitext(filename)
format = format[1:]
Expand Down

0 comments on commit ebd8c19

Please sign in to comment.