diff --git a/go.mod b/go.mod index 72328054..3e689bee 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( gocloud.dev v0.39.0 golang.org/x/sys v0.25.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 + google.golang.org/grpc v1.67.0 google.golang.org/protobuf v1.34.2 gopkg.in/yaml.v3 v3.0.1 k8s.io/utils v0.0.0-20240921022957-49e7df575cb6 @@ -281,7 +282,6 @@ require ( google.golang.org/api v0.198.0 // indirect google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect - google.golang.org/grpc v1.67.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect diff --git a/policy/executor/internal/chunker.go b/policy/executor/internal/chunker.go new file mode 100644 index 00000000..e8caeb47 --- /dev/null +++ b/policy/executor/internal/chunker.go @@ -0,0 +1,41 @@ +// Using license identifier: BUSL-1.1 +// Using copyright holder: Mondoo, Inc. + +package internal + +import ( + "google.golang.org/protobuf/proto" +) + +var maxMessageSize = 6 * (1 << 20) + +func ChunkMessages[T proto.Message](sendFunc func([]T) error, onTooLarge func(T, int), items ...T) error { + idx := 0 + for { + buffer := make([]T, 0, len(items)) + + if idx >= len(items) { + break + } + size := 0 + for i := idx; i < len(items); i++ { + msgSize := proto.Size(items[i]) + if msgSize > maxMessageSize { + onTooLarge(items[i], msgSize) + idx++ + continue + } + size += proto.Size(items[i]) + if size > maxMessageSize { + break + } + buffer = append(buffer, items[i]) + idx++ + } + if err := sendFunc(buffer); err != nil { + return err + } + } + + return nil +} diff --git a/policy/executor/internal/chunker_test.go b/policy/executor/internal/chunker_test.go new file mode 100644 index 00000000..05254fe8 --- /dev/null +++ b/policy/executor/internal/chunker_test.go @@ -0,0 +1,65 @@ +// Using license identifier: BUSL-1.1 +// Using copyright holder: Mondoo, Inc. + +package internal + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc/interop/grpc_testing" +) + +func TestChunkerIgnoreTooLargeMessages(t *testing.T) { + payloads := []*grpc_testing.Payload{ + { + Body: bytes.Repeat([]byte{0x01}, maxMessageSize+1), + }, + { + Body: bytes.Repeat([]byte{0x02}, maxMessageSize/2), + }, + } + + var chunks [][]*grpc_testing.Payload + err := ChunkMessages(func(chunk []*grpc_testing.Payload) error { + chunks = append(chunks, chunk) + return nil + }, func(*grpc_testing.Payload, int) {}, payloads...) + require.NoError(t, err) + require.Len(t, chunks, 1) + require.Len(t, chunks[0], 1) + require.Equal(t, payloads[1], chunks[0][0]) +} + +func TestChunker(t *testing.T) { + maxMessageSize = 100 + payloads := []*grpc_testing.Payload{ + { + Body: bytes.Repeat([]byte{0x01}, maxMessageSize-10), + }, + { + Body: bytes.Repeat([]byte{0x02}, maxMessageSize-10), + }, + { + Body: bytes.Repeat([]byte{0x03}, 10), + }, + { + Body: bytes.Repeat([]byte{0x04}, 10), + }, + } + + var chunks [][]*grpc_testing.Payload + err := ChunkMessages(func(chunk []*grpc_testing.Payload) error { + chunks = append(chunks, chunk) + return nil + }, func(*grpc_testing.Payload, int) {}, payloads...) + require.NoError(t, err) + require.Len(t, chunks, 3) + require.Len(t, chunks[0], 1) + require.Equal(t, payloads[0], chunks[0][0]) + require.Len(t, chunks[1], 1) + require.Equal(t, payloads[1], chunks[1][0]) + require.Len(t, chunks[2], 2) + require.Equal(t, payloads[2], chunks[2][0]) +} diff --git a/policy/executor/internal/collector.go b/policy/executor/internal/collector.go index 7310b570..732bfb7d 100644 --- a/policy/executor/internal/collector.go +++ b/policy/executor/internal/collector.go @@ -317,23 +317,49 @@ func (c *PolicyServiceCollector) Sink(results []*llx.RawResult, scores []*policy return } - resultsToSend := make(map[string]*llx.Result, len(results)) - for _, rr := range results { - resultsToSend[rr.CodeID] = c.toResult(rr) + if len(results) > 0 { + llxResults := make([]*llx.Result, len(results)) + for i, rr := range results { + llxResults[i] = c.toResult(rr) + } + err := ChunkMessages(func(chunk []*llx.Result) error { + log.Debug().Msg("Sending datapoints") + resultsToSend := make(map[string]*llx.Result, len(results)) + for _, rr := range chunk { + resultsToSend[rr.CodeId] = rr + } + _, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{ + AssetMrn: c.assetMrn, + Data: resultsToSend, + IsPreprocessed: true, + IsLastBatch: isDone, + }) + if err != nil { + log.Error().Err(err).Msg("failed to send datapoints") + } + return nil + }, func(item *llx.Result, msgSize int) { + log.Warn().Msgf("Data %s %d exceeds maximum message size %d", item.CodeId, msgSize, maxMessageSize) + }, llxResults...) + if err != nil { + log.Error().Err(err).Msg("failed to send datapoints") + } } - log.Debug().Msg("Sending datapoints and scores") - _, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{ - AssetMrn: c.assetMrn, - Data: resultsToSend, - Scores: scores, - Risks: risks, - IsPreprocessed: true, - IsLastBatch: isDone, - }) - if err != nil { - log.Error().Err(err).Msg("failed to send datapoints and scores") + if len(scores) > 0 || len(risks) > 0 { + log.Debug().Msg("Sending scores") + _, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{ + AssetMrn: c.assetMrn, + Scores: scores, + Risks: risks, + IsPreprocessed: true, + IsLastBatch: isDone, + }) + if err != nil { + log.Error().Err(err).Msg("failed to send datapoints and scores") + } } + } type FuncCollector struct {