Skip to content

Commit

Permalink
Cancellable
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Jun 12, 2024
1 parent 50034ea commit 1cb073a
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions pkg/processor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,13 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it
if l := len(items); l > 0 {
countItemsToExport := len(items)

// Split the items in to chunks of our max batch size
// Split the items into chunks of our max batch size
batchCh := make(chan []*T, bvp.o.Workers)
errCh := make(chan error, bvp.o.Workers)

var wg sync.WaitGroup
ctx, cancel := context.WithCancel(ctx)

Check failure on line 221 in pkg/processor/batch.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "ctx" shadows declaration at line 209 (govet)
defer cancel()

Check failure on line 222 in pkg/processor/batch.go

View workflow job for this annotation

GitHub Actions / lint

only one cuddle assignment allowed before defer statement (wsl)

for i := 0; i < countItemsToExport; i += bvp.o.MaxExportBatchSize {
end := i + bvp.o.MaxExportBatchSize
Expand All @@ -233,23 +235,25 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it
for i := 0; i < bvp.o.Workers; i++ {
wg.Add(1)

go func() {
go func(workerID int) {
defer wg.Done()

for itemsBatch := range batchCh {
bvp.log.WithFields(logrus.Fields{
"count": len(itemsBatch),
"worker": i,
"worker": workerID,
}).Debug("Immediately exporting items")

err := bvp.exportWithTimeout(ctx, itemsBatch)
if err != nil {
errCh <- err

cancel() // Cancel the context to stop other workers

return
}
}
}()
}(i)
}

wg.Wait()
Expand Down

0 comments on commit 1cb073a

Please sign in to comment.