Skip to content

Commit

Permalink
[Bug] Fix ingestion pipeline event counting (#3754)
Browse files Browse the repository at this point in the history
The function that counts the number of events per event sequence is wrong as it is actually counting the number of event sequences (each of which can contain many events)

This just makes us actually count all the events in all event sequences

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>
  • Loading branch information
JamesMurkin authored Jun 25, 2024
1 parent 2b3e7f3 commit 09e1a12
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion internal/common/ingest/ingestion_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,13 @@ func (i *IngestionPipeline[T]) Run(ctx *armadacontext.Context) error {

// Batch up messages
batchedEventSequences := make(chan *EventSequencesWithIds)
eventCounterFunc := func(seq *EventSequencesWithIds) int { return len(seq.EventSequences) }
eventCounterFunc := func(seq *EventSequencesWithIds) int {
totalEvents := 0
for _, seq := range seq.EventSequences {
totalEvents += len(seq.Events)
}
return totalEvents
}
eventPublisherFunc := func(b []*EventSequencesWithIds) { batchedEventSequences <- combineEventSequences(b) }
batcher := NewBatcher[*EventSequencesWithIds](eventSequences, i.pulsarBatchSize, i.pulsarBatchDuration, eventCounterFunc, eventPublisherFunc)
go func() {
Expand Down

0 comments on commit 09e1a12

Please sign in to comment.