From 87369f754046733321c3805c963f126fcaca3f29 Mon Sep 17 00:00:00 2001 From: Adel Haj Hassan <41540817+adel121@users.noreply.github.com> Date: Mon, 21 Oct 2024 16:02:24 +0200 Subject: [PATCH] Cut tagger stream event responses into chunks of 4MB max size each (#30192) --- cmd/cluster-agent/api/server.go | 6 +- comp/api/api/apiimpl/server_cmd.go | 6 +- comp/core/tagger/taggerimpl/server/server.go | 26 +++-- comp/core/tagger/taggerimpl/server/util.go | 50 +++++++++ .../tagger/taggerimpl/server/util_test.go | 105 ++++++++++++++++++ 5 files changed, 181 insertions(+), 12 deletions(-) create mode 100644 comp/core/tagger/taggerimpl/server/util.go create mode 100644 comp/core/tagger/taggerimpl/server/util_test.go diff --git a/cmd/cluster-agent/api/server.go b/cmd/cluster-agent/api/server.go index 0b6ba72b63f08..7b2047da8229e 100644 --- a/cmd/cluster-agent/api/server.go +++ b/cmd/cluster-agent/api/server.go @@ -53,6 +53,8 @@ var ( apiRouter *mux.Router ) +const maxMessageSize = 4 * 1024 * 1024 // 4 MB + // StartServer creates the router and starts the HTTP server func StartServer(ctx context.Context, w workloadmeta.Component, taggerComp tagger.Component, ac autodiscovery.Component, statusComponent status.Component, settings settings.Component, cfg config.Component) error { // create the root HTTP router @@ -126,11 +128,13 @@ func StartServer(ctx context.Context, w workloadmeta.Component, taggerComp tagge opts := []grpc.ServerOption{ grpc.StreamInterceptor(grpc_auth.StreamServerInterceptor(authInterceptor)), grpc.UnaryInterceptor(grpc_auth.UnaryServerInterceptor(authInterceptor)), + grpc.MaxSendMsgSize(maxMessageSize), + grpc.MaxRecvMsgSize(maxMessageSize), } grpcSrv := grpc.NewServer(opts...) pb.RegisterAgentSecureServer(grpcSrv, &serverSecure{ - taggerServer: taggerserver.NewServer(taggerComp), + taggerServer: taggerserver.NewServer(taggerComp, maxMessageSize), }) timeout := pkgconfigsetup.Datadog().GetDuration("cluster_agent.server.idle_timeout_seconds") * time.Second diff --git a/comp/api/api/apiimpl/server_cmd.go b/comp/api/api/apiimpl/server_cmd.go index 56215842c06d0..2a5a8485f421f 100644 --- a/comp/api/api/apiimpl/server_cmd.go +++ b/comp/api/api/apiimpl/server_cmd.go @@ -31,6 +31,7 @@ import ( const cmdServerName string = "CMD API Server" const cmdServerShortName string = "CMD" +const maxMessageSize = 4 * 1024 * 1024 // 4 MB func (server *apiServer) startCMDServer( cmdAddr string, @@ -48,10 +49,13 @@ func (server *apiServer) startCMDServer( // gRPC server authInterceptor := grpcutil.AuthInterceptor(parseToken) + opts := []grpc.ServerOption{ grpc.Creds(credentials.NewClientTLSFromCert(tlsCertPool, cmdAddr)), grpc.StreamInterceptor(grpc_auth.StreamServerInterceptor(authInterceptor)), grpc.UnaryInterceptor(grpc_auth.UnaryServerInterceptor(authInterceptor)), + grpc.MaxRecvMsgSize(maxMessageSize), + grpc.MaxSendMsgSize(maxMessageSize), } s := grpc.NewServer(opts...) @@ -59,7 +63,7 @@ func (server *apiServer) startCMDServer( pb.RegisterAgentSecureServer(s, &serverSecure{ configService: server.rcService, configServiceMRF: server.rcServiceMRF, - taggerServer: taggerserver.NewServer(server.taggerComp), + taggerServer: taggerserver.NewServer(server.taggerComp, maxMessageSize), taggerComp: server.taggerComp, // TODO(components): decide if workloadmetaServer should be componentized itself workloadmetaServer: workloadmetaServer.NewServer(server.wmeta), diff --git a/comp/core/tagger/taggerimpl/server/server.go b/comp/core/tagger/taggerimpl/server/server.go index 0542973b1074c..df5f5c45b7bda 100644 --- a/comp/core/tagger/taggerimpl/server/server.go +++ b/comp/core/tagger/taggerimpl/server/server.go @@ -30,12 +30,14 @@ const ( // Server is a grpc server that streams tagger entities type Server struct { taggerComponent tagger.Component + maxMessageSize int } // NewServer returns a new Server -func NewServer(t tagger.Component) *Server { +func NewServer(t tagger.Component, maxMessageSize int) *Server { return &Server{ taggerComponent: t, + maxMessageSize: maxMessageSize, } } @@ -86,16 +88,20 @@ func (s *Server) TaggerStreamEntities(in *pb.StreamTagsRequest, out pb.AgentSecu responseEvents = append(responseEvents, e) } - err = grpc.DoWithTimeout(func() error { - return out.Send(&pb.StreamTagsResponse{ - Events: responseEvents, - }) - }, taggerStreamSendTimeout) + // Split events into chunks and send each one + chunks := splitEvents(responseEvents, s.maxMessageSize) + for _, chunk := range chunks { + err = grpc.DoWithTimeout(func() error { + return out.Send(&pb.StreamTagsResponse{ + Events: chunk, + }) + }, taggerStreamSendTimeout) - if err != nil { - log.Warnf("error sending tagger event: %s", err) - s.taggerComponent.GetTaggerTelemetryStore().ServerStreamErrors.Inc() - return err + if err != nil { + log.Warnf("error sending tagger event: %s", err) + s.taggerComponent.GetTaggerTelemetryStore().ServerStreamErrors.Inc() + return err + } } case <-out.Context().Done(): diff --git a/comp/core/tagger/taggerimpl/server/util.go b/comp/core/tagger/taggerimpl/server/util.go new file mode 100644 index 0000000000000..28fc2c54a1f3d --- /dev/null +++ b/comp/core/tagger/taggerimpl/server/util.go @@ -0,0 +1,50 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package server + +import ( + "google.golang.org/protobuf/proto" + + pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core" +) + +// splitBySize splits the given slice into contiguous non-overlapping subslices such that +// the size of each sub-slice is at most maxChunkSize. +// The size of each item is calculated using computeSize +// +// This function assumes that the size of each single item of the initial slice is not larger than maxChunkSize +func splitBySize[T any](slice []T, maxChunkSize int, computeSize func(T) int) [][]T { + + // TODO: return an iter.Seq[[]T] instead of [][]T once we upgrade to golang v1.23 + // returning iter.Seq[[]T] has better performance in terms of memory consumption + var chunks [][]T + currentChunk := []T{} + currentSize := 0 + + for _, item := range slice { + eventSize := computeSize(item) + if currentSize+eventSize > maxChunkSize { + chunks = append(chunks, currentChunk) + currentChunk = []T{} + currentSize = 0 + } + currentChunk = append(currentChunk, item) + currentSize += eventSize + } + if len(currentChunk) > 0 { + chunks = append(chunks, currentChunk) + } + return chunks +} + +// splitEvents splits the array of events to chunks with at most maxChunkSize each +func splitEvents(events []*pb.StreamTagsEvent, maxChunkSize int) [][]*pb.StreamTagsEvent { + return splitBySize( + events, + maxChunkSize, + func(event *pb.StreamTagsEvent) int { return proto.Size(event) }, + ) +} diff --git a/comp/core/tagger/taggerimpl/server/util_test.go b/comp/core/tagger/taggerimpl/server/util_test.go new file mode 100644 index 0000000000000..76f94c4988630 --- /dev/null +++ b/comp/core/tagger/taggerimpl/server/util_test.go @@ -0,0 +1,105 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package server + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +type mockStreamTagsEvent struct { + id int + size int +} + +func TestSplitEvents(t *testing.T) { + testCases := []struct { + name string + events []mockStreamTagsEvent + maxChunkSize int + expected [][]mockStreamTagsEvent // Expecting indices of events in chunks for easier comparison + }{ + { + name: "Empty input", + events: []mockStreamTagsEvent{}, + maxChunkSize: 100, + expected: nil, // No chunks expected + }, + { + name: "Single event within chunk size", + events: []mockStreamTagsEvent{ + {id: 1, size: 50}, // Mock event with size 50 + }, + maxChunkSize: 100, + expected: [][]mockStreamTagsEvent{ + { + {id: 1, size: 50}, // One chunk with one event + }, + }, + }, + { + name: "Multiple events all fit in one chunk", + events: []mockStreamTagsEvent{ + {id: 1, size: 20}, {id: 2, size: 30}, {id: 3, size: 40}, // Total size = 90 + }, + maxChunkSize: 100, + expected: [][]mockStreamTagsEvent{ + { + {id: 1, size: 20}, {id: 2, size: 30}, {id: 3, size: 40}, // All events fit in one chunk + }, + }, + }, + { + name: "Multiple events require splitting", + events: []mockStreamTagsEvent{ + {id: 1, size: 40}, {id: 2, size: 50}, {id: 3, size: 60}, // Total size = 150 + }, + maxChunkSize: 100, + expected: [][]mockStreamTagsEvent{ + { + {id: 1, size: 40}, + {id: 2, size: 50}, + }, + { + {id: 3, size: 60}, + }, // Last event in second chunk + }, + }, + { + name: "Events fit exactly in chunks", + events: []mockStreamTagsEvent{ + {id: 1, size: 50}, {id: 2, size: 50}, // Total size = 100 + }, + maxChunkSize: 100, + expected: [][]mockStreamTagsEvent{ + {{id: 1, size: 50}, {id: 2, size: 50}}, // Both events fit exactly in one chunk + }, + }, + { + name: "Event size exactly matches or exceeds chunk size", + events: []mockStreamTagsEvent{ + {id: 1, size: 100}, {id: 2, size: 101}, // One exactly fits, one exceeds + }, + maxChunkSize: 100, + expected: [][]mockStreamTagsEvent{ + { + {id: 1, size: 100}, + }, + { + {id: 2, size: 101}, + }, + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + chunks := splitBySize(testCase.events, testCase.maxChunkSize, func(e mockStreamTagsEvent) int { return e.size }) + assert.Equal(t, testCase.expected, chunks) + }) + } +}