Skip to content

Commit

Permalink
[HWORKS-835] Download model artifact without compressing files (logic…
Browse files Browse the repository at this point in the history
  • Loading branch information
javierdlrm authored Nov 13, 2023
1 parent 273c302 commit 4f17aad
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 24 deletions.
4 changes: 1 addition & 3 deletions python/hsml/core/dataset_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,7 @@ def download(self, path, local_path):
) as response:
with open(local_path, "wb") as f:
downloaded = 0
file_size = response.headers.get("Content-Length")
if not file_size:
print("Downloading file ...", end=" ")
# if not response.headers.get("Content-Length"), file is still downloading
for chunk in response.iter_content(
chunk_size=self.DEFAULT_FLOW_CHUNK_SIZE
):
Expand Down
1 change: 0 additions & 1 deletion python/hsml/core/native_hdfs_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ def upload(self, local_path: str, remote_path: str):

def download(self, remote_path: str, local_path: str):
# copy from hdfs to local fs
print("Downloading file ...", end=" ")
hdfs.get(remote_path, local_path)

def copy(self, source_path: str, destination_path: str):
Expand Down
90 changes: 70 additions & 20 deletions python/hsml/engine/model_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,59 @@ def _copy_or_move_hopsfs_model(
n_dirs += 1
update_upload_progress(n_dirs=n_dirs, n_files=n_files)

def _download_model_from_hopsfs_recursive(
self,
from_hdfs_model_path: str,
to_local_path: str,
update_download_progress,
n_dirs,
n_files,
):
"""Download model files from a model path in hdfs, recursively"""

for entry in self._dataset_api.list(from_hdfs_model_path, sort_by="NAME:desc")[
"items"
]:
path = entry["attributes"]["path"]
basename = os.path.basename(path)
if "." in path:
# we assume that if a dot is contained in the path, it's the path to a file
local_file_path = os.path.join(to_local_path, basename)
self._engine.download(path, local_file_path)
n_files += 1
update_download_progress(n_dirs=n_dirs, n_files=n_files)
else:
# otherwise, it's a folder
if basename == "Artifacts":
continue # skip Artifacts subfolder
local_folder_path = os.path.join(to_local_path, basename)
os.mkdir(local_folder_path)
n_dirs, n_files = self._download_model_from_hopsfs_recursive(
from_hdfs_model_path=path,
to_local_path=local_folder_path,
update_download_progress=update_download_progress,
n_dirs=n_dirs,
n_files=n_files,
)
n_dirs += 1
update_download_progress(n_dirs=n_dirs, n_files=n_files)

return n_dirs, n_files

def _download_model_from_hopsfs(
self, from_hdfs_model_path: str, to_local_path: str, update_download_progress
):
"""Download model files from a model path in hdfs."""

n_dirs, n_files = self._download_model_from_hopsfs_recursive(
from_hdfs_model_path=from_hdfs_model_path,
to_local_path=to_local_path,
update_download_progress=update_download_progress,
n_dirs=0,
n_files=0,
)
update_download_progress(n_dirs=n_dirs, n_files=n_files, done=True)

def _upload_local_model(
self,
from_local_model_path,
Expand Down Expand Up @@ -341,31 +394,28 @@ def download(self, model_instance):
tempfile.gettempdir(), str(uuid.uuid4()), model_instance._name
)
model_version_path = model_name_path + "/" + str(model_instance._version)
zip_path = model_version_path + ".zip"
os.makedirs(model_name_path)
os.makedirs(model_version_path)

temp_download_dir = "/Resources" + "/" + str(uuid.uuid4())
try:
self._engine.mkdir(temp_download_dir)
self._dataset_api.zip(
model_instance.version_path,
destination_path=temp_download_dir,
block=True,
timeout=600,
def update_download_progress(n_dirs, n_files, done=False):
print(
"Downloading model artifact (%s dirs, %s files)... %s"
% (n_dirs, n_files, "DONE" if done else ""),
end="\r",
)
self._engine.download(
temp_download_dir + "/" + str(model_instance._version) + ".zip",
zip_path,

try:
from_hdfs_model_path = model_instance.version_path
if from_hdfs_model_path.startswith("hdfs:/"):
projects_index = from_hdfs_model_path.find("/Projects", 0)
from_hdfs_model_path = from_hdfs_model_path[projects_index:]

self._download_model_from_hopsfs(
from_hdfs_model_path=from_hdfs_model_path,
to_local_path=model_version_path,
update_download_progress=update_download_progress,
)
self._dataset_api.rm(temp_download_dir)
util.decompress(zip_path, extract_dir=model_name_path)
os.remove(zip_path)
except BaseException as be:
raise be
finally:
if os.path.exists(zip_path):
os.remove(zip_path)
self._dataset_api.rm(temp_download_dir)

return model_version_path

Expand Down

0 comments on commit 4f17aad

Please sign in to comment.