diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb index 6536b9bc5..0fe7d233c 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb @@ -49,11 +49,16 @@ def deserialize(job_data) module OpenTelemetry module Instrumentation module ActiveJob + class EnqueueSubscriber + def initialize(tracer) + @tracer = tracer + end + def on_start(name, _id, payload, subscriber) - span = subscriber.tracer.start_span("#{payload.fetch(:job).queue_name} publish", + span = @tracer.start_span("#{payload.fetch(:job).queue_name} publish", kind: :producer, - attributes: subscriber.job_attributes(payload.fetch(:job))) + attributes: subscriber.as_otel_semconv_attrs(payload.fetch(:job))) tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire { span: span, ctx_tokens: tokens } @@ -61,6 +66,10 @@ def on_start(name, _id, payload, subscriber) end class PerformSubscriber + def initialize(tracer) + @tracer = tracer + end + def on_start(name, _id, payload, subscriber) tokens = [] parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers) @@ -71,10 +80,10 @@ def on_start(name, _id, payload, subscriber) links = [OpenTelemetry::Trace::Link.new(span_context)] end - span = subscriber.tracer.start_span( + span = @tracer.start_span( "#{payload.fetch(:job).queue_name} process", kind: :consumer, - attributes: subscriber.job_attributes(payload.fetch(:job)), + attributes: subscriber.as_otel_semconv_attrs(payload.fetch(:job)), links: links ) @@ -87,12 +96,19 @@ def on_start(name, _id, payload, subscriber) end class Subscriber < ::ActiveSupport::Subscriber - TEST_ADAPTERS = %w[async inline] EVENT_HANDLERS = { - 'enqueue.active_job' => EnqueueSubscriber.new, - 'perform.active_job' => PerformSubscriber.new, + 'enqueue.active_job' => EnqueueSubscriber.new(OpenTelemetry.tracer_provider.tracer('otel-active_job', '0.0.1')), + 'perform.active_job' => PerformSubscriber.new(OpenTelemetry.tracer_provider.tracer('otel-active_job', '0.0.1')), } + def initialize(...) + super + @handlers_by_pattern = { + 'enqueue.active_job' => EnqueueSubscriber.new(OpenTelemetry.tracer_provider.tracer('otel-active_job', '0.0.1')), + 'perform.active_job' => PerformSubscriber.new(OpenTelemetry.tracer_provider.tracer('otel-active_job', '0.0.1')), + } + end + attach_to :active_job # The methods below are the events the Subscriber is interested in. @@ -101,7 +117,7 @@ def perform(...);end def start(name, id, payload) begin - payload.merge!(__otel: EVENT_HANDLERS.fetch(name).on_start(name, id, payload, self)) # The payload is _not_ transmitted over the wire + payload.merge!(__otel: @handlers_by_pattern.fetch(name).on_start(name, id, payload, self)) # The payload is _not_ transmitted over the wire rescue StandardError => error OpenTelemetry.handle_error(exception: error) end @@ -136,13 +152,15 @@ def finish(_name, _id, payload) end def on_start(name, _id, payload) - span = tracer.start_span(name, attributes: job_attributes(payload.fetch(:job))) + span = tracer.start_span(name, attributes: as_otel_semconv_attrs(payload.fetch(:job))) tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire { span: span, ctx_tokens: tokens } end - def job_attributes(job) + def as_otel_semconv_attrs(job) + test_adapters = %w[async inline] + otel_attributes = { 'code.namespace' => job.class.name, 'messaging.destination_kind' => 'queue', @@ -153,7 +171,7 @@ def job_attributes(job) 'messaging.active_job.priority' => job.priority } - otel_attributes['net.transport'] = 'inproc' if TEST_ADAPTERS.include?(job.class.queue_adapter_name) + otel_attributes['net.transport'] = 'inproc' if test_adapters.include?(job.class.queue_adapter_name) otel_attributes.compact! otel_attributes