From 50034eae7eca5cb96df5fcc181da84eb179fcf02 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 12 Jun 2024 15:33:00 +1000 Subject: [PATCH 01/10] feat(processor): Add worker support for sync workloads --- pkg/processor/batch.go | 40 +++++++++++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 386c4568..baf57739 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -214,6 +214,11 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it countItemsToExport := len(items) // Split the items in to chunks of our max batch size + batchCh := make(chan []*T, bvp.o.Workers) + errCh := make(chan error, bvp.o.Workers) + + var wg sync.WaitGroup + for i := 0; i < countItemsToExport; i += bvp.o.MaxExportBatchSize { end := i + bvp.o.MaxExportBatchSize if end > countItemsToExport { @@ -221,16 +226,37 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it } itemsBatch := items[i:end] + batchCh <- itemsBatch + } + close(batchCh) - bvp.log.WithFields(logrus.Fields{ - "count": len(itemsBatch), - }).Debug("Immediately exporting items") + for i := 0; i < bvp.o.Workers; i++ { + wg.Add(1) - err := bvp.exportWithTimeout(ctx, itemsBatch) + go func() { + defer wg.Done() - if err != nil { - return err - } + for itemsBatch := range batchCh { + bvp.log.WithFields(logrus.Fields{ + "count": len(itemsBatch), + "worker": i, + }).Debug("Immediately exporting items") + + err := bvp.exportWithTimeout(ctx, itemsBatch) + if err != nil { + errCh <- err + + return + } + } + }() + } + + wg.Wait() + close(errCh) + + if len(errCh) > 0 { + return <-errCh } } From 1cb073aeb1ef8e865630ce0d340724ee50933123 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 12 Jun 2024 15:39:10 +1000 Subject: [PATCH 02/10] Cancellable --- pkg/processor/batch.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index baf57739..a3f84ec6 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -213,11 +213,13 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it if l := len(items); l > 0 { countItemsToExport := len(items) - // Split the items in to chunks of our max batch size + // Split the items into chunks of our max batch size batchCh := make(chan []*T, bvp.o.Workers) errCh := make(chan error, bvp.o.Workers) var wg sync.WaitGroup + ctx, cancel := context.WithCancel(ctx) + defer cancel() for i := 0; i < countItemsToExport; i += bvp.o.MaxExportBatchSize { end := i + bvp.o.MaxExportBatchSize @@ -233,23 +235,25 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it for i := 0; i < bvp.o.Workers; i++ { wg.Add(1) - go func() { + go func(workerID int) { defer wg.Done() for itemsBatch := range batchCh { bvp.log.WithFields(logrus.Fields{ "count": len(itemsBatch), - "worker": i, + "worker": workerID, }).Debug("Immediately exporting items") err := bvp.exportWithTimeout(ctx, itemsBatch) if err != nil { errCh <- err + cancel() // Cancel the context to stop other workers + return } } - }() + }(i) } wg.Wait() From 61b5a0ee20513603b0ed4fc1df266dbb98db1db0 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 13 Jun 2024 11:23:33 +1000 Subject: [PATCH 03/10] test: Add unit tests for batch item processor --- pkg/processor/batch.go | 103 +++++++++++++++++++++--------------- pkg/processor/batch_test.go | 98 ++++++++++++++++++++++++++++++---- 2 files changed, 150 insertions(+), 51 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index a3f84ec6..76e95aab 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -205,66 +205,85 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { } // ImmediatelyExportItems immediately exports the items to the exporter. -// Useful for propogating errors from the exporter. +// Useful for propagating errors from the exporter. func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, items []*T) error { _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.ImmediatelyExportItems") defer span.End() - if l := len(items); l > 0 { - countItemsToExport := len(items) + if len(items) == 0 { + return nil + } - // Split the items into chunks of our max batch size - batchCh := make(chan []*T, bvp.o.Workers) - errCh := make(chan error, bvp.o.Workers) + countItemsToExport := len(items) - var wg sync.WaitGroup - ctx, cancel := context.WithCancel(ctx) - defer cancel() + batchSize := bvp.o.MaxExportBatchSize + if batchSize == 0 { + batchSize = 1 // Ensure we can't divide by zero + } - for i := 0; i < countItemsToExport; i += bvp.o.MaxExportBatchSize { - end := i + bvp.o.MaxExportBatchSize - if end > countItemsToExport { - end = countItemsToExport - } + batches := (countItemsToExport + batchSize - 1) / batchSize - itemsBatch := items[i:end] - batchCh <- itemsBatch - } - close(batchCh) + batchCh := make(chan []*T, batches) + errCh := make(chan error, 1) - for i := 0; i < bvp.o.Workers; i++ { - wg.Add(1) + defer close(errCh) - go func(workerID int) { - defer wg.Done() + var wg sync.WaitGroup - for itemsBatch := range batchCh { - bvp.log.WithFields(logrus.Fields{ - "count": len(itemsBatch), - "worker": workerID, - }).Debug("Immediately exporting items") + cctx, cancel := context.WithCancel(ctx) + defer cancel() - err := bvp.exportWithTimeout(ctx, itemsBatch) - if err != nil { - errCh <- err - - cancel() // Cancel the context to stop other workers + for i := 0; i < countItemsToExport; i += batchSize { + end := i + batchSize + if end > countItemsToExport { + end = countItemsToExport + } - return + itemsBatch := items[i:end] + batchCh <- itemsBatch + } + close(batchCh) // Close the channel after all batches are sent + + bvp.log. + WithField("workers", bvp.o.Workers). + WithField("batches", batches). + Debug("Split items into batches for immediate export") + + for i := 0; i < bvp.o.Workers && i < batches; i++ { + wg.Add(1) + + go func(workerID int) { + defer wg.Done() + + for itemsBatch := range batchCh { + bvp.log.WithFields(logrus.Fields{ + "count": len(itemsBatch), + "worker": workerID, + }).Debug("Immediately exporting items") + + err := bvp.exportWithTimeout(cctx, itemsBatch) + if err != nil { + select { + case errCh <- err: + default: } - } - }(i) - } - wg.Wait() - close(errCh) + cancel() // Cancel the context to stop other workers - if len(errCh) > 0 { - return <-errCh - } + return + } + } + }(i) } - return nil + wg.Wait() + + select { + case err := <-errCh: + return err + default: + return nil + } } // exportWithTimeout exports the items with a timeout. diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index fa5c43d4..c7850f9d 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -9,6 +9,7 @@ package processor import ( "context" "errors" + "fmt" "io" "strconv" "sync" @@ -25,18 +26,28 @@ type TestItem struct { } type testBatchExporter[T TestItem] struct { - mu sync.Mutex - items []*T - sizes []int - batchCount int - shutdownCount int - errors []error - droppedCount int - idx int - err error + mu sync.Mutex + items []*T + sizes []int + batchCount int + shutdownCount int + errors []error + droppedCount int + idx int + err error + failNextExport bool + delay time.Duration } func (t *testBatchExporter[T]) ExportItems(ctx context.Context, items []*T) error { + time.Sleep(t.delay) + + if t.failNextExport { + t.failNextExport = false + + return errors.New("export failure") + } + t.mu.Lock() defer t.mu.Unlock() @@ -487,3 +498,72 @@ func TestBatchItemProcessorWithAsyncErrorExporter(t *testing.T) { t.Errorf("Expected write to fail") } } + +func TestBatchItemProcessorImmediatelyExportItems(t *testing.T) { + // Define a range of values for workers, maxExportBatchSize, and itemsToExport + workerCounts := []int{1, 5, 10} + maxBatchSizes := []int{1, 5, 10, 20} + itemCounts := []int{0, 1, 25, 50} + + for _, workers := range workerCounts { + for _, maxBatchSize := range maxBatchSizes { + for _, itemsToExport := range itemCounts { + t.Run(fmt.Sprintf("%d workers, batch size %d, %d items", workers, maxBatchSize, itemsToExport), func(t *testing.T) { + te := testBatchExporter[TestItem]{} + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers)) + require.NoError(t, err) + + items := make([]*TestItem, itemsToExport) + for i := 0; i < itemsToExport; i++ { + items[i] = &TestItem{name: strconv.Itoa(i)} + } + + err = bsp.ImmediatelyExportItems(context.Background(), items) + require.NoError(t, err) + + expectedBatches := (itemsToExport + maxBatchSize - 1) / maxBatchSize + if itemsToExport == 0 { + expectedBatches = 0 + } + + if te.len() != itemsToExport { + t.Errorf("Expected all items to be exported, got: %v", te.len()) + } + + if te.getBatchCount() != expectedBatches { + t.Errorf("Expected %v batches to be exported, got: %v", expectedBatches, te.getBatchCount()) + } + }) + } + } + } +} + +func TestBatchItemProcessorExportCancellationOnFailure(t *testing.T) { + workers := 10 + maxBatchSize := 10 + itemsToExport := 5000 + + t.Run(fmt.Sprintf("%d workers, batch size %d, %d items with cancellation on failure", workers, maxBatchSize, itemsToExport), func(t *testing.T) { + te := testBatchExporter[TestItem]{ + delay: 100 * time.Millisecond, // Introduce a delay to simulate processing time + } + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers)) + require.NoError(t, err) + + items := make([]*TestItem, itemsToExport) + for i := 0; i < itemsToExport; i++ { + items[i] = &TestItem{name: strconv.Itoa(i)} + } + + // Simulate export failure and expect cancellation + te.failNextExport = true + + err = bsp.ImmediatelyExportItems(context.Background(), items) + require.Error(t, err, "Expected an error due to simulated export failure") + + // Ensure we exported less than the number of batches since the export should've + // stopped due to the failure. + require.Less(t, te.batchCount, itemsToExport/maxBatchSize) + }) +} From 188fad40108e60d70a64d1333e4786a70c7f2244 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 13 Jun 2024 16:13:38 +1000 Subject: [PATCH 04/10] feat: Static workers --- pkg/processor/batch.go | 169 +++++++++++++++++++----------------- pkg/processor/batch_test.go | 107 ++++++++++++++++++++--- 2 files changed, 184 insertions(+), 92 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 76e95aab..7320b4a2 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -1,14 +1,9 @@ -/* - * This processor was adapted from the OpenTelemetry Collector's batch processor. - * - * Authors: OpenTelemetry - * URL: https://github.com/open-telemetry/opentelemetry-go/blob/496c086ece129182662c14d6a023a2b2da09fe30/sdk/trace/batch_span_processor.go - */ - package processor import ( "context" + "errors" + "fmt" "runtime" "sync" "sync/atomic" @@ -91,6 +86,22 @@ type BatchItemProcessorOptions struct { Workers int } +func (o *BatchItemProcessorOptions) Validate() error { + if o.MaxExportBatchSize > o.MaxQueueSize { + return errors.New("max export batch size cannot be greater than max queue size") + } + + if o.Workers == 0 { + return errors.New("workers must be greater than 0") + } + + if o.MaxExportBatchSize < 1 { + return errors.New("max export batch size must be greater than 0") + } + + return nil +} + // BatchItemProcessor is a buffer that batches asynchronously-received // items and sends them to a exporter when complete. type BatchItemProcessor[T any] struct { @@ -105,8 +116,8 @@ type BatchItemProcessor[T any] struct { metrics *Metrics - batches chan []*T - batchReady chan bool + batches chan batchWithErr[T] + batchReady chan struct{} batch []*T batchMutex sync.Mutex @@ -118,6 +129,12 @@ type BatchItemProcessor[T any] struct { stopWorkersCh chan struct{} } +type batchWithErr[T any] struct { + items []*T + errCh chan error + completedCh chan struct{} +} + // NewBatchItemProcessor creates a new ItemProcessor that will send completed // item batches to the exporter with the supplied options. // @@ -146,6 +163,10 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log opt(&o) } + if err := o.Validate(); err != nil { + return nil, fmt.Errorf("invalid batch item processor options: %w: %s", err, name) + } + metrics := DefaultMetrics bvp := BatchItemProcessor[T]{ @@ -161,15 +182,14 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log stopWorkersCh: make(chan struct{}), } - bvp.batches = make(chan []*T, o.Workers) // Buffer the channel to hold batches for each worker - bvp.batchReady = make(chan bool, 1) + bvp.batches = make(chan batchWithErr[T], o.Workers) + bvp.batchReady = make(chan struct{}, 1) bvp.stopWait.Add(o.Workers) for i := 0; i < o.Workers; i++ { go func() { defer bvp.stopWait.Done() - bvp.worker(context.Background()) }() } @@ -179,7 +199,6 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log return &bvp, nil } -// OnEnd method enqueues a item for later processing. func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.Write") defer span.End() @@ -187,14 +206,13 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) if bvp.o.ShippingMethod == ShippingMethodSync { - return bvp.ImmediatelyExportItems(ctx, s) + return bvp.immediatelyExportItems(ctx, s) } bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) - // Do not enqueue items if we are just going to drop them. if bvp.e == nil { - return nil + return errors.New("exporter is nil") } for _, i := range s { @@ -204,9 +222,9 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { return nil } -// ImmediatelyExportItems immediately exports the items to the exporter. +// immediatelyExportItems immediately exports the items to the exporter. // Useful for propagating errors from the exporter. -func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, items []*T) error { +func (bvp *BatchItemProcessor[T]) immediatelyExportItems(ctx context.Context, items []*T) error { _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.ImmediatelyExportItems") defer span.End() @@ -214,76 +232,63 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it return nil } - countItemsToExport := len(items) - batchSize := bvp.o.MaxExportBatchSize if batchSize == 0 { - batchSize = 1 // Ensure we can't divide by zero + batchSize = 1 } - batches := (countItemsToExport + batchSize - 1) / batchSize - - batchCh := make(chan []*T, batches) - errCh := make(chan error, 1) + errCh := make(chan error, bvp.o.Workers) + completedCh := make(chan struct{}, bvp.o.Workers) + pendingExports := 0 - defer close(errCh) - - var wg sync.WaitGroup - - cctx, cancel := context.WithCancel(ctx) - defer cancel() - - for i := 0; i < countItemsToExport; i += batchSize { + for i := 0; i < len(items); i += batchSize { end := i + batchSize - if end > countItemsToExport { - end = countItemsToExport + if end > len(items) { + end = len(items) } itemsBatch := items[i:end] - batchCh <- itemsBatch - } - close(batchCh) // Close the channel after all batches are sent - - bvp.log. - WithField("workers", bvp.o.Workers). - WithField("batches", batches). - Debug("Split items into batches for immediate export") + bvp.batches <- batchWithErr[T]{items: itemsBatch, errCh: errCh, completedCh: completedCh} + bvp.batchReady <- struct{}{} - for i := 0; i < bvp.o.Workers && i < batches; i++ { - wg.Add(1) + pendingExports++ - go func(workerID int) { - defer wg.Done() + // Only create a new batch if there are already bvp.o.Workers pending exports. + // We do this so we don't bother queueing up any more batches if + // a previous batch has failed. + if pendingExports >= bvp.o.Workers { + select { + case batchErr := <-errCh: + pendingExports-- - for itemsBatch := range batchCh { - bvp.log.WithFields(logrus.Fields{ - "count": len(itemsBatch), - "worker": workerID, - }).Debug("Immediately exporting items") - - err := bvp.exportWithTimeout(cctx, itemsBatch) - if err != nil { - select { - case errCh <- err: - default: - } - - cancel() // Cancel the context to stop other workers - - return + if batchErr != nil { + return batchErr } + case <-completedCh: + pendingExports-- + case <-ctx.Done(): + return ctx.Err() } - }(i) + } } - wg.Wait() + // Wait for remaining pending exports to complete + for pendingExports > 0 { + select { + case batchErr := <-errCh: + pendingExports-- - select { - case err := <-errCh: - return err - default: - return nil + if batchErr != nil { + return batchErr + } + case <-completedCh: + pendingExports-- + case <-ctx.Done(): + return ctx.Err() + } } + + return nil } // exportWithTimeout exports the items with a timeout. @@ -317,7 +322,6 @@ func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error { bvp.stopOnce.Do(func() { wait := make(chan struct{}) go func() { - // Stop accepting new items close(bvp.stopCh) // Drain the queue @@ -410,17 +414,15 @@ func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) { return case sd := <-bvp.queue: bvp.batchMutex.Lock() - bvp.batch = append(bvp.batch, sd) if len(bvp.batch) >= bvp.o.MaxExportBatchSize { batchCopy := make([]*T, len(bvp.batch)) copy(batchCopy, bvp.batch) - bvp.batches <- batchCopy + bvp.batches <- batchWithErr[T]{items: batchCopy, errCh: make(chan error), completedCh: make(chan struct{})} bvp.batch = bvp.batch[:0] - bvp.batchReady <- true + bvp.batchReady <- struct{}{} } - bvp.batchMutex.Unlock() case <-bvp.timer.C: bvp.batchMutex.Lock() @@ -428,9 +430,9 @@ func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) { if len(bvp.batch) > 0 { batchCopy := make([]*T, len(bvp.batch)) copy(batchCopy, bvp.batch) - bvp.batches <- batchCopy + bvp.batches <- batchWithErr[T]{items: batchCopy, errCh: make(chan error), completedCh: make(chan struct{})} bvp.batch = bvp.batch[:0] - bvp.batchReady <- true + bvp.batchReady <- struct{}{} } else { // Reset the timer if there are no items in the batch. // If there are items in the batch, one of the workers will reset the timer. @@ -452,11 +454,17 @@ func (bvp *BatchItemProcessor[T]) worker(ctx context.Context) { return case <-bvp.batchReady: bvp.timer.Reset(bvp.o.BatchTimeout) - batch := <-bvp.batches - if err := bvp.exportWithTimeout(ctx, batch); err != nil { + + if err := bvp.exportWithTimeout(ctx, batch.items); err != nil { bvp.log.WithError(err).Error("failed to export items") + + if batch.errCh != nil { + batch.errCh <- err + } } + + batch.completedCh <- struct{}{} } } } @@ -471,7 +479,8 @@ func (bvp *BatchItemProcessor[T]) drainQueue() { // Wait for the workers to finish processing all batches. for len(bvp.batches) > 0 { - time.Sleep(10 * time.Millisecond) + batch := <-bvp.batches + <-batch.completedCh } // Close the batches channel since no more batches will be sent. diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index c7850f9d..1ac75bdd 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -98,10 +98,11 @@ func TestNewBatchItemProcessorWithNilExporter(t *testing.T) { bsp, err := NewBatchItemProcessor[TestItem](nil, "processor", nullLogger()) require.NoError(t, err) - if err := bsp.Write(context.Background(), []*TestItem{{ + err = bsp.Write(context.Background(), []*TestItem{{ name: "test", - }}); err != nil { - t.Errorf("failed to Write to the BatchItemProcessor: %v", err) + }}) + if err == nil || err.Error() != "exporter is nil" { + t.Errorf("expected error 'exporter is nil', got: %v", err) } if err := bsp.Shutdown(context.Background()); err != nil { @@ -122,8 +123,10 @@ func TestNewBatchItemProcessorWithOptions(t *testing.T) { schDelay := 100 * time.Millisecond options := []testOption{ { - name: "default", - o: []BatchItemProcessorOption{}, + name: "default", + o: []BatchItemProcessorOption{ + WithShippingMethod(ShippingMethodSync), + }, genNumItems: 2053, wantNumItems: 2048, wantBatchCount: 4, @@ -132,6 +135,7 @@ func TestNewBatchItemProcessorWithOptions(t *testing.T) { name: "non-default BatchTimeout", o: []BatchItemProcessorOption{ WithBatchTimeout(schDelay), + WithShippingMethod(ShippingMethodSync), }, writeNumItems: 2048, genNumItems: 2053, @@ -143,6 +147,7 @@ func TestNewBatchItemProcessorWithOptions(t *testing.T) { o: []BatchItemProcessorOption{ WithBatchTimeout(schDelay), WithMaxQueueSize(200), + WithShippingMethod(ShippingMethodSync), }, writeNumItems: 200, genNumItems: 205, @@ -155,6 +160,7 @@ func TestNewBatchItemProcessorWithOptions(t *testing.T) { WithBatchTimeout(schDelay), WithMaxQueueSize(200), WithMaxExportBatchSize(20), + WithShippingMethod(ShippingMethodSync), }, writeNumItems: 200, genNumItems: 205, @@ -270,10 +276,10 @@ func TestBatchItemProcessorShutdown(t *testing.T) { func TestBatchItemProcessorDrainQueue(t *testing.T) { be := testBatchExporter[TestItem]{} - bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", nullLogger(), WithMaxExportBatchSize(5), WithBatchTimeout(5*time.Minute)) + bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", nullLogger(), WithMaxExportBatchSize(5), WithBatchTimeout(5*time.Minute), WithWorkers(5)) require.NoError(t, err) - itemsToExport := 500 + itemsToExport := 50 for i := 0; i < itemsToExport; i++ { if err := bsp.Write(context.Background(), []*TestItem{{ @@ -499,7 +505,7 @@ func TestBatchItemProcessorWithAsyncErrorExporter(t *testing.T) { } } -func TestBatchItemProcessorImmediatelyExportItems(t *testing.T) { +func TestBatchItemProcessorSyncShipping(t *testing.T) { // Define a range of values for workers, maxExportBatchSize, and itemsToExport workerCounts := []int{1, 5, 10} maxBatchSizes := []int{1, 5, 10, 20} @@ -510,7 +516,7 @@ func TestBatchItemProcessorImmediatelyExportItems(t *testing.T) { for _, itemsToExport := range itemCounts { t.Run(fmt.Sprintf("%d workers, batch size %d, %d items", workers, maxBatchSize, itemsToExport), func(t *testing.T) { te := testBatchExporter[TestItem]{} - bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers)) + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers), WithShippingMethod(ShippingMethodSync)) require.NoError(t, err) items := make([]*TestItem, itemsToExport) @@ -518,7 +524,7 @@ func TestBatchItemProcessorImmediatelyExportItems(t *testing.T) { items[i] = &TestItem{name: strconv.Itoa(i)} } - err = bsp.ImmediatelyExportItems(context.Background(), items) + err = bsp.Write(context.Background(), items) require.NoError(t, err) expectedBatches := (itemsToExport + maxBatchSize - 1) / maxBatchSize @@ -548,7 +554,7 @@ func TestBatchItemProcessorExportCancellationOnFailure(t *testing.T) { te := testBatchExporter[TestItem]{ delay: 100 * time.Millisecond, // Introduce a delay to simulate processing time } - bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers)) + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers), WithShippingMethod(ShippingMethodSync)) require.NoError(t, err) items := make([]*TestItem, itemsToExport) @@ -559,7 +565,7 @@ func TestBatchItemProcessorExportCancellationOnFailure(t *testing.T) { // Simulate export failure and expect cancellation te.failNextExport = true - err = bsp.ImmediatelyExportItems(context.Background(), items) + err = bsp.Write(context.Background(), items) require.Error(t, err, "Expected an error due to simulated export failure") // Ensure we exported less than the number of batches since the export should've @@ -567,3 +573,80 @@ func TestBatchItemProcessorExportCancellationOnFailure(t *testing.T) { require.Less(t, te.batchCount, itemsToExport/maxBatchSize) }) } + +func TestBatchItemProcessorWithZeroWorkers(t *testing.T) { + te := testBatchExporter[TestItem]{} + _, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(0)) + require.Error(t, err, "Expected an error when initializing with zero workers") +} + +func TestBatchItemProcessorWithNegativeBatchSize(t *testing.T) { + te := testBatchExporter[TestItem]{} + _, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(-1), WithWorkers(5)) + require.Error(t, err, "Expected an error when initializing with negative batch size") +} + +func TestBatchItemProcessorWithNegativeQueueSize(t *testing.T) { + te := testBatchExporter[TestItem]{} + _, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxQueueSize(-1), WithWorkers(5)) + require.Error(t, err, "Expected an error when initializing with negative queue size") +} + +func TestBatchItemProcessorWithZeroBatchSize(t *testing.T) { + te := testBatchExporter[TestItem]{} + _, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(0), WithWorkers(5)) + require.Error(t, err, "Expected an error when initializing with zero batch size") +} + +func TestBatchItemProcessorWithZeroQueueSize(t *testing.T) { + te := testBatchExporter[TestItem]{} + _, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxQueueSize(0), WithWorkers(5)) + require.Error(t, err, "Expected an error when initializing with zero queue size") +} + +func TestBatchItemProcessorShutdownWithoutExport(t *testing.T) { + te := testBatchExporter[TestItem]{} + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(5)) + require.NoError(t, err) + + require.NoError(t, bsp.Shutdown(context.Background()), "shutting down BatchItemProcessor") + require.Equal(t, 0, te.len(), "No items should have been exported") +} + +func TestBatchItemProcessorExportWithTimeout(t *testing.T) { + te := testBatchExporter[TestItem]{delay: 2 * time.Second} + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(5), WithExportTimeout(1*time.Second), WithShippingMethod(ShippingMethodSync)) + require.NoError(t, err) + + itemsToExport := 10 + items := make([]*TestItem, itemsToExport) + + for i := 0; i < itemsToExport; i++ { + items[i] = &TestItem{name: strconv.Itoa(i)} + } + + err = bsp.Write(context.Background(), items) + require.Error(t, err, "Expected an error due to export timeout") + require.Less(t, te.len(), itemsToExport, "Expected some items to be exported before timeout") +} + +func TestBatchItemProcessorWithBatchTimeout(t *testing.T) { + te := testBatchExporter[TestItem]{} + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(5), WithBatchTimeout(1*time.Second)) + require.NoError(t, err) + + itemsToExport := 5 + items := make([]*TestItem, itemsToExport) + + for i := 0; i < itemsToExport; i++ { + items[i] = &TestItem{name: strconv.Itoa(i)} + } + + for _, item := range items { + err := bsp.Write(context.Background(), []*TestItem{item}) + require.NoError(t, err) + } + + time.Sleep(2 * time.Second) + require.Equal(t, itemsToExport, te.len(), "Expected all items to be exported after batch timeout") +} From 413e0e18fb01bba711ef6314494c916630d45b41 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 13 Jun 2024 17:13:56 +1000 Subject: [PATCH 05/10] more debugging --- pkg/processor/batch.go | 86 ++++++++++++++++++++++++++++--------- pkg/processor/batch_test.go | 3 +- 2 files changed, 67 insertions(+), 22 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 7320b4a2..e6a541de 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -175,26 +175,40 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log log: log, name: name, metrics: metrics, - batch: make([]*T, 0, o.MaxExportBatchSize), + batch: make([]*T, 0, o.MaxQueueSize), timer: time.NewTimer(o.BatchTimeout), queue: make(chan *T, o.MaxQueueSize), stopCh: make(chan struct{}), stopWorkersCh: make(chan struct{}), } + bvp.log.WithFields( + logrus.Fields{ + "workers": bvp.o.Workers, + "batch_timeout": bvp.o.BatchTimeout, + "export_timeout": bvp.o.ExportTimeout, + "max_export_batch_size": bvp.o.MaxExportBatchSize, + "max_queue_size": bvp.o.MaxQueueSize, + "shipping_method": bvp.o.ShippingMethod, + }, + ).Info("Batch item processor initialized") + bvp.batches = make(chan batchWithErr[T], o.Workers) bvp.batchReady = make(chan struct{}, 1) bvp.stopWait.Add(o.Workers) for i := 0; i < o.Workers; i++ { - go func() { + go func(num int) { defer bvp.stopWait.Done() - bvp.worker(context.Background()) - }() + bvp.worker(context.Background(), num) + }(i) } - go bvp.batchBuilder(context.Background()) // Start building batches + go func() { + bvp.batchBuilder(context.Background()) // Start building batches + bvp.log.Info("Batch builder exited") + }() return &bvp, nil } @@ -317,11 +331,11 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error { var err error - bvp.log.Debug("Shutting down processor") - bvp.stopOnce.Do(func() { wait := make(chan struct{}) go func() { + bvp.log.Info("Stopping processor") + close(bvp.stopCh) // Drain the queue @@ -411,51 +425,73 @@ func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) { for { select { case <-bvp.stopWorkersCh: + bvp.log.Info("Stopping batch builder") + return case sd := <-bvp.queue: + bvp.log.Info("New item added to queue") + bvp.batchMutex.Lock() bvp.batch = append(bvp.batch, sd) if len(bvp.batch) >= bvp.o.MaxExportBatchSize { - batchCopy := make([]*T, len(bvp.batch)) - copy(batchCopy, bvp.batch) - bvp.batches <- batchWithErr[T]{items: batchCopy, errCh: make(chan error), completedCh: make(chan struct{})} - bvp.batch = bvp.batch[:0] - bvp.batchReady <- struct{}{} + bvp.sendBatch("max_export_batch_size") } bvp.batchMutex.Unlock() case <-bvp.timer.C: bvp.batchMutex.Lock() + bvp.log.Info("Batch timeout reached") if len(bvp.batch) > 0 { - batchCopy := make([]*T, len(bvp.batch)) - copy(batchCopy, bvp.batch) - bvp.batches <- batchWithErr[T]{items: batchCopy, errCh: make(chan error), completedCh: make(chan struct{})} - bvp.batch = bvp.batch[:0] - bvp.batchReady <- struct{}{} + bvp.sendBatch("timer") } else { - // Reset the timer if there are no items in the batch. - // If there are items in the batch, one of the workers will reset the timer. + bvp.log.Info("No items in batch, resetting timer") bvp.timer.Reset(bvp.o.BatchTimeout) } - bvp.batchMutex.Unlock() } } } +func (bvp *BatchItemProcessor[T]) sendBatch(reason string) { + log := bvp.log.WithField("reason", reason) + log.Infof("Creating a batch of %d items", len(bvp.batch)) + + batchCopy := make([]*T, len(bvp.batch)) + log.Infof("Copying batch items") + copy(batchCopy, bvp.batch) + log.Infof("Batch items copied") + + bvp.batches <- batchWithErr[T]{items: batchCopy, errCh: make(chan error), completedCh: make(chan struct{})} + log.Infof("Batch sent to batches channel") + + bvp.batch = bvp.batch[:0] + log.Infof("Batch cleared") + + bvp.batchReady <- struct{}{} + log.Infof("Signaled batchReady") +} + // worker removes items from the `queue` channel until processor // is shut down. It calls the exporter in batches of up to MaxExportBatchSize // waiting up to BatchTimeout to form a batch. -func (bvp *BatchItemProcessor[T]) worker(ctx context.Context) { +func (bvp *BatchItemProcessor[T]) worker(ctx context.Context, number int) { + bvp.log.Infof("Starting worker %d", number) + for { select { case <-bvp.stopWorkersCh: + bvp.log.Infof("Stopping worker %d", number) + return case <-bvp.batchReady: + bvp.log.Infof("Worker %d is processing a batch", number) + bvp.timer.Reset(bvp.o.BatchTimeout) batch := <-bvp.batches + bvp.log.Infof("Worker %d is exporting a batch of %d items", number, len(batch.items)) + if err := bvp.exportWithTimeout(ctx, batch.items); err != nil { bvp.log.WithError(err).Error("failed to export items") @@ -464,6 +500,8 @@ func (bvp *BatchItemProcessor[T]) worker(ctx context.Context) { } } + bvp.log.Infof("Batch completed by worker %d", number) + batch.completedCh <- struct{}{} } } @@ -472,17 +510,23 @@ func (bvp *BatchItemProcessor[T]) worker(ctx context.Context) { // drainQueue awaits the any caller that had added to bvp.stopWait // to finish the enqueue, then exports the final batch. func (bvp *BatchItemProcessor[T]) drainQueue() { + bvp.log.Info("Draining queue: waiting for the batch builder to pull all the items from the queue") + // Wait for the batch builder to send all remaining items to the workers. for len(bvp.queue) > 0 { time.Sleep(10 * time.Millisecond) } + bvp.log.Info("Draining queue: waiting for workers to finish processing batches") + // Wait for the workers to finish processing all batches. for len(bvp.batches) > 0 { batch := <-bvp.batches <-batch.completedCh } + bvp.log.Info("Draining queue: all batches finished") + // Close the batches channel since no more batches will be sent. close(bvp.batches) } diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index 1ac75bdd..8f24181e 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -276,7 +276,8 @@ func TestBatchItemProcessorShutdown(t *testing.T) { func TestBatchItemProcessorDrainQueue(t *testing.T) { be := testBatchExporter[TestItem]{} - bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", nullLogger(), WithMaxExportBatchSize(5), WithBatchTimeout(5*time.Minute), WithWorkers(5)) + log := logrus.New() + bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", log, WithMaxExportBatchSize(5), WithBatchTimeout(1*time.Second), WithWorkers(2), WithShippingMethod(ShippingMethodAsync)) require.NoError(t, err) itemsToExport := 50 From 840ea39ccbfd1a4770401abf0625b6421681e948 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 17 Jun 2024 12:38:38 +1000 Subject: [PATCH 06/10] Refactor workers --- pkg/processor/batch.go | 331 +++++++++++------------------------- pkg/processor/batch_test.go | 68 ++++---- 2 files changed, 136 insertions(+), 263 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index e6a541de..b387418f 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -13,28 +13,12 @@ import ( "github.com/sirupsen/logrus" ) +// ItemExporter is an interface for exporting items. type ItemExporter[T any] interface { - // ExportItems exports a batch of items. - // - // This function is called synchronously, so there is no concurrency - // safety requirement. However, due to the synchronous calling pattern, - // it is critical that all timeouts and cancellations contained in the - // passed context must be honored. - // - // Any retry logic must be contained in this function. The SDK that - // calls this function will not implement any retry logic. All errors - // returned by this function are considered unrecoverable and will be - // reported to a configured error Handler. ExportItems(ctx context.Context, items []*T) error - - // Shutdown notifies the exporter of a pending halt to operations. The - // exporter is expected to preform any cleanup or synchronization it - // requires while honoring all timeouts and cancellations contained in - // the passed context. Shutdown(ctx context.Context) error } -// Defaults for BatchItemProcessorOptions. const ( DefaultMaxQueueSize = 51200 DefaultScheduleDelay = 5000 @@ -44,6 +28,7 @@ const ( DefaultNumWorkers = 1 ) +// ShippingMethod is the method of shipping items for export. type ShippingMethod string const ( @@ -52,38 +37,16 @@ const ( ShippingMethodSync ShippingMethod = "sync" ) -// BatchItemProcessorOption configures a BatchItemProcessor. +// BatchItemProcessorOption is a functional option for the batch item processor. type BatchItemProcessorOption func(o *BatchItemProcessorOptions) -// BatchItemProcessorOptions is configuration settings for a -// BatchItemProcessor. type BatchItemProcessorOptions struct { - // MaxQueueSize is the maximum queue size to buffer items for delayed processing. If the - // queue gets full it drops the items. - // The default value of MaxQueueSize is 51200. - MaxQueueSize int - - // BatchTimeout is the maximum duration for constructing a batch. Processor - // forcefully sends available items when timeout is reached. - // The default value of BatchTimeout is 5000 msec. - BatchTimeout time.Duration - - // ExportTimeout specifies the maximum duration for exporting items. If the timeout - // is reached, the export will be cancelled. - // The default value of ExportTimeout is 30000 msec. - ExportTimeout time.Duration - - // MaxExportBatchSize is the maximum number of items to process in a single batch. - // If there are more than one batch worth of items then it processes multiple batches - // of items one batch after the other without any delay. - // The default value of MaxExportBatchSize is 512. + MaxQueueSize int + BatchTimeout time.Duration + ExportTimeout time.Duration MaxExportBatchSize int - - // ShippingMethod is the method used to ship items to the exporter. - ShippingMethod ShippingMethod - - // Number of workers to process items. - Workers int + ShippingMethod ShippingMethod + Workers int } func (o *BatchItemProcessorOptions) Validate() error { @@ -102,26 +65,20 @@ func (o *BatchItemProcessorOptions) Validate() error { return nil } -// BatchItemProcessor is a buffer that batches asynchronously-received -// items and sends them to a exporter when complete. +// BatchItemProcessor is a processor that batches items for export. type BatchItemProcessor[T any] struct { e ItemExporter[T] o BatchItemProcessorOptions log logrus.FieldLogger - queue chan *T + queue chan traceableItem[T] + batchCh chan []traceableItem[T] dropped uint32 name string metrics *Metrics - batches chan batchWithErr[T] - batchReady chan struct{} - - batch []*T - batchMutex sync.Mutex - timer *time.Timer stopWait sync.WaitGroup stopOnce sync.Once @@ -129,16 +86,13 @@ type BatchItemProcessor[T any] struct { stopWorkersCh chan struct{} } -type batchWithErr[T any] struct { - items []*T +type traceableItem[T any] struct { + item *T errCh chan error completedCh chan struct{} } -// NewBatchItemProcessor creates a new ItemProcessor that will send completed -// item batches to the exporter with the supplied options. -// -// If the exporter is nil, the item processor will preform no action. +// NewBatchItemProcessor creates a new batch item processor. func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log logrus.FieldLogger, options ...BatchItemProcessorOption) (*BatchItemProcessor[T], error) { maxQueueSize := DefaultMaxQueueSize maxExportBatchSize := DefaultMaxExportBatchSize @@ -175,9 +129,9 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log log: log, name: name, metrics: metrics, - batch: make([]*T, 0, o.MaxQueueSize), timer: time.NewTimer(o.BatchTimeout), - queue: make(chan *T, o.MaxQueueSize), + queue: make(chan traceableItem[T], o.MaxQueueSize), + batchCh: make(chan []traceableItem[T], o.Workers), stopCh: make(chan struct{}), stopWorkersCh: make(chan struct{}), } @@ -193,9 +147,6 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log }, ).Info("Batch item processor initialized") - bvp.batches = make(chan batchWithErr[T], o.Workers) - bvp.batchReady = make(chan struct{}, 1) - bvp.stopWait.Add(o.Workers) for i := 0; i < o.Workers; i++ { @@ -206,107 +157,75 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log } go func() { - bvp.batchBuilder(context.Background()) // Start building batches + bvp.batchBuilder(context.Background()) bvp.log.Info("Batch builder exited") }() return &bvp, nil } +// Write writes items to the queue. If the Processor is configured to use +// the sync shipping method, the items will be written to the queue and this +// function will return when all items have been processed. If the Processor is +// configured to use the async shipping method, the items will be written to +// the queue and this function will return immediately. func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.Write") defer span.End() bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) - if bvp.o.ShippingMethod == ShippingMethodSync { - return bvp.immediatelyExportItems(ctx, s) - } - - bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) - if bvp.e == nil { return errors.New("exporter is nil") } - for _, i := range s { - bvp.enqueue(i) - } - - return nil -} - -// immediatelyExportItems immediately exports the items to the exporter. -// Useful for propagating errors from the exporter. -func (bvp *BatchItemProcessor[T]) immediatelyExportItems(ctx context.Context, items []*T) error { - _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.ImmediatelyExportItems") - defer span.End() - - if len(items) == 0 { - return nil - } - - batchSize := bvp.o.MaxExportBatchSize - if batchSize == 0 { - batchSize = 1 - } - - errCh := make(chan error, bvp.o.Workers) - completedCh := make(chan struct{}, bvp.o.Workers) - pendingExports := 0 - - for i := 0; i < len(items); i += batchSize { - end := i + batchSize - if end > len(items) { - end = len(items) + // Break our items up in to chunks that can be processed at + // one time by our workers. This is to prevent wasting + // resources sending items if we've failed an earlier + // batch. + batchSize := bvp.o.Workers * bvp.o.MaxExportBatchSize + for start := 0; start < len(s); start += batchSize { + end := start + batchSize + if end > len(s) { + end = len(s) } - itemsBatch := items[i:end] - bvp.batches <- batchWithErr[T]{items: itemsBatch, errCh: errCh, completedCh: completedCh} - bvp.batchReady <- struct{}{} - - pendingExports++ - - // Only create a new batch if there are already bvp.o.Workers pending exports. - // We do this so we don't bother queueing up any more batches if - // a previous batch has failed. - if pendingExports >= bvp.o.Workers { - select { - case batchErr := <-errCh: - pendingExports-- + prepared := []traceableItem[T]{} + for _, i := range s[start:end] { + prepared = append(prepared, traceableItem[T]{ + item: i, + errCh: make(chan error, 1), + completedCh: make(chan struct{}, 1), + }) + } - if batchErr != nil { - return batchErr - } - case <-completedCh: - pendingExports-- - case <-ctx.Done(): - return ctx.Err() + for _, i := range prepared { + if !bvp.enqueueOrDrop(ctx, i.item, i.errCh, i.completedCh) { + return errors.New("failed to enqueue item - queue is full") } } - } - // Wait for remaining pending exports to complete - for pendingExports > 0 { - select { - case batchErr := <-errCh: - pendingExports-- - - if batchErr != nil { - return batchErr + if bvp.o.ShippingMethod == ShippingMethodSync { + for _, item := range prepared { + select { + case err := <-item.errCh: + if err != nil { + return err + } + case <-item.completedCh: + continue + case <-ctx.Done(): + return ctx.Err() + } } - case <-completedCh: - pendingExports-- - case <-ctx.Done(): - return ctx.Err() } } return nil } -// exportWithTimeout exports the items with a timeout. -func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBatch []*T) error { +// exportWithTimeout exports items with a timeout. +func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBatch []traceableItem[T]) error { if bvp.o.ExportTimeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, bvp.o.ExportTimeout) @@ -314,20 +233,32 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa defer cancel() } - err := bvp.e.ExportItems(ctx, itemsBatch) + items := make([]*T, len(itemsBatch)) + for i, item := range itemsBatch { + items[i] = item.item + } + + err := bvp.e.ExportItems(ctx, items) if err != nil { bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(itemsBatch))) - - return err + } else { + bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch))) } - bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch))) + for _, item := range itemsBatch { + if item.errCh != nil { + item.errCh <- err + } + + if item.completedCh != nil { + item.completedCh <- struct{}{} + } + } return nil } -// Shutdown flushes the queue and waits until all items are processed. -// It only executes once. Subsequent call does nothing. +// Shutdown shuts down the batch item processor. func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error { var err error @@ -338,19 +269,14 @@ func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error { close(bvp.stopCh) - // Drain the queue - bvp.drainQueue() - - // Stop the timer bvp.timer.Stop() - // Stop the workers + bvp.drainQueue() + close(bvp.stopWorkersCh) - // Wait for the workers to finish bvp.stopWait.Wait() - // Shutdown the exporter if bvp.e != nil { if err = bvp.e.Shutdown(ctx); err != nil { bvp.log.WithError(err).Error("failed to shutdown processor") @@ -359,7 +285,6 @@ func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error { close(wait) }() - // Wait until the wait group is done or the context is cancelled select { case <-wait: case <-ctx.Done(): @@ -370,51 +295,36 @@ func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error { return err } -// WithMaxQueueSize returns a BatchItemProcessorOption that configures the -// maximum queue size allowed for a BatchItemProcessor. func WithMaxQueueSize(size int) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.MaxQueueSize = size } } -// WithMaxExportBatchSize returns a BatchItemProcessorOption that configures -// the maximum export batch size allowed for a BatchItemProcessor. func WithMaxExportBatchSize(size int) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.MaxExportBatchSize = size } } -// WithBatchTimeout returns a BatchItemProcessorOption that configures the -// maximum delay allowed for a BatchItemProcessor before it will export any -// held item (whether the queue is full or not). func WithBatchTimeout(delay time.Duration) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.BatchTimeout = delay } } -// WithExportTimeout returns a BatchItemProcessorOption that configures the -// amount of time a BatchItemProcessor waits for an exporter to export before -// abandoning the export. func WithExportTimeout(timeout time.Duration) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.ExportTimeout = timeout } } -// WithExportTimeout returns a BatchItemProcessorOption that configures the -// amount of time a BatchItemProcessor waits for an exporter to export before -// abandoning the export. func WithShippingMethod(method ShippingMethod) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.ShippingMethod = method } } -// WithWorkers returns a BatchItemProcessorOption that configures the -// number of workers to process items. func WithWorkers(workers int) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.Workers = workers @@ -422,59 +332,48 @@ func WithWorkers(workers int) BatchItemProcessorOption { } func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) { + log := bvp.log.WithField("module", "batch_builder") + + var batch []traceableItem[T] + for { select { case <-bvp.stopWorkersCh: - bvp.log.Info("Stopping batch builder") + log.Info("Stopping batch builder") return - case sd := <-bvp.queue: - bvp.log.Info("New item added to queue") + case item := <-bvp.queue: + batch = append(batch, item) - bvp.batchMutex.Lock() - bvp.batch = append(bvp.batch, sd) + if len(batch) >= bvp.o.MaxExportBatchSize { + bvp.sendBatch(batch, "max_export_batch_size") - if len(bvp.batch) >= bvp.o.MaxExportBatchSize { - bvp.sendBatch("max_export_batch_size") + batch = []traceableItem[T]{} } - bvp.batchMutex.Unlock() case <-bvp.timer.C: - bvp.batchMutex.Lock() - bvp.log.Info("Batch timeout reached") - - if len(bvp.batch) > 0 { - bvp.sendBatch("timer") + if len(batch) > 0 { + bvp.sendBatch(batch, "timer") + batch = []traceableItem[T]{} } else { - bvp.log.Info("No items in batch, resetting timer") bvp.timer.Reset(bvp.o.BatchTimeout) } - bvp.batchMutex.Unlock() } } } - -func (bvp *BatchItemProcessor[T]) sendBatch(reason string) { +func (bvp *BatchItemProcessor[T]) sendBatch(batch []traceableItem[T], reason string) { log := bvp.log.WithField("reason", reason) - log.Infof("Creating a batch of %d items", len(bvp.batch)) + log.Tracef("Creating a batch of %d items", len(batch)) - batchCopy := make([]*T, len(bvp.batch)) - log.Infof("Copying batch items") - copy(batchCopy, bvp.batch) - log.Infof("Batch items copied") + batchCopy := make([]traceableItem[T], len(batch)) + copy(batchCopy, batch) - bvp.batches <- batchWithErr[T]{items: batchCopy, errCh: make(chan error), completedCh: make(chan struct{})} - log.Infof("Batch sent to batches channel") + log.Tracef("Batch items copied") - bvp.batch = bvp.batch[:0] - log.Infof("Batch cleared") + bvp.batchCh <- batchCopy - bvp.batchReady <- struct{}{} - log.Infof("Signaled batchReady") + log.Tracef("Batch sent to batch channel") } -// worker removes items from the `queue` channel until processor -// is shut down. It calls the exporter in batches of up to MaxExportBatchSize -// waiting up to BatchTimeout to form a batch. func (bvp *BatchItemProcessor[T]) worker(ctx context.Context, number int) { bvp.log.Infof("Starting worker %d", number) @@ -484,56 +383,32 @@ func (bvp *BatchItemProcessor[T]) worker(ctx context.Context, number int) { bvp.log.Infof("Stopping worker %d", number) return - case <-bvp.batchReady: - bvp.log.Infof("Worker %d is processing a batch", number) - + case batch := <-bvp.batchCh: bvp.timer.Reset(bvp.o.BatchTimeout) - batch := <-bvp.batches - - bvp.log.Infof("Worker %d is exporting a batch of %d items", number, len(batch.items)) - if err := bvp.exportWithTimeout(ctx, batch.items); err != nil { + if err := bvp.exportWithTimeout(ctx, batch); err != nil { bvp.log.WithError(err).Error("failed to export items") - - if batch.errCh != nil { - batch.errCh <- err - } } - - bvp.log.Infof("Batch completed by worker %d", number) - - batch.completedCh <- struct{}{} } } } -// drainQueue awaits the any caller that had added to bvp.stopWait -// to finish the enqueue, then exports the final batch. func (bvp *BatchItemProcessor[T]) drainQueue() { bvp.log.Info("Draining queue: waiting for the batch builder to pull all the items from the queue") - // Wait for the batch builder to send all remaining items to the workers. for len(bvp.queue) > 0 { time.Sleep(10 * time.Millisecond) } bvp.log.Info("Draining queue: waiting for workers to finish processing batches") - // Wait for the workers to finish processing all batches. - for len(bvp.batches) > 0 { - batch := <-bvp.batches - <-batch.completedCh + for len(bvp.queue) > 0 { + <-bvp.queue } bvp.log.Info("Draining queue: all batches finished") - // Close the batches channel since no more batches will be sent. - close(bvp.batches) -} - -func (bvp *BatchItemProcessor[T]) enqueue(sd *T) { - ctx := context.TODO() - bvp.enqueueOrDrop(ctx, sd) + close(bvp.queue) } func recoverSendOnClosedChan() { @@ -549,7 +424,7 @@ func recoverSendOnClosedChan() { panic(x) } -func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, sd *T) bool { +func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, sd *T, errCh chan error, completedCh chan struct{}) bool { // This ensures the bvp.queue<- below does not panic as the // processor shuts down. defer recoverSendOnClosedChan() @@ -561,7 +436,7 @@ func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, sd *T) bool } select { - case bvp.queue <- sd: + case bvp.queue <- traceableItem[T]{item: sd, errCh: errCh, completedCh: completedCh}: return true default: atomic.AddUint32(&bvp.dropped, 1) diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index 8f24181e..72c7fbe4 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -119,23 +119,26 @@ type testOption struct { wantBatchCount int } -func TestNewBatchItemProcessorWithOptions(t *testing.T) { +func TestAsyncNewBatchItemProcessorWithOptions(t *testing.T) { schDelay := 100 * time.Millisecond options := []testOption{ { name: "default", o: []BatchItemProcessorOption{ - WithShippingMethod(ShippingMethodSync), + WithShippingMethod(ShippingMethodAsync), + WithBatchTimeout(10 * time.Millisecond), }, + writeNumItems: 2048, genNumItems: 2053, - wantNumItems: 2048, - wantBatchCount: 4, + wantNumItems: 2053, + wantBatchCount: 5, }, { name: "non-default BatchTimeout", o: []BatchItemProcessorOption{ WithBatchTimeout(schDelay), - WithShippingMethod(ShippingMethodSync), + WithShippingMethod(ShippingMethodAsync), + WithMaxExportBatchSize(512), }, writeNumItems: 2048, genNumItems: 2053, @@ -147,25 +150,13 @@ func TestNewBatchItemProcessorWithOptions(t *testing.T) { o: []BatchItemProcessorOption{ WithBatchTimeout(schDelay), WithMaxQueueSize(200), - WithShippingMethod(ShippingMethodSync), - }, - writeNumItems: 200, - genNumItems: 205, - wantNumItems: 205, - wantBatchCount: 1, - }, - { - name: "blocking option", - o: []BatchItemProcessorOption{ - WithBatchTimeout(schDelay), - WithMaxQueueSize(200), - WithMaxExportBatchSize(20), - WithShippingMethod(ShippingMethodSync), + WithMaxExportBatchSize(200), + WithShippingMethod(ShippingMethodAsync), }, writeNumItems: 200, genNumItems: 205, wantNumItems: 205, - wantBatchCount: 11, + wantBatchCount: 2, }, } @@ -175,11 +166,11 @@ func TestNewBatchItemProcessorWithOptions(t *testing.T) { ssp, err := createAndRegisterBatchSP(option.o, &te) if err != nil { - t.Fatalf("%s: Error creating new instance of BatchItemProcessor\n", option.name) + require.NoError(t, err) } if ssp == nil { - t.Fatalf("%s: Error creating new instance of BatchItemProcessor\n", option.name) + require.NoError(t, err) } for i := 0; i < option.genNumItems; i++ { @@ -190,7 +181,7 @@ func TestNewBatchItemProcessorWithOptions(t *testing.T) { if err := ssp.Write(context.Background(), []*TestItem{{ name: "test", }}); err != nil { - t.Errorf("%s: Error writing to BatchItemProcessor\n", option.name) + t.Errorf("%s: Error writing to BatchItemProcessor", option.name) } } @@ -198,15 +189,15 @@ func TestNewBatchItemProcessorWithOptions(t *testing.T) { gotNumOfItems := te.len() if option.wantNumItems > 0 && option.wantNumItems != gotNumOfItems { - t.Errorf("number of exported items: got %+v, want %+v\n", + t.Errorf("number of exported items: got %v, want %v", gotNumOfItems, option.wantNumItems) } gotBatchCount := te.getBatchCount() if option.wantBatchCount > 0 && gotBatchCount != option.wantBatchCount { - t.Errorf("number batches: got %+v, want >= %+v\n", + t.Errorf("number batches: got %v, want %v", gotBatchCount, option.wantBatchCount) - t.Errorf("Batches %v\n", te.sizes) + t.Errorf("Batches %v", te.sizes) } }) } @@ -280,7 +271,7 @@ func TestBatchItemProcessorDrainQueue(t *testing.T) { bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", log, WithMaxExportBatchSize(5), WithBatchTimeout(1*time.Second), WithWorkers(2), WithShippingMethod(ShippingMethodAsync)) require.NoError(t, err) - itemsToExport := 50 + itemsToExport := 5000 for i := 0; i < itemsToExport; i++ { if err := bsp.Write(context.Background(), []*TestItem{{ @@ -313,11 +304,10 @@ func TestBatchItemProcessorPostShutdown(t *testing.T) { lenJustAfterShutdown := be.len() for i := 0; i < 60; i++ { - if err := bsp.Write(context.Background(), []*TestItem{{ + err := bsp.Write(context.Background(), []*TestItem{{ name: strconv.Itoa(i), - }}); err != nil { - t.Errorf("Error writing to BatchItemProcessor\n") - } + }}) + require.Error(t, err) } assert.Equal(t, lenJustAfterShutdown, be.len(), "Write should have no effect after Shutdown") @@ -494,8 +484,8 @@ func (ErrorItemExporter[T]) ExportItems(ctx context.Context, _ []*T) error { } // TestBatchItemProcessorWithSyncErrorExporter tests a processor with ShippingMethod = sync and an exporter that only returns errors. -func TestBatchItemProcessorWithAsyncErrorExporter(t *testing.T) { - bsp, err := NewBatchItemProcessor[TestItem](ErrorItemExporter[TestItem]{}, "processor", nullLogger(), WithShippingMethod(ShippingMethodSync)) +func TestBatchItemProcessorWithSyncErrorExporter(t *testing.T) { + bsp, err := NewBatchItemProcessor[TestItem](ErrorItemExporter[TestItem]{}, "processor", nullLogger(), WithShippingMethod(ShippingMethodSync), WithBatchTimeout(100*time.Millisecond)) if err != nil { t.Fatalf("failed to create batch processor: %v", err) } @@ -510,14 +500,22 @@ func TestBatchItemProcessorSyncShipping(t *testing.T) { // Define a range of values for workers, maxExportBatchSize, and itemsToExport workerCounts := []int{1, 5, 10} maxBatchSizes := []int{1, 5, 10, 20} - itemCounts := []int{0, 1, 25, 50} + itemCounts := []int{0, 1, 10, 25, 50} for _, workers := range workerCounts { for _, maxBatchSize := range maxBatchSizes { for _, itemsToExport := range itemCounts { t.Run(fmt.Sprintf("%d workers, batch size %d, %d items", workers, maxBatchSize, itemsToExport), func(t *testing.T) { te := testBatchExporter[TestItem]{} - bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers), WithShippingMethod(ShippingMethodSync)) + bsp, err := NewBatchItemProcessor[TestItem]( + &te, + "processor", + logrus.New(), + WithMaxExportBatchSize(maxBatchSize), + WithWorkers(workers), + WithShippingMethod(ShippingMethodSync), + WithBatchTimeout(100*time.Millisecond), + ) require.NoError(t, err) items := make([]*TestItem, itemsToExport) From e1e7e94ab0987bacfe5a6c7d8fa7678362e6d709 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 17 Jun 2024 12:40:59 +1000 Subject: [PATCH 07/10] Add comments --- pkg/processor/batch.go | 42 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index b387418f..f69909f7 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -15,7 +15,23 @@ import ( // ItemExporter is an interface for exporting items. type ItemExporter[T any] interface { + // ExportItems exports a batch of items. + // + // This function is called synchronously, so there is no concurrency + // safety requirement. However, due to the synchronous calling pattern, + // it is critical that all timeouts and cancellations contained in the + // passed context must be honored. + // + // Any retry logic must be contained in this function. The SDK that + // calls this function will not implement any retry logic. All errors + // returned by this function are considered unrecoverable and will be + // reported to a configured error Handler. ExportItems(ctx context.Context, items []*T) error + + // Shutdown notifies the exporter of a pending halt to operations. The + // exporter is expected to preform any cleanup or synchronization it + // requires while honoring all timeouts and cancellations contained in + // the passed context. Shutdown(ctx context.Context) error } @@ -41,12 +57,28 @@ const ( type BatchItemProcessorOption func(o *BatchItemProcessorOptions) type BatchItemProcessorOptions struct { - MaxQueueSize int - BatchTimeout time.Duration - ExportTimeout time.Duration + // MaxQueueSize is the maximum queue size to buffer items for delayed processing. If the + // queue gets full it drops the items. + // The default value of MaxQueueSize is 51200. + MaxQueueSize int + // BatchTimeout is the maximum duration for constructing a batch. Processor + // forcefully sends available items when timeout is reached. + // The default value of BatchTimeout is 5000 msec. + BatchTimeout time.Duration + + // ExportTimeout specifies the maximum duration for exporting items. If the timeout + // is reached, the export will be cancelled. + // The default value of ExportTimeout is 30000 msec. + ExportTimeout time.Duration + // MaxExportBatchSize is the maximum number of items to include in a batch. + // The default value of MaxExportBatchSize is 512. MaxExportBatchSize int - ShippingMethod ShippingMethod - Workers int + // ShippingMethod is the method of shipping items for export. The default value + // of ShippingMethod is "async". + ShippingMethod ShippingMethod + // Workers is the number of workers to process batches. + // The default value of Workers is 1. + Workers int } func (o *BatchItemProcessorOptions) Validate() error { From 0a4f318538c4039007da3d2e32e0de92135488cb Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 17 Jun 2024 13:12:30 +1000 Subject: [PATCH 08/10] Close --- pkg/processor/batch.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index f69909f7..039b1877 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -280,10 +280,12 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa for _, item := range itemsBatch { if item.errCh != nil { item.errCh <- err + close(item.errCh) } if item.completedCh != nil { item.completedCh <- struct{}{} + close(item.completedCh) } } From 3863dcb82ee19c8f26e8bd42c86e54f2fe379163 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 17 Jun 2024 13:57:13 +1000 Subject: [PATCH 09/10] test: Add queue size and dropped events tests --- pkg/processor/batch_test.go | 54 +++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index 72c7fbe4..934b491b 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -649,3 +649,57 @@ func TestBatchItemProcessorWithBatchTimeout(t *testing.T) { time.Sleep(2 * time.Second) require.Equal(t, itemsToExport, te.len(), "Expected all items to be exported after batch timeout") } + +func TestBatchItemProcessorQueueSize(t *testing.T) { + te := testBatchExporter[TestItem]{} + maxQueueSize := 5 + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxQueueSize(maxQueueSize), WithMaxExportBatchSize(2), WithWorkers(1)) + require.NoError(t, err) + + itemsToExport := 10 + items := make([]*TestItem, itemsToExport) + + for i := 0; i < itemsToExport; i++ { + items[i] = &TestItem{name: strconv.Itoa(i)} + } + + // Write items to the processor + for i := 0; i < itemsToExport; i++ { + err := bsp.Write(context.Background(), []*TestItem{items[i]}) + if i < maxQueueSize { + require.NoError(t, err, "Expected no error for item %d", i) + } else { + require.Error(t, err, "Expected an error for item %d due to queue size limit", i) + } + } + + // Ensure that the queue size is respected + require.Equal(t, maxQueueSize, len(bsp.queue), "Queue size should be equal to maxQueueSize") +} + +func TestBatchItemProcessorDropEvents(t *testing.T) { + te := testBatchExporter[TestItem]{} + maxQueueSize := 5 + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxQueueSize(maxQueueSize), WithMaxExportBatchSize(2), WithWorkers(1)) + require.NoError(t, err) + + itemsToExport := 10 + items := make([]*TestItem, itemsToExport) + + for i := 0; i < itemsToExport; i++ { + items[i] = &TestItem{name: strconv.Itoa(i)} + } + + // Write items to the processor + for i := 0; i < itemsToExport; i++ { + err := bsp.Write(context.Background(), []*TestItem{items[i]}) + if i < maxQueueSize { + require.NoError(t, err, "Expected no error for item %d", i) + } else { + require.Error(t, err, "Expected an error for item %d due to queue size limit", i) + } + } + + // Ensure that the dropped count is correct + require.Equal(t, uint32(itemsToExport-maxQueueSize), bsp.dropped, "Dropped count should be equal to the number of items that exceeded the queue size") +} From d94184976d788cb8279c9069dec6f123b9850322 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 17 Jun 2024 14:38:00 +1000 Subject: [PATCH 10/10] refactor: Improve error handling in enqueueOrDrop function --- pkg/processor/batch.go | 14 +++++++------- pkg/processor/batch_test.go | 29 ++--------------------------- 2 files changed, 9 insertions(+), 34 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 039b1877..b9de255b 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -232,8 +232,8 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { } for _, i := range prepared { - if !bvp.enqueueOrDrop(ctx, i.item, i.errCh, i.completedCh) { - return errors.New("failed to enqueue item - queue is full") + if err := bvp.enqueueOrDrop(ctx, i); err != nil { + return err } } @@ -458,24 +458,24 @@ func recoverSendOnClosedChan() { panic(x) } -func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, sd *T, errCh chan error, completedCh chan struct{}) bool { +func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, item traceableItem[T]) error { // This ensures the bvp.queue<- below does not panic as the // processor shuts down. defer recoverSendOnClosedChan() select { case <-bvp.stopCh: - return false + return errors.New("processor is shutting down") default: } select { - case bvp.queue <- traceableItem[T]{item: sd, errCh: errCh, completedCh: completedCh}: - return true + case bvp.queue <- item: + return nil default: atomic.AddUint32(&bvp.dropped, 1) bvp.metrics.IncItemsDroppedBy(bvp.name, float64(1)) } - return false + return errors.New("queue is full") } diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index 934b491b..8a41ec73 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -651,9 +651,9 @@ func TestBatchItemProcessorWithBatchTimeout(t *testing.T) { } func TestBatchItemProcessorQueueSize(t *testing.T) { - te := testBatchExporter[TestItem]{} + te := indefiniteExporter[TestItem]{} maxQueueSize := 5 - bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxQueueSize(maxQueueSize), WithMaxExportBatchSize(2), WithWorkers(1)) + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithBatchTimeout(10*time.Minute), WithMaxQueueSize(maxQueueSize), WithMaxExportBatchSize(maxQueueSize), WithWorkers(1), WithShippingMethod(ShippingMethodAsync)) require.NoError(t, err) itemsToExport := 10 @@ -675,31 +675,6 @@ func TestBatchItemProcessorQueueSize(t *testing.T) { // Ensure that the queue size is respected require.Equal(t, maxQueueSize, len(bsp.queue), "Queue size should be equal to maxQueueSize") -} - -func TestBatchItemProcessorDropEvents(t *testing.T) { - te := testBatchExporter[TestItem]{} - maxQueueSize := 5 - bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxQueueSize(maxQueueSize), WithMaxExportBatchSize(2), WithWorkers(1)) - require.NoError(t, err) - - itemsToExport := 10 - items := make([]*TestItem, itemsToExport) - - for i := 0; i < itemsToExport; i++ { - items[i] = &TestItem{name: strconv.Itoa(i)} - } - - // Write items to the processor - for i := 0; i < itemsToExport; i++ { - err := bsp.Write(context.Background(), []*TestItem{items[i]}) - if i < maxQueueSize { - require.NoError(t, err, "Expected no error for item %d", i) - } else { - require.Error(t, err, "Expected an error for item %d due to queue size limit", i) - } - } - // Ensure that the dropped count is correct require.Equal(t, uint32(itemsToExport-maxQueueSize), bsp.dropped, "Dropped count should be equal to the number of items that exceeded the queue size") }