Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gRPC support for APK #2092

Merged
merged 52 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
58a6df5
add GRPC api type for API CR
DDH13 Jan 18, 2024
8287d0f
gitignore router/resources folder
DDH13 Jan 18, 2024
29ee66c
c1 after rebase
DDH13 Mar 19, 2024
b63f91a
c2
DDH13 Jan 29, 2024
1f62729
reconciled grpc backend
DDH13 Jan 30, 2024
ea2814e
add function SetInfoGRPCRouteCR
DDH13 Jan 31, 2024
bdcbf93
duplicate methods from GQL to grpc_api.go
DDH13 Jan 31, 2024
9e85a03
minor changes
DDH13 Jan 31, 2024
7186f27
changed synchronizer.go. Builds without errors
DDH13 Jan 31, 2024
db8d739
so far everything works
DDH13 Feb 2, 2024
93d1d75
removed policies
DDH13 Feb 7, 2024
b16758e
enable http2 backend for grpc
DDH13 Feb 8, 2024
7e3b2c8
remove comment
DDH13 Feb 19, 2024
b4a5ba9
add constants
DDH13 Feb 19, 2024
302b36d
commented out unimplemented function in enforcer
DDH13 Feb 20, 2024
0c548bb
bug fixes after rebasing
DDH13 Feb 20, 2024
3e443ae
fixed envoy configurations
DDH13 Feb 26, 2024
7d7af55
configured grpc routes on adapter side
DDH13 Feb 28, 2024
c2b99c8
debugged context extensions not being set
DDH13 Mar 1, 2024
035e9ef
Request gets matched to grpc api correctly
DDH13 Mar 1, 2024
e91eb18
Request gets matched to resource correctly
DDH13 Mar 1, 2024
6f1c0b0
enabled http2 backend and unset transport socket matching
DDH13 Mar 1, 2024
3da8731
add grpc CRs for testing
DDH13 Mar 5, 2024
8220d52
add custom grpc backend deployment to manifest
DDH13 Mar 5, 2024
0af1bfb
update grpc CRs
DDH13 Mar 5, 2024
a1e1ff6
fix transport socket not matching
DDH13 Mar 8, 2024
807edbf
change gRPC CRs
DDH13 Mar 9, 2024
d3ed16a
remove deployment and manifest for grpc backend pod
DDH13 Mar 11, 2024
0ff06d6
add CRs for grpc api
DDH13 Mar 11, 2024
96b3f6a
add empty test for grpc api
DDH13 Mar 11, 2024
4ce242b
works with postman client
DDH13 Mar 14, 2024
ece6f36
changed grpc CR for test
DDH13 Mar 16, 2024
dc5a567
move basepath validation for API CRs
DDH13 Mar 16, 2024
fe60f67
change test files
DDH13 Mar 18, 2024
19084f0
add testcases and helpers
DDH13 Mar 19, 2024
16498ab
add generated client code
DDH13 Mar 19, 2024
dfe02b4
change test case timeout
DDH13 Mar 21, 2024
6bb830d
changed request timeout
DDH13 Mar 26, 2024
cc05715
fix after release
DDH13 Apr 15, 2024
0170a21
changed method to post
DDH13 Apr 15, 2024
67fc8e3
fixed auth
DDH13 Apr 15, 2024
d0a8a48
change 2023 to 2024
DDH13 Apr 16, 2024
c03b37e
removed TODOs
DDH13 Apr 16, 2024
cbdc119
change https to http in testcase
DDH13 Apr 16, 2024
3e66672
generalize testcase
DDH13 Apr 16, 2024
760761b
generalize testcase a bit more
DDH13 Apr 16, 2024
271ff0f
add testcase for grpc api default version
DDH13 Apr 17, 2024
1d7aea9
add default version capability for grpc api
DDH13 Apr 17, 2024
f5e9185
modify test for grpc default version testing
DDH13 Apr 17, 2024
290c65f
resolved comments
DDH13 Apr 24, 2024
4389907
reconciles grpc scopes
DDH13 Apr 25, 2024
60fcea5
remove todo
DDH13 Apr 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions adapter/internal/oasparser/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ const (
SOAP string = "SOAP"
WS string = "WS"
GRAPHQL string = "GraphQL"
GRPC string = "GRPC"
WEBHOOK string = "WEBHOOK"
SSE string = "SSE"
Prototyped string = "prototyped"
Expand Down
97 changes: 93 additions & 4 deletions adapter/internal/oasparser/envoyconf/routes_with_clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,73 @@ func CreateRoutesWithClusters(adapterInternalAPI *model.AdapterInternalAPI, inte
}
return routes, clusters, endpoints, nil
}
if adapterInternalAPI.GetAPIType() == constants.GRPC {
basePath := strings.TrimSuffix(adapterInternalAPI.Endpoints.Endpoints[0].Basepath, "/")

clusterName := getClusterName(adapterInternalAPI.Endpoints.EndpointPrefix, organizationID, vHost,
adapterInternalAPI.GetTitle(), apiVersion, "")
adapterInternalAPI.Endpoints.HTTP2BackendEnabled = true
cluster, address, err := processEndpoints(clusterName, adapterInternalAPI.Endpoints, timeout, basePath)
if err != nil {
logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR,
DDH13 marked this conversation as resolved.
Show resolved Hide resolved
"Error while adding grpc endpoints for %s:%v. %v", apiTitle, apiVersion, err.Error()))
return nil, nil, nil, fmt.Errorf("error while adding grpc endpoints for %s:%v. %v", apiTitle, apiVersion,
err.Error())
}
clusters = append(clusters, cluster)
endpoints = append(endpoints, address...)

for _, resource := range adapterInternalAPI.GetResources() {
var clusterName string
resourcePath := resource.GetPath()
endpoint := resource.GetEndpoints()
endpoint.HTTP2BackendEnabled = true
basePath := strings.TrimSuffix(endpoint.Endpoints[0].Basepath, "/")
existingClusterName := getExistingClusterName(*endpoint, processedEndpoints)

if existingClusterName == "" {
clusterName = getClusterName(endpoint.EndpointPrefix, organizationID, vHost, adapterInternalAPI.GetTitle(), apiVersion, resource.GetID())
DDH13 marked this conversation as resolved.
Show resolved Hide resolved
cluster, address, err := processEndpoints(clusterName, endpoint, timeout, basePath)
if err != nil {
logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR, "Error while adding resource level endpoints for %s:%v-%v. %v", apiTitle, apiVersion, resourcePath, err.Error()))
} else {
clusters = append(clusters, cluster)
endpoints = append(endpoints, address...)
processedEndpoints[clusterName] = *endpoint
}
} else {
clusterName = existingClusterName
}
// Create resource level interceptor clusters if required
clustersI, endpointsI, operationalReqInterceptors, operationalRespInterceptorVal := createInterceptorResourceClusters(adapterInternalAPI,
AmaliMatharaarachchi marked this conversation as resolved.
Show resolved Hide resolved
interceptorCerts, vHost, organizationID, apiRequestInterceptor, apiResponseInterceptor, resource)
clusters = append(clusters, clustersI...)
endpoints = append(endpoints, endpointsI...)
routeParams := genRouteCreateParams(adapterInternalAPI, resource, vHost, basePath, clusterName, *operationalReqInterceptors, *operationalRespInterceptorVal, organizationID,
false, false)

routeP, err := createRoutes(routeParams)
if err != nil {
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error2231, logging.MAJOR,
"Error while creating routes for GRPC API %s %s for path: %s Error: %s", adapterInternalAPI.GetTitle(),
adapterInternalAPI.GetVersion(), resource.GetPath(), err.Error()))
return nil, nil, nil, fmt.Errorf("error while creating routes. %v", err)
}
routes = append(routes, routeP...)
if adapterInternalAPI.IsDefaultVersion {
defaultRoutes, errDefaultPath := createRoutes(genRouteCreateParams(adapterInternalAPI, resource, vHost, basePath, clusterName, *operationalReqInterceptors, *operationalRespInterceptorVal, organizationID,
false, true))
if errDefaultPath != nil {
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error2231, logging.MAJOR, "Error while creating routes for GRPC API %s %s for path: %s Error: %s", adapterInternalAPI.GetTitle(), adapterInternalAPI.GetVersion(), removeFirstOccurrence(resource.GetPath(), adapterInternalAPI.GetVersion()), errDefaultPath.Error()))
return nil, nil, nil, fmt.Errorf("error while creating routes. %v", errDefaultPath)
}
routes = append(routes, defaultRoutes...)
}

}

return routes, clusters, endpoints, nil
}
for _, resource := range adapterInternalAPI.GetResources() {
var clusterName string
resourcePath := resource.GetPath()
Expand Down Expand Up @@ -860,8 +927,14 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
decorator *routev3.Decorator
)
if params.createDefaultPath {
xWso2Basepath = removeFirstOccurrence(xWso2Basepath, "/"+version)
resourcePath = removeFirstOccurrence(resource.GetPath(), "/"+version)
//check if basepath is separated from version by a . or /
if strings.Contains(basePath, "."+version) {
xWso2Basepath = removeFirstOccurrence(basePath, "."+version)
resourcePath = removeFirstOccurrence(resource.GetPath(), "."+version)
} else {
xWso2Basepath = removeFirstOccurrence(xWso2Basepath, "/"+version)
resourcePath = removeFirstOccurrence(resource.GetPath(), "/"+version)
}
}

if pathMatchType != gwapiv1.PathMatchExact {
Expand Down Expand Up @@ -1058,6 +1131,16 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
rewritePath := generateRoutePathForReWrite(basePath, resourcePath, pathMatchType)
action.Route.RegexRewrite = generateRegexMatchAndSubstitute(rewritePath, resourcePath, pathMatchType)

if apiType == "GRPC" {
DDH13 marked this conversation as resolved.
Show resolved Hide resolved
match.Headers = nil
newRoutePath := "/" + strings.TrimPrefix(resourcePath, basePath+".")
if newRoutePath == "/"+resourcePath {
temp := removeFirstOccurrence(basePath, "."+version)
newRoutePath = "/" + strings.TrimPrefix(resourcePath, temp+".")
}
action.Route.RegexRewrite = generateRegexMatchAndSubstitute(rewritePath, newRoutePath, pathMatchType)
}

route := generateRouteConfig(xWso2Basepath, match, action, nil, decorator, perRouteFilterConfigs,
nil, nil, nil, nil) // general headers to add and remove are included in this methods
routes = append(routes, route)
Expand Down Expand Up @@ -1209,8 +1292,14 @@ func CreateAPIDefinitionEndpoint(adapterInternalAPI *model.AdapterInternalAPI, v

matchPath := basePath + endpoint
if isDefaultversion {
basePathWithoutVersion := removeLastOccurrence(basePath, "/"+version)
matchPath = basePathWithoutVersion + endpoint
if adapterInternalAPI.GetAPIType() == "GRPC" {
basePathWithoutVersion := removeLastOccurrence(basePath, "."+version)
matchPath = basePathWithoutVersion + "/" + vHost + endpoint
} else {
basePathWithoutVersion := removeLastOccurrence(basePath, "/"+version)
matchPath = basePathWithoutVersion + endpoint

}
}

matchPath = strings.Replace(matchPath, basePath, regexp.QuoteMeta(basePath), 1)
Expand Down
177 changes: 177 additions & 0 deletions adapter/internal/oasparser/model/adapter_internal_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"net/url"
gwapiv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -940,6 +941,182 @@ func (adapterInternalAPI *AdapterInternalAPI) SetInfoGQLRouteCR(gqlRoute *dpv1al
return nil
}

// SetInfoGRPCRouteCR populates resources and endpoints of adapterInternalAPI. httpRoute.Spec.Rules.Matches
// are used to create resources and httpRoute.Spec.Rules.BackendRefs are used to create EndpointClusters.
func (adapterInternalAPI *AdapterInternalAPI) SetInfoGRPCRouteCR(grpcRoute *gwapiv1a2.GRPCRoute, resourceParams ResourceParams) error {
var resources []*Resource
outputAuthScheme := utils.TieBreaker(utils.GetPtrSlice(maps.Values(resourceParams.AuthSchemes)))
outputAPIPolicy := utils.TieBreaker(utils.GetPtrSlice(maps.Values(resourceParams.APIPolicies)))
outputRatelimitPolicy := utils.TieBreaker(utils.GetPtrSlice(maps.Values(resourceParams.RateLimitPolicies)))

disableScopes := true
config := config.ReadConfigs()

var authScheme *dpv1alpha2.Authentication
if outputAuthScheme != nil {
authScheme = *outputAuthScheme
}
var apiPolicy *dpv1alpha2.APIPolicy
if outputAPIPolicy != nil {
apiPolicy = *outputAPIPolicy
}
var ratelimitPolicy *dpv1alpha1.RateLimitPolicy
if outputRatelimitPolicy != nil {
ratelimitPolicy = *outputRatelimitPolicy
}

//We are only supporting one backend for now
backend := grpcRoute.Spec.Rules[0].BackendRefs[0]
backendName := types.NamespacedName{
Name: string(backend.Name),
Namespace: utils.GetNamespace(backend.Namespace, grpcRoute.Namespace),
}
resolvedBackend, ok := resourceParams.BackendMapping[backendName.String()]
if ok {
endpointConfig := &EndpointConfig{}
if resolvedBackend.CircuitBreaker != nil {
endpointConfig.CircuitBreakers = &CircuitBreakers{
MaxConnections: int32(resolvedBackend.CircuitBreaker.MaxConnections),
MaxRequests: int32(resolvedBackend.CircuitBreaker.MaxRequests),
MaxPendingRequests: int32(resolvedBackend.CircuitBreaker.MaxPendingRequests),
MaxRetries: int32(resolvedBackend.CircuitBreaker.MaxRetries),
MaxConnectionPools: int32(resolvedBackend.CircuitBreaker.MaxConnectionPools),
}
}
if resolvedBackend.Timeout != nil {
endpointConfig.TimeoutInMillis = resolvedBackend.Timeout.UpstreamResponseTimeout * 1000
endpointConfig.IdleTimeoutInSeconds = resolvedBackend.Timeout.DownstreamRequestIdleTimeout
}
if resolvedBackend.Retry != nil {
statusCodes := config.Envoy.Upstream.Retry.StatusCodes
if len(resolvedBackend.Retry.StatusCodes) > 0 {
statusCodes = resolvedBackend.Retry.StatusCodes
}
endpointConfig.RetryConfig = &RetryConfig{
Count: int32(resolvedBackend.Retry.Count),
StatusCodes: statusCodes,
BaseIntervalInMillis: int32(resolvedBackend.Retry.BaseIntervalMillis),
}
}
adapterInternalAPI.Endpoints = &EndpointCluster{
Endpoints: GetEndpoints(backendName, resourceParams.BackendMapping),
Config: endpointConfig,
}
if resolvedBackend.HealthCheck != nil {
adapterInternalAPI.Endpoints.HealthCheck = &HealthCheck{
Interval: resolvedBackend.HealthCheck.Interval,
Timeout: resolvedBackend.HealthCheck.Timeout,
UnhealthyThreshold: resolvedBackend.HealthCheck.UnhealthyThreshold,
HealthyThreshold: resolvedBackend.HealthCheck.HealthyThreshold,
}
}

var securityConfig []EndpointSecurity
switch resolvedBackend.Security.Type {
case "Basic":
securityConfig = append(securityConfig, EndpointSecurity{
Password: string(resolvedBackend.Security.Basic.Password),
Username: string(resolvedBackend.Security.Basic.Username),
Type: string(resolvedBackend.Security.Type),
Enabled: true,
})
}
adapterInternalAPI.EndpointSecurity = utils.GetPtrSlice(securityConfig)
} else {
return fmt.Errorf("backend: %s has not been resolved", backendName)
}

for _, rule := range grpcRoute.Spec.Rules {
var policies = OperationPolicies{}
var endPoints []Endpoint
resourceAuthScheme := authScheme
resourceRatelimitPolicy := ratelimitPolicy
var scopes []string
for _, filter := range rule.Filters {
if filter.ExtensionRef != nil && filter.ExtensionRef.Kind == constants.KindAuthentication {
if ref, found := resourceParams.ResourceAuthSchemes[types.NamespacedName{
Name: string(filter.ExtensionRef.Name),
Namespace: grpcRoute.Namespace,
}.String()]; found {
resourceAuthScheme = concatAuthSchemes(authScheme, &ref)
} else {
return fmt.Errorf(`auth scheme: %s has not been resolved, spec.targetRef.kind should be
'Resource' in resource level Authentications`, filter.ExtensionRef.Name)
}
}
if filter.ExtensionRef != nil && filter.ExtensionRef.Kind == constants.KindScope {
if ref, found := resourceParams.ResourceScopes[types.NamespacedName{
Name: string(filter.ExtensionRef.Name),
Namespace: grpcRoute.Namespace,
}.String()]; found {
scopes = ref.Spec.Names
disableScopes = false
} else {
return fmt.Errorf("scope: %s has not been resolved in namespace %s", filter.ExtensionRef.Name, grpcRoute.Namespace)
}
}
if filter.ExtensionRef != nil && filter.ExtensionRef.Kind == constants.KindRateLimitPolicy {
if ref, found := resourceParams.ResourceRateLimitPolicies[types.NamespacedName{
Name: string(filter.ExtensionRef.Name),
Namespace: grpcRoute.Namespace,
}.String()]; found {
resourceRatelimitPolicy = concatRateLimitPolicies(ratelimitPolicy, &ref)
} else {
return fmt.Errorf(`ratelimitpolicy: %s has not been resolved, spec.targetRef.kind should be
'Resource' in resource level RateLimitPolicies`, filter.ExtensionRef.Name)
}
}
}
resourceAuthScheme = concatAuthSchemes(resourceAuthScheme, nil)
resourceRatelimitPolicy = concatRateLimitPolicies(resourceRatelimitPolicy, nil)

loggers.LoggerOasparser.Debugf("Calculating auths for API ..., API_UUID = %v", adapterInternalAPI.UUID)
apiAuth := getSecurity(resourceAuthScheme)

for _, match := range rule.Matches {
resourcePath := adapterInternalAPI.GetXWso2Basepath() + "." + *match.Method.Service + "/" + *match.Method.Method
endPoints = append(endPoints, GetEndpoints(backendName, resourceParams.BackendMapping)...)
resource := &Resource{path: resourcePath, pathMatchType: "Exact",
methods: []*Operation{{iD: uuid.New().String(), method: "post", policies: policies,
auth: apiAuth, rateLimitPolicy: parseRateLimitPolicyToInternal(resourceRatelimitPolicy), scopes: scopes}},
iD: uuid.New().String(),
}
endpoints := GetEndpoints(backendName, resourceParams.BackendMapping)
resource.endpoints = &EndpointCluster{
Endpoints: endpoints,
}
resources = append(resources, resource)
}
}

ratelimitPolicy = concatRateLimitPolicies(ratelimitPolicy, nil)
apiPolicy = concatAPIPolicies(apiPolicy, nil)
authScheme = concatAuthSchemes(authScheme, nil)

adapterInternalAPI.RateLimitPolicy = parseRateLimitPolicyToInternal(ratelimitPolicy)
adapterInternalAPI.resources = resources
adapterInternalAPI.xWso2Cors = getCorsConfigFromAPIPolicy(apiPolicy)
if authScheme.Spec.Override != nil && authScheme.Spec.Override.Disabled != nil {
adapterInternalAPI.disableAuthentications = *authScheme.Spec.Override.Disabled
}
authSpec := utils.SelectPolicy(&authScheme.Spec.Override, &authScheme.Spec.Default, nil, nil)
if authSpec != nil && authSpec.AuthTypes != nil && authSpec.AuthTypes.Oauth2.Required != "" {
adapterInternalAPI.SetXWSO2ApplicationSecurity(authSpec.AuthTypes.Oauth2.Required == "mandatory")
} else {
adapterInternalAPI.SetXWSO2ApplicationSecurity(true)
}
adapterInternalAPI.disableScopes = disableScopes
// Check whether the API has a backend JWT token
if apiPolicy != nil && apiPolicy.Spec.Override != nil && apiPolicy.Spec.Override.BackendJWTPolicy != nil {
backendJWTPolicy := resourceParams.BackendJWTMapping[types.NamespacedName{
Name: apiPolicy.Spec.Override.BackendJWTPolicy.Name,
Namespace: grpcRoute.Namespace,
}.String()].Spec
adapterInternalAPI.backendJWTTokenInfo = parseBackendJWTTokenToInternal(backendJWTPolicy)
}
return nil
}

func (endpoint *Endpoint) validateEndpoint() error {
if endpoint.Port == 0 || endpoint.Port > 65535 {
return errors.New("endpoint port value should be between 0 and 65535")
Expand Down
2 changes: 1 addition & 1 deletion adapter/internal/oasparser/model/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func getHostandBasepathandPort(apiType string, rawURL string) (*Endpoint, error)
rawURL = strings.Trim(rawURL, " ")

if !strings.Contains(rawURL, "://") {
if apiType == constants.REST || apiType == constants.GRAPHQL {
if apiType == constants.REST || apiType == constants.GRAPHQL || apiType == constants.GRPC {
rawURL = "http://" + rawURL
} else if apiType == constants.WS {
rawURL = "ws://" + rawURL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ spec:
type: array
apiType:
description: APIType denotes the type of the API. Possible values
could be REST, GraphQL, Async
could be REST, GraphQL, GRPC Async
enum:
- REST
type: string
Expand Down Expand Up @@ -244,10 +244,11 @@ spec:
type: array
apiType:
description: APIType denotes the type of the API. Possible values
could be REST, GraphQL, Async
could be REST, GraphQL, GRPC Async
enum:
- REST
- GraphQL
- GRPC
type: string
apiVersion:
description: APIVersion is the version number of the API.
Expand Down
Loading
Loading