diff --git a/internal/gatewayapi/runner/runner.go b/internal/gatewayapi/runner/runner.go index ca72bbfe1e3..2b34b8ad33f 100644 --- a/internal/gatewayapi/runner/runner.go +++ b/internal/gatewayapi/runner/runner.go @@ -9,6 +9,7 @@ import ( "context" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" v1 "sigs.k8s.io/gateway-api/apis/v1" "github.com/envoyproxy/gateway/api/v1alpha1" @@ -56,15 +57,23 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { // so when a delete is triggered, delete all IR keys if update.Delete || val == nil { r.deleteAllIRKeys() + r.deleteAllStatusKeys() return } - var curKeys, newKeys []string + // IR keys for watchable + var curIRKeys, newIRKeys []string + // Get current IR keys for key := range r.InfraIR.LoadAll() { - curKeys = append(curKeys, key) + curIRKeys = append(curIRKeys, key) } + // Get all status keys from watchable and save them in this StatusesToDelete structure. + // Iterating through the controller resources, any valid keys will be removed from statusesToDelete. + // Remaining keys will be deleted from watchable before we exit this function. + statusesToDelete := r.getAllStatuses() + for _, resources := range *val { // Translate and publish IRs. t := &gatewayapi.Translator{ @@ -95,7 +104,7 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { errChan <- err } else { r.InfraIR.Store(key, val) - newKeys = append(newKeys, key) + newIRKeys = append(newIRKeys, key) } } @@ -114,61 +123,75 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { gateway := gateway key := utils.NamespacedName(gateway) r.ProviderResources.GatewayStatuses.Store(key, &gateway.Status) + delete(statusesToDelete.GatewayStatusKeys, key) } for _, httpRoute := range result.HTTPRoutes { httpRoute := httpRoute key := utils.NamespacedName(httpRoute) r.ProviderResources.HTTPRouteStatuses.Store(key, &httpRoute.Status) + delete(statusesToDelete.HTTPRouteStatusKeys, key) } for _, grpcRoute := range result.GRPCRoutes { grpcRoute := grpcRoute key := utils.NamespacedName(grpcRoute) r.ProviderResources.GRPCRouteStatuses.Store(key, &grpcRoute.Status) + delete(statusesToDelete.GRPCRouteStatusKeys, key) } - for _, tlsRoute := range result.TLSRoutes { tlsRoute := tlsRoute key := utils.NamespacedName(tlsRoute) r.ProviderResources.TLSRouteStatuses.Store(key, &tlsRoute.Status) + delete(statusesToDelete.TLSRouteStatusKeys, key) } for _, tcpRoute := range result.TCPRoutes { tcpRoute := tcpRoute key := utils.NamespacedName(tcpRoute) r.ProviderResources.TCPRouteStatuses.Store(key, &tcpRoute.Status) + delete(statusesToDelete.TCPRouteStatusKeys, key) } for _, udpRoute := range result.UDPRoutes { udpRoute := udpRoute key := utils.NamespacedName(udpRoute) r.ProviderResources.UDPRouteStatuses.Store(key, &udpRoute.Status) + delete(statusesToDelete.UDPRouteStatusKeys, key) + } + for _, backendTLSPolicy := range result.BackendTLSPolicies { + backendTLSPolicy := backendTLSPolicy + key := utils.NamespacedName(backendTLSPolicy) + r.ProviderResources.BackendTLSPolicyStatuses.Store(key, &backendTLSPolicy.Status) + delete(statusesToDelete.BackendTLSPolicyStatusKeys, key) } + for _, clientTrafficPolicy := range result.ClientTrafficPolicies { clientTrafficPolicy := clientTrafficPolicy key := utils.NamespacedName(clientTrafficPolicy) r.ProviderResources.ClientTrafficPolicyStatuses.Store(key, &clientTrafficPolicy.Status) + delete(statusesToDelete.ClientTrafficPolicyStatusKeys, key) } for _, backendTrafficPolicy := range result.BackendTrafficPolicies { backendTrafficPolicy := backendTrafficPolicy key := utils.NamespacedName(backendTrafficPolicy) r.ProviderResources.BackendTrafficPolicyStatuses.Store(key, &backendTrafficPolicy.Status) + delete(statusesToDelete.BackendTrafficPolicyStatusKeys, key) } for _, securityPolicy := range result.SecurityPolicies { securityPolicy := securityPolicy key := utils.NamespacedName(securityPolicy) r.ProviderResources.SecurityPolicyStatuses.Store(key, &securityPolicy.Status) - } - for _, backendTLSPolicy := range result.BackendTLSPolicies { - backendTLSPolicy := backendTLSPolicy - key := utils.NamespacedName(backendTLSPolicy) - r.ProviderResources.BackendTLSPolicyStatuses.Store(key, &backendTLSPolicy.Status) + delete(statusesToDelete.SecurityPolicyStatusKeys, key) } } - // Delete keys + + // Delete IR keys // There is a 1:1 mapping between infra and xds IR keys - delKeys := getIRKeysToDelete(curKeys, newKeys) + delKeys := getIRKeysToDelete(curIRKeys, newIRKeys) for _, key := range delKeys { r.InfraIR.Delete(key) r.XdsIR.Delete(key) } + + // Delete status keys + r.deleteStatusKeys(statusesToDelete) }, ) r.Logger.Info("shutting down") @@ -182,6 +205,153 @@ func (r *Runner) deleteAllIRKeys() { } } +type StatusesToDelete struct { + GatewayStatusKeys map[types.NamespacedName]bool + HTTPRouteStatusKeys map[types.NamespacedName]bool + GRPCRouteStatusKeys map[types.NamespacedName]bool + TLSRouteStatusKeys map[types.NamespacedName]bool + TCPRouteStatusKeys map[types.NamespacedName]bool + UDPRouteStatusKeys map[types.NamespacedName]bool + BackendTLSPolicyStatusKeys map[types.NamespacedName]bool + + ClientTrafficPolicyStatusKeys map[types.NamespacedName]bool + BackendTrafficPolicyStatusKeys map[types.NamespacedName]bool + SecurityPolicyStatusKeys map[types.NamespacedName]bool +} + +func (r *Runner) getAllStatuses() *StatusesToDelete { + // Maps storing status keys to be deleted + ds := &StatusesToDelete{ + GatewayStatusKeys: make(map[types.NamespacedName]bool), + HTTPRouteStatusKeys: make(map[types.NamespacedName]bool), + GRPCRouteStatusKeys: make(map[types.NamespacedName]bool), + TLSRouteStatusKeys: make(map[types.NamespacedName]bool), + TCPRouteStatusKeys: make(map[types.NamespacedName]bool), + UDPRouteStatusKeys: make(map[types.NamespacedName]bool), + + ClientTrafficPolicyStatusKeys: make(map[types.NamespacedName]bool), + BackendTrafficPolicyStatusKeys: make(map[types.NamespacedName]bool), + SecurityPolicyStatusKeys: make(map[types.NamespacedName]bool), + BackendTLSPolicyStatusKeys: make(map[types.NamespacedName]bool), + } + + // Get current status keys + for key := range r.ProviderResources.GatewayStatuses.LoadAll() { + ds.GatewayStatusKeys[key] = true + } + for key := range r.ProviderResources.HTTPRouteStatuses.LoadAll() { + ds.HTTPRouteStatusKeys[key] = true + } + for key := range r.ProviderResources.GRPCRouteStatuses.LoadAll() { + ds.GRPCRouteStatusKeys[key] = true + } + for key := range r.ProviderResources.TLSRouteStatuses.LoadAll() { + ds.TLSRouteStatusKeys[key] = true + } + for key := range r.ProviderResources.TCPRouteStatuses.LoadAll() { + ds.TCPRouteStatusKeys[key] = true + } + for key := range r.ProviderResources.UDPRouteStatuses.LoadAll() { + ds.UDPRouteStatusKeys[key] = true + } + for key := range r.ProviderResources.BackendTLSPolicyStatuses.LoadAll() { + ds.BackendTLSPolicyStatusKeys[key] = true + } + + for key := range r.ProviderResources.ClientTrafficPolicyStatuses.LoadAll() { + ds.ClientTrafficPolicyStatusKeys[key] = true + } + for key := range r.ProviderResources.BackendTrafficPolicyStatuses.LoadAll() { + ds.BackendTrafficPolicyStatusKeys[key] = true + } + for key := range r.ProviderResources.SecurityPolicyStatuses.LoadAll() { + ds.SecurityPolicyStatusKeys[key] = true + } + + return ds +} + +func (r *Runner) deleteStatusKeys(ds *StatusesToDelete) { + for key := range ds.GatewayStatusKeys { + r.ProviderResources.GatewayStatuses.Delete(key) + delete(ds.GatewayStatusKeys, key) + } + for key := range ds.HTTPRouteStatusKeys { + r.ProviderResources.HTTPRouteStatuses.Delete(key) + delete(ds.HTTPRouteStatusKeys, key) + } + for key := range ds.GRPCRouteStatusKeys { + r.ProviderResources.GRPCRouteStatuses.Delete(key) + delete(ds.GRPCRouteStatusKeys, key) + } + for key := range ds.TLSRouteStatusKeys { + r.ProviderResources.TLSRouteStatuses.Delete(key) + delete(ds.TLSRouteStatusKeys, key) + } + for key := range ds.TCPRouteStatusKeys { + r.ProviderResources.TCPRouteStatuses.Delete(key) + delete(ds.TCPRouteStatusKeys, key) + } + for key := range ds.UDPRouteStatusKeys { + r.ProviderResources.UDPRouteStatuses.Delete(key) + delete(ds.UDPRouteStatusKeys, key) + } + + for key := range ds.ClientTrafficPolicyStatusKeys { + r.ProviderResources.ClientTrafficPolicyStatuses.Delete(key) + delete(ds.ClientTrafficPolicyStatusKeys, key) + } + for key := range ds.BackendTrafficPolicyStatusKeys { + r.ProviderResources.BackendTrafficPolicyStatuses.Delete(key) + delete(ds.BackendTrafficPolicyStatusKeys, key) + } + for key := range ds.SecurityPolicyStatusKeys { + r.ProviderResources.SecurityPolicyStatuses.Delete(key) + delete(ds.SecurityPolicyStatusKeys, key) + } + for key := range ds.BackendTLSPolicyStatusKeys { + r.ProviderResources.BackendTLSPolicyStatuses.Delete(key) + delete(ds.BackendTLSPolicyStatusKeys, key) + } +} + +// deleteAllStatusKeys deletes all status keys stored by the subscriber. +func (r *Runner) deleteAllStatusKeys() { + // Fields of GatewayAPIStatuses + for key := range r.ProviderResources.GatewayStatuses.LoadAll() { + r.ProviderResources.GatewayStatuses.Delete(key) + } + for key := range r.ProviderResources.HTTPRouteStatuses.LoadAll() { + r.ProviderResources.HTTPRouteStatuses.Delete(key) + } + for key := range r.ProviderResources.GRPCRouteStatuses.LoadAll() { + r.ProviderResources.GRPCRouteStatuses.Delete(key) + } + for key := range r.ProviderResources.TLSRouteStatuses.LoadAll() { + r.ProviderResources.TLSRouteStatuses.Delete(key) + } + for key := range r.ProviderResources.TCPRouteStatuses.LoadAll() { + r.ProviderResources.TCPRouteStatuses.Delete(key) + } + for key := range r.ProviderResources.UDPRouteStatuses.LoadAll() { + r.ProviderResources.UDPRouteStatuses.Delete(key) + } + for key := range r.ProviderResources.BackendTLSPolicyStatuses.LoadAll() { + r.ProviderResources.BackendTLSPolicyStatuses.Delete(key) + } + + // Fields of PolicyStatuses + for key := range r.ProviderResources.ClientTrafficPolicyStatuses.LoadAll() { + r.ProviderResources.ClientTrafficPolicyStatuses.Delete(key) + } + for key := range r.ProviderResources.BackendTrafficPolicyStatuses.LoadAll() { + r.ProviderResources.BackendTrafficPolicyStatuses.Delete(key) + } + for key := range r.ProviderResources.SecurityPolicyStatuses.LoadAll() { + r.ProviderResources.SecurityPolicyStatuses.Delete(key) + } +} + // getIRKeysToDelete returns the list of IR keys to delete // based on the difference between the current keys and the // new keys parameters passed to the function. diff --git a/internal/gatewayapi/runner/runner_test.go b/internal/gatewayapi/runner/runner_test.go index b159933b508..772f0372a3c 100644 --- a/internal/gatewayapi/runner/runner_test.go +++ b/internal/gatewayapi/runner/runner_test.go @@ -13,6 +13,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/types" + gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" + gwapiv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2" egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" "github.com/envoyproxy/gateway/internal/envoygateway/config" @@ -104,3 +107,175 @@ func TestGetIRKeysToDelete(t *testing.T) { }) } } + +func TestDeleteStatusKeys(t *testing.T) { + // Setup + pResources := new(message.ProviderResources) + xdsIR := new(message.XdsIR) + infraIR := new(message.InfraIR) + cfg, err := config.New() + require.NoError(t, err) + r := New(&Config{ + Server: *cfg, + ProviderResources: pResources, + XdsIR: xdsIR, + InfraIR: infraIR, + ExtensionManager: testutils.NewManager(egv1a1.ExtensionManager{}), + }) + ctx := context.Background() + + // Start + err = r.Start(ctx) + require.NoError(t, err) + + // A new status gets stored + keys := []types.NamespacedName{ + { + Name: "test1", + Namespace: "test-namespace", + }, + { + Name: "test2", + Namespace: "test-namespace", + }, + { + Name: "test3", + Namespace: "test-namespace", + }, + { + Name: "test4", + Namespace: "test-namespace", + }, + { + Name: "test5", + Namespace: "test-namespace", + }, + { + Name: "test6", + Namespace: "test-namespace", + }, + { + Name: "test7", + Namespace: "test-namespace", + }, + } + + r.ProviderResources.GatewayStatuses.Store(keys[0], &gwapiv1.GatewayStatus{}) + r.ProviderResources.HTTPRouteStatuses.Store(keys[1], &gwapiv1.HTTPRouteStatus{}) + r.ProviderResources.GRPCRouteStatuses.Store(keys[2], &gwapiv1a2.GRPCRouteStatus{}) + r.ProviderResources.TLSRouteStatuses.Store(keys[3], &gwapiv1a2.TLSRouteStatus{}) + r.ProviderResources.TCPRouteStatuses.Store(keys[4], &gwapiv1a2.TCPRouteStatus{}) + r.ProviderResources.UDPRouteStatuses.Store(keys[5], &gwapiv1a2.UDPRouteStatus{}) + r.ProviderResources.UDPRouteStatuses.Store(keys[6], &gwapiv1a2.UDPRouteStatus{}) + + // Checks that the keys are successfully stored to DeletableStatus and watchable maps + ds := r.getAllStatuses() + + require.True(t, ds.GatewayStatusKeys[keys[0]]) + require.True(t, ds.HTTPRouteStatusKeys[keys[1]]) + require.True(t, ds.GRPCRouteStatusKeys[keys[2]]) + require.True(t, ds.TLSRouteStatusKeys[keys[3]]) + require.True(t, ds.TCPRouteStatusKeys[keys[4]]) + require.True(t, ds.UDPRouteStatusKeys[keys[5]]) + require.True(t, ds.UDPRouteStatusKeys[keys[6]]) + + require.Equal(t, 1, r.ProviderResources.GatewayStatuses.Len()) + require.Equal(t, 1, r.ProviderResources.HTTPRouteStatuses.Len()) + require.Equal(t, 1, r.ProviderResources.GRPCRouteStatuses.Len()) + require.Equal(t, 1, r.ProviderResources.TLSRouteStatuses.Len()) + require.Equal(t, 1, r.ProviderResources.TCPRouteStatuses.Len()) + require.Equal(t, 2, r.ProviderResources.UDPRouteStatuses.Len()) + + // Delete all keys except the last UDPRouteStatus key + delete(ds.UDPRouteStatusKeys, keys[6]) + r.deleteStatusKeys(ds) + + require.Equal(t, 0, r.ProviderResources.GatewayStatuses.Len()) + require.Equal(t, 0, r.ProviderResources.HTTPRouteStatuses.Len()) + require.Equal(t, 0, r.ProviderResources.GRPCRouteStatuses.Len()) + require.Equal(t, 0, r.ProviderResources.TLSRouteStatuses.Len()) + require.Equal(t, 0, r.ProviderResources.TCPRouteStatuses.Len()) + require.Equal(t, 1, r.ProviderResources.UDPRouteStatuses.Len()) +} + +func TestDeleteAllStatusKeys(t *testing.T) { + // Setup + pResources := new(message.ProviderResources) + xdsIR := new(message.XdsIR) + infraIR := new(message.InfraIR) + cfg, err := config.New() + require.NoError(t, err) + r := New(&Config{ + Server: *cfg, + ProviderResources: pResources, + XdsIR: xdsIR, + InfraIR: infraIR, + ExtensionManager: testutils.NewManager(egv1a1.ExtensionManager{}), + }) + ctx := context.Background() + + // Start + err = r.Start(ctx) + require.NoError(t, err) + + // A new status gets stored + keys := []types.NamespacedName{ + { + Name: "test1", + Namespace: "test-namespace", + }, + { + Name: "test2", + Namespace: "test-namespace", + }, + { + Name: "test3", + Namespace: "test-namespace", + }, + { + Name: "test4", + Namespace: "test-namespace", + }, + { + Name: "test5", + Namespace: "test-namespace", + }, + { + Name: "test6", + Namespace: "test-namespace", + }, + } + + r.ProviderResources.GatewayStatuses.Store(keys[0], &gwapiv1.GatewayStatus{}) + r.ProviderResources.HTTPRouteStatuses.Store(keys[1], &gwapiv1.HTTPRouteStatus{}) + r.ProviderResources.GRPCRouteStatuses.Store(keys[2], &gwapiv1a2.GRPCRouteStatus{}) + r.ProviderResources.TLSRouteStatuses.Store(keys[3], &gwapiv1a2.TLSRouteStatus{}) + r.ProviderResources.TCPRouteStatuses.Store(keys[4], &gwapiv1a2.TCPRouteStatus{}) + r.ProviderResources.UDPRouteStatuses.Store(keys[5], &gwapiv1a2.UDPRouteStatus{}) + + // Checks that the keys are successfully stored to DeletableStatus and watchable maps + ds := r.getAllStatuses() + + require.True(t, ds.GatewayStatusKeys[keys[0]]) + require.True(t, ds.HTTPRouteStatusKeys[keys[1]]) + require.True(t, ds.GRPCRouteStatusKeys[keys[2]]) + require.True(t, ds.TLSRouteStatusKeys[keys[3]]) + require.True(t, ds.TCPRouteStatusKeys[keys[4]]) + require.True(t, ds.UDPRouteStatusKeys[keys[5]]) + + require.Equal(t, 1, r.ProviderResources.GatewayStatuses.Len()) + require.Equal(t, 1, r.ProviderResources.HTTPRouteStatuses.Len()) + require.Equal(t, 1, r.ProviderResources.GRPCRouteStatuses.Len()) + require.Equal(t, 1, r.ProviderResources.TLSRouteStatuses.Len()) + require.Equal(t, 1, r.ProviderResources.TCPRouteStatuses.Len()) + require.Equal(t, 1, r.ProviderResources.UDPRouteStatuses.Len()) + + // Delete all keys + r.deleteAllStatusKeys() + require.Equal(t, 0, r.ProviderResources.GatewayStatuses.Len()) + require.Equal(t, 0, r.ProviderResources.HTTPRouteStatuses.Len()) + require.Equal(t, 0, r.ProviderResources.GRPCRouteStatuses.Len()) + require.Equal(t, 0, r.ProviderResources.TLSRouteStatuses.Len()) + require.Equal(t, 0, r.ProviderResources.TCPRouteStatuses.Len()) + require.Equal(t, 0, r.ProviderResources.UDPRouteStatuses.Len()) +} diff --git a/internal/xds/translator/runner/runner.go b/internal/xds/translator/runner/runner.go index bae2a9fe939..114b00b4550 100644 --- a/internal/xds/translator/runner/runner.go +++ b/internal/xds/translator/runner/runner.go @@ -90,6 +90,14 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { return } + // Get all status keys from watchable and save them in the map statusesToDelete. + // Iterating through result.EnvoyPatchPolicyStatuses, any valid keys will be removed from statusesToDelete. + // Remaining keys will be deleted from watchable before we exit this function. + statusesToDelete := make(map[ktypes.NamespacedName]bool) + for key := range r.ProviderResources.EnvoyPatchPolicyStatuses.LoadAll() { + statusesToDelete[key] = true + } + // Publish EnvoyPatchPolicyStatus for _, e := range result.EnvoyPatchPolicyStatuses { key := ktypes.NamespacedName{ @@ -97,12 +105,18 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { Namespace: e.Namespace, } r.ProviderResources.EnvoyPatchPolicyStatuses.Store(key, e.Status) + delete(statusesToDelete, key) } // Discard the EnvoyPatchPolicyStatuses to reduce memory footprint result.EnvoyPatchPolicyStatuses = nil // Publish r.Xds.Store(key, result) + + // Delete all the deletable status keys + for key := range statusesToDelete { + r.ProviderResources.EnvoyPatchPolicyStatuses.Delete(key) + } } }, ) diff --git a/internal/xds/translator/runner/runner_test.go b/internal/xds/translator/runner/runner_test.go index ab8a2c65e78..9f3d7035bd6 100644 --- a/internal/xds/translator/runner/runner_test.go +++ b/internal/xds/translator/runner/runner_test.go @@ -27,12 +27,14 @@ func TestRunner(t *testing.T) { // Setup xdsIR := new(message.XdsIR) xds := new(message.Xds) + pResource := new(message.ProviderResources) cfg, err := config.New() require.NoError(t, err) r := New(&Config{ - Server: *cfg, - XdsIR: xdsIR, - Xds: xds, + Server: *cfg, + ProviderResources: pResource, + XdsIR: xdsIR, + Xds: xds, }) ctx := context.Background() @@ -103,13 +105,16 @@ func TestRunner_withExtensionManager(t *testing.T) { // Setup xdsIR := new(message.XdsIR) xds := new(message.Xds) + pResource := new(message.ProviderResources) + cfg, err := config.New() require.NoError(t, err) r := New(&Config{ - Server: *cfg, - XdsIR: xdsIR, - Xds: xds, - ExtensionManager: &extManagerMock{}, + Server: *cfg, + ProviderResources: pResource, + XdsIR: xdsIR, + Xds: xds, + ExtensionManager: &extManagerMock{}, }) ctx := context.Background()