From 9bb337ca60e107a1477640c185d8ac111c137314 Mon Sep 17 00:00:00 2001
From: Ermias
Date: Mon, 9 Oct 2023 14:55:18 +0200
Subject: [PATCH 1/3] first

---
 python/hopsworks/core/dataset_api.py | 90 +++++++++++++++++++++-------
 1 file changed, 68 insertions(+), 22 deletions(-)

diff --git a/python/hopsworks/core/dataset_api.py b/python/hopsworks/core/dataset_api.py
index abd20a256..d1585bba2 100644
--- a/python/hopsworks/core/dataset_api.py
+++ b/python/hopsworks/core/dataset_api.py
@@ -16,13 +16,23 @@
 
 import math
 import os
+import time
 from tqdm.auto import tqdm
 import shutil
 import logging
+import copy
 
 from hopsworks import client
 from hopsworks.client.exceptions import RestAPIError
 from hopsworks.client.exceptions import DatasetException
+from concurrent.futures import ThreadPoolExecutor, wait
+
+class Chunk:
+    def __init__(self, content, number, status):
+        self.content = content
+        self.number = number
+        self.status = status
+        self.retries = 0
 
 
 class DatasetApi:
@@ -33,7 +43,8 @@ def __init__(
         self._project_id = project_id
         self._log = logging.getLogger(__name__)
 
-    DEFAULT_FLOW_CHUNK_SIZE = 1048576
+    FLOW_PERMANENT_ERRORS = [404, 413, 415, 500, 501]
+    FLOW_SUCCESS_STATUSES = [200, 201, 202]
 
     def download(self, path: str, local_path: str = None, overwrite: bool = False):
         """Download file from Hopsworks Filesystem to the current working directory.
@@ -126,7 +137,7 @@ def download(self, path: str, local_path: str = None, overwrite: bool = False):
 
         return local_path
 
-    def upload(self, local_path: str, upload_path: str, overwrite: bool = False):
+    def upload(self, local_path: str, upload_path: str, overwrite: bool = False, chunk_size = 1048576, simultaneous_uploads = 3, max_chunk_retries = 1, chunk_retry_interval = 1):
         """Upload a file to the Hopsworks filesystem.
 
         ```python
@@ -144,6 +155,10 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False):
             local_path: local path to file to upload
             upload_path: path to directory where to upload the file in Hopsworks Filesystem
             overwrite: overwrite file if exists
+            chunk_size: upload chunk size in bytes. Default 1048576 bytes
+            simultaneous_uploads: number of simultaneous chunks to upload. Default 3
+            max_chunk_retries: maximum retry for a chunk. Default is 1
+            chunk_retry_interval: chunk retry interval in seconds. Default is 1sec
         # Returns
             `str`: Path to uploaded file
         # Raises
@@ -171,9 +186,9 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False):
                 )
             )
 
-        num_chunks = math.ceil(file_size / self.DEFAULT_FLOW_CHUNK_SIZE)
+        num_chunks = math.ceil(file_size / chunk_size)
 
-        base_params = self._get_flow_base_params(file_name, num_chunks, file_size)
+        base_params = self._get_flow_base_params(file_name, num_chunks, file_size, chunk_size)
 
         chunk_number = 1
         with open(local_path, "rb") as f:
@@ -187,21 +202,29 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False):
             except Exception:
                 self._log.exception("Failed to initialize progress bar.")
                 self._log.info("Starting upload")
-
-            while True:
-                chunk = f.read(self.DEFAULT_FLOW_CHUNK_SIZE)
-                if not chunk:
-                    break
-
-                query_params = base_params
-                query_params["flowCurrentChunkSize"] = len(chunk)
-                query_params["flowChunkNumber"] = chunk_number
-
-                self._upload_request(query_params, upload_path, file_name, chunk)
-                chunk_number += 1
-
-                if pbar is not None:
-                    pbar.update(query_params["flowCurrentChunkSize"])
+            with ThreadPoolExecutor(simultaneous_uploads) as executor:
+                while True:
+                    chunks = []
+                    for _ in range(simultaneous_uploads):
+                        chunk = f.read(chunk_size)
+                        if not chunk:
+                            break
+                        chunks.append(Chunk(chunk, chunk_number, 'pending'))
+                        chunk_number += 1
+
+                    if len(chunks) == 0:
+                        break
+
+                    # upload each chunk and update pbar
+                    futures = [executor.submit(self._upload_chunk, base_params, upload_path, file_name, chunk, pbar, max_chunk_retries, chunk_retry_interval) for chunk in chunks]
+                    # wait for all upload tasks to complete
+                    _, _ = wait(futures)
+                    try:
+                        _ = [future.result() for future in futures]
+                    except Exception as e:
+                        if pbar is not None:
+                            pbar.close()
+                        raise e
 
         if pbar is not None:
             pbar.close()
@@ -209,11 +232,34 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False):
         self._log.info("Upload finished")
 
         return upload_path + "/" + os.path.basename(local_path)
-
-    def _get_flow_base_params(self, file_name, num_chunks, size):
+
+    def _upload_chunk(self, base_params, upload_path, file_name, chunk: Chunk, pbar, max_chunk_retries, chunk_retry_interval):
+        query_params = copy.copy(base_params)
+        query_params["flowCurrentChunkSize"] = len(chunk.content)
+        query_params["flowChunkNumber"] = chunk.number
+
+        chunk.status = 'uploading'
+        while True:
+            try:
+                self._upload_request(query_params, upload_path, file_name, chunk.content)
+                break
+            except RestAPIError as re:
+                chunk.retries +=1
+                if re.response.status_code in DatasetApi.FLOW_PERMANENT_ERRORS or chunk.retries > max_chunk_retries:
+                    chunk.status = 'failed'
+                    raise re
+                time.sleep(chunk_retry_interval)
+                continue
+
+        chunk.status = 'uploaded'
+
+        if pbar is not None:
+            pbar.update(query_params["flowCurrentChunkSize"])
+
+    def _get_flow_base_params(self, file_name, num_chunks, size, chunk_size):
         return {
             "templateId": -1,
-            "flowChunkSize": self.DEFAULT_FLOW_CHUNK_SIZE,
+            "flowChunkSize": chunk_size,
             "flowTotalSize": size,
             "flowIdentifier": str(size) + "_" + file_name,
             "flowFilename": file_name,
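With this first patch applied, chunking behavior becomes tunable per call: `upload()` now reads `simultaneous_uploads` chunks at a time, submits them to a `ThreadPoolExecutor`, and waits for the whole batch before reading the next one. A minimal usage sketch of the new knobs; the file and directory names are illustrative, while `hopsworks.login()` and `get_dataset_api()` are the library's usual entry points:

```python
import hopsworks

# Log in and grab the dataset API for the project (illustrative setup).
project = hopsworks.login()
dataset_api = project.get_dataset_api()

# Upload with 5 MiB chunks, 5 parallel uploads, and a more patient retry
# policy than the defaults (chunk_size=1048576, simultaneous_uploads=3,
# max_chunk_retries=1, chunk_retry_interval=1).
uploaded_path = dataset_api.upload(
    "train.csv",                 # hypothetical local file
    "Resources",                 # target dataset directory in Hopsworks
    overwrite=True,
    chunk_size=5 * 1024 * 1024,
    simultaneous_uploads=5,
    max_chunk_retries=3,         # retry transient errors up to 3 times per chunk
    chunk_retry_interval=2,      # wait 2 seconds between retries
)
print(uploaded_path)             # e.g. "Resources/train.csv"
```

Since `wait(futures)` blocks on each batch, one slow chunk stalls its whole batch, but the batched read also caps memory at roughly `simultaneous_uploads * chunk_size` bytes, and a failing chunk still surfaces as a single exception from `upload()` after the progress bar is closed.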
From cdf47a1b8623a59e535517835f70fcd266317771 Mon Sep 17 00:00:00 2001
From: Ermias
Date: Mon, 9 Oct 2023 14:57:45 +0200
Subject: [PATCH 2/3] remove unused

---
 python/hopsworks/core/dataset_api.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/python/hopsworks/core/dataset_api.py b/python/hopsworks/core/dataset_api.py
index d1585bba2..09e1ed50b 100644
--- a/python/hopsworks/core/dataset_api.py
+++ b/python/hopsworks/core/dataset_api.py
@@ -44,7 +44,6 @@ def __init__(
         self._log = logging.getLogger(__name__)
 
     FLOW_PERMANENT_ERRORS = [404, 413, 415, 500, 501]
-    FLOW_SUCCESS_STATUSES = [200, 201, 202]
 
     def download(self, path: str, local_path: str = None, overwrite: bool = False):
         """Download file from Hopsworks Filesystem to the current working directory.
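Patch 2 drops `FLOW_SUCCESS_STATUSES` because nothing reads it: `_upload_chunk` only needs to classify failures, treating the status codes in `FLOW_PERMANENT_ERRORS` as fatal and everything else as retryable. That classify-then-retry loop can be read in isolation; here is a standalone sketch using a stub exception in place of hopsworks' `RestAPIError` (the real code reads `re.response.status_code`):

```python
import time

FLOW_PERMANENT_ERRORS = [404, 413, 415, 500, 501]  # mirrors DatasetApi

class StubRestAPIError(Exception):
    """Stand-in for hopsworks' RestAPIError; only carries a status code."""
    def __init__(self, status_code):
        self.status_code = status_code

def send_with_retries(send, max_chunk_retries=1, chunk_retry_interval=1):
    """Retry `send` on transient HTTP errors, fail fast on permanent ones."""
    retries = 0
    while True:
        try:
            return send()
        except StubRestAPIError as e:
            retries += 1
            if e.status_code in FLOW_PERMANENT_ERRORS or retries > max_chunk_retries:
                raise  # permanent error, or retry budget exhausted
            time.sleep(chunk_retry_interval)

# A 503 (transient) succeeds on the second attempt; a 404 would raise at once.
attempts = {"count": 0}
def flaky_send():
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise StubRestAPIError(503)
    return "ok"

print(send_with_retries(flaky_send, max_chunk_retries=2))  # -> ok
```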
From 8681cd105dff61028198998399c5c7e9f65b2cc9 Mon Sep 17 00:00:00 2001
From: Ermias
Date: Mon, 9 Oct 2023 16:06:11 +0200
Subject: [PATCH 3/3] format

---
 python/hopsworks/core/dataset_api.py | 70 +++++++++++++++++++++-------
 1 file changed, 54 insertions(+), 16 deletions(-)

diff --git a/python/hopsworks/core/dataset_api.py b/python/hopsworks/core/dataset_api.py
index 09e1ed50b..4893e2dc1 100644
--- a/python/hopsworks/core/dataset_api.py
+++ b/python/hopsworks/core/dataset_api.py
@@ -27,6 +27,7 @@
 from hopsworks.client.exceptions import DatasetException
 from concurrent.futures import ThreadPoolExecutor, wait
 
+
 class Chunk:
     def __init__(self, content, number, status):
         self.content = content
@@ -136,7 +137,16 @@ def download(self, path: str, local_path: str = None, overwrite: bool = False):
 
         return local_path
 
-    def upload(self, local_path: str, upload_path: str, overwrite: bool = False, chunk_size = 1048576, simultaneous_uploads = 3, max_chunk_retries = 1, chunk_retry_interval = 1):
+    def upload(
+        self,
+        local_path: str,
+        upload_path: str,
+        overwrite: bool = False,
+        chunk_size=1048576,
+        simultaneous_uploads=3,
+        max_chunk_retries=1,
+        chunk_retry_interval=1,
+    ):
         """Upload a file to the Hopsworks filesystem.
 
         ```python
@@ -157,7 +167,7 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False, chu
             chunk_size: upload chunk size in bytes. Default 1048576 bytes
             simultaneous_uploads: number of simultaneous chunks to upload. Default 3
             max_chunk_retries: maximum retry for a chunk. Default is 1
-            chunk_retry_interval: chunk retry interval in seconds. Default is 1sec 
+            chunk_retry_interval: chunk retry interval in seconds. Default is 1sec
         # Returns
             `str`: Path to uploaded file
         # Raises
@@ -187,7 +197,9 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False, chu
 
         num_chunks = math.ceil(file_size / chunk_size)
 
-        base_params = self._get_flow_base_params(file_name, num_chunks, file_size, chunk_size)
+        base_params = self._get_flow_base_params(
+            file_name, num_chunks, file_size, chunk_size
+        )
 
         chunk_number = 1
         with open(local_path, "rb") as f:
@@ -208,14 +220,26 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False, chu
                         chunk = f.read(chunk_size)
                         if not chunk:
                             break
-                        chunks.append(Chunk(chunk, chunk_number, 'pending'))
+                        chunks.append(Chunk(chunk, chunk_number, "pending"))
                         chunk_number += 1
 
                     if len(chunks) == 0:
                         break
 
                     # upload each chunk and update pbar
-                    futures = [executor.submit(self._upload_chunk, base_params, upload_path, file_name, chunk, pbar, max_chunk_retries, chunk_retry_interval) for chunk in chunks]
+                    futures = [
+                        executor.submit(
+                            self._upload_chunk,
+                            base_params,
+                            upload_path,
+                            file_name,
+                            chunk,
+                            pbar,
+                            max_chunk_retries,
+                            chunk_retry_interval,
+                        )
+                        for chunk in chunks
+                    ]
                     # wait for all upload tasks to complete
                     _, _ = wait(futures)
                     try:
@@ -231,26 +255,40 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False, chu
         self._log.info("Upload finished")
 
         return upload_path + "/" + os.path.basename(local_path)
-
-    def _upload_chunk(self, base_params, upload_path, file_name, chunk: Chunk, pbar, max_chunk_retries, chunk_retry_interval):
+
+    def _upload_chunk(
+        self,
+        base_params,
+        upload_path,
+        file_name,
+        chunk: Chunk,
+        pbar,
+        max_chunk_retries,
+        chunk_retry_interval,
+    ):
         query_params = copy.copy(base_params)
         query_params["flowCurrentChunkSize"] = len(chunk.content)
         query_params["flowChunkNumber"] = chunk.number
-
-        chunk.status = 'uploading'
+
+        chunk.status = "uploading"
         while True:
             try:
-                self._upload_request(query_params, upload_path, file_name, chunk.content)
+                self._upload_request(
+                    query_params, upload_path, file_name, chunk.content
+                )
                 break
             except RestAPIError as re:
-                chunk.retries +=1
-                if re.response.status_code in DatasetApi.FLOW_PERMANENT_ERRORS or chunk.retries > max_chunk_retries:
-                    chunk.status = 'failed'
-                    raise re
+                chunk.retries += 1
+                if (
+                    re.response.status_code in DatasetApi.FLOW_PERMANENT_ERRORS
+                    or chunk.retries > max_chunk_retries
+                ):
+                    chunk.status = "failed"
+                    raise re
                 time.sleep(chunk_retry_interval)
                 continue
-
-        chunk.status = 'uploaded'
+
+        chunk.status = "uploaded"
 
         if pbar is not None:
             pbar.update(query_params["flowCurrentChunkSize"])
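After this formatting pass the series is functionally identical to patch 1 plus the removal in patch 2. For reference, the Flow-style bookkeeping the uploader performs is easy to trace by hand; a worked sketch with illustrative numbers, mirroring `_get_flow_base_params` and the per-chunk fields set in `_upload_chunk` (keys beyond those visible in the hunk context above are omitted):

```python
import math

# Illustrative input: a 2.5 MiB file uploaded in the default 1 MiB chunks.
file_name = "train.csv"
file_size = int(2.5 * 1024 * 1024)  # 2621440 bytes
chunk_size = 1048576

num_chunks = math.ceil(file_size / chunk_size)  # 3 chunks: 1 MiB, 1 MiB, 0.5 MiB

# Shared metadata, as built by _get_flow_base_params.
base_params = {
    "templateId": -1,
    "flowChunkSize": chunk_size,
    "flowTotalSize": file_size,
    "flowIdentifier": str(file_size) + "_" + file_name,
    "flowFilename": file_name,
}

# Per-chunk fields added by _upload_chunk; chunk numbers are 1-indexed and
# only the final chunk is short.
for number in range(1, num_chunks + 1):
    current = min(chunk_size, file_size - (number - 1) * chunk_size)
    query_params = dict(base_params)  # shallow copy, like copy.copy()
    query_params["flowCurrentChunkSize"] = current
    query_params["flowChunkNumber"] = number
    print(number, current)  # 1 1048576 / 2 1048576 / 3 524288
```

The copy per chunk is what makes the parallelism safe: the pre-patch code aliased `query_params = base_params` and mutated the shared dict, which would race once several chunks upload at the same time, so patch 1's switch to `copy.copy(base_params)` is load-bearing, not cosmetic.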