Skip to content

Commit

Permalink
update codebase (#198)
Browse files Browse the repository at this point in the history
  • Loading branch information
aobort authored Apr 25, 2024
2 parents c9d3806 + 6cf51c5 commit eaca8d6
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 188 deletions.
38 changes: 21 additions & 17 deletions internal/controller/etcdcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"

policyv1 "k8s.io/api/policy/v1"
"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand All @@ -32,7 +33,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
Expand Down Expand Up @@ -127,45 +127,49 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// ensureClusterObjects creates or updates all objects owned by cluster CR
func (r *EtcdClusterReconciler) ensureClusterObjects(
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, r.Client, r.Scheme); err != nil {
if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, r.Client); err != nil {
return err
}
if err := factory.CreateOrUpdateHeadlessService(ctx, cluster, r.Client, r.Scheme); err != nil {
if err := factory.CreateOrUpdateHeadlessService(ctx, cluster, r.Client); err != nil {
return err
}
if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client, r.Scheme); err != nil {
if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client); err != nil {
return err
}
if err := factory.CreateOrUpdateClientService(ctx, cluster, r.Client, r.Scheme); err != nil {
if err := factory.CreateOrUpdateClientService(ctx, cluster, r.Client); err != nil {
return err
}
if err := factory.CreateOrUpdatePdb(ctx, cluster, r.Client, r.Scheme); err != nil {
if err := factory.CreateOrUpdatePdb(ctx, cluster, r.Client); err != nil {
return err
}

return nil
}

// updateStatusOnErr wraps error and updates EtcdCluster status
func (r *EtcdClusterReconciler) updateStatusOnErr(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, err error) (ctrl.Result, error) {
res, statusErr := r.updateStatus(ctx, cluster)
// The function 'updateStatusOnErr' will always return non-nil error. Hence, the ctrl.Result will always be ignored.
// Therefore, the ctrl.Result returned by 'updateStatus' function can be discarded.
// REF: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile@v0.17.3#Reconciler
_, statusErr := r.updateStatus(ctx, cluster)
if statusErr != nil {
return res, goerrors.Join(statusErr, err)
return ctrl.Result{}, goerrors.Join(statusErr, err)
}
return res, err
return ctrl.Result{}, err
}

// updateStatus updates EtcdCluster status and returns error and requeue in case status could not be updated due to conflict
func (r *EtcdClusterReconciler) updateStatus(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) (ctrl.Result, error) {
logger := log.FromContext(ctx)
if err := r.Status().Update(ctx, cluster); err != nil {
logger.Error(err, "unable to update cluster status")
if errors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, err
err := r.Status().Update(ctx, cluster)
if err == nil {
return ctrl.Result{}, nil
}
if errors.IsConflict(err) {
logger.V(2).Info("conflict during cluster status update")
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, nil
logger.Error(err, "cannot update cluster status")
return ctrl.Result{}, err
}

// isStatefulSetReady gets managed StatefulSet and checks its readiness.
Expand Down
131 changes: 27 additions & 104 deletions internal/controller/factory/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,123 +20,46 @@ import (
"context"
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/log"
)

func reconcileStatefulSet(ctx context.Context, rclient client.Client, crdName string, sts *appsv1.StatefulSet) error {
logger := log.FromContext(ctx)
logger.V(2).Info("statefulset reconciliation started")

currentSts := &appsv1.StatefulSet{}
logger.V(2).Info("statefulset found", "sts_name", currentSts.Name)

err := rclient.Get(ctx, types.NamespacedName{Namespace: sts.Namespace, Name: sts.Name}, currentSts)
if err != nil {
if errors.IsNotFound(err) {
logger.V(2).Info("creating new statefulset", "sts_name", sts.Name, "crd_object", crdName)
return rclient.Create(ctx, sts)
}
return fmt.Errorf("cannot get existing statefulset: %s, for crd_object: %s, err: %w", sts.Name, crdName, err)
}
sts.Annotations = labels.Merge(currentSts.Annotations, sts.Annotations)
logger.V(2).Info("statefulset annotations merged", "sts_annotations", sts.Annotations)
sts.Status = currentSts.Status
return rclient.Update(ctx, sts)
}

func reconcileConfigMap(ctx context.Context, rclient client.Client, crdName string, configMap *corev1.ConfigMap) error {
logger := log.FromContext(ctx)
logger.V(2).Info("configmap reconciliation started")

currentConfigMap := &corev1.ConfigMap{}
logger.V(2).Info("configmap found", "cm_name", currentConfigMap.Name)

err := rclient.Get(ctx, types.NamespacedName{Namespace: configMap.Namespace, Name: configMap.Name}, currentConfigMap)
if err != nil {
if errors.IsNotFound(err) {
logger.V(2).Info("creating new configMap", "cm_name", configMap.Name, "crd_object", crdName)
return rclient.Create(ctx, configMap)
}
return fmt.Errorf("cannot get existing configMap: %s, for crd_object: %s, err: %w", configMap.Name, crdName, err)
}
configMap.Annotations = labels.Merge(currentConfigMap.Annotations, configMap.Annotations)
logger.V(2).Info("configmap annotations merged", "cm_annotations", configMap.Annotations)
return rclient.Update(ctx, configMap)
}

func reconcileService(ctx context.Context, rclient client.Client, crdName string, svc *corev1.Service) error {
logger := log.FromContext(ctx)
logger.V(2).Info("service reconciliation started")

if svc == nil {
return fmt.Errorf("service is nil for crd_object: %s", crdName)
func reconcileOwnedResource(ctx context.Context, c client.Client, resource client.Object) error {
if resource == nil {
return fmt.Errorf("resource cannot be nil")
}

currentSvc := &corev1.Service{}
logger.V(2).Info("service found", "svc_name", currentSvc.Name)

err := rclient.Get(ctx, types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}, currentSvc)
gvk, err := apiutil.GVKForObject(resource, c.Scheme())
if err != nil {
if errors.IsNotFound(err) {
logger.V(2).Info("creating new service", "svc_name", svc.Name, "crd_object", crdName)
return rclient.Create(ctx, svc)
}
return fmt.Errorf("cannot get existing service: %s, for crd_object: %s, err: %w", svc.Name, crdName, err)
return fmt.Errorf("failed to get GVK: %w", err)
}
svc.Annotations = labels.Merge(currentSvc.Annotations, svc.Annotations)
logger.V(2).Info("service annotations merged", "svc_annotations", svc.Annotations)
svc.Status = currentSvc.Status
return rclient.Update(ctx, svc)
}

// deleteManagedPdb deletes cluster PDB if it exists.
// pdb parameter should have at least metadata.name and metadata.namespace fields filled.
func deleteManagedPdb(ctx context.Context, rclient client.Client, pdb *v1.PodDisruptionBudget) error {
logger := log.FromContext(ctx)

currentPdb := &v1.PodDisruptionBudget{}
logger.V(2).Info("pdb found", "pdb_name", currentPdb.Name)

err := rclient.Get(ctx, types.NamespacedName{Namespace: pdb.Namespace, Name: pdb.Name}, currentPdb)
if err != nil {
logger.V(2).Info("error getting cluster PDB", "error", err)
return client.IgnoreNotFound(err)
logger := log.FromContext(ctx).WithValues("group", gvk.GroupVersion().String(), "kind", gvk.Kind, "name", resource.GetName())
logger.V(2).Info("reconciling owned resource")

base := resource.DeepCopyObject().(client.Object)
err = c.Get(ctx, client.ObjectKeyFromObject(resource), base)
if err == nil {
logger.V(2).Info("updating owned resource")
resource.SetAnnotations(labels.Merge(base.GetAnnotations(), resource.GetAnnotations()))
resource.SetResourceVersion(base.GetResourceVersion())
logger.V(2).Info("owned resource annotations merged", "annotations", resource.GetAnnotations())
return c.Update(ctx, resource)
}
err = rclient.Delete(ctx, currentPdb)
if err != nil {
logger.Error(err, "error deleting cluster PDB", "name", pdb.Name)
return client.IgnoreNotFound(err)
if errors.IsNotFound(err) {
logger.V(2).Info("creating new owned resource")
return c.Create(ctx, resource)
}

return nil
return fmt.Errorf("error getting owned resource: %w", err)
}

func reconcilePdb(ctx context.Context, rclient client.Client, crdName string, pdb *v1.PodDisruptionBudget) error {
logger := log.FromContext(ctx)
logger.V(2).Info("pdb reconciliation started")

currentPdb := &v1.PodDisruptionBudget{}
logger.V(2).Info("pdb found", "pdb_name", currentPdb.Name)

err := rclient.Get(ctx, types.NamespacedName{Namespace: pdb.Namespace, Name: pdb.Name}, currentPdb)
func deleteOwnedResource(ctx context.Context, c client.Client, resource client.Object) error {
gvk, err := apiutil.GVKForObject(resource, c.Scheme())
if err != nil {
if errors.IsNotFound(err) {
logger.V(2).Info("creating new PDB", "pdb_name", pdb.Name, "crd_object", crdName)
return rclient.Create(ctx, pdb)
}
logger.V(2).Info("error getting cluster PDB", "error", err)
return fmt.Errorf("cannot get existing pdb resource: %s for crd_object: %s, err: %w", pdb.Name, crdName, err)
return err
}
pdb.Annotations = labels.Merge(currentPdb.Annotations, pdb.Annotations)
logger.V(2).Info("pdb annotations merged", "pdb_annotations", pdb.Annotations)
pdb.ResourceVersion = currentPdb.ResourceVersion
pdb.Status = currentPdb.Status
return rclient.Update(ctx, pdb)
logger := log.FromContext(ctx).WithValues("group", gvk.GroupVersion().String(), "kind", gvk.Kind, "name", resource.GetName())
logger.V(2).Info("deleting owned resource")
return client.IgnoreNotFound(c.Delete(ctx, resource))
}
10 changes: 3 additions & 7 deletions internal/controller/factory/configMap.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,12 @@ import (
"context"
"fmt"

etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/controller-runtime/pkg/log"

etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
)

func GetClusterStateConfigMapName(cluster *etcdaenixiov1alpha1.EtcdCluster) string {
Expand All @@ -39,7 +36,6 @@ func CreateOrUpdateClusterStateConfigMap(
ctx context.Context,
cluster *etcdaenixiov1alpha1.EtcdCluster,
rclient client.Client,
rscheme *runtime.Scheme,
) error {
initialCluster := ""
clusterService := fmt.Sprintf("%s.%s.svc:2380", GetHeadlessServiceName(cluster), cluster.Namespace)
Expand Down Expand Up @@ -73,11 +69,11 @@ func CreateOrUpdateClusterStateConfigMap(
}
logger.V(2).Info("configmap spec generated", "cm_name", configMap.Name, "cm_spec", configMap.Data)

if err := ctrl.SetControllerReference(cluster, configMap, rscheme); err != nil {
if err := ctrl.SetControllerReference(cluster, configMap, rclient.Scheme()); err != nil {
return fmt.Errorf("cannot set controller reference: %w", err)
}

return reconcileConfigMap(ctx, rclient, cluster.Name, configMap)
return reconcileOwnedResource(ctx, rclient, configMap)
}

// isEtcdClusterReady returns true if condition "Ready" has progressed
Expand Down
10 changes: 4 additions & 6 deletions internal/controller/factory/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package factory
import (
"github.com/google/uuid"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/ptr"
. "sigs.k8s.io/controller-runtime/pkg/envtest/komega"

Expand Down Expand Up @@ -88,7 +87,7 @@ var _ = Describe("CreateOrUpdateClusterStateConfigMap handlers", func() {
It("should successfully ensure the configmap", func(ctx SpecContext) {
var configMapUID types.UID
By("processing new etcd cluster", func() {
Expect(CreateOrUpdateClusterStateConfigMap(ctx, &etcdcluster, k8sClient, k8sClient.Scheme())).To(Succeed())
Expect(CreateOrUpdateClusterStateConfigMap(ctx, &etcdcluster, k8sClient)).To(Succeed())
Eventually(Get(&configMap)).Should(Succeed())
Expect(configMap.Data["ETCD_INITIAL_CLUSTER_STATE"]).To(Equal("new"))
configMapUID = configMap.GetUID()
Expand All @@ -99,7 +98,7 @@ var _ = Describe("CreateOrUpdateClusterStateConfigMap handlers", func() {
WithReason(string(etcdaenixiov1alpha1.EtcdCondTypeStatefulSetReady)).
WithStatus(true).
Complete())
Expect(CreateOrUpdateClusterStateConfigMap(ctx, &etcdcluster, k8sClient, k8sClient.Scheme())).To(Succeed())
Expect(CreateOrUpdateClusterStateConfigMap(ctx, &etcdcluster, k8sClient)).To(Succeed())
Eventually(Object(&configMap)).Should(HaveField("ObjectMeta.UID", Equal(configMapUID)))
Expect(configMap.Data["ETCD_INITIAL_CLUSTER_STATE"]).To(Equal("existing"))
})
Expand All @@ -109,15 +108,14 @@ var _ = Describe("CreateOrUpdateClusterStateConfigMap handlers", func() {
WithReason(string(etcdaenixiov1alpha1.EtcdCondTypeWaitingForFirstQuorum)).
WithStatus(true).
Complete())
Expect(CreateOrUpdateClusterStateConfigMap(ctx, &etcdcluster, k8sClient, k8sClient.Scheme())).To(Succeed())
Expect(CreateOrUpdateClusterStateConfigMap(ctx, &etcdcluster, k8sClient)).To(Succeed())
Eventually(Object(&configMap)).Should(HaveField("ObjectMeta.UID", Equal(configMapUID)))
Expect(configMap.Data["ETCD_INITIAL_CLUSTER_STATE"]).To(Equal("new"))
})
})

It("should fail to create the configmap with invalid owner reference", func(ctx SpecContext) {
emptyScheme := runtime.NewScheme()
Expect(CreateOrUpdateClusterStateConfigMap(ctx, &etcdcluster, k8sClient, emptyScheme)).NotTo(Succeed())
Expect(CreateOrUpdateClusterStateConfigMap(ctx, &etcdcluster, clientWithEmptyScheme)).NotTo(Succeed())
})
})
})
8 changes: 3 additions & 5 deletions internal/controller/factory/pdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
v1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -35,10 +34,9 @@ func CreateOrUpdatePdb(
ctx context.Context,
cluster *etcdaenixiov1alpha1.EtcdCluster,
rclient client.Client,
rscheme *runtime.Scheme,
) error {
if cluster.Spec.PodDisruptionBudgetTemplate == nil {
return deleteManagedPdb(ctx, rclient, &v1.PodDisruptionBudget{
return deleteOwnedResource(ctx, rclient, &v1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Namespace: cluster.Namespace,
Name: cluster.Name,
Expand Down Expand Up @@ -67,9 +65,9 @@ func CreateOrUpdatePdb(

logger.V(2).Info("pdb spec generated", "pdb_name", pdb.Name, "pdb_spec", pdb.Spec)

if err := ctrl.SetControllerReference(cluster, pdb, rscheme); err != nil {
if err := ctrl.SetControllerReference(cluster, pdb, rclient.Scheme()); err != nil {
return fmt.Errorf("cannot set controller reference: %w", err)
}

return reconcilePdb(ctx, rclient, cluster.Name, pdb)
return reconcileOwnedResource(ctx, rclient, pdb)
}
10 changes: 5 additions & 5 deletions internal/controller/factory/pdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ var _ = Describe("CreateOrUpdatePdb handlers", func() {

It("should create PDB with pre-filled data", func(ctx SpecContext) {
etcdcluster.Spec.PodDisruptionBudgetTemplate.Spec.MinAvailable = ptr.To(intstr.FromInt32(int32(3)))
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient, k8sClient.Scheme())).To(Succeed())
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient)).To(Succeed())
Eventually(Get(&podDisruptionBudget)).Should(Succeed())
Expect(podDisruptionBudget.Spec.MinAvailable).NotTo(BeNil())
Expect(podDisruptionBudget.Spec.MinAvailable.IntValue()).To(Equal(3))
Expect(podDisruptionBudget.Spec.MaxUnavailable).To(BeNil())
})

It("should create PDB with empty data", func(ctx SpecContext) {
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient, k8sClient.Scheme())).To(Succeed())
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient)).To(Succeed())
Eventually(Get(&podDisruptionBudget)).Should(Succeed())
Expect(etcdcluster.Spec.PodDisruptionBudgetTemplate.Spec.MinAvailable).To(BeNil())
Expect(podDisruptionBudget.Spec.MinAvailable.IntValue()).To(Equal(2))
Expand All @@ -104,14 +104,14 @@ var _ = Describe("CreateOrUpdatePdb handlers", func() {

It("should skip deletion of PDB if not filled and not exist", func(ctx SpecContext) {
etcdcluster.Spec.PodDisruptionBudgetTemplate = nil
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient, k8sClient.Scheme())).NotTo(HaveOccurred())
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient)).NotTo(HaveOccurred())
})

It("should delete created PDB after updating CR", func(ctx SpecContext) {
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient, k8sClient.Scheme())).To(Succeed())
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient)).To(Succeed())
Eventually(Get(&podDisruptionBudget)).Should(Succeed())
etcdcluster.Spec.PodDisruptionBudgetTemplate = nil
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient, k8sClient.Scheme())).NotTo(HaveOccurred())
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient)).NotTo(HaveOccurred())
err = Get(&podDisruptionBudget)()
Expect(apierrors.IsNotFound(err)).To(BeTrue())
})
Expand Down
Loading

0 comments on commit eaca8d6

Please sign in to comment.