Skip to content

Commit

Permalink
Applied comments
Browse files Browse the repository at this point in the history
  • Loading branch information
e-n-0 committed Dec 11, 2024
1 parent ee0cd57 commit 22d7095
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 87 deletions.
163 changes: 86 additions & 77 deletions contrib/envoyproxy/go-control-plane/envoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"net/http"
"strings"

grpctrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/grpc"
"gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/httptrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
Expand All @@ -23,7 +22,6 @@ import (
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -39,6 +37,16 @@ func init() {
tracer.MarkIntegrationImported("github.com/envoyproxy/go-control-plane")
}

// appsecEnvoyExternalProcessorServer is a server that implements the Envoy ExternalProcessorServer interface.
type appsecEnvoyExternalProcessorServer struct {
envoyextproc.ExternalProcessorServer
}

// AppsecEnvoyExternalProcessorServer creates and returns a new instance of appsecEnvoyExternalProcessorServer.
func AppsecEnvoyExternalProcessorServer(userImplementation envoyextproc.ExternalProcessorServer) envoyextproc.ExternalProcessorServer {
return &appsecEnvoyExternalProcessorServer{userImplementation}
}

type currentRequest struct {
span tracer.Span
afterHandle func()
Expand All @@ -47,96 +55,99 @@ type currentRequest struct {
wrappedResponseWriter http.ResponseWriter
}

func StreamServerInterceptor(opts ...grpctrace.Option) grpc.StreamServerInterceptor {
interceptor := grpctrace.StreamServerInterceptor(opts...)

return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if info.FullMethod != "/envoy.service.ext_proc.v3.ExternalProcessor/Process" {
return interceptor(srv, ss, info, handler)
// Process handles the bidirectional stream that Envoy uses to give the server control
// over what the filter does. It processes incoming requests and sends appropriate responses
// based on the type of request received.
//
// The method receive incoming requests, processes them, and sends responses back to the client.
// It handles different types of requests such as request headers, response headers, request body,
// response body, request trailers, and response trailers.
//
// If the request is blocked, it sends an immediate response and ends the stream. If an error occurs
// during processing, it logs the error and returns an appropriate gRPC status error.
func (s *appsecEnvoyExternalProcessorServer) Process(processServer envoyextproc.ExternalProcessor_ProcessServer) error {

var (
ctx = processServer.Context()
blocked bool
currentRequest *currentRequest
processingRequest envoyextproc.ProcessingRequest
processingResponse *envoyextproc.ProcessingResponse
)

// Close the span when the request is done processing
defer func() {
if currentRequest != nil {
log.Warn("external_processing: stream stopped during a request, making sure the current span is closed\n")
currentRequest.span.Finish()
currentRequest = nil
}
}()

var (
ctx = ss.Context()
blocked bool
currentRequest *currentRequest
processingRequest envoyextproc.ProcessingRequest
processingResponse *envoyextproc.ProcessingResponse
)

// Close the span when the request is done processing
defer func() {
if currentRequest != nil {
log.Warn("external_processing: stream stopped during a request, making sure the current span is closed\n")
currentRequest.span.Finish()
currentRequest = nil
for {
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.Canceled) {
return nil
}
}()

for {
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.Canceled) {
return nil
}
return ctx.Err()

return ctx.Err()
default:
}

default:
err := processServer.RecvMsg(&processingRequest)
if err != nil {
// Note: Envoy is inconsistent with the "end_of_stream" value of its headers responses,
// so we can't fully rely on it to determine when it will close (cancel) the stream.
if s, ok := status.FromError(err); (ok && s.Code() == codes.Canceled) || err == io.EOF {
return nil
}

err := ss.RecvMsg(&processingRequest)
if err != nil {
// Note: Envoy is inconsistent with the "end_of_stream" value of its headers responses,
// so we can't fully rely on it to determine when it will close (cancel) the stream.
if err == io.EOF || err.(interface{ GRPCStatus() *status.Status }).GRPCStatus().Code() == codes.Canceled {
return nil
}

log.Warn("external_processing: error receiving request/response: %v\n", err)
return status.Errorf(codes.Unknown, "Error receiving request/response: %v", err)
}
log.Warn("external_processing: error receiving request/response: %v\n", err)
return status.Errorf(codes.Unknown, "Error receiving request/response: %v", err)
}

processingResponse, err = envoyExternalProcessingRequestTypeAssert(&processingRequest)
if err != nil {
log.Error("external_processing: error asserting request type: %v\n", err)
return status.Errorf(codes.Unknown, "Error asserting request type: %v", err)
}
processingResponse, err = envoyExternalProcessingRequestTypeAssert(&processingRequest)
if err != nil {
log.Error("external_processing: error asserting request type: %v\n", err)
return status.Errorf(codes.Unknown, "Error asserting request type: %v", err)
}

switch v := processingRequest.Request.(type) {
case *envoyextproc.ProcessingRequest_RequestHeaders:
processingResponse, currentRequest, blocked, err = ProcessRequestHeaders(ctx, v)
case *envoyextproc.ProcessingRequest_ResponseHeaders:
processingResponse, err = processResponseHeaders(v, currentRequest)
currentRequest = nil // Request is done, reset the current request
}
switch v := processingRequest.Request.(type) {
case *envoyextproc.ProcessingRequest_RequestHeaders:
processingResponse, currentRequest, blocked, err = processRequestHeaders(ctx, v)
case *envoyextproc.ProcessingRequest_ResponseHeaders:
processingResponse, err = processResponseHeaders(v, currentRequest)
currentRequest = nil // Request is done, reset the current request
}

if err != nil {
log.Error("external_processing: error processing request: %v\n", err)
return err
}
if err != nil {
log.Error("external_processing: error processing request: %v\n", err)
return err
}

// End of stream reached, no more data to process
if processingResponse == nil {
log.Debug("external_processing: end of stream reached")
return nil
}
// End of stream reached, no more data to process
if processingResponse == nil {
log.Debug("external_processing: end of stream reached")
return nil
}

if err := ss.SendMsg(processingResponse); err != nil {
log.Warn("external_processing: error sending response (probably because of an Envoy timeout): %v", err)
return status.Errorf(codes.Unknown, "Error sending response (probably because of an Envoy timeout): %v", err)
}
if err := processServer.SendMsg(processingResponse); err != nil {
log.Warn("external_processing: error sending response (probably because of an Envoy timeout): %v", err)
return status.Errorf(codes.Unknown, "Error sending response (probably because of an Envoy timeout): %v", err)
}

if blocked {
log.Debug("external_processing: request blocked, end the stream")
currentRequest = nil
return nil
}
if blocked {
log.Debug("external_processing: request blocked, end the stream")
currentRequest = nil
return nil
}
}
}

func envoyExternalProcessingRequestTypeAssert(req *envoyextproc.ProcessingRequest) (*envoyextproc.ProcessingResponse, error) {
switch v := req.Request.(type) {
switch r := req.Request.(type) {
case *envoyextproc.ProcessingRequest_RequestHeaders, *envoyextproc.ProcessingRequest_ResponseHeaders:
return nil, nil

Expand All @@ -158,8 +169,6 @@ func envoyExternalProcessingRequestTypeAssert(req *envoyextproc.ProcessingReques
}, nil

case *envoyextproc.ProcessingRequest_ResponseBody:
r := req.Request.(*envoyextproc.ProcessingRequest_ResponseBody)

// Note: The end of stream bool value is not reliable
// Sometimes it's not set to true even if there is no more data to process
if r.ResponseBody.GetEndOfStream() {
Expand All @@ -177,11 +186,11 @@ func envoyExternalProcessingRequestTypeAssert(req *envoyextproc.ProcessingReques
}, nil

default:
return nil, status.Errorf(codes.Unknown, "Unknown request type: %T", v)
return nil, status.Errorf(codes.Unknown, "Unknown request type: %T", r)
}
}

func ProcessRequestHeaders(ctx context.Context, req *envoyextproc.ProcessingRequest_RequestHeaders) (*envoyextproc.ProcessingResponse, *currentRequest, bool, error) {
func processRequestHeaders(ctx context.Context, req *envoyextproc.ProcessingRequest_RequestHeaders) (*envoyextproc.ProcessingResponse, *currentRequest, bool, error) {
log.Debug("external_processing: received request headers: %v\n", req.RequestHeaders)

request, err := newRequest(ctx, req)
Expand Down
7 changes: 3 additions & 4 deletions contrib/envoyproxy/go-control-plane/envoy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,12 +370,11 @@ func newEnvoyAppsecRig(t *testing.T, traceClient bool, interceptorOpts ...ddgrpc

interceptorOpts = append([]ddgrpc.InterceptorOption{ddgrpc.WithServiceName("grpc")}, interceptorOpts...)

server := grpc.NewServer(
grpc.StreamInterceptor(StreamServerInterceptor(interceptorOpts...)),
)
server := grpc.NewServer()

fixtureServer := new(envoyFixtureServer)
envoyextproc.RegisterExternalProcessorServer(server, fixtureServer)
appsecSrv := AppsecEnvoyExternalProcessorServer(fixtureServer)
envoyextproc.RegisterExternalProcessorServer(server, appsecSrv)

li, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
Expand Down
22 changes: 16 additions & 6 deletions contrib/envoyproxy/go-control-plane/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,34 @@
package go_control_plane_test

import (
"google.golang.org/grpc"
"gopkg.in/DataDog/dd-trace-go.v1/contrib/envoyproxy/go-control-plane"
"log"
"net"

"google.golang.org/grpc"

extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
gocontrolplane "gopkg.in/DataDog/dd-trace-go.v1/contrib/envoyproxy/go-control-plane"
)

// interface fpr external processing server
type envoyExtProcServer struct {
extprocv3.ExternalProcessorServer
}

func Example_server() {
// Create a listener for the server.
ln, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatal(err)
}

// Create the server interceptor using the envoy go control plane package.
si := go_control_plane.StreamServerInterceptor()

// Initialize the grpc server as normal, using the envoy server interceptor.
s := grpc.NewServer(grpc.StreamInterceptor(si))
s := grpc.NewServer()
srv := &envoyExtProcServer{}

// Register the appsec envoy external processor service
appsecSrv := gocontrolplane.AppsecEnvoyExternalProcessorServer(srv)
extprocv3.RegisterExternalProcessorServer(s, appsecSrv)

// ... register your services

Expand Down

0 comments on commit 22d7095

Please sign in to comment.