From 09e1a124a405ca92c530e7225e8221f763a4d6c5 Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Tue, 25 Jun 2024 13:29:14 +0100 Subject: [PATCH] [Bug] Fix ingestion pipeline event counting (#3754) The function that counts the number of events per event sequence is wrong as it is actually counting the number of event sequences (each of which can contain many events) This just makes us actually count all the events in all event sequences Signed-off-by: JamesMurkin --- internal/common/ingest/ingestion_pipeline.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/internal/common/ingest/ingestion_pipeline.go b/internal/common/ingest/ingestion_pipeline.go index 98d3775a58e..9b590d1a7f4 100644 --- a/internal/common/ingest/ingestion_pipeline.go +++ b/internal/common/ingest/ingestion_pipeline.go @@ -150,7 +150,13 @@ func (i *IngestionPipeline[T]) Run(ctx *armadacontext.Context) error { // Batch up messages batchedEventSequences := make(chan *EventSequencesWithIds) - eventCounterFunc := func(seq *EventSequencesWithIds) int { return len(seq.EventSequences) } + eventCounterFunc := func(seq *EventSequencesWithIds) int { + totalEvents := 0 + for _, seq := range seq.EventSequences { + totalEvents += len(seq.Events) + } + return totalEvents + } eventPublisherFunc := func(b []*EventSequencesWithIds) { batchedEventSequences <- combineEventSequences(b) } batcher := NewBatcher[*EventSequencesWithIds](eventSequences, i.pulsarBatchSize, i.pulsarBatchDuration, eventCounterFunc, eventPublisherFunc) go func() {