[HWORKS-565] parallel upload (#174)
ErmiasG authored Oct 12, 2023
1 parent b6ee34c commit c27e4be
Showing 1 changed file with 104 additions and 21 deletions.
125 changes: 104 additions & 21 deletions python/hopsworks/core/dataset_api.py
@@ -16,13 +16,24 @@

import math
import os
import time
from tqdm.auto import tqdm
import shutil
import logging
import copy

from hopsworks import client
from hopsworks.client.exceptions import RestAPIError
from hopsworks.client.exceptions import DatasetException
from concurrent.futures import ThreadPoolExecutor, wait


class Chunk:
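# Tracks one chunk of the file being uploaded: its bytes, its 1-based sequence
# number, its status ("pending" -> "uploading" -> "uploaded"/"failed"), and
# how many times it has been retried.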
def __init__(self, content, number, status):
self.content = content
self.number = number
self.status = status
self.retries = 0


class DatasetApi:
@@ -33,7 +44,7 @@ def __init__(
self._project_id = project_id
self._log = logging.getLogger(__name__)

DEFAULT_FLOW_CHUNK_SIZE = 1048576
FLOW_PERMANENT_ERRORS = [404, 413, 415, 500, 501]

def download(self, path: str, local_path: str = None, overwrite: bool = False):
"""Download file from Hopsworks Filesystem to the current working directory.
@@ -126,7 +137,16 @@ def download(self, path: str, local_path: str = None, overwrite: bool = False):

return local_path

def upload(self, local_path: str, upload_path: str, overwrite: bool = False):
def upload(
self,
local_path: str,
upload_path: str,
overwrite: bool = False,
chunk_size=1048576,
simultaneous_uploads=3,
max_chunk_retries=1,
chunk_retry_interval=1,
):
"""Upload a file to the Hopsworks filesystem.
```python
@@ -144,6 +164,10 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False):
local_path: local path to file to upload
upload_path: path to directory where to upload the file in Hopsworks Filesystem
overwrite: overwrite file if exists
chunk_size: upload chunk size in bytes. Default 1048576 bytes
simultaneous_uploads: number of chunks to upload in parallel. Default 3
max_chunk_retries: maximum number of retries for a single chunk. Default 1
chunk_retry_interval: interval in seconds between retries of a chunk. Default 1
# Returns
`str`: Path to uploaded file
# Raises
@@ -171,9 +195,11 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False):
)
)

num_chunks = math.ceil(file_size / self.DEFAULT_FLOW_CHUNK_SIZE)
num_chunks = math.ceil(file_size / chunk_size)

base_params = self._get_flow_base_params(file_name, num_chunks, file_size)
base_params = self._get_flow_base_params(
file_name, num_chunks, file_size, chunk_size
)

chunk_number = 1
with open(local_path, "rb") as f:
@@ -187,21 +213,41 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False):
except Exception:
self._log.exception("Failed to initialize progress bar.")
self._log.info("Starting upload")

while True:
chunk = f.read(self.DEFAULT_FLOW_CHUNK_SIZE)
if not chunk:
break

query_params = base_params
query_params["flowCurrentChunkSize"] = len(chunk)
query_params["flowChunkNumber"] = chunk_number

self._upload_request(query_params, upload_path, file_name, chunk)
chunk_number += 1

if pbar is not None:
pbar.update(query_params["flowCurrentChunkSize"])
with ThreadPoolExecutor(simultaneous_uploads) as executor:
while True:
chunks = []
for _ in range(simultaneous_uploads):
chunk = f.read(chunk_size)
if not chunk:
break
chunks.append(Chunk(chunk, chunk_number, "pending"))
chunk_number += 1

if len(chunks) == 0:
break

# upload each chunk and update pbar
futures = [
executor.submit(
self._upload_chunk,
base_params,
upload_path,
file_name,
chunk,
pbar,
max_chunk_retries,
chunk_retry_interval,
)
for chunk in chunks
]
# wait for all upload tasks to complete
_, _ = wait(futures)
try:
_ = [future.result() for future in futures]
except Exception as e:
if pbar is not None:
pbar.close()
raise e

if pbar is not None:
pbar.close()
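
A note on the error handling above: wait() blocks until every chunk future has finished, but it never raises worker exceptions itself; an exception raised inside _upload_chunk only resurfaces when result() is called on its future, which is why both calls appear. A minimal standalone sketch of this concurrent.futures behavior (the flaky task is hypothetical):

from concurrent.futures import ThreadPoolExecutor, wait

def flaky(n):
    # simulate one failing chunk upload
    if n == 2:
        raise ValueError("chunk %d failed" % n)
    return n

with ThreadPoolExecutor(3) as executor:
    futures = [executor.submit(flaky, n) for n in range(4)]
    done, _ = wait(futures)   # blocks until all finish; swallows worker errors
    for future in done:
        future.result()       # the failed task's ValueError re-raises here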
@@ -210,10 +256,47 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False):

return upload_path + "/" + os.path.basename(local_path)
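
For illustration, the new knobs might be used like this; a hedged sketch assuming the library's usual hopsworks.login() / get_dataset_api() flow, with hypothetical paths and values:

import hopsworks

project = hopsworks.login()
dataset_api = project.get_dataset_api()

# 5 MiB chunks, 4 parallel uploads, up to 2 retries per chunk, 5s between retries
uploaded = dataset_api.upload(
    "data/train.csv",
    "Resources",
    overwrite=True,
    chunk_size=5 * 1024 * 1024,
    simultaneous_uploads=4,
    max_chunk_retries=2,
    chunk_retry_interval=5,
)
print(uploaded)  # e.g. "Resources/train.csv"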

def _get_flow_base_params(self, file_name, num_chunks, size):
def _upload_chunk(
self,
base_params,
upload_path,
file_name,
chunk: Chunk,
pbar,
max_chunk_retries,
chunk_retry_interval,
):
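# work on a private copy: chunks upload concurrently, so the shared base
# params must not be mutated across threads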
query_params = copy.copy(base_params)
query_params["flowCurrentChunkSize"] = len(chunk.content)
query_params["flowChunkNumber"] = chunk.number

chunk.status = "uploading"
while True:
try:
self._upload_request(
query_params, upload_path, file_name, chunk.content
)
break
except RestAPIError as re:
chunk.retries += 1
if (
re.response.status_code in DatasetApi.FLOW_PERMANENT_ERRORS
or chunk.retries > max_chunk_retries
):
chunk.status = "failed"
raise re
time.sleep(chunk_retry_interval)
continue

chunk.status = "uploaded"

if pbar is not None:
pbar.update(query_params["flowCurrentChunkSize"])

def _get_flow_base_params(self, file_name, num_chunks, size, chunk_size):
return {
"templateId": -1,
"flowChunkSize": self.DEFAULT_FLOW_CHUNK_SIZE,
"flowChunkSize": chunk_size,
"flowTotalSize": size,
"flowIdentifier": str(size) + "_" + file_name,
"flowFilename": file_name,
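
The diff truncates here. For reference, a hypothetical reconstruction of the flow.js-style parameter map this helper builds; keys beyond those visible in the diff (such as flowTotalChunks) are assumptions:

import math
import os

local_path = "data/train.csv"  # hypothetical file
chunk_size = 1048576
size = os.path.getsize(local_path)
file_name = os.path.basename(local_path)

params = {
    "templateId": -1,
    "flowChunkSize": chunk_size,
    "flowTotalSize": size,
    "flowIdentifier": str(size) + "_" + file_name,
    "flowFilename": file_name,
    "flowTotalChunks": math.ceil(size / chunk_size),  # assumed key; num_chunks is passed in
}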
