diff --git a/contrib/envoyproxy/go-control-plane/envoy.go b/contrib/envoyproxy/go-control-plane/envoy.go index 853b4657bf..80a20ab0e5 100644 --- a/contrib/envoyproxy/go-control-plane/envoy.go +++ b/contrib/envoyproxy/go-control-plane/envoy.go @@ -13,7 +13,6 @@ import ( "net/http" "strings" - grpctrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/grpc" "gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/httptrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" @@ -23,7 +22,6 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/internal/log" "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" - "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -39,6 +37,16 @@ func init() { tracer.MarkIntegrationImported("github.com/envoyproxy/go-control-plane") } +// appsecEnvoyExternalProcessorServer is a server that implements the Envoy ExternalProcessorServer interface. +type appsecEnvoyExternalProcessorServer struct { + envoyextproc.ExternalProcessorServer +} + +// AppsecEnvoyExternalProcessorServer creates and returns a new instance of appsecEnvoyExternalProcessorServer. +func AppsecEnvoyExternalProcessorServer(userImplementation envoyextproc.ExternalProcessorServer) envoyextproc.ExternalProcessorServer { + return &appsecEnvoyExternalProcessorServer{userImplementation} +} + type currentRequest struct { span tracer.Span afterHandle func() @@ -47,96 +55,99 @@ type currentRequest struct { wrappedResponseWriter http.ResponseWriter } -func StreamServerInterceptor(opts ...grpctrace.Option) grpc.StreamServerInterceptor { - interceptor := grpctrace.StreamServerInterceptor(opts...) - - return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - if info.FullMethod != "/envoy.service.ext_proc.v3.ExternalProcessor/Process" { - return interceptor(srv, ss, info, handler) +// Process handles the bidirectional stream that Envoy uses to give the server control +// over what the filter does. It processes incoming requests and sends appropriate responses +// based on the type of request received. +// +// The method receive incoming requests, processes them, and sends responses back to the client. +// It handles different types of requests such as request headers, response headers, request body, +// response body, request trailers, and response trailers. +// +// If the request is blocked, it sends an immediate response and ends the stream. If an error occurs +// during processing, it logs the error and returns an appropriate gRPC status error. +func (s *appsecEnvoyExternalProcessorServer) Process(processServer envoyextproc.ExternalProcessor_ProcessServer) error { + + var ( + ctx = processServer.Context() + blocked bool + currentRequest *currentRequest + processingRequest envoyextproc.ProcessingRequest + processingResponse *envoyextproc.ProcessingResponse + ) + + // Close the span when the request is done processing + defer func() { + if currentRequest != nil { + log.Warn("external_processing: stream stopped during a request, making sure the current span is closed\n") + currentRequest.span.Finish() + currentRequest = nil } + }() - var ( - ctx = ss.Context() - blocked bool - currentRequest *currentRequest - processingRequest envoyextproc.ProcessingRequest - processingResponse *envoyextproc.ProcessingResponse - ) - - // Close the span when the request is done processing - defer func() { - if currentRequest != nil { - log.Warn("external_processing: stream stopped during a request, making sure the current span is closed\n") - currentRequest.span.Finish() - currentRequest = nil + for { + select { + case <-ctx.Done(): + if errors.Is(ctx.Err(), context.Canceled) { + return nil } - }() - for { - select { - case <-ctx.Done(): - if errors.Is(ctx.Err(), context.Canceled) { - return nil - } + return ctx.Err() - return ctx.Err() + default: + } - default: + err := processServer.RecvMsg(&processingRequest) + if err != nil { + // Note: Envoy is inconsistent with the "end_of_stream" value of its headers responses, + // so we can't fully rely on it to determine when it will close (cancel) the stream. + if s, ok := status.FromError(err); (ok && s.Code() == codes.Canceled) || err == io.EOF { + return nil } - err := ss.RecvMsg(&processingRequest) - if err != nil { - // Note: Envoy is inconsistent with the "end_of_stream" value of its headers responses, - // so we can't fully rely on it to determine when it will close (cancel) the stream. - if err == io.EOF || err.(interface{ GRPCStatus() *status.Status }).GRPCStatus().Code() == codes.Canceled { - return nil - } - - log.Warn("external_processing: error receiving request/response: %v\n", err) - return status.Errorf(codes.Unknown, "Error receiving request/response: %v", err) - } + log.Warn("external_processing: error receiving request/response: %v\n", err) + return status.Errorf(codes.Unknown, "Error receiving request/response: %v", err) + } - processingResponse, err = envoyExternalProcessingRequestTypeAssert(&processingRequest) - if err != nil { - log.Error("external_processing: error asserting request type: %v\n", err) - return status.Errorf(codes.Unknown, "Error asserting request type: %v", err) - } + processingResponse, err = envoyExternalProcessingRequestTypeAssert(&processingRequest) + if err != nil { + log.Error("external_processing: error asserting request type: %v\n", err) + return status.Errorf(codes.Unknown, "Error asserting request type: %v", err) + } - switch v := processingRequest.Request.(type) { - case *envoyextproc.ProcessingRequest_RequestHeaders: - processingResponse, currentRequest, blocked, err = ProcessRequestHeaders(ctx, v) - case *envoyextproc.ProcessingRequest_ResponseHeaders: - processingResponse, err = processResponseHeaders(v, currentRequest) - currentRequest = nil // Request is done, reset the current request - } + switch v := processingRequest.Request.(type) { + case *envoyextproc.ProcessingRequest_RequestHeaders: + processingResponse, currentRequest, blocked, err = processRequestHeaders(ctx, v) + case *envoyextproc.ProcessingRequest_ResponseHeaders: + processingResponse, err = processResponseHeaders(v, currentRequest) + currentRequest = nil // Request is done, reset the current request + } - if err != nil { - log.Error("external_processing: error processing request: %v\n", err) - return err - } + if err != nil { + log.Error("external_processing: error processing request: %v\n", err) + return err + } - // End of stream reached, no more data to process - if processingResponse == nil { - log.Debug("external_processing: end of stream reached") - return nil - } + // End of stream reached, no more data to process + if processingResponse == nil { + log.Debug("external_processing: end of stream reached") + return nil + } - if err := ss.SendMsg(processingResponse); err != nil { - log.Warn("external_processing: error sending response (probably because of an Envoy timeout): %v", err) - return status.Errorf(codes.Unknown, "Error sending response (probably because of an Envoy timeout): %v", err) - } + if err := processServer.SendMsg(processingResponse); err != nil { + log.Warn("external_processing: error sending response (probably because of an Envoy timeout): %v", err) + return status.Errorf(codes.Unknown, "Error sending response (probably because of an Envoy timeout): %v", err) + } - if blocked { - log.Debug("external_processing: request blocked, end the stream") - currentRequest = nil - return nil - } + if blocked { + log.Debug("external_processing: request blocked, end the stream") + currentRequest = nil + return nil } } } func envoyExternalProcessingRequestTypeAssert(req *envoyextproc.ProcessingRequest) (*envoyextproc.ProcessingResponse, error) { - switch v := req.Request.(type) { + switch r := req.Request.(type) { case *envoyextproc.ProcessingRequest_RequestHeaders, *envoyextproc.ProcessingRequest_ResponseHeaders: return nil, nil @@ -158,8 +169,6 @@ func envoyExternalProcessingRequestTypeAssert(req *envoyextproc.ProcessingReques }, nil case *envoyextproc.ProcessingRequest_ResponseBody: - r := req.Request.(*envoyextproc.ProcessingRequest_ResponseBody) - // Note: The end of stream bool value is not reliable // Sometimes it's not set to true even if there is no more data to process if r.ResponseBody.GetEndOfStream() { @@ -177,11 +186,11 @@ func envoyExternalProcessingRequestTypeAssert(req *envoyextproc.ProcessingReques }, nil default: - return nil, status.Errorf(codes.Unknown, "Unknown request type: %T", v) + return nil, status.Errorf(codes.Unknown, "Unknown request type: %T", r) } } -func ProcessRequestHeaders(ctx context.Context, req *envoyextproc.ProcessingRequest_RequestHeaders) (*envoyextproc.ProcessingResponse, *currentRequest, bool, error) { +func processRequestHeaders(ctx context.Context, req *envoyextproc.ProcessingRequest_RequestHeaders) (*envoyextproc.ProcessingResponse, *currentRequest, bool, error) { log.Debug("external_processing: received request headers: %v\n", req.RequestHeaders) request, err := newRequest(ctx, req) diff --git a/contrib/envoyproxy/go-control-plane/envoy_test.go b/contrib/envoyproxy/go-control-plane/envoy_test.go index 9393278958..8af05eaab3 100644 --- a/contrib/envoyproxy/go-control-plane/envoy_test.go +++ b/contrib/envoyproxy/go-control-plane/envoy_test.go @@ -370,12 +370,11 @@ func newEnvoyAppsecRig(t *testing.T, traceClient bool, interceptorOpts ...ddgrpc interceptorOpts = append([]ddgrpc.InterceptorOption{ddgrpc.WithServiceName("grpc")}, interceptorOpts...) - server := grpc.NewServer( - grpc.StreamInterceptor(StreamServerInterceptor(interceptorOpts...)), - ) + server := grpc.NewServer() fixtureServer := new(envoyFixtureServer) - envoyextproc.RegisterExternalProcessorServer(server, fixtureServer) + appsecSrv := AppsecEnvoyExternalProcessorServer(fixtureServer) + envoyextproc.RegisterExternalProcessorServer(server, appsecSrv) li, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { diff --git a/contrib/envoyproxy/go-control-plane/example_test.go b/contrib/envoyproxy/go-control-plane/example_test.go index d9837d3091..f1e255dcaf 100644 --- a/contrib/envoyproxy/go-control-plane/example_test.go +++ b/contrib/envoyproxy/go-control-plane/example_test.go @@ -6,12 +6,20 @@ package go_control_plane_test import ( - "google.golang.org/grpc" - "gopkg.in/DataDog/dd-trace-go.v1/contrib/envoyproxy/go-control-plane" "log" "net" + + "google.golang.org/grpc" + + extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + gocontrolplane "gopkg.in/DataDog/dd-trace-go.v1/contrib/envoyproxy/go-control-plane" ) +// interface fpr external processing server +type envoyExtProcServer struct { + extprocv3.ExternalProcessorServer +} + func Example_server() { // Create a listener for the server. ln, err := net.Listen("tcp", ":50051") @@ -19,11 +27,13 @@ func Example_server() { log.Fatal(err) } - // Create the server interceptor using the envoy go control plane package. - si := go_control_plane.StreamServerInterceptor() - // Initialize the grpc server as normal, using the envoy server interceptor. - s := grpc.NewServer(grpc.StreamInterceptor(si)) + s := grpc.NewServer() + srv := &envoyExtProcServer{} + + // Register the appsec envoy external processor service + appsecSrv := gocontrolplane.AppsecEnvoyExternalProcessorServer(srv) + extprocv3.RegisterExternalProcessorServer(s, appsecSrv) // ... register your services