diff --git a/python/hopsworks/core/dataset_api.py b/python/hopsworks/core/dataset_api.py
index abd20a256..4893e2dc1 100644
--- a/python/hopsworks/core/dataset_api.py
+++ b/python/hopsworks/core/dataset_api.py
@@ -16,13 +16,24 @@
 
 import math
 import os
+import time
 from tqdm.auto import tqdm
 import shutil
 import logging
+import copy
 
 from hopsworks import client
 from hopsworks.client.exceptions import RestAPIError
 from hopsworks.client.exceptions import DatasetException
+from concurrent.futures import ThreadPoolExecutor, wait
+
+
+class Chunk:
+    def __init__(self, content, number, status):
+        self.content = content
+        self.number = number
+        self.status = status
+        self.retries = 0
 
 
 class DatasetApi:
@@ -33,7 +44,7 @@ def __init__(
         self._project_id = project_id
         self._log = logging.getLogger(__name__)
 
-    DEFAULT_FLOW_CHUNK_SIZE = 1048576
+    FLOW_PERMANENT_ERRORS = [404, 413, 415, 500, 501]
 
     def download(self, path: str, local_path: str = None, overwrite: bool = False):
         """Download file from Hopsworks Filesystem to the current working directory.
@@ -126,7 +137,16 @@ def download(self, path: str, local_path: str = None, overwrite: bool = False):
 
         return local_path
 
-    def upload(self, local_path: str, upload_path: str, overwrite: bool = False):
+    def upload(
+        self,
+        local_path: str,
+        upload_path: str,
+        overwrite: bool = False,
+        chunk_size=1048576,
+        simultaneous_uploads=3,
+        max_chunk_retries=1,
+        chunk_retry_interval=1,
+    ):
         """Upload a file to the Hopsworks filesystem.
 
         ```python
@@ -144,6 +164,10 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False):
             local_path: local path to file to upload
             upload_path: path to directory where to upload the file in Hopsworks Filesystem
             overwrite: overwrite file if exists
+            chunk_size: upload chunk size in bytes. Default 1048576 bytes
+            simultaneous_uploads: number of simultaneous chunks to upload. Default 3
+            max_chunk_retries: maximum retry for a chunk. Default is 1
+            chunk_retry_interval: chunk retry interval in seconds. Default is 1sec
         # Returns
             `str`: Path to uploaded file
         # Raises
@@ -171,9 +195,11 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False):
                     )
                 )
 
-        num_chunks = math.ceil(file_size / self.DEFAULT_FLOW_CHUNK_SIZE)
+        num_chunks = math.ceil(file_size / chunk_size)
 
-        base_params = self._get_flow_base_params(file_name, num_chunks, file_size)
+        base_params = self._get_flow_base_params(
+            file_name, num_chunks, file_size, chunk_size
+        )
 
         chunk_number = 1
         with open(local_path, "rb") as f:
@@ -187,21 +213,41 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False):
             except Exception:
                 self._log.exception("Failed to initialize progress bar.")
                 self._log.info("Starting upload")
-
-            while True:
-                chunk = f.read(self.DEFAULT_FLOW_CHUNK_SIZE)
-                if not chunk:
-                    break
-
-                query_params = base_params
-                query_params["flowCurrentChunkSize"] = len(chunk)
-                query_params["flowChunkNumber"] = chunk_number
-
-                self._upload_request(query_params, upload_path, file_name, chunk)
-                chunk_number += 1
-
-                if pbar is not None:
-                    pbar.update(query_params["flowCurrentChunkSize"])
+            with ThreadPoolExecutor(simultaneous_uploads) as executor:
+                while True:
+                    chunks = []
+                    for _ in range(simultaneous_uploads):
+                        chunk = f.read(chunk_size)
+                        if not chunk:
+                            break
+                        chunks.append(Chunk(chunk, chunk_number, "pending"))
+                        chunk_number += 1
+
+                    if len(chunks) == 0:
+                        break
+
+                    # upload each chunk and update pbar
+                    futures = [
+                        executor.submit(
+                            self._upload_chunk,
+                            base_params,
+                            upload_path,
+                            file_name,
+                            chunk,
+                            pbar,
+                            max_chunk_retries,
+                            chunk_retry_interval,
+                        )
+                        for chunk in chunks
+                    ]
+                    # wait for all upload tasks to complete
+                    _, _ = wait(futures)
+                    try:
+                        _ = [future.result() for future in futures]
+                    except Exception as e:
+                        if pbar is not None:
+                            pbar.close()
+                        raise e
 
         if pbar is not None:
             pbar.close()
@@ -210,10 +256,47 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False):
 
         return upload_path + "/" + os.path.basename(local_path)
 
-    def _get_flow_base_params(self, file_name, num_chunks, size):
+    def _upload_chunk(
+        self,
+        base_params,
+        upload_path,
+        file_name,
+        chunk: Chunk,
+        pbar,
+        max_chunk_retries,
+        chunk_retry_interval,
+    ):
+        query_params = copy.copy(base_params)
+        query_params["flowCurrentChunkSize"] = len(chunk.content)
+        query_params["flowChunkNumber"] = chunk.number
+
+        chunk.status = "uploading"
+        while True:
+            try:
+                self._upload_request(
+                    query_params, upload_path, file_name, chunk.content
+                )
+                break
+            except RestAPIError as re:
+                chunk.retries += 1
+                if (
+                    re.response.status_code in DatasetApi.FLOW_PERMANENT_ERRORS
+                    or chunk.retries > max_chunk_retries
+                ):
+                    chunk.status = "failed"
+                    raise re
+                time.sleep(chunk_retry_interval)
+                continue
+
+        chunk.status = "uploaded"
+
+        if pbar is not None:
+            pbar.update(query_params["flowCurrentChunkSize"])
+
+    def _get_flow_base_params(self, file_name, num_chunks, size, chunk_size):
         return {
             "templateId": -1,
-            "flowChunkSize": self.DEFAULT_FLOW_CHUNK_SIZE,
+            "flowChunkSize": chunk_size,
             "flowTotalSize": size,
             "flowIdentifier": str(size) + "_" + file_name,
             "flowFilename": file_name,
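
For reviewers, a minimal usage sketch of the new `upload` parameters. The `hopsworks.login()` and `project.get_dataset_api()` entry points follow the library's documented examples; the file path, target directory, and tuning values below are illustrative, not taken from this diff:

```python
import hopsworks

project = hopsworks.login()
dataset_api = project.get_dataset_api()

# Illustrative tuning: 16 MiB chunks, 5 parallel chunk uploads, and up to
# 3 retries per chunk with a 2-second pause between attempts. The defaults
# introduced by this diff are 1 MiB chunks, 3 uploads, 1 retry, 1 second.
uploaded_path = dataset_api.upload(
    "data/train.csv",  # hypothetical local file
    "Resources",       # hypothetical target directory in the Hopsworks Filesystem
    overwrite=True,
    chunk_size=16 * 1024 * 1024,
    simultaneous_uploads=5,
    max_chunk_retries=3,
    chunk_retry_interval=2,
)
print(uploaded_path)  # e.g. "Resources/train.csv"
```

Design note: `_upload_chunk` takes a `copy.copy` of `base_params` before setting the per-chunk fields, so the `ThreadPoolExecutor` workers never mutate a shared dict. A failed chunk is retried after `chunk_retry_interval` seconds unless the status code is in `FLOW_PERMANENT_ERRORS` or `max_chunk_retries` is exhausted, in which case the error propagates and aborts the upload.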