Skip to content

Commit

Permalink
🐛 Chunk sending results by size
Browse files Browse the repository at this point in the history
There is an upper limit to what is accepted by the platform per request.
Make sure we do not go past it when sending data.
  • Loading branch information
jaym committed Sep 26, 2024
1 parent 3f1c248 commit 790d1da
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 15 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
gocloud.dev v0.39.0
golang.org/x/sys v0.25.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1
google.golang.org/grpc v1.67.0
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
k8s.io/utils v0.0.0-20240921022957-49e7df575cb6
Expand Down Expand Up @@ -281,7 +282,6 @@ require (
google.golang.org/api v0.198.0 // indirect
google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.67.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
Expand Down
38 changes: 38 additions & 0 deletions policy/executor/internal/chunker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package internal

import (
"google.golang.org/protobuf/proto"
)

var maxMessageSize = 6 * (1 << 20)

func ChunkMessages[T proto.Message](sendFunc func([]T) error, onTooLarge func(T, int), items ...T) error {
idx := 0
for {
buffer := make([]T, 0, len(items))

if idx >= len(items) {
break
}
size := 0
for i := idx; i < len(items); i++ {
msgSize := proto.Size(items[i])
if msgSize > maxMessageSize {
onTooLarge(items[i], msgSize)
idx++
continue
}
size += proto.Size(items[i])
if size > maxMessageSize {
break
}
buffer = append(buffer, items[i])
idx++
}
if err := sendFunc(buffer); err != nil {
return err
}
}

return nil
}
62 changes: 62 additions & 0 deletions policy/executor/internal/chunker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package internal

import (
"bytes"
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/interop/grpc_testing"
)

func TestChunkerIgnoreTooLargeMessages(t *testing.T) {
payloads := []*grpc_testing.Payload{
{
Body: bytes.Repeat([]byte{0x01}, maxMessageSize+1),
},
{
Body: bytes.Repeat([]byte{0x02}, maxMessageSize/2),
},
}

var chunks [][]*grpc_testing.Payload
err := ChunkMessages(func(chunk []*grpc_testing.Payload) error {
chunks = append(chunks, chunk)
return nil
}, func(*grpc_testing.Payload, int) {}, payloads...)
require.NoError(t, err)
require.Len(t, chunks, 1)
require.Len(t, chunks[0], 1)
require.Equal(t, payloads[1], chunks[0][0])
}

func TestChunker(t *testing.T) {
maxMessageSize = 100
payloads := []*grpc_testing.Payload{
{
Body: bytes.Repeat([]byte{0x01}, maxMessageSize-10),
},
{
Body: bytes.Repeat([]byte{0x02}, maxMessageSize-10),
},
{
Body: bytes.Repeat([]byte{0x03}, 10),
},
{
Body: bytes.Repeat([]byte{0x04}, 10),
},
}

var chunks [][]*grpc_testing.Payload
err := ChunkMessages(func(chunk []*grpc_testing.Payload) error {
chunks = append(chunks, chunk)
return nil
}, func(*grpc_testing.Payload, int) {}, payloads...)
require.NoError(t, err)
require.Len(t, chunks, 3)
require.Len(t, chunks[0], 1)
require.Equal(t, payloads[0], chunks[0][0])
require.Len(t, chunks[1], 1)
require.Equal(t, payloads[1], chunks[1][0])
require.Len(t, chunks[2], 2)
require.Equal(t, payloads[2], chunks[2][0])
}
54 changes: 40 additions & 14 deletions policy/executor/internal/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,23 +317,49 @@ func (c *PolicyServiceCollector) Sink(results []*llx.RawResult, scores []*policy
return
}

resultsToSend := make(map[string]*llx.Result, len(results))
for _, rr := range results {
resultsToSend[rr.CodeID] = c.toResult(rr)
if len(results) > 0 {
llxResults := make([]*llx.Result, len(results))
for i, rr := range results {
llxResults[i] = c.toResult(rr)
}
err := ChunkMessages(func(chunk []*llx.Result) error {
log.Debug().Msg("Sending datapoints")
resultsToSend := make(map[string]*llx.Result, len(results))
for _, rr := range chunk {
resultsToSend[rr.CodeId] = rr
}
_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
AssetMrn: c.assetMrn,
Data: resultsToSend,
IsPreprocessed: true,
IsLastBatch: isDone,
})
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints")
}
return nil
}, func(item *llx.Result, msgSize int) {
log.Warn().Msgf("Data %s %d exceeds maximum message size %d", item.CodeId, msgSize, maxMessageSize)
}, llxResults...)
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints")
}
}

log.Debug().Msg("Sending datapoints and scores")
_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
AssetMrn: c.assetMrn,
Data: resultsToSend,
Scores: scores,
Risks: risks,
IsPreprocessed: true,
IsLastBatch: isDone,
})
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints and scores")
if len(scores) > 0 || len(risks) > 0 {
log.Debug().Msg("Sending scores")
_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
AssetMrn: c.assetMrn,
Scores: scores,
Risks: risks,
IsPreprocessed: true,
IsLastBatch: isDone,
})
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints and scores")
}
}

}

type FuncCollector struct {
Expand Down

0 comments on commit 790d1da

Please sign in to comment.