Skip to content

Commit

Permalink
refactor: Extract Module
Browse files Browse the repository at this point in the history
  • Loading branch information
arielvalentin authored Sep 24, 2023
1 parent 2e532c6 commit 25eed1b
Showing 1 changed file with 39 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require 'active_support/subscriber'
require_relative './version'

module OpenTelemetry
module Instrumentation
Expand Down Expand Up @@ -50,40 +51,67 @@ module OpenTelemetry
module Instrumentation
module ActiveJob

module AttributeProcessor
def to_otel_semconv_attributes(job)
test_adapters = %w[async inline]

otel_attributes = {
'code.namespace' => job.class.name,
'messaging.destination_kind' => 'queue',
'messaging.system' => job.class.queue_adapter_name,
'messaging.destination' => job.queue_name,
'messaging.message_id' => job.job_id,
'messaging.active_job.provider_job_id' => job.provider_job_id,
'messaging.active_job.priority' => job.priority
}

otel_attributes['net.transport'] = 'inproc' if test_adapters.include?(job.class.queue_adapter_name)
otel_attributes.compact!

otel_attributes
end
end

class DefaultHandler
include AttributeProcessor

def initialize(tracer)
@tracer = tracer
end

def on_start(name, _id, payload, attribute_processor)
span = @tracer.start_span(name, attributes: attribute_processor.as_otel_semconv_attrs(payload.fetch(:job)))
def on_start(name, _id, payload)
span = @tracer.start_span(name, attributes: to_otel_semconv_attributes(payload.fetch(:job)))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_tokens: tokens }
end
end

class EnqueueHandler
include AttributeProcessor

def initialize(tracer)
@tracer = tracer
end

def on_start(name, _id, payload, attribute_processor)
def on_start(name, _id, payload)
span = @tracer.start_span("#{payload.fetch(:job).queue_name} publish",
kind: :producer,
attributes: subscriber.as_otel_semconv_attrs(payload.fetch(:job)))
attributes: to_otel_semconv_attributes(payload.fetch(:job)))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_tokens: tokens }
end
end

class PerformHandler
include AttributeProcessor

def initialize(tracer)
@tracer = tracer
end

def on_start(name, _id, payload, attribute_processor)
def on_start(name, _id, payload)
tokens = []
parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers)
span_context = OpenTelemetry::Trace.current_span(parent_context).context
Expand All @@ -96,7 +124,7 @@ def on_start(name, _id, payload, attribute_processor)
span = @tracer.start_span(
"#{payload.fetch(:job).queue_name} process",
kind: :consumer,
attributes: attribute_processor.as_otel_semconv_attrs(payload.fetch(:job)),
attributes: to_otel_semconv_attributes(payload.fetch(:job)),
links: links
)

Expand All @@ -113,11 +141,11 @@ class Subscriber < ::ActiveSupport::Subscriber

def initialize(...)
super
@tracer = OpenTelemetry.tracer_provider.tracer('otel-active_job', '0.0.1')
default_handler = DefaultHandler.new(@tracer)
tracer = OpenTelemetry.tracer_provider.tracer('otel-active_job', ::OpenTelemetry::Instrumentation::ActiveJob::VERSION)
default_handler = DefaultHandler.new(tracer)
@handlers_by_pattern = {
'enqueue.active_job' => EnqueueHandler.new(@tracer),
'perform.active_job' => PerformHandler.new(@tracer),
'enqueue.active_job' => EnqueueHandler.new(tracer),
'perform.active_job' => PerformHandler.new(tracer),
}
@handlers_by_pattern.default = default_handler
end
Expand All @@ -129,7 +157,7 @@ def perform(...);end

def start(name, id, payload)
begin
payload.merge!(__otel: @handlers_by_pattern[name].on_start(name, id, payload, self)) # The payload is _not_ transmitted over the wire
payload.merge!(__otel: @handlers_by_pattern[name].on_start(name, id, payload)) # The payload is _not_ transmitted over the wire
rescue StandardError => error
OpenTelemetry.handle_error(exception: error)
end
Expand Down Expand Up @@ -163,26 +191,6 @@ def finish(_name, _id, payload)
end
end


def as_otel_semconv_attrs(job)
test_adapters = %w[async inline]

otel_attributes = {
'code.namespace' => job.class.name,
'messaging.destination_kind' => 'queue',
'messaging.system' => job.class.queue_adapter_name,
'messaging.destination' => job.queue_name,
'messaging.message_id' => job.job_id,
'messaging.active_job.provider_job_id' => job.provider_job_id,
'messaging.active_job.priority' => job.priority
}

otel_attributes['net.transport'] = 'inproc' if test_adapters.include?(job.class.queue_adapter_name)
otel_attributes.compact!

otel_attributes
end

attach_to :active_job
end
end
Expand Down

0 comments on commit 25eed1b

Please sign in to comment.