add resource pool memory usage validation for vsphere provider
Signed-off-by: Rahul Ganesh <rahulgab@amazon.com>
Rahul Ganesh committed Sep 14, 2023
1 parent b32241d commit c2be09f
Showing 4 changed files with 204 additions and 24 deletions.
18 changes: 9 additions & 9 deletions pkg/executables/govc.go
@@ -1158,8 +1158,8 @@ type resourcePool struct {
memoryLimit string
}

// ResourcePoolInfo returns the pool info for the provided resource pool.
func (g *Govc) ResourcePoolInfo(ctx context.Context, datacenter, resourcepool string, args ...string) (map[string]int, error) {
// GetResourcePoolInfo returns the pool info for the provided resource pool.
func (g *Govc) GetResourcePoolInfo(ctx context.Context, datacenter, resourcepool string, args ...string) (map[string]int, error) {
params := []string{"pool.info", "-dc", datacenter, resourcepool}
params = append(params, args...)
response, err := g.exec(ctx, params...)
@@ -1169,32 +1169,32 @@ func (g *Govc) ResourcePoolInfo(ctx context.Context, datacenter, resourcepool st
var resourcePoolInfoResponse resourcePoolInfo
err = yaml.Unmarshal(response.Bytes(), &resourcePoolInfoResponse)
if err != nil {
return nil, fmt.Errorf("unmarshalling devices info: %v", err)
return nil, fmt.Errorf("unmarshalling resource pool info: %v", err)
}
poolInfo, err := getPoolInfo(resourcePoolInfoResponse.resourcePoolIdentifier)
if err != nil {
return nil, err
}
return poolInfo, nil
}

// func validateMemoryAnd
// helper function that parses the resource pool response and returns the CPU and memory requirements.
func getPoolInfo(rp *resourcePool) (map[string]int, error) {
CPUUsed, err := getValueFromString(rp.CPUUsage)
if err != nil {
return nil, fmt.Errorf("Unable to obtain CPU usage for resource pool %s: %v", rp.CPUUsage, err)
return nil, fmt.Errorf("unable to obtain CPU usage for resource pool %s: %v", rp.CPUUsage, err)
}
CPULimit, err := getValueFromString(rp.CPULimit)
if err != nil {
return nil, fmt.Errorf("Unable to obtain CPU limit for resource pool %s: %v", rp.CPULimit, err)
return nil, fmt.Errorf("unable to obtain CPU limit for resource pool %s: %v", rp.CPULimit, err)
}
memoryUsed, err := getValueFromString(rp.memoryUsage)
if err != nil {
return nil, fmt.Errorf("Unable to obtain memory usage for resource pool %s: %v", rp.CPULimit, err)
return nil, fmt.Errorf("unable to obtain memory usage for resource pool %s: %v", rp.CPULimit, err)
}
memoryLimit, err := getValueFromString(rp.memoryLimit)
if err != nil {
return nil, fmt.Errorf("Unable to obtain memory limit for resource pool %s: %v", rp.CPULimit, err)
return nil, fmt.Errorf("unable to obtain memory limit for resource pool %s: %v", rp.CPULimit, err)
}
poolInfo := make(map[string]int)
if memoryLimit != -1 {
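The hunk is truncated here, so the remainder of getPoolInfo and the getValueFromString helper it calls are not shown in this diff. As a rough, hypothetical sketch only — assuming the govc output carries values such as "4096MB" or "-1MB", with -1 acting as the vSphere sentinel for an unlimited pool — the Memory_Available figure could be derived along these lines:

```go
// Hypothetical sketch: the real getValueFromString and the tail of getPoolInfo
// are not part of this diff. Assumes values arrive as strings like "4096MB",
// "-1MB", or "1200MHz".
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseValue strips a trailing unit suffix (e.g. "MB", "MHz") and returns the integer.
func parseValue(s string) (int, error) {
	trimmed := strings.TrimRight(strings.TrimSpace(s), "BHMz")
	return strconv.Atoi(trimmed)
}

func main() {
	memoryLimit, _ := parseValue("8192MB")
	memoryUsed, _ := parseValue("2048MB")

	poolInfo := make(map[string]int)
	if memoryLimit != -1 {
		// A finite limit: available memory is the limit minus what is already in use.
		poolInfo["Memory_Available"] = memoryLimit - memoryUsed
	} else {
		// -1 means the pool is unlimited; the validations below treat it as "skip the check".
		poolInfo["Memory_Available"] = -1
	}
	fmt.Println(poolInfo) // map[Memory_Available:6144]
}
```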
20 changes: 20 additions & 0 deletions pkg/providers/vsphere/mocks/client.go

Generated file; diff not rendered by default.

153 changes: 152 additions & 1 deletion pkg/providers/vsphere/vsphere.go
@@ -19,6 +19,7 @@ import (
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"

"github.com/aws/eks-anywhere/pkg/api/v1alpha1"
anywherev1 "github.com/aws/eks-anywhere/pkg/api/v1alpha1"
"github.com/aws/eks-anywhere/pkg/bootstrapper"
"github.com/aws/eks-anywhere/pkg/clients/kubernetes"
"github.com/aws/eks-anywhere/pkg/cluster"
@@ -52,6 +53,8 @@ const (
backOffPeriod = 5 * time.Second
disk1 = "Hard disk 1"
disk2 = "Hard disk 2"
cpuAvailable = "CPU_Available"
memoryAvailable = "Memory_Available"
ethtoolDaemonSetName = "vsphere-disable-udp-offload"
)

@@ -122,6 +125,7 @@ type ProviderGovcClient interface {
CreateRole(ctx context.Context, name string, privileges []string) error
SetGroupRoleOnObject(ctx context.Context, principal string, role string, object string, domain string) error
GetHardDiskSize(ctx context.Context, vm, datacenter string) (map[string]float64, error)
GetResourcePoolInfo(ctx context.Context, datacenter, resourcepool string, args ...string) (map[string]int, error)
}

type ProviderKubectlClient interface {
@@ -338,7 +342,9 @@ func (p *vsphereProvider) SetupAndValidateCreateCluster(ctx context.Context, clu
if err := p.validateDatastoreUsageForCreate(ctx, vSphereClusterSpec); err != nil {
return fmt.Errorf("validating vsphere machine configs datastore usage: %v", err)
}

if err := p.validateMemoryUsageForCreate(ctx, vSphereClusterSpec.VSphereDatacenter, vSphereClusterSpec); err != nil {
return fmt.Errorf("validating vsphere machine configs resource pool memory usage: %v", err)
}
if err := p.generateSSHKeysIfNotSet(clusterSpec.VSphereMachineConfigs); err != nil {
return fmt.Errorf("failed setup and validations: %v", err)
}
@@ -419,6 +425,10 @@ func (p *vsphereProvider) SetupAndValidateUpgradeCluster(ctx context.Context, cl
return fmt.Errorf("validating vsphere machine configs datastore usage: %v", err)
}

if err := p.validateMemoryUsageForUpgrade(ctx, vSphereClusterSpec.VSphereDatacenter, vSphereClusterSpec, cluster); err != nil {
return fmt.Errorf("validating vsphere machine configs resource pool memory usage: %v", err)
}

if !p.skippedValidations[validations.VSphereUserPriv] {
if err := p.validator.validateVsphereUserPrivs(ctx, vSphereClusterSpec); err != nil {
return fmt.Errorf("validating vsphere user privileges: %v", err)
@@ -590,6 +600,147 @@ func (p *vsphereProvider) validateDatastoreUsageForCreate(ctx context.Context, v
return nil
}

type memoryUsage struct {
availableMemoryMiB int
needMemoryMiB int
}

func (p *vsphereProvider) getPrevMachineConfigMemoryUsage(ctx context.Context, mc *v1alpha1.VSphereMachineConfig, cluster *types.Cluster, count int) (memoryMiB int, err error) {
em, err := p.providerKubectlClient.GetEksaVSphereMachineConfig(ctx, mc.Name, cluster.KubeconfigFile, mc.GetNamespace())
if err != nil {
return 0, err
}
if em != nil && em.Spec.ResourcePool == mc.Spec.ResourcePool {
return em.Spec.MemoryMiB * count, nil
}
return 0, nil
}

func (p *vsphereProvider) getMachineConfigMemoryRequirements(ctx context.Context, dc string, mc *v1alpha1.VSphereMachineConfig, count int) (available int, need int, err error) {
poolInfo, err := p.providerGovcClient.GetResourcePoolInfo(ctx, dc, mc.Spec.ResourcePool)
if err != nil {
return 0, 0, err
}
needMemoryMiB := mc.Spec.MemoryMiB * count
return poolInfo[memoryAvailable], needMemoryMiB, nil
}

func (p *vsphereProvider) calculateResourcePoolMemoryUsage(ctx context.Context, dc string, mc *v1alpha1.VSphereMachineConfig, cluster *types.Cluster, mu map[string]*memoryUsage, prevCount, newCount int) error {
availableMemoryMiB, needMemoryMiB, err := p.getMachineConfigMemoryRequirements(ctx, dc, mc, newCount)
if err != nil {
return err
}

// The last old machine is deleted only after the desired number of new machines has been rolled out, so subtract 1 from the previous count to account for the old machine that is kept until the end of the rollout.
prevUsage, err := p.getPrevMachineConfigMemoryUsage(ctx, mc, cluster, prevCount-1)
if err != nil {
return err
}
availableMemoryMiB += prevUsage
updateMemoryUsageMap(mc, needMemoryMiB, availableMemoryMiB, prevUsage, mu)
return nil
}
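To make the rollout credit above concrete, here is a minimal sketch with made-up numbers: a three-machine group with 8192 MiB per machine being upgraded in a pool that reports 20480 MiB available. New machines come up before old ones are removed, so the validation counts all but one of the old machines as memory that will be freed during the rollout.

```go
// Worked example of the rollout credit; all numbers are hypothetical.
package main

import "fmt"

func main() {
	const memoryMiBPerMachine = 8192
	const prevCount, newCount = 3, 3
	available := 20480 // Memory_Available reported for the pool, in MiB

	need := memoryMiBPerMachine * newCount          // 24576 MiB for the replacement machines
	credit := memoryMiBPerMachine * (prevCount - 1) // 16384 MiB freed by old machines as they roll away
	effectiveAvailable := available + credit        // 36864 MiB

	fmt.Println(need <= effectiveAvailable) // true: the upgrade fits even though need > available
}
```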

func updateMemoryUsageMap(mc *v1alpha1.VSphereMachineConfig, needMiB, availableMiB, prevUsage int, mu map[string]*memoryUsage) {
if _, ok := mu[mc.Spec.ResourcePool]; ok {
mu[mc.Spec.ResourcePool].needMemoryMiB += needMiB
mu[mc.Spec.ResourcePool].availableMemoryMiB += prevUsage
} else {
mu[mc.Spec.ResourcePool] = &memoryUsage{
availableMemoryMiB: availableMiB,
needMemoryMiB: needMiB,
}
}
}
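When several machine groups point at the same resource pool, the first entry records the pool's available memory once and later entries only add their requirement (plus any rollout credit), so the pool's capacity is not double-counted. A standalone illustration with the struct redeclared locally and invented numbers, so the snippet runs on its own:

```go
// Standalone illustration of the accumulation above; the struct and helper are
// redeclared here only so the snippet is self-contained. Numbers are made up.
package main

import "fmt"

type memoryUsage struct {
	availableMemoryMiB int
	needMemoryMiB      int
}

func update(pool string, needMiB, availableMiB, prevUsage int, mu map[string]*memoryUsage) {
	if existing, ok := mu[pool]; ok {
		existing.needMemoryMiB += needMiB
		existing.availableMemoryMiB += prevUsage // add only the rollout credit, not the pool size again
	} else {
		mu[pool] = &memoryUsage{availableMemoryMiB: availableMiB, needMemoryMiB: needMiB}
	}
}

func main() {
	mu := map[string]*memoryUsage{}
	update("Resources/dev", 3*8192, 65536, 0, mu) // control plane: 3 machines x 8 GiB
	update("Resources/dev", 4*8192, 65536, 0, mu) // workers in the same pool: 4 machines x 8 GiB
	u := mu["Resources/dev"]
	fmt.Println(u.needMemoryMiB, u.availableMemoryMiB) // 57344 65536
}
```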

func (p *vsphereProvider) validateMemoryUsageForCreate(ctx context.Context, dc *anywherev1.VSphereDatacenterConfig, clusterSpec *Spec) error {
memoryUsage := make(map[string]*memoryUsage)
datacenter := dc.Spec.Datacenter
cpMachineConfig := clusterSpec.controlPlaneMachineConfig()
controlPlaneAvailableMiB, controlPlaneNeedMiB, err := p.getMachineConfigMemoryRequirements(ctx, datacenter, cpMachineConfig, clusterSpec.Cluster.Spec.ControlPlaneConfiguration.Count)
if err != nil {
return err
}
updateMemoryUsageMap(cpMachineConfig, controlPlaneNeedMiB, controlPlaneAvailableMiB, 0, memoryUsage)
for _, workerNodeGroupConfiguration := range clusterSpec.Cluster.Spec.WorkerNodeGroupConfigurations {
workerMachineConfig := clusterSpec.workerMachineConfig(workerNodeGroupConfiguration)
workerAvailableMiB, workerNeedMiB, err := p.getMachineConfigMemoryRequirements(ctx, datacenter, workerMachineConfig, *workerNodeGroupConfiguration.Count)
if err != nil {
return err
}
updateMemoryUsageMap(workerMachineConfig, workerNeedMiB, workerAvailableMiB, 0, memoryUsage)
}
etcdMachineConfig := clusterSpec.etcdMachineConfig()
if etcdMachineConfig != nil {
etcdAvailableMiB, etcdNeedMiB, err := p.getMachineConfigMemoryRequirements(ctx, datacenter, etcdMachineConfig, clusterSpec.Cluster.Spec.ExternalEtcdConfiguration.Count)
if err != nil {
return err
}
updateMemoryUsageMap(etcdMachineConfig, etcdNeedMiB, etcdAvailableMiB, 0, memoryUsage)
}
for resourcePool, usage := range memoryUsage {
if usage.availableMemoryMiB != -1 {
if usage.needMemoryMiB > usage.availableMemoryMiB {
return fmt.Errorf("not enough memory avaialable in resource pool %v for given memoryMiB and count for respective machine groups", resourcePool)
}
}
}
return nil
}

func (p *vsphereProvider) validateMemoryUsageForUpgrade(ctx context.Context, dc *anywherev1.VSphereDatacenterConfig, currentClusterSpec *Spec, cluster *types.Cluster) error {
memoryUsage := make(map[string]*memoryUsage)
prevEksaCluster, err := p.providerKubectlClient.GetEksaCluster(ctx, cluster, currentClusterSpec.Cluster.GetName())
if err != nil {
return err
}

datacenter := dc.Spec.Datacenter
cpMachineConfig := currentClusterSpec.controlPlaneMachineConfig()
if err := p.calculateResourcePoolMemoryUsage(ctx, datacenter, cpMachineConfig, cluster, memoryUsage, prevEksaCluster.Spec.ControlPlaneConfiguration.Count, currentClusterSpec.Cluster.Spec.ControlPlaneConfiguration.Count); err != nil {
return fmt.Errorf("calculating memory usage for control plane: %v", err)
}

prevMachineConfigRefs := machineRefSliceToMap(prevEksaCluster.MachineConfigRefs())
if err := p.getWorkerNodeGroupMemoryUsage(ctx, datacenter, currentClusterSpec, cluster, memoryUsage, prevMachineConfigRefs); err != nil {
return fmt.Errorf("calculating memory usage for worker node groups: %v", err)
}

etcdMachineConfig := currentClusterSpec.etcdMachineConfig()
if etcdMachineConfig != nil {
etcdAvailableMiB, etcdNeedMiB, err := p.getMachineConfigMemoryRequirements(ctx, datacenter, etcdMachineConfig, currentClusterSpec.Cluster.Spec.ExternalEtcdConfiguration.Count)
if err != nil {
return err
}
// older etcd machines are not deleted until the end of the rollout, so do not account for their previous usage
updateMemoryUsageMap(etcdMachineConfig, etcdNeedMiB, etcdAvailableMiB, 0, memoryUsage)
}

for resourcePool, usage := range memoryUsage {
if usage.availableMemoryMiB != -1 && usage.needMemoryMiB > usage.availableMemoryMiB {
return fmt.Errorf("not enough memory avaialable in resource pool %v for given memoryMiB and count for respective machine groups", resourcePool)
}
}
return nil
}

func (p *vsphereProvider) getWorkerNodeGroupMemoryUsage(ctx context.Context, datacenter string, currentClusterSpec *Spec, cluster *types.Cluster, memoryUsage map[string]*memoryUsage, prevMachineConfigRefs map[string]v1alpha1.Ref) error {
for _, workerNodeGroupConfiguration := range currentClusterSpec.Cluster.Spec.WorkerNodeGroupConfigurations {
prevCount := 0
workerMachineConfig := currentClusterSpec.workerMachineConfig(workerNodeGroupConfiguration)
if _, ok := prevMachineConfigRefs[workerNodeGroupConfiguration.MachineGroupRef.Name]; ok {
prevCount = *workerNodeGroupConfiguration.Count
} else {
// set the previous count to 1 when no previous machines were found in the resource pool, to avoid a negative count in the calculation
prevCount = 1
}
if err := p.calculateResourcePoolMemoryUsage(ctx, datacenter, workerMachineConfig, cluster, memoryUsage, prevCount, *workerNodeGroupConfiguration.Count); err != nil {
return fmt.Errorf("calculating memory usage: %v", err)
}
}
return nil
}
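The fallback to 1 matters because calculateResourcePoolMemoryUsage subtracts 1 from the previous count before computing the credit. A tiny sanity check with a hypothetical 8192 MiB machine shows why carrying 0 through instead could wrongly shrink the pool:

```go
// Why the fallback is 1 rather than 0; the 8192 MiB figure is hypothetical.
package main

import "fmt"

func main() {
	const memoryMiB = 8192
	fmt.Println(memoryMiB * (1 - 1)) // 0: a brand-new worker group gets no rollout credit
	fmt.Println(memoryMiB * (0 - 1)) // -8192: a zero count would subtract real capacity from the pool
}
```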

func (p *vsphereProvider) UpdateSecrets(ctx context.Context, cluster *types.Cluster, _ *cluster.Spec) error {
var contents bytes.Buffer
err := p.createSecret(ctx, cluster, &contents)
37 changes: 23 additions & 14 deletions pkg/providers/vsphere/vsphere_test.go
@@ -155,6 +155,10 @@ func (pc *DummyProviderGovcClient) GetHardDiskSize(ctx context.Context, vm, data
return map[string]float64{"Hard disk 1": 23068672}, nil
}

func (pc *DummyProviderGovcClient) GetResourcePoolInfo(ctx context.Context, datacenter, resourcePool string, args ...string) (map[string]int, error) {
return map[string]int{"Memory_Available": -1}, nil
}

func (pc *DummyProviderGovcClient) GetTags(ctx context.Context, path string) (tags []string, err error) {
return []string{eksd119ReleaseTag, eksd121ReleaseTag, pc.osTag}, nil
}
Expand Down Expand Up @@ -1469,8 +1473,8 @@ func TestSetupAndValidateUpgradeClusterMissingPrivError(t *testing.T) {
provider.providerKubectlClient = kubectl
setupContext(t)

kubectl.EXPECT().GetEksaCluster(ctx, cluster, clusterSpec.Cluster.GetName()).Return(clusterSpec.Cluster.DeepCopy(), nil).Times(1)
kubectl.EXPECT().GetEksaVSphereMachineConfig(ctx, gomock.Any(), cluster.KubeconfigFile, clusterSpec.Cluster.GetNamespace()).Times(3)
kubectl.EXPECT().GetEksaCluster(ctx, cluster, clusterSpec.Cluster.GetName()).Return(clusterSpec.Cluster.DeepCopy(), nil).Times(2)
kubectl.EXPECT().GetEksaVSphereMachineConfig(ctx, gomock.Any(), cluster.KubeconfigFile, clusterSpec.Cluster.GetNamespace()).Times(5)

vscb := mocks.NewMockVSphereClientBuilder(mockCtrl)
vscb.EXPECT().Build(ctx, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), clusterSpec.VSphereDatacenter.Spec.Datacenter).Return(nil, fmt.Errorf("error"))
@@ -1735,8 +1739,8 @@ func TestSetupAndValidateUpgradeCluster(t *testing.T) {
provider.providerKubectlClient = kubectl
setupContext(t)

kubectl.EXPECT().GetEksaCluster(ctx, cluster, clusterSpec.Cluster.GetName()).Return(clusterSpec.Cluster.DeepCopy(), nil).Times(2)
kubectl.EXPECT().GetEksaVSphereMachineConfig(ctx, gomock.Any(), cluster.KubeconfigFile, clusterSpec.Cluster.GetNamespace()).Times(3)
kubectl.EXPECT().GetEksaCluster(ctx, cluster, clusterSpec.Cluster.GetName()).Return(clusterSpec.Cluster.DeepCopy(), nil).Times(3)
kubectl.EXPECT().GetEksaVSphereMachineConfig(ctx, gomock.Any(), cluster.KubeconfigFile, clusterSpec.Cluster.GetNamespace()).Times(5)
err := provider.SetupAndValidateUpgradeCluster(ctx, cluster, clusterSpec, clusterSpec)
if err != nil {
t.Fatalf("unexpected failure %v", err)
@@ -1799,8 +1803,8 @@ func TestSetupAndValidateUpgradeClusterCPSshNotExists(t *testing.T) {
provider.providerKubectlClient = kubectl

cluster := &types.Cluster{}
kubectl.EXPECT().GetEksaCluster(ctx, cluster, clusterSpec.Cluster.GetName()).Return(clusterSpec.Cluster.DeepCopy(), nil).Times(2)
kubectl.EXPECT().GetEksaVSphereMachineConfig(ctx, gomock.Any(), cluster.KubeconfigFile, clusterSpec.Cluster.GetNamespace()).Times(3)
kubectl.EXPECT().GetEksaCluster(ctx, cluster, clusterSpec.Cluster.GetName()).Return(clusterSpec.Cluster.DeepCopy(), nil).Times(3)
kubectl.EXPECT().GetEksaVSphereMachineConfig(ctx, gomock.Any(), cluster.KubeconfigFile, clusterSpec.Cluster.GetNamespace()).Times(5)
err := provider.SetupAndValidateUpgradeCluster(ctx, cluster, clusterSpec, clusterSpec)
if err != nil {
t.Fatalf("unexpected failure %v", err)
@@ -1820,8 +1824,8 @@ func TestSetupAndValidateUpgradeClusterWorkerSshNotExists(t *testing.T) {
provider.providerKubectlClient = kubectl

cluster := &types.Cluster{}
kubectl.EXPECT().GetEksaCluster(ctx, cluster, clusterSpec.Cluster.GetName()).Return(clusterSpec.Cluster.DeepCopy(), nil).Times(2)
kubectl.EXPECT().GetEksaVSphereMachineConfig(ctx, gomock.Any(), cluster.KubeconfigFile, clusterSpec.Cluster.GetNamespace()).Times(3)
kubectl.EXPECT().GetEksaCluster(ctx, cluster, clusterSpec.Cluster.GetName()).Return(clusterSpec.Cluster.DeepCopy(), nil).Times(3)
kubectl.EXPECT().GetEksaVSphereMachineConfig(ctx, gomock.Any(), cluster.KubeconfigFile, clusterSpec.Cluster.GetNamespace()).Times(5)

err := provider.SetupAndValidateUpgradeCluster(ctx, cluster, clusterSpec, clusterSpec)
if err != nil {
@@ -1842,8 +1846,8 @@ func TestSetupAndValidateUpgradeClusterEtcdSshNotExists(t *testing.T) {
provider.providerKubectlClient = kubectl

cluster := &types.Cluster{}
kubectl.EXPECT().GetEksaCluster(ctx, cluster, clusterSpec.Cluster.GetName()).Return(clusterSpec.Cluster.DeepCopy(), nil).Times(2)
kubectl.EXPECT().GetEksaVSphereMachineConfig(ctx, gomock.Any(), cluster.KubeconfigFile, clusterSpec.Cluster.GetNamespace()).Times(3)
kubectl.EXPECT().GetEksaCluster(ctx, cluster, clusterSpec.Cluster.GetName()).Return(clusterSpec.Cluster.DeepCopy(), nil).Times(3)
kubectl.EXPECT().GetEksaVSphereMachineConfig(ctx, gomock.Any(), cluster.KubeconfigFile, clusterSpec.Cluster.GetNamespace()).Times(5)

err := provider.SetupAndValidateUpgradeCluster(ctx, cluster, clusterSpec, clusterSpec)
if err != nil {
@@ -1864,10 +1868,11 @@ func TestSetupAndValidateUpgradeClusterSameMachineConfigforCPandEtcd(t *testing.
provider.providerKubectlClient = kubectl

cluster := &types.Cluster{}
kubectl.EXPECT().GetEksaCluster(ctx, cluster, clusterSpec.Cluster.GetName()).Return(clusterSpec.Cluster.DeepCopy(), nil).Times(2)
for _, mc := range clusterSpec.VSphereMachineConfigs {
kubectl.EXPECT().GetEksaVSphereMachineConfig(ctx, gomock.Any(), cluster.KubeconfigFile, clusterSpec.Cluster.GetNamespace()).Return(mc, nil)
}
kubectl.EXPECT().GetEksaCluster(ctx, cluster, clusterSpec.Cluster.GetName()).Return(clusterSpec.Cluster.DeepCopy(), nil).Times(3)
kubectl.EXPECT().GetEksaVSphereMachineConfig(ctx, gomock.Any(), cluster.KubeconfigFile, clusterSpec.Cluster.GetNamespace()).Times(5)
// for _, mc := range clusterSpec.VSphereMachineConfigs {
// kubectl.EXPECT().GetEksaVSphereMachineConfig(ctx, gomock.Any(), cluster.KubeconfigFile, clusterSpec.Cluster.GetNamespace()).Return(mc, nil)
// }

err := provider.SetupAndValidateUpgradeCluster(ctx, cluster, clusterSpec, clusterSpec)
if err != nil {
@@ -2338,6 +2343,10 @@ func TestSetupAndValidateCreateClusterFullCloneDiskGiBLessThan20TemplateDiskSize
tt.govc.EXPECT().GetWorkloadAvailableSpace(tt.ctx, tt.clusterSpec.VSphereMachineConfigs[controlPlaneMachineConfigName].Spec.Datastore).Return(100.0, nil)
tt.ipValidator.EXPECT().ValidateControlPlaneIPUniqueness(tt.cluster)

resourcePoolResponse := map[string]int{
"Memory_Available": -1,
}
tt.govc.EXPECT().GetResourcePoolInfo(tt.ctx, tt.clusterSpec.VSphereDatacenter.Spec.Datacenter, tt.clusterSpec.VSphereMachineConfigs[controlPlaneMachineConfigName].Spec.ResourcePool).Return(resourcePoolResponse, nil)
err := tt.provider.SetupAndValidateCreateCluster(context.Background(), tt.clusterSpec)

assert.NoError(t, err, "No error expected for provider.SetupAndValidateCreateCluster()")
