Skip to content

Commit

Permalink
Pause machine health checks during inplace upgrades (#7539)
Browse files Browse the repository at this point in the history
  • Loading branch information
taneyland authored Feb 14, 2024
1 parent 253ac63 commit 73209f6
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 14 deletions.
42 changes: 42 additions & 0 deletions controllers/kubeadmcontrolplane_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/util/errors"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -118,12 +119,28 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, log logr.
cpUpgrade := &anywherev1.ControlPlaneUpgrade{}
cpuGetErr := r.client.Get(ctx, GetNamespacedNameType(cpUpgradeName(kcp.Name), constants.EksaSystemNamespace), cpUpgrade)

mhc := &clusterv1.MachineHealthCheck{}
if err := r.client.Get(ctx, GetNamespacedNameType(cpMachineHealthCheckName(kcp.Name), constants.EksaSystemNamespace), mhc); err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, err
}
return ctrl.Result{}, fmt.Errorf("getting MachineHealthCheck %s: %v", cpMachineHealthCheckName(kcp.Name), err)
}
mhcPatchHelper, err := patch.NewHelper(mhc, r.client)
if err != nil {
return ctrl.Result{}, err
}

if kcp.Spec.Replicas != nil && (*kcp.Spec.Replicas == kcp.Status.UpdatedReplicas) {
if cpuGetErr == nil && cpUpgrade.Status.Ready {
log.Info("Control plane upgrade complete, deleting object", "ControlPlaneUpgrade", cpUpgrade.Name)
if err := r.client.Delete(ctx, cpUpgrade); err != nil {
return ctrl.Result{}, fmt.Errorf("deleting ControlPlaneUpgrade object: %v", err)
}
log.Info("Resuming control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.Name))
if err := resumeMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)
}
} else if !apierrors.IsNotFound(cpuGetErr) {
return ctrl.Result{}, fmt.Errorf("getting ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, cpuGetErr)
}
Expand All @@ -145,6 +162,12 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, log logr.
if err != nil {
return ctrl.Result{}, fmt.Errorf("generating ControlPlaneUpgrade: %v", err)
}

log.Info("Pausing control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.Name))
if err := pauseMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)
}

if err := r.client.Create(ctx, cpUpgrade); client.IgnoreAlreadyExists(err) != nil {
return ctrl.Result{}, fmt.Errorf("failed to create ControlPlaneUpgrade for KubeadmControlPlane %s: %v", kcp.Name, err)
}
Expand All @@ -161,6 +184,11 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, log logr.
return ctrl.Result{}, fmt.Errorf("deleting ControlPlaneUpgrade object: %v", err)
}

log.Info("Resuming control plane machine health check", "MachineHealthCheck", cpMachineHealthCheckName(kcp.Name))
if err := resumeMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)
}

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -201,6 +229,16 @@ func (r *KubeadmControlPlaneReconciler) validateStackedEtcd(kcp *controlplanev1.
return nil
}

// pauseMachineHealthCheck marks the given MachineHealthCheck as paused by
// adding the Cluster API paused annotation and patches the change back to the
// cluster via the supplied patch helper.
func pauseMachineHealthCheck(ctx context.Context, mhc *clusterv1.MachineHealthCheck, mhcPatchHelper *patch.Helper) error {
	pausedAnnotation := map[string]string{clusterv1.PausedAnnotation: "true"}
	annotations.AddAnnotations(mhc, pausedAnnotation)
	return mhcPatchHelper.Patch(ctx, mhc)
}

// resumeMachineHealthCheck removes the Cluster API paused annotation from the
// given MachineHealthCheck and patches the change back to the cluster via the
// supplied patch helper.
func resumeMachineHealthCheck(ctx context.Context, mhc *clusterv1.MachineHealthCheck, mhcPatchHelper *patch.Helper) error {
	// delete on a nil map is a no-op, so the guard only skips pointless work.
	if mhc.Annotations != nil {
		delete(mhc.Annotations, clusterv1.PausedAnnotation)
	}
	return mhcPatchHelper.Patch(ctx, mhc)
}

func controlPlaneUpgrade(kcp *controlplanev1.KubeadmControlPlane, machines []corev1.ObjectReference) (*anywherev1.ControlPlaneUpgrade, error) {
kcpSpec, err := json.Marshal(kcp.Spec)
if err != nil {
Expand Down Expand Up @@ -235,3 +273,7 @@ func controlPlaneUpgrade(kcp *controlplanev1.KubeadmControlPlane, machines []cor
// cpUpgradeName returns the name of the ControlPlaneUpgrade object associated
// with the KubeadmControlPlane of the given name.
func cpUpgradeName(kcpName string) string {
	const suffix = "-cp-upgrade"
	return kcpName + suffix
}

// cpMachineHealthCheckName returns the name of the MachineHealthCheck object
// covering the control plane machines of the KubeadmControlPlane with the
// given name.
func cpMachineHealthCheckName(kcpName string) string {
	// Plain concatenation keeps this consistent with cpUpgradeName and avoids
	// the fmt formatting machinery for a fixed suffix.
	return kcpName + "-kcp-unhealthy"
}
83 changes: 76 additions & 7 deletions controllers/kubeadmcontrolplane_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"testing"
"time"

. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -26,6 +28,7 @@ type kcpObjects struct {
machines []*clusterv1.Machine
cpUpgrade *anywherev1.ControlPlaneUpgrade
kcp *controlplanev1.KubeadmControlPlane
mhc *clusterv1.MachineHealthCheck
}

func TestKCPSetupWithManager(t *testing.T) {
Expand All @@ -41,7 +44,7 @@ func TestKCPReconcile(t *testing.T) {
ctx := context.Background()
kcpObjs := getObjectsForKCP()

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp}
runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
Expand All @@ -62,7 +65,7 @@ func TestKCPReconcileComplete(t *testing.T) {
kcpObjs.kcp.Spec.Replicas = pointer.Int32(count)
kcpObjs.kcp.Status.UpdatedReplicas = count

runtimeObjs := []runtime.Object{kcpObjs.kcp}
runtimeObjs := []runtime.Object{kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
Expand All @@ -73,6 +76,18 @@ func TestKCPReconcileComplete(t *testing.T) {
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(kcp.Annotations).ToNot(HaveKey("controlplane.clusters.x-k8s.io/in-place-upgrade-needed"))

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Eventually(func(g Gomega) error {
func(g Gomega) {
g.Expect(mhc.Annotations).To(HaveKey("cluster.x-k8s.io/paused"))
}(g)

return nil
})
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
}

func TestKCPReconcileNotNeeded(t *testing.T) {
Expand All @@ -82,20 +97,25 @@ func TestKCPReconcileNotNeeded(t *testing.T) {

delete(kcpObjs.kcp.Annotations, "controlplane.clusters.x-k8s.io/in-place-upgrade-needed")

runtimeObjs := []runtime.Object{kcpObjs.kcp}
runtimeObjs := []runtime.Object{kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
_, err := r.Reconcile(ctx, req)
g.Expect(err).ToNot(HaveOccurred())

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
}

func TestKCPReconcileCreateControlPlaneUpgrade(t *testing.T) {
g := NewWithT(t)
ctx := context.Background()
kcpObjs := getObjectsForKCP()

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp}
runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
Expand All @@ -112,6 +132,11 @@ func TestKCPReconcileCreateControlPlaneUpgrade(t *testing.T) {
kcpSpec, err := json.Marshal(kcpObjs.kcp.Spec)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(cpu.Spec.ControlPlaneSpecData).To(BeEquivalentTo(base64.StdEncoding.EncodeToString(kcpSpec)))

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).To(HaveKey("cluster.x-k8s.io/paused"))
}

func TestKCPReconcileControlPlaneUpgradeReady(t *testing.T) {
Expand All @@ -121,7 +146,7 @@ func TestKCPReconcileControlPlaneUpgradeReady(t *testing.T) {

kcpObjs.cpUpgrade.Status.Ready = true

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp}
runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
Expand All @@ -131,6 +156,11 @@ func TestKCPReconcileControlPlaneUpgradeReady(t *testing.T) {
cpu := &anywherev1.ControlPlaneUpgrade{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.cpUpgrade.Name, Namespace: constants.EksaSystemNamespace}, cpu)
g.Expect(err).To(HaveOccurred())

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
}

func TestKCPReconcileKCPAndControlPlaneUpgradeReady(t *testing.T) {
Expand All @@ -141,7 +171,7 @@ func TestKCPReconcileKCPAndControlPlaneUpgradeReady(t *testing.T) {
kcpObjs.kcp.Status.UpdatedReplicas = *kcpObjs.kcp.Spec.Replicas
kcpObjs.cpUpgrade.Status.Ready = true

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp}
runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.cpUpgrade, kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
Expand All @@ -157,6 +187,11 @@ func TestKCPReconcileKCPAndControlPlaneUpgradeReady(t *testing.T) {
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(kcp.Annotations).ToNot(HaveKey("controlplane.clusters.x-k8s.io/in-place-upgrade-needed"))

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
}

func TestKCPReconcileKCPReadyAndCPUpgradeAlreadyDeleted(t *testing.T) {
Expand All @@ -166,7 +201,7 @@ func TestKCPReconcileKCPReadyAndCPUpgradeAlreadyDeleted(t *testing.T) {

kcpObjs.kcp.Status.UpdatedReplicas = *kcpObjs.kcp.Spec.Replicas

runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp}
runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp, kcpObjs.mhc}
client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
r := controllers.NewKubeadmControlPlaneReconciler(client)
req := kcpRequest(kcpObjs.kcp)
Expand All @@ -178,6 +213,11 @@ func TestKCPReconcileKCPReadyAndCPUpgradeAlreadyDeleted(t *testing.T) {
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.kcp.Name, Namespace: constants.EksaSystemNamespace}, kcp)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(kcp.Annotations).ToNot(HaveKey("controlplane.clusters.x-k8s.io/in-place-upgrade-needed"))

mhc := &clusterv1.MachineHealthCheck{}
err = client.Get(ctx, types.NamespacedName{Name: kcpObjs.mhc.Name, Namespace: constants.EksaSystemNamespace}, mhc)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(mhc.Annotations).ToNot(HaveKey("cluster.x-k8s.io/paused"))
}

func TestKCPReconcileNotFound(t *testing.T) {
Expand All @@ -192,6 +232,19 @@ func TestKCPReconcileNotFound(t *testing.T) {
g.Expect(err).To(MatchError("kubeadmcontrolplanes.controlplane.cluster.x-k8s.io \"my-cluster\" not found"))
}

// TestKCPReconcileMHCNotFound verifies that reconciling a KubeadmControlPlane
// whose control plane MachineHealthCheck does not exist surfaces the
// underlying NotFound error to the caller.
func TestKCPReconcileMHCNotFound(t *testing.T) {
	g := NewWithT(t)
	ctx := context.Background()
	kcpObjs := getObjectsForKCP()

	// Seed the fake client with everything except the MachineHealthCheck so
	// the reconciler's MHC lookup fails with a NotFound error.
	runtimeObjs := []runtime.Object{kcpObjs.machines[0], kcpObjs.machines[1], kcpObjs.kcp}
	client := fake.NewClientBuilder().WithRuntimeObjects(runtimeObjs...).Build()
	r := controllers.NewKubeadmControlPlaneReconciler(client)
	req := kcpRequest(kcpObjs.kcp)
	_, err := r.Reconcile(ctx, req)
	// The expected MHC name is derived from the KCP name ("<name>-kcp-unhealthy").
	g.Expect(err).To(MatchError("machinehealthchecks.cluster.x-k8s.io \"my-cluster-kcp-unhealthy\" not found"))
}

func TestKCPReconcileClusterConfigurationMissing(t *testing.T) {
g := NewWithT(t)
ctx := context.Background()
Expand Down Expand Up @@ -253,11 +306,13 @@ func getObjectsForKCP() kcpObjects {
Name: kcp.Name,
UID: kcp.UID,
}}
mhc := generateMHCforKCP(kcp.Name)

return kcpObjects{
machines: machines,
cpUpgrade: cpUpgrade,
kcp: kcp,
mhc: mhc,
}
}

Expand Down Expand Up @@ -297,3 +352,17 @@ func generateKCP(name string) *controlplanev1.KubeadmControlPlane {
},
}
}

// generateMHCforKCP returns a MachineHealthCheck test fixture named after the
// given KubeadmControlPlane ("<kcpName>-kcp-unhealthy"), matching the object
// the controller looks up during in-place upgrades.
func generateMHCforKCP(kcpName string) *clusterv1.MachineHealthCheck {
	return &clusterv1.MachineHealthCheck{
		ObjectMeta: metav1.ObjectMeta{
			Name: fmt.Sprintf("%s-kcp-unhealthy", kcpName),
			// Use the shared constant rather than the raw "eksa-system"
			// literal so the fixture stays consistent with the namespace used
			// by the lookups elsewhere in these tests.
			Namespace: constants.EksaSystemNamespace,
		},
		Spec: clusterv1.MachineHealthCheckSpec{
			NodeStartupTimeout: &metav1.Duration{
				Duration: 20 * time.Minute,
			},
		},
	}
}
31 changes: 31 additions & 0 deletions controllers/machinedeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,28 @@ func (r *MachineDeploymentReconciler) reconcile(ctx context.Context, log logr.Lo
mdUpgrade := &anywherev1.MachineDeploymentUpgrade{}
mduGetErr := r.client.Get(ctx, GetNamespacedNameType(mdUpgradeName(md.Name), constants.EksaSystemNamespace), mdUpgrade)

mhc := &clusterv1.MachineHealthCheck{}
if err := r.client.Get(ctx, GetNamespacedNameType(mdMachineHealthCheckName(md.Name), constants.EksaSystemNamespace), mhc); err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, err
}
return ctrl.Result{}, fmt.Errorf("getting MachineHealthCheck %s: %v", mdMachineHealthCheckName(md.Name), err)
}
mhcPatchHelper, err := patch.NewHelper(mhc, r.client)
if err != nil {
return ctrl.Result{}, err
}

if md.Spec.Replicas != nil && (*md.Spec.Replicas == md.Status.UpdatedReplicas) {
if mduGetErr == nil && mdUpgrade.Status.Ready {
log.Info("Machine deployment upgrade complete, deleting object", "MachineDeploymentUpgrade", mdUpgrade.Name)
if err := r.client.Delete(ctx, mdUpgrade); err != nil {
return ctrl.Result{}, fmt.Errorf("deleting MachineDeploymentUpgrade object: %v", err)
}
log.Info("Resuming machine deployment machine health check", "MachineHealthCheck", mdMachineHealthCheckName(md.Name))
if err := resumeMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)
}
} else if !apierrors.IsNotFound(mduGetErr) {
return ctrl.Result{}, fmt.Errorf("getting MachineDeploymentUpgrade for MachineDeployment %s: %v", md.Name, mduGetErr)
}
Expand All @@ -143,6 +159,12 @@ func (r *MachineDeploymentReconciler) reconcile(ctx context.Context, log logr.Lo
if err != nil {
return ctrl.Result{}, fmt.Errorf("generating MachineDeploymentUpgrade: %v", err)
}

log.Info("Pausing machine deployment machine health check", "MachineHealthCheck", mdMachineHealthCheckName(md.Name))
if err := pauseMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)
}

if err := r.client.Create(ctx, mdUpgrade); client.IgnoreAlreadyExists(err) != nil {
return ctrl.Result{}, fmt.Errorf("failed to create MachineDeploymentUpgrade for MachineDeployment %s: %v", md.Name, err)
}
Expand All @@ -158,6 +180,11 @@ func (r *MachineDeploymentReconciler) reconcile(ctx context.Context, log logr.Lo
return ctrl.Result{}, fmt.Errorf("deleting MachineDeploymentUpgrade object: %v", err)
}

log.Info("Resuming machine deployment machine health check", "MachineHealthCheck", mdMachineHealthCheckName(md.Name))
if err := resumeMachineHealthCheck(ctx, mhc, mhcPatchHelper); err != nil {
return ctrl.Result{}, fmt.Errorf("updating annotations for machine health check: %v", err)
}

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -220,3 +247,7 @@ func machineDeploymentUpgrade(md *clusterv1.MachineDeployment, machines []corev1
// mdUpgradeName returns the name of the MachineDeploymentUpgrade object
// associated with the MachineDeployment of the given name.
func mdUpgradeName(mdName string) string {
	const suffix = "-md-upgrade"
	return mdName + suffix
}

// mdMachineHealthCheckName returns the name of the MachineHealthCheck object
// covering the worker machines of the MachineDeployment with the given name.
func mdMachineHealthCheckName(mdName string) string {
	// Plain concatenation keeps this consistent with mdUpgradeName and avoids
	// the fmt formatting machinery for a fixed suffix.
	return mdName + "-worker-unhealthy"
}
Loading

0 comments on commit 73209f6

Please sign in to comment.