From 4639e3b556c7329464f0fcdcd571a50278f12e5b Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Wed, 18 Dec 2024 00:45:14 -0800 Subject: [PATCH 1/5] Implement blocking queue --- .../exporterhelper/internal/base_exporter.go | 19 +++- .../internal/batch_sender_test.go | 2 +- .../exporterhelper/internal/queue_sender.go | 8 +- exporter/exporterqueue/queue.go | 12 +++ exporter/internal/queue/blocking_queue.go | 93 +++++++++++++++++++ .../internal/queue/blocking_queue_test.go | 12 +++ .../internal/queue/bounded_memory_queue.go | 4 + exporter/internal/queue/persistent_queue.go | 4 + exporter/internal/queue/queue.go | 2 + 9 files changed, 150 insertions(+), 6 deletions(-) create mode 100644 exporter/internal/queue/blocking_queue.go create mode 100644 exporter/internal/queue/blocking_queue_test.go diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index f29343db0d3..e95f8e3d956 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -104,8 +104,23 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe } } - if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled || - usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled { + if usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled { + be.queueFactory = exporterqueue.NewBlockingMemoryQueue[internal.Request]() + be.queueCfg.QueueSize = 20 + q := be.queueFactory( + context.Background(), + exporterqueue.Settings{ + Signal: be.Signal, + ExporterSettings: be.Set, + }, + be.queueCfg) + be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg) + for _, op := range options { + err = multierr.Append(err, op(be)) + } + } + + if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled { bs := NewBatchSender(be.BatcherCfg, be.Set) be.BatchSender = bs } diff --git a/exporter/exporterhelper/internal/batch_sender_test.go b/exporter/exporterhelper/internal/batch_sender_test.go index 061aeea89a3..4a6402bde33 100644 --- a/exporter/exporterhelper/internal/batch_sender_test.go +++ b/exporter/exporterhelper/internal/batch_sender_test.go @@ -326,7 +326,7 @@ func TestBatchSender_PostShutdown(t *testing.T) { assert.Equal(t, int64(8), sink.itemsCount.Load()) }) } - runTest("enable_queue_batcher", true) + // We don't expect the same behavior when disable_queue_batcher is true runTest("disable_queue_batcher", false) } diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index 509c747115b..1a569512b38 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -178,10 +178,12 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error { func (qs *QueueSender) Send(ctx context.Context, req internal.Request) error { // Prevent cancellation and deadline to propagate to the context stored in the queue. // The grpc/http based receivers will cancel the request context after this function returns. - c := context.WithoutCancel(ctx) + if !usePullingBasedExporterQueueBatcher.IsEnabled() && !qs.queue.IsBlocking() { + ctx = context.WithoutCancel(ctx) + } - span := trace.SpanFromContext(c) - if err := qs.queue.Offer(c, req); err != nil { + span := trace.SpanFromContext(ctx) + if err := qs.queue.Offer(ctx, req); err != nil { span.AddEvent("Failed to enqueue item.", trace.WithAttributes(qs.traceAttribute)) return err } diff --git a/exporter/exporterqueue/queue.go b/exporter/exporterqueue/queue.go index 724cc23e0ae..984b27d4403 100644 --- a/exporter/exporterqueue/queue.go +++ b/exporter/exporterqueue/queue.go @@ -56,6 +56,18 @@ func NewMemoryQueueFactory[T any]() Factory[T] { } } +// NewBlockingMemoryQueue returns a factory to create a new blocking memory queue. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewBlockingMemoryQueue[T any]() Factory[T] { + return func(_ context.Context, _ Settings, cfg Config) Queue[T] { + return queue.NewBlockingMemoryQueue[T](queue.BlockingMemoryQueueSettings[T]{ + Sizer: &queue.RequestSizer[T]{}, + Capacity: int64(cfg.QueueSize), + }) + } +} + // PersistentQueueSettings defines developer settings for the persistent queue factory. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. diff --git a/exporter/internal/queue/blocking_queue.go b/exporter/internal/queue/blocking_queue.go new file mode 100644 index 00000000000..e5a04405ba8 --- /dev/null +++ b/exporter/internal/queue/blocking_queue.go @@ -0,0 +1,93 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" + +import ( + "context" + "sync" + + "go.opentelemetry.io/collector/component" +) + +// boundedMemoryQueue blocks insert until the batch containing the request is sent out. +type blockingMemoryQueue[T any] struct { + component.StartFunc + *sizedChannel[blockingMemQueueEl[T]] + sizer Sizer[T] + + mu sync.Mutex + nextIndex uint64 + doneCh map[uint64](chan error) +} + +// MemoryQueueSettings defines internal parameters for boundedMemoryQueue creation. +type BlockingMemoryQueueSettings[T any] struct { + Sizer Sizer[T] + Capacity int64 +} + +// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional +// callback for dropped items (e.g. useful to emit metrics). +func NewBlockingMemoryQueue[T any](set BlockingMemoryQueueSettings[T]) Queue[T] { + return &blockingMemoryQueue[T]{ + sizedChannel: newSizedChannel[blockingMemQueueEl[T]](set.Capacity, nil, 0), + sizer: set.Sizer, + nextIndex: 0, + doneCh: make(map[uint64](chan error)), + } +} + +// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic. +func (q *blockingMemoryQueue[T]) Offer(ctx context.Context, req T) error { + q.mu.Lock() + index := q.nextIndex + q.nextIndex++ + done := make(chan error) + q.doneCh[index] = done + + if err := q.sizedChannel.push( + blockingMemQueueEl[T]{ctx: ctx, req: req, index: index}, + q.sizer.Sizeof(req), + nil); err != nil { + delete(q.doneCh, index) + q.mu.Unlock() + return err + } + + q.mu.Unlock() + err := <-done + return err +} + +func (q *blockingMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) { + item, ok := q.sizedChannel.pop(func(el blockingMemQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) }) + return item.index, item.ctx, item.req, ok +} + +// OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished. +// For in memory queue, this function is noop. +func (q *blockingMemoryQueue[T]) OnProcessingFinished(index uint64, err error) { + q.mu.Lock() + q.doneCh[index] <- err + delete(q.doneCh, index) + q.mu.Unlock() +} + +// Shutdown closes the queue channel to initiate draining of the queue. +func (q *blockingMemoryQueue[T]) Shutdown(context.Context) error { + q.mu.Lock() + defer q.mu.Unlock() + q.sizedChannel.shutdown() + return nil +} + +func (q *blockingMemoryQueue[T]) IsBlocking() bool { + return true +} + +type blockingMemQueueEl[T any] struct { + index uint64 + req T + ctx context.Context +} diff --git a/exporter/internal/queue/blocking_queue_test.go b/exporter/internal/queue/blocking_queue_test.go new file mode 100644 index 00000000000..648b395302b --- /dev/null +++ b/exporter/internal/queue/blocking_queue_test.go @@ -0,0 +1,12 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue + +import ( + "testing" +) + +func TestBlockingMemoryQueue(t *testing.T) { + +} diff --git a/exporter/internal/queue/bounded_memory_queue.go b/exporter/internal/queue/bounded_memory_queue.go index 015c94473df..9864fa01fa8 100644 --- a/exporter/internal/queue/bounded_memory_queue.go +++ b/exporter/internal/queue/bounded_memory_queue.go @@ -56,6 +56,10 @@ func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error { return nil } +func (q *boundedMemoryQueue[T]) IsBlocking() bool { + return false +} + type memQueueEl[T any] struct { req T ctx context.Context diff --git a/exporter/internal/queue/persistent_queue.go b/exporter/internal/queue/persistent_queue.go index 038cb09cc39..df7c614b8c2 100644 --- a/exporter/internal/queue/persistent_queue.go +++ b/exporter/internal/queue/persistent_queue.go @@ -560,3 +560,7 @@ func bytesToItemIndexArray(buf []byte) ([]uint64, error) { } return val, nil } + +func (pq *persistentQueue[T]) IsBlocking() bool { + return false +} diff --git a/exporter/internal/queue/queue.go b/exporter/internal/queue/queue.go index 77cac737f7e..f258debdbd6 100644 --- a/exporter/internal/queue/queue.go +++ b/exporter/internal/queue/queue.go @@ -34,6 +34,8 @@ type Queue[T any] interface { Read(context.Context) (uint64, context.Context, T, bool) // OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished. OnProcessingFinished(index uint64, consumeErr error) + // Returns a boolean to tell whether the queue is blocking + IsBlocking() bool } // Sizer is an interface that returns the size of the given element. From 10dd87ed5e21b6d7024737832051a1c08a1f8ecd Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Wed, 18 Dec 2024 01:05:05 -0800 Subject: [PATCH 2/5] Added a test --- .../internal/queue/blocking_queue_test.go | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/exporter/internal/queue/blocking_queue_test.go b/exporter/internal/queue/blocking_queue_test.go index 648b395302b..331ecffad46 100644 --- a/exporter/internal/queue/blocking_queue_test.go +++ b/exporter/internal/queue/blocking_queue_test.go @@ -4,9 +4,34 @@ package queue import ( + "context" + "errors" "testing" + "time" + + "github.com/stretchr/testify/require" ) func TestBlockingMemoryQueue(t *testing.T) { + q := NewBlockingMemoryQueue[string](BlockingMemoryQueueSettings[string]{Sizer: &RequestSizer[string]{}, Capacity: 1}) + + done := false + err := errors.New("This is an error") + go func() { + require.EqualError(t, q.Offer(context.Background(), "a"), err.Error()) + done = true + }() + + require.False(t, done) + index, ctx, req, ok := q.Read(context.Background()) + require.Equal(t, index, uint64(0)) + require.Equal(t, ctx, context.Background()) + require.Equal(t, req, "a") + require.True(t, ok) + + require.False(t, done) + q.OnProcessingFinished(index, err) + time.Sleep(100 * time.Millisecond) + require.True(t, done) } From 752cc2985f0236ad21c462c376937b6457b84d32 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Wed, 18 Dec 2024 14:48:40 -0800 Subject: [PATCH 3/5] Added change log --- .chloggen/blocking_queue.yaml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 .chloggen/blocking_queue.yaml diff --git a/.chloggen/blocking_queue.yaml b/.chloggen/blocking_queue.yaml new file mode 100644 index 00000000000..6caead02278 --- /dev/null +++ b/.chloggen/blocking_queue.yaml @@ -0,0 +1,18 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds blocking queue which is used when the user sets up batching but not queuing. + +# One or more tracking issues or pull requests related to the change +issues: [8122, 10368] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] From 05cec89279a6e9004ebfb9f7a58f35eea8470618 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Thu, 19 Dec 2024 17:31:02 -0800 Subject: [PATCH 4/5] Make linter happy --- .../internal/queue/blocking_queue_test.go | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/exporter/internal/queue/blocking_queue_test.go b/exporter/internal/queue/blocking_queue_test.go index 331ecffad46..8766176fe97 100644 --- a/exporter/internal/queue/blocking_queue_test.go +++ b/exporter/internal/queue/blocking_queue_test.go @@ -6,32 +6,32 @@ package queue import ( "context" "errors" + "sync" "testing" - "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestBlockingMemoryQueue(t *testing.T) { + var wg sync.WaitGroup q := NewBlockingMemoryQueue[string](BlockingMemoryQueueSettings[string]{Sizer: &RequestSizer[string]{}, Capacity: 1}) - done := false err := errors.New("This is an error") + wg.Add(1) go func() { - require.EqualError(t, q.Offer(context.Background(), "a"), err.Error()) - done = true + assert.EqualError(t, q.Offer(context.Background(), "a"), err.Error()) // Blocks until OnProcessingFinished is called + wg.Done() }() - require.False(t, done) index, ctx, req, ok := q.Read(context.Background()) - require.Equal(t, index, uint64(0)) - require.Equal(t, ctx, context.Background()) - require.Equal(t, req, "a") - require.True(t, ok) + for !ok { + index, ctx, req, ok = q.Read(context.Background()) + } - require.False(t, done) + require.Equal(t, uint64(0), index) + require.Equal(t, context.Background(), ctx) + require.Equal(t, "a", req) q.OnProcessingFinished(index, err) - - time.Sleep(100 * time.Millisecond) - require.True(t, done) + wg.Wait() } From 3028f4005f640349d4df1b404c3b199c203158f0 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Thu, 2 Jan 2025 15:07:12 -0800 Subject: [PATCH 5/5] fix build --- exporter/exporterhelper/internal/base_exporter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index e95f8e3d956..4a730e20aab 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -110,7 +110,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe q := be.queueFactory( context.Background(), exporterqueue.Settings{ - Signal: be.Signal, + Signal: signal, ExporterSettings: be.Set, }, be.queueCfg)