Skip to content

Commit

Permalink
🐛 Chunk sending results by size
Browse files Browse the repository at this point in the history
There is an upper limit to what is accepted by the platform per request.
Make sure we do not go past it when sending data.
  • Loading branch information
jaym committed Oct 3, 2024
1 parent 3f1c248 commit 47b4bca
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 15 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
gocloud.dev v0.39.0
golang.org/x/sys v0.25.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1
google.golang.org/grpc v1.67.0
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
k8s.io/utils v0.0.0-20240921022957-49e7df575cb6
Expand Down Expand Up @@ -281,7 +282,6 @@ require (
google.golang.org/api v0.198.0 // indirect
google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.67.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
Expand Down
55 changes: 41 additions & 14 deletions policy/executor/internal/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/rs/zerolog/log"
"go.mondoo.com/cnquery/v11/llx"
"go.mondoo.com/cnquery/v11/utils/iox"
"go.mondoo.com/cnspec/v11/policy"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -317,23 +318,49 @@ func (c *PolicyServiceCollector) Sink(results []*llx.RawResult, scores []*policy
return
}

resultsToSend := make(map[string]*llx.Result, len(results))
for _, rr := range results {
resultsToSend[rr.CodeID] = c.toResult(rr)
if len(results) > 0 {
llxResults := make([]*llx.Result, len(results))
for i, rr := range results {
llxResults[i] = c.toResult(rr)
}
err := iox.ChunkMessages(func(chunk []*llx.Result) error {

Check failure on line 326 in policy/executor/internal/collector.go

View workflow job for this annotation

GitHub Actions / go-bench

undefined: iox.ChunkMessages

Check failure on line 326 in policy/executor/internal/collector.go

View workflow job for this annotation

GitHub Actions / go-test

undefined: iox.ChunkMessages

Check failure on line 326 in policy/executor/internal/collector.go

View workflow job for this annotation

GitHub Actions / go-test

undefined: iox.ChunkMessages
log.Debug().Msg("Sending datapoints")
resultsToSend := make(map[string]*llx.Result, len(chunk))
for _, rr := range chunk {
resultsToSend[rr.CodeId] = rr
}
_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
AssetMrn: c.assetMrn,
Data: resultsToSend,
IsPreprocessed: true,
IsLastBatch: isDone,
})
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints")
}
return nil
}, func(item *llx.Result, msgSize int) {
log.Warn().Msgf("Data %s %d exceeds maximum message size", item.CodeId, msgSize)
}, llxResults...)
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints")
}
}

log.Debug().Msg("Sending datapoints and scores")
_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
AssetMrn: c.assetMrn,
Data: resultsToSend,
Scores: scores,
Risks: risks,
IsPreprocessed: true,
IsLastBatch: isDone,
})
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints and scores")
if len(scores) > 0 || len(risks) > 0 {
log.Debug().Msg("Sending scores")
_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
AssetMrn: c.assetMrn,
Scores: scores,
Risks: risks,
IsPreprocessed: true,
IsLastBatch: isDone,
})
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints and scores")
}
}

}

type FuncCollector struct {
Expand Down

0 comments on commit 47b4bca

Please sign in to comment.