Skip to content

Commit

Permalink
Cut tagger stream event responses into chunks of 4MB max size each (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
adel121 authored Oct 21, 2024
1 parent 6cd87f7 commit 87369f7
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 12 deletions.
6 changes: 5 additions & 1 deletion cmd/cluster-agent/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ var (
apiRouter *mux.Router
)

const maxMessageSize = 4 * 1024 * 1024 // 4 MB

// StartServer creates the router and starts the HTTP server
func StartServer(ctx context.Context, w workloadmeta.Component, taggerComp tagger.Component, ac autodiscovery.Component, statusComponent status.Component, settings settings.Component, cfg config.Component) error {
// create the root HTTP router
Expand Down Expand Up @@ -126,11 +128,13 @@ func StartServer(ctx context.Context, w workloadmeta.Component, taggerComp tagge
opts := []grpc.ServerOption{
grpc.StreamInterceptor(grpc_auth.StreamServerInterceptor(authInterceptor)),
grpc.UnaryInterceptor(grpc_auth.UnaryServerInterceptor(authInterceptor)),
grpc.MaxSendMsgSize(maxMessageSize),
grpc.MaxRecvMsgSize(maxMessageSize),
}

grpcSrv := grpc.NewServer(opts...)
pb.RegisterAgentSecureServer(grpcSrv, &serverSecure{
taggerServer: taggerserver.NewServer(taggerComp),
taggerServer: taggerserver.NewServer(taggerComp, maxMessageSize),
})

timeout := pkgconfigsetup.Datadog().GetDuration("cluster_agent.server.idle_timeout_seconds") * time.Second
Expand Down
6 changes: 5 additions & 1 deletion comp/api/api/apiimpl/server_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

const cmdServerName string = "CMD API Server"
const cmdServerShortName string = "CMD"
const maxMessageSize = 4 * 1024 * 1024 // 4 MB

func (server *apiServer) startCMDServer(
cmdAddr string,
Expand All @@ -48,18 +49,21 @@ func (server *apiServer) startCMDServer(

// gRPC server
authInterceptor := grpcutil.AuthInterceptor(parseToken)

opts := []grpc.ServerOption{
grpc.Creds(credentials.NewClientTLSFromCert(tlsCertPool, cmdAddr)),
grpc.StreamInterceptor(grpc_auth.StreamServerInterceptor(authInterceptor)),
grpc.UnaryInterceptor(grpc_auth.UnaryServerInterceptor(authInterceptor)),
grpc.MaxRecvMsgSize(maxMessageSize),
grpc.MaxSendMsgSize(maxMessageSize),
}

s := grpc.NewServer(opts...)
pb.RegisterAgentServer(s, &grpcServer{})
pb.RegisterAgentSecureServer(s, &serverSecure{
configService: server.rcService,
configServiceMRF: server.rcServiceMRF,
taggerServer: taggerserver.NewServer(server.taggerComp),
taggerServer: taggerserver.NewServer(server.taggerComp, maxMessageSize),
taggerComp: server.taggerComp,
// TODO(components): decide if workloadmetaServer should be componentized itself
workloadmetaServer: workloadmetaServer.NewServer(server.wmeta),
Expand Down
26 changes: 16 additions & 10 deletions comp/core/tagger/taggerimpl/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ const (
// Server is a grpc server that streams tagger entities
type Server struct {
taggerComponent tagger.Component
maxMessageSize int
}

// NewServer returns a new Server
func NewServer(t tagger.Component) *Server {
func NewServer(t tagger.Component, maxMessageSize int) *Server {
return &Server{
taggerComponent: t,
maxMessageSize: maxMessageSize,
}
}

Expand Down Expand Up @@ -86,16 +88,20 @@ func (s *Server) TaggerStreamEntities(in *pb.StreamTagsRequest, out pb.AgentSecu
responseEvents = append(responseEvents, e)
}

err = grpc.DoWithTimeout(func() error {
return out.Send(&pb.StreamTagsResponse{
Events: responseEvents,
})
}, taggerStreamSendTimeout)
// Split events into chunks and send each one
chunks := splitEvents(responseEvents, s.maxMessageSize)
for _, chunk := range chunks {
err = grpc.DoWithTimeout(func() error {
return out.Send(&pb.StreamTagsResponse{
Events: chunk,
})
}, taggerStreamSendTimeout)

if err != nil {
log.Warnf("error sending tagger event: %s", err)
s.taggerComponent.GetTaggerTelemetryStore().ServerStreamErrors.Inc()
return err
if err != nil {
log.Warnf("error sending tagger event: %s", err)
s.taggerComponent.GetTaggerTelemetryStore().ServerStreamErrors.Inc()
return err
}
}

case <-out.Context().Done():
Expand Down
50 changes: 50 additions & 0 deletions comp/core/tagger/taggerimpl/server/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package server

import (
"google.golang.org/protobuf/proto"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core"
)

// splitBySize splits the given slice into contiguous non-overlapping subslices such that
// the size of each sub-slice is at most maxChunkSize.
// The size of each item is calculated using computeSize
//
// This function assumes that the size of each single item of the initial slice is not larger than maxChunkSize
func splitBySize[T any](slice []T, maxChunkSize int, computeSize func(T) int) [][]T {

// TODO: return an iter.Seq[[]T] instead of [][]T once we upgrade to golang v1.23
// returning iter.Seq[[]T] has better performance in terms of memory consumption
var chunks [][]T
currentChunk := []T{}
currentSize := 0

for _, item := range slice {
eventSize := computeSize(item)
if currentSize+eventSize > maxChunkSize {
chunks = append(chunks, currentChunk)
currentChunk = []T{}
currentSize = 0
}
currentChunk = append(currentChunk, item)
currentSize += eventSize
}
if len(currentChunk) > 0 {
chunks = append(chunks, currentChunk)
}
return chunks
}

// splitEvents splits the array of events to chunks with at most maxChunkSize each
func splitEvents(events []*pb.StreamTagsEvent, maxChunkSize int) [][]*pb.StreamTagsEvent {
return splitBySize(
events,
maxChunkSize,
func(event *pb.StreamTagsEvent) int { return proto.Size(event) },
)
}
105 changes: 105 additions & 0 deletions comp/core/tagger/taggerimpl/server/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package server

import (
"testing"

"github.com/stretchr/testify/assert"
)

type mockStreamTagsEvent struct {
id int
size int
}

func TestSplitEvents(t *testing.T) {
testCases := []struct {
name string
events []mockStreamTagsEvent
maxChunkSize int
expected [][]mockStreamTagsEvent // Expecting indices of events in chunks for easier comparison
}{
{
name: "Empty input",
events: []mockStreamTagsEvent{},
maxChunkSize: 100,
expected: nil, // No chunks expected
},
{
name: "Single event within chunk size",
events: []mockStreamTagsEvent{
{id: 1, size: 50}, // Mock event with size 50
},
maxChunkSize: 100,
expected: [][]mockStreamTagsEvent{
{
{id: 1, size: 50}, // One chunk with one event
},
},
},
{
name: "Multiple events all fit in one chunk",
events: []mockStreamTagsEvent{
{id: 1, size: 20}, {id: 2, size: 30}, {id: 3, size: 40}, // Total size = 90
},
maxChunkSize: 100,
expected: [][]mockStreamTagsEvent{
{
{id: 1, size: 20}, {id: 2, size: 30}, {id: 3, size: 40}, // All events fit in one chunk
},
},
},
{
name: "Multiple events require splitting",
events: []mockStreamTagsEvent{
{id: 1, size: 40}, {id: 2, size: 50}, {id: 3, size: 60}, // Total size = 150
},
maxChunkSize: 100,
expected: [][]mockStreamTagsEvent{
{
{id: 1, size: 40},
{id: 2, size: 50},
},
{
{id: 3, size: 60},
}, // Last event in second chunk
},
},
{
name: "Events fit exactly in chunks",
events: []mockStreamTagsEvent{
{id: 1, size: 50}, {id: 2, size: 50}, // Total size = 100
},
maxChunkSize: 100,
expected: [][]mockStreamTagsEvent{
{{id: 1, size: 50}, {id: 2, size: 50}}, // Both events fit exactly in one chunk
},
},
{
name: "Event size exactly matches or exceeds chunk size",
events: []mockStreamTagsEvent{
{id: 1, size: 100}, {id: 2, size: 101}, // One exactly fits, one exceeds
},
maxChunkSize: 100,
expected: [][]mockStreamTagsEvent{
{
{id: 1, size: 100},
},
{
{id: 2, size: 101},
},
},
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
chunks := splitBySize(testCase.events, testCase.maxChunkSize, func(e mockStreamTagsEvent) int { return e.size })
assert.Equal(t, testCase.expected, chunks)
})
}
}

0 comments on commit 87369f7

Please sign in to comment.