Skip to content

Commit

Permalink
Merge pull request #2092 from DDH13/main
Browse files Browse the repository at this point in the history
Add gRPC support for APK
  • Loading branch information
AmaliMatharaarachchi committed Apr 26, 2024
2 parents d9749db + 60fcea5 commit 2ea0b3a
Show file tree
Hide file tree
Showing 32 changed files with 2,717 additions and 15 deletions.
1 change: 1 addition & 0 deletions adapter/internal/oasparser/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ const (
SOAP string = "SOAP"
WS string = "WS"
GRAPHQL string = "GraphQL"
GRPC string = "GRPC"
WEBHOOK string = "WEBHOOK"
SSE string = "SSE"
Prototyped string = "prototyped"
Expand Down
97 changes: 93 additions & 4 deletions adapter/internal/oasparser/envoyconf/routes_with_clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,73 @@ func CreateRoutesWithClusters(adapterInternalAPI *model.AdapterInternalAPI, inte
}
return routes, clusters, endpoints, nil
}
if adapterInternalAPI.GetAPIType() == constants.GRPC {
basePath := strings.TrimSuffix(adapterInternalAPI.Endpoints.Endpoints[0].Basepath, "/")

clusterName := getClusterName(adapterInternalAPI.Endpoints.EndpointPrefix, organizationID, vHost,
adapterInternalAPI.GetTitle(), apiVersion, "")
adapterInternalAPI.Endpoints.HTTP2BackendEnabled = true
cluster, address, err := processEndpoints(clusterName, adapterInternalAPI.Endpoints, timeout, basePath)
if err != nil {
logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR,
"Error while adding grpc endpoints for %s:%v. %v", apiTitle, apiVersion, err.Error()))
return nil, nil, nil, fmt.Errorf("error while adding grpc endpoints for %s:%v. %v", apiTitle, apiVersion,
err.Error())
}
clusters = append(clusters, cluster)
endpoints = append(endpoints, address...)

for _, resource := range adapterInternalAPI.GetResources() {
var clusterName string
resourcePath := resource.GetPath()
endpoint := resource.GetEndpoints()
endpoint.HTTP2BackendEnabled = true
basePath := strings.TrimSuffix(endpoint.Endpoints[0].Basepath, "/")
existingClusterName := getExistingClusterName(*endpoint, processedEndpoints)

if existingClusterName == "" {
clusterName = getClusterName(endpoint.EndpointPrefix, organizationID, vHost, adapterInternalAPI.GetTitle(), apiVersion, resource.GetID())
cluster, address, err := processEndpoints(clusterName, endpoint, timeout, basePath)
if err != nil {
logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR, "Error while adding resource level endpoints for %s:%v-%v. %v", apiTitle, apiVersion, resourcePath, err.Error()))
} else {
clusters = append(clusters, cluster)
endpoints = append(endpoints, address...)
processedEndpoints[clusterName] = *endpoint
}
} else {
clusterName = existingClusterName
}
// Create resource level interceptor clusters if required
clustersI, endpointsI, operationalReqInterceptors, operationalRespInterceptorVal := createInterceptorResourceClusters(adapterInternalAPI,
interceptorCerts, vHost, organizationID, apiRequestInterceptor, apiResponseInterceptor, resource)
clusters = append(clusters, clustersI...)
endpoints = append(endpoints, endpointsI...)
routeParams := genRouteCreateParams(adapterInternalAPI, resource, vHost, basePath, clusterName, *operationalReqInterceptors, *operationalRespInterceptorVal, organizationID,
false, false)

routeP, err := createRoutes(routeParams)
if err != nil {
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error2231, logging.MAJOR,
"Error while creating routes for GRPC API %s %s for path: %s Error: %s", adapterInternalAPI.GetTitle(),
adapterInternalAPI.GetVersion(), resource.GetPath(), err.Error()))
return nil, nil, nil, fmt.Errorf("error while creating routes. %v", err)
}
routes = append(routes, routeP...)
if adapterInternalAPI.IsDefaultVersion {
defaultRoutes, errDefaultPath := createRoutes(genRouteCreateParams(adapterInternalAPI, resource, vHost, basePath, clusterName, *operationalReqInterceptors, *operationalRespInterceptorVal, organizationID,
false, true))
if errDefaultPath != nil {
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error2231, logging.MAJOR, "Error while creating routes for GRPC API %s %s for path: %s Error: %s", adapterInternalAPI.GetTitle(), adapterInternalAPI.GetVersion(), removeFirstOccurrence(resource.GetPath(), adapterInternalAPI.GetVersion()), errDefaultPath.Error()))
return nil, nil, nil, fmt.Errorf("error while creating routes. %v", errDefaultPath)
}
routes = append(routes, defaultRoutes...)
}

}

return routes, clusters, endpoints, nil
}
for _, resource := range adapterInternalAPI.GetResources() {
var clusterName string
resourcePath := resource.GetPath()
Expand Down Expand Up @@ -860,8 +927,14 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
decorator *routev3.Decorator
)
if params.createDefaultPath {
xWso2Basepath = removeFirstOccurrence(xWso2Basepath, "/"+version)
resourcePath = removeFirstOccurrence(resource.GetPath(), "/"+version)
//check if basepath is separated from version by a . or /
if strings.Contains(basePath, "."+version) {
xWso2Basepath = removeFirstOccurrence(basePath, "."+version)
resourcePath = removeFirstOccurrence(resource.GetPath(), "."+version)
} else {
xWso2Basepath = removeFirstOccurrence(xWso2Basepath, "/"+version)
resourcePath = removeFirstOccurrence(resource.GetPath(), "/"+version)
}
}

if pathMatchType != gwapiv1.PathMatchExact {
Expand Down Expand Up @@ -1058,6 +1131,16 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
rewritePath := generateRoutePathForReWrite(basePath, resourcePath, pathMatchType)
action.Route.RegexRewrite = generateRegexMatchAndSubstitute(rewritePath, resourcePath, pathMatchType)

if apiType == "GRPC" {
match.Headers = nil
newRoutePath := "/" + strings.TrimPrefix(resourcePath, basePath+".")
if newRoutePath == "/"+resourcePath {
temp := removeFirstOccurrence(basePath, "."+version)
newRoutePath = "/" + strings.TrimPrefix(resourcePath, temp+".")
}
action.Route.RegexRewrite = generateRegexMatchAndSubstitute(rewritePath, newRoutePath, pathMatchType)
}

route := generateRouteConfig(xWso2Basepath, match, action, nil, decorator, perRouteFilterConfigs,
nil, nil, nil, nil) // general headers to add and remove are included in this methods
routes = append(routes, route)
Expand Down Expand Up @@ -1209,8 +1292,14 @@ func CreateAPIDefinitionEndpoint(adapterInternalAPI *model.AdapterInternalAPI, v

matchPath := basePath + endpoint
if isDefaultversion {
basePathWithoutVersion := removeLastOccurrence(basePath, "/"+version)
matchPath = basePathWithoutVersion + endpoint
if adapterInternalAPI.GetAPIType() == "GRPC" {
basePathWithoutVersion := removeLastOccurrence(basePath, "."+version)
matchPath = basePathWithoutVersion + "/" + vHost + endpoint
} else {
basePathWithoutVersion := removeLastOccurrence(basePath, "/"+version)
matchPath = basePathWithoutVersion + endpoint

}
}

matchPath = strings.Replace(matchPath, basePath, regexp.QuoteMeta(basePath), 1)
Expand Down
177 changes: 177 additions & 0 deletions adapter/internal/oasparser/model/adapter_internal_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"net/url"
gwapiv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -940,6 +941,182 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoGQLRouteCR(gqlRoute *dpv1al
return nil
}

// SetInfoGRPCRouteCR populates resources and endpoints of adapterInternalAPI. httpRoute.Spec.Rules.Matches
// are used to create resources and httpRoute.Spec.Rules.BackendRefs are used to create EndpointClusters.
func (adapterInternalAPI *AdapterInternalAPI) SetInfoGRPCRouteCR(grpcRoute *gwapiv1a2.GRPCRoute, resourceParams ResourceParams) error {
var resources []*Resource
outputAuthScheme := utils.TieBreaker(utils.GetPtrSlice(maps.Values(resourceParams.AuthSchemes)))
outputAPIPolicy := utils.TieBreaker(utils.GetPtrSlice(maps.Values(resourceParams.APIPolicies)))
outputRatelimitPolicy := utils.TieBreaker(utils.GetPtrSlice(maps.Values(resourceParams.RateLimitPolicies)))

disableScopes := true
config := config.ReadConfigs()

var authScheme *dpv1alpha2.Authentication
if outputAuthScheme != nil {
authScheme = *outputAuthScheme
}
var apiPolicy *dpv1alpha2.APIPolicy
if outputAPIPolicy != nil {
apiPolicy = *outputAPIPolicy
}
var ratelimitPolicy *dpv1alpha1.RateLimitPolicy
if outputRatelimitPolicy != nil {
ratelimitPolicy = *outputRatelimitPolicy
}

//We are only supporting one backend for now
backend := grpcRoute.Spec.Rules[0].BackendRefs[0]
backendName := types.NamespacedName{
Name: string(backend.Name),
Namespace: utils.GetNamespace(backend.Namespace, grpcRoute.Namespace),
}
resolvedBackend, ok := resourceParams.BackendMapping[backendName.String()]
if ok {
endpointConfig := &EndpointConfig{}
if resolvedBackend.CircuitBreaker != nil {
endpointConfig.CircuitBreakers = &CircuitBreakers{
MaxConnections: int32(resolvedBackend.CircuitBreaker.MaxConnections),
MaxRequests: int32(resolvedBackend.CircuitBreaker.MaxRequests),
MaxPendingRequests: int32(resolvedBackend.CircuitBreaker.MaxPendingRequests),
MaxRetries: int32(resolvedBackend.CircuitBreaker.MaxRetries),
MaxConnectionPools: int32(resolvedBackend.CircuitBreaker.MaxConnectionPools),
}
}
if resolvedBackend.Timeout != nil {
endpointConfig.TimeoutInMillis = resolvedBackend.Timeout.UpstreamResponseTimeout * 1000
endpointConfig.IdleTimeoutInSeconds = resolvedBackend.Timeout.DownstreamRequestIdleTimeout
}
if resolvedBackend.Retry != nil {
statusCodes := config.Envoy.Upstream.Retry.StatusCodes
if len(resolvedBackend.Retry.StatusCodes) > 0 {
statusCodes = resolvedBackend.Retry.StatusCodes
}
endpointConfig.RetryConfig = &RetryConfig{
Count: int32(resolvedBackend.Retry.Count),
StatusCodes: statusCodes,
BaseIntervalInMillis: int32(resolvedBackend.Retry.BaseIntervalMillis),
}
}
adapterInternalAPI.Endpoints = &EndpointCluster{
Endpoints: GetEndpoints(backendName, resourceParams.BackendMapping),
Config: endpointConfig,
}
if resolvedBackend.HealthCheck != nil {
adapterInternalAPI.Endpoints.HealthCheck = &HealthCheck{
Interval: resolvedBackend.HealthCheck.Interval,
Timeout: resolvedBackend.HealthCheck.Timeout,
UnhealthyThreshold: resolvedBackend.HealthCheck.UnhealthyThreshold,
HealthyThreshold: resolvedBackend.HealthCheck.HealthyThreshold,
}
}

var securityConfig []EndpointSecurity
switch resolvedBackend.Security.Type {
case "Basic":
securityConfig = append(securityConfig, EndpointSecurity{
Password: string(resolvedBackend.Security.Basic.Password),
Username: string(resolvedBackend.Security.Basic.Username),
Type: string(resolvedBackend.Security.Type),
Enabled: true,
})
}
adapterInternalAPI.EndpointSecurity = utils.GetPtrSlice(securityConfig)
} else {
return fmt.Errorf("backend: %s has not been resolved", backendName)
}

for _, rule := range grpcRoute.Spec.Rules {
var policies = OperationPolicies{}
var endPoints []Endpoint
resourceAuthScheme := authScheme
resourceRatelimitPolicy := ratelimitPolicy
var scopes []string
for _, filter := range rule.Filters {
if filter.ExtensionRef != nil && filter.ExtensionRef.Kind == constants.KindAuthentication {
if ref, found := resourceParams.ResourceAuthSchemes[types.NamespacedName{
Name: string(filter.ExtensionRef.Name),
Namespace: grpcRoute.Namespace,
}.String()]; found {
resourceAuthScheme = concatAuthSchemes(authScheme, &ref)
} else {
return fmt.Errorf(`auth scheme: %s has not been resolved, spec.targetRef.kind should be
'Resource' in resource level Authentications`, filter.ExtensionRef.Name)
}
}
if filter.ExtensionRef != nil && filter.ExtensionRef.Kind == constants.KindScope {
if ref, found := resourceParams.ResourceScopes[types.NamespacedName{
Name: string(filter.ExtensionRef.Name),
Namespace: grpcRoute.Namespace,
}.String()]; found {
scopes = ref.Spec.Names
disableScopes = false
} else {
return fmt.Errorf("scope: %s has not been resolved in namespace %s", filter.ExtensionRef.Name, grpcRoute.Namespace)
}
}
if filter.ExtensionRef != nil && filter.ExtensionRef.Kind == constants.KindRateLimitPolicy {
if ref, found := resourceParams.ResourceRateLimitPolicies[types.NamespacedName{
Name: string(filter.ExtensionRef.Name),
Namespace: grpcRoute.Namespace,
}.String()]; found {
resourceRatelimitPolicy = concatRateLimitPolicies(ratelimitPolicy, &ref)
} else {
return fmt.Errorf(`ratelimitpolicy: %s has not been resolved, spec.targetRef.kind should be
'Resource' in resource level RateLimitPolicies`, filter.ExtensionRef.Name)
}
}
}
resourceAuthScheme = concatAuthSchemes(resourceAuthScheme, nil)
resourceRatelimitPolicy = concatRateLimitPolicies(resourceRatelimitPolicy, nil)

loggers.LoggerOasparser.Debugf("Calculating auths for API ..., API_UUID = %v", adapterInternalAPI.UUID)
apiAuth := getSecurity(resourceAuthScheme)

for _, match := range rule.Matches {
resourcePath := adapterInternalAPI.GetXWso2Basepath() + "." + *match.Method.Service + "/" + *match.Method.Method
endPoints = append(endPoints, GetEndpoints(backendName, resourceParams.BackendMapping)...)
resource := &Resource{path: resourcePath, pathMatchType: "Exact",
methods: []*Operation{{iD: uuid.New().String(), method: "post", policies: policies,
auth: apiAuth, rateLimitPolicy: parseRateLimitPolicyToInternal(resourceRatelimitPolicy), scopes: scopes}},
iD: uuid.New().String(),
}
endpoints := GetEndpoints(backendName, resourceParams.BackendMapping)
resource.endpoints = &EndpointCluster{
Endpoints: endpoints,
}
resources = append(resources, resource)
}
}

ratelimitPolicy = concatRateLimitPolicies(ratelimitPolicy, nil)
apiPolicy = concatAPIPolicies(apiPolicy, nil)
authScheme = concatAuthSchemes(authScheme, nil)

adapterInternalAPI.RateLimitPolicy = parseRateLimitPolicyToInternal(ratelimitPolicy)
adapterInternalAPI.resources = resources
adapterInternalAPI.xWso2Cors = getCorsConfigFromAPIPolicy(apiPolicy)
if authScheme.Spec.Override != nil && authScheme.Spec.Override.Disabled != nil {
adapterInternalAPI.disableAuthentications = *authScheme.Spec.Override.Disabled
}
authSpec := utils.SelectPolicy(&authScheme.Spec.Override, &authScheme.Spec.Default, nil, nil)
if authSpec != nil && authSpec.AuthTypes != nil && authSpec.AuthTypes.Oauth2.Required != "" {
adapterInternalAPI.SetXWSO2ApplicationSecurity(authSpec.AuthTypes.Oauth2.Required == "mandatory")
} else {
adapterInternalAPI.SetXWSO2ApplicationSecurity(true)
}
adapterInternalAPI.disableScopes = disableScopes
// Check whether the API has a backend JWT token
if apiPolicy != nil && apiPolicy.Spec.Override != nil && apiPolicy.Spec.Override.BackendJWTPolicy != nil {
backendJWTPolicy := resourceParams.BackendJWTMapping[types.NamespacedName{
Name: apiPolicy.Spec.Override.BackendJWTPolicy.Name,
Namespace: grpcRoute.Namespace,
}.String()].Spec
adapterInternalAPI.backendJWTTokenInfo = parseBackendJWTTokenToInternal(backendJWTPolicy)
}
return nil
}

func (endpoint *Endpoint) validateEndpoint() error {
if endpoint.Port == 0 || endpoint.Port > 65535 {
return errors.New("endpoint port value should be between 0 and 65535")
Expand Down
2 changes: 1 addition & 1 deletion adapter/internal/oasparser/model/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func getHostandBasepathandPort(apiType string, rawURL string) (*Endpoint, error)
rawURL = strings.Trim(rawURL, " ")

if !strings.Contains(rawURL, "://") {
if apiType == constants.REST || apiType == constants.GRAPHQL {
if apiType == constants.REST || apiType == constants.GRAPHQL || apiType == constants.GRPC {
rawURL = "http://" + rawURL
} else if apiType == constants.WS {
rawURL = "ws://" + rawURL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ spec:
type: array
apiType:
description: APIType denotes the type of the API. Possible values
could be REST, GraphQL, Async
could be REST, GraphQL, GRPC Async
enum:
- REST
type: string
Expand Down Expand Up @@ -244,10 +244,11 @@ spec:
type: array
apiType:
description: APIType denotes the type of the API. Possible values
could be REST, GraphQL, Async
could be REST, GraphQL, GRPC Async
enum:
- REST
- GraphQL
- GRPC
type: string
apiVersion:
description: APIVersion is the version number of the API.
Expand Down
Loading

0 comments on commit 2ea0b3a

Please sign in to comment.