feat(processor): Add worker support for sync workloads #331

Merged · 3 commits · Jun 13, 2024
85 changes: 67 additions & 18 deletions pkg/processor/batch.go
@@ -205,36 +205,85 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error {
 }

 // ImmediatelyExportItems immediately exports the items to the exporter.
-// Useful for propogating errors from the exporter.
+// Useful for propagating errors from the exporter.
 func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, items []*T) error {
     _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.ImmediatelyExportItems")
     defer span.End()

-    if l := len(items); l > 0 {
-        countItemsToExport := len(items)
+    if len(items) == 0 {
+        return nil
+    }

-        // Split the items in to chunks of our max batch size
-        for i := 0; i < countItemsToExport; i += bvp.o.MaxExportBatchSize {
-            end := i + bvp.o.MaxExportBatchSize
-            if end > countItemsToExport {
-                end = countItemsToExport
-            }
+    countItemsToExport := len(items)

-            itemsBatch := items[i:end]
+    batchSize := bvp.o.MaxExportBatchSize
+    if batchSize == 0 {
+        batchSize = 1 // Ensure we can't divide by zero
+    }

-            bvp.log.WithFields(logrus.Fields{
-                "count": len(itemsBatch),
-            }).Debug("Immediately exporting items")
+    batches := (countItemsToExport + batchSize - 1) / batchSize

-            err := bvp.exportWithTimeout(ctx, itemsBatch)
+    batchCh := make(chan []*T, batches)
+    errCh := make(chan error, 1)

-            if err != nil {
-                return err
-            }
+    defer close(errCh)

+    var wg sync.WaitGroup
+
+    cctx, cancel := context.WithCancel(ctx)
+    defer cancel()
+
+    for i := 0; i < countItemsToExport; i += batchSize {
+        end := i + batchSize
+        if end > countItemsToExport {
+            end = countItemsToExport
+        }
+
+        itemsBatch := items[i:end]
+        batchCh <- itemsBatch
+    }
+    close(batchCh) // Close the channel after all batches are sent

-    return nil
+    bvp.log.
+        WithField("workers", bvp.o.Workers).
+        WithField("batches", batches).
+        Debug("Split items into batches for immediate export")
+
+    for i := 0; i < bvp.o.Workers && i < batches; i++ {
+        wg.Add(1)
+
+        go func(workerID int) {
+            defer wg.Done()
+
+            for itemsBatch := range batchCh {
+                bvp.log.WithFields(logrus.Fields{
+                    "count":  len(itemsBatch),
+                    "worker": workerID,
+                }).Debug("Immediately exporting items")
+
+                err := bvp.exportWithTimeout(cctx, itemsBatch)
+                if err != nil {
+                    select {
+                    case errCh <- err:
+                    default:
+                    }
+
+                    cancel() // Cancel the context to stop other workers
+
+                    return
+                }
+            }
+        }(i)
+    }
+
+    wg.Wait()
+
+    select {
+    case err := <-errCh:
+        return err
+    default:
+        return nil
+    }
 }

 // exportWithTimeout exports the items with a timeout.
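In short, the new ImmediatelyExportItems fans work out to a bounded pool: every batch is pushed onto a buffered channel, up to Workers goroutines drain it, the first export error is captured in a one-slot error channel, and a shared cancellable context stops the remaining workers. Below is a minimal standalone sketch of that pattern, not the package's actual code; the exportBatch helper and the use of plain int items are hypothetical stand-ins for the generic exporter call.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// exportBatch stands in for the real exporter call (hypothetical helper).
func exportBatch(ctx context.Context, batch []int) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // give up quickly once another worker has failed
	default:
		fmt.Printf("exported %d items\n", len(batch))
		return nil
	}
}

// exportWithWorkers chunks items into batches and exports them from a
// bounded pool of workers, returning the first error encountered.
func exportWithWorkers(ctx context.Context, items []int, batchSize, workers int) error {
	if len(items) == 0 {
		return nil
	}

	if batchSize <= 0 {
		batchSize = 1 // guard against division by zero
	}

	batches := (len(items) + batchSize - 1) / batchSize
	batchCh := make(chan []int, batches)
	errCh := make(chan error, 1) // one slot: keep only the first error

	// Pre-fill the channel with every batch, then close it so workers
	// exit their range loop once the work runs out.
	for i := 0; i < len(items); i += batchSize {
		end := i + batchSize
		if end > len(items) {
			end = len(items)
		}
		batchCh <- items[i:end]
	}
	close(batchCh)

	cctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var wg sync.WaitGroup
	for w := 0; w < workers && w < batches; w++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			for batch := range batchCh {
				if err := exportBatch(cctx, batch); err != nil {
					select {
					case errCh <- err:
					default: // an error is already recorded
					}
					cancel() // tell the other workers to stop

					return
				}
			}
		}()
	}
	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

func main() {
	items := make([]int, 25)
	if err := exportWithWorkers(context.Background(), items, 10, 4); err != nil {
		fmt.Println("export failed:", err)
	}
}
```

The actual method in the diff above additionally bounds each export with exportWithTimeout and emits per-worker debug logs.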
98 changes: 89 additions & 9 deletions pkg/processor/batch_test.go
@@ -9,6 +9,7 @@ package processor
 import (
     "context"
     "errors"
+    "fmt"
     "io"
     "strconv"
     "sync"
@@ -25,18 +25,28 @@ type TestItem struct {
 }

 type testBatchExporter[T TestItem] struct {
-    mu            sync.Mutex
-    items         []*T
-    sizes         []int
-    batchCount    int
-    shutdownCount int
-    errors        []error
-    droppedCount  int
-    idx           int
-    err           error
+    mu             sync.Mutex
+    items          []*T
+    sizes          []int
+    batchCount     int
+    shutdownCount  int
+    errors         []error
+    droppedCount   int
+    idx            int
+    err            error
+    failNextExport bool
+    delay          time.Duration
 }

 func (t *testBatchExporter[T]) ExportItems(ctx context.Context, items []*T) error {
+    time.Sleep(t.delay)
+
+    if t.failNextExport {
+        t.failNextExport = false
+
+        return errors.New("export failure")
+    }
+
     t.mu.Lock()
     defer t.mu.Unlock()

@@ -487,3 +498,72 @@ func TestBatchItemProcessorWithAsyncErrorExporter(t *testing.T) {
         t.Errorf("Expected write to fail")
     }
 }
+
+func TestBatchItemProcessorImmediatelyExportItems(t *testing.T) {
+    // Define a range of values for workers, maxExportBatchSize, and itemsToExport
+    workerCounts := []int{1, 5, 10}
+    maxBatchSizes := []int{1, 5, 10, 20}
+    itemCounts := []int{0, 1, 25, 50}
+
+    for _, workers := range workerCounts {
+        for _, maxBatchSize := range maxBatchSizes {
+            for _, itemsToExport := range itemCounts {
+                t.Run(fmt.Sprintf("%d workers, batch size %d, %d items", workers, maxBatchSize, itemsToExport), func(t *testing.T) {
+                    te := testBatchExporter[TestItem]{}
+                    bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers))
+                    require.NoError(t, err)
+
+                    items := make([]*TestItem, itemsToExport)
+                    for i := 0; i < itemsToExport; i++ {
+                        items[i] = &TestItem{name: strconv.Itoa(i)}
+                    }
+
+                    err = bsp.ImmediatelyExportItems(context.Background(), items)
+                    require.NoError(t, err)
+
+                    expectedBatches := (itemsToExport + maxBatchSize - 1) / maxBatchSize
+                    if itemsToExport == 0 {
+                        expectedBatches = 0
+                    }
+
+                    if te.len() != itemsToExport {
+                        t.Errorf("Expected all items to be exported, got: %v", te.len())
+                    }
+
+                    if te.getBatchCount() != expectedBatches {
+                        t.Errorf("Expected %v batches to be exported, got: %v", expectedBatches, te.getBatchCount())
+                    }
+                })
+            }
+        }
+    }
+}
+
+func TestBatchItemProcessorExportCancellationOnFailure(t *testing.T) {
+    workers := 10
+    maxBatchSize := 10
+    itemsToExport := 5000
+
+    t.Run(fmt.Sprintf("%d workers, batch size %d, %d items with cancellation on failure", workers, maxBatchSize, itemsToExport), func(t *testing.T) {
+        te := testBatchExporter[TestItem]{
+            delay: 100 * time.Millisecond, // Introduce a delay to simulate processing time
+        }
+        bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers))
+        require.NoError(t, err)
+
+        items := make([]*TestItem, itemsToExport)
+        for i := 0; i < itemsToExport; i++ {
+            items[i] = &TestItem{name: strconv.Itoa(i)}
+        }
+
+        // Simulate export failure and expect cancellation
+        te.failNextExport = true
+
+        err = bsp.ImmediatelyExportItems(context.Background(), items)
+        require.Error(t, err, "Expected an error due to simulated export failure")
+
+        // Ensure we exported less than the number of batches since the export should've
+        // stopped due to the failure.
+        require.Less(t, te.batchCount, itemsToExport/maxBatchSize)
+    })
+}
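As a quick sanity check on the expectedBatches arithmetic in TestBatchItemProcessorImmediatelyExportItems (ceiling division, with zero items mapping to zero batches), here is a tiny standalone sketch; it is illustrative only and not part of the PR.

```go
package main

import "fmt"

// expectedBatches mirrors the test's arithmetic: zero items mean zero
// batches, otherwise the item count is ceiling-divided by the batch size.
func expectedBatches(items, batchSize int) int {
	if items == 0 {
		return 0
	}

	return (items + batchSize - 1) / batchSize
}

func main() {
	fmt.Println(expectedBatches(0, 10))  // 0
	fmt.Println(expectedBatches(1, 5))   // 1
	fmt.Println(expectedBatches(25, 10)) // 3
	fmt.Println(expectedBatches(50, 20)) // 3
}
```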