Skip to content

Commit

Permalink
feat: gRPC Access Log Service (ALS) logging sink
Browse files Browse the repository at this point in the history
Signed-off-by: David Alger <davidmalger@gmail.com>
  • Loading branch information
davidalger committed Apr 2, 2024
1 parent 21ae2a5 commit 04a216d
Show file tree
Hide file tree
Showing 19 changed files with 983 additions and 27 deletions.
39 changes: 39 additions & 0 deletions examples/kubernetes/accesslog/als-accesslog.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
apiVersion: gateway.networking.k8s.io/v1
kind: GatewayClass
metadata:
name: eg
spec:
controllerName: gateway.envoyproxy.io/gatewayclass-controller
parametersRef:
group: gateway.envoyproxy.io
kind: EnvoyProxy
name: als-access-logging
namespace: envoy-gateway-system
---
apiVersion: gateway.envoyproxy.io/v1alpha1
kind: EnvoyProxy
metadata:
name: als-access-logging
namespace: envoy-gateway-system
spec:
telemetry:
accessLog:
settings:
- format:
type: JSON
json:
attr1: val1
attr2: val2
sinks:
- type: ALS
als:
backendRef:
name: envoy-als
namespace: monitoring
port: 9000
http:
requestHeaders:
- x-client-ip-address
responseHeaders:
- cache-control
type: HTTP
7 changes: 7 additions & 0 deletions examples/kubernetes/accesslog/multi-sinks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ spec:
- type: File
file:
path: /dev/stdout
- type: ALS
als:
backendRef:
name: envoy-als
namespace: monitoring
port: 9000
type: HTTP
- type: OpenTelemetry
openTelemetry:
host: otel-collector.monitoring.svc.cluster.local
Expand Down
93 changes: 91 additions & 2 deletions internal/gatewayapi/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ package gatewayapi
import (
"fmt"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"

egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
Expand Down Expand Up @@ -43,7 +45,7 @@ func (t *Translator) ProcessListeners(gateways []*GatewayContext, xdsIR XdsIRMap
infraIR[irKey].Proxy.Config = resources.EnvoyProxy
}

xdsIR[irKey].AccessLog = processAccessLog(infraIR[irKey].Proxy.Config)
xdsIR[irKey].AccessLog = t.processAccessLog(infraIR[irKey].Proxy.Config, gateway.Gateway, resources)
xdsIR[irKey].Tracing = processTracing(gateway.Gateway, infraIR[irKey].Proxy.Config)
xdsIR[irKey].Metrics = processMetrics(infraIR[irKey].Proxy.Config)

Expand Down Expand Up @@ -160,7 +162,7 @@ func (t *Translator) ProcessListeners(gateways []*GatewayContext, xdsIR XdsIRMap
}
}

func processAccessLog(envoyproxy *egv1a1.EnvoyProxy) *ir.AccessLog {
func (t *Translator) processAccessLog(envoyproxy *egv1a1.EnvoyProxy, gw *gwapiv1.Gateway, resources *Resources) *ir.AccessLog {
if envoyproxy == nil ||
envoyproxy.Spec.Telemetry == nil ||
envoyproxy.Spec.Telemetry.AccessLog == nil ||
Expand Down Expand Up @@ -208,6 +210,52 @@ func processAccessLog(envoyproxy *egv1a1.EnvoyProxy) *ir.AccessLog {
}
irAccessLog.JSON = append(irAccessLog.JSON, al)
}
case egv1a1.ProxyAccessLogSinkTypeALS:
if sink.ALS == nil {
continue
}

var logName string
if sink.ALS.LogName != nil {
logName = *sink.ALS.LogName
} else {
logName = fmt.Sprintf("accesslog/%s/%s", gw.Namespace, gw.Name)
}

clusterName := fmt.Sprintf("accesslog/%s/%s/port/%d",
NamespaceDerefOr(sink.ALS.BackendRef.Namespace, envoyproxy.Namespace),
string(sink.ALS.BackendRef.Name),
*sink.ALS.BackendRef.Port,
)

al := &ir.ALSAccessLog{
LogName: logName,
Destination: ir.RouteDestination{
Name: clusterName,
Settings: []*ir.DestinationSetting{
t.processServiceDestination(sink.ALS.BackendRef, ir.GRPC, envoyproxy, resources),
},
},
Type: sink.ALS.Type,
}

if al.Type == egv1a1.ALSEnvoyProxyAccessLogTypeHTTP {
http := &ir.ALSAccessLogHTTP{
RequestHeaders: sink.ALS.HTTP.RequestHeaders,
ResponseHeaders: sink.ALS.HTTP.ResponseHeaders,
ResponseTrailers: sink.ALS.HTTP.ResponseTrailers,
}
al.HTTP = http
}

switch accessLog.Format.Type {
case egv1a1.ProxyAccessLogFormatTypeJSON:
al.Attributes = accessLog.Format.JSON
case egv1a1.ProxyAccessLogFormatTypeText:
al.Text = accessLog.Format.Text
}

irAccessLog.ALS = append(irAccessLog.ALS, al)
case egv1a1.ProxyAccessLogSinkTypeOpenTelemetry:
if sink.OpenTelemetry == nil {
continue
Expand All @@ -234,6 +282,47 @@ func processAccessLog(envoyproxy *egv1a1.EnvoyProxy) *ir.AccessLog {
return irAccessLog
}

func (t *Translator) processServiceDestination(backendRef gwapiv1.BackendObjectReference, protocol ir.AppProtocol, envoyproxy *egv1a1.EnvoyProxy, resources *Resources) *ir.DestinationSetting {
var (
endpoints []*ir.DestinationEndpoint
addrType *ir.DestinationAddressType
servicePort v1.ServicePort
backendTLS *ir.TLSUpstreamConfig
)

serviceNamespace := NamespaceDerefOr(backendRef.Namespace, envoyproxy.Namespace)
service := resources.GetService(serviceNamespace, string(backendRef.Name))
for _, port := range service.Spec.Ports {
if port.Port == int32(*backendRef.Port) {
servicePort = port
break
}
}

if servicePort.AppProtocol != nil &&
*servicePort.AppProtocol == "kubernetes.io/h2c" {
protocol = ir.HTTP2
}

Check warning on line 305 in internal/gatewayapi/listener.go

View check run for this annotation

Codecov / codecov/patch

internal/gatewayapi/listener.go#L304-L305

Added lines #L304 - L305 were not covered by tests

// Route to endpoints by default
if !t.EndpointRoutingDisabled {
endpointSlices := resources.GetEndpointSlicesForBackend(serviceNamespace, string(backendRef.Name), KindDerefOr(backendRef.Kind, KindService))
endpoints, addrType = getIREndpointsFromEndpointSlices(endpointSlices, servicePort.Name, servicePort.Protocol)
} else {
// Fall back to Service ClusterIP routing
ep := ir.NewDestEndpoint(service.Spec.ClusterIP, uint32(*backendRef.Port))
endpoints = append(endpoints, ep)
}

Check warning on line 315 in internal/gatewayapi/listener.go

View check run for this annotation

Codecov / codecov/patch

internal/gatewayapi/listener.go#L312-L315

Added lines #L312 - L315 were not covered by tests

return &ir.DestinationSetting{
Weight: ptr.To(uint32(1)),
Protocol: protocol,
Endpoints: endpoints,
AddressType: addrType,
TLS: backendTLS,
}
}

func processTracing(gw *gwapiv1.Gateway, envoyproxy *egv1a1.EnvoyProxy) *ir.Tracing {
if envoyproxy == nil ||
envoyproxy.Spec.Telemetry == nil ||
Expand Down
132 changes: 132 additions & 0 deletions internal/gatewayapi/testdata/envoyproxy-accesslog-als-json.in.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
envoyproxy:
apiVersion: gateway.envoyproxy.io/v1alpha1
kind: EnvoyProxy
metadata:
namespace: envoy-gateway-system
name: test
spec:
telemetry:
accessLog:
settings:
- format:
type: JSON
json:
attr1: val1
attr2: val2
sinks:
- type: ALS
als:
logName: accesslog
backendRef:
name: envoy-als
namespace: monitoring
port: 9000
http:
requestHeaders:
- x-client-ip-address
responseHeaders:
- cache-control
responseTrailers:
- expires
type: HTTP
- type: ALS
als:
backendRef:
name: envoy-als
namespace: monitoring
port: 9000
type: TCP
provider:
type: Kubernetes
kubernetes:
envoyService:
type: LoadBalancer
envoyDeployment:
replicas: 2
container:
env:
- name: env_a
value: env_a_value
- name: env_b
value: env_b_name
image: "envoyproxy/envoy:distroless-dev"
resources:
requests:
cpu: 100m
memory: 512Mi
securityContext:
runAsUser: 2000
allowPrivilegeEscalation: false
pod:
annotations:
key1: val1
key2: val2
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: cloud.google.com/gke-nodepool
operator: In
values:
- router-node
tolerations:
- effect: NoSchedule
key: node-type
operator: Exists
value: "router"
securityContext:
runAsUser: 1000
runAsGroup: 3000
fsGroup: 2000
fsGroupChangePolicy: "OnRootMismatch"
volumes:
- name: certs
secret:
secretName: envoy-cert
gateways:
- apiVersion: gateway.networking.k8s.io/v1
kind: Gateway
metadata:
namespace: envoy-gateway
name: gateway-1
spec:
gatewayClassName: envoy-gateway-class
listeners:
- name: http
protocol: HTTP
port: 80
allowedRoutes:
namespaces:
from: Same
services:
- apiVersion: v1
kind: Service
metadata:
name: envoy-als
namespace: monitoring
spec:
type: ClusterIP
ports:
- name: grpc
port: 9000
protocol: TCP
targetPort: 9000
endpointSlices:
- apiVersion: discovery.k8s.io/v1
kind: EndpointSlice
metadata:
name: endpointslice-envoy-als
namespace: monitoring
labels:
kubernetes.io/service-name: envoy-als
addressType: IPv4
ports:
- name: grpc
protocol: TCP
port: 9090
endpoints:
- addresses:
- "10.240.0.10"
conditions:
ready: true
Loading

0 comments on commit 04a216d

Please sign in to comment.