From 9927df8012a51a34653c36f373a2e8d9b19ed7cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Lavoie?= Date: Wed, 21 Aug 2024 12:00:10 -0400 Subject: [PATCH] fix(active-job): Propagate context between enqueue and perform (#1132) --- .../active_job/handlers/enqueue.rb | 3 +- .../active_job/handlers/perform.rb | 2 +- .../active_job/handlers/perform_test.rb | 70 +++++++++++++++++++ 3 files changed, 73 insertions(+), 2 deletions(-) diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb index 23dab795e..9350516ff 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb @@ -23,8 +23,9 @@ def start_span(name, _id, payload) job = payload.fetch(:job) span_name = span_name(job, EVENT_NAME) span = tracer.start_span(span_name, kind: :producer, attributes: @mapper.call(payload)) + token = OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span)) OpenTelemetry.propagation.inject(job.__otel_headers) # This must be transmitted over the wire - { span: span, ctx_token: OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span)) } + { span: span, ctx_token: token } end end end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb index b617b8ee2..88141f754 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb @@ -27,7 +27,7 @@ def start_span(name, _id, payload) # TODO: Refactor into a propagation strategy propagation_style = @config[:propagation_style] if propagation_style == :child - span = tracer.start_span(span_name, kind: :consumer, attributes: @mapper.call(payload)) + span = tracer.start_span(span_name, with_parent: parent_context, kind: :consumer, attributes: @mapper.call(payload)) else span_context = OpenTelemetry::Trace.current_span(parent_context).context links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid? && propagation_style == :link diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/perform_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/perform_test.rb index 3227bf62f..de886c206 100644 --- a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/perform_test.rb +++ b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/perform_test.rb @@ -176,6 +176,18 @@ _(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id) end + it 'does not interfere with an outer span' do + instrumentation.tracer.in_span('outer span') do + TestJob.perform_later + end + + _(publish_span.trace_id).wont_equal(process_span.trace_id) + + _(process_span.total_recorded_links).must_equal(1) + _(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id) + _(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id) + end + it 'propagates baggage' do ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked') OpenTelemetry::Context.with_current(ctx) do @@ -190,6 +202,30 @@ _(process_span.attributes['success']).must_equal(true) end + + describe 'with an async queue adapter' do + before do + begin + ActiveJob::Base.queue_adapter.shutdown + rescue StandardError + nil + end + + singleton_class.include ActiveJob::TestHelper + ActiveJob::Base.queue_adapter = :test + end + + it 'creates span links in separate traces' do + TestJob.perform_later + perform_enqueued_jobs + + _(publish_span.trace_id).wont_equal(process_span.trace_id) + + _(process_span.total_recorded_links).must_equal(1) + _(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id) + _(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id) + end + end end describe 'when configured to do parent/child spans' do @@ -204,6 +240,17 @@ _(process_span.parent_span_id).must_equal(publish_span.span_id) end + it 'does not interfere with an outer span' do + instrumentation.tracer.in_span('outer span') do + TestJob.perform_later + end + + _(process_span.total_recorded_links).must_equal(0) + + _(publish_span.trace_id).must_equal(process_span.trace_id) + _(process_span.parent_span_id).must_equal(publish_span.span_id) + end + it 'propagates baggage' do ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked') OpenTelemetry::Context.with_current(ctx) do @@ -215,6 +262,29 @@ _(process_span.parent_span_id).must_equal(publish_span.span_id) _(process_span.attributes['success']).must_equal(true) end + + describe 'with an async queue adapter' do + before do + begin + ActiveJob::Base.queue_adapter.shutdown + rescue StandardError + nil + end + + singleton_class.include ActiveJob::TestHelper + ActiveJob::Base.queue_adapter = :test + end + + it 'creates a parent/child relationship' do + TestJob.perform_later + perform_enqueued_jobs + + _(process_span.total_recorded_links).must_equal(0) + + _(publish_span.trace_id).must_equal(process_span.trace_id) + _(process_span.parent_span_id).must_equal(publish_span.span_id) + end + end end describe 'when explicitly configure for no propagation' do