Skip to content

Commit

Permalink
Extended status check for reconciliation
Browse files Browse the repository at this point in the history
  • Loading branch information
lllamnyp committed May 13, 2024
1 parent e509604 commit 3f7ae6c
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 0 deletions.
52 changes: 52 additions & 0 deletions internal/controller/etcdcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type EtcdClusterReconciler struct {
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/finalizers,verbs=update
// +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;watch;delete;patch
// +kubebuilder:rbac:groups="",resources=services,verbs=get;create;delete;update;patch;list;watch
// +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;create;delete;update;patch;list;watch
Expand All @@ -72,6 +73,57 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return reconcile.Result{}, nil
}

sts := appsv1.StatefulSet{}
// create two services and the pdb, try fetching the sts
{
c := make(chan error)
go func(chan<- error) {
err := factory.CreateOrUpdateClientService(ctx, instance, r.Client)
if err != nil {
err = fmt.Errorf("couldn't ensure client service: %w", err)
}
c <- err
}(c)
go func(chan<- error) {
err := factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client)
if err != nil {
err = fmt.Errorf("couldn't ensure headless service: %w", err)
}
c <- err
}(c)
go func(chan<- error) {
err := factory.CreateOrUpdatePdb(ctx, instance, r.Client)
if err != nil {
err = fmt.Errorf("couldn't ensure pod disruption budget: %w", err)
}
c <- err
}(c)
go func(chan<- error) {
err := r.Get(ctx, req.NamespacedName, &sts)
if client.IgnoreNotFound(err) != nil {
err = fmt.Errorf("couldn't get statefulset: %w", err)
}
c <- err
}(c)
for i := 0; i < 4; i++ {
if err := <-c; err != nil {
return ctrl.Result{}, err
}
}
}
/*
clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, instance, r.Client)
if err != nil {
return ctrl.Result{}, err
}
if clusterClient == nil || singleClients == nil {
// TODO: no endpoints case
}
if sts.UID != "" {
r.Patch()
}
*/
// fill conditions
if len(instance.Status.Conditions) == 0 {
factory.FillConditions(instance)
Expand Down
59 changes: 59 additions & 0 deletions internal/controller/factory/etcd_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package factory

import (
"context"
"fmt"

"github.com/aenix-io/etcd-operator/api/v1alpha1"
clientv3 "go.etcd.io/etcd/client/v3"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, client client.Client) (*clientv3.Client, []*clientv3.Client, error) {
cfg, err := configFromCluster(ctx, cluster, client)
if err != nil {
return nil, nil, err
}
if len(cfg.Endpoints) == 0 {
return nil, nil, nil
}
eps := cfg.Endpoints
clusterClient, err := clientv3.New(cfg)
if err != nil {
return nil, nil, fmt.Errorf("error building etcd cluster client: %w", err)
}
singleClients := make([]*clientv3.Client, len(eps))
for i, ep := range eps {
cfg.Endpoints = []string{ep}
singleClients[i], err = clientv3.New(cfg)
if err != nil {
return nil, nil, fmt.Errorf("error building etcd single-endpoint client for endpoint %s: %w", ep, err)
}
}
return clusterClient, singleClients, nil
}

func configFromCluster(ctx context.Context, cluster *v1alpha1.EtcdCluster, client client.Client) (clientv3.Config, error) {
ep := v1.Endpoints{}
err := client.Get(ctx, types.NamespacedName{Name: GetHeadlessServiceName(cluster), Namespace: cluster.Namespace}, &ep)
if err != nil {
return clientv3.Config{}, err
}
names := map[string]struct{}{}
urls := make([]string, 0, 8)
for _, v := range ep.Subsets {
for _, addr := range v.Addresses {
names[addr.Hostname] = struct{}{}
}
for _, addr := range v.NotReadyAddresses {
names[addr.Hostname] = struct{}{}
}
}
for name := range names {
urls = append(urls, fmt.Sprintf("%s:%s", name, "2379"))
}

return clientv3.Config{Endpoints: urls}, nil
}

0 comments on commit 3f7ae6c

Please sign in to comment.