From 2f91c49e503379093912a30aaf245a684207da66 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 23 Oct 2023 12:52:36 +1000 Subject: [PATCH 1/6] feat(processor): Add multiple worker support --- pkg/output/http/config.go | 1 + pkg/output/http/http.go | 1 + pkg/output/xatu/config.go | 1 + pkg/output/xatu/xatu.go | 1 + pkg/processor/batch.go | 228 ++++++++++++++++-------------------- pkg/processor/batch_test.go | 227 ++++++++++++++++++----------------- 6 files changed, 225 insertions(+), 234 deletions(-) diff --git a/pkg/output/http/config.go b/pkg/output/http/config.go index fb6a100e..d444b6d7 100644 --- a/pkg/output/http/config.go +++ b/pkg/output/http/config.go @@ -14,6 +14,7 @@ type Config struct { MaxExportBatchSize int `yaml:"maxExportBatchSize" default:"512"` Compression CompressionStrategy `yaml:"compression" default:"none"` KeepAlive *bool `yaml:"keepAlive" default:"true"` + Workers int `yaml:"workers" default:"1"` } func (c *Config) Validate() error { diff --git a/pkg/output/http/http.go b/pkg/output/http/http.go index 58bb2104..0b40fc33 100644 --- a/pkg/output/http/http.go +++ b/pkg/output/http/http.go @@ -46,6 +46,7 @@ func New(name string, config *Config, log logrus.FieldLogger, filterConfig *xatu processor.WithExportTimeout(config.ExportTimeout), processor.WithMaxExportBatchSize(config.MaxExportBatchSize), processor.WithShippingMethod(shippingMethod), + processor.WithWorkers(config.Workers), ) if err != nil { return nil, err diff --git a/pkg/output/xatu/config.go b/pkg/output/xatu/config.go index 432e2329..d8900f8d 100644 --- a/pkg/output/xatu/config.go +++ b/pkg/output/xatu/config.go @@ -13,6 +13,7 @@ type Config struct { BatchTimeout time.Duration `yaml:"batchTimeout" default:"5s"` ExportTimeout time.Duration `yaml:"exportTimeout" default:"30s"` MaxExportBatchSize int `yaml:"maxExportBatchSize" default:"512"` + Workers int `yaml:"workers" default:"1"` } func (c *Config) Validate() error { diff --git a/pkg/output/xatu/xatu.go b/pkg/output/xatu/xatu.go index 9ec75fb2..6c6dd801 100644 --- a/pkg/output/xatu/xatu.go +++ b/pkg/output/xatu/xatu.go @@ -46,6 +46,7 @@ func New(name string, config *Config, log logrus.FieldLogger, filterConfig *xatu processor.WithExportTimeout(config.ExportTimeout), processor.WithMaxExportBatchSize(config.MaxExportBatchSize), processor.WithShippingMethod(shippingMethod), + processor.WithWorkers(config.Workers), ) if err != nil { return nil, err diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index a265561a..3ec92e9f 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -46,6 +46,7 @@ const ( DefaultExportTimeout = 30000 DefaultMaxExportBatchSize = 512 DefaultShippingMethod = ShippingMethodAsync + DefaultNumWorkers = 1 ) type ShippingMethod string @@ -85,6 +86,9 @@ type BatchItemProcessorOptions struct { // ShippingMethod is the method used to ship items to the exporter. ShippingMethod ShippingMethod + + // Number of workers to process items. + Workers int } // BatchItemProcessor is a buffer that batches asynchronously-received @@ -101,12 +105,17 @@ type BatchItemProcessor[T any] struct { metrics *Metrics + batches chan []*T + batchReady chan bool + batch []*T batchMutex sync.Mutex - timer *time.Timer - stopWait sync.WaitGroup - stopOnce sync.Once - stopCh chan struct{} + + timer *time.Timer + stopWait sync.WaitGroup + stopOnce sync.Once + stopCh chan struct{} + stopWorkersCh chan struct{} } // NewBatchItemProcessor creates a new ItemProcessor that will send completed @@ -131,6 +140,7 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log MaxQueueSize: maxQueueSize, MaxExportBatchSize: maxExportBatchSize, ShippingMethod: DefaultShippingMethod, + Workers: DefaultNumWorkers, } for _, opt := range options { opt(&o) @@ -139,24 +149,32 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log metrics := DefaultMetrics bvp := BatchItemProcessor[T]{ - e: exporter, - o: o, - log: log, - name: name, - metrics: metrics, - batch: make([]*T, 0, o.MaxExportBatchSize), - timer: time.NewTimer(o.BatchTimeout), - queue: make(chan *T, o.MaxQueueSize), - stopCh: make(chan struct{}), + e: exporter, + o: o, + log: log, + name: name, + metrics: metrics, + batch: make([]*T, 0, o.MaxExportBatchSize), + timer: time.NewTimer(o.BatchTimeout), + queue: make(chan *T, o.MaxQueueSize), + stopCh: make(chan struct{}), + stopWorkersCh: make(chan struct{}), } - bvp.stopWait.Add(1) + bvp.batches = make(chan []*T, o.Workers) // Buffer the channel to hold batches for each worker + bvp.batchReady = make(chan bool, 1) - go func() { - defer bvp.stopWait.Done() - bvp.processQueue() - bvp.drainQueue() - }() + bvp.stopWait.Add(o.Workers) + + for i := 0; i < o.Workers; i++ { + go func() { + defer bvp.stopWait.Done() + + bvp.worker(context.Background()) + }() + } + + go bvp.batchBuilder(context.Background()) // Start building batches return &bvp, nil } @@ -243,16 +261,33 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error { var err error + bvp.log.Debug("Shutting down processor") + bvp.stopOnce.Do(func() { wait := make(chan struct{}) go func() { + // Stop accepting new items close(bvp.stopCh) + + // Drain the queue + bvp.drainQueue() + + // Stop the timer + bvp.timer.Stop() + + // Stop the workers + close(bvp.stopWorkersCh) + + // Wait for the workers to finish bvp.stopWait.Wait() + + // Shutdown the exporter if bvp.e != nil { if err = bvp.e.Shutdown(ctx); err != nil { bvp.log.WithError(err).Error("failed to shutdown processor") } } + close(wait) }() // Wait until the wait group is done or the context is cancelled @@ -266,29 +301,6 @@ func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error { return err } -// ForceFlush exports all ended items that have not yet been exported. -func (bvp *BatchItemProcessor[T]) ForceFlush(ctx context.Context) error { - var err error - - if bvp.e != nil { - wait := make(chan error) - - go func() { - wait <- bvp.exportItems(ctx) - close(wait) - }() - - // Wait until the export is finished or the context is cancelled/timed out - select { - case err = <-wait: - case <-ctx.Done(): - err = ctx.Err() - } - } - - return err -} - // WithMaxQueueSize returns a BatchItemProcessorOption that configures the // maximum queue size allowed for a BatchItemProcessor. func WithMaxQueueSize(size int) BatchItemProcessorOption { @@ -332,78 +344,62 @@ func WithShippingMethod(method ShippingMethod) BatchItemProcessorOption { } } -// exportItems is a subroutine of processing and draining the queue. -func (bvp *BatchItemProcessor[T]) exportItems(ctx context.Context) error { - bvp.timer.Reset(bvp.o.BatchTimeout) - - bvp.batchMutex.Lock() - defer bvp.batchMutex.Unlock() - - if bvp.o.ExportTimeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, bvp.o.ExportTimeout) - - defer cancel() +// WithWorkers returns a BatchItemProcessorOption that configures the +// number of workers to process items. +func WithWorkers(workers int) BatchItemProcessorOption { + return func(o *BatchItemProcessorOptions) { + o.Workers = workers } +} - if l := len(bvp.batch); l > 0 { - countItemsToExport := len(bvp.batch) +func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) { + for { + select { + case <-bvp.stopCh: + return + case sd := <-bvp.queue: + bvp.batchMutex.Lock() - bvp.log.WithFields(logrus.Fields{ - "count": countItemsToExport, - "total_dropped": atomic.LoadUint32(&bvp.dropped), - }).Debug("exporting items") + bvp.batch = append(bvp.batch, sd) - err := bvp.e.ExportItems(ctx, bvp.batch) + if len(bvp.batch) >= bvp.o.MaxExportBatchSize { + batchCopy := make([]*T, len(bvp.batch)) + copy(batchCopy, bvp.batch) + bvp.batches <- batchCopy + bvp.batch = bvp.batch[:0] + bvp.batchReady <- true + } - bvp.metrics.IncItemsExportedBy(bvp.name, float64(countItemsToExport)) + bvp.batchMutex.Unlock() + case <-bvp.timer.C: + bvp.batchMutex.Lock() - // A new batch is always created after exporting, even if the batch failed to be exported. - // - // It is up to the exporter to implement any type of retry logic if a batch is failing - // to be exported, since it is specific to the protocol and backend being sent to. - bvp.batch = bvp.batch[:0] + if len(bvp.batch) > 0 { + batchCopy := make([]*T, len(bvp.batch)) + copy(batchCopy, bvp.batch) + bvp.batches <- batchCopy + bvp.batch = bvp.batch[:0] + bvp.batchReady <- true + } - if err != nil { - return err + bvp.batchMutex.Unlock() } } - - return nil } -// processQueue removes items from the `queue` channel until processor +// worker removes items from the `queue` channel until processor // is shut down. It calls the exporter in batches of up to MaxExportBatchSize // waiting up to BatchTimeout to form a batch. -func (bvp *BatchItemProcessor[T]) processQueue() { - defer bvp.timer.Stop() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - +func (bvp *BatchItemProcessor[T]) worker(ctx context.Context) { for { select { - case <-bvp.stopCh: + case <-bvp.stopWorkersCh: return - case <-bvp.timer.C: - if err := bvp.exportItems(ctx); err != nil { + case <-bvp.batchReady: + batch := <-bvp.batches + if err := bvp.exportWithTimeout(ctx, batch); err != nil { bvp.log.WithError(err).Error("failed to export items") } - case sd := <-bvp.queue: - bvp.batchMutex.Lock() - bvp.batch = append(bvp.batch, sd) - shouldExport := len(bvp.batch) >= bvp.o.MaxExportBatchSize - bvp.batchMutex.Unlock() - - if shouldExport { - if !bvp.timer.Stop() { - <-bvp.timer.C - } - - if err := bvp.exportItems(ctx); err != nil { - bvp.log.WithError(err).Error("failed to export items") - } - } } } } @@ -411,39 +407,23 @@ func (bvp *BatchItemProcessor[T]) processQueue() { // drainQueue awaits the any caller that had added to bvp.stopWait // to finish the enqueue, then exports the final batch. func (bvp *BatchItemProcessor[T]) drainQueue() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - for { - select { - case sd := <-bvp.queue: - if sd == nil { - if err := bvp.exportItems(ctx); err != nil { - bvp.log.WithError(err).Error("failed to export items") - } - - return - } - - bvp.batchMutex.Lock() - bvp.batch = append(bvp.batch, sd) - shouldExport := len(bvp.batch) == bvp.o.MaxExportBatchSize - bvp.batchMutex.Unlock() + // Wait for the batch builder to send all remaining items to the workers. + for len(bvp.queue) > 0 { + time.Sleep(10 * time.Millisecond) + } - if shouldExport { - if err := bvp.exportItems(ctx); err != nil { - bvp.log.WithError(err).Error("failed to export items") - } - } - default: - close(bvp.queue) - } + // Wait for the workers to finish processing all batches. + for len(bvp.batches) > 0 { + time.Sleep(10 * time.Millisecond) } + + // Close the batches channel since no more batches will be sent. + close(bvp.batches) } func (bvp *BatchItemProcessor[T]) enqueue(sd *T) { ctx := context.TODO() - bvp.enqueueDrop(ctx, sd) + bvp.enqueueOrDrop(ctx, sd) } func recoverSendOnClosedChan() { @@ -459,7 +439,7 @@ func recoverSendOnClosedChan() { panic(x) } -func (bvp *BatchItemProcessor[T]) enqueueDrop(ctx context.Context, sd *T) bool { +func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, sd *T) bool { // This ensures the bvp.queue<- below does not panic as the // processor shuts down. defer recoverSendOnClosedChan() diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index 9e446320..9733e496 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -93,10 +93,6 @@ func TestNewBatchItemProcessorWithNilExporter(t *testing.T) { t.Errorf("failed to Write to the BatchItemProcessor: %v", err) } - if err := bsp.ForceFlush(context.Background()); err != nil { - t.Errorf("failed to ForceFlush the BatchItemProcessor: %v", err) - } - if err := bsp.Shutdown(context.Background()); err != nil { t.Errorf("failed to Shutdown the BatchItemProcessor: %v", err) } @@ -258,9 +254,29 @@ func TestBatchItemProcessorShutdown(t *testing.T) { assert.Equal(t, 1, bp.shutdownCount) } +func TestBatchItemProcessorDrainQueue(t *testing.T) { + be := testBatchExporter[TestItem]{} + bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", nullLogger(), WithMaxExportBatchSize(5), WithBatchTimeout(5*time.Minute)) + require.NoError(t, err) + + itemsToExport := 500 + + for i := 0; i < itemsToExport; i++ { + if err := bsp.Write(context.Background(), []*TestItem{{ + name: strconv.Itoa(i), + }}); err != nil { + t.Errorf("Error writing to BatchItemProcessor\n") + } + } + + require.NoError(t, bsp.Shutdown(context.Background()), "shutting down BatchItemProcessor") + + assert.Equal(t, itemsToExport, be.len(), "Queue should have been drained on shutdown") +} + func TestBatchItemProcessorPostShutdown(t *testing.T) { be := testBatchExporter[TestItem]{} - bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", nullLogger(), WithMaxExportBatchSize(50)) + bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", nullLogger(), WithMaxExportBatchSize(50), WithBatchTimeout(5*time.Millisecond)) require.NoError(t, err) for i := 0; i < 60; i++ { @@ -275,137 +291,124 @@ func TestBatchItemProcessorPostShutdown(t *testing.T) { lenJustAfterShutdown := be.len() - assert.NoError(t, bsp.ForceFlush(context.Background()), "force flushing BatchItemProcessor") + for i := 0; i < 60; i++ { + if err := bsp.Write(context.Background(), []*TestItem{{ + name: strconv.Itoa(i), + }}); err != nil { + t.Errorf("Error writing to BatchItemProcessor\n") + } + } - assert.Equal(t, lenJustAfterShutdown, be.len(), "Write and ForceFlush should have no effect after Shutdown") + assert.Equal(t, lenJustAfterShutdown, be.len(), "Write should have no effect after Shutdown") } -func TestBatchItemProcessorForceFlushSucceeds(t *testing.T) { - te := testBatchExporter[TestItem]{} - option := testOption{ - name: "default BatchItemProcessorOptions", - o: []BatchItemProcessorOption{ - WithMaxQueueSize(0), - WithMaxExportBatchSize(3000), - }, - genNumItems: 2053, - wantNumItems: 2053, - wantBatchCount: 1, - } +type slowExporter[T TestItem] struct { + itemsExported int +} - ssp, err := createAndRegisterBatchSP(option.o, &te) - if err != nil { - t.Fatalf("%s: Error creating new instance of BatchItemProcessor\n", option.name) - } +func (slowExporter[T]) Shutdown(context.Context) error { return nil } +func (t *slowExporter[T]) ExportItems(ctx context.Context, items []*T) error { + time.Sleep(100 * time.Millisecond) - if ssp == nil { - t.Fatalf("%s: Error creating new instance of BatchItemProcessor\n", option.name) - } + t.itemsExported += len(items) - for i := 0; i < option.genNumItems; i++ { - if errr := ssp.Write(context.Background(), []*TestItem{{ - name: strconv.Itoa(i), - }}); errr != nil { - t.Errorf("%s: Error writing to BatchItemProcessor\n", option.name) - } + <-ctx.Done() - time.Sleep(1 * time.Millisecond) - } + return ctx.Err() +} - // Force flush any held item batches - err = ssp.ForceFlush(context.Background()) +func TestMultipleWorkersConsumeConcurrently(t *testing.T) { + te := slowExporter[TestItem]{} + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithBatchTimeout(5*time.Minute), WithWorkers(20)) + require.NoError(t, err) - assertMaxItemDiff(t, te.len(), option.wantNumItems, 10) + itemsToExport := 100 - gotBatchCount := te.getBatchCount() - if gotBatchCount < option.wantBatchCount { - t.Errorf("number batches: got %+v, want >= %+v\n", - gotBatchCount, option.wantBatchCount) - t.Errorf("Batches %v\n", te.sizes) + for i := 0; i < itemsToExport; i++ { + if err := bsp.Write(context.Background(), []*TestItem{{name: strconv.Itoa(i)}}); err != nil { + t.Errorf("Error writing to BatchItemProcessor\n") + } } - assert.NoError(t, err) -} + time.Sleep(1 * time.Second) // give some time for workers to process -func TestBatchItemProcessorDropBatchIfFailed(t *testing.T) { - te := testBatchExporter[TestItem]{ - errors: []error{errors.New("fail to export")}, - } - option := testOption{ - o: []BatchItemProcessorOption{ - WithMaxQueueSize(0), - WithMaxExportBatchSize(2000), - }, - genNumItems: 1000, - wantNumItems: 1000, - wantBatchCount: 1, + if te.itemsExported != itemsToExport { + t.Errorf("Expected all items to be exported, got: %v", te.itemsExported) } +} - ssp, err := createAndRegisterBatchSP(option.o, &te) - if err != nil { - t.Fatalf("%s: Error creating new instance of BatchItemProcessor\n", option.name) - } +func TestWorkersProcessBatches(t *testing.T) { + te := testBatchExporter[TestItem]{} + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(5)) + require.NoError(t, err) - if ssp == nil { - t.Fatalf("%s: Error creating new instance of BatchItemProcessor\n", option.name) - } + itemsToExport := 50 - for i := 0; i < option.genNumItems; i++ { - if errr := ssp.Write(context.Background(), []*TestItem{{ - name: strconv.Itoa(i), - }}); errr != nil { - t.Errorf("%s: Error writing to BatchItemProcessor\n", option.name) + for i := 0; i < itemsToExport; i++ { + if err := bsp.Write(context.Background(), []*TestItem{{name: strconv.Itoa(i)}}); err != nil { + t.Errorf("Error writing to BatchItemProcessor\n") } + } - time.Sleep(1 * time.Millisecond) + time.Sleep(1 * time.Second) + + if te.getBatchCount() != 5 { + t.Errorf("Expected 5 batches, got: %v", te.getBatchCount()) } - // Force flush any held item batches - err = ssp.ForceFlush(context.Background()) - assert.Error(t, err) - assert.EqualError(t, err, "fail to export") + if te.len() != itemsToExport { + t.Errorf("Expected all items to be exported, got: %v", te.len()) + } +} - // First flush will fail, nothing should be exported. - assertMaxItemDiff(t, te.droppedCount, option.wantNumItems, 10) - assert.Equal(t, 0, te.len()) - assert.Equal(t, 0, te.getBatchCount()) +func TestDrainQueueWithMultipleWorkers(t *testing.T) { + te := testBatchExporter[TestItem]{} + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(5)) + require.NoError(t, err) - // Generate a new batch, this will succeed - for i := 0; i < option.genNumItems; i++ { - if errr := ssp.Write(context.Background(), []*TestItem{{ - name: strconv.Itoa(i), - }}); errr != nil { - t.Errorf("%s: Error writing to BatchItemProcessor\n", option.name) + itemsToExport := 100 + + for i := 0; i < itemsToExport; i++ { + if err := bsp.Write(context.Background(), []*TestItem{{name: strconv.Itoa(i)}}); err != nil { + t.Errorf("Error writing to BatchItemProcessor\n") } + } - time.Sleep(1 * time.Millisecond) + if err := bsp.Shutdown(context.Background()); err != nil { + t.Errorf("Error shutting down BatchItemProcessor\n") } - // Force flush any held item batches - err = ssp.ForceFlush(context.Background()) - assert.NoError(t, err) + if te.len() != itemsToExport { + t.Errorf("Expected all items to be exported after drain, got: %v", te.len()) + } +} + +func TestBatchItemProcessorTimerFunctionality(t *testing.T) { + te := testBatchExporter[TestItem]{} + batchTimeout := 500 * time.Millisecond + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(50), WithBatchTimeout(batchTimeout), WithWorkers(5)) + require.NoError(t, err) - assertMaxItemDiff(t, te.len(), option.wantNumItems, 10) - gotBatchCount := te.getBatchCount() + // Add items less than the max batch size + itemsToExport := 25 - if gotBatchCount < option.wantBatchCount { - t.Errorf("number batches: got %+v, want >= %+v\n", - gotBatchCount, option.wantBatchCount) - t.Errorf("Batches %v\n", te.sizes) + for i := 0; i < itemsToExport; i++ { + if err := bsp.Write(context.Background(), []*TestItem{{name: strconv.Itoa(i)}}); err != nil { + t.Errorf("Error writing to BatchItemProcessor\n") + } } -} -func assertMaxItemDiff(t *testing.T, got, want, maxDif int) { - t.Helper() + // Wait for more than the batchTimeout duration + time.Sleep(batchTimeout + 100*time.Millisecond) - itemDifference := want - got - if itemDifference < 0 { - itemDifference *= -1 + // Check if items have been exported due to timer trigger + if te.len() != itemsToExport { + t.Errorf("Expected %v items to be exported due to timer, but got: %v", itemsToExport, te.len()) } - if itemDifference > maxDif { - t.Errorf("number of exported item not equal to or within %d less than: got %+v, want %+v\n", - maxDif, got, want) + // Ensure that it was exported as a single batch + if te.getBatchCount() != 1 { + t.Errorf("Expected 1 batch to be exported due to timer, but got: %v", te.getBatchCount()) } } @@ -418,33 +421,37 @@ func (indefiniteExporter[T]) ExportItems(ctx context.Context, _ []*T) error { return ctx.Err() } -func TestBatchItemProcessorForceFlushTimeout(t *testing.T) { +func TestBatchItemProcessorTimeout(t *testing.T) { // Add timeout to context to test deadline - ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) defer cancel() <-ctx.Done() - bsp, err := NewBatchItemProcessor[TestItem](indefiniteExporter[TestItem]{}, "processor", nullLogger()) + bsp, err := NewBatchItemProcessor[TestItem](indefiniteExporter[TestItem]{}, "processor", nullLogger(), WithShippingMethod(ShippingMethodSync), WithExportTimeout(time.Millisecond*10)) if err != nil { t.Fatalf("failed to create batch processor: %v", err) } - if got, want := bsp.ForceFlush(ctx), context.DeadlineExceeded; !errors.Is(got, want) { + if got, want := bsp.Write(ctx, []*TestItem{{}}), context.DeadlineExceeded; !errors.Is(got, want) { t.Errorf("expected %q error, got %v", want, got) } } -func TestBatchItemProcessorForceFlushCancellation(t *testing.T) { +func TestBatchItemProcessorCancellation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // Cancel the context - cancel() + go func() { + time.Sleep(time.Millisecond * 100) + + cancel() + }() - bsp, err := NewBatchItemProcessor[TestItem](indefiniteExporter[TestItem]{}, "processor", nullLogger()) + bsp, err := NewBatchItemProcessor[TestItem](indefiniteExporter[TestItem]{}, "processor", nullLogger(), WithShippingMethod(ShippingMethodSync)) if err != nil { t.Fatalf("failed to create batch processor: %v", err) } - if got, want := bsp.ForceFlush(ctx), context.Canceled; !errors.Is(got, want) { + if got, want := bsp.Write(ctx, []*TestItem{{}}), context.Canceled; !errors.Is(got, want) { t.Errorf("expected %q error, got %v", want, got) } } From 93b43a6bb0edb7199a57d3b31ffedf31d7040d36 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 23 Oct 2023 13:27:34 +1000 Subject: [PATCH 2/6] chore: Update alpha-releases.yaml --- .github/workflows/alpha-releases.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/alpha-releases.yaml b/.github/workflows/alpha-releases.yaml index 3214a9b7..29246fdd 100644 --- a/.github/workflows/alpha-releases.yaml +++ b/.github/workflows/alpha-releases.yaml @@ -6,9 +6,9 @@ on: - 'release/*' jobs: - permissions: - contents: write tag-release: + permissions: + contents: write runs-on: ubuntu-20.04 steps: - name: Checkout From 0c5054ac64b61cbdf6eeb4e539edc7bf4d4d2cb2 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 23 Oct 2023 13:55:01 +1000 Subject: [PATCH 3/6] refactor: Simplify batch timeout reset in worker function --- pkg/processor/batch.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 3ec92e9f..a5e6b608 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -355,7 +355,7 @@ func WithWorkers(workers int) BatchItemProcessorOption { func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) { for { select { - case <-bvp.stopCh: + case <-bvp.stopWorkersCh: return case sd := <-bvp.queue: bvp.batchMutex.Lock() @@ -396,6 +396,8 @@ func (bvp *BatchItemProcessor[T]) worker(ctx context.Context) { case <-bvp.stopWorkersCh: return case <-bvp.batchReady: + bvp.timer.Reset(bvp.o.BatchTimeout) + batch := <-bvp.batches if err := bvp.exportWithTimeout(ctx, batch); err != nil { bvp.log.WithError(err).Error("failed to export items") From 6ff8770e8fc05d89cf61b6fd9a2807797d406481 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 23 Oct 2023 13:57:57 +1000 Subject: [PATCH 4/6] refactor: Reset timer in batchBuilder --- pkg/processor/batch.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index a5e6b608..8baed6e3 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -382,6 +382,8 @@ func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) { bvp.batchReady <- true } + bvp.timer.Reset(bvp.o.BatchTimeout) + bvp.batchMutex.Unlock() } } From a4083b1b1b432102b11f233c8de6a7c4dca13861 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 23 Oct 2023 14:07:34 +1000 Subject: [PATCH 5/6] refactor(batch): Reset timer when there are no items in the batch --- pkg/processor/batch.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 8baed6e3..5256fa59 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -380,10 +380,12 @@ func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) { bvp.batches <- batchCopy bvp.batch = bvp.batch[:0] bvp.batchReady <- true + } else { + // Reset the timer if there are no items in the batch. + // If there are items in the batch, one of the workers will reset the timer. + bvp.timer.Reset(bvp.o.BatchTimeout) } - bvp.timer.Reset(bvp.o.BatchTimeout) - bvp.batchMutex.Unlock() } } From df35d80be1fc94b57749f12bd63d6788ac930e68 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 23 Oct 2023 14:19:42 +1000 Subject: [PATCH 6/6] refactor: Remove redundant code --- pkg/processor/batch.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 5256fa59..632539ee 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -228,8 +228,6 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it err := bvp.exportWithTimeout(ctx, itemsBatch) - bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch))) - if err != nil { return err } @@ -248,6 +246,8 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa defer cancel() } + bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch))) + err := bvp.e.ExportItems(ctx, itemsBatch) if err != nil { return err