Skip to content

Commit

Permalink
feat(processor): Add worker support for sync workloads (#331)
Browse files Browse the repository at this point in the history
* feat(processor): Add worker support for sync workloads

* Cancellable

* test: Add unit tests for batch item processor
  • Loading branch information
samcm committed Jun 13, 2024
1 parent 1f75a9d commit eeee715
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 27 deletions.
85 changes: 67 additions & 18 deletions pkg/processor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,36 +205,85 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error {
}

// ImmediatelyExportItems immediately exports the items to the exporter.
// Useful for propogating errors from the exporter.
// Useful for propagating errors from the exporter.
func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, items []*T) error {
_, span := observability.Tracer().Start(ctx, "BatchItemProcessor.ImmediatelyExportItems")
defer span.End()

if l := len(items); l > 0 {
countItemsToExport := len(items)
if len(items) == 0 {
return nil
}

// Split the items in to chunks of our max batch size
for i := 0; i < countItemsToExport; i += bvp.o.MaxExportBatchSize {
end := i + bvp.o.MaxExportBatchSize
if end > countItemsToExport {
end = countItemsToExport
}
countItemsToExport := len(items)

itemsBatch := items[i:end]
batchSize := bvp.o.MaxExportBatchSize
if batchSize == 0 {
batchSize = 1 // Ensure we can't divide by zero
}

bvp.log.WithFields(logrus.Fields{
"count": len(itemsBatch),
}).Debug("Immediately exporting items")
batches := (countItemsToExport + batchSize - 1) / batchSize

err := bvp.exportWithTimeout(ctx, itemsBatch)
batchCh := make(chan []*T, batches)
errCh := make(chan error, 1)

if err != nil {
return err
}
defer close(errCh)

var wg sync.WaitGroup

cctx, cancel := context.WithCancel(ctx)
defer cancel()

for i := 0; i < countItemsToExport; i += batchSize {
end := i + batchSize
if end > countItemsToExport {
end = countItemsToExport
}

itemsBatch := items[i:end]
batchCh <- itemsBatch
}
close(batchCh) // Close the channel after all batches are sent

return nil
bvp.log.
WithField("workers", bvp.o.Workers).
WithField("batches", batches).
Debug("Split items into batches for immediate export")

for i := 0; i < bvp.o.Workers && i < batches; i++ {
wg.Add(1)

go func(workerID int) {
defer wg.Done()

for itemsBatch := range batchCh {
bvp.log.WithFields(logrus.Fields{
"count": len(itemsBatch),
"worker": workerID,
}).Debug("Immediately exporting items")

err := bvp.exportWithTimeout(cctx, itemsBatch)
if err != nil {
select {
case errCh <- err:
default:
}

cancel() // Cancel the context to stop other workers

return
}
}
}(i)
}

wg.Wait()

select {
case err := <-errCh:
return err
default:
return nil
}
}

// exportWithTimeout exports the items with a timeout.
Expand Down
98 changes: 89 additions & 9 deletions pkg/processor/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package processor
import (
"context"
"errors"
"fmt"
"io"
"strconv"
"sync"
Expand All @@ -25,18 +26,28 @@ type TestItem struct {
}

// testBatchExporter is a configurable in-memory exporter used by the batch
// processor tests. It records exported items and batch sizes, and can be
// told to delay or fail an export.
type testBatchExporter[T TestItem] struct {
	mu             sync.Mutex // guards the recorded state below
	items          []*T       // every item exported so far
	sizes          []int      // size of each exported batch
	batchCount     int        // number of batches exported
	shutdownCount  int
	errors         []error
	droppedCount   int
	idx            int
	err            error
	failNextExport bool          // when set, the next export fails once with an error
	delay          time.Duration // artificial latency applied to each export
}

// ExportItems simulates an export: it sleeps for t.delay, fails exactly once
// when failNextExport is set (clearing the flag), and otherwise records the
// items under the lock.
//
// NOTE(review): failNextExport is read and cleared here without holding t.mu,
// while the processor may call ExportItems from multiple workers concurrently —
// confirm this is acceptable for the tests (run with -race).
func (t *testBatchExporter[T]) ExportItems(ctx context.Context, items []*T) error {
	time.Sleep(t.delay)

	// Fail a single export, then clear the flag so later exports succeed.
	if t.failNextExport {
		t.failNextExport = false

		return errors.New("export failure")
	}

	t.mu.Lock()
	defer t.mu.Unlock()
Expand Down Expand Up @@ -487,3 +498,72 @@ func TestBatchItemProcessorWithAsyncErrorExporter(t *testing.T) {
t.Errorf("Expected write to fail")
}
}

// TestBatchItemProcessorImmediatelyExportItems verifies that every item is
// exported exactly once across a matrix of worker counts, batch sizes, and
// item counts, and that the exporter sees the expected number of batches.
func TestBatchItemProcessorImmediatelyExportItems(t *testing.T) {
	// Define a range of values for workers, maxExportBatchSize, and itemsToExport
	workerCounts := []int{1, 5, 10}
	maxBatchSizes := []int{1, 5, 10, 20}
	itemCounts := []int{0, 1, 25, 50}

	for _, workers := range workerCounts {
		for _, maxBatchSize := range maxBatchSizes {
			for _, itemsToExport := range itemCounts {
				t.Run(fmt.Sprintf("%d workers, batch size %d, %d items", workers, maxBatchSize, itemsToExport), func(t *testing.T) {
					te := testBatchExporter[TestItem]{}
					bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers))
					require.NoError(t, err)

					items := make([]*TestItem, itemsToExport)
					for i := 0; i < itemsToExport; i++ {
						items[i] = &TestItem{name: strconv.Itoa(i)}
					}

					err = bsp.ImmediatelyExportItems(context.Background(), items)
					require.NoError(t, err)

					// Ceiling division; this already yields 0 batches for
					// 0 items, so no zero special case is needed.
					expectedBatches := (itemsToExport + maxBatchSize - 1) / maxBatchSize

					require.Equal(t, itemsToExport, te.len(), "expected all items to be exported")
					require.Equal(t, expectedBatches, te.getBatchCount(), "expected batch count to match")
				})
			}
		}
	}
}

// TestBatchItemProcessorExportCancellationOnFailure verifies that a failing
// export cancels the remaining in-flight work: once one batch fails, the
// processor should stop well before exporting every batch.
func TestBatchItemProcessorExportCancellationOnFailure(t *testing.T) {
	workers := 10
	maxBatchSize := 10
	itemsToExport := 5000

	t.Run(fmt.Sprintf("%d workers, batch size %d, %d items with cancellation on failure", workers, maxBatchSize, itemsToExport), func(t *testing.T) {
		te := testBatchExporter[TestItem]{
			delay: 100 * time.Millisecond, // Introduce a delay to simulate processing time
		}
		bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers))
		require.NoError(t, err)

		items := make([]*TestItem, itemsToExport)
		for i := 0; i < itemsToExport; i++ {
			items[i] = &TestItem{name: strconv.Itoa(i)}
		}

		// Simulate export failure and expect cancellation
		te.failNextExport = true

		err = bsp.ImmediatelyExportItems(context.Background(), items)
		require.Error(t, err, "Expected an error due to simulated export failure")

		// Ensure we exported less than the number of batches since the export
		// should've stopped due to the failure. Use the mutex-guarded accessor
		// rather than reading the field directly, matching the sibling test.
		require.Less(t, te.getBatchCount(), itemsToExport/maxBatchSize)
	})
}

0 comments on commit eeee715

Please sign in to comment.