Skip to content

Commit

Permalink
fix(active-job): Propagate context between enqueue and perform (#1132)
Browse files Browse the repository at this point in the history
  • Loading branch information
lavoiesl authored Aug 21, 2024
1 parent a564817 commit 9927df8
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ def start_span(name, _id, payload)
job = payload.fetch(:job)
span_name = span_name(job, EVENT_NAME)
span = tracer.start_span(span_name, kind: :producer, attributes: @mapper.call(payload))
token = OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))
OpenTelemetry.propagation.inject(job.__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_token: OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span)) }
{ span: span, ctx_token: token }
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def start_span(name, _id, payload)
# TODO: Refactor into a propagation strategy
propagation_style = @config[:propagation_style]
if propagation_style == :child
span = tracer.start_span(span_name, kind: :consumer, attributes: @mapper.call(payload))
span = tracer.start_span(span_name, with_parent: parent_context, kind: :consumer, attributes: @mapper.call(payload))
else
span_context = OpenTelemetry::Trace.current_span(parent_context).context
links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid? && propagation_style == :link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,18 @@
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end

it 'does not interfere with an outer span' do
instrumentation.tracer.in_span('outer span') do
TestJob.perform_later
end

_(publish_span.trace_id).wont_equal(process_span.trace_id)

_(process_span.total_recorded_links).must_equal(1)
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end

it 'propagates baggage' do
ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked')
OpenTelemetry::Context.with_current(ctx) do
Expand All @@ -190,6 +202,30 @@

_(process_span.attributes['success']).must_equal(true)
end

describe 'with an async queue adapter' do
before do
begin
ActiveJob::Base.queue_adapter.shutdown
rescue StandardError
nil
end

singleton_class.include ActiveJob::TestHelper
ActiveJob::Base.queue_adapter = :test
end

it 'creates span links in separate traces' do
TestJob.perform_later
perform_enqueued_jobs

_(publish_span.trace_id).wont_equal(process_span.trace_id)

_(process_span.total_recorded_links).must_equal(1)
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end
end
end

describe 'when configured to do parent/child spans' do
Expand All @@ -204,6 +240,17 @@
_(process_span.parent_span_id).must_equal(publish_span.span_id)
end

it 'does not interfere with an outer span' do
instrumentation.tracer.in_span('outer span') do
TestJob.perform_later
end

_(process_span.total_recorded_links).must_equal(0)

_(publish_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(publish_span.span_id)
end

it 'propagates baggage' do
ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked')
OpenTelemetry::Context.with_current(ctx) do
Expand All @@ -215,6 +262,29 @@
_(process_span.parent_span_id).must_equal(publish_span.span_id)
_(process_span.attributes['success']).must_equal(true)
end

describe 'with an async queue adapter' do
before do
begin
ActiveJob::Base.queue_adapter.shutdown
rescue StandardError
nil
end

singleton_class.include ActiveJob::TestHelper
ActiveJob::Base.queue_adapter = :test
end

it 'creates a parent/child relationship' do
TestJob.perform_later
perform_enqueued_jobs

_(process_span.total_recorded_links).must_equal(0)

_(publish_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(publish_span.span_id)
end
end
end

describe 'when explicitly configure for no propagation' do
Expand Down

0 comments on commit 9927df8

Please sign in to comment.