Skip to content

Commit

Permalink
Add service support for mirror backend refs
Browse files Browse the repository at this point in the history
  • Loading branch information
sgayangi committed Jul 11, 2024
1 parent e4280a4 commit 7edbae0
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 74 deletions.
2 changes: 2 additions & 0 deletions adapter/internal/oasparser/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ const (
KindAPIPolicy = "APIPolicy"
KindScope = "Scope"
KindRateLimitPolicy = "RateLimitPolicy"
KindService = "Service"
KindBackend = "Backend"
)

// API environment types
Expand Down
42 changes: 22 additions & 20 deletions adapter/internal/oasparser/envoyconf/routes_with_clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,29 +224,31 @@ func CreateRoutesWithClusters(adapterInternalAPI *model.AdapterInternalAPI, inte

// Creating clusters for request mirroring endpoints
for _, op := range resource.GetOperations() {
if op.GetMirrorEndpoints() != nil && len(op.GetMirrorEndpoints().Endpoints) > 0 {
mirrorEndpointCluster := op.GetMirrorEndpoints()
for _, mirrorEndpoint := range mirrorEndpointCluster.Endpoints {
mirrorBasepath := strings.TrimSuffix(mirrorEndpoint.Basepath, "/")
existingMirrorClusterName := getExistingClusterName(*mirrorEndpointCluster, processedEndpoints)
var mirrorClusterName string
if existingMirrorClusterName == "" {
mirrorClusterName = getClusterName(mirrorEndpointCluster.EndpointPrefix, organizationID, vHost, adapterInternalAPI.GetTitle(), apiVersion, resource.GetID())
mirrorCluster, mirrorAddress, err := processEndpoints(mirrorClusterName, mirrorEndpointCluster, timeout, mirrorBasepath)
if err != nil {
logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR, "Error while adding resource level mirror filter endpoints for %s:%v-%v. %v", apiTitle, apiVersion, resourcePath, err.Error()))
if op.GetMirrorEndpointClusters() != nil && len(op.GetMirrorEndpointClusters()) > 0 {
mirrorEndpointClusters := op.GetMirrorEndpointClusters()
for _, mirrorEndpointCluster := range mirrorEndpointClusters {
for _, mirrorEndpoint := range mirrorEndpointCluster.Endpoints {
mirrorBasepath := strings.TrimSuffix(mirrorEndpoint.Basepath, "/")
existingMirrorClusterName := getExistingClusterName(*mirrorEndpointCluster, processedEndpoints)
var mirrorClusterName string
if existingMirrorClusterName == "" {
mirrorClusterName = getClusterName(mirrorEndpointCluster.EndpointPrefix, organizationID, vHost, adapterInternalAPI.GetTitle(), apiVersion, op.GetID())
mirrorCluster, mirrorAddress, err := processEndpoints(mirrorClusterName, mirrorEndpointCluster, timeout, mirrorBasepath)
if err != nil {
logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR, "Error while adding resource level mirror filter endpoints for %s:%v-%v. %v", apiTitle, apiVersion, resourcePath, err.Error()))
} else {
clusters = append(clusters, mirrorCluster)
endpoints = append(endpoints, mirrorAddress...)
processedEndpoints[mirrorClusterName] = *mirrorEndpointCluster
}
} else {
clusters = append(clusters, mirrorCluster)
endpoints = append(endpoints, mirrorAddress...)
processedEndpoints[mirrorClusterName] = *mirrorEndpointCluster
mirrorClusterName = existingMirrorClusterName
}
} else {
mirrorClusterName = existingMirrorClusterName
}
if _, exists := mirrorClusterNames[op.GetID()]; !exists {
mirrorClusterNames[op.GetID()] = []string{}
if _, exists := mirrorClusterNames[op.GetID()]; !exists {
mirrorClusterNames[op.GetID()] = []string{}
}
mirrorClusterNames[op.GetID()] = append(mirrorClusterNames[op.GetID()], mirrorClusterName)
}
mirrorClusterNames[op.GetID()] = append(mirrorClusterNames[op.GetID()], mirrorClusterName)
}
}
}
Expand Down
116 changes: 94 additions & 22 deletions adapter/internal/oasparser/model/adapter_internal_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,9 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoHTTPRouteCR(httpRoute *gwap
var baseIntervalInMillis uint32
hasURLRewritePolicy := false
hasRequestRedirectPolicy := false
var mirrorEndpointsList []Endpoint
var securityConfig []EndpointSecurity
var mirrorEndpointClusters []*EndpointCluster

backendBasePath := ""
for _, backend := range rule.BackendRefs {
backendName := types.NamespacedName{
Expand Down Expand Up @@ -702,21 +703,101 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoHTTPRouteCR(httpRoute *gwap
})

case gwapiv1.HTTPRouteFilterRequestMirror:
var mirrorTimeoutInMillis uint32
var mirrorIdleTimeoutInSeconds uint32
var mirrorCircuitBreaker *dpv1alpha1.CircuitBreaker
var mirrorHealthCheck *dpv1alpha1.HealthCheck
isMirrorRetryConfig := false
isMirrorRouteTimeout := false
var mirrorBackendRetryCount uint32
var mirrorStatusCodes []uint32
mirrorStatusCodes = append(mirrorStatusCodes, config.Envoy.Upstream.Retry.StatusCodes...)
var mirrorBaseIntervalInMillis uint32
policyParameters := make(map[string]interface{})
backend := &filter.RequestMirror.BackendRef
backendName := types.NamespacedName{
Name: string(backend.Name),
Namespace: utils.GetNamespace(backend.Namespace, httpRoute.Namespace),
mirrorBackend := &filter.RequestMirror.BackendRef
mirrorBackendName := types.NamespacedName{
Name: string(mirrorBackend.Name),
Namespace: utils.GetNamespace(mirrorBackend.Namespace, httpRoute.Namespace),
}
_, ok := resourceParams.BackendMapping[backendName.String()]
if !ok {
return fmt.Errorf("backend: %s has not been resolved", backendName)
resolvedMirrorBackend, ok := resourceParams.BackendMapping[mirrorBackendName.String()]

if ok {
if resolvedMirrorBackend.CircuitBreaker != nil {
mirrorCircuitBreaker = &dpv1alpha1.CircuitBreaker{
MaxConnections: resolvedMirrorBackend.CircuitBreaker.MaxConnections,
MaxPendingRequests: resolvedMirrorBackend.CircuitBreaker.MaxPendingRequests,
MaxRequests: resolvedMirrorBackend.CircuitBreaker.MaxRequests,
MaxRetries: resolvedMirrorBackend.CircuitBreaker.MaxRetries,
MaxConnectionPools: resolvedMirrorBackend.CircuitBreaker.MaxConnectionPools,
}
}

if resolvedMirrorBackend.Timeout != nil {
isMirrorRouteTimeout = true
mirrorTimeoutInMillis = resolvedMirrorBackend.Timeout.UpstreamResponseTimeout * 1000
mirrorIdleTimeoutInSeconds = resolvedMirrorBackend.Timeout.DownstreamRequestIdleTimeout
}

if resolvedMirrorBackend.Retry != nil {
isMirrorRetryConfig = true
mirrorBackendRetryCount = resolvedMirrorBackend.Retry.Count
mirrorBaseIntervalInMillis = resolvedMirrorBackend.Retry.BaseIntervalMillis
if len(resolvedMirrorBackend.Retry.StatusCodes) > 0 {
mirrorStatusCodes = resolvedMirrorBackend.Retry.StatusCodes
}
}

if resolvedMirrorBackend.HealthCheck != nil {
mirrorHealthCheck = &dpv1alpha1.HealthCheck{
Interval: resolvedMirrorBackend.HealthCheck.Interval,
Timeout: resolvedMirrorBackend.HealthCheck.Timeout,
UnhealthyThreshold: resolvedMirrorBackend.HealthCheck.UnhealthyThreshold,
HealthyThreshold: resolvedMirrorBackend.HealthCheck.HealthyThreshold,
}
}
} else {
return fmt.Errorf("backend: %s has not been resolved", mirrorBackendName)
}
mirrorEndpoints := GetEndpoints(backendName, resourceParams.BackendMapping)

mirrorEndpoints := GetEndpoints(mirrorBackendName, resourceParams.BackendMapping)
if len(mirrorEndpoints) > 0 {
policyParameters["endpoints"] = mirrorEndpoints
mirrorEndpointCluster := &EndpointCluster{
Endpoints: mirrorEndpoints,
}
mirrorEndpointConfig := &EndpointConfig{}
if isMirrorRouteTimeout {
mirrorEndpointConfig.TimeoutInMillis = mirrorTimeoutInMillis
mirrorEndpointConfig.IdleTimeoutInSeconds = mirrorIdleTimeoutInSeconds
}
if mirrorCircuitBreaker != nil {
mirrorEndpointConfig.CircuitBreakers = &CircuitBreakers{
MaxConnections: int32(mirrorCircuitBreaker.MaxConnections),
MaxRequests: int32(mirrorCircuitBreaker.MaxRequests),
MaxPendingRequests: int32(mirrorCircuitBreaker.MaxPendingRequests),
MaxRetries: int32(mirrorCircuitBreaker.MaxRetries),
MaxConnectionPools: int32(mirrorCircuitBreaker.MaxConnectionPools),
}
}
if isMirrorRetryConfig {
mirrorEndpointConfig.RetryConfig = &RetryConfig{
Count: int32(mirrorBackendRetryCount),
StatusCodes: mirrorStatusCodes,
BaseIntervalInMillis: int32(mirrorBaseIntervalInMillis),
}
}
if mirrorHealthCheck != nil {
mirrorEndpointCluster.HealthCheck = &HealthCheck{
Interval: mirrorHealthCheck.Interval,
Timeout: mirrorHealthCheck.Timeout,
UnhealthyThreshold: mirrorHealthCheck.UnhealthyThreshold,
HealthyThreshold: mirrorHealthCheck.HealthyThreshold,
}
}
if isMirrorRouteTimeout || mirrorCircuitBreaker != nil || mirrorHealthCheck != nil || isMirrorRetryConfig {
mirrorEndpointCluster.Config = mirrorEndpointConfig
}
mirrorEndpointClusters = append(mirrorEndpointClusters, mirrorEndpointCluster)
}
mirrorEndpointsList = append(mirrorEndpointsList, mirrorEndpoints...)
policies.Request = append(policies.Request, Policy{
PolicyName: string(gwapiv1.HTTPRouteFilterRequestMirror),
Action: constants.ActionMirrorRequest,
Expand All @@ -732,10 +813,6 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoHTTPRouteCR(httpRoute *gwap
loggers.LoggerOasparser.Debugf("Calculating auths for API ..., API_UUID = %v", adapterInternalAPI.UUID)
apiAuth := getSecurity(resourceAuthScheme)

if !hasRequestRedirectPolicy && len(rule.BackendRefs) < 1 {
return fmt.Errorf("no backendref were provided")
}

for _, match := range rule.Matches {
if hasURLRewritePolicy && hasRequestRedirectPolicy {
return fmt.Errorf("cannot have URL Rewrite and Request Redirect under the same rule")
Expand All @@ -756,14 +833,9 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoHTTPRouteCR(httpRoute *gwap
})
}
resourcePath := adapterInternalAPI.xWso2Basepath + *match.Path.Value
var mirrorEndpointCluster *EndpointCluster
if len(mirrorEndpointsList) > 0 {
mirrorEndpointCluster = &EndpointCluster{
Endpoints: mirrorEndpointsList,
}
}

operations := getAllowedOperations(match.Method, policies, apiAuth,
parseRateLimitPolicyToInternal(resourceRatelimitPolicy), scopes, mirrorEndpointCluster)
parseRateLimitPolicyToInternal(resourceRatelimitPolicy), scopes, mirrorEndpointClusters)
resource := &Resource{path: resourcePath,
methods: operations,
pathMatchType: *match.Path.Type,
Expand Down
24 changes: 12 additions & 12 deletions adapter/internal/oasparser/model/api_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ type Operation struct {
iD string
method string
//security map of security scheme names -> list of scopes
scopes []string
auth *Authentication
tier string
disableSecurity bool
vendorExtensions map[string]interface{}
policies OperationPolicies
mockedAPIConfig *api.MockedApiConfig
rateLimitPolicy *RateLimitPolicy
mirrorEndpoints *EndpointCluster
scopes []string
auth *Authentication
tier string
disableSecurity bool
vendorExtensions map[string]interface{}
policies OperationPolicies
mockedAPIConfig *api.MockedApiConfig
rateLimitPolicy *RateLimitPolicy
mirrorEndpointClusters []*EndpointCluster
}

// Authentication holds authentication related configurations
Expand Down Expand Up @@ -126,9 +126,9 @@ func (operation *Operation) GetID() string {
return operation.iD
}

// GetMirrorEndpoints returns the endpoints if a mirror filter has been applied.
func (operation *Operation) GetMirrorEndpoints() *EndpointCluster {
return operation.mirrorEndpoints
// GetMirrorEndpointClusters returns the endpoints if a mirror filter has been applied.
func (operation *Operation) GetMirrorEndpointClusters() []*EndpointCluster {
return operation.mirrorEndpointClusters
}

// GetCallInterceptorService returns the interceptor configs for a given operation.
Expand Down
17 changes: 8 additions & 9 deletions adapter/internal/oasparser/model/http_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,26 +287,25 @@ func getSecurity(authScheme *dpv1alpha2.Authentication) *Authentication {

// getAllowedOperations retuns a list of allowed operatons, if httpMethod is not specified then all methods are allowed.
func getAllowedOperations(httpMethod *gwapiv1.HTTPMethod, policies OperationPolicies, auth *Authentication,
ratelimitPolicy *RateLimitPolicy, scopes []string, mirrorEndpoints *EndpointCluster) []*Operation {
ratelimitPolicy *RateLimitPolicy, scopes []string, mirrorEndpointClusters []*EndpointCluster) []*Operation {
if httpMethod != nil {
return []*Operation{{iD: uuid.New().String(), method: string(*httpMethod), policies: policies,
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints}}

auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters}}
}
return []*Operation{{iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodGet), policies: policies,
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes},
{iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodPost), policies: policies,
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints},
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters},
{iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodDelete), policies: policies,
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints},
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters},
{iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodPatch), policies: policies,
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints},
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters},
{iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodPut), policies: policies,
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints},
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters},
{iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodHead), policies: policies,
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints},
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters},
{iD: uuid.New().String(), method: string(gwapiv1.HTTPMethodOptions), policies: policies,
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpoints: mirrorEndpoints}}
auth: auth, rateLimitPolicy: ratelimitPolicy, scopes: scopes, mirrorEndpointClusters: mirrorEndpointClusters}}
}

// SetInfoAPICR populates ID, ApiType, Version and XWso2BasePath of adapterInternalAPI.
Expand Down
30 changes: 19 additions & 11 deletions adapter/internal/operator/controllers/dp/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,21 +851,31 @@ func (apiReconciler *APIReconciler) getResolvedBackendsMapping(ctx context.Conte
for _, filter := range rule.Filters {
if filter.RequestMirror != nil {
mirrorBackend := filter.RequestMirror.BackendRef

mirrorBackendNamespacedName := types.NamespacedName{
Name: string(mirrorBackend.Name),
Namespace: utils.GetNamespace(mirrorBackend.Namespace, httpRoute.Namespace),
}
if _, exists := backendMapping[mirrorBackendNamespacedName.String()]; !exists {
resolvedMirrorBackend := utils.GetResolvedBackend(ctx, apiReconciler.client, mirrorBackendNamespacedName, &api)
if resolvedMirrorBackend != nil {
backendMapping[mirrorBackendNamespacedName.String()] = resolvedMirrorBackend
} else {
return nil, fmt.Errorf("unable to find backend %s", mirrorBackendNamespacedName.String())
if string(*mirrorBackend.Kind) == constants.KindBackend {
if _, exists := backendMapping[mirrorBackendNamespacedName.String()]; !exists {
resolvedMirrorBackend := utils.GetResolvedBackend(ctx, apiReconciler.client, mirrorBackendNamespacedName, &api)
if resolvedMirrorBackend != nil {
backendMapping[mirrorBackendNamespacedName.String()] = resolvedMirrorBackend
} else {
return nil, fmt.Errorf("unable to find backend %s", mirrorBackendNamespacedName.String())
}
}
} else if string(*mirrorBackend.Kind) == constants.KindService {
var err error
service, err := utils.GetService(ctx, apiReconciler.client, utils.GetNamespace(mirrorBackend.Namespace, httpRoute.Namespace), string(mirrorBackend.Name))
if err != nil {
return nil, fmt.Errorf("unable to find service %s", mirrorBackendNamespacedName.String())
}
backendMapping[mirrorBackendNamespacedName.String()], err = utils.GetResolvedBackendFromService(service, int(*mirrorBackend.Port))
if err != nil {
return nil, fmt.Errorf("error in getting service information %s", service)
}
}
}

}
}

Expand Down Expand Up @@ -1839,9 +1849,7 @@ func addIndexes(ctx context.Context, mgr manager.Manager) error {
authentication := rawObj.(*dpv1alpha2.Authentication)
var apis []string
if authentication.Spec.TargetRef.Kind == constants.KindAPI {

namespace, err := utils.ValidateAndRetrieveNamespace((*gwapiv1.Namespace)(authentication.Spec.TargetRef.Namespace), authentication.Namespace)

if err != nil {
loggers.LoggerAPKOperator.Errorf("Namespace mismatch. TargetRef %s needs to be in the same namespace as the Athentication %s. Expected: %s, Actual: %s",
string(authentication.Spec.TargetRef.Name), authentication.Name, authentication.Namespace, string(*authentication.Spec.TargetRef.Namespace))
Expand Down Expand Up @@ -1956,7 +1964,7 @@ func addIndexes(ctx context.Context, mgr manager.Manager) error {
return err
}

// ratelimite policy to API indexer
// ratelimit policy to API indexer
if err := mgr.GetFieldIndexer().IndexField(ctx, &dpv1alpha1.RateLimitPolicy{}, apiRateLimitIndex,
func(rawObj k8client.Object) []string {
ratelimitPolicy := rawObj.(*dpv1alpha1.RateLimitPolicy)
Expand Down
Loading

0 comments on commit 7edbae0

Please sign in to comment.