
Commit

[fix] correctly retrieve zip file when uploading from an unzipped pex (#121)

* [fix] correctly retrieve zip file when uploading from an unzipped pex

* remove corrupted file on remote fs if upload failed, and fix the upload log message
jcuquemelle committed Sep 2, 2024
1 parent 0ef91c9 commit 87034aa
Showing 2 changed files with 29 additions and 14 deletions.
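
For context, the sketch below (not part of the commit) exercises the contract of the new resolve_zip_from_pex_dir helper introduced in packaging.py further down: an unzipped PEX directory is expected to sit next to exactly one sibling *.pex.zip, and that zip is what gets uploaded. The directory and file names here are made up, and the example assumes a cluster_pack version that already contains this change.

import os
import tempfile

from cluster_pack import packaging

# Recreate the assumed layout: an unzipped PEX directory ("my_app.pex/") with
# a single zipped PEX ("my_app.pex.zip") next to it, then check that the
# helper resolves the zip from the directory path.
with tempfile.TemporaryDirectory() as tmp:
    pex_dir = os.path.join(tmp, "my_app.pex")
    os.mkdir(pex_dir)                               # the unzipped PEX
    zip_path = os.path.join(tmp, "my_app.pex.zip")
    open(zip_path, "wb").close()                    # the sibling zipped PEX
    assert packaging.resolve_zip_from_pex_dir(pex_dir) == zip_path
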
13 changes: 9 additions & 4 deletions cluster_pack/packaging.py
@@ -399,14 +399,19 @@ def detect_archive_names(
             and os.path.isdir(pex_file)
             and not package_path.endswith('.zip')):

-        pex_files = glob.glob(f"{os.path.dirname(pex_file)}/*.pex.zip")
-        assert len(pex_files) == 1, \
-            f"Expected to find single zipped PEX in same dir as {pex_file}, got {pex_files}"
-        package_path = _build_package_path(os.path.basename(pex_files[0]), None)
+        zip_pex_file = resolve_zip_from_pex_dir(pex_file)
+        package_path = _build_package_path(os.path.basename(zip_pex_file), None)

     return package_path, env_name, pex_file


+def resolve_zip_from_pex_dir(pex_dir: str) -> str:
+    pex_files = glob.glob(f"{os.path.dirname(pex_dir)}/*.pex.zip")
+    assert len(pex_files) == 1, \
+        f"Expected to find single zipped PEX in same dir as {pex_dir}, got {pex_files}"
+    return pex_files[0]
+
+
 def detect_packer_from_spec(spec_file: str) -> Packer:
     if os.path.basename(spec_file) == "requirements.txt":
         return PEX_PACKER
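
The uploader.py hunks below keep the existing skip-if-already-uploaded check, which compares PEX code hashes between the local archive and the copy already on the remote filesystem. As a standalone sketch it boils down to the following; this assumes only the pex library, and the function name and paths are placeholders.

from pex.pex_info import PexInfo


def has_same_code(local_pex: str, remote_copy: str) -> bool:
    # Two PEX builds are treated as identical when their code hashes match;
    # in that case the upload is skipped unless force_upload is set.
    return (PexInfo.from_pex(local_pex).code_hash
            == PexInfo.from_pex(remote_copy).code_hash)

PexInfo.from_pex can read both a zipped PEX and an unzipped PEX directory, which is what lets the renamed _upload_pex_file pass pex_file_or_dir straight to it in the last hunk below.
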
30 changes: 20 additions & 10 deletions cluster_pack/uploader.py
@@ -109,7 +109,7 @@ def upload_zip(
         request.urlretrieve(zip_file, tmp_zip_file)
         zip_file = tmp_zip_file

-    _upload_zip(zip_file, package_path, resolved_fs, force_upload)
+    _upload_pex_file(packer, zip_file, package_path, resolved_fs, force_upload)

     return package_path

@@ -167,7 +167,7 @@ def upload_env(
             additional_indexes=additional_indexes
         )
     else:
-        _upload_zip(pex_file, package_path, resolved_fs, force_upload)
+        _upload_pex_file(packer, pex_file, package_path, resolved_fs, force_upload)

     return (package_path,
             env_name)
@@ -244,28 +244,38 @@ def _get_hash(spec_file: str) -> str:
         return hashlib.sha1(f.read().encode()).hexdigest()


-def _upload_zip(
-        zip_file: str, package_path: str,
+def _upload_pex_file(
+        packer: packaging.Packer,
+        pex_file_or_dir: str, package_path: str,
         resolved_fs: Any = None, force_upload: bool = False
 ) -> None:
-    packer = packaging.detect_packer_from_file(zip_file)
     if packer == packaging.PEX_PACKER and resolved_fs.exists(package_path):
         with tempfile.TemporaryDirectory() as tempdir:
             local_copy_path = os.path.join(tempdir, os.path.basename(package_path))
             resolved_fs.get(package_path, local_copy_path)
             info_from_storage = PexInfo.from_pex(local_copy_path)
-            info_to_upload = PexInfo.from_pex(zip_file)
+            info_to_upload = PexInfo.from_pex(pex_file_or_dir)
             if not force_upload and info_from_storage.code_hash == info_to_upload.code_hash:
-                _logger.info(f"skip upload of current {zip_file}"
+                _logger.info(f"skip upload of current {pex_file_or_dir}"
                              f" as it is already uploaded on {package_path}")
                 return

-    _logger.info(f"upload current {zip_file} to {package_path}")
-
     dir = os.path.dirname(package_path)
     if not resolved_fs.exists(dir):
         resolved_fs.mkdir(dir)
-    resolved_fs.put(zip_file, package_path)

+    pex_file = (
+        packaging.resolve_zip_from_pex_dir(pex_file_or_dir) if os.path.isdir(pex_file_or_dir)
+        else pex_file_or_dir
+    )
+    _logger.info(f"upload current {pex_file} to {package_path}")
+
+    try:
+        resolved_fs.put(pex_file, package_path)
+    except OSError:
+        if resolved_fs.exists(package_path):
+            resolved_fs.rm(package_path)
+
     # Remove previous metadata
     archive_meta_data = _get_archive_metadata_path(package_path)
     if resolved_fs.exists(archive_meta_data):
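
The second commit-message bullet corresponds to the try/except now wrapped around resolved_fs.put. Below is a generic sketch of that cleanup-on-failure pattern, assuming an fsspec-style filesystem object exposing put, exists and rm; re-raising after the cleanup is a choice made in this sketch and is not visible in the excerpt above.

from typing import Any


def put_with_cleanup(fs: Any, local_path: str, remote_path: str) -> None:
    try:
        fs.put(local_path, remote_path)
    except OSError:
        # A failed upload can leave a truncated file behind; removing it keeps
        # a later run from skipping the upload based on a corrupted archive.
        if fs.exists(remote_path):
            fs.rm(remote_path)
        raise  # sketch choice: surface the original error to the caller
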
