diff --git a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py
index 992fa17b2..46d77d8cb 100644
--- a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py
+++ b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py
@@ -82,7 +82,7 @@ def GetFullRegistryTag(self, image: str) -> str:
     return full_tag
 
   def Login(self):
-    """Configure docker to be able to push to remote repo."""
+    """Configures docker to be able to push to remote repo."""
     util.GcloudCommand(self, 'auth', 'configure-docker', self.endpoint).Issue()
 
   def _Create(self):
@@ -107,7 +107,7 @@ def _Delete(self):
     ).Issue()
 
   def RemoteBuild(self, image: container_service.ContainerImage):
-    """Build the image remotely."""
+    """Builds the image remotely."""
     if not _CONTAINER_REMOTE_BUILD_CONFIG.value:
       full_tag = self.GetFullRegistryTag(image.name)
     else:
@@ -118,16 +118,113 @@ def RemoteBuild(self, image: container_service.ContainerImage):
     build_cmd.Issue(timeout=None)
 
 
-class GkeCluster(container_service.KubernetesCluster):
+class BaseGkeCluster(container_service.KubernetesCluster):
+  """Base class for regular & Autopilot GKE clusters."""
+
+  def __init__(self, spec: container_spec_lib.ContainerClusterSpec):
+    super().__init__(spec)
+    self.project: str = spec.vm_spec.project or FLAGS.project
+    self.cluster_version: str = FLAGS.container_cluster_version
+    self.use_application_default_credentials: bool = True
+    self.region: str
+
+  def _GcloudCommand(self, *args, **kwargs) -> util.GcloudCommand:
+    """Creates a gcloud command."""
+    return util.GcloudCommand(self, *args, **kwargs)
+
+  def _RunClusterCreateCommand(self, cmd: util.GcloudCommand):
+    """Adds flags to the cluster create command and runs it."""
+    if self.cluster_version:
+      if self.cluster_version in RELEASE_CHANNELS:
+        if FLAGS.gke_enable_alpha:
+          raise errors.Config.InvalidValue(
+              'Kubernetes Alpha is not compatible with release channels'
+          )
+        cmd.flags['release-channel'] = self.cluster_version
+      else:
+        cmd.flags['cluster-version'] = self.cluster_version
+    if FLAGS.gke_enable_alpha:
+      cmd.args.append('--enable-kubernetes-alpha')
+      cmd.args.append('--no-enable-autorepair')
+
+    user = util.GetDefaultUser()
+    if FLAGS.gcp_service_account:
+      cmd.flags['service-account'] = FLAGS.gcp_service_account
+    # Matches service accounts that either definitely belong to this project or
+    # are GCP managed service accounts like the GCE default service account,
+    # which we can't tell to which project they belong.
+    elif re.match(SERVICE_ACCOUNT_PATTERN, user):
+      logging.info(
+          'Re-using configured service-account for GKE Cluster: %s', user
+      )
+      cmd.flags['service-account'] = user
+      self.use_application_default_credentials = False
+    else:
+      logging.info('Using default GCE service account for GKE cluster')
+      cmd.flags['scopes'] = 'cloud-platform'
+
+    self._IssueResourceCreationCommand(cmd)
+
+  def _IssueResourceCreationCommand(self, cmd: util.GcloudCommand):
+    """Issues a command to gcloud to create resources."""
+
+    # This command needs a long timeout due to the many minutes it
+    # can take to provision a large GPU-accelerated GKE cluster.
+    _, stderr, retcode = cmd.Issue(timeout=ONE_HOUR, raise_on_failure=False)
+    if retcode:
+      util.CheckGcloudResponseKnownFailures(stderr, retcode)
+      raise errors.Resource.CreationError(stderr)
+
+  def _PostCreate(self):
+    """Acquires cluster authentication."""
+    super()._PostCreate()
+    cmd = self._GcloudCommand(
+        'container', 'clusters', 'get-credentials', self.name
+    )
+    env = os.environ.copy()
+    env['KUBECONFIG'] = FLAGS.kubeconfig
+    cmd.IssueRetryable(env=env)
+
+    # GKE does not wait for kube-dns by default
+    logging.info('Waiting for kube-dns')
+    self.WaitForResource(
+        'deployment/kube-dns',
+        condition_name='Available',
+        namespace='kube-system',
+    )
+
+  def _IsDeleting(self):
+    cmd = self._GcloudCommand('container', 'clusters', 'describe', self.name)
+    stdout, _, _ = cmd.Issue(raise_on_failure=False)
+    return True if stdout else False
+
+  def _Delete(self):
+    """Deletes the cluster."""
+    super()._Delete()
+    cmd = self._GcloudCommand('container', 'clusters', 'delete', self.name)
+    cmd.args.append('--async')
+    cmd.Issue(raise_on_failure=False)
+
+  def _Exists(self):
+    """Returns True if the cluster exists."""
+    cmd = self._GcloudCommand('container', 'clusters', 'describe', self.name)
+    _, _, retcode = cmd.Issue(raise_on_failure=False)
+    return retcode == 0
+
+  def GetDefaultStorageClass(self) -> str:
+    """Gets the default storage class for the provider."""
+    # https://cloud.google.com/kubernetes-engine/docs/how-to/persistent-volumes/gce-pd-csi-driver
+    # PD-SSD
+    return 'premium-rwo'
+
+
+class GkeCluster(BaseGkeCluster):
   """Class representing a Google Kubernetes Engine cluster."""
 
   CLOUD = provider_info.GCP
 
   def __init__(self, spec: container_spec_lib.ContainerClusterSpec):
     super().__init__(spec)
-    self.project = spec.vm_spec.project
-    self.cluster_version = FLAGS.container_cluster_version
-    self.use_application_default_credentials = True
     self.zones = (
         self.default_nodepool.zone and self.default_nodepool.zone.split(',')
     )
@@ -175,6 +272,14 @@ def InitializeNodePoolForCloud(
     nodepool_config.cpus: int = vm_config.cpus
     nodepool_config.memory_mib: int = vm_config.memory_mib
 
+  def _GcloudCommand(self, *args, **kwargs) -> util.GcloudCommand:
+    """Fixes zone and region."""
+    cmd = super()._GcloudCommand(*args, **kwargs)
+    if len(self.zones) != 1:
+      del cmd.flags['zone']
+      cmd.flags['region'] = self.region
+    return cmd
+
   def GetResourceMetadata(self) -> dict[str, Any]:
     """Returns a dict containing metadata about the cluster.
@@ -199,52 +304,17 @@ def GetResourceMetadata(self) -> dict[str, Any]:
       result['gpu_count'] = self.nodepools['nccl'].gpu_count
     return result
 
-  def _GcloudCommand(self, *args, **kwargs) -> util.GcloudCommand:
-    """Fix zone and region."""
-    cmd = util.GcloudCommand(self, *args, **kwargs)
-    if len(self.zones) != 1:
-      del cmd.flags['zone']
-      cmd.flags['region'] = self.region
-    return cmd
-
   def _Create(self):
     """Creates the cluster."""
     cmd = self._GcloudCommand('container', 'clusters', 'create', self.name)
+    if self.default_nodepool.network:
+      cmd.flags['network'] = self.default_nodepool.network.network_resource.name
+    cmd.args.append('--no-enable-shielded-nodes')
     self._AddNodeParamsToCmd(
         self.default_nodepool,
         cmd,
     )
-
-    if self.cluster_version:
-      if self.cluster_version in RELEASE_CHANNELS:
-        if FLAGS.gke_enable_alpha:
-          raise errors.Config.InvalidValue(
-              'Kubernetes Alpha is not compatible with release channels'
-          )
-        cmd.flags['release-channel'] = self.cluster_version
-      else:
-        cmd.flags['cluster-version'] = self.cluster_version
-    if FLAGS.gke_enable_alpha:
-      cmd.args.append('--enable-kubernetes-alpha')
-      cmd.args.append('--no-enable-autorepair')
-
-    user = util.GetDefaultUser()
-    if FLAGS.gcp_service_account:
-      cmd.flags['service-account'] = FLAGS.gcp_service_account
-    # Matches service accounts that either definitely belongs to this project or
-    # are a GCP managed service account like the GCE default service account,
-    # which we can't tell to which project they belong.
-    elif re.match(SERVICE_ACCOUNT_PATTERN, user):
-      logging.info(
-          'Re-using configured service-account for GKE Cluster: %s', user
-      )
-      cmd.flags['service-account'] = user
-      self.use_application_default_credentials = False
-    else:
-      logging.info('Using default GCE service account for GKE cluster')
-      cmd.flags['scopes'] = 'cloud-platform'
-
     if (
         self.min_nodes != self.default_nodepool.num_nodes
         or self.max_nodes != self.default_nodepool.num_nodes
@@ -252,16 +322,10 @@ def _Create(self):
       cmd.args.append('--enable-autoscaling')
       cmd.flags['max-nodes'] = self.max_nodes
       cmd.flags['min-nodes'] = self.min_nodes
-    cmd.flags['cluster-ipv4-cidr'] = f'/{_CalculateCidrSize(self.max_nodes)}'
-
-    if self.default_nodepool.network:
-      cmd.flags['network'] = self.default_nodepool.network.network_resource.name
 
     cmd.flags['metadata'] = util.MakeFormattedDefaultTags()
 
-    cmd.args.append('--no-enable-shielded-nodes')
-
-    self._IssueResourceCreationCommand(cmd)
+    self._RunClusterCreateCommand(cmd)
     self._CreateNodePools()
 
   def _CreateNodePools(self):
@@ -276,16 +340,6 @@ def _CreateNodePools(self):
       )
       self._IssueResourceCreationCommand(cmd)
 
-  def _IssueResourceCreationCommand(self, cmd: util.GcloudCommand):
-    """Issues a command to gcloud to create resources."""
-
-    # This command needs a long timeout due to the many minutes it
-    # can take to provision a large GPU-accelerated GKE cluster.
-    _, stderr, retcode = cmd.Issue(timeout=ONE_HOUR, raise_on_failure=False)
-    if retcode:
-      util.CheckGcloudResponseKnownFailures(stderr, retcode)
-      raise errors.Resource.CreationError(stderr)
-
   def _AddNodeParamsToCmd(
       self,
       nodepool_config: container_service.BaseNodePoolConfig,
@@ -365,14 +419,8 @@ def _AddNodeParamsToCmd(
       cmd.flags['node-labels'] = f'pkb_nodepool={nodepool_config.name}'
 
   def _PostCreate(self):
-    """Acquire cluster authentication."""
+    """Installs nvidia drivers if needed."""
     super()._PostCreate()
-    cmd = self._GcloudCommand(
-        'container', 'clusters', 'get-credentials', self.name
-    )
-    env = os.environ.copy()
-    env['KUBECONFIG'] = FLAGS.kubeconfig
-    cmd.IssueRetryable(env=env)
 
     should_install_nvidia_drivers = self.default_nodepool.gpu_count or any(
         pool.gpu_count for pool in self.nodepools.values()
@@ -383,14 +431,6 @@ def _PostCreate(self):
         data.ResourcePath(NVIDIA_UNRESTRICTED_PERMISSIONS_DAEMON_SET)
     )
 
-    # GKE does not wait for kube-dns by default
-    logging.info('Waiting for kube-dns')
-    self.WaitForResource(
-        'deployment/kube-dns',
-        condition_name='Available',
-        namespace='kube-system',
-    )
-
   def _GetInstanceGroups(self):
     cmd = self._GcloudCommand('container', 'node-pools', 'list')
     cmd.flags['cluster'] = self.name
@@ -402,26 +442,8 @@ def _GetInstanceGroups(self):
       instance_groups.append(group_url.split('/')[-1])  # last url part
     return instance_groups
 
-  def _IsDeleting(self):
-    cmd = self._GcloudCommand('container', 'clusters', 'describe', self.name)
-    stdout, _, _ = cmd.Issue(raise_on_failure=False)
-    return True if stdout else False
-
-  def _Delete(self):
-    """Deletes the cluster."""
-    super()._Delete()
-    cmd = self._GcloudCommand('container', 'clusters', 'delete', self.name)
-    cmd.args.append('--async')
-    cmd.Issue(raise_on_failure=False)
-
-  def _Exists(self):
-    """Returns True if the cluster exits."""
-    cmd = self._GcloudCommand('container', 'clusters', 'describe', self.name)
-    _, _, retcode = cmd.Issue(raise_on_failure=False)
-    return retcode == 0
-
   def LabelDisks(self):
-    """Set common labels on PVCs.
+    """Sets common labels on PVCs.
 
     GKE does this in the background every hour. Do it immediately in case the
     cluster is deleted within that hour.
@@ -431,16 +453,10 @@ def LabelDisks(self):
     for pvc in pvcs:
       gce_disk.AddLabels(self, pvc['spec']['volumeName'])
 
-  def GetDefaultStorageClass(self) -> str:
-    """Get the default storage class for the provider."""
-    # https://cloud.google.com/kubernetes-engine/docs/how-to/persistent-volumes/gce-pd-csi-driver
-    # PD-SSD
-    return 'premium-rwo'
-
   def ResizeNodePool(
       self, new_size: int, node_pool: str = container_service.DEFAULT_NODEPOOL
   ):
-    """Change the number of nodes in the node pool."""
+    """Changes the number of nodes in the node pool."""
     cmd = self._GcloudCommand('container', 'clusters', 'resize', self.name)
     cmd.flags['num-nodes'] = new_size
     # updates default node pool by default
@@ -449,7 +465,7 @@ def ResizeNodePool(
     cmd.Issue()
 
 
-class GkeAutopilotCluster(container_service.KubernetesCluster):
+class GkeAutopilotCluster(BaseGkeCluster):
   """Class representing an Autopilot GKE cluster, which has no nodepools."""
 
   CLOUD = provider_info.GCP
@@ -457,9 +473,6 @@ class GkeAutopilotCluster(container_service.KubernetesCluster):
 
   def __init__(self, spec: container_spec_lib.ContainerClusterSpec):
     super().__init__(spec)
-    self.project = FLAGS.project
-    self.cluster_version = FLAGS.container_cluster_version
-    self.use_application_default_credentials = True
     self.region = util.GetRegionFromZone(self.zones[0])
     # Nodepools are not supported for Autopilot clusters, but default vm_spec
     # still used for pod spec input.
@@ -483,8 +496,8 @@ def zone(self) -> str:
     return self.zones[0] or 'us-central1-a'
 
   def _GcloudCommand(self, *args, **kwargs) -> util.GcloudCommand:
-    """Fix zone and region."""
-    cmd = util.GcloudCommand(self, *args, **kwargs)
+    """Creates a gcloud command."""
+    cmd = super()._GcloudCommand(*args, **kwargs)
     if 'zone' in cmd.flags:
       del cmd.flags['zone']
     cmd.flags['region'] = self.region
@@ -499,78 +512,18 @@ def _Create(self):
         self.name,
         '--no-autoprovisioning-enable-insecure-kubelet-readonly-port',
     )
-
-    user = util.GetDefaultUser()
-    if FLAGS.gcp_service_account:
-      cmd.flags['service-account'] = FLAGS.gcp_service_account
-    # Matches service accounts that either definitely belongs to this project or
-    # are a GCP managed service account like the GCE default service account,
-    # which we can't tell to which project they belong.
-    elif re.match(SERVICE_ACCOUNT_PATTERN, user):
-      logging.info(
-          'Re-using configured service-account for GKE Cluster: %s', user
-      )
-      cmd.flags['service-account'] = user
-      self.use_application_default_credentials = False
-    else:
-      logging.info('Using default GCE service account for GKE cluster')
-      cmd.flags['scopes'] = 'cloud-platform'
-
     if self.default_nodepool.network:
       cmd.flags['network'] = self.default_nodepool.network.network_resource.name
 
     cmd.flags['labels'] = util.MakeFormattedDefaultTags()
-    # cmd.args.append('--no-enable-shielded-nodes')
 
-    self._IssueResourceCreationCommand(cmd)
-
-  def _IssueResourceCreationCommand(self, cmd: util.GcloudCommand):
-    """Issues a command to gcloud to create resources."""
-
-    # This command needs a long timeout due to the many minutes it
-    # can take to provision a large GPU-accelerated GKE cluster.
-    _, stderr, retcode = cmd.Issue(timeout=ONE_HOUR, raise_on_failure=False)
-    if retcode:
-      util.CheckGcloudResponseKnownFailures(stderr, retcode)
-      raise errors.Resource.CreationError(stderr)
-
-  def _PostCreate(self):
-    """Acquire cluster authentication."""
-    super()._PostCreate()
-    cmd = self._GcloudCommand(
-        'container', 'clusters', 'get-credentials', self.name
-    )
-    env = os.environ.copy()
-    env['KUBECONFIG'] = FLAGS.kubeconfig
-    cmd.IssueRetryable(env=env)
-
-  def _GetInstanceGroups(self):
-    return []
-
-  def _IsDeleting(self):
-    cmd = self._GcloudCommand('container', 'clusters', 'describe', self.name)
-    stdout, _, _ = cmd.Issue(raise_on_failure=False)
-    return True if stdout else False
-
-  def _Delete(self):
-    """Deletes the cluster."""
-    super()._Delete()
-    cmd = self._GcloudCommand('container', 'clusters', 'delete', self.name)
-    cmd.args.append('--async')
-    cmd.Issue(raise_on_failure=False)
-
-  def _Exists(self):
-    """Returns True if the cluster exits."""
-    cmd = self._GcloudCommand('container', 'clusters', 'describe', self.name)
-    _, _, retcode = cmd.Issue(raise_on_failure=False)
-    return retcode == 0
+    self._RunClusterCreateCommand(cmd)
 
   def GetResourceMetadata(self) -> dict[str, Any]:
     metadata = {
         'cloud': self.CLOUD,
         'cluster_type': 'KubernetesAutopilot',
         'zone': self.zone,
-        'max_size': self.max_nodes,
-        'min_size': self.min_nodes,
+        'region': self.region,
     }
     return metadata