From 05a49b28c213a1b70b18c3fcca9374511d683424 Mon Sep 17 00:00:00 2001 From: bitliu Date: Thu, 26 Oct 2023 16:04:45 +0800 Subject: [PATCH] feat: add subscribed message metrics instrumentations Signed-off-by: bitliu --- internal/gatewayapi/runner/runner.go | 6 +- internal/globalratelimit/runner/runner.go | 5 +- internal/infrastructure/runner/runner.go | 6 +- internal/message/metrics.go | 22 ++++ internal/message/watchutil.go | 46 +++++++- internal/message/watchutil_test.go | 8 +- internal/provider/kubernetes/controller.go | 127 ++++++++++++--------- internal/xds/server/runner/runner.go | 6 +- internal/xds/translator/runner/runner.go | 5 +- 9 files changed, 160 insertions(+), 71 deletions(-) create mode 100644 internal/message/metrics.go diff --git a/internal/gatewayapi/runner/runner.go b/internal/gatewayapi/runner/runner.go index 941286b2f58f..9a03f7228908 100644 --- a/internal/gatewayapi/runner/runner.go +++ b/internal/gatewayapi/runner/runner.go @@ -49,8 +49,8 @@ func (r *Runner) Start(ctx context.Context) (err error) { } func (r *Runner) subscribeAndTranslate(ctx context.Context) { - message.HandleSubscription(r.ProviderResources.GatewayAPIResources.Subscribe(ctx), - func(update message.Update[string, *gatewayapi.Resources]) { + message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentGatewayAPIRunner), Resource: "gatewayapi-resources"}, r.ProviderResources.GatewayAPIResources.Subscribe(ctx), + func(update message.Update[string, *gatewayapi.Resources], errChan chan error) { r.Logger.Info("received an update") val := update.Value @@ -93,6 +93,7 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { for key, val := range result.InfraIR { if err := val.Validate(); err != nil { r.Logger.Error(err, "unable to validate infra ir, skipped sending it") + errChan <- err } else { r.InfraIR.Store(key, val) newKeys = append(newKeys, key) @@ -102,6 +103,7 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { for key, val := range result.XdsIR { if err := val.Validate(); err != nil { r.Logger.Error(err, "unable to validate xds ir, skipped sending it") + errChan <- err } else { r.XdsIR.Store(key, val) } diff --git a/internal/globalratelimit/runner/runner.go b/internal/globalratelimit/runner/runner.go index 8e64322965f0..5662f56c916f 100644 --- a/internal/globalratelimit/runner/runner.go +++ b/internal/globalratelimit/runner/runner.go @@ -111,13 +111,14 @@ func (r *Runner) serveXdsConfigServer(ctx context.Context) { func (r *Runner) subscribeAndTranslate(ctx context.Context) { // Subscribe to resources. - message.HandleSubscription(r.XdsIR.Subscribe(ctx), - func(update message.Update[string, *ir.Xds]) { + message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentGlobalRateLimitRunner), Resource: "xds-ir"}, r.XdsIR.Subscribe(ctx), + func(update message.Update[string, *ir.Xds], errChan chan error) { r.Logger.Info("received a notification") if update.Delete { if err := r.addNewSnapshot(ctx, nil); err != nil { r.Logger.Error(err, "failed to update the config snapshot") + errChan <- err } } else { // Translate to ratelimit xDS Config. diff --git a/internal/infrastructure/runner/runner.go b/internal/infrastructure/runner/runner.go index 98859a2e88bd..6163caa5b385 100644 --- a/internal/infrastructure/runner/runner.go +++ b/internal/infrastructure/runner/runner.go @@ -54,19 +54,21 @@ func (r *Runner) Start(ctx context.Context) (err error) { func (r *Runner) subscribeToProxyInfraIR(ctx context.Context) { // Subscribe to resources - message.HandleSubscription(r.InfraIR.Subscribe(ctx), - func(update message.Update[string, *ir.Infra]) { + message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentInfrastructureRunner), Resource: "infra-ir"}, r.InfraIR.Subscribe(ctx), + func(update message.Update[string, *ir.Infra], errChan chan error) { r.Logger.Info("received an update") val := update.Value if update.Delete { if err := r.mgr.DeleteProxyInfra(ctx, val); err != nil { r.Logger.Error(err, "failed to delete infra") + errChan <- err } } else { // Manage the proxy infra. if err := r.mgr.CreateOrUpdateProxyInfra(ctx, val); err != nil { r.Logger.Error(err, "failed to create new infra") + errChan <- err } } }, diff --git a/internal/message/metrics.go b/internal/message/metrics.go new file mode 100644 index 000000000000..7dd34f2adc72 --- /dev/null +++ b/internal/message/metrics.go @@ -0,0 +1,22 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package message + +import "github.com/envoyproxy/gateway/internal/metrics" + +var ( + messageDepth = metrics.NewGauge("message_depth", "Current depth of message queue.") + + messageSubscribedDurationSeconds = metrics.NewHistogram("message_subscribed_duration_seconds", "How long in seconds a subscribed message is handled.", []float64{0.001, 0.01, 0.1, 1, 5, 10}) + + messageSubscribedTotal = metrics.NewCounter("message_subscribed_total", "Total number of subscribed message.") + + messageSubscribedErrorsTotal = metrics.NewCounter("message_subscribed_errors_total", "Total number of subscribed message errors.") + + runnerLabel = metrics.NewLabel("runner") + + resourceLabel = metrics.NewLabel("resource") +) diff --git a/internal/message/watchutil.go b/internal/message/watchutil.go index 29cecc37a9fb..e66e1cd38b8b 100644 --- a/internal/message/watchutil.go +++ b/internal/message/watchutil.go @@ -6,11 +6,35 @@ package message import ( + "time" + + "github.com/envoyproxy/gateway/api/v1alpha1" + "github.com/envoyproxy/gateway/internal/logging" + "github.com/envoyproxy/gateway/internal/metrics" "github.com/telepresenceio/watchable" ) type Update[K comparable, V any] watchable.Update[K, V] +var logger = logging.DefaultLogger(v1alpha1.LogLevelInfo).WithName("watchable") + +type Metadata struct { + Runner string + Resource string +} + +func (m Metadata) LabelValues() []metrics.LabelValue { + labels := []metrics.LabelValue{} + if m.Runner != "" { + labels = append(labels, runnerLabel.Value(m.Runner)) + } + if m.Resource != "" { + labels = append(labels, resourceLabel.Value(m.Resource)) + } + + return labels +} + // HandleSubscription takes a channel returned by // watchable.Map.Subscribe() (or .SubscribeSubset()), and calls the // given function for each initial value in the map, and for any @@ -20,20 +44,36 @@ type Update[K comparable, V any] watchable.Update[K, V] // it handles the case where the watchable.Map already contains // entries before .Subscribe is called. func HandleSubscription[K comparable, V any]( + meta Metadata, subscription <-chan watchable.Snapshot[K, V], - handle func(Update[K, V]), + handle func(updateFunc Update[K, V], errChans chan error), ) { + errChans := make(chan error, 10) + go func() { + for err := range errChans { + logger.WithValues("runner", meta.Runner).Error(err, "observed an error") + messageSubscribedErrorsTotal.With(meta.LabelValues()...).Increment() + } + }() + if snapshot, ok := <-subscription; ok { for k, v := range snapshot.State { + startHandleTime := time.Now() handle(Update[K, V]{ Key: k, Value: v, - }) + }, errChans) + messageSubscribedTotal.With(meta.LabelValues()...).Increment() + messageSubscribedDurationSeconds.With(meta.LabelValues()...).Record(time.Since(startHandleTime).Seconds()) } } for snapshot := range subscription { + messageDepth.With(meta.LabelValues()...).Record(float64(len(subscription))) for _, update := range snapshot.Updates { - handle(Update[K, V](update)) + startHandleTime := time.Now() + handle(Update[K, V](update), errChans) + messageSubscribedTotal.With(meta.LabelValues()...).Increment() + messageSubscribedDurationSeconds.With(meta.LabelValues()...).Record(time.Since(startHandleTime).Seconds()) } } } diff --git a/internal/message/watchutil_test.go b/internal/message/watchutil_test.go index 39674ed7eec8..042990421c6f 100644 --- a/internal/message/watchutil_test.go +++ b/internal/message/watchutil_test.go @@ -23,8 +23,9 @@ func TestHandleSubscriptionAlreadyClosed(t *testing.T) { var calls int message.HandleSubscription[string, any]( + message.Metadata{Runner: "demo", Resource: "demo"}, ch, - func(message.Update[string, any]) { calls++ }, + func(update message.Update[string, any], errChans chan error) { calls++ }, ) assert.Equal(t, 0, calls) } @@ -47,8 +48,9 @@ func TestHandleSubscriptionAlreadyInitialized(t *testing.T) { var storeCalls int var deleteCalls int message.HandleSubscription[string, any]( + message.Metadata{Runner: "demo", Resource: "demo"}, m.Subscribe(context.Background()), - func(update message.Update[string, any]) { + func(update message.Update[string, any], errChans chan error) { end() if update.Delete { deleteCalls++ @@ -121,7 +123,7 @@ func TestXdsIRUpdates(t *testing.T) { }() updates := 0 - message.HandleSubscription(snapshotC, func(u message.Update[string, *ir.Xds]) { + message.HandleSubscription(message.Metadata{Runner: "demo", Resource: "demo"}, snapshotC, func(u message.Update[string, *ir.Xds], errChans chan error) { end() if u.Key == "test" { updates += 1 diff --git a/internal/provider/kubernetes/controller.go b/internal/provider/kubernetes/controller.go index 663eb80a2995..6611ea8de501 100644 --- a/internal/provider/kubernetes/controller.go +++ b/internal/provider/kubernetes/controller.go @@ -30,7 +30,7 @@ import ( gwapiv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1" mcsapi "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" - egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" + "github.com/envoyproxy/gateway/api/v1alpha1" "github.com/envoyproxy/gateway/api/v1alpha1/validation" "github.com/envoyproxy/gateway/internal/envoygateway/config" "github.com/envoyproxy/gateway/internal/gatewayapi" @@ -69,7 +69,7 @@ type gatewayAPIReconciler struct { store *kubernetesProviderStore namespace string namespaceLabels []string - envoyGateway *egv1a1.EnvoyGateway + envoyGateway *v1alpha1.EnvoyGateway mergeGateways bool resources *message.ProviderResources @@ -94,7 +94,7 @@ func newGatewayAPIController(mgr manager.Manager, cfg *config.Server, su status. byNamespaceSelector := cfg.EnvoyGateway.Provider != nil && cfg.EnvoyGateway.Provider.Kubernetes != nil && cfg.EnvoyGateway.Provider.Kubernetes.Watch != nil && - cfg.EnvoyGateway.Provider.Kubernetes.Watch.Type == egv1a1.KubernetesWatchModeTypeNamespaceSelectors && + cfg.EnvoyGateway.Provider.Kubernetes.Watch.Type == v1alpha1.KubernetesWatchModeTypeNamespaceSelectors && len(cfg.EnvoyGateway.Provider.Kubernetes.Watch.NamespaceSelectors) != 0 if byNamespaceSelector { namespaceLabels = cfg.EnvoyGateway.Provider.Kubernetes.Watch.NamespaceSelectors @@ -138,10 +138,10 @@ type resourceMappings struct { allAssociatedRefGrants map[types.NamespacedName]*gwapiv1b1.ReferenceGrant // authenFilters is a map of AuthenticationFilters, where the key is the // namespaced name of the AuthenticationFilter. - authenFilters map[types.NamespacedName]*egv1a1.AuthenticationFilter + authenFilters map[types.NamespacedName]*v1alpha1.AuthenticationFilter // rateLimitFilters is a map of RateLimitFilters, where the key is the // namespaced name of the RateLimitFilter. - rateLimitFilters map[types.NamespacedName]*egv1a1.RateLimitFilter + rateLimitFilters map[types.NamespacedName]*v1alpha1.RateLimitFilter // extensionRefFilters is a map of filters managed by an extension. // The key is the namespaced name of the filter and the value is the // unstructured form of the resource. @@ -153,8 +153,8 @@ func newResourceMapping() *resourceMappings { allAssociatedNamespaces: map[string]struct{}{}, allAssociatedBackendRefs: map[gwapiv1.BackendObjectReference]struct{}{}, allAssociatedRefGrants: map[types.NamespacedName]*gwapiv1b1.ReferenceGrant{}, - authenFilters: map[types.NamespacedName]*egv1a1.AuthenticationFilter{}, - rateLimitFilters: map[types.NamespacedName]*egv1a1.RateLimitFilter{}, + authenFilters: map[types.NamespacedName]*v1alpha1.AuthenticationFilter{}, + rateLimitFilters: map[types.NamespacedName]*v1alpha1.RateLimitFilter{}, extensionRefFilters: map[types.NamespacedName]unstructured.Unstructured{}, } } @@ -279,7 +279,7 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques // Add all EnvoyPatchPolicies if r.envoyGateway.ExtensionAPIs != nil && r.envoyGateway.ExtensionAPIs.EnableEnvoyPatchPolicy { - envoyPatchPolicies := egv1a1.EnvoyPatchPolicyList{} + envoyPatchPolicies := v1alpha1.EnvoyPatchPolicyList{} if err := r.client.List(ctx, &envoyPatchPolicies); err != nil { return reconcile.Result{}, fmt.Errorf("error listing EnvoyPatchPolicies: %v", err) } @@ -288,13 +288,13 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques policy := policy // Discard Status to reduce memory consumption in watchable // It will be recomputed by the gateway-api layer - policy.Status = egv1a1.EnvoyPatchPolicyStatus{} + policy.Status = v1alpha1.EnvoyPatchPolicyStatus{} resourceTree.EnvoyPatchPolicies = append(resourceTree.EnvoyPatchPolicies, &policy) } } // Add all ClientTrafficPolicies - clientTrafficPolicies := egv1a1.ClientTrafficPolicyList{} + clientTrafficPolicies := v1alpha1.ClientTrafficPolicyList{} if err := r.client.List(ctx, &clientTrafficPolicies); err != nil { return reconcile.Result{}, fmt.Errorf("error listing ClientTrafficPolicies: %v", err) } @@ -303,13 +303,13 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques policy := policy // Discard Status to reduce memory consumption in watchable // It will be recomputed by the gateway-api layer - policy.Status = egv1a1.ClientTrafficPolicyStatus{} + policy.Status = v1alpha1.ClientTrafficPolicyStatus{} resourceTree.ClientTrafficPolicies = append(resourceTree.ClientTrafficPolicies, &policy) } // Add all BackendTrafficPolicies - backendTrafficPolicies := egv1a1.BackendTrafficPolicyList{} + backendTrafficPolicies := v1alpha1.BackendTrafficPolicyList{} if err := r.client.List(ctx, &backendTrafficPolicies); err != nil { return reconcile.Result{}, fmt.Errorf("error listing BackendTrafficPolicies: %v", err) } @@ -318,12 +318,12 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques policy := policy // Discard Status to reduce memory consumption in watchable // It will be recomputed by the gateway-api layer - policy.Status = egv1a1.BackendTrafficPolicyStatus{} + policy.Status = v1alpha1.BackendTrafficPolicyStatus{} resourceTree.BackendTrafficPolicies = append(resourceTree.BackendTrafficPolicies, &policy) } // Add all SecurityPolicies - securityPolicies := egv1a1.SecurityPolicyList{} + securityPolicies := v1alpha1.SecurityPolicyList{} if err := r.client.List(ctx, &securityPolicies); err != nil { return reconcile.Result{}, fmt.Errorf("error listing SecurityPolicies: %v", err) } @@ -332,7 +332,7 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques policy := policy // Discard Status to reduce memory consumption in watchable // It will be recomputed by the gateway-api layer - policy.Status = egv1a1.SecurityPolicyStatus{} + policy.Status = v1alpha1.SecurityPolicyStatus{} resourceTree.SecurityPolicies = append(resourceTree.SecurityPolicies, &policy) } @@ -1081,8 +1081,8 @@ func (r *gatewayAPIReconciler) addFinalizer(ctx context.Context, gc *gwapiv1.Gat func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { // Gateway object status updater go func() { - message.HandleSubscription(r.resources.GatewayStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1.GatewayStatus]) { + message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentProviderRunner), Resource: "gateway-status"}, r.resources.GatewayStatuses.Subscribe(ctx), + func(update message.Update[types.NamespacedName, *gwapiv1.GatewayStatus], errChan chan error) { // skip delete updates. if update.Delete { return @@ -1091,6 +1091,7 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { gtw := new(gwapiv1.Gateway) if err := r.client.Get(ctx, update.Key, gtw); err != nil { r.log.Error(err, "gateway not found", "namespace", gtw.Namespace, "name", gtw.Name) + errChan <- err return } // Set the updated Status and call the status update @@ -1103,8 +1104,8 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { // HTTPRoute object status updater go func() { - message.HandleSubscription(r.resources.HTTPRouteStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1.HTTPRouteStatus]) { + message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentProviderRunner), Resource: "httproute-status"}, r.resources.HTTPRouteStatuses.Subscribe(ctx), + func(update message.Update[types.NamespacedName, *gwapiv1.HTTPRouteStatus], errChan chan error) { // skip delete updates. if update.Delete { return @@ -1117,7 +1118,9 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { Mutator: status.MutatorFunc(func(obj client.Object) client.Object { h, ok := obj.(*gwapiv1.HTTPRoute) if !ok { - panic(fmt.Sprintf("unsupported object type %T", obj)) + err := fmt.Errorf("unsupported object type %T", obj) + errChan <- err + panic(err) } hCopy := h.DeepCopy() hCopy.Status.Parents = val.Parents @@ -1131,8 +1134,8 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { // GRPCRoute object status updater go func() { - message.HandleSubscription(r.resources.GRPCRouteStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1a2.GRPCRouteStatus]) { + message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentProviderRunner), Resource: "grpcroute-status"}, r.resources.GRPCRouteStatuses.Subscribe(ctx), + func(update message.Update[types.NamespacedName, *gwapiv1a2.GRPCRouteStatus], errChan chan error) { // skip delete updates. if update.Delete { return @@ -1145,7 +1148,9 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { Mutator: status.MutatorFunc(func(obj client.Object) client.Object { h, ok := obj.(*gwapiv1a2.GRPCRoute) if !ok { - panic(fmt.Sprintf("unsupported object type %T", obj)) + err := fmt.Errorf("unsupported object type %T", obj) + errChan <- err + panic(err) } hCopy := h.DeepCopy() hCopy.Status.Parents = val.Parents @@ -1159,8 +1164,8 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { // TLSRoute object status updater go func() { - message.HandleSubscription(r.resources.TLSRouteStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1a2.TLSRouteStatus]) { + message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentProviderRunner), Resource: "tlsroute-status"}, r.resources.TLSRouteStatuses.Subscribe(ctx), + func(update message.Update[types.NamespacedName, *gwapiv1a2.TLSRouteStatus], errChan chan error) { // skip delete updates. if update.Delete { return @@ -1173,7 +1178,9 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { Mutator: status.MutatorFunc(func(obj client.Object) client.Object { t, ok := obj.(*gwapiv1a2.TLSRoute) if !ok { - panic(fmt.Sprintf("unsupported object type %T", obj)) + err := fmt.Errorf("unsupported object type %T", obj) + errChan <- err + panic(err) } tCopy := t.DeepCopy() tCopy.Status.Parents = val.Parents @@ -1187,8 +1194,8 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { // TCPRoute object status updater go func() { - message.HandleSubscription(r.resources.TCPRouteStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1a2.TCPRouteStatus]) { + message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentProviderRunner), Resource: "tcproute-status"}, r.resources.TCPRouteStatuses.Subscribe(ctx), + func(update message.Update[types.NamespacedName, *gwapiv1a2.TCPRouteStatus], errChan chan error) { // skip delete updates. if update.Delete { return @@ -1201,7 +1208,9 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { Mutator: status.MutatorFunc(func(obj client.Object) client.Object { t, ok := obj.(*gwapiv1a2.TCPRoute) if !ok { - panic(fmt.Sprintf("unsupported object type %T", obj)) + err := fmt.Errorf("unsupported object type %T", obj) + errChan <- err + panic(err) } tCopy := t.DeepCopy() tCopy.Status.Parents = val.Parents @@ -1215,8 +1224,8 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { // UDPRoute object status updater go func() { - message.HandleSubscription(r.resources.UDPRouteStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *gwapiv1a2.UDPRouteStatus]) { + message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentProviderRunner), Resource: "udproute-status"}, r.resources.UDPRouteStatuses.Subscribe(ctx), + func(update message.Update[types.NamespacedName, *gwapiv1a2.UDPRouteStatus], errChan chan error) { // skip delete updates. if update.Delete { return @@ -1229,7 +1238,9 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { Mutator: status.MutatorFunc(func(obj client.Object) client.Object { t, ok := obj.(*gwapiv1a2.UDPRoute) if !ok { - panic(fmt.Sprintf("unsupported object type %T", obj)) + err := fmt.Errorf("unsupported object type %T", obj) + errChan <- err + panic(err) } tCopy := t.DeepCopy() tCopy.Status.Parents = val.Parents @@ -1243,8 +1254,8 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { // EnvoyPatchPolicy object status updater go func() { - message.HandleSubscription(r.resources.EnvoyPatchPolicyStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *egv1a1.EnvoyPatchPolicyStatus]) { + message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentProviderRunner), Resource: "envoypatchpolicy-status"}, r.resources.EnvoyPatchPolicyStatuses.Subscribe(ctx), + func(update message.Update[types.NamespacedName, *v1alpha1.EnvoyPatchPolicyStatus], errChan chan error) { // skip delete updates. if update.Delete { return @@ -1253,11 +1264,13 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { val := update.Value r.statusUpdater.Send(status.Update{ NamespacedName: key, - Resource: new(egv1a1.EnvoyPatchPolicy), + Resource: new(v1alpha1.EnvoyPatchPolicy), Mutator: status.MutatorFunc(func(obj client.Object) client.Object { - t, ok := obj.(*egv1a1.EnvoyPatchPolicy) + t, ok := obj.(*v1alpha1.EnvoyPatchPolicy) if !ok { - panic(fmt.Sprintf("unsupported object type %T", obj)) + err := fmt.Errorf("unsupported object type %T", obj) + errChan <- err + panic(err) } tCopy := t.DeepCopy() tCopy.Status = *val @@ -1271,8 +1284,8 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { // ClientTrafficPolicy object status updater go func() { - message.HandleSubscription(r.resources.ClientTrafficPolicyStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *egv1a1.ClientTrafficPolicyStatus]) { + message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentProviderRunner), Resource: "clienttrafficpolicy-status"}, r.resources.ClientTrafficPolicyStatuses.Subscribe(ctx), + func(update message.Update[types.NamespacedName, *v1alpha1.ClientTrafficPolicyStatus], errChan chan error) { // skip delete updates. if update.Delete { return @@ -1281,11 +1294,13 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { val := update.Value r.statusUpdater.Send(status.Update{ NamespacedName: key, - Resource: new(egv1a1.ClientTrafficPolicy), + Resource: new(v1alpha1.ClientTrafficPolicy), Mutator: status.MutatorFunc(func(obj client.Object) client.Object { - t, ok := obj.(*egv1a1.ClientTrafficPolicy) + t, ok := obj.(*v1alpha1.ClientTrafficPolicy) if !ok { - panic(fmt.Sprintf("unsupported object type %T", obj)) + err := fmt.Errorf("unsupported object type %T", obj) + errChan <- err + panic(err) } tCopy := t.DeepCopy() tCopy.Status = *val @@ -1299,8 +1314,8 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { // BackendTrafficPolicy object status updater go func() { - message.HandleSubscription(r.resources.BackendTrafficPolicyStatuses.Subscribe(ctx), - func(update message.Update[types.NamespacedName, *egv1a1.BackendTrafficPolicyStatus]) { + message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentProviderRunner), Resource: "backendtrafficpolicy-status"}, r.resources.BackendTrafficPolicyStatuses.Subscribe(ctx), + func(update message.Update[types.NamespacedName, *v1alpha1.BackendTrafficPolicyStatus], errChan chan error) { // skip delete updates. if update.Delete { return @@ -1309,11 +1324,13 @@ func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context) { val := update.Value r.statusUpdater.Send(status.Update{ NamespacedName: key, - Resource: new(egv1a1.BackendTrafficPolicy), + Resource: new(v1alpha1.BackendTrafficPolicy), Mutator: status.MutatorFunc(func(obj client.Object) client.Object { - t, ok := obj.(*egv1a1.BackendTrafficPolicy) + t, ok := obj.(*v1alpha1.BackendTrafficPolicy) if !ok { - panic(fmt.Sprintf("unsupported object type %T", obj)) + err := fmt.Errorf("unsupported object type %T", obj) + errChan <- err + panic(err) } tCopy := t.DeepCopy() tCopy.Status = *val @@ -1347,7 +1364,7 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M epPredicates = append(epPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) } if err := c.Watch( - source.Kind(mgr.GetCache(), &egv1a1.EnvoyProxy{}), + source.Kind(mgr.GetCache(), &v1alpha1.EnvoyProxy{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), epPredicates..., ); err != nil { @@ -1571,7 +1588,7 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M afPredicates = append(afPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) } if err := c.Watch( - source.Kind(mgr.GetCache(), &egv1a1.AuthenticationFilter{}), + source.Kind(mgr.GetCache(), &v1alpha1.AuthenticationFilter{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), afPredicates..., ); err != nil { @@ -1587,7 +1604,7 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch RateLimitFilter CRUDs and enqueue associated HTTPRoute objects. if err := c.Watch( - source.Kind(mgr.GetCache(), &egv1a1.RateLimitFilter{}), + source.Kind(mgr.GetCache(), &v1alpha1.RateLimitFilter{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), rfPredicates..., ); err != nil { @@ -1602,7 +1619,7 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M if r.envoyGateway.ExtensionAPIs != nil && r.envoyGateway.ExtensionAPIs.EnableEnvoyPatchPolicy { // Watch EnvoyPatchPolicy CRUDs if err := c.Watch( - source.Kind(mgr.GetCache(), &egv1a1.EnvoyPatchPolicy{}), + source.Kind(mgr.GetCache(), &v1alpha1.EnvoyPatchPolicy{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), eppPredicates..., ); err != nil { @@ -1617,7 +1634,7 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } if err := c.Watch( - source.Kind(mgr.GetCache(), &egv1a1.ClientTrafficPolicy{}), + source.Kind(mgr.GetCache(), &v1alpha1.ClientTrafficPolicy{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), ctpPredicates..., ); err != nil { @@ -1631,7 +1648,7 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } if err := c.Watch( - source.Kind(mgr.GetCache(), &egv1a1.BackendTrafficPolicy{}), + source.Kind(mgr.GetCache(), &v1alpha1.BackendTrafficPolicy{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), btpPredicates..., ); err != nil { @@ -1666,7 +1683,7 @@ func (r *gatewayAPIReconciler) enqueueClass(_ context.Context, _ client.Object) } func (r *gatewayAPIReconciler) hasManagedClass(obj client.Object) bool { - ep, ok := obj.(*egv1a1.EnvoyProxy) + ep, ok := obj.(*v1alpha1.EnvoyProxy) if !ok { panic(fmt.Sprintf("unsupported object type %T", obj)) } @@ -1704,7 +1721,7 @@ func (r *gatewayAPIReconciler) processParamsRef(ctx context.Context, gc *gwapiv1 return fmt.Errorf("unsupported parametersRef for gatewayclass %s", gc.Name) } - epList := new(egv1a1.EnvoyProxyList) + epList := new(v1alpha1.EnvoyProxyList) // The EnvoyProxy must be in the same namespace as EG. if err := r.client.List(ctx, epList, &client.ListOptions{Namespace: r.namespace}); err != nil { diff --git a/internal/xds/server/runner/runner.go b/internal/xds/server/runner/runner.go index 7147eb200a16..b1879ccb2406 100644 --- a/internal/xds/server/runner/runner.go +++ b/internal/xds/server/runner/runner.go @@ -133,8 +133,8 @@ func registerServer(srv serverv3.Server, g *grpc.Server) { func (r *Runner) subscribeAndTranslate(ctx context.Context) { // Subscribe to resources - message.HandleSubscription(r.Xds.Subscribe(ctx), - func(update message.Update[string, *xdstypes.ResourceVersionTable]) { + message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentXdsServerRunner), Resource: "xds"}, r.Xds.Subscribe(ctx), + func(update message.Update[string, *xdstypes.ResourceVersionTable], errChan chan error) { key := update.Key val := update.Value @@ -145,6 +145,7 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { } else if val != nil && val.XdsResources != nil { if r.cache == nil { r.Logger.Error(err, "failed to init snapshot cache") + errChan <- err } else { // Update snapshot cache err = r.cache.GenerateNewSnapshot(key, val.XdsResources) @@ -152,6 +153,7 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { } if err != nil { r.Logger.Error(err, "failed to generate a snapshot") + errChan <- err } }, ) diff --git a/internal/xds/translator/runner/runner.go b/internal/xds/translator/runner/runner.go index 17ff6c8619a7..25fd383a91fa 100644 --- a/internal/xds/translator/runner/runner.go +++ b/internal/xds/translator/runner/runner.go @@ -49,8 +49,8 @@ func (r *Runner) Start(ctx context.Context) (err error) { func (r *Runner) subscribeAndTranslate(ctx context.Context) { // Subscribe to resources - message.HandleSubscription(r.XdsIR.Subscribe(ctx), - func(update message.Update[string, *ir.Xds]) { + message.HandleSubscription(message.Metadata{Runner: string(v1alpha1.LogComponentXdsTranslatorRunner), Resource: "xds-ir"}, r.XdsIR.Subscribe(ctx), + func(update message.Update[string, *ir.Xds], errChan chan error) { r.Logger.Info("received an update") key := update.Key val := update.Value @@ -80,6 +80,7 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { result, err := t.Translate(val) if err != nil { r.Logger.Error(err, "failed to translate xds ir") + errChan <- err return }