🐛 Chunk sending results by size
There is an upper limit to what is accepted by the platform per request.
Make sure we do not go past it when sending data.
jaym committed Sep 25, 2024
1 parent 3f1c248 commit 6c02359
Showing 3 changed files with 138 additions and 14 deletions.
38 changes: 38 additions & 0 deletions policy/executor/internal/chunker.go
@@ -0,0 +1,38 @@
package internal

import (
	"google.golang.org/protobuf/proto"
)

// maxMessageSize is the upper limit (6 MiB) on the encoded size of a single
// request accepted by the platform.
var maxMessageSize = 6 * (1 << 20)

// ChunkMessages groups items into chunks whose combined proto-encoded size
// stays within maxMessageSize and hands each chunk to sendFunc. Items that are
// individually larger than the limit are skipped and reported via onTooLarge.
func ChunkMessages[T proto.Message](sendFunc func([]T) error, onTooLarge func(T, int), items ...T) error {
	idx := 0
	for idx < len(items) {
		buffer := make([]T, 0, len(items)-idx)
		size := 0
		for i := idx; i < len(items); i++ {
			msgSize := proto.Size(items[i])
			if msgSize > maxMessageSize {
				// This message can never be sent on its own; report and skip it.
				onTooLarge(items[i], msgSize)
				idx++
				continue
			}
			size += msgSize
			if size > maxMessageSize {
				// Adding this message would exceed the limit; flush what we have.
				break
			}
			buffer = append(buffer, items[i])
			idx++
		}
		if len(buffer) == 0 {
			// All remaining items were too large and have been skipped.
			continue
		}
		if err := sendFunc(buffer); err != nil {
			return err
		}
	}

	return nil
}
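For reference, a minimal usage sketch of the new helper (not part of the commit). It would sit in a _test.go file in package internal and borrows grpc_testing.Payload as a convenient proto.Message stand-in for the *llx.Result values the collector actually sends:

package internal

import (
	"fmt"

	"google.golang.org/grpc/interop/grpc_testing"
)

// ExampleChunkMessages: both payloads fit well under the 6 MiB default,
// so they are delivered to sendFunc in a single chunk.
func ExampleChunkMessages() {
	payloads := []*grpc_testing.Payload{
		{Body: []byte("first")},
		{Body: []byte("second")},
	}
	err := ChunkMessages(
		func(chunk []*grpc_testing.Payload) error {
			fmt.Printf("sending %d message(s)\n", len(chunk))
			return nil // a real caller would forward the chunk to the platform here
		},
		func(p *grpc_testing.Payload, size int) {
			fmt.Printf("dropping message of %d bytes\n", size)
		},
		payloads...,
	)
	fmt.Println("err:", err)
	// Output:
	// sending 2 message(s)
	// err: <nil>
}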
62 changes: 62 additions & 0 deletions policy/executor/internal/chunker_test.go
@@ -0,0 +1,62 @@
package internal

import (
	"bytes"
	"testing"

	"github.com/stretchr/testify/require"
	"google.golang.org/grpc/interop/grpc_testing"
)

// A message that is larger than maxMessageSize on its own must be dropped
// (reported via onTooLarge) instead of being sent.
func TestChunkerIgnoreTooLargeMessages(t *testing.T) {
	payloads := []*grpc_testing.Payload{
		{
			Body: bytes.Repeat([]byte{0x01}, maxMessageSize+1),
		},
		{
			Body: bytes.Repeat([]byte{0x02}, maxMessageSize/2),
		},
	}

	var chunks [][]*grpc_testing.Payload
	err := ChunkMessages(func(chunk []*grpc_testing.Payload) error {
		chunks = append(chunks, chunk)
		return nil
	}, func(*grpc_testing.Payload, int) {}, payloads...)
	require.NoError(t, err)
	require.Len(t, chunks, 1)
	require.Len(t, chunks[0], 1)
	require.Equal(t, payloads[1], chunks[0][0])
}

// Messages are grouped greedily so that each chunk stays within maxMessageSize.
func TestChunker(t *testing.T) {
	// Lower the limit for this test and restore it afterwards so the
	// package-level override does not leak into other tests.
	originalMaxMessageSize := maxMessageSize
	maxMessageSize = 100
	t.Cleanup(func() { maxMessageSize = originalMaxMessageSize })

	payloads := []*grpc_testing.Payload{
		{
			Body: bytes.Repeat([]byte{0x01}, maxMessageSize-10),
		},
		{
			Body: bytes.Repeat([]byte{0x02}, maxMessageSize-10),
		},
		{
			Body: bytes.Repeat([]byte{0x03}, 10),
		},
		{
			Body: bytes.Repeat([]byte{0x04}, 10),
		},
	}

	var chunks [][]*grpc_testing.Payload
	err := ChunkMessages(func(chunk []*grpc_testing.Payload) error {
		chunks = append(chunks, chunk)
		return nil
	}, func(*grpc_testing.Payload, int) {}, payloads...)
	require.NoError(t, err)
	require.Len(t, chunks, 3)
	require.Len(t, chunks[0], 1)
	require.Equal(t, payloads[0], chunks[0][0])
	require.Len(t, chunks[1], 1)
	require.Equal(t, payloads[1], chunks[1][0])
	require.Len(t, chunks[2], 2)
	require.Equal(t, payloads[2], chunks[2][0])
}
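A side note on the numbers in TestChunker: proto.Size measures the encoded message, which is the body plus a few bytes of field tag and length overhead, so two maxMessageSize-10 payloads cannot share a 100-byte chunk. A standalone sketch of that check (same grpc_testing.Payload type; not part of the commit):

package main

import (
	"bytes"
	"fmt"

	"google.golang.org/grpc/interop/grpc_testing"
	"google.golang.org/protobuf/proto"
)

func main() {
	// A 90-byte body encodes to slightly more than 90 bytes once the bytes
	// field's tag and length prefix are counted.
	p := &grpc_testing.Payload{Body: bytes.Repeat([]byte{0x01}, 90)}
	fmt.Println(proto.Size(p))
}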
52 changes: 38 additions & 14 deletions policy/executor/internal/collector.go
GitHub Actions / golangci-lint flagged a check failure on line 325 in policy/executor/internal/collector.go: Error return value is not checked (errcheck). The error returned by the ChunkMessages call below is discarded.

@@ -317,23 +317,47 @@ func (c *PolicyServiceCollector) Sink(results []*llx.RawResult, scores []*policy
 		return
 	}
 
-	resultsToSend := make(map[string]*llx.Result, len(results))
-	for _, rr := range results {
-		resultsToSend[rr.CodeID] = c.toResult(rr)
+	if len(results) > 0 {
+		llxResults := make([]*llx.Result, len(results))
+		for i, rr := range results {
+			llxResults[i] = c.toResult(rr)
+		}
+		ChunkMessages(func(chunk []*llx.Result) error {
+			log.Debug().Msg("Sending datapoints")
+			resultsToSend := make(map[string]*llx.Result, len(results))
+			for _, rr := range chunk {
+				resultsToSend[rr.CodeId] = rr
+			}
+			_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
+				AssetMrn:       c.assetMrn,
+				Data:           resultsToSend,
+				IsPreprocessed: true,
+				IsLastBatch:    isDone,
+			})
+			if err != nil {
+				log.Error().Err(err).Msg("failed to send datapoints")
+			}
+			return nil
+		},
+			func(item *llx.Result, msgSize int) {
+				log.Warn().Msgf("Data %s %d exceeds maximum message size %d", item.CodeId, msgSize, maxMessageSize)
+			}, llxResults...)
 	}
 
-	log.Debug().Msg("Sending datapoints and scores")
-	_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
-		AssetMrn:       c.assetMrn,
-		Data:           resultsToSend,
-		Scores:         scores,
-		Risks:          risks,
-		IsPreprocessed: true,
-		IsLastBatch:    isDone,
-	})
-	if err != nil {
-		log.Error().Err(err).Msg("failed to send datapoints and scores")
+	if len(scores) > 0 || len(risks) > 0 {
+		log.Debug().Msg("Sending scores")
+		_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
+			AssetMrn:       c.assetMrn,
+			Scores:         scores,
+			Risks:          risks,
+			IsPreprocessed: true,
+			IsLastBatch:    isDone,
+		})
+		if err != nil {
+			log.Error().Err(err).Msg("failed to send datapoints and scores")
+		}
 	}
+
 }
 
 type FuncCollector struct {
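Regarding the errcheck annotation above: ChunkMessages only returns an error when sendFunc does, and the closure in this commit always returns nil, but a caller that wants send failures to propagate would check the result. A sketch of that pattern (sendChunk and logError are hypothetical stand-ins, not names from the repository):

package internal

import (
	"google.golang.org/protobuf/proto"
)

// sendAllChunked is a sketch of an errcheck-clean wrapper around ChunkMessages:
// the returned error is captured and reported instead of being discarded.
func sendAllChunked[T proto.Message](sendChunk func([]T) error, logError func(error), items ...T) {
	err := ChunkMessages(sendChunk, func(item T, msgSize int) {
		// Oversized items are skipped; the collector logs a warning for them.
	}, items...)
	if err != nil {
		logError(err)
	}
}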