From 3f7ae6c123dab01da9e9962e6bbf06e8b4638e81 Mon Sep 17 00:00:00 2001 From: Timofei Larkin Date: Sun, 12 May 2024 14:32:40 +0300 Subject: [PATCH 1/5] Extended status check for reconciliation --- internal/controller/etcdcluster_controller.go | 52 ++++++++++++++++ internal/controller/factory/etcd_client.go | 59 +++++++++++++++++++ 2 files changed, 111 insertions(+) create mode 100644 internal/controller/factory/etcd_client.go diff --git a/internal/controller/etcdcluster_controller.go b/internal/controller/etcdcluster_controller.go index d383449..1f6ce78 100644 --- a/internal/controller/etcdcluster_controller.go +++ b/internal/controller/etcdcluster_controller.go @@ -48,6 +48,7 @@ type EtcdClusterReconciler struct { // +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/status,verbs=get;update;patch // +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/finalizers,verbs=update +// +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch // +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;watch;delete;patch // +kubebuilder:rbac:groups="",resources=services,verbs=get;create;delete;update;patch;list;watch // +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;create;delete;update;patch;list;watch @@ -72,6 +73,57 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) return reconcile.Result{}, nil } + sts := appsv1.StatefulSet{} + // create two services and the pdb, try fetching the sts + { + c := make(chan error) + go func(chan<- error) { + err := factory.CreateOrUpdateClientService(ctx, instance, r.Client) + if err != nil { + err = fmt.Errorf("couldn't ensure client service: %w", err) + } + c <- err + }(c) + go func(chan<- error) { + err := factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client) + if err != nil { + err = fmt.Errorf("couldn't ensure headless service: %w", err) + } + c <- err + }(c) + go func(chan<- error) { + err := factory.CreateOrUpdatePdb(ctx, instance, r.Client) + if err != nil { + err = fmt.Errorf("couldn't ensure pod disruption budget: %w", err) + } + c <- err + }(c) + go func(chan<- error) { + err := r.Get(ctx, req.NamespacedName, &sts) + if client.IgnoreNotFound(err) != nil { + err = fmt.Errorf("couldn't get statefulset: %w", err) + } + c <- err + }(c) + for i := 0; i < 4; i++ { + if err := <-c; err != nil { + return ctrl.Result{}, err + } + } + } + /* + clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, instance, r.Client) + if err != nil { + return ctrl.Result{}, err + } + if clusterClient == nil || singleClients == nil { + // TODO: no endpoints case + + } + if sts.UID != "" { + r.Patch() + } + */ // fill conditions if len(instance.Status.Conditions) == 0 { factory.FillConditions(instance) diff --git a/internal/controller/factory/etcd_client.go b/internal/controller/factory/etcd_client.go new file mode 100644 index 0000000..f10f295 --- /dev/null +++ b/internal/controller/factory/etcd_client.go @@ -0,0 +1,59 @@ +package factory + +import ( + "context" + "fmt" + + "github.com/aenix-io/etcd-operator/api/v1alpha1" + clientv3 "go.etcd.io/etcd/client/v3" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, client client.Client) (*clientv3.Client, []*clientv3.Client, error) { + cfg, err := 
configFromCluster(ctx, cluster, client) + if err != nil { + return nil, nil, err + } + if len(cfg.Endpoints) == 0 { + return nil, nil, nil + } + eps := cfg.Endpoints + clusterClient, err := clientv3.New(cfg) + if err != nil { + return nil, nil, fmt.Errorf("error building etcd cluster client: %w", err) + } + singleClients := make([]*clientv3.Client, len(eps)) + for i, ep := range eps { + cfg.Endpoints = []string{ep} + singleClients[i], err = clientv3.New(cfg) + if err != nil { + return nil, nil, fmt.Errorf("error building etcd single-endpoint client for endpoint %s: %w", ep, err) + } + } + return clusterClient, singleClients, nil +} + +func configFromCluster(ctx context.Context, cluster *v1alpha1.EtcdCluster, client client.Client) (clientv3.Config, error) { + ep := v1.Endpoints{} + err := client.Get(ctx, types.NamespacedName{Name: GetHeadlessServiceName(cluster), Namespace: cluster.Namespace}, &ep) + if err != nil { + return clientv3.Config{}, err + } + names := map[string]struct{}{} + urls := make([]string, 0, 8) + for _, v := range ep.Subsets { + for _, addr := range v.Addresses { + names[addr.Hostname] = struct{}{} + } + for _, addr := range v.NotReadyAddresses { + names[addr.Hostname] = struct{}{} + } + } + for name := range names { + urls = append(urls, fmt.Sprintf("%s:%s", name, "2379")) + } + + return clientv3.Config{Endpoints: urls}, nil +} From a6e16e9f8557831d0b40dabef88ba01f9de1bdf0 Mon Sep 17 00:00:00 2001 From: Timofei Larkin Date: Mon, 17 Jun 2024 20:25:03 +0300 Subject: [PATCH 2/5] make struct for observed state of cluster and factor out some helper functions --- internal/controller/etcdcluster_controller.go | 48 ++++++++++---- internal/controller/observables.go | 66 +++++++++++++++++++ 2 files changed, 100 insertions(+), 14 deletions(-) create mode 100644 internal/controller/observables.go diff --git a/internal/controller/etcdcluster_controller.go b/internal/controller/etcdcluster_controller.go index b01576b..2bf70ac 100644 --- a/internal/controller/etcdcluster_controller.go +++ b/internal/controller/etcdcluster_controller.go @@ -25,6 +25,7 @@ import ( "slices" "strconv" "strings" + "sync" "time" "github.com/aenix-io/etcd-operator/internal/log" @@ -47,6 +48,10 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) +const ( + etcdDefaultTimeout = 5 * time.Second +) + // EtcdClusterReconciler reconciles a EtcdCluster object type EtcdClusterReconciler struct { client.Client @@ -81,7 +86,8 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) return reconcile.Result{}, nil } - sts := appsv1.StatefulSet{} + state := observables{} + // create two services and the pdb, try fetching the sts { c := make(chan error) @@ -107,11 +113,11 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) c <- err }(c) go func(chan<- error) { - err := r.Get(ctx, req.NamespacedName, &sts) + err := r.Get(ctx, req.NamespacedName, &state.statefulSet) if client.IgnoreNotFound(err) != nil { err = fmt.Errorf("couldn't get statefulset: %w", err) } - c <- err + c <- client.IgnoreNotFound(err) }(c) for i := 0; i < 4; i++ { if err := <-c; err != nil { @@ -119,19 +125,33 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) } } } - /* - clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, instance, r.Client) - if err != nil { - return ctrl.Result{}, err - } - if clusterClient == nil || singleClients == nil { - // TODO: no endpoints case + clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, instance, 
r.Client) + if err != nil { + return ctrl.Result{}, err + } + state.endpointsFound = clusterClient != nil && singleClients != nil + state.stsExists = state.statefulSet.UID != "" + // get status of every endpoint and member list from every endpoint + state.etcdStatuses = make([]etcdStatus, len(singleClients)) + { + var wg sync.WaitGroup + ctx, cancel := context.WithTimeout(ctx, etcdDefaultTimeout) + for i := range singleClients { + go func(i int) { + wg.Add(1) + defer wg.Done() + state.etcdStatuses[i].fill(ctx, singleClients[i]) + }(i) } - if sts.UID != "" { - r.Patch() - } - */ + wg.Wait() + cancel() + } + state.setClusterID() + if state.inSplitbrain() { + log.Error(ctx, fmt.Errorf("etcd cluster in splitbrain"), "etcd cluster in splitbrain, dropping from reconciliation queue") + return ctrl.Result{}, nil + } // fill conditions if len(instance.Status.Conditions) == 0 { factory.FillConditions(instance) diff --git a/internal/controller/observables.go b/internal/controller/observables.go new file mode 100644 index 0000000..8c9a2b3 --- /dev/null +++ b/internal/controller/observables.go @@ -0,0 +1,66 @@ +package controller + +import ( + "context" + "sync" + + clientv3 "go.etcd.io/etcd/client/v3" + appsv1 "k8s.io/api/apps/v1" +) + +// etcdStatus holds the details of the status that an etcd endpoint +// can return about itself, i.e. its own status and its perceived +// member list +type etcdStatus struct { + endpointStatus *clientv3.StatusResponse + endpointStatusError error + memberList *clientv3.MemberListResponse + memberListError error +} + +// observables stores observations that the operator can make about +// states of objects in kubernetes +type observables struct { + statefulSet appsv1.StatefulSet + stsExists bool + endpointsFound bool + etcdStatuses []etcdStatus + clusterID uint64 + endpointsReached int +} + +// setClusterID populates the clusterID field based on etcdStatuses +func (o *observables) setClusterID() { + for i := range o.etcdStatuses { + if o.etcdStatuses[i].endpointStatus != nil { + o.clusterID = o.etcdStatuses[i].endpointStatus.Header.ClusterId + return + } + } +} + +// inSplitbrain compares clusterID field with clusterIDs in etcdStatuses. +// If more than one unique ID is reported, cluster is in splitbrain. +func (o *observables) inSplitbrain() bool { + for i := range o.etcdStatuses { + if o.etcdStatuses[i].endpointStatus != nil { + if o.clusterID != o.etcdStatuses[i].endpointStatus.Header.ClusterId { + return true + } + } + } + return false +} + +// fill takes a single-endpoint client and populates the fields of etcdStatus +// with the endpoint's status and its perceived member list. 
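+// Status is queried in a separate goroutine while MemberList is issued from
+// the calling goroutine; errors from either call are recorded in the struct
+// fields instead of being returned.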
+func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) { + var wg sync.WaitGroup + go func() { + wg.Add(1) + defer wg.Done() + s.endpointStatus, s.endpointStatusError = c.Status(ctx, c.Endpoints()[0]) + }() + s.memberList, s.memberListError = c.MemberList(ctx) + wg.Wait() +} From 3f2231b625f0d4074a3d293da7d282c4fa3ecfd2 Mon Sep 17 00:00:00 2001 From: Timofei Larkin Date: Tue, 25 Jun 2024 14:44:24 +0300 Subject: [PATCH 3/5] Separate creation of owned objects into conditional and unconditional --- config/rbac/role.yaml | 8 ++ internal/controller/etcdcluster_controller.go | 136 ++++++++++-------- internal/controller/factory/etcd_client.go | 14 +- internal/controller/observables.go | 16 ++- 4 files changed, 109 insertions(+), 65 deletions(-) diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 1d3153c..69ccd7d 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -16,6 +16,14 @@ rules: - patch - update - watch +- apiGroups: + - "" + resources: + - endpoints + verbs: + - get + - list + - watch - apiGroups: - "" resources: diff --git a/internal/controller/etcdcluster_controller.go b/internal/controller/etcdcluster_controller.go index 2bf70ac..b086cec 100644 --- a/internal/controller/etcdcluster_controller.go +++ b/internal/controller/etcdcluster_controller.go @@ -88,49 +88,31 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) state := observables{} - // create two services and the pdb, try fetching the sts - { - c := make(chan error) - go func(chan<- error) { - err := factory.CreateOrUpdateClientService(ctx, instance, r.Client) - if err != nil { - err = fmt.Errorf("couldn't ensure client service: %w", err) - } - c <- err - }(c) - go func(chan<- error) { - err := factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client) - if err != nil { - err = fmt.Errorf("couldn't ensure headless service: %w", err) - } - c <- err - }(c) - go func(chan<- error) { - err := factory.CreateOrUpdatePdb(ctx, instance, r.Client) - if err != nil { - err = fmt.Errorf("couldn't ensure pod disruption budget: %w", err) - } - c <- err - }(c) - go func(chan<- error) { - err := r.Get(ctx, req.NamespacedName, &state.statefulSet) - if client.IgnoreNotFound(err) != nil { - err = fmt.Errorf("couldn't get statefulset: %w", err) - } - c <- client.IgnoreNotFound(err) - }(c) - for i := 0; i < 4; i++ { - if err := <-c; err != nil { - return ctrl.Result{}, err - } - } + // create two services and the pdb + err = r.ensureUnconditionalObjects(ctx, instance) + if err != nil { + return ctrl.Result{}, err + } + + // fetch STS if exists + err = r.Get(ctx, req.NamespacedName, &state.statefulSet) + if client.IgnoreNotFound(err) != nil { + return ctrl.Result{}, fmt.Errorf("couldn't get statefulset: %w", err) } + state.stsExists = state.statefulSet.UID != "" + + // fetch endpoints clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, instance, r.Client) if err != nil { return ctrl.Result{}, err } state.endpointsFound = clusterClient != nil && singleClients != nil - state.stsExists = state.statefulSet.UID != "" + + if !state.endpointsFound { + if !state.stsExists { + // TODO: happy path for new cluster creation + } + } // get status of every endpoint and member list from every endpoint state.etcdStatuses = make([]etcdStatus, len(singleClients)) @@ -138,8 +120,8 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) var wg sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, etcdDefaultTimeout) for i := range singleClients { + 
wg.Add(1) go func(i int) { - wg.Add(1) defer wg.Done() state.etcdStatuses[i].fill(ctx, singleClients[i]) }(i) @@ -158,7 +140,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) } // ensure managed resources - if err = r.ensureClusterObjects(ctx, instance); err != nil { + if err = r.ensureConditionalClusterObjects(ctx, instance); err != nil { return r.updateStatusOnErr(ctx, instance, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err)) } @@ -210,8 +192,8 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) return r.updateStatus(ctx, instance) } -// ensureClusterObjects creates or updates all objects owned by cluster CR -func (r *EtcdClusterReconciler) ensureClusterObjects( +// ensureConditionalClusterObjects creates or updates all objects owned by cluster CR +func (r *EtcdClusterReconciler) ensureConditionalClusterObjects( ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error { if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, r.Client); err != nil { @@ -220,30 +202,12 @@ func (r *EtcdClusterReconciler) ensureClusterObjects( } log.Debug(ctx, "cluster state configmap reconciled") - if err := factory.CreateOrUpdateHeadlessService(ctx, cluster, r.Client); err != nil { - log.Error(ctx, err, "reconcile headless service failed") - return err - } - log.Debug(ctx, "headless service reconciled") - if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client); err != nil { log.Error(ctx, err, "reconcile statefulset failed") return err } log.Debug(ctx, "statefulset reconciled") - if err := factory.CreateOrUpdateClientService(ctx, cluster, r.Client); err != nil { - log.Error(ctx, err, "reconcile client service failed") - return err - } - log.Debug(ctx, "client service reconciled") - - if err := factory.CreateOrUpdatePdb(ctx, cluster, r.Client); err != nil { - log.Error(ctx, err, "reconcile pdb failed") - return err - } - log.Debug(ctx, "pdb reconciled") - return nil } @@ -570,3 +534,57 @@ func (r *EtcdClusterReconciler) disableAuth(ctx context.Context, authClient clie return nil } + +// ensureUnconditionalObjects creates the two services and the PDB +// which can be created at the start of the reconciliation loop +// without any risk of disrupting the etcd cluster +func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) error { + const concurrentOperations = 3 + c := make(chan error) + defer close(c) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var wg sync.WaitGroup + wg.Add(concurrentOperations) + wrapWithMsg := func(err error, msg string) error { + if err != nil { + return fmt.Errorf(msg+": %w", err) + } + return nil + } + go func(chan<- error) { + defer wg.Done() + select { + case <-ctx.Done(): + case c <- wrapWithMsg(factory.CreateOrUpdateClientService(ctx, instance, r.Client), + "couldn't ensure client service"): + } + }(c) + go func(chan<- error) { + defer wg.Done() + select { + case <-ctx.Done(): + case c <- wrapWithMsg(factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client), + "couldn't ensure headless service"): + } + }(c) + go func(chan<- error) { + defer wg.Done() + select { + case <-ctx.Done(): + case c <- wrapWithMsg(factory.CreateOrUpdatePdb(ctx, instance, r.Client), + "couldn't ensure pod disruption budget"): + } + }(c) + + for i := 0; i < concurrentOperations; i++ { + if err := <-c; err != nil { + cancel() + + // let all goroutines select the ctx.Done() case to avoid races on closed 
channels + wg.Wait() + return err + } + } + return nil +} diff --git a/internal/controller/factory/etcd_client.go b/internal/controller/factory/etcd_client.go index f10f295..4725171 100644 --- a/internal/controller/factory/etcd_client.go +++ b/internal/controller/factory/etcd_client.go @@ -11,8 +11,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, client client.Client) (*clientv3.Client, []*clientv3.Client, error) { - cfg, err := configFromCluster(ctx, cluster, client) +func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli client.Client) (*clientv3.Client, []*clientv3.Client, error) { + cfg, err := configFromCluster(ctx, cluster, cli) if err != nil { return nil, nil, err } @@ -35,12 +35,16 @@ func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, client return clusterClient, singleClients, nil } -func configFromCluster(ctx context.Context, cluster *v1alpha1.EtcdCluster, client client.Client) (clientv3.Config, error) { +func configFromCluster(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli client.Client) (clientv3.Config, error) { ep := v1.Endpoints{} - err := client.Get(ctx, types.NamespacedName{Name: GetHeadlessServiceName(cluster), Namespace: cluster.Namespace}, &ep) - if err != nil { + err := cli.Get(ctx, types.NamespacedName{Name: GetHeadlessServiceName(cluster), Namespace: cluster.Namespace}, &ep) + if client.IgnoreNotFound(err) != nil { return clientv3.Config{}, err } + if err != nil { + return clientv3.Config{Endpoints: []string{}}, nil + } + names := map[string]struct{}{} urls := make([]string, 0, 8) for _, v := range ep.Subsets { diff --git a/internal/controller/observables.go b/internal/controller/observables.go index 8c9a2b3..e79b35e 100644 --- a/internal/controller/observables.go +++ b/internal/controller/observables.go @@ -6,6 +6,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" ) // etcdStatus holds the details of the status that an etcd endpoint @@ -27,6 +28,7 @@ type observables struct { etcdStatuses []etcdStatus clusterID uint64 endpointsReached int + pvcs []corev1.PersistentVolumeClaim } // setClusterID populates the clusterID field based on etcdStatuses @@ -56,11 +58,23 @@ func (o *observables) inSplitbrain() bool { // with the endpoint's status and its perceived member list. 
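+// Note: wg.Add is called before the goroutine is started so that wg.Wait
+// cannot observe a zero counter and return before Status has completed.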
func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) { var wg sync.WaitGroup + wg.Add(1) go func() { - wg.Add(1) defer wg.Done() s.endpointStatus, s.endpointStatusError = c.Status(ctx, c.Endpoints()[0]) }() s.memberList, s.memberListError = c.MemberList(ctx) wg.Wait() } + +// TODO: make a real function +func (o *observables) desiredReplicas() int { + if o.etcdStatuses != nil { + for i := range o.etcdStatuses { + if o.etcdStatuses[i].memberList != nil { + return len(o.etcdStatuses[i].memberList.Members) + } + } + } + return 0 +} From 384893cd1258583a75a76492110e41ee29631de8 Mon Sep 17 00:00:00 2001 From: Timofei Larkin Date: Tue, 25 Jun 2024 15:52:49 +0300 Subject: [PATCH 4/5] allow time for async things to finish in e2e tests --- internal/controller/etcdcluster_controller.go | 1 + internal/controller/observables.go | 16 ++++----- test/e2e/e2e_test.go | 34 +++++++++++++++++-- 3 files changed, 41 insertions(+), 10 deletions(-) diff --git a/internal/controller/etcdcluster_controller.go b/internal/controller/etcdcluster_controller.go index b086cec..981a3c3 100644 --- a/internal/controller/etcdcluster_controller.go +++ b/internal/controller/etcdcluster_controller.go @@ -111,6 +111,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) if !state.endpointsFound { if !state.stsExists { // TODO: happy path for new cluster creation + log.Debug(ctx, "happy path for new cluster creation (not yet implemented)") } } diff --git a/internal/controller/observables.go b/internal/controller/observables.go index e79b35e..adbaba3 100644 --- a/internal/controller/observables.go +++ b/internal/controller/observables.go @@ -22,13 +22,13 @@ type etcdStatus struct { // observables stores observations that the operator can make about // states of objects in kubernetes type observables struct { - statefulSet appsv1.StatefulSet - stsExists bool - endpointsFound bool - etcdStatuses []etcdStatus - clusterID uint64 - endpointsReached int - pvcs []corev1.PersistentVolumeClaim + statefulSet appsv1.StatefulSet + stsExists bool + endpointsFound bool + etcdStatuses []etcdStatus + clusterID uint64 + _ int + _ []corev1.PersistentVolumeClaim } // setClusterID populates the clusterID field based on etcdStatuses @@ -68,7 +68,7 @@ func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) { } // TODO: make a real function -func (o *observables) desiredReplicas() int { +func (o *observables) _() int { if o.etcdStatuses != nil { for i := range o.etcdStatuses { if o.etcdStatuses[i].memberList != nil { diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index aadc826..9b18844 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -21,6 +21,7 @@ import ( "os" "os/exec" "sync" + "time" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" @@ -95,6 +96,15 @@ var _ = Describe("etcd-operator", Ordered, func() { ExpectWithOffset(1, err).NotTo(HaveOccurred()) }) + Eventually(func() error { + cmd := exec.Command("kubectl", "get", + "statefulset/test", + "--namespace", namespace, + ) + _, err = utils.Run(cmd) + return err + }, time.Second*20, time.Second*2).Should(Succeed()) + By("wait for statefulset is ready", func() { cmd := exec.Command("kubectl", "wait", "statefulset/test", @@ -144,6 +154,15 @@ var _ = Describe("etcd-operator", Ordered, func() { ExpectWithOffset(1, err).NotTo(HaveOccurred()) }) + Eventually(func() error { + cmd := exec.Command("kubectl", "get", + "statefulset/test", + "--namespace", namespace, + ) + _, err = utils.Run(cmd) + return err + }, time.Second*20, time.Second*2).Should(Succeed()) + By("wait for statefulset is ready", func() { cmd := exec.Command("kubectl", "wait", "statefulset/test", @@ -192,6 +211,15 @@ var _ = Describe("etcd-operator", Ordered, func() { ExpectWithOffset(1, err).NotTo(HaveOccurred()) }) + Eventually(func() error { + cmd := exec.Command("kubectl", "get", + "statefulset/test", + "--namespace", namespace, + ) + _, err = utils.Run(cmd) + return err + }, time.Second*20, time.Second*2).Should(Succeed()) + By("wait for statefulset is ready", func() { cmd := exec.Command("kubectl", "wait", "statefulset/test", @@ -217,8 +245,10 @@ var _ = Describe("etcd-operator", Ordered, func() { auth := clientv3.NewAuth(client) By("check root role is created", func() { - _, err = auth.RoleGet(ctx, "root") - Expect(err).NotTo(HaveOccurred()) + Eventually(func() error { + _, err = auth.RoleGet(ctx, "root") + return err + }, time.Second*20, time.Second*2).Should(Succeed()) }) By("check root user is created and has root role", func() { From f81648b50b6ee16c17bf37438321b628aa32b66f Mon Sep 17 00:00:00 2001 From: Timofei Larkin Date: Tue, 25 Jun 2024 18:27:31 +0300 Subject: [PATCH 5/5] Add splitbrain condition, make tests terser --- api/v1alpha1/etcdcluster_types.go | 3 ++ internal/controller/etcdcluster_controller.go | 8 +++- test/e2e/e2e_test.go | 39 +++---------------- 3 files changed, 16 insertions(+), 34 deletions(-) diff --git a/api/v1alpha1/etcdcluster_types.go b/api/v1alpha1/etcdcluster_types.go index 71cf65a..bd7ef6d 100644 --- a/api/v1alpha1/etcdcluster_types.go +++ b/api/v1alpha1/etcdcluster_types.go @@ -55,6 +55,7 @@ type EtcdClusterSpec struct { const ( EtcdConditionInitialized = "Initialized" EtcdConditionReady = "Ready" + EtcdConditionError = "Error" ) type EtcdCondType string @@ -66,6 +67,7 @@ const ( EtcdCondTypeWaitingForFirstQuorum EtcdCondType = "WaitingForFirstQuorum" EtcdCondTypeStatefulSetReady EtcdCondType = "StatefulSetReady" EtcdCondTypeStatefulSetNotReady EtcdCondType = "StatefulSetNotReady" + EtcdCondTypeSplitbrain EtcdCondType = "Splitbrain" ) const ( @@ -74,6 +76,7 @@ const ( EtcdReadyCondNegMessage EtcdCondMessage = "Cluster StatefulSet is not Ready" EtcdReadyCondPosMessage EtcdCondMessage = "Cluster StatefulSet is Ready" EtcdReadyCondNegWaitingForQuorum EtcdCondMessage = "Waiting for first quorum to be established" + EtcdErrorCondSplitbrainMessage EtcdCondMessage = "Etcd endpoints reporting more than one unique cluster ID" ) // EtcdClusterStatus defines the observed state of EtcdCluster diff --git a/internal/controller/etcdcluster_controller.go b/internal/controller/etcdcluster_controller.go index 981a3c3..87aeab6 100644 --- a/internal/controller/etcdcluster_controller.go +++ b/internal/controller/etcdcluster_controller.go @@ -133,7 +133,13 @@ func (r 
*EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) state.setClusterID() if state.inSplitbrain() { log.Error(ctx, fmt.Errorf("etcd cluster in splitbrain"), "etcd cluster in splitbrain, dropping from reconciliation queue") - return ctrl.Result{}, nil + factory.SetCondition(instance, factory.NewCondition(etcdaenixiov1alpha1.EtcdConditionError). + WithStatus(true). + WithReason(string(etcdaenixiov1alpha1.EtcdCondTypeSplitbrain)). + WithMessage(string(etcdaenixiov1alpha1.EtcdErrorCondSplitbrainMessage)). + Complete(), + ) + return r.updateStatus(ctx, instance) } // fill conditions if len(instance.Status.Conditions) == 0 { diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 9b18844..8e5578f 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -97,15 +97,6 @@ var _ = Describe("etcd-operator", Ordered, func() { }) Eventually(func() error { - cmd := exec.Command("kubectl", "get", - "statefulset/test", - "--namespace", namespace, - ) - _, err = utils.Run(cmd) - return err - }, time.Second*20, time.Second*2).Should(Succeed()) - - By("wait for statefulset is ready", func() { cmd := exec.Command("kubectl", "wait", "statefulset/test", "--for", "jsonpath={.status.readyReplicas}=3", @@ -113,8 +104,8 @@ var _ = Describe("etcd-operator", Ordered, func() { "--timeout", "5m", ) _, err = utils.Run(cmd) - ExpectWithOffset(1, err).NotTo(HaveOccurred()) - }) + return err + }, time.Second*20, time.Second*2).Should(Succeed(), "wait for statefulset is ready") client, err := utils.GetEtcdClient(ctx, client.ObjectKey{Namespace: namespace, Name: "test"}) Expect(err).NotTo(HaveOccurred()) @@ -155,15 +146,6 @@ var _ = Describe("etcd-operator", Ordered, func() { }) Eventually(func() error { - cmd := exec.Command("kubectl", "get", - "statefulset/test", - "--namespace", namespace, - ) - _, err = utils.Run(cmd) - return err - }, time.Second*20, time.Second*2).Should(Succeed()) - - By("wait for statefulset is ready", func() { cmd := exec.Command("kubectl", "wait", "statefulset/test", "--for", "jsonpath={.status.readyReplicas}=3", @@ -171,8 +153,8 @@ var _ = Describe("etcd-operator", Ordered, func() { "--timeout", "5m", ) _, err = utils.Run(cmd) - ExpectWithOffset(1, err).NotTo(HaveOccurred()) - }) + return err + }, time.Second*20, time.Second*2).Should(Succeed(), "wait for statefulset is ready") client, err := utils.GetEtcdClient(ctx, client.ObjectKey{Namespace: namespace, Name: "test"}) Expect(err).NotTo(HaveOccurred()) @@ -212,15 +194,6 @@ var _ = Describe("etcd-operator", Ordered, func() { }) Eventually(func() error { - cmd := exec.Command("kubectl", "get", - "statefulset/test", - "--namespace", namespace, - ) - _, err = utils.Run(cmd) - return err - }, time.Second*20, time.Second*2).Should(Succeed()) - - By("wait for statefulset is ready", func() { cmd := exec.Command("kubectl", "wait", "statefulset/test", "--for", "jsonpath={.status.availableReplicas}=3", @@ -228,8 +201,8 @@ var _ = Describe("etcd-operator", Ordered, func() { "--timeout", "5m", ) _, err = utils.Run(cmd) - ExpectWithOffset(1, err).NotTo(HaveOccurred()) - }) + return err + }, time.Second*20, time.Second*2).Should(Succeed(), "wait for statefulset is ready") client, err := utils.GetEtcdClient(ctx, client.ObjectKey{Namespace: namespace, Name: "test"}) Expect(err).NotTo(HaveOccurred())
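Not part of the patch series above, but as a rough illustration of the splitbrain detection it introduces, a minimal test sketch for the observables helpers might look like the following. The file name internal/controller/observables_test.go and the pb alias for go.etcd.io/etcd/api/v3/etcdserverpb are assumptions; everything else uses only the types and methods defined in observables.go as patched above.

// internal/controller/observables_test.go (hypothetical sketch)
package controller

import (
	"testing"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	clientv3 "go.etcd.io/etcd/client/v3"
)

func TestInSplitbrain(t *testing.T) {
	// status fabricates an endpoint status reporting the given cluster ID.
	status := func(id uint64) *clientv3.StatusResponse {
		return &clientv3.StatusResponse{Header: &pb.ResponseHeader{ClusterId: id}}
	}
	o := observables{etcdStatuses: []etcdStatus{
		{endpointStatus: status(1)},
		{endpointStatus: status(1)},
		{}, // an unreachable endpoint (nil status) is skipped by both helpers
	}}
	o.setClusterID()
	if o.inSplitbrain() {
		t.Fatal("no splitbrain expected while every endpoint reports cluster ID 1")
	}
	o.etcdStatuses[2].endpointStatus = status(2)
	if !o.inSplitbrain() {
		t.Fatal("splitbrain expected once a second cluster ID is reported")
	}
}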