diff --git a/go.mod b/go.mod index b5d0952d..ef4ee0c6 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/spf13/pflag v1.0.6-0.20201009195203-85dd5c8bc61c github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.9.0 - go.mondoo.com/cnquery/v11 v11.24.0 + go.mondoo.com/cnquery/v11 v11.24.1-0.20241003101157-5788fb46e68e go.mondoo.com/mondoo-go v0.0.0-20240924071220-8972d78c3774 go.mondoo.com/ranger-rpc v0.6.4 go.opentelemetry.io/otel v1.30.0 diff --git a/go.sum b/go.sum index 631a58b2..53078d92 100644 --- a/go.sum +++ b/go.sum @@ -941,6 +941,8 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.1/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3 go.etcd.io/etcd/client/v2 v2.305.1/go.mod h1:pMEacxZW7o8pg4CrFE7pquyCJJzZvkvdD2RibOCCCGs= go.mondoo.com/cnquery/v11 v11.24.0 h1:Dahfra5OIlgHRmvCdo2yuWxRLKQR+WPgBYOKMtzEGyY= go.mondoo.com/cnquery/v11 v11.24.0/go.mod h1:RhCgz3xFT8Lp9+Zz7A/YprJw4BNvKP1IRJENNDQleUs= +go.mondoo.com/cnquery/v11 v11.24.1-0.20241003101157-5788fb46e68e h1:X22nHyW6SN+x6p3/Q3AzaB42lwFCyW8Xdw4vS3+Cwvo= +go.mondoo.com/cnquery/v11 v11.24.1-0.20241003101157-5788fb46e68e/go.mod h1:RhCgz3xFT8Lp9+Zz7A/YprJw4BNvKP1IRJENNDQleUs= go.mondoo.com/mondoo-go v0.0.0-20240924071220-8972d78c3774 h1:CKLSb7moOWfk6oqBjr1QUy+St0b/yb/0DayUgtUsoAQ= go.mondoo.com/mondoo-go v0.0.0-20240924071220-8972d78c3774/go.mod h1:dun0t/zVJCSB/u9dhSO4FXn13moiH7JSwS2m8Ucxx50= go.mondoo.com/ranger-rpc v0.6.4 h1:q01kjESvF2HSnbFO+TjpUQSiI2IM8JWGJLH3u0vNxZA= diff --git a/policy/executor/internal/collector.go b/policy/executor/internal/collector.go index 7310b570..3067bf97 100644 --- a/policy/executor/internal/collector.go +++ b/policy/executor/internal/collector.go @@ -11,6 +11,7 @@ import ( "github.com/rs/zerolog/log" "go.mondoo.com/cnquery/v11/llx" + "go.mondoo.com/cnquery/v11/utils/iox" "go.mondoo.com/cnspec/v11/policy" "google.golang.org/protobuf/proto" ) @@ -317,23 +318,49 @@ func (c *PolicyServiceCollector) Sink(results []*llx.RawResult, scores []*policy return } - resultsToSend := make(map[string]*llx.Result, len(results)) - for _, rr := range results { - resultsToSend[rr.CodeID] = c.toResult(rr) + if len(results) > 0 { + llxResults := make([]*llx.Result, len(results)) + for i, rr := range results { + llxResults[i] = c.toResult(rr) + } + err := iox.ChunkMessages(func(chunk []*llx.Result) error { + log.Debug().Msg("Sending datapoints") + resultsToSend := make(map[string]*llx.Result, len(chunk)) + for _, rr := range chunk { + resultsToSend[rr.CodeId] = rr + } + _, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{ + AssetMrn: c.assetMrn, + Data: resultsToSend, + IsPreprocessed: true, + IsLastBatch: isDone, + }) + if err != nil { + log.Error().Err(err).Msg("failed to send datapoints") + } + return nil + }, func(item *llx.Result, msgSize int) { + log.Warn().Msgf("Data %s %d exceeds maximum message size", item.CodeId, msgSize) + }, llxResults...) + if err != nil { + log.Error().Err(err).Msg("failed to send datapoints") + } } - log.Debug().Msg("Sending datapoints and scores") - _, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{ - AssetMrn: c.assetMrn, - Data: resultsToSend, - Scores: scores, - Risks: risks, - IsPreprocessed: true, - IsLastBatch: isDone, - }) - if err != nil { - log.Error().Err(err).Msg("failed to send datapoints and scores") + if len(scores) > 0 || len(risks) > 0 { + log.Debug().Msg("Sending scores") + _, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{ + AssetMrn: c.assetMrn, + Scores: scores, + Risks: risks, + IsPreprocessed: true, + IsLastBatch: isDone, + }) + if err != nil { + log.Error().Err(err).Msg("failed to send datapoints and scores") + } } + } type FuncCollector struct {