From 8c846491fb1d603db52fc210973530fb2a6272ce Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Fri, 22 Nov 2024 07:49:41 +0000 Subject: [PATCH 01/21] decoup gateway status update Signed-off-by: Huabing Zhao --- internal/provider/kubernetes/predicates.go | 6 +++--- internal/provider/kubernetes/predicates_test.go | 9 ++++++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/internal/provider/kubernetes/predicates.go b/internal/provider/kubernetes/predicates.go index d25ec2fb7d4..e135f16b476 100644 --- a/internal/provider/kubernetes/predicates.go +++ b/internal/provider/kubernetes/predicates.go @@ -294,7 +294,7 @@ func (r *gatewayAPIReconciler) validateServiceForReconcile(obj client.Object) bo // Check if the Service belongs to a Gateway, if so, update the Gateway status. gtw := r.findOwningGateway(ctx, labels) if gtw != nil { - r.updateStatusForGateway(ctx, gtw) + r.resources.GatewayStatuses.Store(utils.NamespacedName(gtw), >w.Status) return false } @@ -528,7 +528,7 @@ func (r *gatewayAPIReconciler) validateObjectForReconcile(obj client.Object) boo // Check if the obj belongs to a Gateway, if so, update the Gateway status. gtw := r.findOwningGateway(ctx, labels) if gtw != nil { - r.updateStatusForGateway(ctx, gtw) + r.resources.GatewayStatuses.Store(utils.NamespacedName(gtw), >w.Status) return false } } @@ -636,7 +636,7 @@ func (r *gatewayAPIReconciler) updateStatusForGatewaysUnderGatewayClass(ctx cont } for _, gateway := range gateways.Items { - r.updateStatusForGateway(ctx, &gateway) + r.resources.GatewayStatuses.Store(utils.NamespacedName(&gateway), &gateway.Status) } return nil diff --git a/internal/provider/kubernetes/predicates_test.go b/internal/provider/kubernetes/predicates_test.go index d8abf845f4d..cdb77f78854 100644 --- a/internal/provider/kubernetes/predicates_test.go +++ b/internal/provider/kubernetes/predicates_test.go @@ -26,6 +26,7 @@ import ( "github.com/envoyproxy/gateway/internal/gatewayapi/resource" "github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/proxy" "github.com/envoyproxy/gateway/internal/logging" + "github.com/envoyproxy/gateway/internal/message" "github.com/envoyproxy/gateway/internal/provider/kubernetes/test" ) @@ -851,9 +852,10 @@ func TestValidateServiceForReconcile(t *testing.T) { logger := logging.DefaultLogger(egv1a1.LogLevelInfo) r := gatewayAPIReconciler{ - classController: egv1a1.GatewayControllerName, - log: logger, - mergeGateways: sets.New[string]("test-mg"), + classController: egv1a1.GatewayControllerName, + log: logger, + mergeGateways: sets.New[string]("test-mg"), + resources: &message.ProviderResources{}, grpcRouteCRDExists: true, tcpRouteCRDExists: true, udpRouteCRDExists: true, @@ -972,6 +974,7 @@ func TestValidateObjectForReconcile(t *testing.T) { classController: egv1a1.GatewayControllerName, log: logger, mergeGateways: sets.New[string]("test-mg"), + resources: &message.ProviderResources{}, } for _, tc := range testCases { From 8c9d33e04ac44e37c781bd609cf321a8f5fdfba3 Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Fri, 22 Nov 2024 09:23:04 +0000 Subject: [PATCH 02/21] decoup gatewayclass status update Signed-off-by: Huabing Zhao --- internal/message/types.go | 13 ++-- internal/provider/kubernetes/controller.go | 20 +++--- .../provider/kubernetes/predicates_test.go | 10 +-- internal/provider/kubernetes/status.go | 61 +++++++++---------- 4 files changed, 53 insertions(+), 51 deletions(-) diff --git a/internal/message/types.go b/internal/message/types.go index 3e3923e6cb2..2eee7f90345 100644 --- a/internal/message/types.go +++ b/internal/message/types.go @@ -75,12 +75,13 @@ func (p *ProviderResources) Close() { // GatewayAPIStatuses contains gateway API resources statuses type GatewayAPIStatuses struct { - GatewayStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayStatus] - HTTPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.HTTPRouteStatus] - GRPCRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.GRPCRouteStatus] - TLSRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TLSRouteStatus] - TCPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TCPRouteStatus] - UDPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.UDPRouteStatus] + GatewayClassStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayClassStatus] + GatewayStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayStatus] + HTTPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.HTTPRouteStatus] + GRPCRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.GRPCRouteStatus] + TLSRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TLSRouteStatus] + TCPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TCPRouteStatus] + UDPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.UDPRouteStatus] } func (s *GatewayAPIStatuses) Close() { diff --git a/internal/provider/kubernetes/controller.go b/internal/provider/kubernetes/controller.go index 28a0eafaa77..249871aa04e 100644 --- a/internal/provider/kubernetes/controller.go +++ b/internal/provider/kubernetes/controller.go @@ -199,9 +199,12 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques if managedGC.Spec.ParametersRef != nil && managedGC.DeletionTimestamp == nil { if err := r.processGatewayClassParamsRef(ctx, managedGC, resourceMappings, gwcResource); err != nil { msg := fmt.Sprintf("%s: %v", status.MsgGatewayClassInvalidParams, err) - if err := r.updateStatusForGatewayClass(ctx, managedGC, false, string(gwapiv1.GatewayClassReasonInvalidParameters), msg); err != nil { - r.log.Error(err, "unable to update GatewayClass status") - } + gc := status.SetGatewayClassAccepted( + managedGC.DeepCopy(), + false, + string(gwapiv1.GatewayClassReasonInvalidParameters), + msg) + r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status) r.log.Error(err, "failed to process parametersRef for gatewayclass", "name", managedGC.Name) return reconcile.Result{}, err } @@ -293,11 +296,12 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques // process envoy gateway secret refs r.processEnvoyProxySecretRef(ctx, gwcResource) - - if err := r.updateStatusForGatewayClass(ctx, managedGC, true, string(gwapiv1.GatewayClassReasonAccepted), status.MsgValidGatewayClass); err != nil { - r.log.Error(err, "unable to update GatewayClass status") - return reconcile.Result{}, err - } + gc := status.SetGatewayClassAccepted( + managedGC.DeepCopy(), + true, + string(gwapiv1.GatewayClassReasonAccepted), + status.MsgValidGatewayClass) + r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status) if len(gwcResource.Gateways) == 0 { r.log.Info("No gateways found for accepted gatewayclass") diff --git a/internal/provider/kubernetes/predicates_test.go b/internal/provider/kubernetes/predicates_test.go index cdb77f78854..8ff155f46f4 100644 --- a/internal/provider/kubernetes/predicates_test.go +++ b/internal/provider/kubernetes/predicates_test.go @@ -852,10 +852,10 @@ func TestValidateServiceForReconcile(t *testing.T) { logger := logging.DefaultLogger(egv1a1.LogLevelInfo) r := gatewayAPIReconciler{ - classController: egv1a1.GatewayControllerName, - log: logger, - mergeGateways: sets.New[string]("test-mg"), - resources: &message.ProviderResources{}, + classController: egv1a1.GatewayControllerName, + log: logger, + mergeGateways: sets.New[string]("test-mg"), + resources: &message.ProviderResources{}, grpcRouteCRDExists: true, tcpRouteCRDExists: true, udpRouteCRDExists: true, @@ -974,7 +974,7 @@ func TestValidateObjectForReconcile(t *testing.T) { classController: egv1a1.GatewayControllerName, log: logger, mergeGateways: sets.New[string]("test-mg"), - resources: &message.ProviderResources{}, + resources: &message.ProviderResources{}, } for _, tc := range testCases { diff --git a/internal/provider/kubernetes/status.go b/internal/provider/kubernetes/status.go index a59eb82f75a..d9ff03f9b66 100644 --- a/internal/provider/kubernetes/status.go +++ b/internal/provider/kubernetes/status.go @@ -10,7 +10,6 @@ import ( "fmt" "reflect" - kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -28,6 +27,35 @@ import ( // subscribeAndUpdateStatus subscribes to gateway API object status updates and // writes it into the Kubernetes API Server. func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context, extensionManagerEnabled bool) { + // GatewayClass object status updater + go func() { + message.HandleSubscription( + message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "gatewayclass-status"}, + r.resources.GatewayClassStatuses.Subscribe(ctx), + func(update message.Update[types.NamespacedName, *gwapiv1.GatewayClassStatus], errChan chan error) { + // skip delete updates. + if update.Delete { + return + } + + r.statusUpdater.Send(Update{ + NamespacedName: update.Key, + Resource: new(gwapiv1.GatewayClass), + Mutator: MutatorFunc(func(obj client.Object) client.Object { + gc, ok := obj.(*gwapiv1.GatewayClass) + if !ok { + panic(fmt.Sprintf("unsupported object type %T", obj)) + } + gcCopy := gc.DeepCopy() + gcCopy.Status = *update.Value + return gcCopy + }), + }) + }, + ) + r.log.Info("gatewayclass status subscriber shutting down") + }() + // Gateway object status updater go func() { message.HandleSubscription( @@ -564,34 +592,3 @@ func (r *gatewayAPIReconciler) updateStatusForGateway(ctx context.Context, gtw * }), }) } - -func (r *gatewayAPIReconciler) updateStatusForGatewayClass( - ctx context.Context, - gc *gwapiv1.GatewayClass, - accepted bool, - reason, - msg string, -) error { - if r.statusUpdater != nil { - r.statusUpdater.Send(Update{ - NamespacedName: types.NamespacedName{Name: gc.Name}, - Resource: &gwapiv1.GatewayClass{}, - Mutator: MutatorFunc(func(obj client.Object) client.Object { - gc, ok := obj.(*gwapiv1.GatewayClass) - if !ok { - panic(fmt.Sprintf("unsupported object type %T", obj)) - } - - return status.SetGatewayClassAccepted(gc.DeepCopy(), accepted, reason, msg) - }), - }) - } else { - // this branch makes testing easier by not going through the status.Updater. - duplicate := status.SetGatewayClassAccepted(gc.DeepCopy(), accepted, reason, msg) - - if err := r.client.Status().Update(ctx, duplicate); err != nil && !kerrors.IsNotFound(err) { - return fmt.Errorf("error updating status of gatewayclass %s: %w", duplicate.Name, err) - } - } - return nil -} From fc97316bbe29faf4433398dd918e5805c27140a2 Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Sat, 23 Nov 2024 07:11:03 +0000 Subject: [PATCH 03/21] fix test Signed-off-by: Huabing Zhao --- internal/gatewayapi/runner/runner.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/internal/gatewayapi/runner/runner.go b/internal/gatewayapi/runner/runner.go index 62975892918..aa07954a026 100644 --- a/internal/gatewayapi/runner/runner.go +++ b/internal/gatewayapi/runner/runner.go @@ -369,7 +369,6 @@ type StatusesToDelete struct { func (r *Runner) getAllStatuses() *StatusesToDelete { // Maps storing status keys to be deleted ds := &StatusesToDelete{ - GatewayStatusKeys: make(map[types.NamespacedName]bool), HTTPRouteStatusKeys: make(map[types.NamespacedName]bool), GRPCRouteStatusKeys: make(map[types.NamespacedName]bool), TLSRouteStatusKeys: make(map[types.NamespacedName]bool), @@ -387,9 +386,9 @@ func (r *Runner) getAllStatuses() *StatusesToDelete { } // Get current status keys - for key := range r.ProviderResources.GatewayStatuses.LoadAll() { - ds.GatewayStatusKeys[key] = true - } + // Do not delete the status keys for the Gateway because the Gateway status is also stored in the provider runner + // to update the address and workload status. + // TODO: zhaohuabing move all the status handling to Gateway API translator to avoid this. for key := range r.ProviderResources.HTTPRouteStatuses.LoadAll() { ds.HTTPRouteStatusKeys[key] = true } @@ -428,10 +427,6 @@ func (r *Runner) getAllStatuses() *StatusesToDelete { } func (r *Runner) deleteStatusKeys(ds *StatusesToDelete) { - for key := range ds.GatewayStatusKeys { - r.ProviderResources.GatewayStatuses.Delete(key) - delete(ds.GatewayStatusKeys, key) - } for key := range ds.HTTPRouteStatusKeys { r.ProviderResources.HTTPRouteStatuses.Delete(key) delete(ds.HTTPRouteStatusKeys, key) From bccc70c67e7630d643c720552e43d99d20d81577 Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Sat, 23 Nov 2024 07:16:15 +0000 Subject: [PATCH 04/21] add comment Signed-off-by: Huabing Zhao --- internal/gatewayapi/runner/runner.go | 8 +++++--- internal/provider/kubernetes/predicates.go | 6 ++++++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/internal/gatewayapi/runner/runner.go b/internal/gatewayapi/runner/runner.go index aa07954a026..2f79fe86d6a 100644 --- a/internal/gatewayapi/runner/runner.go +++ b/internal/gatewayapi/runner/runner.go @@ -386,9 +386,11 @@ func (r *Runner) getAllStatuses() *StatusesToDelete { } // Get current status keys - // Do not delete the status keys for the Gateway because the Gateway status is also stored in the provider runner - // to update the address and workload status. - // TODO: zhaohuabing move all the status handling to Gateway API translator to avoid this. + // Do not delete the status keys for the Gateway because the Gateway status has also been stored into the ProviderResources + // by the kubernetes provider to update the address and workload status. + // + // TODO: zhaohuabing It's not a big issue as the Gateway status typically does not occupy a lot of memory. + // but it's better to move all the status handling to Gateway API translator layer to avoid this. for key := range r.ProviderResources.HTTPRouteStatuses.LoadAll() { ds.HTTPRouteStatusKeys[key] = true } diff --git a/internal/provider/kubernetes/predicates.go b/internal/provider/kubernetes/predicates.go index e135f16b476..79a15a01c15 100644 --- a/internal/provider/kubernetes/predicates.go +++ b/internal/provider/kubernetes/predicates.go @@ -294,6 +294,9 @@ func (r *gatewayAPIReconciler) validateServiceForReconcile(obj client.Object) bo // Check if the Service belongs to a Gateway, if so, update the Gateway status. gtw := r.findOwningGateway(ctx, labels) if gtw != nil { + // Trigger a status update for the Gateway. + // The status updater will check the service to get the addresses of the Gateway, + // and check the Deployment/DaemonSet to get the status of the Gateway workload. r.resources.GatewayStatuses.Store(utils.NamespacedName(gtw), >w.Status) return false } @@ -528,6 +531,9 @@ func (r *gatewayAPIReconciler) validateObjectForReconcile(obj client.Object) boo // Check if the obj belongs to a Gateway, if so, update the Gateway status. gtw := r.findOwningGateway(ctx, labels) if gtw != nil { + // Trigger a status update for the Gateway. + // The status updater will check the service to get the addresses of the Gateway, + // and check the Deployment/DaemonSet to get the status of the Gateway workload. r.resources.GatewayStatuses.Store(utils.NamespacedName(gtw), >w.Status) return false } From 7a4c51e4f865cd0f30f19fe1973e6a15d60663bf Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Sat, 23 Nov 2024 07:33:35 +0000 Subject: [PATCH 05/21] fix test Signed-off-by: Huabing Zhao --- internal/gatewayapi/runner/runner.go | 14 +++++++++----- internal/gatewayapi/runner/runner_test.go | 2 -- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/internal/gatewayapi/runner/runner.go b/internal/gatewayapi/runner/runner.go index 2f79fe86d6a..cab5220da50 100644 --- a/internal/gatewayapi/runner/runner.go +++ b/internal/gatewayapi/runner/runner.go @@ -369,6 +369,7 @@ type StatusesToDelete struct { func (r *Runner) getAllStatuses() *StatusesToDelete { // Maps storing status keys to be deleted ds := &StatusesToDelete{ + GatewayStatusKeys: make(map[types.NamespacedName]bool), HTTPRouteStatusKeys: make(map[types.NamespacedName]bool), GRPCRouteStatusKeys: make(map[types.NamespacedName]bool), TLSRouteStatusKeys: make(map[types.NamespacedName]bool), @@ -386,11 +387,9 @@ func (r *Runner) getAllStatuses() *StatusesToDelete { } // Get current status keys - // Do not delete the status keys for the Gateway because the Gateway status has also been stored into the ProviderResources - // by the kubernetes provider to update the address and workload status. - // - // TODO: zhaohuabing It's not a big issue as the Gateway status typically does not occupy a lot of memory. - // but it's better to move all the status handling to Gateway API translator layer to avoid this. + for key := range r.ProviderResources.GatewayStatuses.LoadAll() { + ds.GatewayStatusKeys[key] = true + } for key := range r.ProviderResources.HTTPRouteStatuses.LoadAll() { ds.HTTPRouteStatusKeys[key] = true } @@ -429,6 +428,11 @@ func (r *Runner) getAllStatuses() *StatusesToDelete { } func (r *Runner) deleteStatusKeys(ds *StatusesToDelete) { + // Do not delete the status keys for the Gateway because the Gateway status has also been stored into the ProviderResources + // by the kubernetes provider to update the address and workload status. + // + // TODO: zhaohuabing this is acceptable as the Gateway status typically does not occupy a lot of memory, + // but it's better to move all the status handling to Gateway API translator layer to avoid this. for key := range ds.HTTPRouteStatusKeys { r.ProviderResources.HTTPRouteStatuses.Delete(key) delete(ds.HTTPRouteStatusKeys, key) diff --git a/internal/gatewayapi/runner/runner_test.go b/internal/gatewayapi/runner/runner_test.go index 58515da7e16..2b5c54fd12e 100644 --- a/internal/gatewayapi/runner/runner_test.go +++ b/internal/gatewayapi/runner/runner_test.go @@ -203,7 +203,6 @@ func TestDeleteStatusKeys(t *testing.T) { delete(ds.UDPRouteStatusKeys, keys[6]) r.deleteStatusKeys(ds) - require.Equal(t, 0, r.ProviderResources.GatewayStatuses.Len()) require.Equal(t, 0, r.ProviderResources.HTTPRouteStatuses.Len()) require.Equal(t, 0, r.ProviderResources.GRPCRouteStatuses.Len()) require.Equal(t, 0, r.ProviderResources.TLSRouteStatuses.Len()) @@ -278,7 +277,6 @@ func TestDeleteAllStatusKeys(t *testing.T) { // Checks that the keys are successfully stored to DeletableStatus and watchable maps ds := r.getAllStatuses() - require.True(t, ds.GatewayStatusKeys[keys[0]]) require.True(t, ds.HTTPRouteStatusKeys[keys[1]]) require.True(t, ds.GRPCRouteStatusKeys[keys[2]]) require.True(t, ds.TLSRouteStatusKeys[keys[3]]) From e406088d7ddfc55d3e4956013243118a9d76ff5c Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Sat, 23 Nov 2024 11:47:33 +0000 Subject: [PATCH 06/21] fix test Signed-off-by: Huabing Zhao --- internal/provider/kubernetes/predicates.go | 26 ++++++++++++++-------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/internal/provider/kubernetes/predicates.go b/internal/provider/kubernetes/predicates.go index 79a15a01c15..82fcfd328ef 100644 --- a/internal/provider/kubernetes/predicates.go +++ b/internal/provider/kubernetes/predicates.go @@ -294,10 +294,7 @@ func (r *gatewayAPIReconciler) validateServiceForReconcile(obj client.Object) bo // Check if the Service belongs to a Gateway, if so, update the Gateway status. gtw := r.findOwningGateway(ctx, labels) if gtw != nil { - // Trigger a status update for the Gateway. - // The status updater will check the service to get the addresses of the Gateway, - // and check the Deployment/DaemonSet to get the status of the Gateway workload. - r.resources.GatewayStatuses.Store(utils.NamespacedName(gtw), >w.Status) + r.updateGatewayStatus(gtw) return false } @@ -531,10 +528,7 @@ func (r *gatewayAPIReconciler) validateObjectForReconcile(obj client.Object) boo // Check if the obj belongs to a Gateway, if so, update the Gateway status. gtw := r.findOwningGateway(ctx, labels) if gtw != nil { - // Trigger a status update for the Gateway. - // The status updater will check the service to get the addresses of the Gateway, - // and check the Deployment/DaemonSet to get the status of the Gateway workload. - r.resources.GatewayStatuses.Store(utils.NamespacedName(gtw), >w.Status) + r.updateGatewayStatus(gtw) return false } } @@ -642,12 +636,26 @@ func (r *gatewayAPIReconciler) updateStatusForGatewaysUnderGatewayClass(ctx cont } for _, gateway := range gateways.Items { - r.resources.GatewayStatuses.Store(utils.NamespacedName(&gateway), &gateway.Status) + r.updateGatewayStatus(&gateway) } return nil } +// updateGatewayStatus triggers a status update for the Gateway. +func (r *gatewayAPIReconciler) updateGatewayStatus(gateway *gwapiv1.Gateway) { + // The status added to GatewayStatuses is solely used to trigger the status updater + // and does not reflect the real changed status. + // + // The status updater will check the Envoy Proxy service to get the addresses of the Gateway, + // and check the Envoy Proxy Deployment/DaemonSet to get the status of the Gateway workload. + // + // Since the status does not reflect the actual changed status, we need to delete it first + // to prevent it from being considered unchanged. This ensures that subscribers receive the update event. + r.resources.GatewayStatuses.Delete(utils.NamespacedName(gateway)) + r.resources.GatewayStatuses.Store(utils.NamespacedName(gateway), &gateway.Status) +} + func (r *gatewayAPIReconciler) handleNode(obj client.Object) bool { ctx := context.Background() node, ok := obj.(*corev1.Node) From 163cf5cac83d3b95bbdb18837d6b3980533cec74 Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Tue, 26 Nov 2024 03:02:07 +0000 Subject: [PATCH 07/21] revert gateway api runner Signed-off-by: Huabing Zhao --- internal/gatewayapi/runner/runner.go | 9 ++++----- internal/gatewayapi/runner/runner_test.go | 2 ++ 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/gatewayapi/runner/runner.go b/internal/gatewayapi/runner/runner.go index cab5220da50..62975892918 100644 --- a/internal/gatewayapi/runner/runner.go +++ b/internal/gatewayapi/runner/runner.go @@ -428,11 +428,10 @@ func (r *Runner) getAllStatuses() *StatusesToDelete { } func (r *Runner) deleteStatusKeys(ds *StatusesToDelete) { - // Do not delete the status keys for the Gateway because the Gateway status has also been stored into the ProviderResources - // by the kubernetes provider to update the address and workload status. - // - // TODO: zhaohuabing this is acceptable as the Gateway status typically does not occupy a lot of memory, - // but it's better to move all the status handling to Gateway API translator layer to avoid this. + for key := range ds.GatewayStatusKeys { + r.ProviderResources.GatewayStatuses.Delete(key) + delete(ds.GatewayStatusKeys, key) + } for key := range ds.HTTPRouteStatusKeys { r.ProviderResources.HTTPRouteStatuses.Delete(key) delete(ds.HTTPRouteStatusKeys, key) diff --git a/internal/gatewayapi/runner/runner_test.go b/internal/gatewayapi/runner/runner_test.go index 2b5c54fd12e..58515da7e16 100644 --- a/internal/gatewayapi/runner/runner_test.go +++ b/internal/gatewayapi/runner/runner_test.go @@ -203,6 +203,7 @@ func TestDeleteStatusKeys(t *testing.T) { delete(ds.UDPRouteStatusKeys, keys[6]) r.deleteStatusKeys(ds) + require.Equal(t, 0, r.ProviderResources.GatewayStatuses.Len()) require.Equal(t, 0, r.ProviderResources.HTTPRouteStatuses.Len()) require.Equal(t, 0, r.ProviderResources.GRPCRouteStatuses.Len()) require.Equal(t, 0, r.ProviderResources.TLSRouteStatuses.Len()) @@ -277,6 +278,7 @@ func TestDeleteAllStatusKeys(t *testing.T) { // Checks that the keys are successfully stored to DeletableStatus and watchable maps ds := r.getAllStatuses() + require.True(t, ds.GatewayStatusKeys[keys[0]]) require.True(t, ds.HTTPRouteStatusKeys[keys[1]]) require.True(t, ds.GRPCRouteStatusKeys[keys[2]]) require.True(t, ds.TLSRouteStatusKeys[keys[3]]) From bf3d07ea76c681afab9dbfe16c702475c9b39a67 Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Tue, 26 Nov 2024 03:36:40 +0000 Subject: [PATCH 08/21] update address and programming status Signed-off-by: Huabing Zhao --- internal/provider/kubernetes/predicates.go | 18 +++------- internal/provider/kubernetes/status.go | 40 +++++++++++++--------- 2 files changed, 28 insertions(+), 30 deletions(-) diff --git a/internal/provider/kubernetes/predicates.go b/internal/provider/kubernetes/predicates.go index 82fcfd328ef..c80477d7788 100644 --- a/internal/provider/kubernetes/predicates.go +++ b/internal/provider/kubernetes/predicates.go @@ -294,7 +294,7 @@ func (r *gatewayAPIReconciler) validateServiceForReconcile(obj client.Object) bo // Check if the Service belongs to a Gateway, if so, update the Gateway status. gtw := r.findOwningGateway(ctx, labels) if gtw != nil { - r.updateGatewayStatus(gtw) + r.updateGatewayStatus(ctx, gtw) return false } @@ -528,7 +528,7 @@ func (r *gatewayAPIReconciler) validateObjectForReconcile(obj client.Object) boo // Check if the obj belongs to a Gateway, if so, update the Gateway status. gtw := r.findOwningGateway(ctx, labels) if gtw != nil { - r.updateGatewayStatus(gtw) + r.updateGatewayStatus(ctx, gtw) return false } } @@ -636,23 +636,15 @@ func (r *gatewayAPIReconciler) updateStatusForGatewaysUnderGatewayClass(ctx cont } for _, gateway := range gateways.Items { - r.updateGatewayStatus(&gateway) + r.updateGatewayStatus(ctx, &gateway) } return nil } // updateGatewayStatus triggers a status update for the Gateway. -func (r *gatewayAPIReconciler) updateGatewayStatus(gateway *gwapiv1.Gateway) { - // The status added to GatewayStatuses is solely used to trigger the status updater - // and does not reflect the real changed status. - // - // The status updater will check the Envoy Proxy service to get the addresses of the Gateway, - // and check the Envoy Proxy Deployment/DaemonSet to get the status of the Gateway workload. - // - // Since the status does not reflect the actual changed status, we need to delete it first - // to prevent it from being considered unchanged. This ensures that subscribers receive the update event. - r.resources.GatewayStatuses.Delete(utils.NamespacedName(gateway)) +func (r *gatewayAPIReconciler) updateGatewayStatus(ctx context.Context, gateway *gwapiv1.Gateway) { + r.updateGatewayAddressAndProgrammingStatus(ctx, gateway) r.resources.GatewayStatuses.Store(utils.NamespacedName(gateway), &gateway.Status) } diff --git a/internal/provider/kubernetes/status.go b/internal/provider/kubernetes/status.go index d9ff03f9b66..82888582508 100644 --- a/internal/provider/kubernetes/status.go +++ b/internal/provider/kubernetes/status.go @@ -555,23 +555,7 @@ func (r *gatewayAPIReconciler) updateStatusForGateway(ctx context.Context, gtw * return } - // Get envoyObjects - envoyObj, err := r.envoyObjectForGateway(ctx, gtw) - if err != nil { - r.log.Info("failed to get Deployment for gateway", - "namespace", gtw.Namespace, "name", gtw.Name) - } - - // Get service - svc, err := r.envoyServiceForGateway(ctx, gtw) - if err != nil { - r.log.Info("failed to get Service for gateway", - "namespace", gtw.Namespace, "name", gtw.Name) - } - // update accepted condition - status.UpdateGatewayStatusAcceptedCondition(gtw, true) - // update address field and programmed condition - status.UpdateGatewayStatusProgrammedCondition(gtw, svc, envoyObj, r.store.listNodeAddresses()...) + r.updateGatewayAddressAndProgrammingStatus(ctx, gtw) key := utils.NamespacedName(gtw) @@ -592,3 +576,25 @@ func (r *gatewayAPIReconciler) updateStatusForGateway(ctx context.Context, gtw * }), }) } + +// updateGatewayAddressAndProgrammingStatus updates the Gateway status with the envoy service address and the envoy +// deployment or daemonset programming status. +func (r *gatewayAPIReconciler) updateGatewayAddressAndProgrammingStatus(ctx context.Context, gtw *gwapiv1.Gateway) { + // Get envoyObjects + envoyObj, err := r.envoyObjectForGateway(ctx, gtw) + if err != nil { + r.log.Info("failed to get Deployment for gateway", + "namespace", gtw.Namespace, "name", gtw.Name) + } + + // Get service + svc, err := r.envoyServiceForGateway(ctx, gtw) + if err != nil { + r.log.Info("failed to get Service for gateway", + "namespace", gtw.Namespace, "name", gtw.Name) + } + // update accepted condition + status.UpdateGatewayStatusAcceptedCondition(gtw, true) + // update address field and programmed condition + status.UpdateGatewayStatusProgrammedCondition(gtw, svc, envoyObj, r.store.listNodeAddresses()...) +} From c9931592764438f6f88b86d8034fe273cc15d0d6 Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Tue, 26 Nov 2024 03:51:10 +0000 Subject: [PATCH 09/21] Revert "update address and programming status" This reverts commit bf3d07ea76c681afab9dbfe16c702475c9b39a67. --- internal/provider/kubernetes/predicates.go | 18 +++++++--- internal/provider/kubernetes/status.go | 40 +++++++++------------- 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/internal/provider/kubernetes/predicates.go b/internal/provider/kubernetes/predicates.go index c80477d7788..82fcfd328ef 100644 --- a/internal/provider/kubernetes/predicates.go +++ b/internal/provider/kubernetes/predicates.go @@ -294,7 +294,7 @@ func (r *gatewayAPIReconciler) validateServiceForReconcile(obj client.Object) bo // Check if the Service belongs to a Gateway, if so, update the Gateway status. gtw := r.findOwningGateway(ctx, labels) if gtw != nil { - r.updateGatewayStatus(ctx, gtw) + r.updateGatewayStatus(gtw) return false } @@ -528,7 +528,7 @@ func (r *gatewayAPIReconciler) validateObjectForReconcile(obj client.Object) boo // Check if the obj belongs to a Gateway, if so, update the Gateway status. gtw := r.findOwningGateway(ctx, labels) if gtw != nil { - r.updateGatewayStatus(ctx, gtw) + r.updateGatewayStatus(gtw) return false } } @@ -636,15 +636,23 @@ func (r *gatewayAPIReconciler) updateStatusForGatewaysUnderGatewayClass(ctx cont } for _, gateway := range gateways.Items { - r.updateGatewayStatus(ctx, &gateway) + r.updateGatewayStatus(&gateway) } return nil } // updateGatewayStatus triggers a status update for the Gateway. -func (r *gatewayAPIReconciler) updateGatewayStatus(ctx context.Context, gateway *gwapiv1.Gateway) { - r.updateGatewayAddressAndProgrammingStatus(ctx, gateway) +func (r *gatewayAPIReconciler) updateGatewayStatus(gateway *gwapiv1.Gateway) { + // The status added to GatewayStatuses is solely used to trigger the status updater + // and does not reflect the real changed status. + // + // The status updater will check the Envoy Proxy service to get the addresses of the Gateway, + // and check the Envoy Proxy Deployment/DaemonSet to get the status of the Gateway workload. + // + // Since the status does not reflect the actual changed status, we need to delete it first + // to prevent it from being considered unchanged. This ensures that subscribers receive the update event. + r.resources.GatewayStatuses.Delete(utils.NamespacedName(gateway)) r.resources.GatewayStatuses.Store(utils.NamespacedName(gateway), &gateway.Status) } diff --git a/internal/provider/kubernetes/status.go b/internal/provider/kubernetes/status.go index 82888582508..d9ff03f9b66 100644 --- a/internal/provider/kubernetes/status.go +++ b/internal/provider/kubernetes/status.go @@ -555,7 +555,23 @@ func (r *gatewayAPIReconciler) updateStatusForGateway(ctx context.Context, gtw * return } - r.updateGatewayAddressAndProgrammingStatus(ctx, gtw) + // Get envoyObjects + envoyObj, err := r.envoyObjectForGateway(ctx, gtw) + if err != nil { + r.log.Info("failed to get Deployment for gateway", + "namespace", gtw.Namespace, "name", gtw.Name) + } + + // Get service + svc, err := r.envoyServiceForGateway(ctx, gtw) + if err != nil { + r.log.Info("failed to get Service for gateway", + "namespace", gtw.Namespace, "name", gtw.Name) + } + // update accepted condition + status.UpdateGatewayStatusAcceptedCondition(gtw, true) + // update address field and programmed condition + status.UpdateGatewayStatusProgrammedCondition(gtw, svc, envoyObj, r.store.listNodeAddresses()...) key := utils.NamespacedName(gtw) @@ -576,25 +592,3 @@ func (r *gatewayAPIReconciler) updateStatusForGateway(ctx context.Context, gtw * }), }) } - -// updateGatewayAddressAndProgrammingStatus updates the Gateway status with the envoy service address and the envoy -// deployment or daemonset programming status. -func (r *gatewayAPIReconciler) updateGatewayAddressAndProgrammingStatus(ctx context.Context, gtw *gwapiv1.Gateway) { - // Get envoyObjects - envoyObj, err := r.envoyObjectForGateway(ctx, gtw) - if err != nil { - r.log.Info("failed to get Deployment for gateway", - "namespace", gtw.Namespace, "name", gtw.Name) - } - - // Get service - svc, err := r.envoyServiceForGateway(ctx, gtw) - if err != nil { - r.log.Info("failed to get Service for gateway", - "namespace", gtw.Namespace, "name", gtw.Name) - } - // update accepted condition - status.UpdateGatewayStatusAcceptedCondition(gtw, true) - // update address field and programmed condition - status.UpdateGatewayStatusProgrammedCondition(gtw, svc, envoyObj, r.store.listNodeAddresses()...) -} From f8f2e68a39b357b701c18a8e6d31d90174d38cd3 Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Wed, 11 Dec 2024 01:45:55 +0000 Subject: [PATCH 10/21] avoid overriding the gateway status from Gateway API translator Signed-off-by: Huabing Zhao --- internal/provider/kubernetes/predicates.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/internal/provider/kubernetes/predicates.go b/internal/provider/kubernetes/predicates.go index 82fcfd328ef..7734a227d71 100644 --- a/internal/provider/kubernetes/predicates.go +++ b/internal/provider/kubernetes/predicates.go @@ -652,8 +652,13 @@ func (r *gatewayAPIReconciler) updateGatewayStatus(gateway *gwapiv1.Gateway) { // // Since the status does not reflect the actual changed status, we need to delete it first // to prevent it from being considered unchanged. This ensures that subscribers receive the update event. - r.resources.GatewayStatuses.Delete(utils.NamespacedName(gateway)) - r.resources.GatewayStatuses.Store(utils.NamespacedName(gateway), &gateway.Status) + gwName := utils.NamespacedName(gateway) + status:= &gateway.Status + if existing, ok := r.resources.GatewayStatuses.Load(gwName); ok { + status = existing + } + r.resources.GatewayStatuses.Delete(gwName) + r.resources.GatewayStatuses.Store(gwName, status) } func (r *gatewayAPIReconciler) handleNode(obj client.Object) bool { From 272348a9627c4d737cb93c9e15f9d453562a96a2 Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Wed, 11 Dec 2024 01:50:50 +0000 Subject: [PATCH 11/21] minor wording Signed-off-by: Huabing Zhao --- internal/provider/kubernetes/predicates.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/provider/kubernetes/predicates.go b/internal/provider/kubernetes/predicates.go index 7734a227d71..8e68009b94e 100644 --- a/internal/provider/kubernetes/predicates.go +++ b/internal/provider/kubernetes/predicates.go @@ -654,6 +654,7 @@ func (r *gatewayAPIReconciler) updateGatewayStatus(gateway *gwapiv1.Gateway) { // to prevent it from being considered unchanged. This ensures that subscribers receive the update event. gwName := utils.NamespacedName(gateway) status:= &gateway.Status + // Use the existing status if it exists to avoid losing the status calculated by the Gateway API translator. if existing, ok := r.resources.GatewayStatuses.Load(gwName); ok { status = existing } From 3d2009018e5764835dcbdb4fa1619845ec0b8c90 Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Wed, 11 Dec 2024 02:16:42 +0000 Subject: [PATCH 12/21] minor wording Signed-off-by: Huabing Zhao --- internal/provider/kubernetes/predicates.go | 16 ++++++++-------- .../out/xds-ir/http-route-timeout.clusters.yaml | 4 ---- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/internal/provider/kubernetes/predicates.go b/internal/provider/kubernetes/predicates.go index 8e68009b94e..ae457b3920b 100644 --- a/internal/provider/kubernetes/predicates.go +++ b/internal/provider/kubernetes/predicates.go @@ -644,21 +644,21 @@ func (r *gatewayAPIReconciler) updateStatusForGatewaysUnderGatewayClass(ctx cont // updateGatewayStatus triggers a status update for the Gateway. func (r *gatewayAPIReconciler) updateGatewayStatus(gateway *gwapiv1.Gateway) { - // The status added to GatewayStatuses is solely used to trigger the status updater - // and does not reflect the real changed status. - // - // The status updater will check the Envoy Proxy service to get the addresses of the Gateway, - // and check the Envoy Proxy Deployment/DaemonSet to get the status of the Gateway workload. - // - // Since the status does not reflect the actual changed status, we need to delete it first - // to prevent it from being considered unchanged. This ensures that subscribers receive the update event. gwName := utils.NamespacedName(gateway) status:= &gateway.Status // Use the existing status if it exists to avoid losing the status calculated by the Gateway API translator. if existing, ok := r.resources.GatewayStatuses.Load(gwName); ok { status = existing } + + // Since the status does not reflect the actual changed status, we need to delete it first + // to prevent it from being considered unchanged. This ensures that subscribers receive the update event. r.resources.GatewayStatuses.Delete(gwName) + // The status that is stored in the GatewayStatuses GatewayStatuses is solely used to trigger the status updater + // and does not reflect the real changed status. + // + // The status updater will check the Envoy Proxy service to get the addresses of the Gateway, + // and check the Envoy Proxy Deployment/DaemonSet to get the status of the Gateway workload. r.resources.GatewayStatuses.Store(gwName, status) } diff --git a/internal/xds/translator/testdata/out/xds-ir/http-route-timeout.clusters.yaml b/internal/xds/translator/testdata/out/xds-ir/http-route-timeout.clusters.yaml index a52b95baeae..b70cda2ec59 100644 --- a/internal/xds/translator/testdata/out/xds-ir/http-route-timeout.clusters.yaml +++ b/internal/xds/translator/testdata/out/xds-ir/http-route-timeout.clusters.yaml @@ -64,7 +64,6 @@ ignoreHealthOnHostRemoval: true lbPolicy: LEAST_REQUEST name: fourth-route-dest - outlierDetection: {} perConnectionBufferLimitBytes: 32768 type: EDS - circuitBreakers: @@ -82,7 +81,6 @@ ignoreHealthOnHostRemoval: true lbPolicy: LEAST_REQUEST name: fifth-route-dest - outlierDetection: {} perConnectionBufferLimitBytes: 32768 type: EDS - circuitBreakers: @@ -100,7 +98,6 @@ ignoreHealthOnHostRemoval: true lbPolicy: LEAST_REQUEST name: sixth-route-dest - outlierDetection: {} perConnectionBufferLimitBytes: 32768 type: EDS - circuitBreakers: @@ -118,6 +115,5 @@ ignoreHealthOnHostRemoval: true lbPolicy: LEAST_REQUEST name: seventh-route-dest - outlierDetection: {} perConnectionBufferLimitBytes: 32768 type: EDS From c7b7f44e1277f88404269733a24a6121b7c04f17 Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Wed, 11 Dec 2024 02:50:22 +0000 Subject: [PATCH 13/21] only subscribe to status updates upon acquiring leadership Signed-off-by: Huabing Zhao --- internal/provider/kubernetes/controller.go | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/internal/provider/kubernetes/controller.go b/internal/provider/kubernetes/controller.go index 314792b3ebc..a3224c13e7a 100644 --- a/internal/provider/kubernetes/controller.go +++ b/internal/provider/kubernetes/controller.go @@ -129,13 +129,28 @@ func newGatewayAPIController(mgr manager.Manager, cfg *config.Server, su Updater } r.log.Info("created gatewayapi controller") - // Subscribe to status updates - r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil) - // Watch resources if err := r.watchResources(ctx, mgr, c); err != nil { return fmt.Errorf("error watching resources: %w", err) } + + // When leader election is active, only subscribe to status updates upon acquiring leadership to avoid multiple + // EG instances processing status updates. + if cfg.EnvoyGateway.Provider.Type == egv1a1.ProviderTypeKubernetes && + !ptr.Deref(cfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) { + go func() { + select { + case <-ctx.Done(): + return + case <-mgr.Elected(): + // Subscribe to status updates + r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil) + } + }() + } else { + // Subscribe to status updates + r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil) + } return nil } From be6df9ae168ed8edbfa4cb4a555384767c5bb6fc Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Wed, 11 Dec 2024 03:03:58 +0000 Subject: [PATCH 14/21] fix lint Signed-off-by: Huabing Zhao --- internal/provider/kubernetes/predicates.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/provider/kubernetes/predicates.go b/internal/provider/kubernetes/predicates.go index ae457b3920b..16bb9361b04 100644 --- a/internal/provider/kubernetes/predicates.go +++ b/internal/provider/kubernetes/predicates.go @@ -645,7 +645,7 @@ func (r *gatewayAPIReconciler) updateStatusForGatewaysUnderGatewayClass(ctx cont // updateGatewayStatus triggers a status update for the Gateway. func (r *gatewayAPIReconciler) updateGatewayStatus(gateway *gwapiv1.Gateway) { gwName := utils.NamespacedName(gateway) - status:= &gateway.Status + status := &gateway.Status // Use the existing status if it exists to avoid losing the status calculated by the Gateway API translator. if existing, ok := r.resources.GatewayStatuses.Load(gwName); ok { status = existing From 9aa5c7c0e81f766821b39011d24298632f696a32 Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Wed, 11 Dec 2024 10:48:23 +0000 Subject: [PATCH 15/21] minor wording Signed-off-by: Huabing Zhao --- internal/provider/kubernetes/controller.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/internal/provider/kubernetes/controller.go b/internal/provider/kubernetes/controller.go index a3224c13e7a..6803de62d5d 100644 --- a/internal/provider/kubernetes/controller.go +++ b/internal/provider/kubernetes/controller.go @@ -134,8 +134,7 @@ func newGatewayAPIController(mgr manager.Manager, cfg *config.Server, su Updater return fmt.Errorf("error watching resources: %w", err) } - // When leader election is active, only subscribe to status updates upon acquiring leadership to avoid multiple - // EG instances processing status updates. + // When leader election is enabled, only subscribe to status updates upon acquiring leadership. if cfg.EnvoyGateway.Provider.Type == egv1a1.ProviderTypeKubernetes && !ptr.Deref(cfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) { go func() { @@ -143,12 +142,10 @@ func newGatewayAPIController(mgr manager.Manager, cfg *config.Server, su Updater case <-ctx.Done(): return case <-mgr.Elected(): - // Subscribe to status updates r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil) } }() } else { - // Subscribe to status updates r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil) } return nil From 75dcfbdad75681b8e4dabddd410b43d4b8c8aae3 Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Thu, 12 Dec 2024 06:54:31 +0000 Subject: [PATCH 16/21] address comment Signed-off-by: Huabing Zhao --- internal/envoygateway/config/config.go | 5 +++-- internal/infrastructure/runner/runner.go | 8 ++------ internal/provider/kubernetes/controller.go | 8 ++------ internal/provider/kubernetes/kubernetes.go | 5 ++--- 4 files changed, 9 insertions(+), 17 deletions(-) diff --git a/internal/envoygateway/config/config.go b/internal/envoygateway/config/config.go index c842c184e4c..0609f316acc 100644 --- a/internal/envoygateway/config/config.go +++ b/internal/envoygateway/config/config.go @@ -7,6 +7,7 @@ package config import ( "errors" + "sync" egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" "github.com/envoyproxy/gateway/api/v1alpha1/validation" @@ -37,7 +38,7 @@ type Server struct { // Logger is the logr implementation used by Envoy Gateway. Logger logging.Logger // Elected chan is used to signal what a leader is elected - Elected chan struct{} + Elected *sync.WaitGroup } // New returns a Server with default parameters. @@ -48,7 +49,7 @@ func New() (*Server, error) { DNSDomain: env.Lookup("KUBERNETES_CLUSTER_DOMAIN", DefaultDNSDomain), // the default logger Logger: logging.DefaultLogger(egv1a1.LogLevelInfo), - Elected: make(chan struct{}), + Elected: &sync.WaitGroup{}, }, nil } diff --git a/internal/infrastructure/runner/runner.go b/internal/infrastructure/runner/runner.go index 6896a6e5a16..3344ca0d349 100644 --- a/internal/infrastructure/runner/runner.go +++ b/internal/infrastructure/runner/runner.go @@ -72,12 +72,8 @@ func (r *Runner) Start(ctx context.Context) (err error) { if r.EnvoyGateway.Provider.Type == egv1a1.ProviderTypeKubernetes && !ptr.Deref(r.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) { go func() { - select { - case <-ctx.Done(): - return - case <-r.Elected: - initInfra() - } + r.Elected.Wait() + initInfra() }() return } diff --git a/internal/provider/kubernetes/controller.go b/internal/provider/kubernetes/controller.go index 6803de62d5d..bcb6fa8772c 100644 --- a/internal/provider/kubernetes/controller.go +++ b/internal/provider/kubernetes/controller.go @@ -138,12 +138,8 @@ func newGatewayAPIController(mgr manager.Manager, cfg *config.Server, su Updater if cfg.EnvoyGateway.Provider.Type == egv1a1.ProviderTypeKubernetes && !ptr.Deref(cfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) { go func() { - select { - case <-ctx.Done(): - return - case <-mgr.Elected(): - r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil) - } + cfg.Elected.Wait() + r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil) }() } else { r.subscribeAndUpdateStatus(ctx, cfg.EnvoyGateway.EnvoyGatewaySpec.ExtensionManager != nil) diff --git a/internal/provider/kubernetes/kubernetes.go b/internal/provider/kubernetes/kubernetes.go index 4fdbc329dd0..de40a18138a 100644 --- a/internal/provider/kubernetes/kubernetes.go +++ b/internal/provider/kubernetes/kubernetes.go @@ -109,11 +109,10 @@ func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources) return nil, fmt.Errorf("unable to set up ready check: %w", err) } - // Emit elected & continue with envoyObjects of infra resources + // Emit elected & continue with the tasks that require leadership. go func() { <-mgr.Elected() - // WARN: DO NOT CLOSE IT - svr.Elected <- struct{}{} + svr.Elected.Done() }() return &Provider{ From ba06d2b211b3835476a390538126eaa23bd6bb99 Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Thu, 12 Dec 2024 06:56:49 +0000 Subject: [PATCH 17/21] address comment Signed-off-by: Huabing Zhao --- internal/envoygateway/config/config.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/envoygateway/config/config.go b/internal/envoygateway/config/config.go index 0609f316acc..b08a09a829f 100644 --- a/internal/envoygateway/config/config.go +++ b/internal/envoygateway/config/config.go @@ -43,14 +43,16 @@ type Server struct { // New returns a Server with default parameters. func New() (*Server, error) { - return &Server{ + server:= &Server{ EnvoyGateway: egv1a1.DefaultEnvoyGateway(), Namespace: env.Lookup("ENVOY_GATEWAY_NAMESPACE", DefaultNamespace), DNSDomain: env.Lookup("KUBERNETES_CLUSTER_DOMAIN", DefaultDNSDomain), // the default logger Logger: logging.DefaultLogger(egv1a1.LogLevelInfo), Elected: &sync.WaitGroup{}, - }, nil + } + server.Elected.Add(1) + return server, nil } // Validate validates a Server config. From d77aa7d433acc23a78a4c878b7c47aed41835498 Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Thu, 12 Dec 2024 06:58:21 +0000 Subject: [PATCH 18/21] minor wording Signed-off-by: Huabing Zhao --- internal/envoygateway/config/config.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/envoygateway/config/config.go b/internal/envoygateway/config/config.go index b08a09a829f..a8d81f69a3a 100644 --- a/internal/envoygateway/config/config.go +++ b/internal/envoygateway/config/config.go @@ -51,6 +51,7 @@ func New() (*Server, error) { Logger: logging.DefaultLogger(egv1a1.LogLevelInfo), Elected: &sync.WaitGroup{}, } + // Block the tasks that are waiting for the leader to be elected server.Elected.Add(1) return server, nil } From 99a29109c89077971462bac4c2a1b418d6c47dbc Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Thu, 12 Dec 2024 07:13:31 +0000 Subject: [PATCH 19/21] fix lint Signed-off-by: Huabing Zhao --- internal/envoygateway/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/envoygateway/config/config.go b/internal/envoygateway/config/config.go index a8d81f69a3a..af05dac0753 100644 --- a/internal/envoygateway/config/config.go +++ b/internal/envoygateway/config/config.go @@ -43,7 +43,7 @@ type Server struct { // New returns a Server with default parameters. func New() (*Server, error) { - server:= &Server{ + server := &Server{ EnvoyGateway: egv1a1.DefaultEnvoyGateway(), Namespace: env.Lookup("ENVOY_GATEWAY_NAMESPACE", DefaultNamespace), DNSDomain: env.Lookup("KUBERNETES_CLUSTER_DOMAIN", DefaultDNSDomain), From f36b72f958528de158da2045a80d906e15c8109d Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Thu, 12 Dec 2024 07:24:44 +0000 Subject: [PATCH 20/21] minor change Signed-off-by: Huabing Zhao --- internal/provider/kubernetes/kubernetes.go | 30 +++++++++++----------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/internal/provider/kubernetes/kubernetes.go b/internal/provider/kubernetes/kubernetes.go index de40a18138a..56f96e70a18 100644 --- a/internal/provider/kubernetes/kubernetes.go +++ b/internal/provider/kubernetes/kubernetes.go @@ -36,40 +36,40 @@ type Provider struct { } // New creates a new Provider from the provided EnvoyGateway. -func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources) (*Provider, error) { +func New(restCfg *rest.Config, svrCfg *ec.Server, resources *message.ProviderResources) (*Provider, error) { // TODO: Decide which mgr opts should be exposed through envoygateway.provider.kubernetes API. mgrOpts := manager.Options{ Scheme: envoygateway.GetScheme(), - Logger: svr.Logger.Logger, + Logger: svrCfg.Logger.Logger, HealthProbeBindAddress: ":8081", LeaderElectionID: "5b9825d2.gateway.envoyproxy.io", - LeaderElectionNamespace: svr.Namespace, + LeaderElectionNamespace: svrCfg.Namespace, } log.SetLogger(mgrOpts.Logger) klog.SetLogger(mgrOpts.Logger) - if !ptr.Deref(svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) { + if !ptr.Deref(svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) { mgrOpts.LeaderElection = true - if svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.LeaseDuration != nil { - ld, err := time.ParseDuration(string(*svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.LeaseDuration)) + if svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.LeaseDuration != nil { + ld, err := time.ParseDuration(string(*svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.LeaseDuration)) if err != nil { return nil, err } mgrOpts.LeaseDuration = ptr.To(ld) } - if svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RetryPeriod != nil { - rp, err := time.ParseDuration(string(*svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RetryPeriod)) + if svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.RetryPeriod != nil { + rp, err := time.ParseDuration(string(*svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.RetryPeriod)) if err != nil { return nil, err } mgrOpts.RetryPeriod = ptr.To(rp) } - if svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RenewDeadline != nil { - rd, err := time.ParseDuration(string(*svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RenewDeadline)) + if svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.RenewDeadline != nil { + rd, err := time.ParseDuration(string(*svrCfg.EnvoyGateway.Provider.Kubernetes.LeaderElection.RenewDeadline)) if err != nil { return nil, err } @@ -78,13 +78,13 @@ func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources) mgrOpts.Controller = config.Controller{NeedLeaderElection: ptr.To(false)} } - if svr.EnvoyGateway.NamespaceMode() { + if svrCfg.EnvoyGateway.NamespaceMode() { mgrOpts.Cache.DefaultNamespaces = make(map[string]cache.Config) - for _, watchNS := range svr.EnvoyGateway.Provider.Kubernetes.Watch.Namespaces { + for _, watchNS := range svrCfg.EnvoyGateway.Provider.Kubernetes.Watch.Namespaces { mgrOpts.Cache.DefaultNamespaces[watchNS] = cache.Config{} } } - mgr, err := ctrl.NewManager(cfg, mgrOpts) + mgr, err := ctrl.NewManager(restCfg, mgrOpts) if err != nil { return nil, fmt.Errorf("failed to create manager: %w", err) } @@ -95,7 +95,7 @@ func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources) } // Create and register the controllers with the manager. - if err := newGatewayAPIController(mgr, svr, updateHandler.Writer(), resources); err != nil { + if err := newGatewayAPIController(mgr, svrCfg, updateHandler.Writer(), resources); err != nil { return nil, fmt.Errorf("failted to create gatewayapi controller: %w", err) } @@ -112,7 +112,7 @@ func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources) // Emit elected & continue with the tasks that require leadership. go func() { <-mgr.Elected() - svr.Elected.Done() + svrCfg.Elected.Done() }() return &Provider{ From fb00fb0c1cc72c490696061244b23e63e5f8459f Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Thu, 12 Dec 2024 07:30:16 +0000 Subject: [PATCH 21/21] release note Signed-off-by: Huabing Zhao --- release-notes/current.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/release-notes/current.yaml b/release-notes/current.yaml index 123759f7a84..3f281a27737 100644 --- a/release-notes/current.yaml +++ b/release-notes/current.yaml @@ -22,6 +22,7 @@ bug fixes: | Fixed BackendTLSPolicy didn't support using port name as the sectionName in the targetRefs Fixed reference grant from EnvoyExtensionPolicy to referenced ext-proc backend not respected Fixed BackendTrafficPolicy not applying to Gateway Route when Route has a Request Timeout defined + Fixed proxies connected to the secondary EG were not receiving xDS configuration # Enhancements that improve performance. performance improvements: |