Skip to content

Commit

Permalink
🐛 Chunk sending results by size
Browse files Browse the repository at this point in the history
There is an upper limit to what is accepted by the platform per request.
Make sure we do not go past it when sending data.
  • Loading branch information
jaym committed Oct 3, 2024
1 parent 0b94fa4 commit a5a7734
Showing 1 changed file with 41 additions and 14 deletions.
55 changes: 41 additions & 14 deletions policy/executor/internal/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/rs/zerolog/log"
"go.mondoo.com/cnquery/v11/llx"
"go.mondoo.com/cnquery/v11/utils/iox"
"go.mondoo.com/cnspec/v11/policy"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -317,23 +318,49 @@ func (c *PolicyServiceCollector) Sink(results []*llx.RawResult, scores []*policy
return
}

resultsToSend := make(map[string]*llx.Result, len(results))
for _, rr := range results {
resultsToSend[rr.CodeID] = c.toResult(rr)
if len(results) > 0 {
llxResults := make([]*llx.Result, len(results))
for i, rr := range results {
llxResults[i] = c.toResult(rr)
}
err := iox.ChunkMessages(func(chunk []*llx.Result) error {

Check failure on line 326 in policy/executor/internal/collector.go

View workflow job for this annotation

GitHub Actions / go-bench

undefined: iox.ChunkMessages

Check failure on line 326 in policy/executor/internal/collector.go

View workflow job for this annotation

GitHub Actions / go-test

undefined: iox.ChunkMessages

Check failure on line 326 in policy/executor/internal/collector.go

View workflow job for this annotation

GitHub Actions / go-test

undefined: iox.ChunkMessages

Check failure on line 326 in policy/executor/internal/collector.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: iox.ChunkMessages) (typecheck)

Check failure on line 326 in policy/executor/internal/collector.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: iox.ChunkMessages (typecheck)
log.Debug().Msg("Sending datapoints")
resultsToSend := make(map[string]*llx.Result, len(chunk))
for _, rr := range chunk {
resultsToSend[rr.CodeId] = rr
}
_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
AssetMrn: c.assetMrn,
Data: resultsToSend,
IsPreprocessed: true,
IsLastBatch: isDone,
})
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints")
}
return nil
}, func(item *llx.Result, msgSize int) {
log.Warn().Msgf("Data %s %d exceeds maximum message size", item.CodeId, msgSize)
}, llxResults...)
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints")
}
}

log.Debug().Msg("Sending datapoints and scores")
_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
AssetMrn: c.assetMrn,
Data: resultsToSend,
Scores: scores,
Risks: risks,
IsPreprocessed: true,
IsLastBatch: isDone,
})
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints and scores")
if len(scores) > 0 || len(risks) > 0 {
log.Debug().Msg("Sending scores")
_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
AssetMrn: c.assetMrn,
Scores: scores,
Risks: risks,
IsPreprocessed: true,
IsLastBatch: isDone,
})
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints and scores")
}
}

}

type FuncCollector struct {
Expand Down

0 comments on commit a5a7734

Please sign in to comment.