Skip to content

Commit

Permalink
Add LoadBalancer IR to HTTPRoute
Browse files Browse the repository at this point in the history
Relates to #1105

Signed-off-by: Arko Dasgupta <arko@tetrate.io>
  • Loading branch information
arkodg committed Oct 19, 2023
1 parent 23c91a6 commit 79d38c4
Show file tree
Hide file tree
Showing 16 changed files with 444 additions and 21 deletions.
64 changes: 64 additions & 0 deletions internal/ir/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
ErrAddHeaderDuplicate = errors.New("header modifier filter attempts to add the same header more than once (case insensitive)")
ErrRemoveHeaderDuplicate = errors.New("header modifier filter attempts to remove the same header more than once (case insensitive)")
ErrRequestAuthenRequiresJwt = errors.New("jwt field is required when request authentication is set")
ErrLoadBalancerInvalid = errors.New("loadBalancer setting is invalid, only one setting can be set")
)

// Xds holds the intermediate representation of a Gateway and is
Expand Down Expand Up @@ -276,6 +277,8 @@ type HTTPRoute struct {
RequestAuthentication *RequestAuthentication `json:"requestAuthentication,omitempty" yaml:"requestAuthentication,omitempty"`
// Timeout is the time until which entire response is received from the upstream.
Timeout *metav1.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`
// load balancer policy to use when routing to the backend endpoints.
LoadBalancer *LoadBalancer `json:"loadBalancer,omitempty" yaml:"loadBalancer,omitempty"`
// ExtensionRefs holds unstructured resources that were introduced by an extension and used on the HTTPRoute as extensionRef filters
ExtensionRefs []*UnstructuredRef `json:"extensionRefs,omitempty" yaml:"extensionRefs,omitempty"`
}
Expand Down Expand Up @@ -420,6 +423,12 @@ func (h HTTPRoute) Validate() error {
}
}
}
if h.LoadBalancer != nil {
if err := h.LoadBalancer.Validate(); err != nil {
errs = multierror.Append(errs, err)
}
}

return errs
}

Expand Down Expand Up @@ -952,3 +961,58 @@ type TCPKeepalive struct {
// Defaults to `75s`.
Interval *uint32 `json:"interval,omitempty" yaml:"interval,omitempty"`
}

// LoadBalancer defines the load balancer settings.
// +k8s:deepcopy-gen=true
type LoadBalancer struct {
// RoundRobin load balacning policy
RoundRobin *RoundRobin `json:"roundRobin,omitempty" yaml:"roundRobin,omitempty"`
// LeastRequest load balancer policy
LeastRequest *LeastRequest `json:"leastRequest,omitempty" yaml:"leastRequest,omitempty"`
// Random load balancer policy
Random *Random `json:"random,omitempty" yaml:"random,omitempty"`
// ConsistentHash load balancer policy
ConsistentHash *ConsistentHash `json:"consistentHash,omitempty" yaml:"consistentHash,omitempty"`
}

// Validate the fields within the LoadBalancer structure
func (l *LoadBalancer) Validate() error {
var errs error
matchCount := 0
if l.RoundRobin != nil {
matchCount++
}
if l.LeastRequest != nil {
matchCount++
}
if l.Random != nil {
matchCount++
}
if l.ConsistentHash != nil {
matchCount++
}
if matchCount != 1 {
errs = multierror.Append(errs, ErrLoadBalancerInvalid)
}

return errs
}

// RoundRobin load balancer settings
// +k8s:deepcopy-gen=true
type RoundRobin struct{}

// LeastRequest load balancer settings
// +k8s:deepcopy-gen=true
type LeastRequest struct{}

// Random load balancer settings
// +k8s:deepcopy-gen=true
type Random struct{}

// ConsistentHash load balancer settings
// +k8s:deepcopy-gen=true
type ConsistentHash struct {
// Hash based on the Source IP Address
SourceIP *bool `json:"sourceIP,omitempty" yaml:"sourceIP,omitempty"`
}
100 changes: 100 additions & 0 deletions internal/ir/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/xds/translator/accesslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func processClusterForAccessLog(tCtx *types.ResourceVersionTable, al *ir.AccessL
Weight: ptr.To(uint32(1)),
Endpoints: []*ir.DestinationEndpoint{ir.NewDestEndpoint(otel.Host, otel.Port)},
}
if err := addXdsCluster(tCtx, addXdsClusterArgs{
if err := addXdsCluster(tCtx, &xdsClusterArgs{
name: clusterName,
settings: []*ir.DestinationSetting{ds},
tSocket: nil,
Expand Down
2 changes: 1 addition & 1 deletion internal/xds/translator/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func createJwksClusters(tCtx *types.ResourceVersionTable, routes []*ir.HTTPRoute
if err != nil {
return err
}
if err := addXdsCluster(tCtx, addXdsClusterArgs{
if err := addXdsCluster(tCtx, &xdsClusterArgs{
name: jwks.name,
settings: []*ir.DestinationSetting{ds},
tSocket: tSocket,
Expand Down
29 changes: 21 additions & 8 deletions internal/xds/translator/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ const (
tcpClusterPerConnectionBufferLimitBytes = 32768
)

func buildXdsCluster(clusterName string, tSocket *corev3.TransportSocket, protocol ProtocolType, endpointType EndpointType) *clusterv3.Cluster {
func buildXdsCluster(args *xdsClusterArgs) *clusterv3.Cluster {
cluster := &clusterv3.Cluster{
Name: clusterName,
Name: args.name,
ConnectTimeout: durationpb.New(10 * time.Second),
LbPolicy: clusterv3.Cluster_LEAST_REQUEST,
DnsLookupFamily: clusterv3.Cluster_V4_ONLY,
CommonLbConfig: &clusterv3.Cluster_CommonLbConfig{
LocalityConfigSpecifier: &clusterv3.Cluster_CommonLbConfig_LocalityWeightedLbConfig_{
Expand All @@ -40,14 +39,14 @@ func buildXdsCluster(clusterName string, tSocket *corev3.TransportSocket, protoc
PerConnectionBufferLimitBytes: wrapperspb.UInt32(tcpClusterPerConnectionBufferLimitBytes),
}

if tSocket != nil {
cluster.TransportSocket = tSocket
if args.tSocket != nil {
cluster.TransportSocket = args.tSocket
}

if endpointType == Static {
if args.endpointType == Static {
cluster.ClusterDiscoveryType = &clusterv3.Cluster_Type{Type: clusterv3.Cluster_EDS}
cluster.EdsClusterConfig = &clusterv3.Cluster_EdsClusterConfig{
ServiceName: clusterName,
ServiceName: args.name,
EdsConfig: &corev3.ConfigSource{
ResourceApiVersion: resource.DefaultAPIVersion,
ConfigSourceSpecifier: &corev3.ConfigSource_Ads{
Expand All @@ -61,10 +60,24 @@ func buildXdsCluster(clusterName string, tSocket *corev3.TransportSocket, protoc
cluster.RespectDnsTtl = true
}

if protocol == HTTP2 {
if args.protocol == HTTP2 {
cluster.TypedExtensionProtocolOptions = buildTypedExtensionProtocolOptions()
}

// Set Load Balancer policy
//nolint:gocritic
if args.loadBalancer == nil {
cluster.LbPolicy = clusterv3.Cluster_LEAST_REQUEST
} else if args.loadBalancer.LeastRequest != nil {
cluster.LbPolicy = clusterv3.Cluster_LEAST_REQUEST
} else if args.loadBalancer.RoundRobin != nil {
cluster.LbPolicy = clusterv3.Cluster_ROUND_ROBIN
} else if args.loadBalancer.Random != nil {
cluster.LbPolicy = clusterv3.Cluster_RANDOM
} else if args.loadBalancer.ConsistentHash != nil {
cluster.LbPolicy = clusterv3.Cluster_MAGLEV
}

return cluster
}

Expand Down
8 changes: 7 additions & 1 deletion internal/xds/translator/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ const (
func TestBuildXdsCluster(t *testing.T) {
bootstrapXdsCluster := getXdsClusterObjFromBootstrap(t)

dynamicXdsCluster := buildXdsCluster(bootstrapXdsCluster.Name, bootstrapXdsCluster.TransportSocket, HTTP2, DefaultEndpointType)
args := &xdsClusterArgs{
name: bootstrapXdsCluster.Name,
tSocket: bootstrapXdsCluster.TransportSocket,
protocol: HTTP2,
endpointType: DefaultEndpointType,
}
dynamicXdsCluster := buildXdsCluster(args)

require.Equal(t, bootstrapXdsCluster.Name, dynamicXdsCluster.Name)
require.Equal(t, bootstrapXdsCluster.ClusterDiscoveryType, dynamicXdsCluster.ClusterDiscoveryType)
Expand Down
2 changes: 1 addition & 1 deletion internal/xds/translator/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func (t *Translator) createRateLimitServiceCluster(tCtx *types.ResourceVersionTa
return err
}

if err := addXdsCluster(tCtx, addXdsClusterArgs{
if err := addXdsCluster(tCtx, &xdsClusterArgs{
name: clusterName,
settings: []*ir.DestinationSetting{ds},
tSocket: tSocket,
Expand Down
27 changes: 26 additions & 1 deletion internal/xds/translator/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,13 @@ func buildXdsRoute(httpRoute *ir.HTTPRoute) *routev3.Route {
}
}

// Hash Policy
if router.GetRoute() != nil {
router.GetRoute().HashPolicy = buildHashPolicy(httpRoute)
}

// Timeouts
if httpRoute.Timeout != nil {
if router.GetRoute() != nil && httpRoute.Timeout != nil {
router.GetRoute().Timeout = durationpb.New(httpRoute.Timeout.Duration)
}

Expand Down Expand Up @@ -330,3 +335,23 @@ func buildXdsAddedHeaders(headersToAdd []ir.AddHeader) []*corev3.HeaderValueOpti

return headerValueOptions
}

func buildHashPolicy(httpRoute *ir.HTTPRoute) []*routev3.RouteAction_HashPolicy {
// Return early
if httpRoute == nil || httpRoute.LoadBalancer == nil || httpRoute.LoadBalancer.ConsistentHash == nil {
return nil
}

if httpRoute.LoadBalancer.ConsistentHash.SourceIP != nil && *httpRoute.LoadBalancer.ConsistentHash.SourceIP {
hashPolicy := &routev3.RouteAction_HashPolicy{
PolicySpecifier: &routev3.RouteAction_HashPolicy_ConnectionProperties_{
ConnectionProperties: &routev3.RouteAction_HashPolicy_ConnectionProperties{
SourceIp: true,
},
},
}
return []*routev3.RouteAction_HashPolicy{hashPolicy}
}

return nil
}
Loading

0 comments on commit 79d38c4

Please sign in to comment.