From 33d925ad7f1bcbf442017f11cc39220181deefcb Mon Sep 17 00:00:00 2001 From: Timofei Larkin Date: Tue, 5 Nov 2024 23:17:13 +0300 Subject: [PATCH] move high level logic to reconciler methods --- Dockerfile | 1 + internal/controller/etcdcluster_controller.go | 103 ++++++++++-------- internal/controller/observables.go | 10 ++ 3 files changed, 71 insertions(+), 43 deletions(-) diff --git a/Dockerfile b/Dockerfile index ab23db9f..390ecef7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,6 +13,7 @@ RUN go mod download # Copy the go source COPY cmd/ ./cmd/ COPY api/ ./api/ +COPY pkg/ ./pkg/ COPY internal/ ./internal/ # Build diff --git a/internal/controller/etcdcluster_controller.go b/internal/controller/etcdcluster_controller.go index f6287fd3..f7984566 100644 --- a/internal/controller/etcdcluster_controller.go +++ b/internal/controller/etcdcluster_controller.go @@ -75,7 +75,7 @@ type EtcdClusterReconciler struct { // Reconcile checks CR and current cluster state and performs actions to transform current state to desired. func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log.Debug(ctx, "reconciling object") - state := observables{} + state := &observables{} state.instance = &etcdaenixiov1alpha1.EtcdCluster{} err := r.Get(ctx, req.NamespacedName, state.instance) if err != nil { @@ -92,7 +92,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) } // create two services and the pdb - err = r.ensureUnconditionalObjects(ctx, state.instance) + err = r.ensureUnconditionalObjects(ctx, state) if err != nil { return ctrl.Result{}, err } @@ -123,7 +123,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) if !state.endpointsFound { if !state.stsExists { - return r.createClusterFromScratch(ctx, &state) // TODO: needs implementing + return r.createClusterFromScratch(ctx, state) // TODO: needs implementing } // update sts pod template (and only pod template) if it doesn't match desired state @@ -146,7 +146,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } - return r.scaleUpFromZero(ctx, &state) // TODO: needs implementing + return ctrl.Result{}, r.scaleUpFromZero(ctx) // TODO: needs implementing } // get status of every endpoint and member list from every endpoint @@ -174,7 +174,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) } if !memberReached { - return r.createOrUpdateStatefulSet(ctx, &state, state.instance) + return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx) } state.setClusterID() @@ -189,7 +189,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) Message: string(etcdaenixiov1alpha1.EtcdErrorCondSplitbrainMessage), }, ) - return r.updateStatus(ctx, state.instance) + return r.updateStatus(ctx, state) } if !state.clusterHasQuorum() { @@ -198,19 +198,19 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) } if state.hasLearners() { - return ctrl.Result{}, r.promoteLearners(ctx, &state) + return ctrl.Result{}, r.promoteLearners(ctx) } - if err := r.createOrUpdateClusterStateConfigMap(ctx, &state); err != nil { + if err := r.createOrUpdateClusterStateConfigMap(ctx); err != nil { return ctrl.Result{}, err } if !state.statefulSetPodSpecCorrect() { - return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx, &state) + return ctrl.Result{}, r.createOrUpdateStatefulSet(ctx) } // if size is different we have to remove statefulset it will be recreated in the next step - if err := r.checkAndDeleteStatefulSetIfNecessary(ctx, &state, state.instance); err != nil { + if err := r.checkAndDeleteStatefulSetIfNecessary(ctx, state); err != nil { return ctrl.Result{}, err } @@ -226,17 +226,17 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) }, ) */ - return r.updateStatus(ctx, state.instance) + return r.updateStatus(ctx, state) } // checkAndDeleteStatefulSetIfNecessary deletes the StatefulSet if the specified storage size has changed. -func (r *EtcdClusterReconciler) checkAndDeleteStatefulSetIfNecessary(ctx context.Context, state *observables, instance *etcdaenixiov1alpha1.EtcdCluster) error { +func (r *EtcdClusterReconciler) checkAndDeleteStatefulSetIfNecessary(ctx context.Context, state *observables) error { for _, volumeClaimTemplate := range state.statefulSet.Spec.VolumeClaimTemplates { if volumeClaimTemplate.Name != "data" { continue } currentStorage := volumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage] - desiredStorage := instance.Spec.Storage.VolumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage] + desiredStorage := state.instance.Spec.Storage.VolumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage] if desiredStorage.Cmp(currentStorage) != 0 { deletePolicy := metav1.DeletePropagationOrphan log.Info(ctx, "Deleting StatefulSet due to storage change", "statefulSet", state.statefulSet.Name) @@ -252,21 +252,20 @@ func (r *EtcdClusterReconciler) checkAndDeleteStatefulSetIfNecessary(ctx context } // ensureConditionalClusterObjects creates or updates all objects owned by cluster CR -func (r *EtcdClusterReconciler) ensureConditionalClusterObjects( - ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error { +func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(ctx context.Context, state *observables) error { - if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, r.Client); err != nil { + if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, state.instance, r.Client); err != nil { log.Error(ctx, err, "reconcile cluster state configmap failed") return err } log.Debug(ctx, "cluster state configmap reconciled") - if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client); err != nil { + if err := factory.CreateOrUpdateStatefulSet(ctx, state.instance, r.Client); err != nil { log.Error(ctx, err, "reconcile statefulset failed") return err } - if err := factory.UpdatePersistentVolumeClaims(ctx, cluster, r.Client); err != nil { + if err := factory.UpdatePersistentVolumeClaims(ctx, state.instance, r.Client); err != nil { log.Error(ctx, err, "reconcile persistentVolumeClaims failed") return err } @@ -276,11 +275,11 @@ func (r *EtcdClusterReconciler) ensureConditionalClusterObjects( } // updateStatusOnErr wraps error and updates EtcdCluster status -func (r *EtcdClusterReconciler) updateStatusOnErr(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, err error) (ctrl.Result, error) { +func (r *EtcdClusterReconciler) updateStatusOnErr(ctx context.Context, state *observables, err error) (ctrl.Result, error) { // The function 'updateStatusOnErr' will always return non-nil error. Hence, the ctrl.Result will always be ignored. // Therefore, the ctrl.Result returned by 'updateStatus' function can be discarded. // REF: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile@v0.17.3#Reconciler - _, statusErr := r.updateStatus(ctx, cluster) + _, statusErr := r.updateStatus(ctx, state) if statusErr != nil { return ctrl.Result{}, goerrors.Join(statusErr, err) } @@ -288,8 +287,8 @@ func (r *EtcdClusterReconciler) updateStatusOnErr(ctx context.Context, cluster * } // updateStatus updates EtcdCluster status and returns error and requeue in case status could not be updated due to conflict -func (r *EtcdClusterReconciler) updateStatus(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) (ctrl.Result, error) { - err := r.Status().Update(ctx, cluster) +func (r *EtcdClusterReconciler) updateStatus(ctx context.Context, state *observables) (ctrl.Result, error) { + err := r.Status().Update(ctx, state.instance) if err == nil { return ctrl.Result{}, nil } @@ -302,9 +301,9 @@ func (r *EtcdClusterReconciler) updateStatus(ctx context.Context, cluster *etcda } // isStatefulSetReady gets managed StatefulSet and checks its readiness. -func (r *EtcdClusterReconciler) isStatefulSetReady(ctx context.Context, c *etcdaenixiov1alpha1.EtcdCluster) (bool, error) { +func (r *EtcdClusterReconciler) isStatefulSetReady(ctx context.Context, state *observables) (bool, error) { sts := &appsv1.StatefulSet{} - err := r.Get(ctx, client.ObjectKeyFromObject(c), sts) + err := r.Get(ctx, client.ObjectKeyFromObject(state.instance), sts) if err == nil { return sts.Status.ReadyReplicas == *sts.Spec.Replicas, nil } @@ -322,11 +321,11 @@ func (r *EtcdClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (r *EtcdClusterReconciler) configureAuth(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error { +func (r *EtcdClusterReconciler) configureAuth(ctx context.Context, state *observables) error { var err error - cli, err := r.GetEtcdClient(ctx, cluster) + cli, err := r.GetEtcdClient(ctx, state.instance) if err != nil { return err } @@ -342,7 +341,7 @@ func (r *EtcdClusterReconciler) configureAuth(ctx context.Context, cluster *etcd auth := clientv3.NewAuth(cli) - if cluster.Spec.Security != nil && cluster.Spec.Security.EnableAuth { + if state.instance.Spec.Security != nil && state.instance.Spec.Security.EnableAuth { if err := r.createRoleIfNotExists(ctx, auth, "root"); err != nil { return err @@ -393,12 +392,12 @@ func testMemberList(ctx context.Context, cli *clientv3.Client) error { return err } -func (r *EtcdClusterReconciler) GetEtcdClient(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) (*clientv3.Client, error) { +func (r *EtcdClusterReconciler) GetEtcdClient(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) (*clientv3.Client, error) { - endpoints := getEndpointsSlice(cluster) + endpoints := getEndpointsSlice(instance) log.Debug(ctx, "endpoints built", "endpoints", endpoints) - tlsConfig, err := r.getTLSConfig(ctx, cluster) + tlsConfig, err := r.getTLSConfig(ctx, instance) if err != nil { log.Error(ctx, err, "failed to build tls config") return nil, err @@ -421,17 +420,17 @@ func (r *EtcdClusterReconciler) GetEtcdClient(ctx context.Context, cluster *etcd } -func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) (*tls.Config, error) { +func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) (*tls.Config, error) { var err error caCertPool := &x509.CertPool{} - if cluster.IsServerTrustedCADefined() { + if instance.IsServerTrustedCADefined() { serverCASecret := &corev1.Secret{} - if err = r.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: cluster.Spec.Security.TLS.ServerTrustedCASecret}, serverCASecret); err != nil { + if err = r.Get(ctx, client.ObjectKey{Namespace: instance.Namespace, Name: instance.Spec.Security.TLS.ServerTrustedCASecret}, serverCASecret); err != nil { log.Error(ctx, err, "failed to get server trusted CA secret") return nil, err } @@ -448,10 +447,10 @@ func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context, cluster *etcda cert := tls.Certificate{} - if cluster.IsClientSecurityEnabled() { + if instance.IsClientSecurityEnabled() { rootSecret := &corev1.Secret{} - if err = r.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: cluster.Spec.Security.TLS.ClientSecret}, rootSecret); err != nil { + if err = r.Get(ctx, client.ObjectKey{Namespace: instance.Namespace, Name: instance.Spec.Security.TLS.ClientSecret}, rootSecret); err != nil { log.Error(ctx, err, "failed to get root client secret") return nil, err } @@ -465,7 +464,7 @@ func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context, cluster *etcda } tlsConfig := &tls.Config{ - InsecureSkipVerify: !cluster.IsServerTrustedCADefined(), + InsecureSkipVerify: !instance.IsServerTrustedCADefined(), RootCAs: caCertPool, Certificates: []tls.Certificate{ cert, @@ -602,7 +601,7 @@ func (r *EtcdClusterReconciler) disableAuth(ctx context.Context, authClient clie // ensureUnconditionalObjects creates the two services and the PDB // which can be created at the start of the reconciliation loop // without any risk of disrupting the etcd cluster -func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) error { +func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, state *observables) error { const concurrentOperations = 3 c := make(chan error) defer close(c) @@ -620,7 +619,7 @@ func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, defer wg.Done() select { case <-ctx.Done(): - case c <- wrapWithMsg(factory.CreateOrUpdateClientService(ctx, instance, r.Client), + case c <- wrapWithMsg(factory.CreateOrUpdateClientService(ctx, state.instance, r.Client), "couldn't ensure client service"): } }(c) @@ -628,7 +627,7 @@ func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, defer wg.Done() select { case <-ctx.Done(): - case c <- wrapWithMsg(factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client), + case c <- wrapWithMsg(factory.CreateOrUpdateHeadlessService(ctx, state.instance, r.Client), "couldn't ensure headless service"): } }(c) @@ -636,7 +635,7 @@ func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, defer wg.Done() select { case <-ctx.Done(): - case c <- wrapWithMsg(factory.CreateOrUpdatePdb(ctx, instance, r.Client), + case c <- wrapWithMsg(factory.CreateOrUpdatePdb(ctx, state.instance, r.Client), "couldn't ensure pod disruption budget"): } }(c) @@ -698,14 +697,32 @@ func (r *EtcdClusterReconciler) createClusterFromScratch(ctx context.Context, st ) // ensure managed resources - if err = r.ensureConditionalClusterObjects(ctx, state.instance); err != nil { - return r.updateStatusOnErr(ctx, state.instance, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err)) + if err = r.ensureConditionalClusterObjects(ctx, state); err != nil { + return r.updateStatusOnErr(ctx, state, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err)) } panic("not yet implemented") } // TODO! // nolint:unused -func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context, state *observables) (ctrl.Result, error) { +func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context) error { + panic("not yet implemented") +} + +// TODO! +// nolint:unused +func (r *EtcdClusterReconciler) createOrUpdateClusterStateConfigMap(ctx context.Context) error { + panic("not yet implemented") +} + +// TODO! +// nolint:unused +func (r *EtcdClusterReconciler) createOrUpdateStatefulSet(ctx context.Context) error { + panic("not yet implemented") +} + +// TODO! +// nolint:unused +func (r *EtcdClusterReconciler) promoteLearners(ctx context.Context) error { panic("not yet implemented") } diff --git a/internal/controller/observables.go b/internal/controller/observables.go index 77ea461e..34102a57 100644 --- a/internal/controller/observables.go +++ b/internal/controller/observables.go @@ -183,3 +183,13 @@ func (o *observables) statefulSetPodSpecCorrect() bool { func (o *observables) statefulSetReady() bool { return o.statefulSet.Status.ReadyReplicas == *o.statefulSet.Spec.Replicas } + +// TODO: +func (o *observables) clusterHasQuorum() bool { + return false +} + +// TODO: +func (o *observables) hasLearners() bool { + return false +}