diff --git a/internal/controller/etcdcluster_controller.go b/internal/controller/etcdcluster_controller.go index d3834494..1f6ce78a 100644 --- a/internal/controller/etcdcluster_controller.go +++ b/internal/controller/etcdcluster_controller.go @@ -48,6 +48,7 @@ type EtcdClusterReconciler struct { // +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/status,verbs=get;update;patch // +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/finalizers,verbs=update +// +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch // +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;watch;delete;patch // +kubebuilder:rbac:groups="",resources=services,verbs=get;create;delete;update;patch;list;watch // +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;create;delete;update;patch;list;watch @@ -72,6 +73,57 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) return reconcile.Result{}, nil } + sts := appsv1.StatefulSet{} + // create two services and the pdb, try fetching the sts + { + c := make(chan error) + go func(chan<- error) { + err := factory.CreateOrUpdateClientService(ctx, instance, r.Client) + if err != nil { + err = fmt.Errorf("couldn't ensure client service: %w", err) + } + c <- err + }(c) + go func(chan<- error) { + err := factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client) + if err != nil { + err = fmt.Errorf("couldn't ensure headless service: %w", err) + } + c <- err + }(c) + go func(chan<- error) { + err := factory.CreateOrUpdatePdb(ctx, instance, r.Client) + if err != nil { + err = fmt.Errorf("couldn't ensure pod disruption budget: %w", err) + } + c <- err + }(c) + go func(chan<- error) { + err := r.Get(ctx, req.NamespacedName, &sts) + if client.IgnoreNotFound(err) != nil { + err = fmt.Errorf("couldn't get statefulset: %w", err) + } + c <- err + }(c) + for i := 0; i < 4; i++ { + if err := <-c; err != nil { + return ctrl.Result{}, err + } + } + } + /* + clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, instance, r.Client) + if err != nil { + return ctrl.Result{}, err + } + if clusterClient == nil || singleClients == nil { + // TODO: no endpoints case + + } + if sts.UID != "" { + r.Patch() + } + */ // fill conditions if len(instance.Status.Conditions) == 0 { factory.FillConditions(instance) diff --git a/internal/controller/factory/etcd_client.go b/internal/controller/factory/etcd_client.go new file mode 100644 index 00000000..f10f295a --- /dev/null +++ b/internal/controller/factory/etcd_client.go @@ -0,0 +1,59 @@ +package factory + +import ( + "context" + "fmt" + + "github.com/aenix-io/etcd-operator/api/v1alpha1" + clientv3 "go.etcd.io/etcd/client/v3" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, client client.Client) (*clientv3.Client, []*clientv3.Client, error) { + cfg, err := configFromCluster(ctx, cluster, client) + if err != nil { + return nil, nil, err + } + if len(cfg.Endpoints) == 0 { + return nil, nil, nil + } + eps := cfg.Endpoints + clusterClient, err := clientv3.New(cfg) + if err != nil { + return nil, nil, fmt.Errorf("error building etcd cluster client: %w", err) + } + singleClients := make([]*clientv3.Client, len(eps)) + for i, ep := range eps { + cfg.Endpoints = []string{ep} + singleClients[i], err = clientv3.New(cfg) + if err != nil { + return nil, nil, fmt.Errorf("error building etcd single-endpoint client for endpoint %s: %w", ep, err) + } + } + return clusterClient, singleClients, nil +} + +func configFromCluster(ctx context.Context, cluster *v1alpha1.EtcdCluster, client client.Client) (clientv3.Config, error) { + ep := v1.Endpoints{} + err := client.Get(ctx, types.NamespacedName{Name: GetHeadlessServiceName(cluster), Namespace: cluster.Namespace}, &ep) + if err != nil { + return clientv3.Config{}, err + } + names := map[string]struct{}{} + urls := make([]string, 0, 8) + for _, v := range ep.Subsets { + for _, addr := range v.Addresses { + names[addr.Hostname] = struct{}{} + } + for _, addr := range v.NotReadyAddresses { + names[addr.Hostname] = struct{}{} + } + } + for name := range names { + urls = append(urls, fmt.Sprintf("%s:%s", name, "2379")) + } + + return clientv3.Config{Endpoints: urls}, nil +}