diff --git a/internal/common/eventutil/eventutil.go b/internal/common/eventutil/eventutil.go index 15a0fdbf169..d430ace9bf4 100644 --- a/internal/common/eventutil/eventutil.go +++ b/internal/common/eventutil/eventutil.go @@ -459,33 +459,39 @@ func LimitSequencesByteSize(sequences []*armadaevents.EventSequence, sizeInBytes return rv, nil } -// LimitSequenceByteSize returns a slice of sequences produced by breaking up sequence.Events -// into separate sequences, each of which is at most MAX_SEQUENCE_SIZE_IN_BYTES bytes in size. -func LimitSequenceByteSize(sequence *armadaevents.EventSequence, maxSequenceSizeInBytes uint, strict bool) ([]*armadaevents.EventSequence, error) { +// This is an (over)estimate of the byte overhead used to represent the list EventSequence.Events +// We need this get a safe estimate for the headerSize in LimitSequenceByteSize +// We cannot simply rely on proto.Size on an EventSequence with an empty Event list, +// as proto is smart enough to realise it is empty and just nils it out for 0 bytes +const sequenceEventListOverheadSizeBytes = 100 + +// LimitSequenceByteSize returns a slice of sequences produced by breaking up sequence.Events into separate sequences +// If strict is true, each sequence will be at most sizeInBytes bytes in size +// If strict is false, sizeInBytes can be exceeded by at most the size of a single sequence.Event +func LimitSequenceByteSize(sequence *armadaevents.EventSequence, sizeInBytes uint, strict bool) ([]*armadaevents.EventSequence, error) { // Compute the size of the sequence without events. events := sequence.Events sequence.Events = make([]*armadaevents.EventSequence_Event, 0) + headerSize := uint(proto.Size(sequence)) + sequenceEventListOverheadSizeBytes sequence.Events = events - // var currentSequence *armadaevents.EventSequence sequences := make([]*armadaevents.EventSequence, 0, 1) + lastSequenceEventSize := uint(0) for _, event := range sequence.Events { - if len(sequences) == 0 { - sequences = append(sequences, &armadaevents.EventSequence{ - Queue: sequence.Queue, - JobSetName: sequence.JobSetName, - UserId: sequence.UserId, - Groups: sequence.Groups, - Events: nil, + eventSize := uint(proto.Size(event)) + if eventSize+headerSize > sizeInBytes && strict { + return nil, errors.WithStack(&armadaerrors.ErrInvalidArgument{ + Name: "sequence", + Value: sequence, + Message: fmt.Sprintf( + "event of %d bytes is too large, when combined with a header of size %d is larger than the sequence size limit of %d", + eventSize, + headerSize, + sizeInBytes, + ), }) } - lastSequence := sequences[len(sequences)-1] - lastSequence.Events = append(lastSequence.Events, event) - sequenceSizeInBytes := uint(proto.Size(lastSequence)) - - if sequenceSizeInBytes > maxSequenceSizeInBytes { - // Event makes sequence too large, remove event and make a new sequence - lastSequence.Events = lastSequence.Events[:len(lastSequence.Events)-1] + if len(sequences) == 0 || lastSequenceEventSize+eventSize+headerSize > sizeInBytes { sequences = append(sequences, &armadaevents.EventSequence{ Queue: sequence.Queue, JobSetName: sequence.JobSetName, @@ -493,24 +499,11 @@ func LimitSequenceByteSize(sequence *armadaevents.EventSequence, maxSequenceSize Groups: sequence.Groups, Events: nil, }) - - lastSequence = sequences[len(sequences)-1] - lastSequence.Events = append(lastSequence.Events, event) - sequenceSizeInBytes = uint(proto.Size(lastSequence)) - - if sequenceSizeInBytes > maxSequenceSizeInBytes && strict { - eventSize := uint(proto.Size(event)) - return nil, errors.WithStack(&armadaerrors.ErrInvalidArgument{ - Name: "sequence", - Value: sequence, - Message: fmt.Sprintf( - "event of %d bytes is too large, preventing the creation of a sequence with size limit %d", - eventSize, - maxSequenceSizeInBytes, - ), - }) - } + lastSequenceEventSize = 0 } + lastSequence := sequences[len(sequences)-1] + lastSequence.Events = append(lastSequence.Events, event) + lastSequenceEventSize += eventSize } return sequences, nil } diff --git a/internal/common/eventutil/eventutil_test.go b/internal/common/eventutil/eventutil_test.go index d92bd6cc182..905f4fa52fd 100644 --- a/internal/common/eventutil/eventutil_test.go +++ b/internal/common/eventutil/eventutil_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" networking "k8s.io/api/networking/v1" @@ -622,6 +623,35 @@ func TestCompactSequences_Groups(t *testing.T) { assert.Equal(t, expected, actual) } +func TestSequenceEventListSizeBytes(t *testing.T) { + jobId, err := armadaevents.ProtoUuidFromUlidString(util.ULID().String()) + if !assert.NoError(t, err) { + return + } + + sequence := &armadaevents.EventSequence{ + Queue: "", + UserId: "", + JobSetName: "", + Groups: []string{}, + Events: []*armadaevents.EventSequence_Event{ + { + Event: &armadaevents.EventSequence_Event_CancelledJob{ + CancelledJob: &armadaevents.CancelledJob{ + JobId: jobId, + }, + }, + }, + }, + } + + sequenceSizeBytes := uint(proto.Size(sequence)) + // If this fails, it means that the sequenceEventListOverheadSizeBytes constant is possibly too small + // We are showing our safe estimate of the byte overhead added by the event list in proto is definitely large enough + // by showing it is larger than a sequence with a single event (as that sequence contains the overhead added by the event list) + assert.True(t, sequenceSizeBytes < sequenceEventListOverheadSizeBytes) +} + func TestLimitSequenceByteSize(t *testing.T) { sequence := &armadaevents.EventSequence{ Queue: "queue1", @@ -655,7 +685,6 @@ func TestLimitSequenceByteSize(t *testing.T) { _, err = LimitSequenceByteSize(sequence, 1, false) assert.NoError(t, err) - assert.Equal(t, []*armadaevents.EventSequence{sequence}, actual) expected := make([]*armadaevents.EventSequence, numEvents) for i := 0; i < numEvents; i++ { @@ -675,7 +704,7 @@ func TestLimitSequenceByteSize(t *testing.T) { }, } } - actual, err = LimitSequenceByteSize(sequence, 65, true) + actual, err = LimitSequenceByteSize(sequence, 65+sequenceEventListOverheadSizeBytes, true) if !assert.NoError(t, err) { return } @@ -710,7 +739,7 @@ func TestLimitSequencesByteSize(t *testing.T) { sequences = append(sequences, sequence) } - actual, err := LimitSequencesByteSize(sequences, 65, true) + actual, err := LimitSequencesByteSize(sequences, 65+sequenceEventListOverheadSizeBytes, true) if !assert.NoError(t, err) { return }