From dd9842c499e0fa45438303736ab4172ae6fa10c4 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 17 Jun 2024 16:15:41 +1000 Subject: [PATCH] fix(processor): Memory usage in workers (#335) * feat(processor): Add worker support for sync workloads * Cancellable * test: Add unit tests for batch item processor * feat: Static workers * more debugging * Refactor workers * Add comments * Close * test: Add queue size and dropped events tests * refactor: Improve error handling in enqueueOrDrop function * refactor: Remove duplicate fields and unused variables in testBatchExporter * refactor: Modify batch processing preparation for shipping method * refactor: Update default value of Workers to 5 --- pkg/processor/batch.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index b9de255b..1e5ad380 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -41,7 +41,7 @@ const ( DefaultExportTimeout = 30000 DefaultMaxExportBatchSize = 512 DefaultShippingMethod = ShippingMethodAsync - DefaultNumWorkers = 1 + DefaultNumWorkers = 5 ) // ShippingMethod is the method of shipping items for export. @@ -77,7 +77,7 @@ type BatchItemProcessorOptions struct { // of ShippingMethod is "async". ShippingMethod ShippingMethod // Workers is the number of workers to process batches. - // The default value of Workers is 1. + // The default value of Workers is 5. Workers int } @@ -223,12 +223,18 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { } prepared := []traceableItem[T]{} + for _, i := range s[start:end] { - prepared = append(prepared, traceableItem[T]{ - item: i, - errCh: make(chan error, 1), - completedCh: make(chan struct{}, 1), - }) + item := traceableItem[T]{ + item: i, + } + + if bvp.o.ShippingMethod == ShippingMethodSync { + item.errCh = make(chan error, 1) + item.completedCh = make(chan struct{}, 1) + } + + prepared = append(prepared, item) } for _, i := range prepared {