diff --git a/adapter/internal/oasparser/constants/constants.go b/adapter/internal/oasparser/constants/constants.go index 919b3966c..92ce52d45 100644 --- a/adapter/internal/oasparser/constants/constants.go +++ b/adapter/internal/oasparser/constants/constants.go @@ -137,6 +137,8 @@ const ( KindAPIPolicy = "APIPolicy" KindScope = "Scope" KindRateLimitPolicy = "RateLimitPolicy" + KindService = "Service" + KindBackend = "Backend" ) // API environment types diff --git a/adapter/internal/oasparser/envoyconf/routes_with_clusters.go b/adapter/internal/oasparser/envoyconf/routes_with_clusters.go index 17ab9f110..85f673c38 100644 --- a/adapter/internal/oasparser/envoyconf/routes_with_clusters.go +++ b/adapter/internal/oasparser/envoyconf/routes_with_clusters.go @@ -224,29 +224,31 @@ func CreateRoutesWithClusters(adapterInternalAPI *model.AdapterInternalAPI, inte // Creating clusters for request mirroring endpoints for _, op := range resource.GetOperations() { - if op.GetMirrorEndpoints() != nil && len(op.GetMirrorEndpoints().Endpoints) > 0 { - mirrorEndpointCluster := op.GetMirrorEndpoints() - for _, mirrorEndpoint := range mirrorEndpointCluster.Endpoints { - mirrorBasepath := strings.TrimSuffix(mirrorEndpoint.Basepath, "/") - existingMirrorClusterName := getExistingClusterName(*mirrorEndpointCluster, processedEndpoints) - var mirrorClusterName string - if existingMirrorClusterName == "" { - mirrorClusterName = getClusterName(mirrorEndpointCluster.EndpointPrefix, organizationID, vHost, adapterInternalAPI.GetTitle(), apiVersion, resource.GetID()) - mirrorCluster, mirrorAddress, err := processEndpoints(mirrorClusterName, mirrorEndpointCluster, timeout, mirrorBasepath) - if err != nil { - logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR, "Error while adding resource level mirror filter endpoints for %s:%v-%v. %v", apiTitle, apiVersion, resourcePath, err.Error())) + if op.GetMirrorEndpointClusters() != nil && len(op.GetMirrorEndpointClusters()) > 0 { + mirrorEndpointClusters := op.GetMirrorEndpointClusters() + for _, mirrorEndpointCluster := range mirrorEndpointClusters { + for _, mirrorEndpoint := range mirrorEndpointCluster.Endpoints { + mirrorBasepath := strings.TrimSuffix(mirrorEndpoint.Basepath, "/") + existingMirrorClusterName := getExistingClusterName(*mirrorEndpointCluster, processedEndpoints) + var mirrorClusterName string + if existingMirrorClusterName == "" { + mirrorClusterName = getClusterName(mirrorEndpointCluster.EndpointPrefix, organizationID, vHost, adapterInternalAPI.GetTitle(), apiVersion, op.GetID()) + mirrorCluster, mirrorAddress, err := processEndpoints(mirrorClusterName, mirrorEndpointCluster, timeout, mirrorBasepath) + if err != nil { + logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR, "Error while adding resource level mirror filter endpoints for %s:%v-%v. %v", apiTitle, apiVersion, resourcePath, err.Error())) + } else { + clusters = append(clusters, mirrorCluster) + endpoints = append(endpoints, mirrorAddress...) + processedEndpoints[mirrorClusterName] = *mirrorEndpointCluster + } } else { - clusters = append(clusters, mirrorCluster) - endpoints = append(endpoints, mirrorAddress...) - processedEndpoints[mirrorClusterName] = *mirrorEndpointCluster + mirrorClusterName = existingMirrorClusterName } - } else { - mirrorClusterName = existingMirrorClusterName - } - if _, exists := mirrorClusterNames[op.GetID()]; !exists { - mirrorClusterNames[op.GetID()] = []string{} + if _, exists := mirrorClusterNames[op.GetID()]; !exists { + mirrorClusterNames[op.GetID()] = []string{} + } + mirrorClusterNames[op.GetID()] = append(mirrorClusterNames[op.GetID()], mirrorClusterName) } - mirrorClusterNames[op.GetID()] = append(mirrorClusterNames[op.GetID()], mirrorClusterName) } } } diff --git a/adapter/internal/oasparser/model/adapter_internal_api.go b/adapter/internal/oasparser/model/adapter_internal_api.go index d760b256f..0487432fd 100644 --- a/adapter/internal/oasparser/model/adapter_internal_api.go +++ b/adapter/internal/oasparser/model/adapter_internal_api.go @@ -487,8 +487,9 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoHTTPRouteCR(httpRoute *gwap var baseIntervalInMillis uint32 hasURLRewritePolicy := false hasRequestRedirectPolicy := false - var mirrorEndpointsList []Endpoint var securityConfig []EndpointSecurity + var mirrorEndpointClusters []*EndpointCluster + backendBasePath := "" for _, backend := range rule.BackendRefs { backendName := types.NamespacedName{ @@ -702,21 +703,101 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoHTTPRouteCR(httpRoute *gwap }) case gwapiv1.HTTPRouteFilterRequestMirror: + var mirrorTimeoutInMillis uint32 + var mirrorIdleTimeoutInSeconds uint32 + var mirrorCircuitBreaker *dpv1alpha1.CircuitBreaker + var mirrorHealthCheck *dpv1alpha1.HealthCheck + isMirrorRetryConfig := false + isMirrorRouteTimeout := false + var mirrorBackendRetryCount uint32 + var mirrorStatusCodes []uint32 + mirrorStatusCodes = append(mirrorStatusCodes, config.Envoy.Upstream.Retry.StatusCodes...) + var mirrorBaseIntervalInMillis uint32 policyParameters := make(map[string]interface{}) - backend := &filter.RequestMirror.BackendRef - backendName := types.NamespacedName{ - Name: string(backend.Name), - Namespace: utils.GetNamespace(backend.Namespace, httpRoute.Namespace), + mirrorBackend := &filter.RequestMirror.BackendRef + mirrorBackendName := types.NamespacedName{ + Name: string(mirrorBackend.Name), + Namespace: utils.GetNamespace(mirrorBackend.Namespace, httpRoute.Namespace), } - _, ok := resourceParams.BackendMapping[backendName.String()] - if !ok { - return fmt.Errorf("backend: %s has not been resolved", backendName) + resolvedMirrorBackend, ok := resourceParams.BackendMapping[mirrorBackendName.String()] + + if ok { + if resolvedMirrorBackend.CircuitBreaker != nil { + mirrorCircuitBreaker = &dpv1alpha1.CircuitBreaker{ + MaxConnections: resolvedMirrorBackend.CircuitBreaker.MaxConnections, + MaxPendingRequests: resolvedMirrorBackend.CircuitBreaker.MaxPendingRequests, + MaxRequests: resolvedMirrorBackend.CircuitBreaker.MaxRequests, + MaxRetries: resolvedMirrorBackend.CircuitBreaker.MaxRetries, + MaxConnectionPools: resolvedMirrorBackend.CircuitBreaker.MaxConnectionPools, + } + } + + if resolvedMirrorBackend.Timeout != nil { + isMirrorRouteTimeout = true + mirrorTimeoutInMillis = resolvedMirrorBackend.Timeout.UpstreamResponseTimeout * 1000 + mirrorIdleTimeoutInSeconds = resolvedMirrorBackend.Timeout.DownstreamRequestIdleTimeout + } + + if resolvedMirrorBackend.Retry != nil { + isMirrorRetryConfig = true + mirrorBackendRetryCount = resolvedMirrorBackend.Retry.Count + mirrorBaseIntervalInMillis = resolvedMirrorBackend.Retry.BaseIntervalMillis + if len(resolvedMirrorBackend.Retry.StatusCodes) > 0 { + mirrorStatusCodes = resolvedMirrorBackend.Retry.StatusCodes + } + } + + if resolvedMirrorBackend.HealthCheck != nil { + mirrorHealthCheck = &dpv1alpha1.HealthCheck{ + Interval: resolvedMirrorBackend.HealthCheck.Interval, + Timeout: resolvedMirrorBackend.HealthCheck.Timeout, + UnhealthyThreshold: resolvedMirrorBackend.HealthCheck.UnhealthyThreshold, + HealthyThreshold: resolvedMirrorBackend.HealthCheck.HealthyThreshold, + } + } + } else { + return fmt.Errorf("backend: %s has not been resolved", mirrorBackendName) } - mirrorEndpoints := GetEndpoints(backendName, resourceParams.BackendMapping) + + mirrorEndpoints := GetEndpoints(mirrorBackendName, resourceParams.BackendMapping) if len(mirrorEndpoints) > 0 { - policyParameters["endpoints"] = mirrorEndpoints + mirrorEndpointCluster := &EndpointCluster{ + Endpoints: mirrorEndpoints, + } + mirrorEndpointConfig := &EndpointConfig{} + if isMirrorRouteTimeout { + mirrorEndpointConfig.TimeoutInMillis = mirrorTimeoutInMillis + mirrorEndpointConfig.IdleTimeoutInSeconds = mirrorIdleTimeoutInSeconds + } + if mirrorCircuitBreaker != nil { + mirrorEndpointConfig.CircuitBreakers = &CircuitBreakers{ + MaxConnections: int32(mirrorCircuitBreaker.MaxConnections), + MaxRequests: int32(mirrorCircuitBreaker.MaxRequests), + MaxPendingRequests: int32(mirrorCircuitBreaker.MaxPendingRequests), + MaxRetries: int32(mirrorCircuitBreaker.MaxRetries), + MaxConnectionPools: int32(mirrorCircuitBreaker.MaxConnectionPools), + } + } + if isMirrorRetryConfig { + mirrorEndpointConfig.RetryConfig = &RetryConfig{ + Count: int32(mirrorBackendRetryCount), + StatusCodes: mirrorStatusCodes, + BaseIntervalInMillis: int32(mirrorBaseIntervalInMillis), + } + } + if mirrorHealthCheck != nil { + mirrorEndpointCluster.HealthCheck = &HealthCheck{ + Interval: mirrorHealthCheck.Interval, + Timeout: mirrorHealthCheck.Timeout, + UnhealthyThreshold: mirrorHealthCheck.UnhealthyThreshold, + HealthyThreshold: mirrorHealthCheck.HealthyThreshold, + } + } + if isMirrorRouteTimeout || mirrorCircuitBreaker != nil || mirrorHealthCheck != nil || isMirrorRetryConfig { + mirrorEndpointCluster.Config = mirrorEndpointConfig + } + mirrorEndpointClusters = append(mirrorEndpointClusters, mirrorEndpointCluster) } - mirrorEndpointsList = append(mirrorEndpointsList, mirrorEndpoints...) policies.Request = append(policies.Request, Policy{ PolicyName: string(gwapiv1.HTTPRouteFilterRequestMirror), Action: constants.ActionMirrorRequest, @@ -732,10 +813,6 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoHTTPRouteCR(httpRoute *gwap loggers.LoggerOasparser.Debugf("Calculating auths for API ..., API_UUID = %v", adapterInternalAPI.UUID) apiAuth := getSecurity(resourceAuthScheme) - if !hasRequestRedirectPolicy && len(rule.BackendRefs) < 1 { - return fmt.Errorf("no backendref were provided") - } - for _, match := range rule.Matches { if hasURLRewritePolicy && hasRequestRedirectPolicy { return fmt.Errorf("cannot have URL Rewrite and Request Redirect under the same rule") @@ -756,14 +833,9 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoHTTPRouteCR(httpRoute *gwap }) } resourcePath := adapterInternalAPI.xWso2Basepath + *match.Path.Value - var mirrorEndpointCluster *EndpointCluster - if len(mirrorEndpointsList) > 0 { - mirrorEndpointCluster = &EndpointCluster{ - Endpoints: mirrorEndpointsList, - } - } + operations := getAllowedOperations(match.Method, policies, apiAuth, - parseRateLimitPolicyToInternal(resourceRatelimitPolicy), scopes, mirrorEndpointCluster) + parseRateLimitPolicyToInternal(resourceRatelimitPolicy), scopes, mirrorEndpointClusters) resource := &Resource{path: resourcePath, methods: operations, pathMatchType: *match.Path.Type, diff --git a/adapter/internal/oasparser/model/api_operation.go b/adapter/internal/oasparser/model/api_operation.go index 0547223d1..6220dbeb6 100644 --- a/adapter/internal/oasparser/model/api_operation.go +++ b/adapter/internal/oasparser/model/api_operation.go @@ -35,15 +35,15 @@ type Operation struct { iD string method string //security map of security scheme names -> list of scopes - scopes []string - auth *Authentication - tier string - disableSecurity bool - vendorExtensions map[string]interface{} - policies OperationPolicies - mockedAPIConfig *api.MockedApiConfig - rateLimitPolicy *RateLimitPolicy - mirrorEndpoints *EndpointCluster + scopes []string + auth *Authentication + tier string + disableSecurity bool + vendorExtensions map[string]interface{} + policies OperationPolicies + mockedAPIConfig *api.MockedApiConfig + rateLimitPolicy *RateLimitPolicy + mirrorEndpointClusters []*EndpointCluster } // Authentication holds authentication related configurations @@ -126,9 +126,9 @@ func (operation *Operation) GetID() string { return operation.iD } -// GetMirrorEndpoints returns the endpoints if a mirror filter has been applied. -func (operation *Operation) GetMirrorEndpoints() *EndpointCluster { - return operation.mirrorEndpoints +// GetMirrorEndpointClusters returns the endpoints if a mirror filter has been applied. +func (operation *Operation) GetMirrorEndpointClusters() []*EndpointCluster { + return operation.mirrorEndpointClusters } // GetCallInterceptorService returns the interceptor configs for a given operation. diff --git a/adapter/internal/oasparser/model/http_route.go b/adapter/internal/oasparser/model/http_route.go index 09b3fed9b..ec77c2155 100644 --- a/adapter/internal/oasparser/model/http_route.go +++ b/adapter/internal/oasparser/model/http_route.go @@ -287,26 +287,25 @@ func getSecurity(authScheme *dpv1alpha2.Authentication) *Authentication { // getAllowedOperations retuns a list of allowed operatons, if httpMethod is not specified then all methods are allowed. func getAllowedOperations(httpMethod *gwapiv1.HTTPMethod, policies OperationPolicies, auth *Authentication, - ratelimitPolicy *RateLimitPolicy, scopes []string, mirrorEndpoints *EndpointCluster) []*Operation { + ratelimitPolicy *RateLimitPolicy, scopes []string, mirrorEndpointClusters []*EndpointCluster) []*Operation { if httpMethod != nil { return []*Operation{{iD: uuid.New().String(), method: string(*httpMethod), policies: policies, - auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints}} - + auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters}} } return []*Operation{{iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodGet), policies: policies, auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes}, {iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodPost), policies: policies, - auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints}, + auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters}, {iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodDelete), policies: policies, - auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints}, + auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters}, {iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodPatch), policies: policies, - auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints}, + auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters}, {iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodPut), policies: policies, - auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints}, + auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters}, {iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodHead), policies: policies, - auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints}, + auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters}, {iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodOptions), policies: policies, - auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints}} + auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters}} } // SetInfoAPICR populates ID, ApiType, Version and XWso2BasePath of adapterInternalAPI. diff --git a/adapter/internal/operator/controllers/dp/api_controller.go b/adapter/internal/operator/controllers/dp/api_controller.go index d256c657c..292fcc637 100644 --- a/adapter/internal/operator/controllers/dp/api_controller.go +++ b/adapter/internal/operator/controllers/dp/api_controller.go @@ -851,21 +851,31 @@ func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Conte for _, filter := range rule.Filters { if filter.RequestMirror != nil { mirrorBackend := filter.RequestMirror.BackendRef - mirrorBackendNamespacedName := types.NamespacedName{ Name: string(mirrorBackend.Name), Namespace: utils.GetNamespace(mirrorBackend.Namespace, httpRoute.Namespace), } - if _, exists := backendMapping[mirrorBackendNamespacedName.String()]; !exists { - resolvedMirrorBackend := utils.GetResolvedBackend(ctx, apiReconciler.client, mirrorBackendNamespacedName, &api) - if resolvedMirrorBackend != nil { - backendMapping[mirrorBackendNamespacedName.String()] = resolvedMirrorBackend - } else { - return nil, fmt.Errorf("unable to find backend %s", mirrorBackendNamespacedName.String()) + if string(*mirrorBackend.Kind) == constants.KindBackend { + if _, exists := backendMapping[mirrorBackendNamespacedName.String()]; !exists { + resolvedMirrorBackend := utils.GetResolvedBackend(ctx, apiReconciler.client, mirrorBackendNamespacedName, &api) + if resolvedMirrorBackend != nil { + backendMapping[mirrorBackendNamespacedName.String()] = resolvedMirrorBackend + } else { + return nil, fmt.Errorf("unable to find backend %s", mirrorBackendNamespacedName.String()) + } + } + } else if string(*mirrorBackend.Kind) == constants.KindService { + var err error + service, err := utils.GetService(ctx, apiReconciler.client, utils.GetNamespace(mirrorBackend.Namespace, httpRoute.Namespace), string(mirrorBackend.Name)) + if err != nil { + return nil, fmt.Errorf("unable to find service %s", mirrorBackendNamespacedName.String()) + } + backendMapping[mirrorBackendNamespacedName.String()], err = utils.GetResolvedBackendFromService(service, int(*mirrorBackend.Port)) + if err != nil { + return nil, fmt.Errorf("error in getting service information %s", service) } } } - } } @@ -1839,9 +1849,7 @@ func addIndexes(ctx context.Context, mgr manager.Manager) error { authentication := rawObj.(*dpv1alpha2.Authentication) var apis []string if authentication.Spec.TargetRef.Kind == constants.KindAPI { - namespace, err := utils.ValidateAndRetrieveNamespace((*gwapiv1.Namespace)(authentication.Spec.TargetRef.Namespace), authentication.Namespace) - if err != nil { loggers.LoggerAPKOperator.Errorf("Namespace mismatch. TargetRef %s needs to be in the same namespace as the Athentication %s. Expected: %s, Actual: %s", string(authentication.Spec.TargetRef.Name), authentication.Name, authentication.Namespace, string(*authentication.Spec.TargetRef.Namespace)) @@ -1956,7 +1964,7 @@ func addIndexes(ctx context.Context, mgr manager.Manager) error { return err } - // ratelimite policy to API indexer + // ratelimit policy to API indexer if err := mgr.GetFieldIndexer().IndexField(ctx, &dpv1alpha1.RateLimitPolicy{}, apiRateLimitIndex, func(rawObj k8client.Object) []string { ratelimitPolicy := rawObj.(*dpv1alpha1.RateLimitPolicy) diff --git a/adapter/internal/operator/utils/utils.go b/adapter/internal/operator/utils/utils.go index 14a046aac..d07defc9c 100644 --- a/adapter/internal/operator/utils/utils.go +++ b/adapter/internal/operator/utils/utils.go @@ -33,6 +33,7 @@ import ( "github.com/wso2/apk/adapter/pkg/logging" "github.com/wso2/apk/adapter/pkg/utils/envutils" "github.com/wso2/apk/adapter/pkg/utils/stringutils" + "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha1" dpv1alpha1 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha1" dpv1alpha2 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha2" corev1 "k8s.io/api/core/v1" @@ -293,6 +294,58 @@ func getSecretValue(ctx context.Context, client k8client.Client, return string(secret.Data[key]), nil } +// GetService retrieves the Service object and returns its details. +func GetService(ctx context.Context, client k8client.Client, namespace, serviceName string) (*corev1.Service, error) { + service := &corev1.Service{} + err := client.Get(ctx, types.NamespacedName{ + Name: serviceName, + Namespace: namespace, + }, service) + if err != nil { + return nil, err + } + return service, nil +} + +// GetResolvedBackendFromService converts a Kubernetes Service to a Resolved Backend. +func GetResolvedBackendFromService(k8sService *corev1.Service, svcPort int) (*v1alpha1.ResolvedBackend, error) { + + var host string + var port uint32 + + if len(k8sService.Spec.Ports) == 0 { + port = uint32(svcPort) + } else { + servicePort := k8sService.Spec.Ports[0] + port = uint32(servicePort.Port) + } + + switch k8sService.Spec.Type { + case corev1.ServiceTypeClusterIP, corev1.ServiceTypeNodePort: + // Use the internal DNS name for clusterip and nodeport + host = fmt.Sprintf("%s.%s.svc.cluster.local", k8sService.Name, k8sService.Namespace) + case corev1.ServiceTypeLoadBalancer: + // Use the external IP or hostname for LB services + if len(k8sService.Status.LoadBalancer.Ingress) > 0 { + ingress := k8sService.Status.LoadBalancer.Ingress[0] + if ingress.IP != "" { + host = ingress.IP + } else if ingress.Hostname != "" { + host = ingress.Hostname + } else { + return nil, fmt.Errorf("no valid ingress found for LoadBalancer service %s", k8sService.Name) + } + } else { + return nil, fmt.Errorf("no load balancer ingress found for service %s", k8sService.Name) + } + default: + return nil, fmt.Errorf("unsupported service type %s", k8sService.Spec.Type) + } + + backend := &v1alpha1.ResolvedBackend{Services: []v1alpha1.Service{{Host: host, Port: port}}, Protocol: v1alpha1.HTTPProtocol} + return backend, nil +} + // ResolveAndAddBackendToMapping resolves backend from reference and adds it to the backendMapping. func ResolveAndAddBackendToMapping(ctx context.Context, client k8client.Client, backendMapping map[string]*dpv1alpha1.ResolvedBackend,