From 2de065500023057f1221bd3ed74348818eaa7134 Mon Sep 17 00:00:00 2001 From: Rory Z <16801068+Rory-Z@users.noreply.github.com> Date: Wed, 10 Apr 2024 15:48:04 +0800 Subject: [PATCH] fix: fix emqx cluster can not ready Signed-off-by: Rory Z <16801068+Rory-Z@users.noreply.github.com> --- .github/workflows/cts.yaml | 2 +- .github/workflows/deploy.yaml | 2 +- controllers/apps/v2beta1/add_headless_svc.go | 61 ++++++++++++++++++++ controllers/apps/v2beta1/add_svc.go | 38 +----------- controllers/apps/v2beta1/emqx_controller.go | 1 + controllers/apps/v2beta1/status_machine.go | 4 +- 6 files changed, 67 insertions(+), 41 deletions(-) create mode 100644 controllers/apps/v2beta1/add_headless_svc.go diff --git a/.github/workflows/cts.yaml b/.github/workflows/cts.yaml index 7cff44d31..5e6282630 100644 --- a/.github/workflows/cts.yaml +++ b/.github/workflows/cts.yaml @@ -135,7 +135,7 @@ jobs: repository: ghcr.io/${{ github.repository }} tag: ${{ github.ref_name }} - name: Deploy emqx - timeout-minutes: 10 + timeout-minutes: 5 uses: ./.github/actions/deploy-emqx with: kind: ${{ matrix.emqx[0] }} diff --git a/.github/workflows/deploy.yaml b/.github/workflows/deploy.yaml index 0f0ecc7ef..9305561a5 100644 --- a/.github/workflows/deploy.yaml +++ b/.github/workflows/deploy.yaml @@ -69,7 +69,7 @@ jobs: timeout-minutes: 5 run: kubectl wait --for=condition=Ready pods -l "control-plane=controller-manager" -n emqx-operator-system - name: Deployment emqx - timeout-minutes: 10 + timeout-minutes: 5 uses: ./.github/actions/deploy-emqx with: kind: ${{ matrix.emqx[0] }} diff --git a/controllers/apps/v2beta1/add_headless_svc.go b/controllers/apps/v2beta1/add_headless_svc.go new file mode 100644 index 000000000..ec92994e2 --- /dev/null +++ b/controllers/apps/v2beta1/add_headless_svc.go @@ -0,0 +1,61 @@ +package v2beta1 + +import ( + "context" + + emperror "emperror.dev/errors" + appsv2beta1 "github.com/emqx/emqx-operator/apis/apps/v2beta1" + innerReq "github.com/emqx/emqx-operator/internal/requester" + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type addHeadlessSvc struct { + *EMQXReconciler +} + +func (a *addHeadlessSvc) reconcile(ctx context.Context, logger logr.Logger, instance *appsv2beta1.EMQX, _ innerReq.RequesterInterface) subResult { + if err := a.CreateOrUpdateList(ctx, a.Scheme, logger, instance, []client.Object{generateHeadlessService(instance)}); err != nil { + return subResult{err: emperror.Wrap(err, "failed to create or update services")} + } + return subResult{} +} + +func generateHeadlessService(instance *appsv2beta1.EMQX) *corev1.Service { + headlessSvc := &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: instance.Namespace, + Name: instance.HeadlessServiceNamespacedName().Name, + Labels: appsv2beta1.CloneAndMergeMap(appsv2beta1.DefaultLabels(instance), instance.Labels), + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: corev1.ClusterIPNone, + SessionAffinity: corev1.ServiceAffinityNone, + PublishNotReadyAddresses: true, + Selector: appsv2beta1.DefaultCoreLabels(instance), + Ports: []corev1.ServicePort{ + { + Name: "erlang-dist", + Port: 4370, + Protocol: corev1.ProtocolTCP, + TargetPort: intstr.FromInt(4370), + }, + { + Name: "gen-rpc", + Port: 5369, + Protocol: corev1.ProtocolTCP, + TargetPort: intstr.FromInt(5369), + }, + }, + }, + } + return headlessSvc +} diff --git a/controllers/apps/v2beta1/add_svc.go b/controllers/apps/v2beta1/add_svc.go index 60d10d6de..df604de7c 100644 --- a/controllers/apps/v2beta1/add_svc.go +++ b/controllers/apps/v2beta1/add_svc.go @@ -32,7 +32,7 @@ func (a *addSvc) reconcile(ctx context.Context, logger logr.Logger, instance *ap return subResult{err: emperror.Wrap(err, "failed to get emqx configs by api")} } - resources := []client.Object{generateHeadlessService(instance)} + resources := []client.Object{} if dashboard := generateDashboardService(instance, configStr); dashboard != nil { resources = append(resources, dashboard) } @@ -61,42 +61,6 @@ func (a *addSvc) getEMQXConfigsByAPI(r innerReq.RequesterInterface) (string, err return string(body), nil } -func generateHeadlessService(instance *appsv2beta1.EMQX) *corev1.Service { - headlessSvc := &corev1.Service{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Service", - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: instance.Namespace, - Name: instance.HeadlessServiceNamespacedName().Name, - Labels: appsv2beta1.CloneAndMergeMap(appsv2beta1.DefaultLabels(instance), instance.Labels), - }, - Spec: corev1.ServiceSpec{ - Type: corev1.ServiceTypeClusterIP, - ClusterIP: corev1.ClusterIPNone, - SessionAffinity: corev1.ServiceAffinityNone, - PublishNotReadyAddresses: true, - Selector: appsv2beta1.DefaultCoreLabels(instance), - Ports: []corev1.ServicePort{ - { - Name: "erlang-dist", - Port: 4370, - Protocol: corev1.ProtocolTCP, - TargetPort: intstr.FromInt(4370), - }, - { - Name: "gen-rpc", - Port: 5369, - Protocol: corev1.ProtocolTCP, - TargetPort: intstr.FromInt(5369), - }, - }, - }, - } - return headlessSvc -} - func generateDashboardService(instance *appsv2beta1.EMQX, configStr string) *corev1.Service { svc := &corev1.Service{} if instance.Spec.DashboardServiceTemplate != nil { diff --git a/controllers/apps/v2beta1/emqx_controller.go b/controllers/apps/v2beta1/emqx_controller.go index a66cfa420..3cdf3864a 100644 --- a/controllers/apps/v2beta1/emqx_controller.go +++ b/controllers/apps/v2beta1/emqx_controller.go @@ -123,6 +123,7 @@ func (r *EMQXReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. &addBootstrap{r}, &updatePodConditions{r}, &updateStatus{r}, + &addHeadlessSvc{r}, &addCore{r}, &addRepl{r}, &addPdb{r}, diff --git a/controllers/apps/v2beta1/status_machine.go b/controllers/apps/v2beta1/status_machine.go index c4c9b63fe..de4a091c7 100644 --- a/controllers/apps/v2beta1/status_machine.go +++ b/controllers/apps/v2beta1/status_machine.go @@ -139,7 +139,7 @@ func (s *coreNodesProgressingStatus) nextStatus(ctx context.Context) { emqx := s.emqxStatusMachine.GetEMQX() updateSts, _, _ := getStateFulSetList(ctx, s.emqxStatusMachine.client, emqx) - if updateSts != nil && updateSts.Status.ReadyReplicas != 0 && updateSts.Status.ReadyReplicas == emqx.Status.CoreNodesStatus.Replicas { + if updateSts != nil && updateSts.Status.ReadyReplicas != 0 && updateSts.Status.ReadyReplicas == *emqx.Spec.CoreTemplate.Spec.Replicas { emqx.Status.SetCondition(metav1.Condition{ Type: appsv2beta1.CoreNodesReady, Status: metav1.ConditionTrue, @@ -191,7 +191,7 @@ func (s *replicantNodesProgressingStatus) nextStatus(ctx context.Context) { } updateRs, _, _ := getReplicaSetList(ctx, s.emqxStatusMachine.client, emqx) - if updateRs != nil && updateRs.Status.ReadyReplicas != 0 && updateRs.Status.ReadyReplicas == emqx.Status.ReplicantNodesStatus.Replicas { + if updateRs != nil && updateRs.Status.ReadyReplicas != 0 && updateRs.Status.ReadyReplicas == *emqx.Spec.ReplicantTemplate.Spec.Replicas { emqx.Status.SetCondition(metav1.Condition{ Type: appsv2beta1.ReplicantNodesReady, Status: metav1.ConditionTrue,