Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update codebase #198

Merged
merged 2 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions internal/controller/etcdcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"

policyv1 "k8s.io/api/policy/v1"
"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand All @@ -32,7 +33,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
Expand Down Expand Up @@ -127,45 +127,49 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// ensureClusterObjects creates or updates all objects owned by cluster CR
func (r *EtcdClusterReconciler) ensureClusterObjects(
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, r.Client, r.Scheme); err != nil {
if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, r.Client); err != nil {
return err
}
if err := factory.CreateOrUpdateHeadlessService(ctx, cluster, r.Client, r.Scheme); err != nil {
if err := factory.CreateOrUpdateHeadlessService(ctx, cluster, r.Client); err != nil {
return err
}
if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client, r.Scheme); err != nil {
if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client); err != nil {
return err
}
if err := factory.CreateOrUpdateClientService(ctx, cluster, r.Client, r.Scheme); err != nil {
if err := factory.CreateOrUpdateClientService(ctx, cluster, r.Client); err != nil {
return err
}
if err := factory.CreateOrUpdatePdb(ctx, cluster, r.Client, r.Scheme); err != nil {
if err := factory.CreateOrUpdatePdb(ctx, cluster, r.Client); err != nil {
return err
}

return nil
}

// updateStatusOnErr wraps error and updates EtcdCluster status
func (r *EtcdClusterReconciler) updateStatusOnErr(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, err error) (ctrl.Result, error) {
res, statusErr := r.updateStatus(ctx, cluster)
// The function 'updateStatusOnErr' will always return non-nil error. Hence, the ctrl.Result will always be ignored.
// Therefore, the ctrl.Result returned by 'updateStatus' function can be discarded.
// REF: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile@v0.17.3#Reconciler
_, statusErr := r.updateStatus(ctx, cluster)
if statusErr != nil {
return res, goerrors.Join(statusErr, err)
return ctrl.Result{}, goerrors.Join(statusErr, err)
}
return res, err
return ctrl.Result{}, err
}

// updateStatus updates EtcdCluster status and returns error and requeue in case status could not be updated due to conflict
func (r *EtcdClusterReconciler) updateStatus(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) (ctrl.Result, error) {
logger := log.FromContext(ctx)
if err := r.Status().Update(ctx, cluster); err != nil {
logger.Error(err, "unable to update cluster status")
if errors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, err
err := r.Status().Update(ctx, cluster)
if err == nil {
return ctrl.Result{}, nil
}
if errors.IsConflict(err) {
logger.V(2).Info("conflict during cluster status update")
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, nil
logger.Error(err, "cannot update cluster status")
return ctrl.Result{}, err
}

// isStatefulSetReady gets managed StatefulSet and checks its readiness.
Expand Down
130 changes: 25 additions & 105 deletions internal/controller/factory/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,123 +20,43 @@
"context"
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/log"
)

func reconcileStatefulSet(ctx context.Context, rclient client.Client, crdName string, sts *appsv1.StatefulSet) error {
logger := log.FromContext(ctx)
logger.V(2).Info("statefulset reconciliation started")

currentSts := &appsv1.StatefulSet{}
logger.V(2).Info("statefulset found", "sts_name", currentSts.Name)

err := rclient.Get(ctx, types.NamespacedName{Namespace: sts.Namespace, Name: sts.Name}, currentSts)
func reconcileOwnedResource(ctx context.Context, c client.Client, resource client.Object) error {
gvk, err := apiutil.GVKForObject(resource, c.Scheme())
if err != nil {
if errors.IsNotFound(err) {
logger.V(2).Info("creating new statefulset", "sts_name", sts.Name, "crd_object", crdName)
return rclient.Create(ctx, sts)
}
return fmt.Errorf("cannot get existing statefulset: %s, for crd_object: %s, err: %w", sts.Name, crdName, err)
return err
}
sts.Annotations = labels.Merge(currentSts.Annotations, sts.Annotations)
logger.V(2).Info("statefulset annotations merged", "sts_annotations", sts.Annotations)
sts.Status = currentSts.Status
return rclient.Update(ctx, sts)
}

func reconcileConfigMap(ctx context.Context, rclient client.Client, crdName string, configMap *corev1.ConfigMap) error {
logger := log.FromContext(ctx)
logger.V(2).Info("configmap reconciliation started")

currentConfigMap := &corev1.ConfigMap{}
logger.V(2).Info("configmap found", "cm_name", currentConfigMap.Name)

err := rclient.Get(ctx, types.NamespacedName{Namespace: configMap.Namespace, Name: configMap.Name}, currentConfigMap)
if err != nil {
if errors.IsNotFound(err) {
logger.V(2).Info("creating new configMap", "cm_name", configMap.Name, "crd_object", crdName)
return rclient.Create(ctx, configMap)
}
return fmt.Errorf("cannot get existing configMap: %s, for crd_object: %s, err: %w", configMap.Name, crdName, err)
logger := log.FromContext(ctx).WithValues("group", gvk.GroupVersion().String(), "kind", gvk.Kind, "name", resource.GetName())
logger.V(2).Info("reconciling owned resource")

base := resource.DeepCopyObject().(client.Object)
err = c.Get(ctx, client.ObjectKeyFromObject(resource), base)
if err == nil {
logger.V(2).Info("updating owned resource")
resource.SetAnnotations(labels.Merge(base.GetAnnotations(), resource.GetAnnotations()))
resource.SetResourceVersion(base.GetResourceVersion())
logger.V(2).Info("owned resource annotations merged", "annotations", resource.GetAnnotations())

Check failure on line 44 in internal/controller/factory/builders.go

View workflow job for this annotation

GitHub Actions / nilaway-lint

error: Potential nil panic detected. Observed nil flow from source to dereference point:
return c.Update(ctx, resource)
}
configMap.Annotations = labels.Merge(currentConfigMap.Annotations, configMap.Annotations)
logger.V(2).Info("configmap annotations merged", "cm_annotations", configMap.Annotations)
return rclient.Update(ctx, configMap)
}

func reconcileService(ctx context.Context, rclient client.Client, crdName string, svc *corev1.Service) error {
logger := log.FromContext(ctx)
logger.V(2).Info("service reconciliation started")

if svc == nil {
return fmt.Errorf("service is nil for crd_object: %s", crdName)
if errors.IsNotFound(err) {
logger.V(2).Info("creating new owned resource")
return c.Create(ctx, resource)
}

currentSvc := &corev1.Service{}
logger.V(2).Info("service found", "svc_name", currentSvc.Name)

err := rclient.Get(ctx, types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}, currentSvc)
if err != nil {
if errors.IsNotFound(err) {
logger.V(2).Info("creating new service", "svc_name", svc.Name, "crd_object", crdName)
return rclient.Create(ctx, svc)
}
return fmt.Errorf("cannot get existing service: %s, for crd_object: %s, err: %w", svc.Name, crdName, err)
}
svc.Annotations = labels.Merge(currentSvc.Annotations, svc.Annotations)
logger.V(2).Info("service annotations merged", "svc_annotations", svc.Annotations)
svc.Status = currentSvc.Status
return rclient.Update(ctx, svc)
}

// deleteManagedPdb deletes cluster PDB if it exists.
// pdb parameter should have at least metadata.name and metadata.namespace fields filled.
func deleteManagedPdb(ctx context.Context, rclient client.Client, pdb *v1.PodDisruptionBudget) error {
logger := log.FromContext(ctx)

currentPdb := &v1.PodDisruptionBudget{}
logger.V(2).Info("pdb found", "pdb_name", currentPdb.Name)

err := rclient.Get(ctx, types.NamespacedName{Namespace: pdb.Namespace, Name: pdb.Name}, currentPdb)
if err != nil {
logger.V(2).Info("error getting cluster PDB", "error", err)
return client.IgnoreNotFound(err)
}
err = rclient.Delete(ctx, currentPdb)
if err != nil {
logger.Error(err, "error deleting cluster PDB", "name", pdb.Name)
return client.IgnoreNotFound(err)
}

return nil
return fmt.Errorf("error getting owned resource: %w", err)
}

func reconcilePdb(ctx context.Context, rclient client.Client, crdName string, pdb *v1.PodDisruptionBudget) error {
logger := log.FromContext(ctx)
logger.V(2).Info("pdb reconciliation started")

currentPdb := &v1.PodDisruptionBudget{}
logger.V(2).Info("pdb found", "pdb_name", currentPdb.Name)

err := rclient.Get(ctx, types.NamespacedName{Namespace: pdb.Namespace, Name: pdb.Name}, currentPdb)
func deleteOwnedResource(ctx context.Context, c client.Client, resource client.Object) error {
gvk, err := apiutil.GVKForObject(resource, c.Scheme())
if err != nil {
if errors.IsNotFound(err) {
logger.V(2).Info("creating new PDB", "pdb_name", pdb.Name, "crd_object", crdName)
return rclient.Create(ctx, pdb)
}
logger.V(2).Info("error getting cluster PDB", "error", err)
return fmt.Errorf("cannot get existing pdb resource: %s for crd_object: %s, err: %w", pdb.Name, crdName, err)
return err
}
pdb.Annotations = labels.Merge(currentPdb.Annotations, pdb.Annotations)
logger.V(2).Info("pdb annotations merged", "pdb_annotations", pdb.Annotations)
pdb.ResourceVersion = currentPdb.ResourceVersion
pdb.Status = currentPdb.Status
return rclient.Update(ctx, pdb)
logger := log.FromContext(ctx).WithValues("group", gvk.GroupVersion().String(), "kind", gvk.Kind, "name", resource.GetName())
logger.V(2).Info("deleting owned resource")
return client.IgnoreNotFound(c.Delete(ctx, resource))
}
10 changes: 3 additions & 7 deletions internal/controller/factory/configMap.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,12 @@ import (
"context"
"fmt"

etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/controller-runtime/pkg/log"

etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
)

func GetClusterStateConfigMapName(cluster *etcdaenixiov1alpha1.EtcdCluster) string {
Expand All @@ -39,7 +36,6 @@ func CreateOrUpdateClusterStateConfigMap(
ctx context.Context,
cluster *etcdaenixiov1alpha1.EtcdCluster,
rclient client.Client,
rscheme *runtime.Scheme,
) error {
initialCluster := ""
clusterService := fmt.Sprintf("%s.%s.svc:2380", GetHeadlessServiceName(cluster), cluster.Namespace)
Expand Down Expand Up @@ -73,11 +69,11 @@ func CreateOrUpdateClusterStateConfigMap(
}
logger.V(2).Info("configmap spec generated", "cm_name", configMap.Name, "cm_spec", configMap.Data)

if err := ctrl.SetControllerReference(cluster, configMap, rscheme); err != nil {
if err := ctrl.SetControllerReference(cluster, configMap, rclient.Scheme()); err != nil {
return fmt.Errorf("cannot set controller reference: %w", err)
}

return reconcileConfigMap(ctx, rclient, cluster.Name, configMap)
return reconcileOwnedResource(ctx, rclient, configMap)
}

// isEtcdClusterReady returns true if condition "Ready" has progressed
Expand Down
10 changes: 4 additions & 6 deletions internal/controller/factory/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package factory
import (
"github.com/google/uuid"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/ptr"
. "sigs.k8s.io/controller-runtime/pkg/envtest/komega"

Expand Down Expand Up @@ -88,7 +87,7 @@ var _ = Describe("CreateOrUpdateClusterStateConfigMap handlers", func() {
It("should successfully ensure the configmap", func(ctx SpecContext) {
var configMapUID types.UID
By("processing new etcd cluster", func() {
Expect(CreateOrUpdateClusterStateConfigMap(ctx, &etcdcluster, k8sClient, k8sClient.Scheme())).To(Succeed())
Expect(CreateOrUpdateClusterStateConfigMap(ctx, &etcdcluster, k8sClient)).To(Succeed())
Eventually(Get(&configMap)).Should(Succeed())
Expect(configMap.Data["ETCD_INITIAL_CLUSTER_STATE"]).To(Equal("new"))
configMapUID = configMap.GetUID()
Expand All @@ -99,7 +98,7 @@ var _ = Describe("CreateOrUpdateClusterStateConfigMap handlers", func() {
WithReason(string(etcdaenixiov1alpha1.EtcdCondTypeStatefulSetReady)).
WithStatus(true).
Complete())
Expect(CreateOrUpdateClusterStateConfigMap(ctx, &etcdcluster, k8sClient, k8sClient.Scheme())).To(Succeed())
Expect(CreateOrUpdateClusterStateConfigMap(ctx, &etcdcluster, k8sClient)).To(Succeed())
Eventually(Object(&configMap)).Should(HaveField("ObjectMeta.UID", Equal(configMapUID)))
Expect(configMap.Data["ETCD_INITIAL_CLUSTER_STATE"]).To(Equal("existing"))
})
Expand All @@ -109,15 +108,14 @@ var _ = Describe("CreateOrUpdateClusterStateConfigMap handlers", func() {
WithReason(string(etcdaenixiov1alpha1.EtcdCondTypeWaitingForFirstQuorum)).
WithStatus(true).
Complete())
Expect(CreateOrUpdateClusterStateConfigMap(ctx, &etcdcluster, k8sClient, k8sClient.Scheme())).To(Succeed())
Expect(CreateOrUpdateClusterStateConfigMap(ctx, &etcdcluster, k8sClient)).To(Succeed())
Eventually(Object(&configMap)).Should(HaveField("ObjectMeta.UID", Equal(configMapUID)))
Expect(configMap.Data["ETCD_INITIAL_CLUSTER_STATE"]).To(Equal("new"))
})
})

It("should fail to create the configmap with invalid owner reference", func(ctx SpecContext) {
emptyScheme := runtime.NewScheme()
Expect(CreateOrUpdateClusterStateConfigMap(ctx, &etcdcluster, k8sClient, emptyScheme)).NotTo(Succeed())
Expect(CreateOrUpdateClusterStateConfigMap(ctx, &etcdcluster, clientWithEmptyScheme)).NotTo(Succeed())
})
})
})
8 changes: 3 additions & 5 deletions internal/controller/factory/pdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
v1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -35,10 +34,9 @@ func CreateOrUpdatePdb(
ctx context.Context,
cluster *etcdaenixiov1alpha1.EtcdCluster,
rclient client.Client,
rscheme *runtime.Scheme,
) error {
if cluster.Spec.PodDisruptionBudgetTemplate == nil {
return deleteManagedPdb(ctx, rclient, &v1.PodDisruptionBudget{
return deleteOwnedResource(ctx, rclient, &v1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Namespace: cluster.Namespace,
Name: cluster.Name,
Expand Down Expand Up @@ -67,9 +65,9 @@ func CreateOrUpdatePdb(

logger.V(2).Info("pdb spec generated", "pdb_name", pdb.Name, "pdb_spec", pdb.Spec)

if err := ctrl.SetControllerReference(cluster, pdb, rscheme); err != nil {
if err := ctrl.SetControllerReference(cluster, pdb, rclient.Scheme()); err != nil {
return fmt.Errorf("cannot set controller reference: %w", err)
}

return reconcilePdb(ctx, rclient, cluster.Name, pdb)
return reconcileOwnedResource(ctx, rclient, pdb)
}
10 changes: 5 additions & 5 deletions internal/controller/factory/pdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ var _ = Describe("CreateOrUpdatePdb handlers", func() {

It("should create PDB with pre-filled data", func(ctx SpecContext) {
etcdcluster.Spec.PodDisruptionBudgetTemplate.Spec.MinAvailable = ptr.To(intstr.FromInt32(int32(3)))
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient, k8sClient.Scheme())).To(Succeed())
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient)).To(Succeed())
Eventually(Get(&podDisruptionBudget)).Should(Succeed())
Expect(podDisruptionBudget.Spec.MinAvailable).NotTo(BeNil())
Expect(podDisruptionBudget.Spec.MinAvailable.IntValue()).To(Equal(3))
Expect(podDisruptionBudget.Spec.MaxUnavailable).To(BeNil())
})

It("should create PDB with empty data", func(ctx SpecContext) {
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient, k8sClient.Scheme())).To(Succeed())
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient)).To(Succeed())
Eventually(Get(&podDisruptionBudget)).Should(Succeed())
Expect(etcdcluster.Spec.PodDisruptionBudgetTemplate.Spec.MinAvailable).To(BeNil())
Expect(podDisruptionBudget.Spec.MinAvailable.IntValue()).To(Equal(2))
Expand All @@ -104,14 +104,14 @@ var _ = Describe("CreateOrUpdatePdb handlers", func() {

It("should skip deletion of PDB if not filled and not exist", func(ctx SpecContext) {
etcdcluster.Spec.PodDisruptionBudgetTemplate = nil
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient, k8sClient.Scheme())).NotTo(HaveOccurred())
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient)).NotTo(HaveOccurred())
})

It("should delete created PDB after updating CR", func(ctx SpecContext) {
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient, k8sClient.Scheme())).To(Succeed())
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient)).To(Succeed())
Eventually(Get(&podDisruptionBudget)).Should(Succeed())
etcdcluster.Spec.PodDisruptionBudgetTemplate = nil
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient, k8sClient.Scheme())).NotTo(HaveOccurred())
Expect(CreateOrUpdatePdb(ctx, &etcdcluster, k8sClient)).NotTo(HaveOccurred())
err = Get(&podDisruptionBudget)()
Expect(apierrors.IsNotFound(err)).To(BeTrue())
})
Expand Down
Loading
Loading