From 25eed1bf6ff0d08df9a9dfd71a0ad28d876260b6 Mon Sep 17 00:00:00 2001 From: Ariel Valentin Date: Sun, 24 Sep 2023 14:17:53 +0000 Subject: [PATCH] refactor: Extract Module --- .../instrumentation/active_job/subscriber.rb | 70 +++++++++++-------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb index c409547fd..c279df6b9 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require 'active_support/subscriber' +require_relative './version' module OpenTelemetry module Instrumentation @@ -50,13 +51,36 @@ module OpenTelemetry module Instrumentation module ActiveJob + module AttributeProcessor + def to_otel_semconv_attributes(job) + test_adapters = %w[async inline] + + otel_attributes = { + 'code.namespace' => job.class.name, + 'messaging.destination_kind' => 'queue', + 'messaging.system' => job.class.queue_adapter_name, + 'messaging.destination' => job.queue_name, + 'messaging.message_id' => job.job_id, + 'messaging.active_job.provider_job_id' => job.provider_job_id, + 'messaging.active_job.priority' => job.priority + } + + otel_attributes['net.transport'] = 'inproc' if test_adapters.include?(job.class.queue_adapter_name) + otel_attributes.compact! + + otel_attributes + end + end + class DefaultHandler + include AttributeProcessor + def initialize(tracer) @tracer = tracer end - def on_start(name, _id, payload, attribute_processor) - span = @tracer.start_span(name, attributes: attribute_processor.as_otel_semconv_attrs(payload.fetch(:job))) + def on_start(name, _id, payload) + span = @tracer.start_span(name, attributes: to_otel_semconv_attributes(payload.fetch(:job))) tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire { span: span, ctx_tokens: tokens } @@ -64,14 +88,16 @@ def on_start(name, _id, payload, attribute_processor) end class EnqueueHandler + include AttributeProcessor + def initialize(tracer) @tracer = tracer end - def on_start(name, _id, payload, attribute_processor) + def on_start(name, _id, payload) span = @tracer.start_span("#{payload.fetch(:job).queue_name} publish", kind: :producer, - attributes: subscriber.as_otel_semconv_attrs(payload.fetch(:job))) + attributes: to_otel_semconv_attributes(payload.fetch(:job))) tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire { span: span, ctx_tokens: tokens } @@ -79,11 +105,13 @@ def on_start(name, _id, payload, attribute_processor) end class PerformHandler + include AttributeProcessor + def initialize(tracer) @tracer = tracer end - def on_start(name, _id, payload, attribute_processor) + def on_start(name, _id, payload) tokens = [] parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers) span_context = OpenTelemetry::Trace.current_span(parent_context).context @@ -96,7 +124,7 @@ def on_start(name, _id, payload, attribute_processor) span = @tracer.start_span( "#{payload.fetch(:job).queue_name} process", kind: :consumer, - attributes: attribute_processor.as_otel_semconv_attrs(payload.fetch(:job)), + attributes: to_otel_semconv_attributes(payload.fetch(:job)), links: links ) @@ -113,11 +141,11 @@ class Subscriber < ::ActiveSupport::Subscriber def initialize(...) super - @tracer = OpenTelemetry.tracer_provider.tracer('otel-active_job', '0.0.1') - default_handler = DefaultHandler.new(@tracer) + tracer = OpenTelemetry.tracer_provider.tracer('otel-active_job', ::OpenTelemetry::Instrumentation::ActiveJob::VERSION) + default_handler = DefaultHandler.new(tracer) @handlers_by_pattern = { - 'enqueue.active_job' => EnqueueHandler.new(@tracer), - 'perform.active_job' => PerformHandler.new(@tracer), + 'enqueue.active_job' => EnqueueHandler.new(tracer), + 'perform.active_job' => PerformHandler.new(tracer), } @handlers_by_pattern.default = default_handler end @@ -129,7 +157,7 @@ def perform(...);end def start(name, id, payload) begin - payload.merge!(__otel: @handlers_by_pattern[name].on_start(name, id, payload, self)) # The payload is _not_ transmitted over the wire + payload.merge!(__otel: @handlers_by_pattern[name].on_start(name, id, payload)) # The payload is _not_ transmitted over the wire rescue StandardError => error OpenTelemetry.handle_error(exception: error) end @@ -163,26 +191,6 @@ def finish(_name, _id, payload) end end - - def as_otel_semconv_attrs(job) - test_adapters = %w[async inline] - - otel_attributes = { - 'code.namespace' => job.class.name, - 'messaging.destination_kind' => 'queue', - 'messaging.system' => job.class.queue_adapter_name, - 'messaging.destination' => job.queue_name, - 'messaging.message_id' => job.job_id, - 'messaging.active_job.provider_job_id' => job.provider_job_id, - 'messaging.active_job.priority' => job.priority - } - - otel_attributes['net.transport'] = 'inproc' if test_adapters.include?(job.class.queue_adapter_name) - otel_attributes.compact! - - otel_attributes - end - attach_to :active_job end end