
Commit

…ence into feature/forecasting
mrDzurb committed Sep 6, 2023
2 parents ea52e30 + a763f81 commit 37c64bb
Showing 14 changed files with 403 additions and 134 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/run-unittests.yml
@@ -38,9 +38,9 @@ jobs:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10"]
test-path: ["tests/unitary", "tests/unitary/with_extras/model"]
test-path: ["tests/unitary/with_extras tests/unitary/default_setup", "tests/unitary/with_extras/model"]
include:
- test-path: "tests/unitary"
- test-path: "tests/unitary/with_extras tests/unitary/default_setup"
ignore-path: "--ignore tests/unitary/with_extras/model --ignore tests/unitary/with_extras/feature_store"
name: "unitary"
- test-path: "tests/unitary/with_extras/model"
8 changes: 2 additions & 6 deletions README.md
@@ -29,12 +29,6 @@ You have various options when installing ADS.

### Installing extras libraries

The `all-optional` module will install all optional dependencies. Note the single quotes around the bracketed package specifier when installing extra libraries.

```bash
python3 -m pip install 'oracle-ads[all-optional]'
```

To work with gradient boosting models, install the `boosted` module. This module includes XGBoost and LightGBM model classes.

```bash
python3 -m pip install 'oracle-ads[boosted]'
```

@@ -107,6 +101,8 @@ Install the `viz` module to include libraries for visualization tasks. Some of t

```bash
python3 -m pip install 'oracle-ads[viz]'
```

See the `[project.optional-dependencies]` section of the `pyproject.toml` file for the full list of modules and the extra libraries each installs.

**Note**

Multiple extra dependencies can be installed together, for example the `boosted` and `viz` modules shown above:
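```bash
python3 -m pip install 'oracle-ads[boosted,viz]'
```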
106 changes: 106 additions & 0 deletions ads/common/utils.py
@@ -53,6 +53,9 @@
from ads.dataset.progress import DummyProgressBar, TqdmProgressBar

from . import auth as authutil
from oci import object_storage
from ads.common.oci_client import OCIClientFactory
from ads.common.object_storage_details import ObjectStorageDetails

# For Model / Model Artifact libraries
lib_translator = {"sklearn": "scikit-learn"}
@@ -100,6 +103,9 @@

# declare custom exception class

# The number of worker processes to use in parallel for uploading individual parts of a multipart upload.
DEFAULT_PARALLEL_PROCESS_COUNT = 9


class FileOverwriteError(Exception): # pragma: no cover
pass
@@ -1605,3 +1611,103 @@ def is_path_exists(uri: str, auth: Optional[Dict] = None) -> bool:
if fsspec.filesystem(path_scheme, **storage_options).exists(uri):
return True
return False


def upload_to_os(
src_uri: str,
dst_uri: str,
auth: dict = None,
parallel_process_count: int = DEFAULT_PARALLEL_PROCESS_COUNT,
progressbar_description: str = "Uploading `{src_uri}` to `{dst_uri}`.",
force_overwrite: bool = False,
):
"""Utilizes `oci.object_storage.Uploadmanager` to upload file to Object Storage.
Parameters
----------
src_uri: str
The path to the file to upload. This should be a local path.
dst_uri: str
Object Storage path, e.g. `oci://my-bucket@my-tenancy/prefix`.
auth: (Dict, optional). Defaults to None.
The authentication signer and kwargs to pass to the client. If not provided, `authutil.default_signer()` is used.
parallel_process_count: (int, optional). Defaults to `DEFAULT_PARALLEL_PROCESS_COUNT` (9).
The number of worker processes to use in parallel for uploading individual
parts of a multipart upload.
progressbar_description: (str, optional) Defaults to `"Uploading `{src_uri}` to `{dst_uri}`"`.
Prefix for the progressbar.
force_overwrite: (bool, optional). Defaults to False.
Whether to overwrite existing files or not.
Returns
-------
Response: oci.response.Response
The response from multipart commit operation or the put operation.
Raises
------
ValueError
When the given `dst_uri` is not a valid Object Storage path.
FileNotFoundError
When the given `src_uri` does not exist.
FileExistsError
When `dst_uri` already exists and `force_overwrite` is `False`.
RuntimeError
When the upload operation fails.
"""
if not os.path.exists(src_uri):
raise FileNotFoundError(f"The given src_uri: {src_uri} does not exist.")

if not ObjectStorageDetails.is_oci_path(
dst_uri
) or not ObjectStorageDetails.is_valid_uri(dst_uri):
raise ValueError(
f"The given dst_uri:{dst_uri} is not a valid Object Storage path."
)

auth = auth or authutil.default_signer()

if not force_overwrite and is_path_exists(dst_uri):
raise FileExistsError(
f"The `{dst_uri}` exists. Please use a new file name or "
"set force_overwrite to True if you wish to overwrite."
)

upload_manager = object_storage.UploadManager(
object_storage_client=OCIClientFactory(**auth).object_storage,
parallel_process_count=parallel_process_count,
allow_multipart_uploads=True,
allow_parallel_uploads=True,
)

file_size = os.path.getsize(src_uri)
with open(src_uri, "rb") as fs:
with tqdm(
total=file_size,
unit="B",
unit_scale=True,
unit_divisor=1024,
position=0,
leave=False,
file=sys.stdout,
desc=progressbar_description,
) as pbar:

def progress_callback(progress):
pbar.update(progress)

bucket_details = ObjectStorageDetails.from_path(dst_uri)
response = upload_manager.upload_stream(
namespace_name=bucket_details.namespace,
bucket_name=bucket_details.bucket,
object_name=bucket_details.filepath,
stream_ref=fs,
progress_callback=progress_callback,
)

if response.status == 200:
print(f"{src_uri} has been successfully uploaded to {dst_uri}.")
else:
raise RuntimeError(
f"Failed to upload {src_uri}. Response code is {response.status}"
)

return response
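
For reference, a minimal call of this new helper might look like the sketch below; the bucket, namespace, and file names are placeholders.

```python
from ads.common import utils

# Sketch: upload a local file to Object Storage with the new helper.
# The bucket, namespace, and object names are placeholders.
response = utils.upload_to_os(
    src_uri="./model_artifacts.zip",
    dst_uri="oci://my-bucket@my-namespace/prefix/model_artifacts.zip",
    parallel_process_count=9,  # the DEFAULT_PARALLEL_PROCESS_COUNT value
    force_overwrite=True,      # skip the FileExistsError pre-check
)
print(response.status)  # 200 on a successful put or multipart commit
```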
87 changes: 67 additions & 20 deletions ads/model/artifact_uploader.py
@@ -94,6 +94,8 @@ def _upload(self):


class SmallArtifactUploader(ArtifactUploader):
"""The class helper to upload small model artifacts."""

PROGRESS_STEPS_COUNT = 1

def _upload(self):
@@ -104,6 +106,39 @@ def _upload(self):


class LargeArtifactUploader(ArtifactUploader):
"""
Helper class to upload large model artifacts.
Attributes
----------
artifact_path: str
The model artifact location.
artifact_zip_path: str
The URI of the zipped model artifact.
auth: dict
The default authentication is set using the `ads.set_auth` API.
If you need to override the default, use the `ads.common.auth.api_keys` or
`ads.common.auth.resource_principal` to create appropriate authentication signer
and kwargs required to instantiate IdentityClient object.
bucket_uri: str
The OCI Object Storage URI where model artifacts will be copied to.
The `bucket_uri` is only necessary for uploading large artifacts whose
size is greater than 2 GB. Example: `oci://<bucket_name>@<namespace>/prefix/`.
dsc_model: OCIDataScienceModel
The data science model instance.
overwrite_existing_artifact: bool
Overwrite target bucket artifact if exists.
progress: TqdmProgressBar
An instance of the TqdmProgressBar.
region: str
The destination Object Storage bucket region.
By default the value will be extracted from the `OCI_REGION_METADATA` environment variable.
remove_existing_artifact: bool
Whether artifacts uploaded to the object storage bucket need to be removed or not.
upload_manager: UploadManager
The `UploadManager` instance that simplifies interaction with the Object Storage service.
"""

PROGRESS_STEPS_COUNT = 4

def __init__(
@@ -115,6 +150,7 @@ def __init__(
region: Optional[str] = None,
overwrite_existing_artifact: Optional[bool] = True,
remove_existing_artifact: Optional[bool] = True,
parallel_process_count: int = utils.DEFAULT_PARALLEL_PROCESS_COUNT,
):
"""Initializes `LargeArtifactUploader` instance.
@@ -139,7 +175,9 @@
overwrite_existing_artifact: (bool, optional). Defaults to `True`.
Overwrite target bucket artifact if exists.
remove_existing_artifact: (bool, optional). Defaults to `True`.
Wether artifacts uploaded to object storage bucket need to be removed or not.
Whether artifacts uploaded to object storage bucket need to be removed or not.
parallel_process_count: (int, optional). Defaults to `utils.DEFAULT_PARALLEL_PROCESS_COUNT` (9).
The number of worker processes to use in parallel for uploading individual parts of a multipart upload.
"""
if not bucket_uri:
raise ValueError("The `bucket_uri` must be provided.")
@@ -150,36 +188,45 @@
self.bucket_uri = bucket_uri
self.overwrite_existing_artifact = overwrite_existing_artifact
self.remove_existing_artifact = remove_existing_artifact
self._parallel_process_count = parallel_process_count

def _upload(self):
"""Uploads model artifacts to the model catalog."""
self.progress.update("Copying model artifact to the Object Storage bucket")

try:
bucket_uri = self.bucket_uri
bucket_uri_file_name = os.path.basename(bucket_uri)
bucket_uri = self.bucket_uri
bucket_uri_file_name = os.path.basename(bucket_uri)

if not bucket_uri_file_name:
bucket_uri = os.path.join(bucket_uri, f"{self.dsc_model.id}.zip")
elif not bucket_uri.lower().endswith(".zip"):
bucket_uri = f"{bucket_uri}.zip"
if not bucket_uri_file_name:
bucket_uri = os.path.join(bucket_uri, f"{self.dsc_model.id}.zip")
elif not bucket_uri.lower().endswith(".zip"):
bucket_uri = f"{bucket_uri}.zip"

bucket_file_name = utils.copy_file(
self.artifact_zip_path,
bucket_uri,
force_overwrite=self.overwrite_existing_artifact,
auth=self.auth,
progressbar_description="Copying model artifact to the Object Storage bucket",
)
except FileExistsError:
if not self.overwrite_existing_artifact and utils.is_path_exists(
uri=bucket_uri, auth=self.auth
):
raise FileExistsError(
f"The `{self.bucket_uri}` exists. Please use a new file name or "
f"The bucket_uri=`{self.bucket_uri}` exists. Please use a new file name or "
"set `overwrite_existing_artifact` to `True` if you wish to overwrite."
)

try:
utils.upload_to_os(
src_uri=self.artifact_zip_path,
dst_uri=bucket_uri,
auth=self.auth,
parallel_process_count=self._parallel_process_count,
force_overwrite=self.overwrite_existing_artifact,
progressbar_description="Copying model artifact to the Object Storage bucket.",
)
except Exception as ex:
raise RuntimeError(
f"Failed to upload model artifact to the given Object Storage path `{self.bucket_uri}`."
f"See Exception: {ex}"
)

self.progress.update("Exporting model artifact to the model catalog")
self.dsc_model.export_model_artifact(
bucket_uri=bucket_file_name, region=self.region
)
self.dsc_model.export_model_artifact(bucket_uri=bucket_uri, region=self.region)

if self.remove_existing_artifact:
self.progress.update(
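
For orientation, constructing the uploader directly might look like the sketch below. The keyword names follow the attributes documented above and are assumptions about the full signature; `dsc_model` stands in for an existing `OCIDataScienceModel`. Most callers should go through `DataScienceModel.upload_artifact()` instead.

```python
from ads.model.artifact_uploader import LargeArtifactUploader

# Sketch only: keyword names assumed from the documented attributes;
# `dsc_model` is a placeholder for an already-created OCIDataScienceModel.
uploader = LargeArtifactUploader(
    dsc_model=dsc_model,
    artifact_path="./model_artifacts.zip",
    bucket_uri="oci://my-bucket@my-namespace/prefix/",
    overwrite_existing_artifact=True,
    remove_existing_artifact=True,
    parallel_process_count=9,
)
uploader.upload()
```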
9 changes: 8 additions & 1 deletion ads/model/datascience_model.py
@@ -35,7 +35,7 @@
_MAX_ARTIFACT_SIZE_IN_BYTES = 2147483648 # 2GB


class ModelArtifactSizeError(Exception): # pragma: no cover
class ModelArtifactSizeError(Exception): # pragma: no cover
def __init__(self, max_artifact_size: str):
super().__init__(
f"The model artifacts size is greater than `{max_artifact_size}`. "
@@ -562,6 +562,8 @@ def create(self, **kwargs) -> "DataScienceModel":
and kwargs required to instantiate IdentityClient object.
timeout: (int, optional). Defaults to 10 seconds.
The connection timeout in seconds for the client.
parallel_process_count: (int, optional).
The number of worker processes to use in parallel for uploading individual parts of a multipart upload.
Returns
-------
@@ -607,6 +609,7 @@ def create(self, **kwargs) -> "DataScienceModel":
region=kwargs.pop("region", None),
auth=kwargs.pop("auth", None),
timeout=kwargs.pop("timeout", None),
parallel_process_count=kwargs.pop("parallel_process_count", utils.DEFAULT_PARALLEL_PROCESS_COUNT),
)

# Sync up model
@@ -623,6 +626,7 @@ def upload_artifact(
overwrite_existing_artifact: Optional[bool] = True,
remove_existing_artifact: Optional[bool] = True,
timeout: Optional[int] = None,
parallel_process_count: int = utils.DEFAULT_PARALLEL_PROCESS_COUNT,
) -> None:
"""Uploads model artifacts to the model catalog.
@@ -646,6 +650,8 @@
Whether artifacts uploaded to the object storage bucket need to be removed or not.
timeout: (int, optional). Defaults to 10 seconds.
The connection timeout in seconds for the client.
parallel_process_count: (int, optional). Defaults to `utils.DEFAULT_PARALLEL_PROCESS_COUNT` (9).
The number of worker processes to use in parallel for uploading individual parts of a multipart upload.
"""
# Upload artifact to the model catalog
if not self.artifact:
@@ -676,6 +682,7 @@
bucket_uri=bucket_uri,
overwrite_existing_artifact=overwrite_existing_artifact,
remove_existing_artifact=remove_existing_artifact,
parallel_process_count=parallel_process_count,
)
else:
artifact_uploader = SmallArtifactUploader(
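
At the public API level the new parameter flows through `create()` and `upload_artifact()`. A short sketch, assuming the usual `DataScienceModel` builder methods and placeholder paths:

```python
from ads.model.datascience_model import DataScienceModel

# Sketch: register a model whose artifact exceeds 2 GB, spreading the
# multipart upload across 9 worker processes (the new default).
model = (
    DataScienceModel()
    .with_display_name("my-model")
    .with_artifact("./model_artifacts.zip")
)
model.create(
    bucket_uri="oci://my-bucket@my-namespace/prefix/",
    parallel_process_count=9,
)
```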
