Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Reconcile BackendRefs on EnvoyProxy #3190

24 changes: 13 additions & 11 deletions examples/kubernetes/accesslog/otel-accesslog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ spec:
telemetry:
accessLog:
settings:
- format:
type: Text
text: |
[%START_TIME%] "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%"
sinks:
- type: OpenTelemetry
openTelemetry:
host: otel-collector.monitoring.svc.cluster.local
port: 4317
resources:
k8s.cluster.name: "cluster-1"
- format:
type: Text
text: |
[%START_TIME%] "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%"
sinks:
- type: OpenTelemetry
openTelemetry:
backendRefs:
- name: otel-collector
namespace: monitoring
port: 4317
resources:
k8s.cluster.name: "cluster-1"
6 changes: 4 additions & 2 deletions examples/kubernetes/tracing/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ spec:
# sample 100% of requests
samplingRate: 100
provider:
host: otel-collector.monitoring.svc.cluster.local
port: 4317
backendRefs:
- name: otel-collector
namespace: monitoring
port: 4317
type: OpenTelemetry
customTags:
# This is an example of using a literal as a tag value
Expand Down
136 changes: 102 additions & 34 deletions internal/gatewayapi/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ package gatewayapi
import (
"fmt"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"

egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/ir"
"github.com/envoyproxy/gateway/internal/utils"
"github.com/envoyproxy/gateway/internal/utils/naming"
"github.com/envoyproxy/gateway/internal/utils/net"
)

var _ ListenersTranslator = (*Translator)(nil)
Expand Down Expand Up @@ -43,7 +44,8 @@ func (t *Translator) ProcessListeners(gateways []*GatewayContext, xdsIR XdsIRMap
if resources.EnvoyProxy != nil {
infraIR[irKey].Proxy.Config = resources.EnvoyProxy
}
t.processProxyObservability(gateway.Gateway, xdsIR[irKey], infraIR[irKey].Proxy.Config)

t.processProxyObservability(gateway.Gateway, xdsIR[irKey], infraIR[irKey].Proxy.Config, resources)

for _, listener := range gateway.listeners {
// Process protocol & supported kinds
Expand Down Expand Up @@ -127,10 +129,10 @@ func (t *Translator) ProcessListeners(gateways []*GatewayContext, xdsIR XdsIRMap
}
}

func (t *Translator) processProxyObservability(gw *gwapiv1.Gateway, xdsIR *ir.Xds, envoyProxy *egv1a1.EnvoyProxy) {
xdsIR.AccessLog = processAccessLog(envoyProxy)
xdsIR.Tracing = processTracing(gw, envoyProxy, t.MergeGateways)
xdsIR.Metrics = processMetrics(envoyProxy)
func (t *Translator) processProxyObservability(gw *gwapiv1.Gateway, xdsIR *ir.Xds, envoyProxy *egv1a1.EnvoyProxy, resources *Resources) {
xdsIR.AccessLog = t.processAccessLog(gw, envoyProxy, resources)
xdsIR.Tracing = t.processTracing(gw, envoyProxy, resources)
xdsIR.Metrics = t.processMetrics(envoyProxy)
}

func (t *Translator) processInfraIRListener(listener *ListenerContext, infraIR InfraIRMap, irKey string, servicePort *protocolPort) {
Expand Down Expand Up @@ -163,7 +165,7 @@ func (t *Translator) processInfraIRListener(listener *ListenerContext, infraIR I
infraIR[irKey].Proxy.Listeners = append(infraIR[irKey].Proxy.Listeners, proxyListener)
}

func processAccessLog(envoyproxy *egv1a1.EnvoyProxy) *ir.AccessLog {
func (t *Translator) processAccessLog(gw *gwapiv1.Gateway, envoyproxy *egv1a1.EnvoyProxy, resources *Resources) *ir.AccessLog {
if envoyproxy == nil ||
envoyproxy.Spec.Telemetry == nil ||
envoyproxy.Spec.Telemetry.AccessLog == nil ||
Expand All @@ -185,7 +187,7 @@ func processAccessLog(envoyproxy *egv1a1.EnvoyProxy) *ir.AccessLog {
irAccessLog := &ir.AccessLog{}
// translate the access log configuration to the IR
for _, accessLog := range envoyproxy.Spec.Telemetry.AccessLog.Settings {
for _, sink := range accessLog.Sinks {
for index, sink := range accessLog.Sinks {
switch sink.Type {
case egv1a1.ProxyAccessLogSinkTypeFile:
if sink.File == nil {
Expand Down Expand Up @@ -216,18 +218,31 @@ func processAccessLog(envoyproxy *egv1a1.EnvoyProxy) *ir.AccessLog {
continue
}

// TODO: remove support for Host/Port in v1.2
al := &ir.OpenTelemetryAccessLog{
Port: uint32(sink.OpenTelemetry.Port),
Resources: sink.OpenTelemetry.Resources,
Destination: ir.RouteDestination{
Name: fmt.Sprintf("accesslog/%s/%s/sink/%d", gw.Namespace, gw.Name, index),
Settings: make([]*ir.DestinationSetting, 0, len(sink.OpenTelemetry.BackendRefs)),
},
}

if sink.OpenTelemetry.Host != nil {
al.Host = *sink.OpenTelemetry.Host
for _, backendRef := range sink.OpenTelemetry.BackendRefs {
al.Destination.Settings = append(al.Destination.Settings, t.processServiceDestination(backendRef, ir.GRPC, envoyproxy, resources))
}

if len(sink.OpenTelemetry.BackendRefs) > 0 {
al.Host, al.Port = net.BackendHostAndPort(sink.OpenTelemetry.BackendRefs[0].BackendObjectReference, envoyproxy.Namespace)
// TODO: remove support for Host/Port in v1.2
if sink.OpenTelemetry.Host != nil {
al.Destination.Settings = append(al.Destination.Settings, &ir.DestinationSetting{
Weight: ptr.To(uint32(1)),
Protocol: ir.GRPC,
Endpoints: []*ir.DestinationEndpoint{
{
Port: uint32(sink.OpenTelemetry.Port),
Host: *sink.OpenTelemetry.Host,
},
},
AddressType: ptr.To(ir.FQDN),
})
}

switch accessLog.Format.Type {
Expand All @@ -245,44 +260,55 @@ func processAccessLog(envoyproxy *egv1a1.EnvoyProxy) *ir.AccessLog {
return irAccessLog
}

func processTracing(gw *gwapiv1.Gateway, envoyproxy *egv1a1.EnvoyProxy, mergeGateways bool) *ir.Tracing {
func (t *Translator) processTracing(gw *gwapiv1.Gateway, envoyproxy *egv1a1.EnvoyProxy, resources *Resources) *ir.Tracing {
if envoyproxy == nil ||
envoyproxy.Spec.Telemetry == nil ||
envoyproxy.Spec.Telemetry.Tracing == nil {
return nil
}

tracing := envoyproxy.Spec.Telemetry.Tracing
tr := &ir.Tracing{
ServiceName: naming.ServiceName(utils.NamespacedName(gw)),
SamplingRate: 100.0,
CustomTags: tracing.CustomTags,
Destination: ir.RouteDestination{
Name: fmt.Sprintf("tracing/%s/%s", gw.Namespace, gw.Name),
Settings: make([]*ir.DestinationSetting, 0, len(tracing.Provider.BackendRefs)),
},
}

for _, backendRef := range tracing.Provider.BackendRefs {
tr.Destination.Settings = append(tr.Destination.Settings, t.processServiceDestination(backendRef, ir.GRPC, envoyproxy, resources))
}

// TODO: remove support for Host/Port in v1.2
var host string
var port uint32
if tracing.Provider.Host != nil {
host, port = *tracing.Provider.Host, uint32(tracing.Provider.Port)
}
if len(tracing.Provider.BackendRefs) > 0 {
host, port = net.BackendHostAndPort(tracing.Provider.BackendRefs[0].BackendObjectReference, gw.Namespace)
tr.Destination.Settings = append(tr.Destination.Settings, &ir.DestinationSetting{
Weight: ptr.To(uint32(1)),
Protocol: ir.GRPC,
Endpoints: []*ir.DestinationEndpoint{
{
Port: uint32(tracing.Provider.Port),
Host: *tracing.Provider.Host,
},
},
AddressType: ptr.To(ir.FQDN),
})
}

samplingRate := 100.0
if tracing.SamplingRate != nil {
samplingRate = float64(*tracing.SamplingRate)
tr.SamplingRate = float64(*tracing.SamplingRate)
}

serviceName := naming.ServiceName(utils.NamespacedName(gw))
if mergeGateways {
serviceName = string(gw.Spec.GatewayClassName)
if t.MergeGateways {
tr.ServiceName = string(gw.Spec.GatewayClassName)
}

return &ir.Tracing{
ServiceName: serviceName,
Host: host,
Port: port,
SamplingRate: samplingRate,
CustomTags: tracing.CustomTags,
}
return tr
}

func processMetrics(envoyproxy *egv1a1.EnvoyProxy) *ir.Metrics {
func (t *Translator) processMetrics(envoyproxy *egv1a1.EnvoyProxy) *ir.Metrics {
if envoyproxy == nil ||
envoyproxy.Spec.Telemetry == nil ||
envoyproxy.Spec.Telemetry.Metrics == nil {
Expand All @@ -293,3 +319,45 @@ func processMetrics(envoyproxy *egv1a1.EnvoyProxy) *ir.Metrics {
EnablePerEndpointStats: envoyproxy.Spec.Telemetry.Metrics.EnablePerEndpointStats,
}
}

func (t *Translator) processServiceDestination(backendRef egv1a1.BackendRef, protocol ir.AppProtocol, envoyproxy *egv1a1.EnvoyProxy, resources *Resources) *ir.DestinationSetting {
var (
endpoints []*ir.DestinationEndpoint
addrType *ir.DestinationAddressType
servicePort v1.ServicePort
backendTLS *ir.TLSUpstreamConfig
)

// TODO (davidalger) Handle case where Service referenced by backendRef doesn't exist
serviceNamespace := NamespaceDerefOr(backendRef.Namespace, envoyproxy.Namespace)
service := resources.GetService(serviceNamespace, string(backendRef.Name))
for _, port := range service.Spec.Ports {
Comment on lines +331 to +334
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will panic if the backendRef refers to a Service which cannot be found. Ideally this would surface as a message on the status somewhere, similar to how it does on XRoutes.

Status isn't currently being used on EnvoyProxy as far as I can tell. It seems it could make sense to put errors such as this on the Gateway rather than the EnvoyProxy since it's something which will arise when processing the Gateway.

@arkodg @zirain Do either of you have thoughts on where errors such as this should be surfaced?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah

  1. Any EnvoyProxy errors are surfaced in the GatewayClass. Let's tackle this separately and raise an issue to track it, because moving it to Accepted=False will break everything in the next reconciliation :)
  2. For now, let's stick in a Direct Response that returns 500, similar to
    ruleRoute.DirectResponse = &ir.DirectResponse{

if port.Port == int32(*backendRef.Port) {
servicePort = port
break
}
}

if servicePort.AppProtocol != nil &&
*servicePort.AppProtocol == "kubernetes.io/h2c" {
protocol = ir.HTTP2
}

// Route to endpoints by default
if !t.EndpointRoutingDisabled {
endpointSlices := resources.GetEndpointSlicesForBackend(serviceNamespace, string(backendRef.Name), KindDerefOr(backendRef.Kind, KindService))
endpoints, addrType = getIREndpointsFromEndpointSlices(endpointSlices, servicePort.Name, servicePort.Protocol)
} else {
// Fall back to Service ClusterIP routing
ep := ir.NewDestEndpoint(service.Spec.ClusterIP, uint32(*backendRef.Port))
endpoints = append(endpoints, ep)
}

return &ir.DestinationSetting{
Weight: ptr.To(uint32(1)),
Protocol: protocol,
Endpoints: endpoints,
AddressType: addrType,
TLS: backendTLS,
}
}
Loading
Loading