diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go
index 386c4568..76e95aab 100644
--- a/pkg/processor/batch.go
+++ b/pkg/processor/batch.go
@@ -205,36 +205,85 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error {
 }
 
 // ImmediatelyExportItems immediately exports the items to the exporter.
-// Useful for propogating errors from the exporter.
+// Useful for propagating errors from the exporter.
 func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, items []*T) error {
 	_, span := observability.Tracer().Start(ctx, "BatchItemProcessor.ImmediatelyExportItems")
 	defer span.End()
 
-	if l := len(items); l > 0 {
-		countItemsToExport := len(items)
+	if len(items) == 0 {
+		return nil
+	}
 
-		// Split the items in to chunks of our max batch size
-		for i := 0; i < countItemsToExport; i += bvp.o.MaxExportBatchSize {
-			end := i + bvp.o.MaxExportBatchSize
-			if end > countItemsToExport {
-				end = countItemsToExport
-			}
+	countItemsToExport := len(items)
 
-			itemsBatch := items[i:end]
+	batchSize := bvp.o.MaxExportBatchSize
+	if batchSize == 0 {
+		batchSize = 1 // Ensure we can't divide by zero
+	}
 
-			bvp.log.WithFields(logrus.Fields{
-				"count": len(itemsBatch),
-			}).Debug("Immediately exporting items")
+	batches := (countItemsToExport + batchSize - 1) / batchSize
 
-			err := bvp.exportWithTimeout(ctx, itemsBatch)
+	batchCh := make(chan []*T, batches)
+	errCh := make(chan error, 1)
 
-			if err != nil {
-				return err
-			}
+	defer close(errCh)
+
+	var wg sync.WaitGroup
+
+	cctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	for i := 0; i < countItemsToExport; i += batchSize {
+		end := i + batchSize
+		if end > countItemsToExport {
+			end = countItemsToExport
 		}
+
+		itemsBatch := items[i:end]
+		batchCh <- itemsBatch
 	}
+	close(batchCh) // Close the channel after all batches are sent
 
-	return nil
+	bvp.log.
+		WithField("workers", bvp.o.Workers).
+		WithField("batches", batches).
+		Debug("Split items into batches for immediate export")
+
+	for i := 0; i < bvp.o.Workers && i < batches; i++ {
+		wg.Add(1)
+
+		go func(workerID int) {
+			defer wg.Done()
+
+			for itemsBatch := range batchCh {
+				bvp.log.WithFields(logrus.Fields{
+					"count":  len(itemsBatch),
+					"worker": workerID,
+				}).Debug("Immediately exporting items")
+
+				err := bvp.exportWithTimeout(cctx, itemsBatch)
+				if err != nil {
+					select {
+					case errCh <- err:
+					default:
+					}
+
+					cancel() // Cancel the context to stop other workers
+
+					return
+				}
+			}
+		}(i)
+	}
+
+	wg.Wait()
+
+	select {
+	case err := <-errCh:
+		return err
+	default:
+		return nil
+	}
 }
 
 // exportWithTimeout exports the items with a timeout.
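
Note on the pattern above: ImmediatelyExportItems now fans batches out to a bounded pool of workers, records only the first export error in a one-slot buffered channel, and cancels a shared context so the remaining workers stop early. Below is a minimal standalone sketch of that pattern, not the processor's code; runBatches and the export callback are illustrative names introduced here for the example.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// runBatches fans batches out to at most `workers` goroutines, returns the
// first export error, and cancels the shared context so remaining workers
// stop early. Hypothetical helper mirroring the diff's structure.
func runBatches[T any](ctx context.Context, batches [][]T, workers int, export func(context.Context, []T) error) error {
	batchCh := make(chan []T, len(batches))
	for _, b := range batches {
		batchCh <- b // Buffered for every batch, so these sends never block.
	}
	close(batchCh) // Workers exit their range loop once the channel drains.

	errCh := make(chan error, 1) // Room for one error; later failures are dropped.

	cctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var wg sync.WaitGroup

	for i := 0; i < workers && i < len(batches); i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			for b := range batchCh {
				if err := export(cctx, b); err != nil {
					select {
					case errCh <- err: // Keep only the first error.
					default:
					}

					cancel() // Tell the other workers to stop.

					return
				}
			}
		}()
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

func main() {
	batches := [][]int{{1, 2}, {3, 4}, {5}}

	err := runBatches(context.Background(), batches, 2, func(ctx context.Context, b []int) error {
		if ctx.Err() != nil {
			return ctx.Err() // Respect cancellation, as exportWithTimeout presumably does.
		}
		if len(b) == 1 {
			return errors.New("boom")
		}
		return nil
	})

	fmt.Println(err) // "boom" (or a context error, depending on scheduling)
}

The buffered errCh combined with the non-blocking select means the first failing worker wins and later failures are discarded rather than blocking on a full channel; cancel() then turns the remaining in-flight exports into fast failures.
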
diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go
index fa5c43d4..c7850f9d 100644
--- a/pkg/processor/batch_test.go
+++ b/pkg/processor/batch_test.go
@@ -9,6 +9,7 @@ package processor
 import (
 	"context"
 	"errors"
+	"fmt"
 	"io"
 	"strconv"
 	"sync"
@@ -25,18 +26,32 @@ type TestItem struct {
 }
 
 type testBatchExporter[T TestItem] struct {
-	mu            sync.Mutex
-	items         []*T
-	sizes         []int
-	batchCount    int
-	shutdownCount int
-	errors        []error
-	droppedCount  int
-	idx           int
-	err           error
+	mu             sync.Mutex
+	items          []*T
+	sizes          []int
+	batchCount     int
+	shutdownCount  int
+	errors         []error
+	droppedCount   int
+	idx            int
+	err            error
+	failNextExport bool
+	delay          time.Duration
 }
 
 func (t *testBatchExporter[T]) ExportItems(ctx context.Context, items []*T) error {
+	time.Sleep(t.delay)
+
+	// Check the failure flag under the lock: workers call ExportItems concurrently.
+	t.mu.Lock()
+	if t.failNextExport {
+		t.failNextExport = false
+		t.mu.Unlock()
+
+		return errors.New("export failure")
+	}
+	t.mu.Unlock()
+
 	t.mu.Lock()
 	defer t.mu.Unlock()
 
@@ -487,3 +502,72 @@ func TestBatchItemProcessorWithAsyncErrorExporter(t *testing.T) {
 		t.Errorf("Expected write to fail")
 	}
 }
+
+func TestBatchItemProcessorImmediatelyExportItems(t *testing.T) {
+	// Define a range of values for workers, maxExportBatchSize, and itemsToExport
+	workerCounts := []int{1, 5, 10}
+	maxBatchSizes := []int{1, 5, 10, 20}
+	itemCounts := []int{0, 1, 25, 50}
+
+	for _, workers := range workerCounts {
+		for _, maxBatchSize := range maxBatchSizes {
+			for _, itemsToExport := range itemCounts {
+				t.Run(fmt.Sprintf("%d workers, batch size %d, %d items", workers, maxBatchSize, itemsToExport), func(t *testing.T) {
+					te := testBatchExporter[TestItem]{}
+					bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers))
+					require.NoError(t, err)
+
+					items := make([]*TestItem, itemsToExport)
+					for i := 0; i < itemsToExport; i++ {
+						items[i] = &TestItem{name: strconv.Itoa(i)}
+					}
+
+					err = bsp.ImmediatelyExportItems(context.Background(), items)
+					require.NoError(t, err)
+
+					expectedBatches := (itemsToExport + maxBatchSize - 1) / maxBatchSize
+					if itemsToExport == 0 {
+						expectedBatches = 0
+					}
+
+					if te.len() != itemsToExport {
+						t.Errorf("Expected all items to be exported, got: %v", te.len())
+					}
+
+					if te.getBatchCount() != expectedBatches {
+						t.Errorf("Expected %v batches to be exported, got: %v", expectedBatches, te.getBatchCount())
+					}
+				})
+			}
+		}
+	}
+}
+
+func TestBatchItemProcessorExportCancellationOnFailure(t *testing.T) {
+	workers := 10
+	maxBatchSize := 10
+	itemsToExport := 5000
+
+	t.Run(fmt.Sprintf("%d workers, batch size %d, %d items with cancellation on failure", workers, maxBatchSize, itemsToExport), func(t *testing.T) {
+		te := testBatchExporter[TestItem]{
+			delay: 100 * time.Millisecond, // Introduce a delay to simulate processing time
+		}
+		bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers))
+		require.NoError(t, err)
+
+		items := make([]*TestItem, itemsToExport)
+		for i := 0; i < itemsToExport; i++ {
+			items[i] = &TestItem{name: strconv.Itoa(i)}
+		}
+
+		// Simulate export failure and expect cancellation
+		te.failNextExport = true
+
+		err = bsp.ImmediatelyExportItems(context.Background(), items)
+		require.Error(t, err, "Expected an error due to simulated export failure")
+
+		// Ensure we exported fewer than the total number of batches, since the
+		// export should've stopped after the first failure.
+		require.Less(t, te.batchCount, itemsToExport/maxBatchSize)
+	})
+}
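
The new tests call te.len() and te.getBatchCount(), helpers that already exist in batch_test.go and are untouched by this diff. For readers without the full file, they are presumably small mutex-guarded accessors along these lines (a sketch, not the file's actual code):

// len returns how many items the fake exporter has received so far.
func (t *testBatchExporter[T]) len() int {
	t.mu.Lock()
	defer t.mu.Unlock()

	return len(t.items)
}

// getBatchCount returns how many ExportItems calls have completed.
func (t *testBatchExporter[T]) getBatchCount() int {
	t.mu.Lock()
	defer t.mu.Unlock()

	return t.batchCount
}

Taking the mutex in these accessors matters here because the processor's workers append to t.items concurrently; the tests read the counters only after ImmediatelyExportItems returns, but the lock keeps the helpers safe to call at any time.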