diff --git a/python/hsml/core/dataset_api.py b/python/hsml/core/dataset_api.py index 27d663bb7..f6a282dfc 100644 --- a/python/hsml/core/dataset_api.py +++ b/python/hsml/core/dataset_api.py @@ -239,9 +239,7 @@ def download(self, path, local_path): ) as response: with open(local_path, "wb") as f: downloaded = 0 - file_size = response.headers.get("Content-Length") - if not file_size: - print("Downloading file ...", end=" ") + # if not response.headers.get("Content-Length"), file is still downloading for chunk in response.iter_content( chunk_size=self.DEFAULT_FLOW_CHUNK_SIZE ): diff --git a/python/hsml/core/native_hdfs_api.py b/python/hsml/core/native_hdfs_api.py index 1fc5c90ef..fadd856ea 100644 --- a/python/hsml/core/native_hdfs_api.py +++ b/python/hsml/core/native_hdfs_api.py @@ -48,7 +48,6 @@ def upload(self, local_path: str, remote_path: str): def download(self, remote_path: str, local_path: str): # copy from hdfs to local fs - print("Downloading file ...", end=" ") hdfs.get(remote_path, local_path) def copy(self, source_path: str, destination_path: str): diff --git a/python/hsml/engine/model_engine.py b/python/hsml/engine/model_engine.py index e4701217e..7e871e600 100644 --- a/python/hsml/engine/model_engine.py +++ b/python/hsml/engine/model_engine.py @@ -117,6 +117,59 @@ def _copy_or_move_hopsfs_model( n_dirs += 1 update_upload_progress(n_dirs=n_dirs, n_files=n_files) + def _download_model_from_hopsfs_recursive( + self, + from_hdfs_model_path: str, + to_local_path: str, + update_download_progress, + n_dirs, + n_files, + ): + """Download model files from a model path in hdfs, recursively""" + + for entry in self._dataset_api.list(from_hdfs_model_path, sort_by="NAME:desc")[ + "items" + ]: + path = entry["attributes"]["path"] + basename = os.path.basename(path) + if "." in path: + # we assume that if a dot is contained in the path, it's the path to a file + local_file_path = os.path.join(to_local_path, basename) + self._engine.download(path, local_file_path) + n_files += 1 + update_download_progress(n_dirs=n_dirs, n_files=n_files) + else: + # otherwise, it's a folder + if basename == "Artifacts": + continue # skip Artifacts subfolder + local_folder_path = os.path.join(to_local_path, basename) + os.mkdir(local_folder_path) + n_dirs, n_files = self._download_model_from_hopsfs_recursive( + from_hdfs_model_path=path, + to_local_path=local_folder_path, + update_download_progress=update_download_progress, + n_dirs=n_dirs, + n_files=n_files, + ) + n_dirs += 1 + update_download_progress(n_dirs=n_dirs, n_files=n_files) + + return n_dirs, n_files + + def _download_model_from_hopsfs( + self, from_hdfs_model_path: str, to_local_path: str, update_download_progress + ): + """Download model files from a model path in hdfs.""" + + n_dirs, n_files = self._download_model_from_hopsfs_recursive( + from_hdfs_model_path=from_hdfs_model_path, + to_local_path=to_local_path, + update_download_progress=update_download_progress, + n_dirs=0, + n_files=0, + ) + update_download_progress(n_dirs=n_dirs, n_files=n_files, done=True) + def _upload_local_model( self, from_local_model_path, @@ -341,31 +394,28 @@ def download(self, model_instance): tempfile.gettempdir(), str(uuid.uuid4()), model_instance._name ) model_version_path = model_name_path + "/" + str(model_instance._version) - zip_path = model_version_path + ".zip" - os.makedirs(model_name_path) + os.makedirs(model_version_path) - temp_download_dir = "/Resources" + "/" + str(uuid.uuid4()) - try: - self._engine.mkdir(temp_download_dir) - self._dataset_api.zip( - model_instance.version_path, - destination_path=temp_download_dir, - block=True, - timeout=600, + def update_download_progress(n_dirs, n_files, done=False): + print( + "Downloading model artifact (%s dirs, %s files)... %s" + % (n_dirs, n_files, "DONE" if done else ""), + end="\r", ) - self._engine.download( - temp_download_dir + "/" + str(model_instance._version) + ".zip", - zip_path, + + try: + from_hdfs_model_path = model_instance.version_path + if from_hdfs_model_path.startswith("hdfs:/"): + projects_index = from_hdfs_model_path.find("/Projects", 0) + from_hdfs_model_path = from_hdfs_model_path[projects_index:] + + self._download_model_from_hopsfs( + from_hdfs_model_path=from_hdfs_model_path, + to_local_path=model_version_path, + update_download_progress=update_download_progress, ) - self._dataset_api.rm(temp_download_dir) - util.decompress(zip_path, extract_dir=model_name_path) - os.remove(zip_path) except BaseException as be: raise be - finally: - if os.path.exists(zip_path): - os.remove(zip_path) - self._dataset_api.rm(temp_download_dir) return model_version_path