Skip to content

Commit

Permalink
Refactor in-place upgrade cleanup logic (#7656)
Browse files: browse the repository at this point in the history
  • Loading branch information
abhinavmpandey08 authored Feb 21, 2024
1 parent b63889f commit 86586ba
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 231 deletions.
68 changes: 25 additions & 43 deletions controllers/kubeadmcontrolplane_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,6 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, log logr.
return ctrl.Result{}, err
}

cpUpgrade := &anywherev1.ControlPlaneUpgrade{}
cpuGetErr := r.client.Get(ctx, GetNamespacedNameType(cpUpgradeName(kcp.Name), constants.EksaSystemNamespace), cpUpgrade)

mhc := &clusterv1.MachineHealthCheck{}
if err := r.client.Get(ctx, GetNamespacedNameType(cpMachineHealthCheckName(kcp.Name), constants.EksaSystemNamespace), mhc); err != nil {
if apierrors.IsNotFound(err) {
Expand All @@ -131,8 +128,10 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, log logr.
return ctrl.Result{}, err
}

if kcp.Spec.Replicas != nil && (*kcp.Spec.Replicas == kcp.Status.UpdatedReplicas) {
if cpuGetErr == nil && cpUpgrade.Status.Ready {
cpUpgrade := &anywherev1.ControlPlaneUpgrade{}
cpuGetErr := r.client.Get(ctx, GetNamespacedNameType(cpUpgradeName(kcp.Name), constants.EksaSystemNamespace), cpUpgrade)
if cpuGetErr == nil {
if cpUpgrade.Status.Ready && kcp.Status.Version != nil && *kcp.Status.Version == cpUpgrade.Spec.KubernetesVersion {
log.Info("Control plane upgrade complete, deleting object", "ControlPlaneUpgrade", cpUpgrade.Name)
if err := r.client.Delete(ctx, cpUpgrade); err != nil {
return ctrl.Result{}, fmt.Errorf("deleting ControlPlaneUpgrade object: %v", err)
Expand All @@ -141,55 +140,38 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, log logr.
if err := resumeMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)
}
} else if !apierrors.IsNotFound(cpuGetErr) {
return ctrl.Result{}, fmt.Errorf("getting ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, cpuGetErr)
}

log.Info("KubeadmControlPlane is ready, removing the \"in-place-upgrade-needed\" annotation")
// Remove the in-place-upgrade-needed annotation only after the ControlPlaneUpgrade object is deleted
delete(kcp.Annotations, kcpInPlaceUpgradeNeededAnnotation)
log.Info("KubeadmControlPlane is ready, removing the \"in-place-upgrade-needed\" annotation")
// Remove the in-place-upgrade-needed annotation only after the ControlPlaneUpgrade object is deleted
delete(kcp.Annotations, kcpInPlaceUpgradeNeededAnnotation)
}
return ctrl.Result{}, nil
}

if cpuGetErr != nil {
if apierrors.IsNotFound(cpuGetErr) {
log.Info("Creating ControlPlaneUpgrade object")
machines, err := r.machinesToUpgrade(ctx, kcp)
if err != nil {
return ctrl.Result{}, fmt.Errorf("retrieving list of control plane machines: %v", err)
}
cpUpgrade, err := controlPlaneUpgrade(kcp, machines)
if err != nil {
return ctrl.Result{}, fmt.Errorf("generating ControlPlaneUpgrade: %v", err)
}
if apierrors.IsNotFound(cpuGetErr) {
log.Info("Creating ControlPlaneUpgrade object")
machines, err := r.machinesToUpgrade(ctx, kcp)
if err != nil {
return ctrl.Result{}, fmt.Errorf("retrieving list of control plane machines: %v", err)
}
cpUpgrade, err := controlPlaneUpgrade(kcp, machines)
if err != nil {
return ctrl.Result{}, fmt.Errorf("generating ControlPlaneUpgrade: %v", err)
}

log.Info("Pausing control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.Name))
if err := pauseMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)
}
log.Info("Pausing control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.Name))
if err := pauseMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)
}

if err := r.client.Create(ctx, cpUpgrade); client.IgnoreAlreadyExists(err) != nil {
return ctrl.Result{}, fmt.Errorf("failed to create ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, err)
}
return ctrl.Result{}, nil
if err := r.client.Create(ctx, cpUpgrade); client.IgnoreAlreadyExists(err) != nil {
return ctrl.Result{}, fmt.Errorf("failed to create ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, err)
}
return ctrl.Result{}, fmt.Errorf("getting ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, cpuGetErr)
}
if !cpUpgrade.Status.Ready {
return ctrl.Result{}, nil
}

log.Info("Control plane upgrade complete, deleting object", "ControlPlaneUpgrade", cpUpgrade.Name)
if err := r.client.Delete(ctx, cpUpgrade); err != nil {
return ctrl.Result{}, fmt.Errorf("deleting ControlPlaneUpgrade object: %v", err)
}

log.Info("Resuming control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.Name))
if err := resumeMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)
}
return ctrl.Result{}, fmt.Errorf("getting ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, err)

return ctrl.Result{}, nil
}

func (r *KubeadmControlPlaneReconciler) inPlaceUpgradeNeeded(kcp *controlplanev1.KubeadmControlPlane) bool {
Expand Down
123 changes: 52 additions & 71 deletions controllers/kubeadmcontrolplane_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

. "github.com/onsi/gomega"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -24,6 +25,11 @@ import (
"github.com/aws/eks-anywhere/pkg/constants"
)

// Annotation keys shared by the KubeadmControlPlane reconciler tests.
const (
	// capiPausedAnnotation is the key the tests look up in the
	// MachineHealthCheck annotations to tell whether it is paused.
	capiPausedAnnotation = "cluster.x-k8s.io/paused"
	// kcpInPlaceAnnotation is the key the tests look up in (or delete from)
	// the KubeadmControlPlane annotations.
	kcpInPlaceAnnotation = "controlplane.clusters.x-k8s.io/in-place-upgrade-needed"
)

type kcpObjects struct {
machines []*clusterv1.Machine
cpUpgrade *anywherev1.ControlPlaneUpgrade
Expand All @@ -39,31 +45,12 @@ func TestKCPSetupWithManager(t *testing.T) {
g.Expect(r.SetupWithManager(env.Manager())).To(Succeed())
}

// TestKCPReconcile runs a single reconcile against a fake client seeded with
// two control plane machines, the ControlPlaneUpgrade, the KubeadmControlPlane
// and its MachineHealthCheck. It expects the reconcile to succeed and the
// ControlPlaneUpgrade object to still be retrievable afterwards.
func TestKCPReconcile(t *testing.T) {
	g := NewWithT(t)
	ctx := context.Background()
	objs := getObjectsForKCP()

	fakeClient := fake.NewClientBuilder().WithRuntimeObjects(
		objs.machines[0],
		objs.machines[1],
		objs.cpUpgrade,
		objs.kcp,
		objs.mhc,
	).Build()
	reconciler := controllers.NewKubeadmControlPlaneReconciler(fakeClient)

	_, err := reconciler.Reconcile(ctx, kcpRequest(objs.kcp))
	g.Expect(err).ToNot(HaveOccurred())

	// The ControlPlaneUpgrade must still be present after the reconcile.
	key := types.NamespacedName{Name: objs.cpUpgrade.Name, Namespace: constants.EksaSystemNamespace}
	fetched := &anywherev1.ControlPlaneUpgrade{}
	g.Expect(fakeClient.Get(ctx, key, fetched)).ToNot(HaveOccurred())
}

func TestKCPReconcileComplete(t *testing.T) {
func TestKCPReconcileNotNeeded(t *testing.T) {
g := NewWithT(t)
ctx := context.Background()
kcpObjs := getObjectsForKCP()

count := int32(len(kcpObjs.machines))
kcpObjs.kcp.Spec.Replicas = pointer.Int32(count)
kcpObjs.kcp.Status.UpdatedReplicas = count
delete(kcpObjs.kcp.Annotations, kcpInPlaceAnnotation)

runtimeObjs := []runtime.Object{kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
Expand All @@ -72,42 +59,27 @@ func TestKCPReconcileComplete(t *testing.T) {
_, err := r.Reconcile(ctx, req)
g.Expect(err).ToNot(HaveOccurred())

kcp := &controlplanev1.KubeadmControlPlane{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(kcp.Annotations).ToNot(HaveKey("controlplane.clusters.x-k8s.io/in-place-upgrade-needed"))

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Eventually(func(g Gomega) error {
func(g Gomega) {
g.Expect(mhc.Annotations).To(HaveKey("cluster.x-k8s.io/paused"))
}(g)

return nil
})
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
g.Expect(mhc.Annotations).ToNot(HaveKey(capiPausedAnnotation))
}

func TestKCPReconcileNotNeeded(t *testing.T) {
func TestKCPReconcile(t *testing.T) {
g := NewWithT(t)
ctx := context.Background()
kcpObjs := getObjectsForKCP()

delete(kcpObjs.kcp.Annotations, "controlplane.clusters.x-k8s.io/in-place-upgrade-needed")

runtimeObjs := []runtime.Object{kcpObjs.kcp, kcpObjs.mhc}
runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
g.Expect(err).ToNot(HaveOccurred())

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
cpu := &anywherev1.ControlPlaneUpgrade{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.cpUpgrade.Name, Namespace: constants.EksaSystemNamespace}, cpu)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
}

func TestKCPReconcileCreateControlPlaneUpgrade(t *testing.T) {
Expand Down Expand Up @@ -136,14 +108,15 @@ func TestKCPReconcileCreateControlPlaneUpgrade(t *testing.T) {
mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).To(HaveKey("cluster.x-k8s.io/paused"))
g.Expect(mhc.Annotations).To(HaveKey(capiPausedAnnotation))
}

func TestKCPReconcileControlPlaneUpgradeReady(t *testing.T) {
func TestKCPReconcileKCPAndControlPlaneUpgradeReady(t *testing.T) {
g := NewWithT(t)
ctx := context.Background()
kcpObjs := getObjectsForKCP()

kcpObjs.kcp.Status.Version = &kcpObjs.kcp.Spec.Version
kcpObjs.cpUpgrade.Status.Ready = true

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc}
Expand All @@ -156,68 +129,76 @@ func TestKCPReconcileControlPlaneUpgradeReady(t *testing.T) {
cpu := &anywherev1.ControlPlaneUpgrade{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.cpUpgrade.Name, Namespace: constants.EksaSystemNamespace}, cpu)
g.Expect(err).To(HaveOccurred())
g.Expect(err).To(MatchError("controlplaneupgrades.anywhere.eks.amazonaws.com \"my-cluster-cp-upgrade\" not found"))

kcp := &controlplanev1.KubeadmControlPlane{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(kcp.Annotations).ToNot(HaveKey(kcpInPlaceAnnotation))

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
g.Expect(mhc.Annotations).ToNot(HaveKey(capiPausedAnnotation))
}

func TestKCPReconcileKCPAndControlPlaneUpgradeReady(t *testing.T) {
func TestKCPReconcileFullFlow(t *testing.T) {
g := NewWithT(t)
ctx := context.Background()
kcpObjs := getObjectsForKCP()

kcpObjs.kcp.Status.UpdatedReplicas = *kcpObjs.kcp.Spec.Replicas
kcpObjs.cpUpgrade.Status.Ready = true

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc}
runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
g.Expect(err).ToNot(HaveOccurred())

// Expect ControlPlaneUpgrade object to be created and not ready
cpu := &anywherev1.ControlPlaneUpgrade{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.cpUpgrade.Name, Namespace: constants.EksaSystemNamespace}, cpu)
g.Expect(err).To(HaveOccurred())
g.Expect(err).To(MatchError("controlplaneupgrades.anywhere.eks.amazonaws.com \"my-cluster-cp-upgrade\" not found"))
g.Expect(err).ToNot(HaveOccurred())
g.Expect(cpu.Status.Ready).To(BeFalse())

// Expect KCP to still have in-place annotation
kcp := &controlplanev1.KubeadmControlPlane{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(kcp.Annotations).ToNot(HaveKey("controlplane.clusters.x-k8s.io/in-place-upgrade-needed"))
g.Expect(kcp.Annotations).To(HaveKey(kcpInPlaceAnnotation))

// Expect MHC for KCP to be paused
mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
}
g.Expect(mhc.Annotations).To(HaveKey(capiPausedAnnotation))

func TestKCPReconcileKCPReadyAndCPUpgradeAlreadyDeleted(t *testing.T) {
g := NewWithT(t)
ctx := context.Background()
kcpObjs := getObjectsForKCP()

kcpObjs.kcp.Status.UpdatedReplicas = *kcpObjs.kcp.Spec.Replicas
// Mark ControlPlaneUpgrade as ready and update KCP status K8s version
cpu.Status.Ready = true
err = client.Update(ctx, cpu)
g.Expect(err).ToNot(HaveOccurred())
kcp.Status.Version = &kcp.Spec.Version
err = client.Update(ctx, kcp)
g.Expect(err).ToNot(HaveOccurred())

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
// trigger another reconcile loop
req = kcpRequest(kcp)
_, err = r.Reconcile(ctx, req)
g.Expect(err).ToNot(HaveOccurred())

// verify the in-place-upgrade-needed annotation is removed even when the ControlPlaneUpgrade object is not found
kcp := &controlplanev1.KubeadmControlPlane{}
// Expect ControlPlaneUpgrade object to be deleted
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.cpUpgrade.Name, Namespace: constants.EksaSystemNamespace}, cpu)
g.Expect(err).To(HaveOccurred())
g.Expect(apierrors.IsNotFound(err)).To(BeTrue())

// Expect KCP to no longer have in-place annotation
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(kcp.Annotations).ToNot(HaveKey("controlplane.clusters.x-k8s.io/in-place-upgrade-needed"))
g.Expect(kcp.Annotations).ToNot(HaveKey(kcpInPlaceAnnotation))

mhc := &clusterv1.MachineHealthCheck{}
// Expect MHC for KCP to not be paused
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
g.Expect(mhc.Annotations).ToNot(HaveKey(capiPausedAnnotation))
}

func TestKCPReconcileNotFound(t *testing.T) {
Expand Down Expand Up @@ -332,7 +313,7 @@ func generateKCP(name string) *controlplanev1.KubeadmControlPlane {
Namespace: constants.EksaSystemNamespace,
UID: "test-uid",
Annotations: map[string]string{
"controlplane.clusters.x-k8s.io/in-place-upgrade-needed": "true",
kcpInPlaceAnnotation: "true",
},
},
Spec: controlplanev1.KubeadmControlPlaneSpec{
Expand Down
Loading

0 comments on commit 86586ba

Please sign in to comment.