Skip to content

Commit

Permalink
feat(processor): Add multiple worker support
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Oct 23, 2023
1 parent 1878369 commit 2f91c49
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 234 deletions.
1 change: 1 addition & 0 deletions pkg/output/http/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Config struct {
MaxExportBatchSize int `yaml:"maxExportBatchSize" default:"512"`
Compression CompressionStrategy `yaml:"compression" default:"none"`
KeepAlive *bool `yaml:"keepAlive" default:"true"`
Workers int `yaml:"workers" default:"1"`
}

func (c *Config) Validate() error {
Expand Down
1 change: 1 addition & 0 deletions pkg/output/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func New(name string, config *Config, log logrus.FieldLogger, filterConfig *xatu
processor.WithExportTimeout(config.ExportTimeout),
processor.WithMaxExportBatchSize(config.MaxExportBatchSize),
processor.WithShippingMethod(shippingMethod),
processor.WithWorkers(config.Workers),
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/output/xatu/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Config struct {
BatchTimeout time.Duration `yaml:"batchTimeout" default:"5s"`
ExportTimeout time.Duration `yaml:"exportTimeout" default:"30s"`
MaxExportBatchSize int `yaml:"maxExportBatchSize" default:"512"`
Workers int `yaml:"workers" default:"1"`
}

func (c *Config) Validate() error {
Expand Down
1 change: 1 addition & 0 deletions pkg/output/xatu/xatu.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func New(name string, config *Config, log logrus.FieldLogger, filterConfig *xatu
processor.WithExportTimeout(config.ExportTimeout),
processor.WithMaxExportBatchSize(config.MaxExportBatchSize),
processor.WithShippingMethod(shippingMethod),
processor.WithWorkers(config.Workers),
)
if err != nil {
return nil, err
Expand Down
228 changes: 104 additions & 124 deletions pkg/processor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
DefaultExportTimeout = 30000
DefaultMaxExportBatchSize = 512
DefaultShippingMethod = ShippingMethodAsync
DefaultNumWorkers = 1
)

type ShippingMethod string
Expand Down Expand Up @@ -85,6 +86,9 @@ type BatchItemProcessorOptions struct {

// ShippingMethod is the method used to ship items to the exporter.
ShippingMethod ShippingMethod

// Number of workers to process items.
Workers int
}

// BatchItemProcessor is a buffer that batches asynchronously-received
Expand All @@ -101,12 +105,17 @@ type BatchItemProcessor[T any] struct {

metrics *Metrics

batches chan []*T
batchReady chan bool

batch []*T
batchMutex sync.Mutex
timer *time.Timer
stopWait sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}

timer *time.Timer
stopWait sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}
stopWorkersCh chan struct{}
}

// NewBatchItemProcessor creates a new ItemProcessor that will send completed
Expand All @@ -131,6 +140,7 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log
MaxQueueSize: maxQueueSize,
MaxExportBatchSize: maxExportBatchSize,
ShippingMethod: DefaultShippingMethod,
Workers: DefaultNumWorkers,
}
for _, opt := range options {
opt(&o)
Expand All @@ -139,24 +149,32 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log
metrics := DefaultMetrics

bvp := BatchItemProcessor[T]{
e: exporter,
o: o,
log: log,
name: name,
metrics: metrics,
batch: make([]*T, 0, o.MaxExportBatchSize),
timer: time.NewTimer(o.BatchTimeout),
queue: make(chan *T, o.MaxQueueSize),
stopCh: make(chan struct{}),
e: exporter,
o: o,
log: log,
name: name,
metrics: metrics,
batch: make([]*T, 0, o.MaxExportBatchSize),
timer: time.NewTimer(o.BatchTimeout),
queue: make(chan *T, o.MaxQueueSize),
stopCh: make(chan struct{}),
stopWorkersCh: make(chan struct{}),
}

bvp.stopWait.Add(1)
bvp.batches = make(chan []*T, o.Workers) // Buffer the channel to hold batches for each worker
bvp.batchReady = make(chan bool, 1)

go func() {
defer bvp.stopWait.Done()
bvp.processQueue()
bvp.drainQueue()
}()
bvp.stopWait.Add(o.Workers)

for i := 0; i < o.Workers; i++ {
go func() {
defer bvp.stopWait.Done()

bvp.worker(context.Background())
}()
}

go bvp.batchBuilder(context.Background()) // Start building batches

return &bvp, nil
}
Expand Down Expand Up @@ -243,16 +261,33 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa
func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error {
var err error

bvp.log.Debug("Shutting down processor")

bvp.stopOnce.Do(func() {
wait := make(chan struct{})
go func() {
// Stop accepting new items
close(bvp.stopCh)

// Drain the queue
bvp.drainQueue()

// Stop the timer
bvp.timer.Stop()

// Stop the workers
close(bvp.stopWorkersCh)

// Wait for the workers to finish
bvp.stopWait.Wait()

// Shutdown the exporter
if bvp.e != nil {
if err = bvp.e.Shutdown(ctx); err != nil {
bvp.log.WithError(err).Error("failed to shutdown processor")
}
}

close(wait)
}()
// Wait until the wait group is done or the context is cancelled
Expand All @@ -266,29 +301,6 @@ func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error {
return err
}

// ForceFlush exports all ended items that have not yet been exported.
func (bvp *BatchItemProcessor[T]) ForceFlush(ctx context.Context) error {
var err error

if bvp.e != nil {
wait := make(chan error)

go func() {
wait <- bvp.exportItems(ctx)
close(wait)
}()

// Wait until the export is finished or the context is cancelled/timed out
select {
case err = <-wait:
case <-ctx.Done():
err = ctx.Err()
}
}

return err
}

// WithMaxQueueSize returns a BatchItemProcessorOption that configures the
// maximum queue size allowed for a BatchItemProcessor.
func WithMaxQueueSize(size int) BatchItemProcessorOption {
Expand Down Expand Up @@ -332,118 +344,86 @@ func WithShippingMethod(method ShippingMethod) BatchItemProcessorOption {
}
}

// exportItems is a subroutine of processing and draining the queue.
func (bvp *BatchItemProcessor[T]) exportItems(ctx context.Context) error {
bvp.timer.Reset(bvp.o.BatchTimeout)

bvp.batchMutex.Lock()
defer bvp.batchMutex.Unlock()

if bvp.o.ExportTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, bvp.o.ExportTimeout)

defer cancel()
// WithWorkers returns a BatchItemProcessorOption that configures the
// number of workers used to process items.
func WithWorkers(count int) BatchItemProcessorOption {
	return func(opts *BatchItemProcessorOptions) {
		opts.Workers = count
	}
}

if l := len(bvp.batch); l > 0 {
countItemsToExport := len(bvp.batch)
func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) {
for {
select {
case <-bvp.stopCh:
return
case sd := <-bvp.queue:
bvp.batchMutex.Lock()

bvp.log.WithFields(logrus.Fields{
"count": countItemsToExport,
"total_dropped": atomic.LoadUint32(&bvp.dropped),
}).Debug("exporting items")
bvp.batch = append(bvp.batch, sd)

err := bvp.e.ExportItems(ctx, bvp.batch)
if len(bvp.batch) >= bvp.o.MaxExportBatchSize {
batchCopy := make([]*T, len(bvp.batch))
copy(batchCopy, bvp.batch)
bvp.batches <- batchCopy
bvp.batch = bvp.batch[:0]
bvp.batchReady <- true
}

bvp.metrics.IncItemsExportedBy(bvp.name, float64(countItemsToExport))
bvp.batchMutex.Unlock()
case <-bvp.timer.C:
bvp.batchMutex.Lock()

// A new batch is always created after exporting, even if the batch failed to be exported.
//
// It is up to the exporter to implement any type of retry logic if a batch is failing
// to be exported, since it is specific to the protocol and backend being sent to.
bvp.batch = bvp.batch[:0]
if len(bvp.batch) > 0 {
batchCopy := make([]*T, len(bvp.batch))
copy(batchCopy, bvp.batch)
bvp.batches <- batchCopy
bvp.batch = bvp.batch[:0]
bvp.batchReady <- true
}

if err != nil {
return err
bvp.batchMutex.Unlock()
}
}

return nil
}

// processQueue removes items from the `queue` channel until processor
// worker removes items from the `queue` channel until processor
// is shut down. It calls the exporter in batches of up to MaxExportBatchSize
// waiting up to BatchTimeout to form a batch.
func (bvp *BatchItemProcessor[T]) processQueue() {
defer bvp.timer.Stop()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

func (bvp *BatchItemProcessor[T]) worker(ctx context.Context) {
for {
select {
case <-bvp.stopCh:
case <-bvp.stopWorkersCh:
return
case <-bvp.timer.C:
if err := bvp.exportItems(ctx); err != nil {
case <-bvp.batchReady:
batch := <-bvp.batches
if err := bvp.exportWithTimeout(ctx, batch); err != nil {
bvp.log.WithError(err).Error("failed to export items")
}
case sd := <-bvp.queue:
bvp.batchMutex.Lock()
bvp.batch = append(bvp.batch, sd)
shouldExport := len(bvp.batch) >= bvp.o.MaxExportBatchSize
bvp.batchMutex.Unlock()

if shouldExport {
if !bvp.timer.Stop() {
<-bvp.timer.C
}

if err := bvp.exportItems(ctx); err != nil {
bvp.log.WithError(err).Error("failed to export items")
}
}
}
}
}

// drainQueue awaits the any caller that had added to bvp.stopWait
// to finish the enqueue, then exports the final batch.
func (bvp *BatchItemProcessor[T]) drainQueue() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for {
select {
case sd := <-bvp.queue:
if sd == nil {
if err := bvp.exportItems(ctx); err != nil {
bvp.log.WithError(err).Error("failed to export items")
}

return
}

bvp.batchMutex.Lock()
bvp.batch = append(bvp.batch, sd)
shouldExport := len(bvp.batch) == bvp.o.MaxExportBatchSize
bvp.batchMutex.Unlock()
// Wait for the batch builder to send all remaining items to the workers.
for len(bvp.queue) > 0 {
time.Sleep(10 * time.Millisecond)
}

if shouldExport {
if err := bvp.exportItems(ctx); err != nil {
bvp.log.WithError(err).Error("failed to export items")
}
}
default:
close(bvp.queue)
}
// Wait for the workers to finish processing all batches.
for len(bvp.batches) > 0 {
time.Sleep(10 * time.Millisecond)
}

// Close the batches channel since no more batches will be sent.
close(bvp.batches)
}

// enqueue places an item on the processing queue by delegating to
// enqueueOrDrop (which, per its name, presumably drops the item when the
// queue is full or the processor is stopping — confirm against its body).
func (bvp *BatchItemProcessor[T]) enqueue(sd *T) {
	ctx := context.TODO()

	// The helper was renamed enqueueDrop -> enqueueOrDrop; only the renamed
	// call is kept here.
	bvp.enqueueOrDrop(ctx, sd)
}

func recoverSendOnClosedChan() {
Expand All @@ -459,7 +439,7 @@ func recoverSendOnClosedChan() {
panic(x)
}

func (bvp *BatchItemProcessor[T]) enqueueDrop(ctx context.Context, sd *T) bool {
func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, sd *T) bool {
// This ensures the bvp.queue<- below does not panic as the
// processor shuts down.
defer recoverSendOnClosedChan()
Expand Down
Loading

0 comments on commit 2f91c49

Please sign in to comment.