ODSC-46634/utilize oci UploadManager to upload model artifacts #304

Merged
merged 8 commits on Sep 5, 2023
106 changes: 106 additions & 0 deletions ads/common/utils.py
@@ -53,6 +53,9 @@
from ads.dataset.progress import DummyProgressBar, TqdmProgressBar

from . import auth as authutil
from oci import object_storage
from ads.common.oci_client import OCIClientFactory
from ads.common.object_storage_details import ObjectStorageDetails

# For Model / Model Artifact libraries
lib_translator = {"sklearn": "scikit-learn"}
@@ -100,6 +103,9 @@

# declare custom exception class

# The number of worker processes to use in parallel for uploading individual parts of a multipart upload.
DEFAULT_PARALLEL_PROCESS_COUNT = 9


class FileOverwriteError(Exception): # pragma: no cover
pass
@@ -1599,3 +1605,103 @@ def is_path_exists(uri: str, auth: Optional[Dict] = None) -> bool:
if fsspec.filesystem(path_scheme, **storage_options).exists(uri):
return True
return False


def upload_to_os(
src_uri: str,
dst_uri: str,
auth: dict = None,
parallel_process_count: int = DEFAULT_PARALLEL_PROCESS_COUNT,
progressbar_description: str = "Uploading `{src_uri}` to `{dst_uri}`.",
force_overwrite: bool = False,
):
"""Utilizes `oci.object_storage.Uploadmanager` to upload file to Object Storage.

Parameters
----------
src_uri: str
The path to the file to upload. This must be a local path.
dst_uri: str
The Object Storage path, e.g. `oci://my-bucket@my-tenancy/prefix`.
auth: (Dict, optional). Defaults to None.
    The authentication signer and kwargs. If not provided, `authutil.default_signer()` will be used.
parallel_process_count: (int, optional) Defaults to `DEFAULT_PARALLEL_PROCESS_COUNT` (9).
The number of worker processes to use in parallel for uploading individual
parts of a multipart upload.
progressbar_description: (str, optional) Defaults to `"Uploading `{src_uri}` to `{dst_uri}`."`.
Prefix for the progressbar.
force_overwrite: (bool, optional). Defaults to False.
Whether to overwrite existing files or not.

Returns
-------
Response: oci.response.Response
The response from the multipart commit operation or the put operation.

Raises
------
ValueError
    When the given `dst_uri` is not a valid Object Storage path.
FileNotFoundError
    When the given `src_uri` does not exist.
FileExistsError
    When the given `dst_uri` already exists and `force_overwrite` is set to `False`.
RuntimeError
    When the upload operation fails.
"""
if not os.path.exists(src_uri):
        raise FileNotFoundError(f"The given src_uri: {src_uri} does not exist.")

if not ObjectStorageDetails.is_oci_path(
dst_uri
) or not ObjectStorageDetails.is_valid_uri(dst_uri):
raise ValueError(
f"The given dst_uri:{dst_uri} is not a valid Object Storage path."
)

auth = auth or authutil.default_signer()

if not force_overwrite and is_path_exists(dst_uri):
raise FileExistsError(
f"The `{dst_uri}` exists. Please use a new file name or "
"set force_overwrite to True if you wish to overwrite."
)

upload_manager = object_storage.UploadManager(
object_storage_client=OCIClientFactory(**auth).object_storage,
parallel_process_count=parallel_process_count,
allow_multipart_uploads=True,
allow_parallel_uploads=True,
)

file_size = os.path.getsize(src_uri)
with open(src_uri, "rb") as fs:
with tqdm(
total=file_size,
unit="B",
unit_scale=True,
unit_divisor=1024,
position=0,
leave=False,
file=sys.stdout,
            desc=progressbar_description.format(src_uri=src_uri, dst_uri=dst_uri),
) as pbar:

def progress_callback(progress):
pbar.update(progress)

bucket_details = ObjectStorageDetails.from_path(dst_uri)
response = upload_manager.upload_stream(
namespace_name=bucket_details.namespace,
bucket_name=bucket_details.bucket,
object_name=bucket_details.filepath,
stream_ref=fs,
progress_callback=progress_callback,
)

if response.status == 200:
print(f"{src_uri} has been successfully uploaded to {dst_uri}.")
else:
raise RuntimeError(
f"Failed to upload {src_uri}. Response code is {response.status}"
)

return response
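
As a quick sanity check of the new helper, here is a minimal usage sketch. The local path, bucket, and namespace below are hypothetical placeholders; with `auth` omitted, the function falls back to `authutil.default_signer()` as shown in the implementation above.

```python
from ads.common.utils import upload_to_os

# Hypothetical paths: substitute a real local file and a writable bucket.
response = upload_to_os(
    src_uri="/tmp/model_artifact.zip",
    dst_uri="oci://my-bucket@my-namespace/models/model_artifact.zip",
    parallel_process_count=9,  # worker processes for the multipart upload
    force_overwrite=True,      # replace the object if it already exists
)
print(response.status)  # 200 indicates the put / multipart commit succeeded
```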
87 changes: 67 additions & 20 deletions ads/model/artifact_uploader.py
@@ -94,6 +94,8 @@ def _upload(self):


class SmallArtifactUploader(ArtifactUploader):
"""The class helper to upload small model artifacts."""

PROGRESS_STEPS_COUNT = 1

def _upload(self):
@@ -104,6 +106,39 @@ def _upload(self):


class LargeArtifactUploader(ArtifactUploader):
"""
Helper class to upload large model artifacts.

Attributes
----------
artifact_path: str
The model artifact location.
artifact_zip_path: str
The URI of the zipped model artifact.
auth: dict
The default authentication is set using the `ads.set_auth` API.
If you need to override the default, use the `ads.common.auth.api_keys` or
`ads.common.auth.resource_principal` to create appropriate authentication signer
and kwargs required to instantiate IdentityClient object.
bucket_uri: str
The OCI Object Storage URI where model artifacts will be copied to.
The `bucket_uri` is only necessary for uploading large artifacts whose
size is greater than 2 GB. Example: `oci://<bucket_name>@<namespace>/prefix/`.
dsc_model: OCIDataScienceModel
The data science model instance.
overwrite_existing_artifact: bool
Overwrite target bucket artifact if exists.
progress: TqdmProgressBar
An instance of the TqdmProgressBar.
region: str
The destination Object Storage bucket region.
By default the value will be extracted from the `OCI_REGION_METADATA` environment variable.
remove_existing_artifact: bool
Whether artifacts uploaded to the object storage bucket need to be removed or not.
upload_manager: UploadManager
The `UploadManager` simplifies interaction with the Object Storage service.
"""

PROGRESS_STEPS_COUNT = 4

def __init__(
@@ -115,6 +150,7 @@ def __init__(
region: Optional[str] = None,
overwrite_existing_artifact: Optional[bool] = True,
remove_existing_artifact: Optional[bool] = True,
parallel_process_count: int = utils.DEFAULT_PARALLEL_PROCESS_COUNT,
):
"""Initializes `LargeArtifactUploader` instance.

@@ -139,7 +175,9 @@
overwrite_existing_artifact: (bool, optional). Defaults to `True`.
Overwrite target bucket artifact if exists.
remove_existing_artifact: (bool, optional). Defaults to `True`.
Whether artifacts uploaded to object storage bucket need to be removed or not.
parallel_process_count: (int, optional).
The number of worker processes to use in parallel for uploading individual parts of a multipart upload.
"""
if not bucket_uri:
raise ValueError("The `bucket_uri` must be provided.")
@@ -150,36 +188,45 @@
self.bucket_uri = bucket_uri
self.overwrite_existing_artifact = overwrite_existing_artifact
self.remove_existing_artifact = remove_existing_artifact
self._parallel_process_count = parallel_process_count

def _upload(self):
"""Uploads model artifacts to the model catalog."""
self.progress.update("Copying model artifact to the Object Storage bucket")

        bucket_uri = self.bucket_uri
        bucket_uri_file_name = os.path.basename(bucket_uri)

        if not bucket_uri_file_name:
            bucket_uri = os.path.join(bucket_uri, f"{self.dsc_model.id}.zip")
        elif not bucket_uri.lower().endswith(".zip"):
            bucket_uri = f"{bucket_uri}.zip"

        if not self.overwrite_existing_artifact and utils.is_path_exists(
            uri=bucket_uri, auth=self.auth
        ):
            raise FileExistsError(
                f"The bucket_uri=`{self.bucket_uri}` exists. Please use a new file name or "
                "set `overwrite_existing_artifact` to `True` if you wish to overwrite."
            )

        try:
            utils.upload_to_os(
                src_uri=self.artifact_zip_path,
                dst_uri=bucket_uri,
                auth=self.auth,
                parallel_process_count=self._parallel_process_count,
                force_overwrite=self.overwrite_existing_artifact,
                progressbar_description="Copying model artifact to the Object Storage bucket.",
            )
        except Exception as ex:
            raise RuntimeError(
                f"Failed to upload model artifact to the given Object Storage path `{self.bucket_uri}`. "
                f"See Exception: {ex}"
            )

        self.progress.update("Exporting model artifact to the model catalog")
        self.dsc_model.export_model_artifact(bucket_uri=bucket_uri, region=self.region)

if self.remove_existing_artifact:
self.progress.update(
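
For reviewers who want to exercise the changed `_upload` path directly, a hedged sketch follows. The model OCID, paths, and bucket are hypothetical, and the `from_id` lookup is an assumption standing in for however the `OCIDataScienceModel` instance is actually obtained.

```python
from ads.model.artifact_uploader import LargeArtifactUploader
from ads.model.service.oci_datascience_model import OCIDataScienceModel

# Hypothetical model lookup and artifact locations.
dsc_model = OCIDataScienceModel.from_id("ocid1.datasciencemodel.oc1..<unique_id>")

uploader = LargeArtifactUploader(
    dsc_model=dsc_model,
    artifact_path="/tmp/large_model_artifact.zip",
    bucket_uri="oci://my-bucket@my-namespace/prefix/",
    overwrite_existing_artifact=True,
    remove_existing_artifact=True,
    parallel_process_count=9,  # forwarded to `utils.upload_to_os`
)
uploader.upload()
```

Note the design change visible in the diff: the uploader now pre-checks for an existing object and raises `FileExistsError` before any bytes move, instead of relying on `utils.copy_file` to fail mid-transfer.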
9 changes: 8 additions & 1 deletion ads/model/datascience_model.py
@@ -35,7 +35,7 @@
_MAX_ARTIFACT_SIZE_IN_BYTES = 2147483648 # 2GB


class ModelArtifactSizeError(Exception):  # pragma: no cover
def __init__(self, max_artifact_size: str):
super().__init__(
f"The model artifacts size is greater than `{max_artifact_size}`. "
@@ -562,6 +562,8 @@ def create(self, **kwargs) -> "DataScienceModel":
and kwargs required to instantiate IdentityClient object.
timeout: (int, optional). Defaults to 10 seconds.
The connection timeout in seconds for the client.
parallel_process_count: (int, optional).
The number of worker processes to use in parallel for uploading individual parts of a multipart upload.

Returns
-------
@@ -607,6 +609,7 @@ def create(self, **kwargs) -> "DataScienceModel":
region=kwargs.pop("region", None),
auth=kwargs.pop("auth", None),
timeout=kwargs.pop("timeout", None),
            parallel_process_count=kwargs.pop(
                "parallel_process_count", utils.DEFAULT_PARALLEL_PROCESS_COUNT
            ),
)

# Sync up model
@@ -623,6 +626,7 @@ def upload_artifact(
overwrite_existing_artifact: Optional[bool] = True,
remove_existing_artifact: Optional[bool] = True,
timeout: Optional[int] = None,
parallel_process_count: int = utils.DEFAULT_PARALLEL_PROCESS_COUNT,
) -> None:
"""Uploads model artifacts to the model catalog.

@@ -646,6 +650,8 @@
Whether artifacts uploaded to the object storage bucket need to be removed or not.
timeout: (int, optional). Defaults to 10 seconds.
The connection timeout in seconds for the client.
parallel_process_count: (int, optional)
The number of worker processes to use in parallel for uploading individual parts of a multipart upload.
"""
# Upload artifact to the model catalog
if not self.artifact:
@@ -676,6 +682,7 @@
bucket_uri=bucket_uri,
overwrite_existing_artifact=overwrite_existing_artifact,
remove_existing_artifact=remove_existing_artifact,
parallel_process_count=parallel_process_count,
)
else:
artifact_uploader = SmallArtifactUploader(
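
End to end, the new `parallel_process_count` kwarg flows from `DataScienceModel.create` through `upload_artifact` to the `LargeArtifactUploader`. A hedged sketch of that surface, with all OCIDs, names, and paths hypothetical:

```python
from ads.model.datascience_model import DataScienceModel

model = (
    DataScienceModel()
    .with_display_name("my-model")
    .with_compartment_id("ocid1.compartment.oc1..<unique_id>")
    .with_project_id("ocid1.datascienceproject.oc1..<unique_id>")
    .with_artifact("/tmp/large_model_artifact.zip")  # artifact larger than 2 GB
)

# `bucket_uri` is required for artifacts above the 2 GB catalog limit;
# `parallel_process_count` tunes the multipart upload worker count.
model.create(
    bucket_uri="oci://my-bucket@my-namespace/prefix/",
    parallel_process_count=9,
)
```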