Skip to content

Commit

Permalink
🐛 Chunk sending results by size
Browse files Browse the repository at this point in the history
There is an upper limit to what is accepted by the platform per request.
Make sure we do not go past it when sending data.
  • Loading branch information
jaym committed Sep 26, 2024
1 parent 3f1c248 commit fe532b8
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 15 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
gocloud.dev v0.39.0
golang.org/x/sys v0.25.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1
google.golang.org/grpc v1.67.0
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
k8s.io/utils v0.0.0-20240921022957-49e7df575cb6
Expand Down Expand Up @@ -281,7 +282,6 @@ require (
google.golang.org/api v0.198.0 // indirect
google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.67.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
Expand Down
38 changes: 38 additions & 0 deletions policy/executor/internal/chunker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package internal

import (
"google.golang.org/protobuf/proto"
)

var maxMessageSize = 6 * (1 << 20)

func ChunkMessages[T proto.Message](sendFunc func([]T) error, onTooLarge func(T, int), items ...T) error {
idx := 0
for {
buffer := make([]T, 0, len(items))

if idx >= len(items) {
break
}
size := 0
for i := idx; i < len(items); i++ {
msgSize := proto.Size(items[i])
if msgSize > maxMessageSize {
onTooLarge(items[i], msgSize)
idx++
continue
}
size += proto.Size(items[i])
if size > maxMessageSize {
break
}
buffer = append(buffer, items[i])
idx++
}
if err := sendFunc(buffer); err != nil {
return err
}
}

return nil
}
62 changes: 62 additions & 0 deletions policy/executor/internal/chunker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package internal

import (
"bytes"
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/interop/grpc_testing"
)

func TestChunkerIgnoreTooLargeMessages(t *testing.T) {
payloads := []*grpc_testing.Payload{
{
Body: bytes.Repeat([]byte{0x01}, maxMessageSize+1),
},
{
Body: bytes.Repeat([]byte{0x02}, maxMessageSize/2),
},
}

var chunks [][]*grpc_testing.Payload
err := ChunkMessages(func(chunk []*grpc_testing.Payload) error {
chunks = append(chunks, chunk)
return nil
}, func(*grpc_testing.Payload, int) {}, payloads...)
require.NoError(t, err)
require.Len(t, chunks, 1)
require.Len(t, chunks[0], 1)
require.Equal(t, payloads[1], chunks[0][0])
}

func TestChunker(t *testing.T) {
maxMessageSize = 100
payloads := []*grpc_testing.Payload{
{
Body: bytes.Repeat([]byte{0x01}, maxMessageSize-10),
},
{
Body: bytes.Repeat([]byte{0x02}, maxMessageSize-10),
},
{
Body: bytes.Repeat([]byte{0x03}, 10),
},
{
Body: bytes.Repeat([]byte{0x04}, 10),
},
}

var chunks [][]*grpc_testing.Payload
err := ChunkMessages(func(chunk []*grpc_testing.Payload) error {
chunks = append(chunks, chunk)
return nil
}, func(*grpc_testing.Payload, int) {}, payloads...)
require.NoError(t, err)
require.Len(t, chunks, 3)
require.Len(t, chunks[0], 1)
require.Equal(t, payloads[0], chunks[0][0])
require.Len(t, chunks[1], 1)
require.Equal(t, payloads[1], chunks[1][0])
require.Len(t, chunks[2], 2)
require.Equal(t, payloads[2], chunks[2][0])
}
52 changes: 38 additions & 14 deletions policy/executor/internal/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,23 +317,47 @@ func (c *PolicyServiceCollector) Sink(results []*llx.RawResult, scores []*policy
return
}

resultsToSend := make(map[string]*llx.Result, len(results))
for _, rr := range results {
resultsToSend[rr.CodeID] = c.toResult(rr)
if len(results) > 0 {
llxResults := make([]*llx.Result, len(results))
for i, rr := range results {
llxResults[i] = c.toResult(rr)
}
ChunkMessages(func(chunk []*llx.Result) error {

Check failure on line 325 in policy/executor/internal/collector.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Error return value is not checked (errcheck)
log.Debug().Msg("Sending datapoints")
resultsToSend := make(map[string]*llx.Result, len(results))
for _, rr := range chunk {
resultsToSend[rr.CodeId] = rr
}
_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
AssetMrn: c.assetMrn,
Data: resultsToSend,
IsPreprocessed: true,
IsLastBatch: isDone,
})
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints")
}
return nil
},
func(item *llx.Result, msgSize int) {
log.Warn().Msgf("Data %s %d exceeds maximum message size %d", item.CodeId, msgSize, maxMessageSize)
}, llxResults...)
}

log.Debug().Msg("Sending datapoints and scores")
_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
AssetMrn: c.assetMrn,
Data: resultsToSend,
Scores: scores,
Risks: risks,
IsPreprocessed: true,
IsLastBatch: isDone,
})
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints and scores")
if len(scores) > 0 || len(risks) > 0 {
log.Debug().Msg("Sending scores")
_, err := c.resolver.StoreResults(context.Background(), &policy.StoreResultsReq{
AssetMrn: c.assetMrn,
Scores: scores,
Risks: risks,
IsPreprocessed: true,
IsLastBatch: isDone,
})
if err != nil {
log.Error().Err(err).Msg("failed to send datapoints and scores")
}
}

}

type FuncCollector struct {
Expand Down

0 comments on commit fe532b8

Please sign in to comment.