From 2c85128b90c0a96a2096ad472d9282d293b27ccf Mon Sep 17 00:00:00 2001
From: TwinProduction
Date: Fri, 8 May 2020 11:34:18 -0400
Subject: [PATCH] Closes #1: Filter out pods from daemon sets when calculating resources

---
 k8s/util.go        |  11 +++
 k8s/util_test.go   |  32 ++++----
 k8stest/k8stest.go |   8 +-
 main.go            |  12 +--
 main_test.go       | 178 ++++++++++++++++++++++++++++++++++++++-------
 5 files changed, 191 insertions(+), 50 deletions(-)

diff --git a/k8s/util.go b/k8s/util.go
index 151b2c4f..db8df9be 100644
--- a/k8s/util.go
+++ b/k8s/util.go
@@ -45,6 +45,17 @@ func CheckIfNodeHasEnoughResourcesToTransferAllPodsInNodes(kubernetesClient Kube
 		return true
 	}
 	for _, podInNode := range podsInNode {
+		// Ignore DaemonSets in the old node, because these pods will also be present in the target nodes
+		hasDaemonSetOwnerReference := false
+		for _, owner := range podInNode.GetOwnerReferences() {
+			if owner.Kind == "DaemonSet" {
+				hasDaemonSetOwnerReference = true
+				break
+			}
+		}
+		if hasDaemonSetOwnerReference {
+			continue
+		}
 		for _, container := range podInNode.Spec.Containers {
 			if container.Resources.Requests.Cpu() != nil {
 				// Subtract the cpu request of the pod from the node's total allocatable
diff --git a/k8s/util_test.go b/k8s/util_test.go
index 368209ae..1356d2b7 100644
--- a/k8s/util_test.go
+++ b/k8s/util_test.go
@@ -13,7 +13,7 @@ func TestCheckIfNodeHasEnoughResourcesToTransferAllPodsInNodes(t *testing.T) {
 	// space in the target nodes)
 	oldNode := k8stest.CreateTestNode("old-node", "0m", "0m")
 	newNode := k8stest.CreateTestNode("new-node-1", "1000m", "1000Mi")
-	oldNodePod := k8stest.CreateTestPod("old-pod-1", oldNode.Name, "100m", "100Mi")
+	oldNodePod := k8stest.CreateTestPod("old-pod-1", oldNode.Name, "100m", "100Mi", false)
 	mockKubernetesClient := k8stest.NewMockKubernetesClient([]v1.Node{oldNode, newNode}, []v1.Pod{oldNodePod})
 
 	hasEnoughResources := CheckIfNodeHasEnoughResourcesToTransferAllPodsInNodes(mockKubernetesClient, &oldNode, []*v1.Node{&newNode})
@@ -28,8 +28,8 @@ func TestCheckIfNodeHasEnoughResourcesToTransferAllPodsInNodes_whenNotEnoughSpaceInNewNodes(t *testing.T) {
 	oldNode := k8stest.CreateTestNode("old-node", "0m", "0m")
 	newNode := k8stest.CreateTestNode("new-node-1", "1000m", "1000Mi")
-	oldNodePod := k8stest.CreateTestPod("old-pod-1", oldNode.Name, "200m", "200Mi")
-	newNodePod := k8stest.CreateTestPod("new-pod-1", newNode.Name, "900m", "200Mi")
+	oldNodePod := k8stest.CreateTestPod("old-pod-1", oldNode.Name, "200m", "200Mi", false)
+	newNodePod := k8stest.CreateTestPod("new-pod-1", newNode.Name, "900m", "200Mi", false)
 	mockKubernetesClient := k8stest.NewMockKubernetesClient([]v1.Node{oldNode, newNode}, []v1.Pod{oldNodePod, newNodePod})
 
 	hasEnoughResources := CheckIfNodeHasEnoughResourcesToTransferAllPodsInNodes(mockKubernetesClient, &oldNode, []*v1.Node{&newNode})
@@ -44,10 +44,10 @@ func TestCheckIfNodeHasEnoughResourcesToTransferAllPodsInNodes_withMultiplePods(t *testing.T) {
 	oldNode := k8stest.CreateTestNode("old-node", "0m", "0m")
 	newNode := k8stest.CreateTestNode("new-node-1", "1000m", "1000Mi")
-	oldNodeFirstPod := k8stest.CreateTestPod("old-pod-1", oldNode.Name, "300m", "0")
-	oldNodeSecondPod := k8stest.CreateTestPod("old-pod-2", oldNode.Name, "300m", "0")
-	oldNodeThirdPod := k8stest.CreateTestPod("old-pod-3", oldNode.Name, "300m", "0")
-	newNodePod := k8stest.CreateTestPod("new-pod-1", newNode.Name, "200m", "200Mi")
"200Mi") + oldNodeFirstPod := k8stest.CreateTestPod("old-pod-1", oldNode.Name, "300m", "0", false) + oldNodeSecondPod := k8stest.CreateTestPod("old-pod-2", oldNode.Name, "300m", "0", false) + oldNodeThirdPod := k8stest.CreateTestPod("old-pod-3", oldNode.Name, "300m", "0", false) + newNodePod := k8stest.CreateTestPod("new-pod-1", newNode.Name, "200m", "200Mi", false) mockKubernetesClient := k8stest.NewMockKubernetesClient([]v1.Node{oldNode, newNode}, []v1.Pod{oldNodeFirstPod, oldNodeSecondPod, oldNodeThirdPod, newNodePod}) hasEnoughResources := CheckIfNodeHasEnoughResourcesToTransferAllPodsInNodes(mockKubernetesClient, &oldNode, []*v1.Node{&newNode}) @@ -63,9 +63,9 @@ func TestCheckIfNodeHasEnoughResourcesToTransferAllPodsInNodes_withMultipleTarge oldNode := k8stest.CreateTestNode("old-node", "0m", "0m") firstNewNode := k8stest.CreateTestNode("new-node-1", "1000m", "1000Mi") secondNewNode := k8stest.CreateTestNode("new-node-2", "1000m", "1000Mi") - oldNodeFirstPod := k8stest.CreateTestPod("old-node-pod-1", oldNode.Name, "500m", "0") - oldNodeSecondPod := k8stest.CreateTestPod("old-node-pod-2", oldNode.Name, "500m", "0") - oldNodeThirdPod := k8stest.CreateTestPod("old-node-pod-3", oldNode.Name, "500m", "0") + oldNodeFirstPod := k8stest.CreateTestPod("old-node-pod-1", oldNode.Name, "500m", "0", false) + oldNodeSecondPod := k8stest.CreateTestPod("old-node-pod-2", oldNode.Name, "500m", "0", false) + oldNodeThirdPod := k8stest.CreateTestPod("old-node-pod-3", oldNode.Name, "500m", "0", false) mockKubernetesClient := k8stest.NewMockKubernetesClient([]v1.Node{oldNode, firstNewNode, secondNewNode}, []v1.Pod{oldNodeFirstPod, oldNodeSecondPod, oldNodeThirdPod}) hasEnoughResources := CheckIfNodeHasEnoughResourcesToTransferAllPodsInNodes(mockKubernetesClient, &oldNode, []*v1.Node{&firstNewNode, &secondNewNode}) @@ -81,11 +81,11 @@ func TestCheckIfNodeHasEnoughResourcesToTransferAllPodsInNodes_withPodsSpreadAcr oldNode := k8stest.CreateTestNode("old-node", "0m", "0m") firstNewNode := k8stest.CreateTestNode("new-node-1", "1000m", "1000Mi") secondNewNode := k8stest.CreateTestNode("new-node-2", "1000m", "1000Mi") - firstNewNodePod := k8stest.CreateTestPod("new-node-1-pod-1", oldNode.Name, "0", "300Mi") - secondNewNodePod := k8stest.CreateTestPod("new-node-2-pod-1", oldNode.Name, "0", "300Mi") - oldNodeFirstPod := k8stest.CreateTestPod("old-node-pod-1", oldNode.Name, "0", "500Mi") - oldNodeSecondPod := k8stest.CreateTestPod("old-node-pod-2", oldNode.Name, "0", "500Mi") - oldNodeThirdPod := k8stest.CreateTestPod("old-node-pod-3", oldNode.Name, "0", "500Mi") + firstNewNodePod := k8stest.CreateTestPod("new-node-1-pod-1", oldNode.Name, "0", "300Mi", false) + secondNewNodePod := k8stest.CreateTestPod("new-node-2-pod-1", oldNode.Name, "0", "300Mi", false) + oldNodeFirstPod := k8stest.CreateTestPod("old-node-pod-1", oldNode.Name, "0", "500Mi", false) + oldNodeSecondPod := k8stest.CreateTestPod("old-node-pod-2", oldNode.Name, "0", "500Mi", false) + oldNodeThirdPod := k8stest.CreateTestPod("old-node-pod-3", oldNode.Name, "0", "500Mi", false) mockKubernetesClient := k8stest.NewMockKubernetesClient([]v1.Node{oldNode, firstNewNode, secondNewNode}, []v1.Pod{oldNodeFirstPod, oldNodeSecondPod, oldNodeThirdPod, firstNewNodePod, secondNewNodePod}) hasEnoughResources := CheckIfNodeHasEnoughResourcesToTransferAllPodsInNodes(mockKubernetesClient, &oldNode, []*v1.Node{&firstNewNode, &secondNewNode}) @@ -99,7 +99,7 @@ func TestCheckIfNodeHasEnoughResourcesToTransferAllPodsInNodes_withPodsSpreadAcr func 
 	oldNode := k8stest.CreateTestNode("old-node", "0m", "0m")
-	oldNodePod := k8stest.CreateTestPod("old-node-pod-1", oldNode.Name, "500Mi", "500Mi")
+	oldNodePod := k8stest.CreateTestPod("old-node-pod-1", oldNode.Name, "500Mi", "500Mi", false)
 	mockKubernetesClient := k8stest.NewMockKubernetesClient([]v1.Node{oldNode}, []v1.Pod{oldNodePod})
 
 	hasEnoughResources := CheckIfNodeHasEnoughResourcesToTransferAllPodsInNodes(mockKubernetesClient, &oldNode, []*v1.Node{})
diff --git a/k8stest/k8stest.go b/k8stest/k8stest.go
index 11cfe8cb..07978ff0 100644
--- a/k8stest/k8stest.go
+++ b/k8stest/k8stest.go
@@ -4,6 +4,7 @@ import (
 	"errors"
 	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
 type MockKubernetesClient struct {
@@ -84,7 +85,7 @@ func CreateTestNode(name string, allocatableCpu, allocatableMemory string) v1.No
 	return node
 }
 
-func CreateTestPod(name string, nodeName, cpuRequest, cpuMemory string) v1.Pod {
+func CreateTestPod(name, nodeName, cpuRequest, cpuMemory string, isDaemonSet bool) v1.Pod {
 	pod := v1.Pod{
 		Spec: v1.PodSpec{
 			NodeName: nodeName,
@@ -100,5 +101,10 @@ func CreateTestPod(name string, nodeName, cpuRequest, cpuMemory string) v1.Pod {
 		},
 	}
 	pod.SetName(name)
+	if isDaemonSet {
+		pod.SetOwnerReferences([]metav1.OwnerReference{{Kind: "DaemonSet"}})
+	} else {
+		pod.SetOwnerReferences([]metav1.OwnerReference{{Kind: "ReplicaSet"}})
+	}
 	return pod
 }
diff --git a/main.go b/main.go
index 8cab4ef4..e59c9037 100644
--- a/main.go
+++ b/main.go
@@ -60,8 +60,7 @@ func HandleRollingUpgrade(kubernetesClient k8s.KubernetesClientApi, ec2Service e
 	for _, autoScalingGroup := range autoScalingGroups {
 		outdatedInstances, updatedInstances, err := SeparateOutdatedFromUpdatedInstances(autoScalingGroup, ec2Service)
 		if err != nil {
-			log.Printf("[%s] Unable to separate outdated instances from updated instances: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), err.Error())
-			log.Printf("[%s] Skipping", aws.StringValue(autoScalingGroup.AutoScalingGroupName))
+			log.Printf("[%s] Skipping because unable to separate outdated instances from updated instances: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), err.Error())
 			continue
 		}
 		if config.Get().Debug {
@@ -94,8 +93,7 @@ func HandleRollingUpgrade(kubernetesClient k8s.KubernetesClientApi, ec2Service e
 		for _, outdatedInstance := range outdatedInstances {
 			node, err := kubernetesClient.GetNodeByHostName(aws.StringValue(outdatedInstance.InstanceId))
 			if err != nil {
-				log.Printf("[%s][%s] Unable to get outdated node from Kubernetes: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error())
-				log.Printf("[%s][%s] Skipping", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId))
+				log.Printf("[%s][%s] Skipping because unable to get outdated node from Kubernetes: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error())
 				continue
 			}
 
@@ -107,8 +105,7 @@ func HandleRollingUpgrade(kubernetesClient k8s.KubernetesClientApi, ec2Service e
 				// Annotate the node to persist the fact that the rolling update process has begun
 				err := k8s.AnnotateNodeByHostName(kubernetesClient, aws.StringValue(outdatedInstance.InstanceId), k8s.RollingUpdateStartedTimestampAnnotationKey, time.Now().Format(time.RFC3339))
 				if err != nil {
-					log.Printf("[%s][%s] Unable to annotate node: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error())
annotate node: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error()) - log.Printf("[%s][%s] Skipping", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId)) + log.Printf("[%s][%s] Skipping because unable to annotate node: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error()) continue } } else { @@ -121,8 +118,7 @@ func HandleRollingUpgrade(kubernetesClient k8s.KubernetesClientApi, ec2Service e log.Printf("[%s][%s] Draining node", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId)) err := kubernetesClient.Drain(node.Name, config.Get().IgnoreDaemonSets, config.Get().DeleteLocalData) if err != nil { - log.Printf("[%s][%s] Ran into error while draining node: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error()) - log.Printf("[%s][%s] Skipping", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId)) + log.Printf("[%s][%s] Skipping because ran into error while draining node: %v", aws.StringValue(autoScalingGroup.AutoScalingGroupName), aws.StringValue(outdatedInstance.InstanceId), err.Error()) continue } else { // Only annotate if no error was encountered diff --git a/main_test.go b/main_test.go index 714630af..1322fa97 100644 --- a/main_test.go +++ b/main_test.go @@ -122,7 +122,7 @@ func TestHandleRollingUpgrade(t *testing.T) { asg := cloudtest.CreateTestAutoScalingGroup("asg", "v2", nil, []*autoscaling.Instance{oldInstance}) oldNode := k8stest.CreateTestNode(aws.StringValue(oldInstance.InstanceId), "1000m", "1000Mi") - oldNodePod := k8stest.CreateTestPod("old-pod-1", oldNode.Name, "100m", "100Mi") + oldNodePod := k8stest.CreateTestPod("old-pod-1", oldNode.Name, "100m", "100Mi", false) mockKubernetesClient := k8stest.NewMockKubernetesClient([]v1.Node{oldNode}, []v1.Pod{oldNodePod}) mockEc2Service := cloudtest.NewMockEC2Service(nil) @@ -133,14 +133,14 @@ func TestHandleRollingUpgrade(t *testing.T) { if mockKubernetesClient.Counter["UpdateNode"] != 1 { t.Error("Node should've been annotated, meaning that UpdateNode should've been called once") } - oldNodeAfterFirstRun := mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)] - if _, ok := oldNodeAfterFirstRun.GetAnnotations()[k8s.RollingUpdateStartedTimestampAnnotationKey]; !ok { + oldNode = mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)] + if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateStartedTimestampAnnotationKey]; !ok { t.Error("Node should've been annotated with", k8s.RollingUpdateStartedTimestampAnnotationKey) } - if _, ok := oldNodeAfterFirstRun.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok { + if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok { t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey) } - if _, ok := oldNodeAfterFirstRun.GetAnnotations()[k8s.RollingUpdateTerminatedTimestampAnnotationKey]; ok { + if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateTerminatedTimestampAnnotationKey]; ok { t.Error("Node shouldn't have been terminated yet, therefore shouldn't have been annotated with", k8s.RollingUpdateTerminatedTimestampAnnotationKey) } @@ -149,12 +149,12 @@ func TestHandleRollingUpgrade(t *testing.T) { if 
 		t.Error("ASG should've been increased because there's no updated nodes yet")
 	}
-	asgAfterSecondRun := mockAutoScalingService.AutoScalingGroups[aws.StringValue(asg.AutoScalingGroupName)]
-	if aws.Int64Value(asgAfterSecondRun.DesiredCapacity) != 2 {
+	asg = mockAutoScalingService.AutoScalingGroups[aws.StringValue(asg.AutoScalingGroupName)]
+	if aws.Int64Value(asg.DesiredCapacity) != 2 {
 		t.Error("The desired capacity of the ASG should've been increased to 2")
 	}
-	oldNodeAfterSecondRun := mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
-	if _, ok := oldNodeAfterSecondRun.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
+	oldNode = mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
 		t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey)
 	}
 
@@ -163,12 +163,12 @@ func TestHandleRollingUpgrade(t *testing.T) {
 	if mockAutoScalingService.Counter["SetDesiredCapacity"] != 1 {
 		t.Error("Desired capacity shouldn't have been updated")
 	}
-	asgAfterThirdRun := mockAutoScalingService.AutoScalingGroups[aws.StringValue(asg.AutoScalingGroupName)]
-	if aws.Int64Value(asgAfterThirdRun.DesiredCapacity) != 2 {
+	asg = mockAutoScalingService.AutoScalingGroups[aws.StringValue(asg.AutoScalingGroupName)]
+	if aws.Int64Value(asg.DesiredCapacity) != 2 {
 		t.Error("The desired capacity of the ASG should've stayed at 2")
 	}
-	oldNodeAfterThirdRun := mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
-	if _, ok := oldNodeAfterThirdRun.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
+	oldNode = mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
 		t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey)
 	}
 
@@ -179,16 +179,16 @@ func TestHandleRollingUpgrade(t *testing.T) {
 	if mockAutoScalingService.Counter["SetDesiredCapacity"] != 1 {
 		t.Error("Desired capacity shouldn't have been updated")
 	}
-	oldNodeAfterFourthRun := mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
-	if _, ok := oldNodeAfterFourthRun.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
+	oldNode = mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
 		t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey)
 	}
 
 	// Fifth run (new instance is now InService, but node has still not joined cluster (GetNodeByHostName should return not found))
 	newInstance.SetLifecycleState("InService")
 	HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg})
-	oldNodeAfterFifthRun := mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
-	if _, ok := oldNodeAfterFifthRun.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
+	oldNode = mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
 		t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey)
 	}
@@ -197,21 +197,149 @@ func TestHandleRollingUpgrade(t *testing.T) {
 	newNode.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}}
 	mockKubernetesClient.Nodes[newNode.Name] = newNode
 	HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg})
-	oldNodeAfterSixthRun := mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
-	if _, ok := oldNodeAfterSixthRun.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
+	oldNode = mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
 		t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey)
 	}
 
 	// Seventh run (Kubelet is ready to accept new pods. Old node gets drained and terminated)
-	newNodeAfterSeventhRun := mockKubernetesClient.Nodes[newNode.Name]
-	newNodeAfterSeventhRun.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}
-	mockKubernetesClient.Nodes[newNode.Name] = newNodeAfterSeventhRun
+	newNode = mockKubernetesClient.Nodes[newNode.Name]
+	newNode.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}
+	mockKubernetesClient.Nodes[newNode.Name] = newNode
+	HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg})
+	oldNode = mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; !ok {
+		t.Error("Node should've been drained")
+	}
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateTerminatedTimestampAnnotationKey]; !ok {
+		t.Error("Node should've been terminated")
+	}
+}
+
+func TestHandleRollingUpgrade_withEnoughPodsToRequireTwoNewNodes(t *testing.T) {
+	oldInstance := cloudtest.CreateTestAutoScalingInstance("old-1", "v1", nil, "InService")
+	asg := cloudtest.CreateTestAutoScalingGroup("asg", "v2", nil, []*autoscaling.Instance{oldInstance})
+
+	oldNode := k8stest.CreateTestNode(aws.StringValue(oldInstance.InstanceId), "1000m", "1000Mi")
+	oldNodeFirstPod := k8stest.CreateTestPod("old-pod-1", oldNode.Name, "300m", "300Mi", false)
+	oldNodeSecondPod := k8stest.CreateTestPod("old-pod-2", oldNode.Name, "300m", "300Mi", false)
+	oldNodeThirdPod := k8stest.CreateTestPod("old-pod-3", oldNode.Name, "300m", "300Mi", false)
+	oldNodeFourthPod := k8stest.CreateTestPod("old-pod-4", oldNode.Name, "300m", "300Mi", false)
+
+	mockKubernetesClient := k8stest.NewMockKubernetesClient([]v1.Node{oldNode}, []v1.Pod{oldNodeFirstPod, oldNodeSecondPod, oldNodeThirdPod, oldNodeFourthPod})
+	mockEc2Service := cloudtest.NewMockEC2Service(nil)
+	mockAutoScalingService := cloudtest.NewMockAutoScalingService([]*autoscaling.Group{asg})
+
+	// First run (Node rollout process gets marked as started)
+	HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg})
+	if mockKubernetesClient.Counter["UpdateNode"] != 1 {
+		t.Error("Node should've been annotated, meaning that UpdateNode should've been called once")
+	}
+	oldNode = mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateStartedTimestampAnnotationKey]; !ok {
+		t.Error("Node should've been annotated with", k8s.RollingUpdateStartedTimestampAnnotationKey)
+	}
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
+		t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey)
+	}
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateTerminatedTimestampAnnotationKey]; ok {
+		t.Error("Node shouldn't have been terminated yet, therefore shouldn't have been annotated with", k8s.RollingUpdateTerminatedTimestampAnnotationKey)
+	}
+
+	// Second run (ASG's desired capacity gets increased)
+	HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg})
+	if mockAutoScalingService.Counter["SetDesiredCapacity"] != 1 {
+		t.Error("ASG should've been increased because there's no updated nodes yet")
+	}
+	asg = mockAutoScalingService.AutoScalingGroups[aws.StringValue(asg.AutoScalingGroupName)]
+	if aws.Int64Value(asg.DesiredCapacity) != 2 {
+		t.Error("The desired capacity of the ASG should've been increased to 2")
+	}
+	oldNode = mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
+		t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey)
+	}
+
+	// Third run (Nothing changed)
+	HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg})
+	if mockAutoScalingService.Counter["SetDesiredCapacity"] != 1 {
+		t.Error("Desired capacity shouldn't have been updated")
+	}
+	asg = mockAutoScalingService.AutoScalingGroups[aws.StringValue(asg.AutoScalingGroupName)]
+	if aws.Int64Value(asg.DesiredCapacity) != 2 {
+		t.Error("The desired capacity of the ASG should've stayed at 2")
+	}
+	oldNode = mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
+		t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey)
+	}
+
+	// Fourth run (new instance has been registered to ASG, but is pending)
+	newInstance := cloudtest.CreateTestAutoScalingInstance("new-1", "v2", nil, "Pending")
+	asg.Instances = append(asg.Instances, newInstance)
+	HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg})
+	if mockAutoScalingService.Counter["SetDesiredCapacity"] != 1 {
+		t.Error("Desired capacity shouldn't have been updated")
+	}
+	oldNode = mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
+		t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey)
+	}
+
+	// Fifth run (new instance is now InService, but node has still not joined cluster (GetNodeByHostName should return not found))
+	newInstance.SetLifecycleState("InService")
+	HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg})
+	oldNode = mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
+		t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey)
+	}
+
+	// Sixth run (new instance has joined the cluster, but Kubelet isn't ready to accept pods yet)
+	newNode := k8stest.CreateTestNode(aws.StringValue(newInstance.InstanceId), "1000m", "1000Mi")
+	newNode.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}}
+	mockKubernetesClient.Nodes[newNode.Name] = newNode
+	HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg})
+	oldNode = mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
+		t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey)
+	}
+
+	// Seventh run (Kubelet is ready to accept new pods)
+	newNode = mockKubernetesClient.Nodes[newNode.Name]
+	newNode.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}
+	mockKubernetesClient.Nodes[newNode.Name] = newNode
+	HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg})
+	oldNode = mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
+		t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey)
+	}
+
+	// Eighth run (ASG's desired capacity gets increased)
+	HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg})
+	if mockAutoScalingService.Counter["SetDesiredCapacity"] != 2 {
+		t.Error("ASG should've been increased again")
+	}
+	asg = mockAutoScalingService.AutoScalingGroups[aws.StringValue(asg.AutoScalingGroupName)]
+	if aws.Int64Value(asg.DesiredCapacity) != 3 {
+		t.Error("The desired capacity of the ASG should've been increased to 3")
+	}
+	oldNode = mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; ok {
+		t.Error("Node shouldn't have been drained yet, therefore shouldn't have been annotated with", k8s.RollingUpdateDrainedTimestampAnnotationKey)
+	}
+
+	// Ninth run (fast-forward new instance, node and kubelet ready to accept. Old node gets drained and terminated)
+	newSecondInstance := cloudtest.CreateTestAutoScalingInstance("new-2", "v2", nil, "InService")
+	asg.Instances = append(asg.Instances, newSecondInstance)
+	newSecondNode := k8stest.CreateTestNode(aws.StringValue(newSecondInstance.InstanceId), "1000m", "1000Mi")
+	newSecondNode.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}
+	mockKubernetesClient.Nodes[newSecondNode.Name] = newSecondNode
 	HandleRollingUpgrade(mockKubernetesClient, mockEc2Service, mockAutoScalingService, []*autoscaling.Group{asg})
-	oldNodeAfterSeventhRun := mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
-	if _, ok := oldNodeAfterSeventhRun.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; !ok {
+	oldNode = mockKubernetesClient.Nodes[aws.StringValue(oldInstance.InstanceId)]
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateDrainedTimestampAnnotationKey]; !ok {
 		t.Error("Node should've been drained")
 	}
-	if _, ok := oldNodeAfterSeventhRun.GetAnnotations()[k8s.RollingUpdateTerminatedTimestampAnnotationKey]; !ok {
+	if _, ok := oldNode.GetAnnotations()[k8s.RollingUpdateTerminatedTimestampAnnotationKey]; !ok {
 		t.Error("Node should've been terminated")
 	}
 }
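
Reviewer note (not part of the patch): the sketch below is a minimal, self-contained illustration of the ownership check this change adds to CheckIfNodeHasEnoughResourcesToTransferAllPodsInNodes in k8s/util.go. The helper names (isOwnedByDaemonSet, sumTransferablePodRequests, makePod) are hypothetical and exist only for this example; the real code subtracts each pod's requests from the target nodes' allocatable resources rather than summing them, but the DaemonSet filter is the same.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isOwnedByDaemonSet reports whether any owner reference of the pod is a DaemonSet,
// mirroring the check added in k8s/util.go.
func isOwnedByDaemonSet(pod v1.Pod) bool {
	for _, owner := range pod.GetOwnerReferences() {
		if owner.Kind == "DaemonSet" {
			return true
		}
	}
	return false
}

// sumTransferablePodRequests (illustrative helper, not part of the patch) totals the
// CPU and memory requests of the pods that would actually have to be rescheduled onto
// the target nodes, i.e. everything except pods owned by a DaemonSet.
func sumTransferablePodRequests(pods []v1.Pod) (cpuMillis, memoryBytes int64) {
	for _, pod := range pods {
		if isOwnedByDaemonSet(pod) {
			// DaemonSet pods already run on every target node, so they do not
			// take up any of the space needed by the transferred pods.
			continue
		}
		for _, container := range pod.Spec.Containers {
			cpuMillis += container.Resources.Requests.Cpu().MilliValue()
			memoryBytes += container.Resources.Requests.Memory().Value()
		}
	}
	return cpuMillis, memoryBytes
}

// makePod builds a bare pod with a single container and the given owner kind,
// similar in spirit to k8stest.CreateTestPod.
func makePod(ownerKind, cpuRequest, memoryRequest string) v1.Pod {
	pod := v1.Pod{
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse(cpuRequest),
						v1.ResourceMemory: resource.MustParse(memoryRequest),
					},
				},
			}},
		},
	}
	pod.SetOwnerReferences([]metav1.OwnerReference{{Kind: ownerKind}})
	return pod
}

func main() {
	pods := []v1.Pod{
		makePod("ReplicaSet", "100m", "100Mi"),
		makePod("DaemonSet", "50m", "50Mi"), // filtered out by the sum below
	}
	cpuMillis, memoryBytes := sumTransferablePodRequests(pods)
	fmt.Printf("transferable requests: cpu=%dm, memory=%d bytes\n", cpuMillis, memoryBytes)
	// Prints: transferable requests: cpu=100m, memory=104857600 bytes
}

Only the ReplicaSet-owned pod counts toward the total; with the filter in place, DaemonSet pods on the old node no longer inflate the amount of spare capacity the handler thinks it needs on the target nodes before draining.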