Skip to content

Commit

Permalink
Move kubectl commands to a separate class using @staticmethod
Browse files Browse the repository at this point in the history
Almost all functions in KubernetesCluster don't actually use the instance members of the class & are easily migrated to using staticmethod.

PiperOrigin-RevId: 698419061
  • Loading branch information
Zach Howell authored and copybara-github committed Nov 20, 2024
1 parent e5f06d0 commit 994c0dc
Showing 1 changed file with 110 additions and 82 deletions.
192 changes: 110 additions & 82 deletions perfkitbenchmarker/container_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,12 +808,15 @@ def _Delete(self):
RunKubectlCommand(delete_cmd, raise_on_failure=False)


class KubernetesCluster(BaseContainerCluster):
"""A Kubernetes flavor of Container Cluster."""
class KubernetesClusterCommands:
"""Implementation of many Kubernetes commands.
CLUSTER_TYPE = KUBERNETES
All methods just call generic kubectl commands without needing instance
information.
"""

def _DeleteAllFromDefaultNamespace(self):
@staticmethod
def _DeleteAllFromDefaultNamespace():
"""Deletes all resources from a namespace.
Since StatefulSets do not reclaim PVCs upon deletion, they are explicitly
Expand All @@ -834,36 +837,12 @@ def _DeleteAllFromDefaultNamespace(self):
)
time.sleep(RESOURCE_DELETE_SLEEP_SECONDS)

def _Delete(self):
self._DeleteAllFromDefaultNamespace()

def GetResourceMetadata(self):
"""Returns a dict containing metadata about the cluster."""
result = super().GetResourceMetadata()
if self.created:
result['container_cluster_version'] = self.k8s_version
return result
@staticmethod
def _Delete():
KubernetesClusterCommands._DeleteAllFromDefaultNamespace()

def DeployContainer(
self, name: str, container_spec: container_spec_lib.ContainerSpec
):
"""Deploys Containers according to the ContainerSpec."""
base_name = name
name = base_name + str(len(self.containers[base_name]))
container = KubernetesContainer(container_spec=container_spec, name=name)
self.containers[base_name].append(container)
container.Create()

def DeployContainerService(
self, name: str, container_spec: container_spec_lib.ContainerSpec
):
"""Deploys a ContainerSerivice according to the ContainerSpec."""
service = KubernetesContainerService(container_spec, name)
self.services[name] = service
service.Create()

# TODO(pclay): Revisit instance methods that don't rely on instance data.
def ApplyManifest(self, manifest_file: str, **kwargs) -> str:
@staticmethod
def ApplyManifest(manifest_file: str, **kwargs) -> str:
"""Applies a declarative Kubernetes manifest; possibly with jinja.
Args:
Expand All @@ -879,10 +858,12 @@ def _ParseApplyOutput(stdout: str) -> str:
# Example input: deployment.apps/pkb123 created
match = re.search(r'deployment[^\s]+', stdout)
if not match:
raise ValueError(
'Failed to parse the output of kubectl apply to get the name of'
' the deployment.'
)
match = re.search(r'daemonset[^\s]+', stdout)
if not match:
raise ValueError(
'Failed to parse the output of kubectl apply to get the name of'
' the deployment.'
)
return match.group()

filename = data.ResourcePath(manifest_file)
Expand All @@ -901,8 +882,8 @@ def _ParseApplyOutput(stdout: str) -> str:
out, _, _ = RunKubectlCommand(['apply', '-f', rendered_template.name])
return _ParseApplyOutput(out)

@staticmethod
def WaitForResource(
self,
resource_name: str,
condition_name: str,
namespace: str | None = None,
Expand All @@ -922,7 +903,8 @@ def WaitForResource(
run_cmd.append('--all')
RunKubectlCommand(run_cmd, timeout=timeout + 10)

def WaitForRollout(self, resource_name: str):
@staticmethod
def WaitForRollout(resource_name: str):
"""Blocks until a Kubernetes rollout is completed."""
run_cmd = [
'rollout',
Expand All @@ -933,8 +915,9 @@ def WaitForRollout(self, resource_name: str):

RunKubectlCommand(run_cmd)

@staticmethod
@vm_util.Retry(retryable_exceptions=(errors.Resource.RetryableCreationError,))
def GetLoadBalancerIP(self, service_name: str):
def GetLoadBalancerIP(service_name: str):
"""Returns the IP address of a LoadBalancer service when ready."""
get_cmd = [
'get',
Expand All @@ -956,8 +939,9 @@ def GetLoadBalancerIP(self, service_name: str):

return format(ip_address)

@staticmethod
@vm_util.Retry(retryable_exceptions=(errors.Resource.RetryableCreationError,))
def GetClusterIP(self, service_name: str) -> str:
def GetClusterIP(service_name: str) -> str:
"""Returns the IP address of a ClusterIP service when ready."""
get_cmd = [
'get',
Expand All @@ -976,7 +960,8 @@ def GetClusterIP(self, service_name: str) -> str:

return stdout

def CreateConfigMap(self, name: str, from_file_dir: str):
@staticmethod
def CreateConfigMap(name: str, from_file_dir: str):
"""Creates a Kubernetes ConfigMap.
Args:
Expand All @@ -988,8 +973,9 @@ def CreateConfigMap(self, name: str, from_file_dir: str):
['create', 'configmap', name, '--from-file', from_file_dir]
)

@staticmethod
def CreateServiceAccount(
self, name: str, clusterrole: str | None = None, namespace='default'
name: str, clusterrole: str | None = None, namespace='default'
):
"""Create a k8s service account and cluster-role-binding."""
RunKubectlCommand(
Expand All @@ -1007,34 +993,15 @@ def CreateServiceAccount(
namespace,
])

# TODO(pclay): Move to cached property in Python 3.9
@property
@functools.lru_cache(maxsize=1)
def node_memory_allocatable(self) -> units.Quantity:
"""Usable memory of each node in cluster in KiB."""
stdout, _, _ = RunKubectlCommand(
# TODO(pclay): Take a minimum of all nodes?
['get', 'nodes', '-o', 'jsonpath={.items[0].status.allocatable.memory}']
)
return units.ParseExpression(stdout)

@property
@functools.lru_cache(maxsize=1)
def node_num_cpu(self) -> int:
"""vCPU of each node in cluster."""
stdout, _, _ = RunKubectlCommand(
['get', 'nodes', '-o', 'jsonpath={.items[0].status.capacity.cpu}']
)
return int(stdout)

@property
@functools.lru_cache(maxsize=1)
def k8s_version(self) -> str:
"""Actual Kubernetes version reported by server."""
stdout, _, _ = RunKubectlCommand(['version', '-o', 'yaml'])
return yaml.safe_load(stdout)['serverVersion']['gitVersion']

def GetPodLabel(self, resource_name):
@staticmethod
def GetPodLabel(resource_name):
run_cmd = [
'get',
resource_name,
Expand All @@ -1045,7 +1012,8 @@ def GetPodLabel(self, resource_name):
stdout, _, _ = RunKubectlCommand(run_cmd)
return yaml.safe_load(stdout)

def GetPodIpsByLabel(self, pod_label_key, pod_label_value) -> list[str]:
@staticmethod
def GetPodIpsByLabel(pod_label_key, pod_label_value) -> list[str]:
"""Returns a list of internal IPs for pod label key-value.
Args:
Expand All @@ -1064,16 +1032,18 @@ def GetPodIpsByLabel(self, pod_label_key, pod_label_value) -> list[str]:
stdout, _, _ = RunKubectlCommand(get_cmd)
return yaml.safe_load(stdout).split()

def GetPodIps(self, resource_name) -> list[str]:
@staticmethod
def GetPodIps(resource_name) -> list[str]:
"""Returns a list of internal IPs for a pod name.
Args:
resource_name: The pod resource name
"""
pod_label = self.GetPodLabel(resource_name)
return self.GetPodIpsByLabel('app', pod_label)
pod_label = KubernetesClusterCommands.GetPodLabel(resource_name)
return KubernetesClusterCommands.GetPodIpsByLabel('app', pod_label)

def GetAllPodNames(self) -> list[str]:
@staticmethod
def GetAllPodNames() -> list[str]:
"""Returns all pod names in the cluster."""
pods, _, _ = RunKubectlCommand([
'get',
Expand All @@ -1088,31 +1058,26 @@ def GetAllPodNames(self) -> list[str]:
# Remove the 'pod/' prefix from each pod name.
return [pod_name.replace('pod/', '') for pod_name in pod_names_with_prefix]

def RunKubectlExec(self, pod_name, cmd):
@staticmethod
def RunKubectlExec(pod_name, cmd):
run_cmd = ['exec', '-it', pod_name, '--'] + cmd
RunKubectlCommand(run_cmd)

def LabelDisks(self):
"""Propagate cluster labels to disks if not done by cloud provider."""
pass

def _GetPvcs(self) -> Sequence[Any]:
@staticmethod
def _GetPvcs() -> Sequence[Any]:
stdout, _, _ = RunKubectlCommand(['get', 'pvc', '-o', 'yaml'])
return yaml.safe_load(stdout)['items']

# TODO(pclay): integrate with kubernetes_disk.
def GetDefaultStorageClass(self) -> str:
"""Get the default storage class for the provider."""
raise NotImplementedError

def GetNodeNames(self) -> list[str]:
@staticmethod
def GetNodeNames() -> list[str]:
"""Get the node names for the cluster."""
stdout, _, _ = RunKubectlCommand(
['get', 'nodes', '-o', 'jsonpath={.items[*].metadata.name}']
)
return stdout.split()

def GetEvents(self):
@staticmethod
def GetEvents():
"""Get the events for the cluster."""
stdout, _, _ = RunKubectlCommand(['get', 'events', '-o', 'yaml'])
k8s_events = []
Expand All @@ -1123,6 +1088,69 @@ def GetEvents(self):
return k8s_events


class KubernetesCluster(BaseContainerCluster, KubernetesClusterCommands):
"""A Kubernetes flavor of Container Cluster."""

CLUSTER_TYPE = KUBERNETES

def _Delete(self):
self._DeleteAllFromDefaultNamespace()

def GetResourceMetadata(self):
"""Returns a dict containing metadata about the cluster."""
result = super().GetResourceMetadata()
if self.created:
result['container_cluster_version'] = self.k8s_version
return result

def DeployContainer(
self, name: str, container_spec: container_spec_lib.ContainerSpec
):
"""Deploys Containers according to the ContainerSpec."""
base_name = name
name = base_name + str(len(self.containers[base_name]))
container = KubernetesContainer(container_spec=container_spec, name=name)
self.containers[base_name].append(container)
container.Create()

def DeployContainerService(
self, name: str, container_spec: container_spec_lib.ContainerSpec
):
"""Deploys a ContainerSerivice according to the ContainerSpec."""
service = KubernetesContainerService(container_spec, name)
self.services[name] = service
service.Create()

# TODO(pclay): Move to cached property in Python 3.9
@property
@functools.lru_cache(maxsize=1)
def node_memory_allocatable(self) -> units.Quantity:
"""Usable memory of each node in cluster in KiB."""
stdout, _, _ = RunKubectlCommand(
# TODO(pclay): Take a minimum of all nodes?
['get', 'nodes', '-o', 'jsonpath={.items[0].status.allocatable.memory}']
)
return units.ParseExpression(stdout)

@property
@functools.lru_cache(maxsize=1)
def node_num_cpu(self) -> int:
"""vCPU of each node in cluster."""
stdout, _, _ = RunKubectlCommand(
['get', 'nodes', '-o', 'jsonpath={.items[0].status.capacity.cpu}']
)
return int(stdout)

def LabelDisks(self):
"""Propagate cluster labels to disks if not done by cloud provider."""
pass

# TODO(pclay): integrate with kubernetes_disk.
def GetDefaultStorageClass(self) -> str:
"""Get the default storage class for the provider."""
raise NotImplementedError


@dataclasses.dataclass
class KubernetesEventResource:
"""Holder for Kubernetes event involved objects."""
Expand Down

0 comments on commit 994c0dc

Please sign in to comment.