Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add request redirect and mirror filters functionality #2392

Merged
merged 4 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions adapter/internal/oasparser/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ const (
KindAPIPolicy = "APIPolicy"
KindScope = "Scope"
KindRateLimitPolicy = "RateLimitPolicy"
KindService = "Service"
KindBackend = "Backend"
)

// API environment types
Expand Down
14 changes: 4 additions & 10 deletions adapter/internal/oasparser/envoyconf/routes_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func generateRouteAction(apiType string, routeConfig *model.EndpointConfig, rate
return action
}

func generateRequestRedirectRoute(route string, policyParams interface{}) (action *routev3.Route_Redirect) {
func generateRequestRedirectRoute(route string, policyParams interface{}) (*routev3.Route_Redirect, error) {
policyParameters, _ := policyParams.(map[string]interface{})
scheme, _ := policyParameters[constants.RedirectScheme].(string)
hostname, _ := policyParameters[constants.RedirectHostname].(string)
Expand All @@ -137,10 +137,10 @@ func generateRequestRedirectRoute(route string, policyParams interface{}) (actio
replaceFullPath, _ := policyParameters[constants.RedirectPath].(string)
redirectActionStatusCode := mapStatusCodeToEnum(statusCode)
if redirectActionStatusCode == -1 {
_ = fmt.Errorf("Invalid status code provided")
return nil, fmt.Errorf("Invalid status code provided")
}

action = &routev3.Route_Redirect{
action := &routev3.Route_Redirect{
Redirect: &routev3.RedirectAction{
SchemeRewriteSpecifier: &routev3.RedirectAction_HttpsRedirect{
HttpsRedirect: scheme == "https",
Expand All @@ -153,7 +153,7 @@ func generateRequestRedirectRoute(route string, policyParams interface{}) (actio
ResponseCode: routev3.RedirectAction_RedirectResponseCode(redirectActionStatusCode),
},
}
return action
return action, nil
}

func mapStatusCodeToEnum(statusCode int) int {
Expand All @@ -162,12 +162,6 @@ func mapStatusCodeToEnum(statusCode int) int {
return 0
case 302:
return 1
case 303:
return 2
case 307:
return 3
case 308:
return 4
default:
return -1
}
Expand Down
47 changes: 26 additions & 21 deletions adapter/internal/oasparser/envoyconf/routes_with_clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,29 +224,31 @@ func CreateRoutesWithClusters(adapterInternalAPI *model.AdapterInternalAPI, inte

// Creating clusters for request mirroring endpoints
for _, op := range resource.GetOperations() {
if op.GetMirrorEndpoints() != nil && len(op.GetMirrorEndpoints().Endpoints) > 0 {
mirrorEndpointCluster := op.GetMirrorEndpoints()
for _, mirrorEndpoint := range mirrorEndpointCluster.Endpoints {
mirrorBasepath := strings.TrimSuffix(mirrorEndpoint.Basepath, "/")
existingMirrorClusterName := getExistingClusterName(*mirrorEndpointCluster, processedEndpoints)
var mirrorClusterName string
if existingMirrorClusterName == "" {
mirrorClusterName = getClusterName(mirrorEndpointCluster.EndpointPrefix, organizationID, vHost, adapterInternalAPI.GetTitle(), apiVersion, resource.GetID())
mirrorCluster, mirrorAddress, err := processEndpoints(mirrorClusterName, mirrorEndpointCluster, timeout, mirrorBasepath)
if err != nil {
logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR, "Error while adding resource level mirror filter endpoints for %s:%v-%v. %v", apiTitle, apiVersion, resourcePath, err.Error()))
if op.GetMirrorEndpointClusters() != nil && len(op.GetMirrorEndpointClusters()) > 0 {
mirrorEndpointClusters := op.GetMirrorEndpointClusters()
for _, mirrorEndpointCluster := range mirrorEndpointClusters {
for _, mirrorEndpoint := range mirrorEndpointCluster.Endpoints {
mirrorBasepath := strings.TrimSuffix(mirrorEndpoint.Basepath, "/")
existingMirrorClusterName := getExistingClusterName(*mirrorEndpointCluster, processedEndpoints)
var mirrorClusterName string
if existingMirrorClusterName == "" {
mirrorClusterName = getClusterName(mirrorEndpointCluster.EndpointPrefix, organizationID, vHost, adapterInternalAPI.GetTitle(), apiVersion, op.GetID())
mirrorCluster, mirrorAddress, err := processEndpoints(mirrorClusterName, mirrorEndpointCluster, timeout, mirrorBasepath)
if err != nil {
logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR, "Error while adding resource level mirror filter endpoints for %s:%v-%v. %v", apiTitle, apiVersion, resourcePath, err.Error()))
} else {
clusters = append(clusters, mirrorCluster)
endpoints = append(endpoints, mirrorAddress...)
processedEndpoints[mirrorClusterName] = *mirrorEndpointCluster
}
} else {
clusters = append(clusters, mirrorCluster)
endpoints = append(endpoints, mirrorAddress...)
processedEndpoints[mirrorClusterName] = *mirrorEndpointCluster
mirrorClusterName = existingMirrorClusterName
}
} else {
mirrorClusterName = existingMirrorClusterName
}
if _, exists := mirrorClusterNames[op.GetID()]; !exists {
mirrorClusterNames[op.GetID()] = []string{}
if _, exists := mirrorClusterNames[op.GetID()]; !exists {
mirrorClusterNames[op.GetID()] = []string{}
}
mirrorClusterNames[op.GetID()] = append(mirrorClusterNames[op.GetID()], mirrorClusterName)
}
mirrorClusterNames[op.GetID()] = append(mirrorClusterNames[op.GetID()], mirrorClusterName)
}
}
}
Expand Down Expand Up @@ -998,7 +1000,10 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
case constants.ActionRedirectRequest:
logger.LoggerOasparser.Debugf("Adding %s policy to request flow for %s %s",
constants.ActionRedirectRequest, resourcePath, operation.GetMethod())
requestRedirectAction = generateRequestRedirectRoute(resourcePath, requestPolicy.Parameters)
requestRedirectAction, err = generateRequestRedirectRoute(resourcePath, requestPolicy.Parameters)
if err != nil {
return nil, err
}
}
}

Expand Down
116 changes: 94 additions & 22 deletions adapter/internal/oasparser/model/adapter_internal_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,9 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoHTTPRouteCR(httpRoute *gwap
var baseIntervalInMillis uint32
hasURLRewritePolicy := false
hasRequestRedirectPolicy := false
var mirrorEndpointsList []Endpoint
var securityConfig []EndpointSecurity
var mirrorEndpointClusters []*EndpointCluster

backendBasePath := ""
for _, backend := range rule.BackendRefs {
backendName := types.NamespacedName{
Expand Down Expand Up @@ -702,21 +703,101 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoHTTPRouteCR(httpRoute *gwap
})

case gwapiv1.HTTPRouteFilterRequestMirror:
var mirrorTimeoutInMillis uint32
var mirrorIdleTimeoutInSeconds uint32
var mirrorCircuitBreaker *dpv1alpha1.CircuitBreaker
var mirrorHealthCheck *dpv1alpha1.HealthCheck
isMirrorRetryConfig := false
isMirrorRouteTimeout := false
var mirrorBackendRetryCount uint32
var mirrorStatusCodes []uint32
mirrorStatusCodes = append(mirrorStatusCodes, config.Envoy.Upstream.Retry.StatusCodes...)
var mirrorBaseIntervalInMillis uint32
policyParameters := make(map[string]interface{})
backend := &filter.RequestMirror.BackendRef
backendName := types.NamespacedName{
Name: string(backend.Name),
Namespace: utils.GetNamespace(backend.Namespace, httpRoute.Namespace),
mirrorBackend := &filter.RequestMirror.BackendRef
mirrorBackendName := types.NamespacedName{
Name: string(mirrorBackend.Name),
Namespace: utils.GetNamespace(mirrorBackend.Namespace, httpRoute.Namespace),
}
_, ok := resourceParams.BackendMapping[backendName.String()]
if !ok {
return fmt.Errorf("backend: %s has not been resolved", backendName)
resolvedMirrorBackend, ok := resourceParams.BackendMapping[mirrorBackendName.String()]

if ok {
if resolvedMirrorBackend.CircuitBreaker != nil {
mirrorCircuitBreaker = &dpv1alpha1.CircuitBreaker{
MaxConnections: resolvedMirrorBackend.CircuitBreaker.MaxConnections,
MaxPendingRequests: resolvedMirrorBackend.CircuitBreaker.MaxPendingRequests,
MaxRequests: resolvedMirrorBackend.CircuitBreaker.MaxRequests,
MaxRetries: resolvedMirrorBackend.CircuitBreaker.MaxRetries,
MaxConnectionPools: resolvedMirrorBackend.CircuitBreaker.MaxConnectionPools,
}
}

if resolvedMirrorBackend.Timeout != nil {
isMirrorRouteTimeout = true
mirrorTimeoutInMillis = resolvedMirrorBackend.Timeout.UpstreamResponseTimeout * 1000
mirrorIdleTimeoutInSeconds = resolvedMirrorBackend.Timeout.DownstreamRequestIdleTimeout
}

if resolvedMirrorBackend.Retry != nil {
isMirrorRetryConfig = true
mirrorBackendRetryCount = resolvedMirrorBackend.Retry.Count
mirrorBaseIntervalInMillis = resolvedMirrorBackend.Retry.BaseIntervalMillis
if len(resolvedMirrorBackend.Retry.StatusCodes) > 0 {
mirrorStatusCodes = resolvedMirrorBackend.Retry.StatusCodes
}
}

if resolvedMirrorBackend.HealthCheck != nil {
mirrorHealthCheck = &dpv1alpha1.HealthCheck{
Interval: resolvedMirrorBackend.HealthCheck.Interval,
Timeout: resolvedMirrorBackend.HealthCheck.Timeout,
UnhealthyThreshold: resolvedMirrorBackend.HealthCheck.UnhealthyThreshold,
HealthyThreshold: resolvedMirrorBackend.HealthCheck.HealthyThreshold,
}
}
} else {
return fmt.Errorf("backend: %s has not been resolved", mirrorBackendName)
}
mirrorEndpoints := GetEndpoints(backendName, resourceParams.BackendMapping)

mirrorEndpoints := GetEndpoints(mirrorBackendName, resourceParams.BackendMapping)
if len(mirrorEndpoints) > 0 {
policyParameters["endpoints"] = mirrorEndpoints
mirrorEndpointCluster := &EndpointCluster{
Endpoints: mirrorEndpoints,
}
mirrorEndpointConfig := &EndpointConfig{}
if isMirrorRouteTimeout {
mirrorEndpointConfig.TimeoutInMillis = mirrorTimeoutInMillis
mirrorEndpointConfig.IdleTimeoutInSeconds = mirrorIdleTimeoutInSeconds
}
if mirrorCircuitBreaker != nil {
mirrorEndpointConfig.CircuitBreakers = &CircuitBreakers{
MaxConnections: int32(mirrorCircuitBreaker.MaxConnections),
MaxRequests: int32(mirrorCircuitBreaker.MaxRequests),
MaxPendingRequests: int32(mirrorCircuitBreaker.MaxPendingRequests),
MaxRetries: int32(mirrorCircuitBreaker.MaxRetries),
MaxConnectionPools: int32(mirrorCircuitBreaker.MaxConnectionPools),
}
}
if isMirrorRetryConfig {
mirrorEndpointConfig.RetryConfig = &RetryConfig{
Count: int32(mirrorBackendRetryCount),
StatusCodes: mirrorStatusCodes,
BaseIntervalInMillis: int32(mirrorBaseIntervalInMillis),
}
}
if mirrorHealthCheck != nil {
mirrorEndpointCluster.HealthCheck = &HealthCheck{
Interval: mirrorHealthCheck.Interval,
Timeout: mirrorHealthCheck.Timeout,
UnhealthyThreshold: mirrorHealthCheck.UnhealthyThreshold,
HealthyThreshold: mirrorHealthCheck.HealthyThreshold,
}
}
if isMirrorRouteTimeout || mirrorCircuitBreaker != nil || mirrorHealthCheck != nil || isMirrorRetryConfig {
mirrorEndpointCluster.Config = mirrorEndpointConfig
}
mirrorEndpointClusters = append(mirrorEndpointClusters, mirrorEndpointCluster)
}
mirrorEndpointsList = append(mirrorEndpointsList, mirrorEndpoints...)
policies.Request = append(policies.Request, Policy{
PolicyName: string(gwapiv1.HTTPRouteFilterRequestMirror),
Action: constants.ActionMirrorRequest,
Expand All @@ -732,10 +813,6 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoHTTPRouteCR(httpRoute *gwap
loggers.LoggerOasparser.Debugf("Calculating auths for API ..., API_UUID = %v", adapterInternalAPI.UUID)
apiAuth := getSecurity(resourceAuthScheme)

if !hasRequestRedirectPolicy && len(rule.BackendRefs) < 1 {
return fmt.Errorf("no backendref were provided")
}

for _, match := range rule.Matches {
if hasURLRewritePolicy && hasRequestRedirectPolicy {
return fmt.Errorf("cannot have URL Rewrite and Request Redirect under the same rule")
Expand All @@ -756,14 +833,9 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoHTTPRouteCR(httpRoute *gwap
})
}
resourcePath := adapterInternalAPI.xWso2Basepath + *match.Path.Value
var mirrorEndpointCluster *EndpointCluster
if len(mirrorEndpointsList) > 0 {
mirrorEndpointCluster = &EndpointCluster{
Endpoints: mirrorEndpointsList,
}
}

operations := getAllowedOperations(match.Method, policies, apiAuth,
parseRateLimitPolicyToInternal(resourceRatelimitPolicy), scopes, mirrorEndpointCluster)
parseRateLimitPolicyToInternal(resourceRatelimitPolicy), scopes, mirrorEndpointClusters)
resource := &Resource{path: resourcePath,
methods: operations,
pathMatchType: *match.Path.Type,
Expand Down
24 changes: 12 additions & 12 deletions adapter/internal/oasparser/model/api_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ type Operation struct {
iD string
method string
//security map of security scheme names -> list of scopes
scopes []string
auth *Authentication
tier string
disableSecurity bool
vendorExtensions map[string]interface{}
policies OperationPolicies
mockedAPIConfig *api.MockedApiConfig
rateLimitPolicy *RateLimitPolicy
mirrorEndpoints *EndpointCluster
scopes []string
auth *Authentication
tier string
disableSecurity bool
vendorExtensions map[string]interface{}
policies OperationPolicies
mockedAPIConfig *api.MockedApiConfig
rateLimitPolicy *RateLimitPolicy
mirrorEndpointClusters []*EndpointCluster
}

// Authentication holds authentication related configurations
Expand Down Expand Up @@ -126,9 +126,9 @@ func (operation *Operation) GetID() string {
return operation.iD
}

// GetMirrorEndpoints returns the endpoints if a mirror filter has been applied.
func (operation *Operation) GetMirrorEndpoints() *EndpointCluster {
return operation.mirrorEndpoints
// GetMirrorEndpointClusters returns the endpoints if a mirror filter has been applied.
func (operation *Operation) GetMirrorEndpointClusters() []*EndpointCluster {
return operation.mirrorEndpointClusters
}

// GetCallInterceptorService returns the interceptor configs for a given operation.
Expand Down
17 changes: 8 additions & 9 deletions adapter/internal/oasparser/model/http_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,26 +287,25 @@ func getSecurity(authScheme *dpv1alpha2.Authentication) *Authentication {

// getAllowedOperations retuns a list of allowed operatons, if httpMethod is not specified then all methods are allowed.
func getAllowedOperations(httpMethod *gwapiv1.HTTPMethod, policies OperationPolicies, auth *Authentication,
ratelimitPolicy *RateLimitPolicy, scopes []string, mirrorEndpoints *EndpointCluster) []*Operation {
ratelimitPolicy *RateLimitPolicy, scopes []string, mirrorEndpointClusters []*EndpointCluster) []*Operation {
if httpMethod != nil {
return []*Operation{{iD: uuid.New().String(), method: string(*httpMethod), policies: policies,
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints}}

auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters}}
}
return []*Operation{{iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodGet), policies: policies,
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes},
{iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodPost), policies: policies,
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints},
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters},
{iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodDelete), policies: policies,
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints},
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters},
{iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodPatch), policies: policies,
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints},
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters},
{iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodPut), policies: policies,
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints},
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters},
{iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodHead), policies: policies,
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints},
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters},
{iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodOptions), policies: policies,
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints}}
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters}}
}

// SetInfoAPICR populates ID, ApiType, Version and XWso2BasePath of adapterInternalAPI.
Expand Down
30 changes: 19 additions & 11 deletions adapter/internal/operator/controllers/dp/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,21 +851,31 @@ func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Conte
for _, filter := range rule.Filters {
if filter.RequestMirror != nil {
mirrorBackend := filter.RequestMirror.BackendRef

mirrorBackendNamespacedName := types.NamespacedName{
Name: string(mirrorBackend.Name),
Namespace: utils.GetNamespace(mirrorBackend.Namespace, httpRoute.Namespace),
}
if _, exists := backendMapping[mirrorBackendNamespacedName.String()]; !exists {
resolvedMirrorBackend := utils.GetResolvedBackend(ctx, apiReconciler.client, mirrorBackendNamespacedName, &api)
if resolvedMirrorBackend != nil {
backendMapping[mirrorBackendNamespacedName.String()] = resolvedMirrorBackend
} else {
return nil, fmt.Errorf("unable to find backend %s", mirrorBackendNamespacedName.String())
if string(*mirrorBackend.Kind) == constants.KindBackend {
if _, exists := backendMapping[mirrorBackendNamespacedName.String()]; !exists {
resolvedMirrorBackend := utils.GetResolvedBackend(ctx, apiReconciler.client, mirrorBackendNamespacedName, &api)
if resolvedMirrorBackend != nil {
backendMapping[mirrorBackendNamespacedName.String()] = resolvedMirrorBackend
} else {
return nil, fmt.Errorf("unable to find backend %s", mirrorBackendNamespacedName.String())
}
}
} else if string(*mirrorBackend.Kind) == constants.KindService {
var err error
service, err := utils.GetService(ctx, apiReconciler.client, utils.GetNamespace(mirrorBackend.Namespace, httpRoute.Namespace), string(mirrorBackend.Name))
if err != nil {
return nil, fmt.Errorf("unable to find service %s", mirrorBackendNamespacedName.String())
}
backendMapping[mirrorBackendNamespacedName.String()], err = utils.GetResolvedBackendFromService(service, int(*mirrorBackend.Port))
if err != nil {
return nil, fmt.Errorf("error in getting service information %s", service)
}
}
}

}
}

Expand Down Expand Up @@ -1839,9 +1849,7 @@ func addIndexes(ctx context.Context, mgr manager.Manager) error {
authentication := rawObj.(*dpv1alpha2.Authentication)
var apis []string
if authentication.Spec.TargetRef.Kind == constants.KindAPI {

namespace, err := utils.ValidateAndRetrieveNamespace((*gwapiv1.Namespace)(authentication.Spec.TargetRef.Namespace), authentication.Namespace)

if err != nil {
loggers.LoggerAPKOperator.Errorf("Namespace mismatch. TargetRef %s needs to be in the same namespace as the Athentication %s. Expected: %s, Actual: %s",
string(authentication.Spec.TargetRef.Name), authentication.Name, authentication.Namespace, string(*authentication.Spec.TargetRef.Namespace))
Expand Down Expand Up @@ -1956,7 +1964,7 @@ func addIndexes(ctx context.Context, mgr manager.Manager) error {
return err
}

// ratelimite policy to API indexer
// ratelimit policy to API indexer
if err := mgr.GetFieldIndexer().IndexField(ctx, &dpv1alpha1.RateLimitPolicy{}, apiRateLimitIndex,
func(rawObj k8client.Object) []string {
ratelimitPolicy := rawObj.(*dpv1alpha1.RateLimitPolicy)
Expand Down
Loading
Loading