diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go
index baf57739..a3f84ec6 100644
--- a/pkg/processor/batch.go
+++ b/pkg/processor/batch.go
@@ -213,11 +213,13 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it
 	if l := len(items); l > 0 {
 		countItemsToExport := len(items)
 
-		// Split the items in to chunks of our max batch size
+		// Split the items into chunks of our max batch size
 		batchCh := make(chan []*T, bvp.o.Workers)
 		errCh := make(chan error, bvp.o.Workers)
 
 		var wg sync.WaitGroup
+		ctx, cancel := context.WithCancel(ctx)
+		defer cancel()
 
 		for i := 0; i < countItemsToExport; i += bvp.o.MaxExportBatchSize {
 			end := i + bvp.o.MaxExportBatchSize
@@ -233,23 +235,25 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it
 		for i := 0; i < bvp.o.Workers; i++ {
 			wg.Add(1)
 
-			go func() {
+			go func(workerID int) {
 				defer wg.Done()
 
 				for itemsBatch := range batchCh {
 					bvp.log.WithFields(logrus.Fields{
 						"count":  len(itemsBatch),
-						"worker": i,
+						"worker": workerID,
 					}).Debug("Immediately exporting items")
 
 					err := bvp.exportWithTimeout(ctx, itemsBatch)
 					if err != nil {
 						errCh <- err
+						cancel() // Cancel the context to stop other workers
+						return
 					}
 				}
-			}()
+			}(i)
 		}
 
 		wg.Wait()