diff --git a/internal/common/ingest/ingestion_pipeline.go b/internal/common/ingest/ingestion_pipeline.go index 98d3775a58e..9b590d1a7f4 100644 --- a/internal/common/ingest/ingestion_pipeline.go +++ b/internal/common/ingest/ingestion_pipeline.go @@ -150,7 +150,13 @@ func (i *IngestionPipeline[T]) Run(ctx *armadacontext.Context) error { // Batch up messages batchedEventSequences := make(chan *EventSequencesWithIds) - eventCounterFunc := func(seq *EventSequencesWithIds) int { return len(seq.EventSequences) } + eventCounterFunc := func(seq *EventSequencesWithIds) int { + totalEvents := 0 + for _, seq := range seq.EventSequences { + totalEvents += len(seq.Events) + } + return totalEvents + } eventPublisherFunc := func(b []*EventSequencesWithIds) { batchedEventSequences <- combineEventSequences(b) } batcher := NewBatcher[*EventSequencesWithIds](eventSequences, i.pulsarBatchSize, i.pulsarBatchDuration, eventCounterFunc, eventPublisherFunc) go func() {