Skip to content

Commit

Permalink
Refactor the Container class
Browse files Browse the repository at this point in the history
Changes to the Container class:

1. Change Container class to normal class

The class originally implemented using dataclass didn't have many of the
advantages of dataclass, but rather made creating instances more
difficult to understand. The reason is a container can have multi-arch
images, most attributions doesn't have one-to-one mapping with a single
image data.

2. Use class method `create_from_images` to create Container instances

It was implemented with instantiating it first then load per-arch data
with `load` which is confusing.

3. Remove `resolve_published`

Just check if a container is published by checking that in all deliver
repositories.

JIRA: CWFHEALTH-2313
  • Loading branch information
qixiang committed Sep 27, 2023
1 parent 63425dd commit 36f7131
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 146 deletions.
213 changes: 92 additions & 121 deletions freshmaker/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,88 +37,86 @@ class ExtraRepoNotConfiguredError(ValueError):
pass


@dataclass
class Container:
# Image NVR
nvr: str
def __new__(cls, *args, **kwargs):
raise TypeError(
f"Instances of {cls.__name__} should not be created using __init__. "
f"Use {cls.__name__}.create_from_images instead."
)

def __init__(self, nvr: str):
# Avoid direct instantiation with `Container(nvr)`, use create_from_images method instead
self.nvr = nvr
self.package = None

self.parent_brew_build = None
self.published = None

# Per-arch data
self.content_sets_by_arch = {}
self.rpms_by_arch = {}

parsed_data: dict = field(repr=False, default_factory=dict)
repositories: list[dict[str, Any]] = field(repr=False, default_factory=list)
parent_brew_build: Optional[str] = field(repr=False, default=None)
published: Optional[bool] = field(repr=False, default=None)
self.parsed_data = None
self.repositories = None

# Content sets by architechure
content_sets_by_arch: dict[str, list[str]] = field(repr=False, default_factory=dict)
# Installed rpms without architechure info
rpms: Optional[list[dict[str, Any]]] = field(repr=False, default=None)
# Store build related metadata from Brew
self.build_metadata = None

# Store ODCS composes
self.compose_sources = None

@classmethod
def load(cls, data: dict[str, Any]):
"""Load container data from given image data"""
container = cls(data["brew"]["build"])

exclude_fields = ["nvr", "content_sets_by_arch", "rpms"]
defined_fields = set(f.name for f in fields(cls) if f.name not in exclude_fields)

container.content_sets_by_arch[data["architecture"]] = data["content_sets"]
rpms = data.get("edges", {}).get("rpm_manifest", {}).get("data", {}).get("rpms", None)
if isinstance(rpms, list):
container.rpms = []
# We don't care about rpm architecture, just keep NVR
for rpm in rpms:
parsed_nvra = kobo.rpmlib.parse_nvra(rpm["nvra"])
nvr = "-".join(
[parsed_nvra["name"], parsed_nvra["version"], parsed_nvra["release"]]
)
parsed_nvra = kobo.rpmlib.parse_nvra(rpm["srpm_nevra"])
srpm_nvr = "-".join(
[parsed_nvra["name"], parsed_nvra["version"], parsed_nvra["release"]]
)
container.rpms.append(
{
"name": rpm["name"],
"nvr": nvr,
"srpm_name": rpm["srpm_name"],
"srpm_nvr": srpm_nvr,
}
)

for name, value in data.items():
if name not in defined_fields:
# Silently ignore unknown fields
continue
setattr(container, name, value)
return container
def create_from_images(cls, images: list[dict[str, Any]]):
"""Create an instance with the provided list of images"""
nvrs = {x["brew"]["build"] for x in images}
if len(nvrs) > 1:
raise RuntimeError(
"Images associated with a container should share the same build NVR. "
f"The following NVRs are found: {', '.join(nvrs)}."
)

nvr = nvrs.pop()
container = super().__new__(cls)
container.__init__(nvr)

# Get the following data from the first image, they're same for all arches
container.package = images[0]["brew"]["package"]
container.parent_brew_build = images[0]["parent_brew_build"]
container.repositories = images[0]["repositories"]

# The `parsed_data` can contain arch-specific data, but what we'll consume later is
# not arch-specific
container.parsed_data = images[0]["parsed_data"]

container.published = any(
any(repo["published"] for repo in img["repositories"]) for img in images
)

# Store per-arch data for content_sets and rpms
for image in images:
arch = image["architecture"]
container.content_sets_by_arch[arch] = image["content_sets"]

rpms = image.get("edges", {}).get("rpm_manifest", {}).get("data", {}).get("rpms", None)
if isinstance(rpms, list):
container.rpms_by_arch[arch] = rpms

@staticmethod
def _convert_rpm(rpm):
"""Convert rpm data to dict of rpm names and nvr"""
parsed_nvra = kobo.rpmlib.parse_nvra(rpm["nvra"])
nvr = "-".join([parsed_nvra["name"], parsed_nvra["version"], parsed_nvra["release"]])
parsed_nvra = kobo.rpmlib.parse_nvra(rpm["srpm_nevra"])
srpm_nvr = "-".join([parsed_nvra["name"], parsed_nvra["version"], parsed_nvra["release"]])
return {
"name": rpm["name"],
"nvr": nvr,
"srpm_name": rpm["srpm_name"],
"srpm_nvr": srpm_nvr,
}
return container

@property
def arches(self) -> list[str]:
"""All supported architectures"""
return list(self.content_sets_by_arch.keys())

def add_arch(self, data: dict[str, Any]) -> None:
"""Update container data to add arch specific data for other arches.
:param dict data: data for an arch specific image
"""
if data["architecture"] not in self.arches:
self.content_sets_by_arch[data["architecture"]] = data["content_sets"]
@property
def rpms(self) -> list[dict]:
"""RPMs for all architectures"""
if not self.rpms_by_arch:
return []

def as_dict(self) -> dict[str, Any]:
return {field.name: getattr(self, field.name) for field in fields(self)}
all_rpms = [rpm for rpms in self.rpms_by_arch.values() for rpm in rpms]
unique_rpms = {tuple(dct.items()) for dct in all_rpms}
return [dict(rpm) for rpm in unique_rpms]

def has_older_rpms(self, rpm_nvrs: list[str]) -> bool:
"""Check if container has any installed rpms is older than the provided NVRs
Expand All @@ -127,16 +125,20 @@ def has_older_rpms(self, rpm_nvrs: list[str]) -> bool:
:return: True if container has older rpms installed than provided NVRs, otherwise False
:rtype: bool
"""
if self.rpms is None:
if not self.rpms:
return False

for rpm in self.rpms:
installed_nvr = kobo.rpmlib.parse_nvr(rpm["nvr"])
if any(
kobo.rpmlib.compare_nvr(installed_nvr, kobo.rpmlib.parse_nvr(nvr)) < 0
for nvr in rpm_nvrs
):
return True
parsed_nvrs = [kobo.rpmlib.parse_nvr(nvr) for nvr in rpm_nvrs]

for installed_rpm in self.rpms:
parsed_installed_nvr = kobo.rpmlib.parse_nvra(installed_rpm["nvra"])

for parsed_nvr in parsed_nvrs:
if parsed_installed_nvr["name"] != parsed_nvr["name"]:
continue
# If any installed rpm has lower NVR, return True
if kobo.rpmlib.compare_nvr(parsed_installed_nvr, parsed_nvr) < 0:
return True
return False

def resolve_build_metadata(self, koji_session: KojiService) -> None:
Expand All @@ -145,6 +147,10 @@ def resolve_build_metadata(self, koji_session: KojiService) -> None:
:param KojiService koji_session: koji session to connect
"""
# This has been populated, skip.
if getattr(self, "build_metadata", None) is not None:
return

self.build_metadata = {}

build = koji_session.get_build(self.nvr)
Expand All @@ -160,14 +166,7 @@ def resolve_build_metadata(self, koji_session: KojiService) -> None:

fs_koji_task_id = build.get("extra", {}).get("filesystem_koji_task_id")
if fs_koji_task_id:
parsed_nvr = kobo.rpmlib.parse_nvr(self.nvr)
name_version = f"{parsed_nvr['name']}-{parsed_nvr['version']}"
if name_version not in conf.image_extra_repo:
msg = (
f"{name_version} is a base image, but extra image repo for it "
"is not specified in the Freshmaker configuration."
)
raise ExtraRepoNotConfiguredError(msg)
self.build_metadata["filesystem_koji_task_id"] = fs_koji_task_id

extra_image = build.get("extra", {}).get("image", {})
# Get the list of ODCS composes used to build the image.
Expand Down Expand Up @@ -204,8 +203,8 @@ def resolve_build_metadata(self, koji_session: KojiService) -> None:
# submitting those builds include this character in source URL
# to mark the query part of URL. We need to handle that by
# stripping that character.
container = m.group("container").rstrip("?")
self.build_metadata["repository"] = namespace + "/" + container
container_name = m.group("container").rstrip("?")
self.build_metadata["repository"] = namespace + "/" + container_name

# There might be tasks which have branch name in
# "origin/branch_name" format, so detect it set commit
Expand All @@ -224,9 +223,8 @@ def resolve_build_metadata(self, koji_session: KojiService) -> None:

def resolve_compose_sources(self):
"""Get source values of ODCS composes used in image build task"""
compose_sources = getattr(self, "compose_sources", None)
# This has been populated, skip.
if compose_sources is not None:
if getattr(self, "compose_sources", None) is not None:
return

self.compose_sources = []
Expand Down Expand Up @@ -288,34 +286,11 @@ def resolve_content_sets(
return

log.warning(
"Container image %s does not have 'content_sets' set "
"in Pyxis as well as its children, this "
"is suspicious.",
"Container image %s does not have 'content_sets' set in Pyxis as well "
"as its children, this is suspicious.",
self.nvr,
)

def resolve_published(self, pyxis_instance: PyxisGQL):
# Get the published version of this image to find out if the image
# was actually published.
if self.published is not None:
return
images = pyxis_instance.find_images_by_nvr(self.nvr, include_rpms=False)
for image in images[:1]:
for repo in image["repositories"]:
if repo["published"] is True:
self.published = True
return

self.published = False
images = pyxis_instance.find_images_by_nvr(self.nvr)
if not self.rpms:
return
exist_rpms = [rpm["rpm_name"] for rpm in self.rpms]
for rpm in images[0]["edges"]["rpm_manifest"]["data"]["rpms"]:
new_rpm = self._convert_rpm(rpm)
if new_rpm["nvr"] not in exist_rpms:
self.rpms.append(new_rpm)


class ContainerAPI:
def __init__(self, pyxis_graphql_url: str, cert: Union[str, tuple[str, str]]):
Expand Down Expand Up @@ -395,13 +370,9 @@ def find_auto_rebuild_containers_with_older_rpms(

containers = []
for nvr, images in images_by_nvr.items():
# Create a ContainerImage instance with the first architecture image data
image = Container.load(images[0])
# Update the instance to add data for other arches
for img in images[1:]:
image.add_arch(img)

containers.append(image)
# Create Container instance with the images data
container = Container.create_from_images(images)
containers.append(container)

# Filter out images which don't have older rpms installed
containers = list(filter(lambda x: x.has_older_rpms(rpm_nvrs), containers))
Expand Down
Loading

0 comments on commit 36f7131

Please sign in to comment.