-
Notifications
You must be signed in to change notification settings - Fork 360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor telemetry backendRefs #3293
Changes from all commits
9731e6b
e70b1e8
0d2b332
aa2e267
246a49f
df8beb4
ee2032a
4729e37
7591e44
ea5d012
208dd91
74535d1
3c1d844
bc2883e
ebee69b
c92777e
29886c7
ac7065b
2cd8738
c451672
0fa64ed
654997f
91e990f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,17 +6,19 @@ | |
package gatewayapi | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
|
||
corev1 "k8s.io/api/core/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/utils/ptr" | ||
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" | ||
|
||
egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" | ||
"github.com/envoyproxy/gateway/internal/gatewayapi/status" | ||
"github.com/envoyproxy/gateway/internal/ir" | ||
"github.com/envoyproxy/gateway/internal/utils" | ||
"github.com/envoyproxy/gateway/internal/utils/naming" | ||
"github.com/envoyproxy/gateway/internal/utils/net" | ||
) | ||
|
||
var _ ListenersTranslator = (*Translator)(nil) | ||
|
@@ -44,7 +46,7 @@ | |
if resources.EnvoyProxy != nil { | ||
infraIR[irKey].Proxy.Config = resources.EnvoyProxy | ||
} | ||
t.processProxyObservability(gateway.Gateway, xdsIR[irKey], infraIR[irKey].Proxy.Config) | ||
t.processProxyObservability(gateway, xdsIR[irKey], infraIR[irKey].Proxy.Config, resources) | ||
|
||
for _, listener := range gateway.listeners { | ||
// Process protocol & supported kinds | ||
|
@@ -143,10 +145,29 @@ | |
} | ||
} | ||
|
||
func (t *Translator) processProxyObservability(gw *gwapiv1.Gateway, xdsIR *ir.Xds, envoyProxy *egv1a1.EnvoyProxy) { | ||
xdsIR.AccessLog = processAccessLog(envoyProxy) | ||
xdsIR.Tracing = processTracing(gw, envoyProxy, t.MergeGateways) | ||
xdsIR.Metrics = processMetrics(envoyProxy) | ||
func (t *Translator) processProxyObservability(gwCtx *GatewayContext, xdsIR *ir.Xds, envoyProxy *egv1a1.EnvoyProxy, resources *Resources) { | ||
var err error | ||
|
||
xdsIR.AccessLog, err = t.processAccessLog(envoyProxy, resources) | ||
if err != nil { | ||
status.UpdateGatewayListenersNotValidCondition(gwCtx.Gateway, gwapiv1.GatewayReasonInvalid, metav1.ConditionFalse, | ||
fmt.Sprintf("Invalid access log backendRefs: %v", err)) | ||
return | ||
} | ||
|
||
xdsIR.Tracing, err = t.processTracing(gwCtx.Gateway, envoyProxy, t.MergeGateways, resources) | ||
if err != nil { | ||
status.UpdateGatewayListenersNotValidCondition(gwCtx.Gateway, gwapiv1.GatewayReasonInvalid, metav1.ConditionFalse, | ||
fmt.Sprintf("Invalid tracing backendRefs: %v", err)) | ||
return | ||
} | ||
|
||
xdsIR.Metrics, err = t.processMetrics(envoyProxy, resources) | ||
if err != nil { | ||
status.UpdateGatewayListenersNotValidCondition(gwCtx.Gateway, gwapiv1.GatewayReasonInvalid, metav1.ConditionFalse, | ||
fmt.Sprintf("Invalid metrics backendRefs: %v", err)) | ||
return | ||
} | ||
} | ||
|
||
func (t *Translator) processInfraIRListener(listener *ListenerContext, infraIR InfraIRMap, irKey string, servicePort *protocolPort, containerPort int32) { | ||
|
@@ -179,7 +200,7 @@ | |
infraIR[irKey].Proxy.Listeners = append(infraIR[irKey].Proxy.Listeners, proxyListener) | ||
} | ||
|
||
func processAccessLog(envoyproxy *egv1a1.EnvoyProxy) *ir.AccessLog { | ||
func (t *Translator) processAccessLog(envoyproxy *egv1a1.EnvoyProxy, resources *Resources) (*ir.AccessLog, error) { | ||
if envoyproxy == nil || | ||
envoyproxy.Spec.Telemetry == nil || | ||
envoyproxy.Spec.Telemetry.AccessLog == nil || | ||
|
@@ -191,16 +212,16 @@ | |
Path: "/dev/stdout", | ||
}, | ||
}, | ||
} | ||
}, nil | ||
} | ||
|
||
if envoyproxy.Spec.Telemetry.AccessLog.Disable { | ||
return nil | ||
return nil, nil | ||
} | ||
|
||
irAccessLog := &ir.AccessLog{} | ||
// translate the access log configuration to the IR | ||
for _, accessLog := range envoyproxy.Spec.Telemetry.AccessLog.Settings { | ||
for idx, accessLog := range envoyproxy.Spec.Telemetry.AccessLog.Settings { | ||
for _, sink := range accessLog.Sinks { | ||
switch sink.Type { | ||
case egv1a1.ProxyAccessLogSinkTypeFile: | ||
|
@@ -234,16 +255,28 @@ | |
|
||
// TODO: remove support for Host/Port in v1.2 | ||
al := &ir.OpenTelemetryAccessLog{ | ||
Port: uint32(sink.OpenTelemetry.Port), | ||
Resources: sink.OpenTelemetry.Resources, | ||
} | ||
|
||
if sink.OpenTelemetry.Host != nil { | ||
al.Host = *sink.OpenTelemetry.Host | ||
// TODO: how to get authority from the backendRefs? | ||
ds, err := t.processBackendRefs(sink.OpenTelemetry.BackendRefs, envoyproxy.Namespace, resources) | ||
if err != nil { | ||
return nil, err | ||
} | ||
al.Destination = ir.RouteDestination{ | ||
Name: fmt.Sprintf("accesslog-%d", idx), // TODO: rename this, so that we can share backend with tracing? | ||
Settings: ds, | ||
} | ||
|
||
if len(sink.OpenTelemetry.BackendRefs) > 0 { | ||
al.Host, al.Port = net.BackendHostAndPort(sink.OpenTelemetry.BackendRefs[0].BackendObjectReference, envoyproxy.Namespace) | ||
if len(ds) == 0 { | ||
// fallback to host and port | ||
var host string | ||
var port uint32 | ||
if sink.OpenTelemetry.Host != nil { | ||
host, port = *sink.OpenTelemetry.Host, uint32(sink.OpenTelemetry.Port) | ||
} | ||
al.Destination.Settings = destinationSettingFromHostAndPort(host, port) | ||
al.Authority = host | ||
} | ||
|
||
switch accessLog.Format.Type { | ||
|
@@ -258,25 +291,35 @@ | |
} | ||
} | ||
|
||
return irAccessLog | ||
return irAccessLog, nil | ||
} | ||
|
||
func processTracing(gw *gwapiv1.Gateway, envoyproxy *egv1a1.EnvoyProxy, mergeGateways bool) *ir.Tracing { | ||
func (t *Translator) processTracing(gw *gwapiv1.Gateway, envoyproxy *egv1a1.EnvoyProxy, mergeGateways bool, resources *Resources) (*ir.Tracing, error) { | ||
if envoyproxy == nil || | ||
envoyproxy.Spec.Telemetry == nil || | ||
envoyproxy.Spec.Telemetry.Tracing == nil { | ||
return nil | ||
return nil, nil | ||
} | ||
tracing := envoyproxy.Spec.Telemetry.Tracing | ||
|
||
// TODO: remove support for Host/Port in v1.2 | ||
var host string | ||
var port uint32 | ||
if tracing.Provider.Host != nil { | ||
host, port = *tracing.Provider.Host, uint32(tracing.Provider.Port) | ||
// TODO: how to get authority from the backendRefs? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. name.namespace ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what about a service outside of cluster. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yah that we'd need to add an explicit field in the future |
||
ds, err := t.processBackendRefs(tracing.Provider.BackendRefs, envoyproxy.Namespace, resources) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if len(tracing.Provider.BackendRefs) > 0 { | ||
host, port = net.BackendHostAndPort(tracing.Provider.BackendRefs[0].BackendObjectReference, gw.Namespace) | ||
|
||
var authority string | ||
|
||
// fallback to host and port | ||
// TODO: remove support for Host/Port in v1.2 | ||
if len(ds) == 0 { | ||
var host string | ||
var port uint32 | ||
if tracing.Provider.Host != nil { | ||
host, port = *tracing.Provider.Host, uint32(tracing.Provider.Port) | ||
} | ||
ds = destinationSettingFromHostAndPort(host, port) | ||
authority = host | ||
} | ||
|
||
samplingRate := 100.0 | ||
|
@@ -290,22 +333,68 @@ | |
} | ||
|
||
return &ir.Tracing{ | ||
Authority: authority, | ||
ServiceName: serviceName, | ||
Host: host, | ||
Port: port, | ||
SamplingRate: samplingRate, | ||
CustomTags: tracing.CustomTags, | ||
} | ||
Destination: ir.RouteDestination{ | ||
Name: "tracing", // TODO: rename this, so that we can share backend with accesslog? | ||
Settings: ds, | ||
}, | ||
}, nil | ||
} | ||
|
||
func processMetrics(envoyproxy *egv1a1.EnvoyProxy) *ir.Metrics { | ||
func (t *Translator) processMetrics(envoyproxy *egv1a1.EnvoyProxy, resources *Resources) (*ir.Metrics, error) { | ||
if envoyproxy == nil || | ||
envoyproxy.Spec.Telemetry == nil || | ||
envoyproxy.Spec.Telemetry.Metrics == nil { | ||
return nil | ||
return nil, nil | ||
} | ||
|
||
for _, sink := range envoyproxy.Spec.Telemetry.Metrics.Sinks { | ||
if sink.OpenTelemetry == nil { | ||
continue | ||
} | ||
|
||
_, err := t.processBackendRefs(sink.OpenTelemetry.BackendRefs, envoyproxy.Namespace, resources) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
return &ir.Metrics{ | ||
EnableVirtualHostStats: envoyproxy.Spec.Telemetry.Metrics.EnableVirtualHostStats, | ||
EnablePerEndpointStats: envoyproxy.Spec.Telemetry.Metrics.EnablePerEndpointStats, | ||
}, nil | ||
} | ||
|
||
func (t *Translator) processBackendRefs(backendRefs []egv1a1.BackendRef, namespace string, resources *Resources) ([]*ir.DestinationSetting, error) { | ||
result := make([]*ir.DestinationSetting, 0, len(backendRefs)) | ||
for _, ref := range backendRefs { | ||
ns := NamespaceDerefOr(ref.Namespace, namespace) | ||
kind := KindDerefOr(ref.Kind, KindService) | ||
if kind != KindService { | ||
return nil, errors.New("only service kind is supported for backendRefs") | ||
} | ||
if err := validateBackendService(ref.BackendObjectReference, resources, ns, corev1.ProtocolTCP); err != nil { | ||
return nil, err | ||
} | ||
|
||
ds := t.processServiceDestinationSetting(ref.BackendObjectReference, ns, ir.GRPC, resources) | ||
result = append(result, ds) | ||
} | ||
if len(result) == 0 { | ||
return nil, nil | ||
} | ||
return result, nil | ||
} | ||
|
||
func destinationSettingFromHostAndPort(host string, port uint32) []*ir.DestinationSetting { | ||
return []*ir.DestinationSetting{ | ||
{ | ||
Weight: ptr.To[uint32](1), | ||
Protocol: ir.GRPC, | ||
Endpoints: []*ir.DestinationEndpoint{ir.NewDestEndpoint(host, port)}, | ||
}, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to this prefix, once we add cluster level features like
circuitBreaking
which will live onbackendRef
level , we won't be able to reusetracing-
hereThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but we should reuse with tracing provider, normally there's only one collector per cluster.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it may not be possible to reuse if we decide to put features inside
backendRef
field, if feature live in top level policy, it may be possible