From a5a7734dbddf6a174c303832fab910fd72ae8b60 Mon Sep 17 00:00:00 2001 From: Jay Mundrawala Date: Wed, 25 Sep 2024 16:17:39 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Chunk=20sending=20results=20by?= =?UTF-8?q?=20size?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There is an upper limit to what is accepted by the platform per request. Make sure we do not go past it when sending data. --- policy/executor/internal/collector.go | 55 ++++++++++++++++++++------- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/policy/executor/internal/collector.go b/policy/executor/internal/collector.go index 7310b570..3067bf97 100644 --- a/policy/executor/internal/collector.go +++ b/policy/executor/internal/collector.go @@ -11,6 +11,7 @@ import ( "github.com/rs/zerolog/log" "go.mondoo.com/cnquery/v11/llx" + "go.mondoo.com/cnquery/v11/utils/iox" "go.mondoo.com/cnspec/v11/policy" "google.golang.org/protobuf/proto" ) @@ -317,23 +318,49 @@ func (c *PolicyServiceCollector) Sink(results []*llx.RawResult, scores []*policy return } - resultsToSend := make(map[string]*llx.Result, len(results)) - for _, rr := range results { - resultsToSend[rr.CodeID] = c.toResult(rr) + if len(results) > 0 { + llxResults := make([]*llx.Result, len(results)) + for i, rr := range results { + llxResults[i] = c.toResult(rr) + } + err := iox.ChunkMessages(func(chunk []*llx.Result) error { + log.Debug().Msg("Sending datapoints") + resultsToSend := make(map[string]*llx.Result, len(chunk)) + for _, rr := range chunk { + resultsToSend[rr.CodeId] = rr + } + _, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{ + AssetMrn: c.assetMrn, + Data: resultsToSend, + IsPreprocessed: true, + IsLastBatch: isDone, + }) + if err != nil { + log.Error().Err(err).Msg("failed to send datapoints") + } + return nil + }, func(item *llx.Result, msgSize int) { + log.Warn().Msgf("Data %s %d exceeds maximum message size", item.CodeId, msgSize) + }, llxResults...) + if err != nil { + log.Error().Err(err).Msg("failed to send datapoints") + } } - log.Debug().Msg("Sending datapoints and scores") - _, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{ - AssetMrn: c.assetMrn, - Data: resultsToSend, - Scores: scores, - Risks: risks, - IsPreprocessed: true, - IsLastBatch: isDone, - }) - if err != nil { - log.Error().Err(err).Msg("failed to send datapoints and scores") + if len(scores) > 0 || len(risks) > 0 { + log.Debug().Msg("Sending scores") + _, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{ + AssetMrn: c.assetMrn, + Scores: scores, + Risks: risks, + IsPreprocessed: true, + IsLastBatch: isDone, + }) + if err != nil { + log.Error().Err(err).Msg("failed to send datapoints and scores") + } } + } type FuncCollector struct {