Skip to content

Commit

Permalink
fix(processor): Memory usage in workers (#335)
Browse files Browse the repository at this point in the history
* feat(processor): Add worker support for sync workloads

* Cancellable

* test: Add unit tests for batch item processor

* feat: Static workers

* more debugging

* Refactor workers

* Add comments

* Close

* test: Add queue size and dropped events tests

* refactor: Improve error handling in enqueueOrDrop function

* refactor: Remove duplicate fields and unused variables in testBatchExporter

* refactor: Modify batch processing preparation for shipping method

* refactor: Update default value of Workers to 5
  • Loading branch information
samcm committed Jun 17, 2024
1 parent 6789a1b commit dd9842c
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions pkg/processor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
DefaultExportTimeout = 30000
DefaultMaxExportBatchSize = 512
DefaultShippingMethod = ShippingMethodAsync
DefaultNumWorkers = 1
DefaultNumWorkers = 5
)

// ShippingMethod is the method of shipping items for export.
Expand Down Expand Up @@ -77,7 +77,7 @@ type BatchItemProcessorOptions struct {
// of ShippingMethod is "async".
ShippingMethod ShippingMethod
// Workers is the number of workers to process batches.
// The default value of Workers is 1.
// The default value of Workers is 5.
Workers int
}

Expand Down Expand Up @@ -223,12 +223,18 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error {
}

prepared := []traceableItem[T]{}

for _, i := range s[start:end] {
prepared = append(prepared, traceableItem[T]{
item: i,
errCh: make(chan error, 1),
completedCh: make(chan struct{}, 1),
})
item := traceableItem[T]{
item: i,
}

if bvp.o.ShippingMethod == ShippingMethodSync {
item.errCh = make(chan error, 1)
item.completedCh = make(chan struct{}, 1)
}

prepared = append(prepared, item)
}

for _, i := range prepared {
Expand Down

0 comments on commit dd9842c

Please sign in to comment.