Skip to content

Commit

Permalink
Update script
Browse files Browse the repository at this point in the history
  • Loading branch information
miklinux committed Sep 28, 2023
1 parent 979658e commit 5c95ee8
Showing 1 changed file with 73 additions and 20 deletions.
93 changes: 73 additions & 20 deletions delete-package-versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,29 @@ def base64_encode(value):
def urlencode(value):
return urllib.parse.quote(str(value), safe='')

# Pretty print response errors
def response_errors(response):
status_code = response.status_code
try:
data = response.json()
if 'errors' in data:
for error in data.get('errors', []):
print(f"-> {error['code']}: {error['message']} (HTTP Status {status_code})")
elif 'message' in data:
print(f"!! {data['message']} (HTTP Status {status_code})")
except json.decoder.JSONDecodeError as e:
print(f"!! Unknown error (HTTP Status {status_code})")
print(response.content.decode())

# Perform an API call to the ghcr.io registry requesting the
# manifest for the given SHA digest to find out all versions
# which are related to the multi-arch manifest.
def get_children_manifests(sha):
# multi-arch manifest for the given SHA digest to find out
# all versions which are related to it.
def get_children_versions(sha):
response = requests.get(
f"https://ghcr.io/v2/{PACKAGE_OWNER}/{PACKAGE_NAME}/manifests/{sha}",
# The response Content-Type header will change when the manifest
# contains OCI annotations so it's mandatory to send the correct
# Accept headers.
headers={
"Accept": ','.join([
'application/vnd.docker.distribution.manifest.v2+json',
Expand All @@ -48,10 +65,12 @@ def get_children_manifests(sha):
)

if response.status_code == 404:
print(f"WARNING: No children packages found for SHA {sha}")
print(f"WARNING: No children package versions found for {sha}")
response_errors(response)
return []
elif response.status_code != 200:
print(f"Failed to fetch manifest with HTTP status {response.status_code}")
print(f"ERROR: Failed to get children package versions for {sha}")
response_errors(response)
sys.exit(1)

result = []
Expand Down Expand Up @@ -85,9 +104,11 @@ def get_all_versions():
)

if response.status_code == 404:
# Package not found
return []
elif response.status_code != 200:
print(f"Failed to fetch package versions with HTTP status {response.status_code}")
print(f"ERROR: Failed to fetch all package versions")
response_errors(response)
sys.exit(1)

# Store the value into cache
Expand All @@ -103,10 +124,10 @@ def get_all_versions_tagged():

if len(all_versions_tagged) == 0:
for version in get_all_versions():
tags = version['metadata']['container']['tags']
tags = get_version_tag(version)
if len(tags) > 0:
tagged_version = version
tagged_version['children'] = get_children_manifests( version['name'] )
tagged_version['children'] = get_children_versions( version['name'] )
all_versions_tagged.append(tagged_version)

# Store the value into cache
Expand All @@ -127,8 +148,8 @@ def get_versions_to_delete():
# do not have any associated tag.
def get_orphan_versions():
diff_version_ids = list(
set(extract_ids(get_all_versions())) -
set(extract_ids(get_all_versions_tagged()))
set(extract_version_ids(get_all_versions())) -
set(extract_version_ids(get_all_versions_tagged()))
)
result = []
for diff_version_id in diff_version_ids:
Expand All @@ -141,26 +162,28 @@ def get_orphan_versions():

# Extract all 'id' fields from the given data and return
# them as a list
def extract_ids(data):
def extract_version_ids(data):
ids = []
if isinstance(data, dict):
if "id" in data:
ids.append(data["id"])
for key, value in data.items():
ids.extend(extract_ids(value))
ids.extend(extract_version_ids(value))
elif isinstance(data, list):
for item in data:
ids.extend(extract_ids(item))
ids.extend(extract_version_ids(item))

return sorted(ids)

# Shortcut for getting a specific tag for a package
def get_version_tag(version, index = -1):
return version['metadata']['container']['tags'][index]
def get_version_tag(version):
return ", ".join(
version['metadata']['container']['tags']
)

# Perform the API call to delete a version
def delete_version(version_id):
# Make to function return true if DRY_RUN
# Make to return true if DRY_RUN
if DRY_RUN:
return True

Expand All @@ -176,7 +199,30 @@ def delete_version(version_id):
if response.status_code == 204:
return True
else:
print(f"!! Failed to delete version {version_id} (HTTP status {response.status_code})")
print(f"!! Failed to delete version {version_id}")
response_errors(response)
return False

# Perform the API call to delete a package
def delete_package():
# Make sure to return true if DRY_RUN
if DRY_RUN:
return True

package_name = urlencode(PACKAGE_NAME)
response = requests.delete(
f"https://api.github.com/orgs/{PACKAGE_OWNER}/packages/container/{package_name}",
headers={
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {TOKEN}"
}
)

if response.status_code == 204:
return True
else:
print(f"!! Failed to delete package {PACKAGE_OWNER}/{PACKAGE_NAME}")
response_errors(response)
return False

# Main script logic
Expand Down Expand Up @@ -209,10 +255,17 @@ def main():
keep_version['name'],
keep_version_tag,
))
elif num_keep_versions == 0 and num_delete_versions > 0:
print("ALL existing package versions will be deleted ...")
else:
print("There are no package versions to keep")
# GitHub will prevent deleting all versions for a package, returning 400
# when attempting to remove the last version, so a full package removal
# is required.
if num_delete_versions > 0:
print(f"Deleting package {PACKAGE_NAME} with ALL its versions!")
delete_package()
# Nothing left to do since the package has been deleted.
# We can safely skip all the rest.
sys.exit(0)

# Process package versions to delete
if num_delete_versions > 0:
Expand Down Expand Up @@ -243,7 +296,7 @@ def main():
orphan_versions = get_orphan_versions()
num_orphan_versions = len(orphan_versions)
if num_orphan_versions > 0:
print(f"Deleting {num_orphan_versions} orphan container manifests(s) ...")
print(f"Deleting {num_orphan_versions} orphan container package version(s) ...")
for orphan_version in orphan_versions:
if delete_version( orphan_version['id'] ):
print(f" %-10s : %s" % (
Expand Down

0 comments on commit 5c95ee8

Please sign in to comment.