Skip to content

Commit

Permalink
Fix the performance of LimitSequenceByteSize (#3548)
Browse files Browse the repository at this point in the history
* Fix the performance of LimitSequenceByteSize

I changed this function recently (#3483) as it was previously incorrectly not limiting the byte sequence in all cases

Unfortunately this change caused the complexity of the function to go from O(n) -> O(n^2), where n is the number of sequence.Events

This PR changes the complexity back to O(n) while still keeping the correct limit check

We now estimate the byte overhead added by the Event slice using the constant `sequenceEventListSizeBytes` - which is a very conservative estimate
 - This does mean the function is now less useful for very small limits, but that isn't a problem for us in practice. Typically we limit to 4Mb, so 100 byte overhead is insignificant

This change means the time to process an EventSequence with 170k events (approximately our worst case when using a 4Mb limit) from 3m10s -> 22.5ms (on my AWS machine)

The motivation for this change was that large events going through our system were causing significant pauses in process
 - This impacted the scheduler and ingesters, causing widespread dispruption on large events (cancel/repioritize)

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Fix comment

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Rename constant + better comment

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

---------

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>
  • Loading branch information
JamesMurkin authored Apr 28, 2024
1 parent cf292c0 commit 18c29e1
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 38 deletions.
63 changes: 28 additions & 35 deletions internal/common/eventutil/eventutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,58 +459,51 @@ func LimitSequencesByteSize(sequences []*armadaevents.EventSequence, sizeInBytes
return rv, nil
}

// LimitSequenceByteSize returns a slice of sequences produced by breaking up sequence.Events
// into separate sequences, each of which is at most MAX_SEQUENCE_SIZE_IN_BYTES bytes in size.
func LimitSequenceByteSize(sequence *armadaevents.EventSequence, maxSequenceSizeInBytes uint, strict bool) ([]*armadaevents.EventSequence, error) {
// This is an (over)estimate of the byte overhead used to represent the list EventSequence.Events
// We need this get a safe estimate for the headerSize in LimitSequenceByteSize
// We cannot simply rely on proto.Size on an EventSequence with an empty Event list,
// as proto is smart enough to realise it is empty and just nils it out for 0 bytes
const sequenceEventListOverheadSizeBytes = 100

// LimitSequenceByteSize returns a slice of sequences produced by breaking up sequence.Events into separate sequences
// If strict is true, each sequence will be at most sizeInBytes bytes in size
// If strict is false, sizeInBytes can be exceeded by at most the size of a single sequence.Event
func LimitSequenceByteSize(sequence *armadaevents.EventSequence, sizeInBytes uint, strict bool) ([]*armadaevents.EventSequence, error) {
// Compute the size of the sequence without events.
events := sequence.Events
sequence.Events = make([]*armadaevents.EventSequence_Event, 0)
headerSize := uint(proto.Size(sequence)) + sequenceEventListOverheadSizeBytes
sequence.Events = events

// var currentSequence *armadaevents.EventSequence
sequences := make([]*armadaevents.EventSequence, 0, 1)
lastSequenceEventSize := uint(0)
for _, event := range sequence.Events {
if len(sequences) == 0 {
sequences = append(sequences, &armadaevents.EventSequence{
Queue: sequence.Queue,
JobSetName: sequence.JobSetName,
UserId: sequence.UserId,
Groups: sequence.Groups,
Events: nil,
eventSize := uint(proto.Size(event))
if eventSize+headerSize > sizeInBytes && strict {
return nil, errors.WithStack(&armadaerrors.ErrInvalidArgument{
Name: "sequence",
Value: sequence,
Message: fmt.Sprintf(
"event of %d bytes is too large, when combined with a header of size %d is larger than the sequence size limit of %d",
eventSize,
headerSize,
sizeInBytes,
),
})
}
lastSequence := sequences[len(sequences)-1]
lastSequence.Events = append(lastSequence.Events, event)
sequenceSizeInBytes := uint(proto.Size(lastSequence))

if sequenceSizeInBytes > maxSequenceSizeInBytes {
// Event makes sequence too large, remove event and make a new sequence
lastSequence.Events = lastSequence.Events[:len(lastSequence.Events)-1]
if len(sequences) == 0 || lastSequenceEventSize+eventSize+headerSize > sizeInBytes {
sequences = append(sequences, &armadaevents.EventSequence{
Queue: sequence.Queue,
JobSetName: sequence.JobSetName,
UserId: sequence.UserId,
Groups: sequence.Groups,
Events: nil,
})

lastSequence = sequences[len(sequences)-1]
lastSequence.Events = append(lastSequence.Events, event)
sequenceSizeInBytes = uint(proto.Size(lastSequence))

if sequenceSizeInBytes > maxSequenceSizeInBytes && strict {
eventSize := uint(proto.Size(event))
return nil, errors.WithStack(&armadaerrors.ErrInvalidArgument{
Name: "sequence",
Value: sequence,
Message: fmt.Sprintf(
"event of %d bytes is too large, preventing the creation of a sequence with size limit %d",
eventSize,
maxSequenceSizeInBytes,
),
})
}
lastSequenceEventSize = 0
}
lastSequence := sequences[len(sequences)-1]
lastSequence.Events = append(lastSequence.Events, event)
lastSequenceEventSize += eventSize
}
return sequences, nil
}
35 changes: 32 additions & 3 deletions internal/common/eventutil/eventutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"
Expand Down Expand Up @@ -622,6 +623,35 @@ func TestCompactSequences_Groups(t *testing.T) {
assert.Equal(t, expected, actual)
}

func TestSequenceEventListSizeBytes(t *testing.T) {
jobId, err := armadaevents.ProtoUuidFromUlidString(util.ULID().String())
if !assert.NoError(t, err) {
return
}

sequence := &armadaevents.EventSequence{
Queue: "",
UserId: "",
JobSetName: "",
Groups: []string{},
Events: []*armadaevents.EventSequence_Event{
{
Event: &armadaevents.EventSequence_Event_CancelledJob{
CancelledJob: &armadaevents.CancelledJob{
JobId: jobId,
},
},
},
},
}

sequenceSizeBytes := uint(proto.Size(sequence))
// If this fails, it means that the sequenceEventListOverheadSizeBytes constant is possibly too small
// We are showing our safe estimate of the byte overhead added by the event list in proto is definitely large enough
// by showing it is larger than a sequence with a single event (as that sequence contains the overhead added by the event list)
assert.True(t, sequenceSizeBytes < sequenceEventListOverheadSizeBytes)
}

func TestLimitSequenceByteSize(t *testing.T) {
sequence := &armadaevents.EventSequence{
Queue: "queue1",
Expand Down Expand Up @@ -655,7 +685,6 @@ func TestLimitSequenceByteSize(t *testing.T) {

_, err = LimitSequenceByteSize(sequence, 1, false)
assert.NoError(t, err)
assert.Equal(t, []*armadaevents.EventSequence{sequence}, actual)

expected := make([]*armadaevents.EventSequence, numEvents)
for i := 0; i < numEvents; i++ {
Expand All @@ -675,7 +704,7 @@ func TestLimitSequenceByteSize(t *testing.T) {
},
}
}
actual, err = LimitSequenceByteSize(sequence, 65, true)
actual, err = LimitSequenceByteSize(sequence, 65+sequenceEventListOverheadSizeBytes, true)
if !assert.NoError(t, err) {
return
}
Expand Down Expand Up @@ -710,7 +739,7 @@ func TestLimitSequencesByteSize(t *testing.T) {
sequences = append(sequences, sequence)
}

actual, err := LimitSequencesByteSize(sequences, 65, true)
actual, err := LimitSequencesByteSize(sequences, 65+sequenceEventListOverheadSizeBytes, true)
if !assert.NoError(t, err) {
return
}
Expand Down

0 comments on commit 18c29e1

Please sign in to comment.