Skip to content

Commit

Permalink
refaactor: rename method
Browse files Browse the repository at this point in the history
  • Loading branch information
arielvalentin authored Sep 24, 2023
1 parent 1d09955 commit e7344ae
Showing 1 changed file with 29 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,27 @@ def deserialize(job_data)
module OpenTelemetry
module Instrumentation
module ActiveJob

class EnqueueSubscriber
def initialize(tracer)
@tracer = tracer
end

def on_start(name, _id, payload, subscriber)
span = subscriber.tracer.start_span("#{payload.fetch(:job).queue_name} publish",
span = @tracer.start_span("#{payload.fetch(:job).queue_name} publish",
kind: :producer,
attributes: subscriber.job_attributes(payload.fetch(:job)))
attributes: subscriber.as_otel_semconv_attrs(payload.fetch(:job)))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_tokens: tokens }
end
end

class PerformSubscriber
def initialize(tracer)
@tracer = tracer
end

def on_start(name, _id, payload, subscriber)
tokens = []
parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers)
Expand All @@ -71,10 +80,10 @@ def on_start(name, _id, payload, subscriber)
links = [OpenTelemetry::Trace::Link.new(span_context)]
end

span = subscriber.tracer.start_span(
span = @tracer.start_span(
"#{payload.fetch(:job).queue_name} process",
kind: :consumer,
attributes: subscriber.job_attributes(payload.fetch(:job)),
attributes: subscriber.as_otel_semconv_attrs(payload.fetch(:job)),
links: links
)

Expand All @@ -87,12 +96,19 @@ def on_start(name, _id, payload, subscriber)
end

class Subscriber < ::ActiveSupport::Subscriber
TEST_ADAPTERS = %w[async inline]
EVENT_HANDLERS = {
'enqueue.active_job' => EnqueueSubscriber.new,
'perform.active_job' => PerformSubscriber.new,
'enqueue.active_job' => EnqueueSubscriber.new(OpenTelemetry.tracer_provider.tracer('otel-active_job', '0.0.1')),
'perform.active_job' => PerformSubscriber.new(OpenTelemetry.tracer_provider.tracer('otel-active_job', '0.0.1')),
}

def initialize(...)
super
@handlers_by_pattern = {
'enqueue.active_job' => EnqueueSubscriber.new(OpenTelemetry.tracer_provider.tracer('otel-active_job', '0.0.1')),
'perform.active_job' => PerformSubscriber.new(OpenTelemetry.tracer_provider.tracer('otel-active_job', '0.0.1')),
}
end

attach_to :active_job

# The methods below are the events the Subscriber is interested in.
Expand All @@ -101,7 +117,7 @@ def perform(...);end

def start(name, id, payload)
begin
payload.merge!(__otel: EVENT_HANDLERS.fetch(name).on_start(name, id, payload, self)) # The payload is _not_ transmitted over the wire
payload.merge!(__otel: @handlers_by_pattern.fetch(name).on_start(name, id, payload, self)) # The payload is _not_ transmitted over the wire
rescue StandardError => error
OpenTelemetry.handle_error(exception: error)
end
Expand Down Expand Up @@ -136,13 +152,15 @@ def finish(_name, _id, payload)
end

def on_start(name, _id, payload)
span = tracer.start_span(name, attributes: job_attributes(payload.fetch(:job)))
span = tracer.start_span(name, attributes: as_otel_semconv_attrs(payload.fetch(:job)))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_tokens: tokens }
end

def job_attributes(job)
def as_otel_semconv_attrs(job)
test_adapters = %w[async inline]

otel_attributes = {
'code.namespace' => job.class.name,
'messaging.destination_kind' => 'queue',
Expand All @@ -153,7 +171,7 @@ def job_attributes(job)
'messaging.active_job.priority' => job.priority
}

otel_attributes['net.transport'] = 'inproc' if TEST_ADAPTERS.include?(job.class.queue_adapter_name)
otel_attributes['net.transport'] = 'inproc' if test_adapters.include?(job.class.queue_adapter_name)
otel_attributes.compact!

otel_attributes
Expand Down

0 comments on commit e7344ae

Please sign in to comment.