From 373ef3247ea1600ad9854e64328f6877e49ceaca Mon Sep 17 00:00:00 2001 From: Den <6628668+den3tsou@users.noreply.github.com> Date: Wed, 13 Sep 2023 08:41:02 +1000 Subject: [PATCH] feat: watch resource with selectors (#1661) * add namespaceSelectors watch mode for Kubernetes provider Add namespaceSelectors watch mode for Kubernetes provider. Users will need to specify `EnvoyGateway.Provider.Type` and precisely one of `EnvoyGateway.Provider.Kubernetes.Wach.Namespaces` and `EnvoyGateway.Provider.Kubernetes.Wach.NamespaceSelectors` to set the KuberNtes wach mode. The namespaceSelectors doesn't change the namespace informers watch. The informer still watches all namespaces. The events which have Objects that are not under namesapces with the labels set by `NamespaceSelectors` are filtered out. Signed-off-by: Den Tsou * fix: fix typos Signed-off-by: Den Tsou * fix: fix a test and fix a bug that pointer wasn't checked Signed-off-by: Den Tsou * chore: remove an unneeded comment Signed-off-by: Den Tsou * chore: replace string type with KubernetesWatchModeType Signed-off-by: Den Tsou * fix: use right values for KubernetesWatchModeType Signed-off-by: Den Tsou * chore: run make generate Signed-off-by: Den Tsou * wip: update to check all object's namespaces returned by client * This is a protytpe. Refactoring is required to make the code more readable because the code is getting more complex after adding this logic * Update to check all labels of namespaces of objects returned by client * Fix a bug that wrong type was checked * Don't apply predicate to filter out the event related to GatewayClass because GatewayClass is cluster scoped object * Simple test to check if only certain number of gateway is returned. More test logic is indeed needed to be added Signed-off-by: Den Tsou * refactor: move namespace labels to struct field Signed-off-by: Den Tsou * resolve the issue casued by resolving conflicts Signed-off-by: Den Tsou * chore: add test for gateway Signed-off-by: Den Tsou * chore: clean up some todo and add a comment Signed-off-by: Den Tsou * chore: address PR comments * Update hasMatchingNamespaceLabels signature because labels is part of the struct field now * Remove the logic to check namespace of certificate ref because it is not necessary * Refactor the checkNamespaceLabels with new interface, so the code is more readable now Signed-off-by: Den Tsou * fix: fix typos Signed-off-by: Den Tsou * fix: fix the lint errors Signed-off-by: Den Tsou * refactor: revert back to get namespace from caller Signed-off-by: Den Tsou * test: add a integration test for the change Signed-off-by: Den Tsou --------- Signed-off-by: Den Tsou Signed-off-by: Den <6628668+den3tsou@users.noreply.github.com> Co-authored-by: Xunzhuo --- api/config/v1alpha1/envoygateway_types.go | 30 +- api/config/v1alpha1/zz_generated.deepcopy.go | 5 + docs/latest/api/config_types.md | 15 +- internal/provider/kubernetes/controller.go | 162 ++++++++- internal/provider/kubernetes/filters.go | 67 +++- internal/provider/kubernetes/kubernetes.go | 13 +- .../provider/kubernetes/kubernetes_test.go | 326 ++++++++++++++++++ internal/provider/kubernetes/predicates.go | 92 ++++- .../provider/kubernetes/predicates_test.go | 73 ++++ internal/provider/kubernetes/routes.go | 105 +++++- 10 files changed, 851 insertions(+), 37 deletions(-) diff --git a/api/config/v1alpha1/envoygateway_types.go b/api/config/v1alpha1/envoygateway_types.go index 690f4ae512b..df7b8eae33c 100644 --- a/api/config/v1alpha1/envoygateway_types.go +++ b/api/config/v1alpha1/envoygateway_types.go @@ -177,15 +177,37 @@ type EnvoyGatewayKubernetesProvider struct { OverwriteControlPlaneCerts bool `json:"overwrite_control_plane_certs,omitempty"` } +const ( + // KubernetesWatchModeTypeNamespaces indicates that the namespace watch mode is used. + KubernetesWatchModeTypeNamespaces = "Namespaces" + + // KubernetesWatchModeTypeNamespaceSelectors indicates that namespaceSelectors watch + // mode is used. + KubernetesWatchModeTypeNamespaceSelectors = "NamespaceSelectors" +) + +// KubernetesWatchModeType defines the type of KubernetesWatchMode +type KubernetesWatchModeType string + // KubernetesWatchMode holds the configuration for which input resources to watch and reconcile. type KubernetesWatchMode struct { + // Type indicates what watch mode to use. KubernetesWatchModeTypeNamespaces and + // KubernetesWatchModeTypeNamespaceSelectors are currently supported + // By default, when this field is unset or empty, Envoy Gateway will watch for input namespaced resources + // from all namespaces. + Type KubernetesWatchModeType + // Namespaces holds the list of namespaces that Envoy Gateway will watch for namespaced scoped // resources such as Gateway, HTTPRoute and Service. // Note that Envoy Gateway will continue to reconcile relevant cluster scoped resources such as - // GatewayClass that it is linked to. - // By default, when this field is unset or empty, Envoy Gateway will watch for input namespaced resources - // from all namespaces. - Namespaces []string `json:"namespaces,omitempty"` + // GatewayClass that it is linked to. Precisely one of Namespaces and NamespaceSelectors must be set + Namespaces []string + + // NamespaceSelectors holds a list of labels that namespaces have to have in order to be watched. + // Note this doesn't set the informer to watch the namespaces with the given labels. Informer still + // watches all namespaces. But the events for objects whois namespce have no given labels + // will be filtered out. Precisely one of Namespaces and NamespaceSelectors must be set + NamespaceSelectors []string `json:"namespaces,omitempty"` } // KubernetesDeployMode holds configuration for how to deploy managed resources such as the Envoy Proxy diff --git a/api/config/v1alpha1/zz_generated.deepcopy.go b/api/config/v1alpha1/zz_generated.deepcopy.go index 23d89812282..670bd007b9d 100644 --- a/api/config/v1alpha1/zz_generated.deepcopy.go +++ b/api/config/v1alpha1/zz_generated.deepcopy.go @@ -831,6 +831,11 @@ func (in *KubernetesWatchMode) DeepCopyInto(out *KubernetesWatchMode) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.NamespaceSelectors != nil { + in, out := &in.NamespaceSelectors, &out.NamespaceSelectors + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KubernetesWatchMode. diff --git a/docs/latest/api/config_types.md b/docs/latest/api/config_types.md index caafa97c2bf..73bc1cab5c4 100644 --- a/docs/latest/api/config_types.md +++ b/docs/latest/api/config_types.md @@ -564,7 +564,20 @@ _Appears in:_ | Field | Description | | --- | --- | -| `namespaces` _string array_ | Namespaces holds the list of namespaces that Envoy Gateway will watch for namespaced scoped resources such as Gateway, HTTPRoute and Service. Note that Envoy Gateway will continue to reconcile relevant cluster scoped resources such as GatewayClass that it is linked to. By default, when this field is unset or empty, Envoy Gateway will watch for input namespaced resources from all namespaces. | +| `Type` _[KubernetesWatchModeType](#kuberneteswatchmodetype)_ | Type indicates what watch mode to use. KubernetesWatchModeTypeNamespaces and KubernetesWatchModeTypeNamespaceSelectors are currently supported By default, when this field is unset or empty, Envoy Gateway will watch for input namespaced resources from all namespaces. | +| `Namespaces` _string array_ | Namespaces holds the list of namespaces that Envoy Gateway will watch for namespaced scoped resources such as Gateway, HTTPRoute and Service. Note that Envoy Gateway will continue to reconcile relevant cluster scoped resources such as GatewayClass that it is linked to. Precisely one of Namespaces and NamespaceSelectors must be set | +| `namespaces` _string array_ | NamespaceSelectors holds a list of labels that namespaces have to have in order to be watched. Note this doesn't set the informer to watch the namespaces with the given labels. Informer still watches all namespaces. But the events for objects whois namespce have no given labels will be filtered out. Precisely one of Namespaces and NamespaceSelectors must be set | + + +## KubernetesWatchModeType + +_Underlying type:_ `string` + +KubernetesWatchModeType defines the type of KubernetesWatchMode + +_Appears in:_ +- [KubernetesWatchMode](#kuberneteswatchmode) + ## LiteralCustomTag diff --git a/internal/provider/kubernetes/controller.go b/internal/provider/kubernetes/controller.go index 8b35930fe2a..6e3b65258de 100644 --- a/internal/provider/kubernetes/controller.go +++ b/internal/provider/kubernetes/controller.go @@ -68,6 +68,7 @@ type gatewayAPIReconciler struct { classController gwapiv1b1.GatewayController store *kubernetesProviderStore namespace string + namespaceLabels []string envoyGateway *egcfgv1a1.EnvoyGateway resources *message.ProviderResources @@ -89,11 +90,22 @@ func newGatewayAPIController(mgr manager.Manager, cfg *config.Server, su status. } } + var namespaceLabels []string + byNamespaceSelector := cfg.EnvoyGateway.Provider != nil && + cfg.EnvoyGateway.Provider.Kubernetes != nil && + cfg.EnvoyGateway.Provider.Kubernetes.Watch != nil && + cfg.EnvoyGateway.Provider.Kubernetes.Watch.Type == egcfgv1a1.KubernetesWatchModeTypeNamespaceSelectors && + len(cfg.EnvoyGateway.Provider.Kubernetes.Watch.NamespaceSelectors) != 0 + if byNamespaceSelector { + namespaceLabels = cfg.EnvoyGateway.Provider.Kubernetes.Watch.NamespaceSelectors + } + r := &gatewayAPIReconciler{ client: mgr.GetClient(), log: cfg.Logger, classController: gwapiv1b1.GatewayController(cfg.EnvoyGateway.Gateway.ControllerName), namespace: cfg.Namespace, + namespaceLabels: namespaceLabels, statusUpdater: su, resources: resources, envoyPatchPolicyStatuses: eStatuses, @@ -425,7 +437,26 @@ func (r *gatewayAPIReconciler) findReferenceGrant(ctx context.Context, from, to return nil, fmt.Errorf("failed to list ReferenceGrants: %v", err) } - for _, refGrant := range refGrantList.Items { + refGrants := refGrantList.Items + if len(r.namespaceLabels) != 0 { + var rgs []gwapiv1a2.ReferenceGrant + for _, refGrant := range refGrants { + ns := refGrant.GetNamespace() + ok, err := r.checkObjectNamespaceLabels(ns) + if err != nil { + // TODO: should return? or just proceed? + return nil, fmt.Errorf("failed to check namespace labels for ReferenceGrant %s in namespace %s: %s", refGrant.GetName(), ns, err) + } + if !ok { + // TODO: should log? + continue + } + rgs = append(rgs, refGrant) + } + refGrants = rgs + } + + for _, refGrant := range refGrants { if refGrant.Namespace == to.namespace { for _, src := range refGrant.Spec.From { if src.Kind == gwapiv1a2.Kind(from.kind) && string(src.Namespace) == from.namespace { @@ -450,7 +481,25 @@ func (r *gatewayAPIReconciler) processGateways(ctx context.Context, acceptedGC * return err } - for _, gtw := range gatewayList.Items { + gateways := gatewayList.Items + if len(r.namespaceLabels) != 0 { + var gtws []gwapiv1b1.Gateway + for _, gtw := range gateways { + ns := gtw.GetNamespace() + ok, err := r.checkObjectNamespaceLabels(ns) + if err != nil { + // TODO: should return? or just proceed? + return fmt.Errorf("failed to check namespace labels for gateway %s in namespace %s: %s", gtw.GetName(), ns, err) + } + + if ok { + gtws = append(gtws, gtw) + } + } + gateways = gtws + } + + for _, gtw := range gateways { gtw := gtw r.log.Info("processing Gateway", "namespace", gtw.Namespace, "name", gtw.Name) resourceMap.allAssociatedNamespaces[gtw.Namespace] = struct{}{} @@ -1202,7 +1251,6 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { // watchResources watches gateway api resources. func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.Manager, c controller.Controller) error { - // Only enqueue GatewayClass objects that match this Envoy Gateway's controller name. if err := c.Watch( source.Kind(mgr.GetCache(), &gwapiv1b1.GatewayClass{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), @@ -1212,20 +1260,30 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Only enqueue EnvoyProxy objects that match this Envoy Gateway's GatewayClass. + epPredicates := []predicate.Predicate{ + predicate.ResourceVersionChangedPredicate{}, + predicate.NewPredicateFuncs(r.hasManagedClass), + } + if len(r.namespaceLabels) != 0 { + epPredicates = append(epPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + } if err := c.Watch( source.Kind(mgr.GetCache(), &egcfgv1a1.EnvoyProxy{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - predicate.ResourceVersionChangedPredicate{}, - predicate.NewPredicateFuncs(r.hasManagedClass), + epPredicates..., ); err != nil { return err } // Watch Gateway CRUDs and reconcile affected GatewayClass. + gPredicates := []predicate.Predicate{predicate.NewPredicateFuncs(r.validateGatewayForReconcile)} + if len(r.namespaceLabels) != 0 { + gPredicates = append(gPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + } if err := c.Watch( source.Kind(mgr.GetCache(), &gwapiv1b1.Gateway{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - predicate.NewPredicateFuncs(r.validateGatewayForReconcile), + gPredicates..., ); err != nil { return err } @@ -1234,9 +1292,14 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch HTTPRoute CRUDs and process affected Gateways. + httprPredicates := []predicate.Predicate{} + if len(r.namespaceLabels) != 0 { + httprPredicates = append(httprPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + } if err := c.Watch( source.Kind(mgr.GetCache(), &gwapiv1b1.HTTPRoute{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), + httprPredicates..., ); err != nil { return err } @@ -1245,9 +1308,14 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch GRPCRoute CRUDs and process affected Gateways. + grpcrPredicates := []predicate.Predicate{} + if len(r.namespaceLabels) != 0 { + grpcrPredicates = append(grpcrPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + } if err := c.Watch( source.Kind(mgr.GetCache(), &gwapiv1a2.GRPCRoute{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), + grpcrPredicates..., ); err != nil { return err } @@ -1256,9 +1324,14 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch TLSRoute CRUDs and process affected Gateways. + tlsrPredicates := []predicate.Predicate{} + if len(r.namespaceLabels) != 0 { + tlsrPredicates = append(tlsrPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + } if err := c.Watch( source.Kind(mgr.GetCache(), &gwapiv1a2.TLSRoute{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), + tlsrPredicates..., ); err != nil { return err } @@ -1267,9 +1340,14 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch UDPRoute CRUDs and process affected Gateways. + udprPredicates := []predicate.Predicate{} + if len(r.namespaceLabels) != 0 { + udprPredicates = append(udprPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + } if err := c.Watch( source.Kind(mgr.GetCache(), &gwapiv1a2.UDPRoute{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), + udprPredicates..., ); err != nil { return err } @@ -1278,9 +1356,14 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch TCPRoute CRUDs and process affected Gateways. + tcprPredicates := []predicate.Predicate{} + if len(r.namespaceLabels) != 0 { + tcprPredicates = append(tcprPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + } if err := c.Watch( source.Kind(mgr.GetCache(), &gwapiv1a2.TCPRoute{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), + tcprPredicates..., ); err != nil { return err } @@ -1289,10 +1372,15 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch Service CRUDs and process affected *Route objects. + servicePredicates := []predicate.Predicate{predicate.NewPredicateFuncs(r.validateServiceForReconcile)} + if len(r.namespaceLabels) != 0 { + servicePredicates = append(servicePredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + } if err := c.Watch( source.Kind(mgr.GetCache(), &corev1.Service{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - predicate.NewPredicateFuncs(r.validateServiceForReconcile)); err != nil { + servicePredicates..., + ); err != nil { return err } @@ -1313,37 +1401,55 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch EndpointSlice CRUDs and process affected *Route objects. + esPredicates := []predicate.Predicate{predicate.NewPredicateFuncs(r.validateEndpointSliceForReconcile)} + if len(r.namespaceLabels) != 0 { + esPredicates = append(esPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + } if err := c.Watch( source.Kind(mgr.GetCache(), &discoveryv1.EndpointSlice{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - predicate.NewPredicateFuncs(r.validateEndpointSliceForReconcile)); err != nil { + esPredicates..., + ); err != nil { return err } // Watch Node CRUDs to update Gateway Address exposed by Service of type NodePort. // Node creation/deletion and ExternalIP updates would require update in the Gateway + nPredicates := []predicate.Predicate{predicate.NewPredicateFuncs(r.handleNode)} + if len(r.namespaceLabels) != 0 { + nPredicates = append(nPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + } // resource address. if err := c.Watch( source.Kind(mgr.GetCache(), &corev1.Node{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - predicate.NewPredicateFuncs(r.handleNode), + nPredicates..., ); err != nil { return err } // Watch Secret CRUDs and process affected Gateways. + secretPredicates := []predicate.Predicate{predicate.NewPredicateFuncs(r.validateSecretForReconcile)} + if len(r.namespaceLabels) != 0 { + secretPredicates = append(secretPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + } if err := c.Watch( source.Kind(mgr.GetCache(), &corev1.Secret{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - predicate.NewPredicateFuncs(r.validateSecretForReconcile), + secretPredicates..., ); err != nil { return err } // Watch ReferenceGrant CRUDs and process affected Gateways. + rgPredicates := []predicate.Predicate{} + if len(r.namespaceLabels) != 0 { + rgPredicates = append(rgPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + } if err := c.Watch( source.Kind(mgr.GetCache(), &gwapiv1a2.ReferenceGrant{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), + rgPredicates..., ); err != nil { return err } @@ -1352,36 +1458,56 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch Deployment CRUDs and process affected Gateways. + dPredicates := []predicate.Predicate{predicate.NewPredicateFuncs(r.validateDeploymentForReconcile)} + if len(r.namespaceLabels) != 0 { + dPredicates = append(dPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + } if err := c.Watch( source.Kind(mgr.GetCache(), &appsv1.Deployment{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - predicate.NewPredicateFuncs(r.validateDeploymentForReconcile), + dPredicates..., ); err != nil { return err } // Watch AuthenticationFilter CRUDs and enqueue associated HTTPRoute objects. + afPredicates := []predicate.Predicate{predicate.NewPredicateFuncs(r.httpRoutesForAuthenticationFilter)} + if len(r.namespaceLabels) != 0 { + afPredicates = append(afPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + } if err := c.Watch( source.Kind(mgr.GetCache(), &egv1a1.AuthenticationFilter{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - predicate.NewPredicateFuncs(r.httpRoutesForAuthenticationFilter)); err != nil { + afPredicates..., + ); err != nil { return err } + rfPredicates := []predicate.Predicate{predicate.NewPredicateFuncs(r.httpRoutesForRateLimitFilter)} + if len(r.namespaceLabels) != 0 { + rfPredicates = append(rfPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + } // Watch RateLimitFilter CRUDs and enqueue associated HTTPRoute objects. if err := c.Watch( source.Kind(mgr.GetCache(), &egv1a1.RateLimitFilter{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - predicate.NewPredicateFuncs(r.httpRoutesForRateLimitFilter)); err != nil { + rfPredicates..., + ); err != nil { return err } // Watch EnvoyPatchPolicy if enabled in config + eppPredicates := []predicate.Predicate{} + if len(r.namespaceLabels) != 0 { + eppPredicates = append(eppPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + } if r.envoyGateway.ExtensionAPIs != nil && r.envoyGateway.ExtensionAPIs.EnableEnvoyPatchPolicy { // Watch EnvoyPatchPolicy CRUDs if err := c.Watch( source.Kind(mgr.GetCache(), &egv1a1.EnvoyPatchPolicy{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass)); err != nil { + handler.EnqueueRequestsFromMapFunc(r.enqueueClass), + eppPredicates..., + ); err != nil { return err } } @@ -1389,11 +1515,17 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M r.log.Info("Watching gatewayAPI related objects") // Watch any additional GVKs from the registered extension. + uPredicates := []predicate.Predicate{} + if len(r.namespaceLabels) != 0 { + uPredicates = append(uPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + } for _, gvk := range r.extGVKs { u := &unstructured.Unstructured{} u.SetGroupVersionKind(gvk) if err := c.Watch(source.Kind(mgr.GetCache(), u), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass)); err != nil { + handler.EnqueueRequestsFromMapFunc(r.enqueueClass), + uPredicates..., + ); err != nil { return err } r.log.Info("Watching additional resource", "resource", gvk.String()) diff --git a/internal/provider/kubernetes/filters.go b/internal/provider/kubernetes/filters.go index be216881e4e..c7d402a8593 100644 --- a/internal/provider/kubernetes/filters.go +++ b/internal/provider/kubernetes/filters.go @@ -20,7 +20,26 @@ func (r *gatewayAPIReconciler) getAuthenticationFilters(ctx context.Context) ([] return nil, fmt.Errorf("failed to list AuthenticationFilters: %v", err) } - return authenList.Items, nil + authens := authenList.Items + if len(r.namespaceLabels) != 0 { + var as []egv1a1.AuthenticationFilter + for _, a := range authens { + ns := a.GetNamespace() + ok, err := r.checkObjectNamespaceLabels(ns) + if err != nil { + // TODO: should return? or just proceed? + return nil, fmt.Errorf("failed to check namespace labels for AuthenicationFilter %s in namespace %s: %s", a.GetName(), ns, err) + } + + if ok { + as = append(as, a) + } + } + + authens = as + } + + return authens, nil } func (r *gatewayAPIReconciler) getRateLimitFilters(ctx context.Context) ([]egv1a1.RateLimitFilter, error) { @@ -29,20 +48,56 @@ func (r *gatewayAPIReconciler) getRateLimitFilters(ctx context.Context) ([]egv1a return nil, fmt.Errorf("failed to list RateLimitFilters: %v", err) } - return rateLimitList.Items, nil + rateLimits := rateLimitList.Items + if len(r.namespaceLabels) != 0 { + var rls []egv1a1.RateLimitFilter + for _, rl := range rateLimits { + ns := rl.GetNamespace() + ok, err := r.checkObjectNamespaceLabels(ns) + if err != nil { + // TODO: should return? or just proceed? + return nil, fmt.Errorf("failed to check namespace labels for RateLimitFilter %s in namespace %s: %s", rl.GetName(), ns, err) + } + + if ok { + rls = append(rls, rl) + } + } + + rateLimits = rls + } + + return rateLimits, nil } func (r *gatewayAPIReconciler) getExtensionRefFilters(ctx context.Context) ([]unstructured.Unstructured, error) { var resourceItems []unstructured.Unstructured for _, gvk := range r.extGVKs { - uExtResources := &unstructured.UnstructuredList{} - uExtResources.SetGroupVersionKind(gvk) - if err := r.client.List(ctx, uExtResources); err != nil { + uExtResourceList := &unstructured.UnstructuredList{} + uExtResourceList.SetGroupVersionKind(gvk) + if err := r.client.List(ctx, uExtResourceList); err != nil { r.log.Info("no associated resources found for %s", gvk.String()) return nil, fmt.Errorf("failed to list %s: %v", gvk.String(), err) } - resourceItems = append(resourceItems, uExtResources.Items...) + uExtResources := uExtResourceList.Items + if len(r.namespaceLabels) != 0 { + var extRs []unstructured.Unstructured + for _, extR := range uExtResources { + ns := extR.GetNamespace() + ok, err := r.checkObjectNamespaceLabels(ns) + if err != nil { + // TODO: should return? or just proceed? + return nil, fmt.Errorf("failed to check namespace labels for ExtensionRefFilter %s in namespace %s: %s", extR.GetName(), ns, err) + } + if ok { + extRs = append(extRs, extR) + } + } + uExtResources = extRs + } + + resourceItems = append(resourceItems, uExtResources...) } return resourceItems, nil diff --git a/internal/provider/kubernetes/kubernetes.go b/internal/provider/kubernetes/kubernetes.go index 16d48dad356..c56151202e5 100644 --- a/internal/provider/kubernetes/kubernetes.go +++ b/internal/provider/kubernetes/kubernetes.go @@ -17,6 +17,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "github.com/envoyproxy/gateway/api/config/v1alpha1" "github.com/envoyproxy/gateway/internal/envoygateway" "github.com/envoyproxy/gateway/internal/envoygateway/config" "github.com/envoyproxy/gateway/internal/message" @@ -45,10 +46,14 @@ func New(cfg *rest.Config, svr *config.Server, resources *message.ProviderResour }, } - if svr.EnvoyGateway.Provider != nil && - svr.EnvoyGateway.Provider.Kubernetes != nil && - (svr.EnvoyGateway.Provider.Kubernetes.Watch != nil) && - (len(svr.EnvoyGateway.Provider.Kubernetes.Watch.Namespaces) > 0) { + // TODO: implement config validation on the watch mode config + byNamespace := + svr.EnvoyGateway.Provider != nil && + svr.EnvoyGateway.Provider.Kubernetes != nil && + (svr.EnvoyGateway.Provider.Kubernetes.Watch != nil) && + (svr.EnvoyGateway.Provider.Kubernetes.Watch.Type == v1alpha1.KubernetesWatchModeTypeNamespaces) && + (len(svr.EnvoyGateway.Provider.Kubernetes.Watch.Namespaces) > 0) + if byNamespace { mgrOpts.Cache.DefaultNamespaces = make(map[string]cache.Config) for _, watchNS := range svr.EnvoyGateway.Provider.Kubernetes.Watch.Namespaces { mgrOpts.Cache.DefaultNamespaces[watchNS] = cache.Config{} diff --git a/internal/provider/kubernetes/kubernetes_test.go b/internal/provider/kubernetes/kubernetes_test.go index d05e823339e..13c7048c8ea 100644 --- a/internal/provider/kubernetes/kubernetes_test.go +++ b/internal/provider/kubernetes/kubernetes_test.go @@ -1693,6 +1693,7 @@ func TestNamespacedProvider(t *testing.T) { // config to watch a subset of namespaces svr.EnvoyGateway.Provider.Kubernetes = &egcfgv1a1.EnvoyGatewayKubernetesProvider{ Watch: &egcfgv1a1.KubernetesWatchMode{ + Type: egcfgv1a1.KubernetesWatchModeTypeNamespaces, Namespaces: []string{"ns1", "ns2"}, }, } @@ -1740,5 +1741,330 @@ func TestNamespacedProvider(t *testing.T) { cancel() require.NoError(t, testEnv.Stop()) }() +} + +func TestNamespaceSelectorsProvider(t *testing.T) { + // Setup the test environment. + testEnv, cliCfg, err := startEnv() + require.NoError(t, err) + + // Setup and start the kube provider. + svr, err := config.New() + require.NoError(t, err) + // config to watch a subset of namespaces + svr.EnvoyGateway.Provider.Kubernetes = &egcfgv1a1.EnvoyGatewayKubernetesProvider{ + Watch: &egcfgv1a1.KubernetesWatchMode{ + Type: egcfgv1a1.KubernetesWatchModeTypeNamespaceSelectors, + NamespaceSelectors: []string{"label-1", "label-2"}, + }, + } + + resources := new(message.ProviderResources) + ePatchPolicyStatuses := new(message.EnvoyPatchPolicyStatuses) + provider, err := New(cliCfg, svr, resources, ePatchPolicyStatuses) + require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + require.NoError(t, provider.Start(ctx)) + }() + + defer func() { + cancel() + require.NoError(t, testEnv.Stop()) + }() + + cli := provider.manager.GetClient() + watchedNS := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{ + Name: "watched-ns", + Labels: map[string]string{"label-1": "true", "label-2": "true"}, + }} + require.NoError(t, cli.Create(ctx, watchedNS)) + nonWatchedNS := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "non-watched-ns"}} + require.NoError(t, cli.Create(ctx, nonWatchedNS)) + + gcName := "gc-name" + gc := test.GetGatewayClass(gcName, egcfgv1a1.GatewayControllerName) + require.NoError(t, cli.Create(ctx, gc)) + + require.Eventually(t, func() bool { + if err := cli.Get(ctx, types.NamespacedName{Name: gc.Name}, gc); err != nil { + return false + } + + for _, cond := range gc.Status.Conditions { + if cond.Type == string(gwapiv1b1.GatewayClassConditionStatusAccepted) && cond.Status == metav1.ConditionTrue { + return true + } + } + + return false + }, defaultWait, defaultTick) + + defer func() { + require.NoError(t, cli.Delete(ctx, gc)) + }() + + // Create the gateways + watchedGateway := test.GetGateway(types.NamespacedName{Name: "watched-gateway", Namespace: watchedNS.Name}, gcName) + require.NoError(t, cli.Create(ctx, watchedGateway)) + defer func() { + require.NoError(t, cli.Delete(ctx, watchedGateway)) + }() + + nonWatchedGateway := test.GetGateway(types.NamespacedName{Name: "non-watched-gateway", Namespace: nonWatchedNS.Name}, gcName) + require.NoError(t, cli.Create(ctx, nonWatchedGateway)) + defer func() { + require.NoError(t, cli.Delete(ctx, nonWatchedGateway)) + }() + + require.Eventually(t, func() bool { + res, _ := resources.GatewayAPIResources.Load(gc.Name) + return res != nil && len(res.Gateways) == 1 + }, defaultWait, defaultTick) + + _, ok := resources.GatewayStatuses.Load(types.NamespacedName{Name: "non-watched-gateway", Namespace: nonWatchedNS.Name}) + require.Equal(t, false, ok) + + watchedAuthenFilter := test.GetAuthenticationFilter("watched-authen", watchedNS.Name) + require.NoError(t, cli.Create(ctx, watchedAuthenFilter)) + defer func() { + require.NoError(t, cli.Delete(ctx, watchedAuthenFilter)) + }() + + nonWatchedAuthenFilter := test.GetAuthenticationFilter("non-watched-authen", nonWatchedNS.Name) + require.NoError(t, cli.Create(ctx, nonWatchedAuthenFilter)) + defer func() { + require.NoError(t, cli.Delete(ctx, nonWatchedAuthenFilter)) + }() + + watchedRateLimitFilter := test.GetRateLimitFilter("watched-rate-limit-filter", watchedNS.Name) + require.NoError(t, cli.Create(ctx, watchedRateLimitFilter)) + + defer func() { + require.NoError(t, cli.Delete(ctx, watchedRateLimitFilter)) + }() + + nonWatchedRateLimitFilter := test.GetRateLimitFilter("non-watched-rate-limit-filter", nonWatchedNS.Name) + require.NoError(t, cli.Create(ctx, nonWatchedRateLimitFilter)) + + defer func() { + require.NoError(t, cli.Delete(ctx, nonWatchedRateLimitFilter)) + }() + + watchedSvc := test.GetService(types.NamespacedName{Namespace: watchedNS.Name, Name: "watched-service"}, nil, map[string]int32{ + "http": 80, + "https": 443, + }) + require.NoError(t, cli.Create(ctx, watchedSvc)) + + defer func() { + require.NoError(t, cli.Delete(ctx, watchedSvc)) + }() + nonWatchedSvc := test.GetService(types.NamespacedName{Namespace: nonWatchedNS.Name, Name: "non-watched-service"}, nil, map[string]int32{ + "http": 8001, + "https": 44300, + }) + require.NoError(t, cli.Create(ctx, nonWatchedSvc)) + + defer func() { + require.NoError(t, cli.Delete(ctx, nonWatchedSvc)) + }() + + watchedHTTPRoute := test.GetHTTPRoute( + types.NamespacedName{ + Namespace: watchedNS.Name, + Name: "watched-http-route", + }, + watchedGateway.Name, + types.NamespacedName{Name: watchedSvc.Name}) + watchedHTTPRoute.Spec.Rules[0].Filters = []gwapiv1b1.HTTPRouteFilter{ + { + Type: gwapiv1b1.HTTPRouteFilterExtensionRef, + ExtensionRef: &gwapiv1b1.LocalObjectReference{ + Group: gwapiv1b1.Group(egv1a1.GroupVersion.Group), + Kind: gwapiv1b1.Kind(egv1a1.KindAuthenticationFilter), + Name: gwapiv1b1.ObjectName(watchedAuthenFilter.Name), + }, + }, + { + Type: gwapiv1b1.HTTPRouteFilterExtensionRef, + ExtensionRef: &gwapiv1b1.LocalObjectReference{ + Group: gwapiv1b1.Group(egv1a1.GroupVersion.Group), + Kind: gwapiv1b1.Kind(egv1a1.KindRateLimitFilter), + Name: gwapiv1b1.ObjectName(watchedRateLimitFilter.Name), + }, + }, + } + require.NoError(t, cli.Create(ctx, watchedHTTPRoute)) + defer func() { + require.NoError(t, cli.Delete(ctx, watchedHTTPRoute)) + }() + + nonWatchedHTTPRoute := test.GetHTTPRoute( + types.NamespacedName{ + Namespace: nonWatchedNS.Name, + Name: "non-watched-http-route", + }, + nonWatchedGateway.Name, + types.NamespacedName{Name: nonWatchedSvc.Name}) + nonWatchedHTTPRoute.Spec.Rules[0].Filters = []gwapiv1b1.HTTPRouteFilter{ + { + Type: gwapiv1b1.HTTPRouteFilterExtensionRef, + ExtensionRef: &gwapiv1b1.LocalObjectReference{ + Group: gwapiv1b1.Group(egv1a1.GroupVersion.Group), + Kind: gwapiv1b1.Kind(egv1a1.KindAuthenticationFilter), + Name: gwapiv1b1.ObjectName(nonWatchedAuthenFilter.Name), + }, + }, + { + Type: gwapiv1b1.HTTPRouteFilterExtensionRef, + ExtensionRef: &gwapiv1b1.LocalObjectReference{ + Group: gwapiv1b1.Group(egv1a1.GroupVersion.Group), + Kind: gwapiv1b1.Kind(egv1a1.KindRateLimitFilter), + Name: gwapiv1b1.ObjectName(nonWatchedRateLimitFilter.Name), + }, + }, + } + require.NoError(t, cli.Create(ctx, nonWatchedHTTPRoute)) + defer func() { + require.NoError(t, cli.Delete(ctx, nonWatchedHTTPRoute)) + }() + + watchedGRPCRoute := test.GetGRPCRoute( + types.NamespacedName{ + Namespace: watchedNS.Name, + Name: "watched-grpc-route", + }, + watchedGateway.Name, + types.NamespacedName{Name: watchedSvc.Name}) + require.NoError(t, cli.Create(ctx, watchedGRPCRoute)) + defer func() { + require.NoError(t, cli.Delete(ctx, watchedGRPCRoute)) + }() + + nonWatchedGRPCRoute := test.GetGRPCRoute( + types.NamespacedName{ + Namespace: nonWatchedNS.Name, + Name: "non-watched-grpc-route", + }, + nonWatchedGateway.Name, + types.NamespacedName{Name: nonWatchedNS.Name}) + require.NoError(t, cli.Create(ctx, nonWatchedGRPCRoute)) + defer func() { + require.NoError(t, cli.Delete(ctx, nonWatchedGRPCRoute)) + }() + + watchedTCPRoute := test.GetTCPRoute( + types.NamespacedName{ + Namespace: watchedNS.Name, + Name: "watched-tcp-route", + }, + watchedGateway.Name, + types.NamespacedName{Name: watchedSvc.Name}) + require.NoError(t, cli.Create(ctx, watchedTCPRoute)) + defer func() { + require.NoError(t, cli.Delete(ctx, watchedTCPRoute)) + }() + + nonWatchedTCPRoute := test.GetTCPRoute( + types.NamespacedName{ + Namespace: nonWatchedNS.Name, + Name: "non-watched-tcp-route", + }, + nonWatchedGateway.Name, + types.NamespacedName{Name: nonWatchedNS.Name}) + require.NoError(t, cli.Create(ctx, nonWatchedTCPRoute)) + defer func() { + require.NoError(t, cli.Delete(ctx, nonWatchedTCPRoute)) + }() + + watchedTLSRoute := test.GetTLSRoute( + types.NamespacedName{ + Namespace: watchedNS.Name, + Name: "watched-tls-route", + }, + watchedGateway.Name, + types.NamespacedName{Name: watchedSvc.Name}) + require.NoError(t, cli.Create(ctx, watchedTLSRoute)) + defer func() { + require.NoError(t, cli.Delete(ctx, watchedTLSRoute)) + }() + + nonWatchedTLSRoute := test.GetTLSRoute( + types.NamespacedName{ + Namespace: nonWatchedNS.Name, + Name: "non-watched-tls-route", + }, + nonWatchedGateway.Name, + types.NamespacedName{Name: nonWatchedNS.Name}) + require.NoError(t, cli.Create(ctx, nonWatchedTLSRoute)) + defer func() { + require.NoError(t, cli.Delete(ctx, nonWatchedTLSRoute)) + }() + + watchedUDPRoute := test.GetUDPRoute( + types.NamespacedName{ + Namespace: watchedNS.Name, + Name: "watched-udp-route", + }, + watchedGateway.Name, + types.NamespacedName{Name: watchedSvc.Name}) + require.NoError(t, cli.Create(ctx, watchedUDPRoute)) + defer func() { + require.NoError(t, cli.Delete(ctx, watchedUDPRoute)) + }() + + nonWatchedUDPRoute := test.GetUDPRoute( + types.NamespacedName{ + Namespace: nonWatchedNS.Name, + Name: "non-watched-udp-route", + }, + nonWatchedGateway.Name, + types.NamespacedName{Name: nonWatchedNS.Name}) + require.NoError(t, cli.Create(ctx, nonWatchedUDPRoute)) + defer func() { + require.NoError(t, cli.Delete(ctx, nonWatchedUDPRoute)) + }() + + require.Eventually(t, func() bool { + res, _ := resources.GatewayAPIResources.Load(gc.Name) + // The service number dependes on the service created and the backendRef + return res != nil && len(res.Services) == 5 + }, defaultWait, defaultTick) + + require.Eventually(t, func() bool { + res, _ := resources.GatewayAPIResources.Load(gc.Name) + return res != nil && len(res.HTTPRoutes) == 1 + }, defaultWait, defaultTick) + + require.Eventually(t, func() bool { + res, _ := resources.GatewayAPIResources.Load(gc.Name) + return res != nil && len(res.TCPRoutes) == 1 + }, defaultWait, defaultTick) + + require.Eventually(t, func() bool { + res, _ := resources.GatewayAPIResources.Load(gc.Name) + return res != nil && len(res.TLSRoutes) == 1 + }, defaultWait, defaultTick) + + require.Eventually(t, func() bool { + res, _ := resources.GatewayAPIResources.Load(gc.Name) + return res != nil && len(res.UDPRoutes) == 1 + }, defaultWait, defaultTick) + + require.Eventually(t, func() bool { + res, _ := resources.GatewayAPIResources.Load(gc.Name) + return res != nil && len(res.GRPCRoutes) == 1 + }, defaultWait, defaultTick) + + require.Eventually(t, func() bool { + res, _ := resources.GatewayAPIResources.Load(gc.Name) + return res != nil && len(res.AuthenticationFilters) == 1 + }, defaultWait, defaultTick) + + require.Eventually(t, func() bool { + res, _ := resources.GatewayAPIResources.Load(gc.Name) + return res != nil && len(res.RateLimitFilters) == 1 + }, defaultWait, defaultTick) } diff --git a/internal/provider/kubernetes/predicates.go b/internal/provider/kubernetes/predicates.go index 9329cd59668..746f99c5ee3 100644 --- a/internal/provider/kubernetes/predicates.go +++ b/internal/provider/kubernetes/predicates.go @@ -43,6 +43,65 @@ func (r *gatewayAPIReconciler) hasMatchingController(obj client.Object) bool { return false } +// hasMatchingNamespaceLabels returns true if the namespace of provided object has +// the provided labels or false otherwise. +func (r *gatewayAPIReconciler) hasMatchingNamespaceLabels(obj client.Object) bool { + ok, err := r.checkObjectNamespaceLabels(obj.GetNamespace()) + if err != nil { + r.log.Error( + err, "failed to get Namespace", + "object", obj.GetObjectKind().GroupVersionKind().String(), + "name", obj.GetName()) + return false + } + return ok +} + +type NamespaceGetter interface { + GetNamespace() string +} + +// checkObjectNamespaceLabels checks if labels of namespace of the object is a subset of namespaceLabels +// TODO: check if param can be an interface, so the caller doesn't need to get the namespace before calling +// this function. +func (r *gatewayAPIReconciler) checkObjectNamespaceLabels(nsString string) (bool, error) { + // TODO: add validation here because some objects don't have namespace + ns := &corev1.Namespace{} + if err := r.client.Get( + context.Background(), + client.ObjectKey{ + Namespace: "", // Namespace object should have empty Namespace + Name: nsString, + }, + ns, + ); err != nil { + return false, err + } + return containAll(ns.Labels, r.namespaceLabels), nil +} + +func containAll(labels map[string]string, labelsToCheck []string) bool { + if len(labels) < len(labelsToCheck) { + return false + } + for _, l := range labelsToCheck { + if !contains(labels, l) { + return false + } + } + return true +} + +func contains(m map[string]string, i string) bool { + for k := range m { + if k == i { + return true + } + } + + return false +} + // validateGatewayForReconcile returns true if the provided object is a Gateway // using a GatewayClass matching the configured gatewayclass controller name. func (r *gatewayAPIReconciler) validateGatewayForReconcile(obj client.Object) bool { @@ -60,6 +119,7 @@ func (r *gatewayAPIReconciler) validateGatewayForReconcile(obj client.Object) bo } if gc.Spec.ControllerName != r.classController { + r.log.Info("gatewayclass controller name", gc.Spec.ControllerName, "class controller name", r.classController) r.log.Info("gatewayclass name for gateway doesn't match configured name", "namespace", gw.Namespace, "name", gw.Name) return false @@ -260,7 +320,9 @@ func (r *gatewayAPIReconciler) httpRoutesForAuthenticationFilter(obj client.Obje return false } - return len(httpRouteList.Items) != 0 + httpRoutes := r.filterHTTPRoutesByNamespaceLabels(httpRouteList.Items) + + return len(httpRoutes) != 0 } // httpRoutesForRateLimitFilter tries finding HTTPRoute referents of the provided @@ -282,7 +344,33 @@ func (r *gatewayAPIReconciler) httpRoutesForRateLimitFilter(obj client.Object) b return false } - return len(httpRouteList.Items) != 0 + httpRoutes := r.filterHTTPRoutesByNamespaceLabels(httpRouteList.Items) + + return len(httpRoutes) != 0 +} + +func (r *gatewayAPIReconciler) filterHTTPRoutesByNamespaceLabels(httpRoutes []gwapiv1b1.HTTPRoute) []gwapiv1b1.HTTPRoute { + if len(r.namespaceLabels) == 0 { + return httpRoutes + } + + var routes []gwapiv1b1.HTTPRoute + for _, route := range httpRoutes { + ns := route.GetNamespace() + ok, err := r.checkObjectNamespaceLabels(ns) + if err != nil { + r.log.Error(err, "failed to check namespace labels for HTTPRoute", + "namespace", ns, + "name", route.GetName(), + ) + continue + } + + if ok { + routes = append(routes, route) + } + } + return routes } // envoyDeploymentForGateway returns the Envoy Deployment, returning nil if the Deployment doesn't exist. diff --git a/internal/provider/kubernetes/predicates_test.go b/internal/provider/kubernetes/predicates_test.go index 94410e9740a..a716c39a0af 100644 --- a/internal/provider/kubernetes/predicates_test.go +++ b/internal/provider/kubernetes/predicates_test.go @@ -9,6 +9,8 @@ import ( "testing" "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -28,6 +30,7 @@ func TestGatewayClassHasMatchingController(t *testing.T) { testCases := []struct { name string obj client.Object + client client.Client expect bool }{ { @@ -59,6 +62,76 @@ func TestGatewayClassHasMatchingController(t *testing.T) { } } +// TestGatewayClassHasMatchingNamespaceLabels tests the hasMatchingNamespaceLabels +// predicate function. +func TestGatewayClassHasMatchingNamespaceLabels(t *testing.T) { + ns := "namespace-1" + testCases := []struct { + name string + labels []string + namespaceLabels []string + expect bool + }{ + { + name: "matching one label when namespace has one label", + labels: []string{"label-1"}, + namespaceLabels: []string{"label-1"}, + expect: true, + }, + { + name: "matching one label when namespace has two labels", + labels: []string{"label-1"}, + namespaceLabels: []string{"label-1", "label-2"}, + expect: true, + }, + { + name: "namespace has less labels than the specified labels", + labels: []string{"label-1", "label-2"}, + namespaceLabels: []string{"label-1"}, + expect: false, + }, + } + + logger := logging.DefaultLogger(v1alpha1.LogLevelInfo) + + for _, tc := range testCases { + tc := tc + + namespaceLabelsToMap := make(map[string]string) + for _, l := range tc.namespaceLabels { + namespaceLabelsToMap[l] = "" + } + + r := gatewayAPIReconciler{ + classController: v1alpha1.GatewayControllerName, + namespaceLabels: tc.labels, + log: logger, + client: fakeclient.NewClientBuilder(). + WithScheme(envoygateway.GetScheme()). + WithObjects(&corev1.Namespace{ + TypeMeta: v1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{Name: ns, Labels: namespaceLabelsToMap}, + }). + Build(), + } + t.Run(tc.name, func(t *testing.T) { + res := r.hasMatchingNamespaceLabels( + test.GetHTTPRoute( + types.NamespacedName{ + Namespace: ns, + Name: "httproute-test", + }, + "scheduled-status-test", + types.NamespacedName{Name: "service"}, + )) + require.Equal(t, tc.expect, res) + }) + } +} + // TestValidateGatewayForReconcile tests the validateGatewayForReconcile // predicate function. func TestValidateGatewayForReconcile(t *testing.T) { diff --git a/internal/provider/kubernetes/routes.go b/internal/provider/kubernetes/routes.go index cc33db4e4f2..6edaec0b6b6 100644 --- a/internal/provider/kubernetes/routes.go +++ b/internal/provider/kubernetes/routes.go @@ -8,6 +8,7 @@ package kubernetes import ( "context" "errors" + "fmt" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime/schema" @@ -33,7 +34,25 @@ func (r *gatewayAPIReconciler) processTLSRoutes(ctx context.Context, gatewayName return err } - for _, tlsRoute := range tlsRouteList.Items { + tlsRoutes := tlsRouteList.Items + if len(r.namespaceLabels) != 0 { + var rts []gwapiv1a2.TLSRoute + for _, rt := range tlsRoutes { + ns := rt.GetNamespace() + ok, err := r.checkObjectNamespaceLabels(ns) + if err != nil { + // TODO: should return? or just proceed? + return fmt.Errorf("failed to check namespace labels for TLSRoute %s in namespace %s: %s", rt.GetName(), ns, err) + } + + if ok { + rts = append(rts, rt) + } + } + tlsRoutes = rts + } + + for _, tlsRoute := range tlsRoutes { tlsRoute := tlsRoute r.log.Info("processing TLSRoute", "namespace", tlsRoute.Namespace, "name", tlsRoute.Name) @@ -115,7 +134,26 @@ func (r *gatewayAPIReconciler) processGRPCRoutes(ctx context.Context, gatewayNam r.log.Error(err, "failed to list GRPCRoutes") return err } - for _, grpcRoute := range grpcRouteList.Items { + + grpcRoutes := grpcRouteList.Items + if len(r.namespaceLabels) != 0 { + var grs []gwapiv1a2.GRPCRoute + for _, gr := range grpcRoutes { + ns := gr.GetNamespace() + ok, err := r.checkObjectNamespaceLabels(ns) + if err != nil { + // TODO: should return? or just proceed? + return fmt.Errorf("failed to check namespace labels for GRPCRoute %s in namespace %s: %s", gr.GetName(), ns, err) + } + if ok { + grs = append(grs, gr) + } + } + + grpcRoutes = grs + } + + for _, grpcRoute := range grpcRoutes { grpcRoute := grpcRoute r.log.Info("processing GRPCRoute", "namespace", grpcRoute.Namespace, "name", grpcRoute.Name) @@ -268,7 +306,26 @@ func (r *gatewayAPIReconciler) processHTTPRoutes(ctx context.Context, gatewayNam r.log.Error(err, "failed to list HTTPRoutes") return err } - for _, httpRoute := range httpRouteList.Items { + + httpRoutes := httpRouteList.Items + if len(r.namespaceLabels) != 0 { + var hrs []gwapiv1b1.HTTPRoute + for _, hr := range httpRoutes { + ns := hr.GetNamespace() + ok, err := r.checkObjectNamespaceLabels(ns) + if err != nil { + // TODO: should return? or just proceed? + return fmt.Errorf("failed to check namespace labels for HTTPRoute %s in namespace %s: %s", hr.GetName(), ns, err) + } + + if ok { + hrs = append(hrs, hr) + } + } + httpRoutes = hrs + } + + for _, httpRoute := range httpRoutes { httpRoute := httpRoute r.log.Info("processing HTTPRoute", "namespace", httpRoute.Namespace, "name", httpRoute.Name) @@ -447,7 +504,26 @@ func (r *gatewayAPIReconciler) processTCPRoutes(ctx context.Context, gatewayName return err } - for _, tcpRoute := range tcpRouteList.Items { + tcpRoutes := tcpRouteList.Items + if len(r.namespaceLabels) != 0 { + var trs []gwapiv1a2.TCPRoute + for _, tr := range tcpRoutes { + ns := tr.GetNamespace() + ok, err := r.checkObjectNamespaceLabels(ns) + if err != nil { + // TODO: should return? or just proceed? + return fmt.Errorf("failed to check namespace labels for TCPRoute %s in namespace %s: %s", tr.GetName(), ns, err) + } + + if ok { + trs = append(trs, tr) + } + } + + tcpRoutes = trs + } + + for _, tcpRoute := range tcpRoutes { tcpRoute := tcpRoute r.log.Info("processing TCPRoute", "namespace", tcpRoute.Namespace, "name", tcpRoute.Name) @@ -509,7 +585,26 @@ func (r *gatewayAPIReconciler) processUDPRoutes(ctx context.Context, gatewayName return err } - for _, udpRoute := range udpRouteList.Items { + udpRoutes := udpRouteList.Items + if len(r.namespaceLabels) != 0 { + var urs []gwapiv1a2.UDPRoute + for _, ur := range udpRoutes { + ns := ur.GetNamespace() + ok, err := r.checkObjectNamespaceLabels(ns) + if err != nil { + // TODO: should return? or just proceed? + return fmt.Errorf("failed to check namespace labels for UDPRoute %s in namespace %s: %s", ur.GetName(), ns, err) + } + + if ok { + urs = append(urs, ur) + } + } + + udpRoutes = urs + } + + for _, udpRoute := range udpRoutes { udpRoute := udpRoute r.log.Info("processing UDPRoute", "namespace", udpRoute.Namespace, "name", udpRoute.Name)