From 994c0dce5fe28f705a2a772bf11657ae3dc59d23 Mon Sep 17 00:00:00 2001 From: Zach Howell Date: Wed, 20 Nov 2024 09:31:32 -0800 Subject: [PATCH] Move kubectl commands to a separate class using @staticmethod Almost all functions in KubernetesCluster don't actually use the instance members of the class & are easily migrated to using staticmethod. PiperOrigin-RevId: 698419061 --- perfkitbenchmarker/container_service.py | 192 ++++++++++++++---------- 1 file changed, 110 insertions(+), 82 deletions(-) diff --git a/perfkitbenchmarker/container_service.py b/perfkitbenchmarker/container_service.py index 73e1dc531..d740026cd 100644 --- a/perfkitbenchmarker/container_service.py +++ b/perfkitbenchmarker/container_service.py @@ -808,12 +808,15 @@ def _Delete(self): RunKubectlCommand(delete_cmd, raise_on_failure=False) -class KubernetesCluster(BaseContainerCluster): - """A Kubernetes flavor of Container Cluster.""" +class KubernetesClusterCommands: + """Implementation of many Kubernetes commands. - CLUSTER_TYPE = KUBERNETES + All methods just call generic kubectl commands without needing instance + information. + """ - def _DeleteAllFromDefaultNamespace(self): + @staticmethod + def _DeleteAllFromDefaultNamespace(): """Deletes all resources from a namespace. Since StatefulSets do not reclaim PVCs upon deletion, they are explicitly @@ -834,36 +837,12 @@ def _DeleteAllFromDefaultNamespace(self): ) time.sleep(RESOURCE_DELETE_SLEEP_SECONDS) - def _Delete(self): - self._DeleteAllFromDefaultNamespace() - - def GetResourceMetadata(self): - """Returns a dict containing metadata about the cluster.""" - result = super().GetResourceMetadata() - if self.created: - result['container_cluster_version'] = self.k8s_version - return result + @staticmethod + def _Delete(): + KubernetesClusterCommands._DeleteAllFromDefaultNamespace() - def DeployContainer( - self, name: str, container_spec: container_spec_lib.ContainerSpec - ): - """Deploys Containers according to the ContainerSpec.""" - base_name = name - name = base_name + str(len(self.containers[base_name])) - container = KubernetesContainer(container_spec=container_spec, name=name) - self.containers[base_name].append(container) - container.Create() - - def DeployContainerService( - self, name: str, container_spec: container_spec_lib.ContainerSpec - ): - """Deploys a ContainerSerivice according to the ContainerSpec.""" - service = KubernetesContainerService(container_spec, name) - self.services[name] = service - service.Create() - - # TODO(pclay): Revisit instance methods that don't rely on instance data. - def ApplyManifest(self, manifest_file: str, **kwargs) -> str: + @staticmethod + def ApplyManifest(manifest_file: str, **kwargs) -> str: """Applies a declarative Kubernetes manifest; possibly with jinja. Args: @@ -879,10 +858,12 @@ def _ParseApplyOutput(stdout: str) -> str: # Example input: deployment.apps/pkb123 created match = re.search(r'deployment[^\s]+', stdout) if not match: - raise ValueError( - 'Failed to parse the output of kubectl apply to get the name of' - ' the deployment.' - ) + match = re.search(r'daemonset[^\s]+', stdout) + if not match: + raise ValueError( + 'Failed to parse the output of kubectl apply to get the name of' + ' the deployment.' + ) return match.group() filename = data.ResourcePath(manifest_file) @@ -901,8 +882,8 @@ def _ParseApplyOutput(stdout: str) -> str: out, _, _ = RunKubectlCommand(['apply', '-f', rendered_template.name]) return _ParseApplyOutput(out) + @staticmethod def WaitForResource( - self, resource_name: str, condition_name: str, namespace: str | None = None, @@ -922,7 +903,8 @@ def WaitForResource( run_cmd.append('--all') RunKubectlCommand(run_cmd, timeout=timeout + 10) - def WaitForRollout(self, resource_name: str): + @staticmethod + def WaitForRollout(resource_name: str): """Blocks until a Kubernetes rollout is completed.""" run_cmd = [ 'rollout', @@ -933,8 +915,9 @@ def WaitForRollout(self, resource_name: str): RunKubectlCommand(run_cmd) + @staticmethod @vm_util.Retry(retryable_exceptions=(errors.Resource.RetryableCreationError,)) - def GetLoadBalancerIP(self, service_name: str): + def GetLoadBalancerIP(service_name: str): """Returns the IP address of a LoadBalancer service when ready.""" get_cmd = [ 'get', @@ -956,8 +939,9 @@ def GetLoadBalancerIP(self, service_name: str): return format(ip_address) + @staticmethod @vm_util.Retry(retryable_exceptions=(errors.Resource.RetryableCreationError,)) - def GetClusterIP(self, service_name: str) -> str: + def GetClusterIP(service_name: str) -> str: """Returns the IP address of a ClusterIP service when ready.""" get_cmd = [ 'get', @@ -976,7 +960,8 @@ def GetClusterIP(self, service_name: str) -> str: return stdout - def CreateConfigMap(self, name: str, from_file_dir: str): + @staticmethod + def CreateConfigMap(name: str, from_file_dir: str): """Creates a Kubernetes ConfigMap. Args: @@ -988,8 +973,9 @@ def CreateConfigMap(self, name: str, from_file_dir: str): ['create', 'configmap', name, '--from-file', from_file_dir] ) + @staticmethod def CreateServiceAccount( - self, name: str, clusterrole: str | None = None, namespace='default' + name: str, clusterrole: str | None = None, namespace='default' ): """Create a k8s service account and cluster-role-binding.""" RunKubectlCommand( @@ -1007,26 +993,6 @@ def CreateServiceAccount( namespace, ]) - # TODO(pclay): Move to cached property in Python 3.9 - @property - @functools.lru_cache(maxsize=1) - def node_memory_allocatable(self) -> units.Quantity: - """Usable memory of each node in cluster in KiB.""" - stdout, _, _ = RunKubectlCommand( - # TODO(pclay): Take a minimum of all nodes? - ['get', 'nodes', '-o', 'jsonpath={.items[0].status.allocatable.memory}'] - ) - return units.ParseExpression(stdout) - - @property - @functools.lru_cache(maxsize=1) - def node_num_cpu(self) -> int: - """vCPU of each node in cluster.""" - stdout, _, _ = RunKubectlCommand( - ['get', 'nodes', '-o', 'jsonpath={.items[0].status.capacity.cpu}'] - ) - return int(stdout) - @property @functools.lru_cache(maxsize=1) def k8s_version(self) -> str: @@ -1034,7 +1000,8 @@ def k8s_version(self) -> str: stdout, _, _ = RunKubectlCommand(['version', '-o', 'yaml']) return yaml.safe_load(stdout)['serverVersion']['gitVersion'] - def GetPodLabel(self, resource_name): + @staticmethod + def GetPodLabel(resource_name): run_cmd = [ 'get', resource_name, @@ -1045,7 +1012,8 @@ def GetPodLabel(self, resource_name): stdout, _, _ = RunKubectlCommand(run_cmd) return yaml.safe_load(stdout) - def GetPodIpsByLabel(self, pod_label_key, pod_label_value) -> list[str]: + @staticmethod + def GetPodIpsByLabel(pod_label_key, pod_label_value) -> list[str]: """Returns a list of internal IPs for pod label key-value. Args: @@ -1064,16 +1032,18 @@ def GetPodIpsByLabel(self, pod_label_key, pod_label_value) -> list[str]: stdout, _, _ = RunKubectlCommand(get_cmd) return yaml.safe_load(stdout).split() - def GetPodIps(self, resource_name) -> list[str]: + @staticmethod + def GetPodIps(resource_name) -> list[str]: """Returns a list of internal IPs for a pod name. Args: resource_name: The pod resource name """ - pod_label = self.GetPodLabel(resource_name) - return self.GetPodIpsByLabel('app', pod_label) + pod_label = KubernetesClusterCommands.GetPodLabel(resource_name) + return KubernetesClusterCommands.GetPodIpsByLabel('app', pod_label) - def GetAllPodNames(self) -> list[str]: + @staticmethod + def GetAllPodNames() -> list[str]: """Returns all pod names in the cluster.""" pods, _, _ = RunKubectlCommand([ 'get', @@ -1088,31 +1058,26 @@ def GetAllPodNames(self) -> list[str]: # Remove the 'pod/' prefix from each pod name. return [pod_name.replace('pod/', '') for pod_name in pod_names_with_prefix] - def RunKubectlExec(self, pod_name, cmd): + @staticmethod + def RunKubectlExec(pod_name, cmd): run_cmd = ['exec', '-it', pod_name, '--'] + cmd RunKubectlCommand(run_cmd) - def LabelDisks(self): - """Propagate cluster labels to disks if not done by cloud provider.""" - pass - - def _GetPvcs(self) -> Sequence[Any]: + @staticmethod + def _GetPvcs() -> Sequence[Any]: stdout, _, _ = RunKubectlCommand(['get', 'pvc', '-o', 'yaml']) return yaml.safe_load(stdout)['items'] - # TODO(pclay): integrate with kubernetes_disk. - def GetDefaultStorageClass(self) -> str: - """Get the default storage class for the provider.""" - raise NotImplementedError - - def GetNodeNames(self) -> list[str]: + @staticmethod + def GetNodeNames() -> list[str]: """Get the node names for the cluster.""" stdout, _, _ = RunKubectlCommand( ['get', 'nodes', '-o', 'jsonpath={.items[*].metadata.name}'] ) return stdout.split() - def GetEvents(self): + @staticmethod + def GetEvents(): """Get the events for the cluster.""" stdout, _, _ = RunKubectlCommand(['get', 'events', '-o', 'yaml']) k8s_events = [] @@ -1123,6 +1088,69 @@ def GetEvents(self): return k8s_events +class KubernetesCluster(BaseContainerCluster, KubernetesClusterCommands): + """A Kubernetes flavor of Container Cluster.""" + + CLUSTER_TYPE = KUBERNETES + + def _Delete(self): + self._DeleteAllFromDefaultNamespace() + + def GetResourceMetadata(self): + """Returns a dict containing metadata about the cluster.""" + result = super().GetResourceMetadata() + if self.created: + result['container_cluster_version'] = self.k8s_version + return result + + def DeployContainer( + self, name: str, container_spec: container_spec_lib.ContainerSpec + ): + """Deploys Containers according to the ContainerSpec.""" + base_name = name + name = base_name + str(len(self.containers[base_name])) + container = KubernetesContainer(container_spec=container_spec, name=name) + self.containers[base_name].append(container) + container.Create() + + def DeployContainerService( + self, name: str, container_spec: container_spec_lib.ContainerSpec + ): + """Deploys a ContainerSerivice according to the ContainerSpec.""" + service = KubernetesContainerService(container_spec, name) + self.services[name] = service + service.Create() + + # TODO(pclay): Move to cached property in Python 3.9 + @property + @functools.lru_cache(maxsize=1) + def node_memory_allocatable(self) -> units.Quantity: + """Usable memory of each node in cluster in KiB.""" + stdout, _, _ = RunKubectlCommand( + # TODO(pclay): Take a minimum of all nodes? + ['get', 'nodes', '-o', 'jsonpath={.items[0].status.allocatable.memory}'] + ) + return units.ParseExpression(stdout) + + @property + @functools.lru_cache(maxsize=1) + def node_num_cpu(self) -> int: + """vCPU of each node in cluster.""" + stdout, _, _ = RunKubectlCommand( + ['get', 'nodes', '-o', 'jsonpath={.items[0].status.capacity.cpu}'] + ) + return int(stdout) + + def LabelDisks(self): + """Propagate cluster labels to disks if not done by cloud provider.""" + pass + + # TODO(pclay): integrate with kubernetes_disk. + def GetDefaultStorageClass(self) -> str: + """Get the default storage class for the provider.""" + raise NotImplementedError + + @dataclasses.dataclass class KubernetesEventResource: """Holder for Kubernetes event involved objects."""