From 87034aa0e8ee072667cdb943576be71f3aba03c1 Mon Sep 17 00:00:00 2001 From: Julien Cuquemelle Date: Fri, 15 Mar 2024 15:04:46 +0100 Subject: [PATCH] [fix] correctly retrieve zip file when uploading from an unzipped pex (#121) * [fix] correctly retrieve zip file when uploading from an unzipped pex * remove corrupted file on remote fs if upload failed And fix upload log --- cluster_pack/packaging.py | 13 +++++++++---- cluster_pack/uploader.py | 30 ++++++++++++++++++++---------- 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/cluster_pack/packaging.py b/cluster_pack/packaging.py index 002a314..67f0409 100644 --- a/cluster_pack/packaging.py +++ b/cluster_pack/packaging.py @@ -399,14 +399,19 @@ def detect_archive_names( and os.path.isdir(pex_file) and not package_path.endswith('.zip')): - pex_files = glob.glob(f"{os.path.dirname(pex_file)}/*.pex.zip") - assert len(pex_files) == 1, \ - f"Expected to find single zipped PEX in same dir as {pex_file}, got {pex_files}" - package_path = _build_package_path(os.path.basename(pex_files[0]), None) + zip_pex_file = resolve_zip_from_pex_dir(pex_file) + package_path = _build_package_path(os.path.basename(zip_pex_file), None) return package_path, env_name, pex_file +def resolve_zip_from_pex_dir(pex_dir: str) -> str: + pex_files = glob.glob(f"{os.path.dirname(pex_dir)}/*.pex.zip") + assert len(pex_files) == 1, \ + f"Expected to find single zipped PEX in same dir as {pex_dir}, got {pex_files}" + return pex_files[0] + + def detect_packer_from_spec(spec_file: str) -> Packer: if os.path.basename(spec_file) == "requirements.txt": return PEX_PACKER diff --git a/cluster_pack/uploader.py b/cluster_pack/uploader.py index 7b73b48..0c06d8c 100644 --- a/cluster_pack/uploader.py +++ b/cluster_pack/uploader.py @@ -109,7 +109,7 @@ def upload_zip( request.urlretrieve(zip_file, tmp_zip_file) zip_file = tmp_zip_file - _upload_zip(zip_file, package_path, resolved_fs, force_upload) + _upload_pex_file(packer, zip_file, package_path, resolved_fs, force_upload) return package_path @@ -167,7 +167,7 @@ def upload_env( additional_indexes=additional_indexes ) else: - _upload_zip(pex_file, package_path, resolved_fs, force_upload) + _upload_pex_file(packer, pex_file, package_path, resolved_fs, force_upload) return (package_path, env_name) @@ -244,28 +244,38 @@ def _get_hash(spec_file: str) -> str: return hashlib.sha1(f.read().encode()).hexdigest() -def _upload_zip( - zip_file: str, package_path: str, +def _upload_pex_file( + packer: packaging.Packer, + pex_file_or_dir: str, package_path: str, resolved_fs: Any = None, force_upload: bool = False ) -> None: - packer = packaging.detect_packer_from_file(zip_file) if packer == packaging.PEX_PACKER and resolved_fs.exists(package_path): with tempfile.TemporaryDirectory() as tempdir: local_copy_path = os.path.join(tempdir, os.path.basename(package_path)) resolved_fs.get(package_path, local_copy_path) info_from_storage = PexInfo.from_pex(local_copy_path) - info_to_upload = PexInfo.from_pex(zip_file) + info_to_upload = PexInfo.from_pex(pex_file_or_dir) if not force_upload and info_from_storage.code_hash == info_to_upload.code_hash: - _logger.info(f"skip upload of current {zip_file}" + _logger.info(f"skip upload of current {pex_file_or_dir}" f" as it is already uploaded on {package_path}") return - _logger.info(f"upload current {zip_file} to {package_path}") - dir = os.path.dirname(package_path) if not resolved_fs.exists(dir): resolved_fs.mkdir(dir) - resolved_fs.put(zip_file, package_path) + + pex_file = ( + packaging.resolve_zip_from_pex_dir(pex_file_or_dir) if os.path.isdir(pex_file_or_dir) + else pex_file_or_dir + ) + _logger.info(f"upload current {pex_file} to {package_path}") + + try: + resolved_fs.put(pex_file, package_path) + except OSError: + if resolved_fs.exists(package_path): + resolved_fs.rm(package_path) + # Remove previous metadata archive_meta_data = _get_archive_metadata_path(package_path) if resolved_fs.exists(archive_meta_data):