[HWORKS-565] parallel upload #174

Merged 3 commits on Oct 12, 2023

Changes from all commits
125 changes: 104 additions & 21 deletions python/hopsworks/core/dataset_api.py
@@ -16,13 +16,24 @@

import math
import os
+import time
from tqdm.auto import tqdm
import shutil
import logging
+import copy

from hopsworks import client
from hopsworks.client.exceptions import RestAPIError
from hopsworks.client.exceptions import DatasetException
+from concurrent.futures import ThreadPoolExecutor, wait


+class Chunk:
+    def __init__(self, content, number, status):
+        self.content = content
+        self.number = number
+        self.status = status
+        self.retries = 0


class DatasetApi:
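The new Chunk helper above carries the bytes of one upload chunk together with its 1-based number, a status that moves from "pending" through "uploading" to "uploaded" (or "failed"), and a per-chunk retry counter. A minimal sketch of how upload() (below) slices a file into such chunks; the file name and chunk size are illustrative:

```python
# Sketch only: slice a local file into pending Chunk objects,
# mirroring the batching loop in DatasetApi.upload() below.
chunk_size = 1048576  # 1 MiB, the default
chunks = []
with open("train.csv", "rb") as f:  # illustrative file name
    number = 1
    while True:
        content = f.read(chunk_size)
        if not content:
            break
        chunks.append(Chunk(content, number, "pending"))
        number += 1
```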
@@ -33,7 +44,7 @@ def __init__(
self._project_id = project_id
self._log = logging.getLogger(__name__)

-    DEFAULT_FLOW_CHUNK_SIZE = 1048576
+    FLOW_PERMANENT_ERRORS = [404, 413, 415, 500, 501]
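FLOW_PERMANENT_ERRORS lists the HTTP status codes on which retrying a chunk cannot help (not found, payload too large, unsupported media type, and server errors treated as fatal here), so the uploader fails fast instead of spending its retry budget. A hedged sketch of the retry decision that _upload_chunk further down implements inline:

```python
# Hypothetical helper; _upload_chunk inlines this check rather than
# calling a function like this.
def should_retry(status_code: int, retries_so_far: int, max_chunk_retries: int) -> bool:
    if status_code in DatasetApi.FLOW_PERMANENT_ERRORS:
        return False  # permanent: the server will keep rejecting this chunk
    return retries_so_far <= max_chunk_retries  # transient: retry until the budget is spent
```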

def download(self, path: str, local_path: str = None, overwrite: bool = False):
"""Download file from Hopsworks Filesystem to the current working directory.
@@ -126,7 +137,16 @@ def download(self, path: str, local_path: str = None, overwrite: bool = False):

return local_path

-    def upload(self, local_path: str, upload_path: str, overwrite: bool = False):
+    def upload(
+        self,
+        local_path: str,
+        upload_path: str,
+        overwrite: bool = False,
+        chunk_size=1048576,
+        simultaneous_uploads=3,
+        max_chunk_retries=1,
+        chunk_retry_interval=1,
+    ):
"""Upload a file to the Hopsworks filesystem.

```python
@@ -144,6 +164,10 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False):
local_path: local path to file to upload
upload_path: path to directory where to upload the file in Hopsworks Filesystem
overwrite: overwrite file if exists
+            chunk_size: upload chunk size in bytes. Default 1048576 bytes
+            simultaneous_uploads: number of chunks to upload in parallel. Default 3
+            max_chunk_retries: maximum number of retries for a single chunk. Default 1
+            chunk_retry_interval: interval between chunk retries in seconds. Default 1
# Returns
`str`: Path to uploaded file
# Raises
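A usage sketch of the extended signature; the project handle, local file, and target dataset are illustrative, and get_dataset_api() is the usual way to obtain this API from a logged-in project:

```python
import hopsworks

project = hopsworks.login()
dataset_api = project.get_dataset_api()

# Four workers upload 5 MiB chunks in parallel; each chunk is
# retried up to 3 times with 2 seconds between attempts.
uploaded = dataset_api.upload(
    "data/train.csv",   # illustrative local file
    "Resources",        # illustrative target directory
    overwrite=True,
    chunk_size=5 * 1024 * 1024,
    simultaneous_uploads=4,
    max_chunk_retries=3,
    chunk_retry_interval=2,
)
print(uploaded)  # e.g. "Resources/train.csv"
```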
@@ -171,9 +195,11 @@
)
)

-        num_chunks = math.ceil(file_size / self.DEFAULT_FLOW_CHUNK_SIZE)
+        num_chunks = math.ceil(file_size / chunk_size)

-        base_params = self._get_flow_base_params(file_name, num_chunks, file_size)
+        base_params = self._get_flow_base_params(
+            file_name, num_chunks, file_size, chunk_size
+        )

chunk_number = 1
with open(local_path, "rb") as f:
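The chunk count rounds up, so a trailing partial chunk still counts as a full flow chunk. A quick worked example with the default chunk_size:

```python
import math

chunk_size = 1048576                 # default: 1 MiB
file_size = 10 * 1024 * 1024 + 1     # 10 MiB plus one byte
num_chunks = math.ceil(file_size / chunk_size)
print(num_chunks)                    # 11 -- the final chunk holds a single byte
```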
@@ -187,21 +213,41 @@ def upload(self, local_path: str, upload_path: str, overwrite: bool = False):
except Exception:
self._log.exception("Failed to initialize progress bar.")
self._log.info("Starting upload")

-            while True:
-                chunk = f.read(self.DEFAULT_FLOW_CHUNK_SIZE)
-                if not chunk:
-                    break
-
-                query_params = base_params
-                query_params["flowCurrentChunkSize"] = len(chunk)
-                query_params["flowChunkNumber"] = chunk_number
-
-                self._upload_request(query_params, upload_path, file_name, chunk)
-                chunk_number += 1
-
-                if pbar is not None:
-                    pbar.update(query_params["flowCurrentChunkSize"])
+            with ThreadPoolExecutor(simultaneous_uploads) as executor:
+                while True:
+                    chunks = []
+                    for _ in range(simultaneous_uploads):
+                        chunk = f.read(chunk_size)
+                        if not chunk:
+                            break
+                        chunks.append(Chunk(chunk, chunk_number, "pending"))
+                        chunk_number += 1
+
+                    if len(chunks) == 0:
+                        break
+
+                    # upload each chunk and update pbar
+                    futures = [
+                        executor.submit(
+                            self._upload_chunk,
+                            base_params,
+                            upload_path,
+                            file_name,
+                            chunk,
+                            pbar,
+                            max_chunk_retries,
+                            chunk_retry_interval,
+                        )
+                        for chunk in chunks
+                    ]
+                    # wait for all upload tasks to complete
+                    _, _ = wait(futures)
+                    try:
+                        _ = [future.result() for future in futures]
+                    except Exception as e:
+                        if pbar is not None:
+                            pbar.close()
+                        raise e

if pbar is not None:
pbar.close()
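The loop above reads at most simultaneous_uploads chunks before blocking on wait(), so memory stays bounded at roughly simultaneous_uploads × chunk_size bytes and chunks go out batch by batch. A self-contained sketch of that batch-submit-wait pattern, with upload_one standing in for _upload_chunk:

```python
from concurrent.futures import ThreadPoolExecutor, wait
from typing import BinaryIO, Callable

def upload_in_batches(
    f: BinaryIO,
    chunk_size: int,
    simultaneous_uploads: int,
    upload_one: Callable[[int, bytes], None],
) -> None:
    # Read up to N chunks, submit them all, wait for the whole batch,
    # then surface the first failure (if any) before reading more.
    number = 1
    with ThreadPoolExecutor(simultaneous_uploads) as executor:
        while True:
            batch = []
            for _ in range(simultaneous_uploads):
                content = f.read(chunk_size)
                if not content:
                    break
                batch.append((number, content))
                number += 1
            if not batch:
                break
            futures = [executor.submit(upload_one, n, c) for n, c in batch]
            wait(futures)
            for fut in futures:
                fut.result()  # re-raises the first chunk failure
```

One trade-off of waiting per batch is that a single slow chunk stalls the next batch; the PR accepts this in exchange for simple, bounded buffering.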
@@ -210,10 +256,47 @@

return upload_path + "/" + os.path.basename(local_path)

-    def _get_flow_base_params(self, file_name, num_chunks, size):
+    def _upload_chunk(
+        self,
+        base_params,
+        upload_path,
+        file_name,
+        chunk: Chunk,
+        pbar,
+        max_chunk_retries,
+        chunk_retry_interval,
+    ):
+        query_params = copy.copy(base_params)
+        query_params["flowCurrentChunkSize"] = len(chunk.content)
+        query_params["flowChunkNumber"] = chunk.number
+
+        chunk.status = "uploading"
+        while True:
+            try:
+                self._upload_request(
+                    query_params, upload_path, file_name, chunk.content
+                )
+                break
+            except RestAPIError as re:
+                chunk.retries += 1
+                if (
+                    re.response.status_code in DatasetApi.FLOW_PERMANENT_ERRORS
+                    or chunk.retries > max_chunk_retries
+                ):
+                    chunk.status = "failed"
+                    raise re
+                time.sleep(chunk_retry_interval)
+                continue
+
+        chunk.status = "uploaded"
+
+        if pbar is not None:
+            pbar.update(query_params["flowCurrentChunkSize"])
+
+    def _get_flow_base_params(self, file_name, num_chunks, size, chunk_size):
return {
"templateId": -1,
"flowChunkSize": self.DEFAULT_FLOW_CHUNK_SIZE,
"flowChunkSize": chunk_size,
"flowTotalSize": size,
"flowIdentifier": str(size) + "_" + file_name,
"flowFilename": file_name,