From 4b22a707bbd2003d77b8374f757ed6919de415fd Mon Sep 17 00:00:00 2001 From: Timofei Larkin Date: Mon, 12 Aug 2024 23:10:02 +0300 Subject: [PATCH] Add remaining changes to PR This commit includes all changes for feature #181 that have not been split up into small stacked PRs. It should not be merged and will later be undone and split into smaller logical chunks of work. --- internal/controller/etcdcluster_controller.go | 64 +++++++++- internal/controller/factory/pvc.go | 18 +++ internal/controller/factory/statefulset.go | 25 ++-- internal/controller/observables.go | 111 ++++++++++++++++-- 4 files changed, 200 insertions(+), 18 deletions(-) diff --git a/internal/controller/etcdcluster_controller.go b/internal/controller/etcdcluster_controller.go index f4ec6ec..f398c02 100644 --- a/internal/controller/etcdcluster_controller.go +++ b/internal/controller/etcdcluster_controller.go @@ -91,6 +91,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) } state := observables{} + state.instance = instance // create two services and the pdb err = r.ensureUnconditionalObjects(ctx, instance) @@ -112,11 +113,40 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) } state.endpointsFound = clusterClient != nil && singleClients != nil + if clusterClient != nil { + state.endpoints = clusterClient.Endpoints() + } + + // fetch PVCs + state.pvcs, err = factory.PVCs(ctx, instance, r.Client) + if err != nil { + return ctrl.Result{}, err + } + if !state.endpointsFound { if !state.stsExists { - // TODO: happy path for new cluster creation - log.Debug(ctx, "happy path for new cluster creation (not yet implemented)") + return r.createClusterFromScratch(ctx, &state) // TODO: needs implementing } + // else try reconciling the sts + existingSts := state.statefulSet.DeepCopy() + desiredSts := factory.TemplateStatefulSet() // TODO: needs implementing + existingSts.Spec.Template.Spec = desiredSts.Spec.Template.Spec + err := r.patchOrCreateObject(ctx, existingSts) + if err != nil { + return ctrl.Result{}, err + } + state.statefulSet = *existingSts + if existingSts.Status.ReadyReplicas != *existingSts.Spec.Replicas { // TODO: this check might not be the best to check for a ready sts + return ctrl.Result{}, fmt.Errorf("waiting for statefulset to become ready") + } + if *existingSts.Spec.Replicas > 0 { + return ctrl.Result{}, fmt.Errorf("reached an impossible state (no endpoints, but active pods)") + } + if *instance.Spec.Replicas == 0 { + // cluster successfully scaled down to zero + return ctrl.Result{}, nil + } + return r.scaleUpFromZero(ctx, &state) // TODO: needs implementing } // get status of every endpoint and member list from every endpoint @@ -661,3 +691,33 @@ func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, } return nil } + +func (r *EtcdClusterReconciler) patchOrCreateObject(ctx context.Context, obj client.Object) error { + err := r.Patch(ctx, obj, client.Apply, &client.PatchOptions{FieldManager: "etcd-operator"}, client.ForceOwnership) + if err == nil { + return nil + } + if client.IgnoreNotFound(err) == nil { + err = r.Create(ctx, obj) + } + return err +} + +// TODO! +func (r *EtcdClusterReconciler) createClusterFromScratch(ctx context.Context, state *observables) (ctrl.Result, error) { + cm := factory.TemplateClusterStateConfigMap(state.instance, "new", state.desiredReplicas()) + err := ctrl.SetControllerReference(state.instance, cm, r.Scheme) + if err != nil { + return ctrl.Result{}, err + } + err = r.patchOrCreateObject(ctx, cm) + if err != nil { + return ctrl.Result{}, err + } + panic("not yet implemented") +} + +// TODO! +func (r *EtcdClusterReconciler) scaleUpFromZero(ctx context.Context, state *observables) (ctrl.Result, error) { + panic("not yet implemented") +} diff --git a/internal/controller/factory/pvc.go b/internal/controller/factory/pvc.go index 1b49d2c..60d2d51 100644 --- a/internal/controller/factory/pvc.go +++ b/internal/controller/factory/pvc.go @@ -30,6 +30,14 @@ import ( "k8s.io/apimachinery/pkg/types" ) +func PVCLabels(cluster *etcdaenixiov1alpha1.EtcdCluster) map[string]string { + labels := PodLabels(cluster) + for key, value := range cluster.Spec.Storage.VolumeClaimTemplate.Labels { + labels[key] = value + } + return labels +} + func GetPVCName(cluster *etcdaenixiov1alpha1.EtcdCluster) string { if len(cluster.Spec.Storage.VolumeClaimTemplate.Name) > 0 { return cluster.Spec.Storage.VolumeClaimTemplate.Name @@ -38,6 +46,16 @@ func GetPVCName(cluster *etcdaenixiov1alpha1.EtcdCluster) string { return "data" } +func PVCs(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, cli client.Client) ([]corev1.PersistentVolumeClaim, error) { + labels := PVCLabels(cluster) + pvcs := corev1.PersistentVolumeClaimList{} + err := cli.List(ctx, &pvcs, client.MatchingLabels(labels)) + if err != nil { + return nil, err + } + return pvcs.Items, nil +} + // UpdatePersistentVolumeClaims checks and updates the sizes of PVCs in an EtcdCluster if the specified storage size is larger than the current. func UpdatePersistentVolumeClaims(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, rclient client.Client) error { labelSelector := labels.SelectorFromSet(labels.Set{ diff --git a/internal/controller/factory/statefulset.go b/internal/controller/factory/statefulset.go index e8d4836..202419b 100644 --- a/internal/controller/factory/statefulset.go +++ b/internal/controller/factory/statefulset.go @@ -41,19 +41,30 @@ const ( defaultBackendQuotaBytesFraction = 0.95 ) +// TODO! +func TemplateStatefulSet() *appsv1.StatefulSet { + panic("not yet implemented") +} + +func PodLabels(cluster *etcdaenixiov1alpha1.EtcdCluster) map[string]string { + labels := NewLabelsBuilder().WithName().WithInstance(cluster.Name).WithManagedBy() + + if cluster.Spec.PodTemplate.Labels != nil { + for key, value := range cluster.Spec.PodTemplate.Labels { + labels[key] = value + } + } + + return labels +} + func CreateOrUpdateStatefulSet( ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, rclient client.Client, ) error { podMetadata := metav1.ObjectMeta{ - Labels: NewLabelsBuilder().WithName().WithInstance(cluster.Name).WithManagedBy(), - } - - if cluster.Spec.PodTemplate.Labels != nil { - for key, value := range cluster.Spec.PodTemplate.Labels { - podMetadata.Labels[key] = value - } + Labels: PodLabels(cluster), } if cluster.Spec.PodTemplate.Annotations != nil { diff --git a/internal/controller/observables.go b/internal/controller/observables.go index adbaba3..f1af8e1 100644 --- a/internal/controller/observables.go +++ b/internal/controller/observables.go @@ -2,8 +2,12 @@ package controller import ( "context" + "strconv" + "strings" "sync" + "github.com/aenix-io/etcd-operator/api/v1alpha1" + "github.com/aenix-io/etcd-operator/pkg/set" clientv3 "go.etcd.io/etcd/client/v3" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -22,13 +26,14 @@ type etcdStatus struct { // observables stores observations that the operator can make about // states of objects in kubernetes type observables struct { + instance *v1alpha1.EtcdCluster statefulSet appsv1.StatefulSet stsExists bool + endpoints []string endpointsFound bool etcdStatuses []etcdStatus clusterID uint64 - _ int - _ []corev1.PersistentVolumeClaim + pvcs []corev1.PersistentVolumeClaim } // setClusterID populates the clusterID field based on etcdStatuses @@ -43,15 +48,43 @@ func (o *observables) setClusterID() { // inSplitbrain compares clusterID field with clusterIDs in etcdStatuses. // If more than one unique ID is reported, cluster is in splitbrain. +// Also if members have different opinions on the list of members, this is +// also a splitbrain. func (o *observables) inSplitbrain() bool { + return o.clusterIDsAllEqual() && o.memberListsAllEqual() +} + +func (o *observables) clusterIDsAllEqual() bool { + ids := set.New[uint64]() for i := range o.etcdStatuses { if o.etcdStatuses[i].endpointStatus != nil { - if o.clusterID != o.etcdStatuses[i].endpointStatus.Header.ClusterId { - return true + ids.Add(o.etcdStatuses[i].endpointStatus.Header.ClusterId) + } + } + return len(ids) <= 1 +} + +func (o *observables) memberListsAllEqual() bool { + type m struct { + Name string + ID uint64 + } + memberLists := make([]set.Set[m], 0, len(o.etcdStatuses)) + for i := range o.etcdStatuses { + if o.etcdStatuses[i].memberList != nil { + memberSet := set.New[m]() + for _, member := range o.etcdStatuses[i].memberList.Members { + memberSet.Add(m{member.Name, member.ID}) } + memberLists = append(memberLists, memberSet) + } + } + for i := range memberLists { + if !memberLists[0].Equals(memberLists[i]) { + return false } } - return false + return true } // fill takes a single-endpoint client and populates the fields of etcdStatus @@ -67,14 +100,74 @@ func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) { wg.Wait() } -// TODO: make a real function -func (o *observables) _() int { +func (o *observables) pvcMaxIndex() (max int) { + max = -1 + for i := range o.pvcs { + tokens := strings.Split(o.pvcs[i].Name, "-") + index, err := strconv.Atoi(tokens[len(tokens)-1]) + if err != nil { + continue + } + if index > max { + max = index + } + } + return max +} + +func (o *observables) endpointMaxIndex() (max int) { + for i := range o.endpoints { + tokens := strings.Split(o.endpoints[i], ":") + if len(tokens) < 2 { + continue + } + tokens = strings.Split(tokens[len(tokens)-2], "-") + index, err := strconv.Atoi(tokens[len(tokens)-1]) + if err != nil { + continue + } + if index > max { + max = index + } + } + return max +} + +// TODO: make a real function to determine the right number of replicas. +// Hint: if ClientURL in the member list is absent, the member has not yet +// started, but if the name field is populated, this is a member of the +// initial cluster. If the name field is empty, this member has just been +// added with etcdctl member add (or equivalent API call). +func (o *observables) desiredReplicas() (max int) { + max = -1 if o.etcdStatuses != nil { for i := range o.etcdStatuses { if o.etcdStatuses[i].memberList != nil { - return len(o.etcdStatuses[i].memberList.Members) + for j := range o.etcdStatuses[i].memberList.Members { + tokens := strings.Split(o.etcdStatuses[i].memberList.Members[j].Name, "-") + index, err := strconv.Atoi(tokens[len(tokens)-1]) + if err != nil { + continue + } + if index > max { + max = index + } + } } } } - return 0 + if max > -1 { + return max + 1 + } + + if epMax := o.endpointMaxIndex(); epMax > max { + max = epMax + } + if pvcMax := o.pvcMaxIndex(); pvcMax > max { + max = pvcMax + } + if max == -1 { + return int(*o.instance.Spec.Replicas) + } + return max + 1 }