[exporterqueue] Implement blocking queue #11951

Open · wants to merge 5 commits into main
18 changes: 18 additions & 0 deletions .chloggen/blocking_queue.yaml
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds a blocking queue, which is used when the user sets up batching but not queueing.

# One or more tracking issues or pull requests related to the change
issues: [8122, 10368]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
19 changes: 17 additions & 2 deletions exporter/exporterhelper/internal/base_exporter.go
@@ -108,8 +108,23 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
}
}

if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled ||
usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
if usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
be.queueFactory = exporterqueue.NewBlockingMemoryQueue[internal.Request]()
be.queueCfg.QueueSize = 20
q := be.queueFactory(
context.Background(),
exporterqueue.Settings{
Signal: be.Signal,
ExporterSettings: be.Set,
},
be.queueCfg)
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg)
for _, op := range options {
err = multierr.Append(err, op(be))
}
}

if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled {
bs := NewBatchSender(be.BatcherCfg, be.Set)
be.BatchSender = bs
}
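With the pulling-based queue batcher feature gate enabled, enabling the batcher without enabling the queue now wires in the blocking memory queue. The sketch below shows the exporter-side configuration that reaches this branch; it is a minimal, hedged example: `newBatchingOnlyExporter` and `pushLogs` are illustrative names, and the `exporterhelper.NewLogs`/`WithBatcher` signatures are assumed to match the API at the time of this PR.

```go
package sketch

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/exporter/exporterbatcher"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
	"go.opentelemetry.io/collector/pdata/plog"
)

// newBatchingOnlyExporter is a hypothetical constructor: the batcher is enabled
// and WithQueue is deliberately omitted, so queueCfg.Enabled stays false and
// NewBaseExporter falls into the blocking-queue branch added above.
func newBatchingOnlyExporter(set exporter.Settings, cfg component.Config) (exporter.Logs, error) {
	batcherCfg := exporterbatcher.NewDefaultConfig()
	batcherCfg.Enabled = true

	pushLogs := func(_ context.Context, _ plog.Logs) error {
		// The actual export call would go here.
		return nil
	}

	return exporterhelper.NewLogs(
		context.Background(),
		set,
		cfg,
		pushLogs,
		exporterhelper.WithBatcher(batcherCfg),
		// No exporterhelper.WithQueue(...): batching without queueing.
	)
}
```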
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/batch_sender_test.go
@@ -326,7 +326,7 @@ func TestBatchSender_PostShutdown(t *testing.T) {
assert.Equal(t, int64(8), sink.itemsCount.Load())
})
}
runTest("enable_queue_batcher", true)
// We don't expect the same behavior when disable_queue_batcher is true
runTest("disable_queue_batcher", false)
}

8 changes: 5 additions & 3 deletions exporter/exporterhelper/internal/queue_sender.go
@@ -178,10 +178,12 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error {
func (qs *QueueSender) Send(ctx context.Context, req internal.Request) error {
// Prevent cancellation and deadline from propagating to the context stored in the queue.
// The grpc/http based receivers will cancel the request context after this function returns.
c := context.WithoutCancel(ctx)
if !usePullingBasedExporterQueueBatcher.IsEnabled() && !qs.queue.IsBlocking() {
ctx = context.WithoutCancel(ctx)
}

span := trace.SpanFromContext(c)
if err := qs.queue.Offer(c, req); err != nil {
span := trace.SpanFromContext(ctx)
if err := qs.queue.Offer(ctx, req); err != nil {
span.AddEvent("Failed to enqueue item.", trace.WithAttributes(qs.traceAttribute))
return err
}
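For a blocking queue the producer stays inside `Offer` until its request has been processed, so `Send` now passes the caller's context through unchanged and only the non-blocking path still detaches it. A stdlib-only sketch of what that detachment does, and therefore what the blocking path deliberately skips:

```go
package main

import (
	"context"
	"fmt"
)

// context.WithoutCancel keeps the parent's values but ignores its cancellation,
// so a request already sitting in a non-blocking queue is not cancelled when the
// receiver cancels the inbound request context after Send returns.
func main() {
	parent, cancel := context.WithCancel(context.Background())
	detached := context.WithoutCancel(parent)

	cancel() // the receiver cancels the request context

	fmt.Println(parent.Err())   // context.Canceled
	fmt.Println(detached.Err()) // <nil>: the queued item keeps a usable context
}
```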
12 changes: 12 additions & 0 deletions exporter/exporterqueue/queue.go
@@ -56,6 +56,18 @@ func NewMemoryQueueFactory[T any]() Factory[T] {
}
}

// NewBlockingMemoryQueue returns a factory to create a new blocking memory queue.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func NewBlockingMemoryQueue[T any]() Factory[T] {
return func(_ context.Context, _ Settings, cfg Config) Queue[T] {
return queue.NewBlockingMemoryQueue[T](queue.BlockingMemoryQueueSettings[T]{
Sizer: &queue.RequestSizer[T]{},
Capacity: int64(cfg.QueueSize),
})
}
}

// PersistentQueueSettings defines developer settings for the persistent queue factory.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
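A hedged usage sketch of the new factory, mirroring how `base_exporter.go` constructs the queue above; `myRequest` and `buildBlockingQueue` are illustrative names, and `exporterqueue.NewDefaultConfig` is assumed to be the package's default-config helper:

```go
package sketch

import (
	"context"

	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/exporter/exporterqueue"
	"go.opentelemetry.io/collector/pipeline"
)

// myRequest stands in for the exporter's internal request type.
type myRequest struct{ items int }

// buildBlockingQueue mirrors the wiring done in base_exporter.go.
func buildBlockingQueue(set exporter.Settings) exporterqueue.Queue[myRequest] {
	cfg := exporterqueue.NewDefaultConfig()
	cfg.QueueSize = 20 // a small capacity is fine: producers block until processed

	factory := exporterqueue.NewBlockingMemoryQueue[myRequest]()
	return factory(
		context.Background(),
		exporterqueue.Settings{
			Signal:           pipeline.SignalLogs,
			ExporterSettings: set,
		},
		cfg,
	)
}
```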
93 changes: 93 additions & 0 deletions exporter/internal/queue/blocking_queue.go
@@ -0,0 +1,93 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"

import (
"context"
"sync"

"go.opentelemetry.io/collector/component"
)

// blockingMemoryQueue blocks Offer until the batch containing the request is sent out.
type blockingMemoryQueue[T any] struct {
component.StartFunc
*sizedChannel[blockingMemQueueEl[T]]
sizer Sizer[T]

mu sync.Mutex
nextIndex uint64
doneCh map[uint64](chan error)
}

// BlockingMemoryQueueSettings defines internal parameters for blockingMemoryQueue creation.
type BlockingMemoryQueueSettings[T any] struct {
Sizer Sizer[T]
Capacity int64
}

// NewBlockingMemoryQueue constructs a new blocking memory queue of the specified capacity.
func NewBlockingMemoryQueue[T any](set BlockingMemoryQueueSettings[T]) Queue[T] {
return &blockingMemoryQueue[T]{
sizedChannel: newSizedChannel[blockingMemQueueEl[T]](set.Capacity, nil, 0),
sizer: set.Sizer,
nextIndex: 0,
doneCh: make(map[uint64](chan error)),
}
}

// Offer is used by the producer to submit a new item to the queue. It blocks until
// OnProcessingFinished is called for the item. Calling this method on a stopped queue will panic.
func (q *blockingMemoryQueue[T]) Offer(ctx context.Context, req T) error {
q.mu.Lock()
index := q.nextIndex
q.nextIndex++
done := make(chan error)
q.doneCh[index] = done

if err := q.sizedChannel.push(
blockingMemQueueEl[T]{ctx: ctx, req: req, index: index},
q.sizer.Sizeof(req),
nil); err != nil {
delete(q.doneCh, index)
q.mu.Unlock()
return err
}

q.mu.Unlock()
err := <-done
return err
}

func (q *blockingMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) {
item, ok := q.sizedChannel.pop(func(el blockingMemQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) })
return item.index, item.ctx, item.req, ok
}

// OnProcessingFinished signals the producer blocked in Offer for the given index,
// delivering the processing result (nil on success).
func (q *blockingMemoryQueue[T]) OnProcessingFinished(index uint64, err error) {
q.mu.Lock()
q.doneCh[index] <- err
delete(q.doneCh, index)
q.mu.Unlock()
}

// Shutdown closes the queue channel to initiate draining of the queue.
func (q *blockingMemoryQueue[T]) Shutdown(context.Context) error {
q.mu.Lock()
defer q.mu.Unlock()
q.sizedChannel.shutdown()
return nil
}

func (q *blockingMemoryQueue[T]) IsBlocking() bool {
return true
}

type blockingMemQueueEl[T any] struct {
index uint64
req T
ctx context.Context
}
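The queue is built around a per-request handshake: Offer registers a done channel under a monotonically increasing index and blocks receiving from it, while OnProcessingFinished looks the channel up by index and delivers the export result. A self-contained sketch of just that mechanism, with illustrative names rather than the internal package's API:

```go
package main

import (
	"fmt"
	"sync"
)

// handshake is a stripped-down illustration of the pattern blockingMemoryQueue uses.
type handshake struct {
	mu     sync.Mutex
	next   uint64
	doneCh map[uint64]chan error
}

func (h *handshake) offer() (index uint64, wait func() error) {
	h.mu.Lock()
	defer h.mu.Unlock()
	index = h.next
	h.next++
	done := make(chan error)
	h.doneCh[index] = done
	return index, func() error { return <-done } // the producer blocks here
}

func (h *handshake) finish(index uint64, err error) {
	h.mu.Lock()
	done := h.doneCh[index]
	delete(h.doneCh, index)
	h.mu.Unlock()
	done <- err // releases exactly the producer that enqueued this index
}

func main() {
	h := &handshake{doneCh: map[uint64]chan error{}}
	idx, wait := h.offer()

	go h.finish(idx, nil) // consumer side, e.g. after the batch was exported

	fmt.Println("export result:", wait()) // export result: <nil>
}
```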
37 changes: 37 additions & 0 deletions exporter/internal/queue/blocking_queue_test.go
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue

import (
"context"
"errors"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestBlockingMemoryQueue(t *testing.T) {
var wg sync.WaitGroup
q := NewBlockingMemoryQueue[string](BlockingMemoryQueueSettings[string]{Sizer: &RequestSizer[string]{}, Capacity: 1})

err := errors.New("This is an error")
wg.Add(1)
go func() {
assert.EqualError(t, q.Offer(context.Background(), "a"), err.Error()) // Blocks until OnProcessingFinished is called
wg.Done()
}()

index, ctx, req, ok := q.Read(context.Background())
for !ok {
index, ctx, req, ok = q.Read(context.Background())
}

require.Equal(t, uint64(0), index)
require.Equal(t, context.Background(), ctx)
require.Equal(t, "a", req)
q.OnProcessingFinished(index, err)
wg.Wait()
}
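A hypothetical companion test (not part of this PR, reusing the same package helpers) that exercises the per-index release with more than one blocked producer:

```go
func TestBlockingMemoryQueue_MultipleProducers(t *testing.T) {
	q := NewBlockingMemoryQueue[string](BlockingMemoryQueueSettings[string]{
		Sizer:    &RequestSizer[string]{},
		Capacity: 2,
	})

	var wg sync.WaitGroup
	for _, r := range []string{"a", "b"} {
		wg.Add(1)
		go func(req string) {
			defer wg.Done()
			assert.NoError(t, q.Offer(context.Background(), req)) // blocks until released below
		}(r)
	}

	released := 0
	for released < 2 {
		index, _, _, ok := q.Read(context.Background())
		if !ok {
			continue
		}
		q.OnProcessingFinished(index, nil) // unblocks the producer that enqueued this index
		released++
	}
	wg.Wait()
}
```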
4 changes: 4 additions & 0 deletions exporter/internal/queue/bounded_memory_queue.go
@@ -56,6 +56,10 @@ func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
return nil
}

func (q *boundedMemoryQueue[T]) IsBlocking() bool {
return false
}

type memQueueEl[T any] struct {
req T
ctx context.Context
4 changes: 4 additions & 0 deletions exporter/internal/queue/persistent_queue.go
@@ -560,3 +560,7 @@ func bytesToItemIndexArray(buf []byte) ([]uint64, error) {
}
return val, nil
}

func (pq *persistentQueue[T]) IsBlocking() bool {
return false
}
2 changes: 2 additions & 0 deletions exporter/internal/queue/queue.go
@@ -34,6 +34,8 @@ type Queue[T any] interface {
Read(context.Context) (uint64, context.Context, T, bool)
// OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished.
OnProcessingFinished(index uint64, consumeErr error)
// IsBlocking returns true if Offer blocks until the submitted item has finished processing.
IsBlocking() bool
}

// Sizer is an interface that returns the size of the given element.