Skip to content

Commit

Permalink
🐛 Chunk sending results by size (#1440)
Browse files Browse the repository at this point in the history
* 🐛 Chunk sending results by size

There is an upper limit to what is accepted by the platform per request.
Make sure we do not go past it when sending data.

* go mod update
  • Loading branch information
jaym authored Oct 3, 2024
1 parent 0b94fa4 commit 38ed514
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 15 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/spf13/pflag v1.0.6-0.20201009195203-85dd5c8bc61c
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.9.0
go.mondoo.com/cnquery/v11 v11.24.0
go.mondoo.com/cnquery/v11 v11.24.1-0.20241003101157-5788fb46e68e
go.mondoo.com/mondoo-go v0.0.0-20240924071220-8972d78c3774
go.mondoo.com/ranger-rpc v0.6.4
go.opentelemetry.io/otel v1.30.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,8 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.1/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3
go.etcd.io/etcd/client/v2 v2.305.1/go.mod h1:pMEacxZW7o8pg4CrFE7pquyCJJzZvkvdD2RibOCCCGs=
go.mondoo.com/cnquery/v11 v11.24.0 h1:Dahfra5OIlgHRmvCdo2yuWxRLKQR+WPgBYOKMtzEGyY=
go.mondoo.com/cnquery/v11 v11.24.0/go.mod h1:RhCgz3xFT8Lp9+Zz7A/YprJw4BNvKP1IRJENNDQleUs=
go.mondoo.com/cnquery/v11 v11.24.1-0.20241003101157-5788fb46e68e h1:X22nHyW6SN+x6p3/Q3AzaB42lwFCyW8Xdw4vS3+Cwvo=
go.mondoo.com/cnquery/v11 v11.24.1-0.20241003101157-5788fb46e68e/go.mod h1:RhCgz3xFT8Lp9+Zz7A/YprJw4BNvKP1IRJENNDQleUs=
go.mondoo.com/mondoo-go v0.0.0-20240924071220-8972d78c3774 h1:CKLSb7moOWfk6oqBjr1QUy+St0b/yb/0DayUgtUsoAQ=
go.mondoo.com/mondoo-go v0.0.0-20240924071220-8972d78c3774/go.mod h1:dun0t/zVJCSB/u9dhSO4FXn13moiH7JSwS2m8Ucxx50=
go.mondoo.com/ranger-rpc v0.6.4 h1:q01kjESvF2HSnbFO+TjpUQSiI2IM8JWGJLH3u0vNxZA=
Expand Down
55 changes: 41 additions & 14 deletions policy/executor/internal/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/rs/zerolog/log"
"go.mondoo.com/cnquery/v11/llx"
"go.mondoo.com/cnquery/v11/utils/iox"
"go.mondoo.com/cnspec/v11/policy"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -317,23 +318,49 @@ func (c *PolicyServiceCollector) Sink(results []*llx.RawResult, scores []*policy
return
}

resultsToSend := make(map[string]*llx.Result, len(results))
for _, rr := range results {
resultsToSend[rr.CodeID] = c.toResult(rr)
if len(results) > 0 {
llxResults := make([]*llx.Result, len(results))
for i, rr := range results {
llxResults[i] = c.toResult(rr)
}
err := iox.ChunkMessages(func(chunk []*llx.Result) error {
log.Debug().Msg("Sending datapoints")
resultsToSend := make(map[string]*llx.Result, len(chunk))
for _, rr := range chunk {
resultsToSend[rr.CodeId] = rr
}
_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
AssetMrn: c.assetMrn,
Data: resultsToSend,
IsPreprocessed: true,
IsLastBatch: isDone,
})
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints")
}
return nil
}, func(item *llx.Result, msgSize int) {
log.Warn().Msgf("Data %s %d exceeds maximum message size", item.CodeId, msgSize)
}, llxResults...)
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints")
}
}

log.Debug().Msg("Sending datapoints and scores")
_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
AssetMrn: c.assetMrn,
Data: resultsToSend,
Scores: scores,
Risks: risks,
IsPreprocessed: true,
IsLastBatch: isDone,
})
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints and scores")
if len(scores) > 0 || len(risks) > 0 {
log.Debug().Msg("Sending scores")
_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
AssetMrn: c.assetMrn,
Scores: scores,
Risks: risks,
IsPreprocessed: true,
IsLastBatch: isDone,
})
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints and scores")
}
}

}

type FuncCollector struct {
Expand Down

0 comments on commit 38ed514

Please sign in to comment.