Skip to content

Commit

Permalink
refactor: Use template methods for children
Browse files Browse the repository at this point in the history
  • Loading branch information
arielvalentin committed Oct 8, 2023
1 parent 86d6d0e commit eba4ba0
Showing 1 changed file with 34 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
module OpenTelemetry
module Instrumentation
module ActiveJob
# Provides helper methods
# Maps ActiveJob Attributes to Semantic Conventions
#
# Some of the more promenant attributes will come from
#
class AttributeMapper
TEST_ADAPTERS = %w[async inline].freeze
Expand Down Expand Up @@ -39,12 +41,16 @@ def initialize(tracer, mapper)
@mapper = mapper
end

def start(name, _id, payload)
def start(name, id, payload)
payload.merge!(__otel: on_start(name, id, payload))
end

def on_start(name, _id, payload)
span = @tracer.start_span(name, attributes: @mapper.call(payload))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire

payload.merge!(__otel: { span: span, ctx_tokens: tokens })
{ span: span, ctx_tokens: tokens }
end

def finish(_name, _id, payload)
Expand Down Expand Up @@ -83,19 +89,19 @@ def finish(_name, _id, payload)

# Handles enqueue.active_job
class EnqueueHandler < DefaultHandler
def start(name, _id, payload)
def on_start(name, _id, payload)
otel_config = ActiveJob::Instrumentation.instance.config
span_name = "#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} publish"
span = @tracer.start_span(span_name, kind: :producer, attributes: @mapper.call(payload))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire
payload.merge!(__otel: { span: span, ctx_tokens: tokens })
{ span: span, ctx_tokens: tokens }
end
end

# Handles perform.active_job
class PerformHandler < DefaultHandler
def start(name, _id, payload)
def on_start(name, _id, payload)
tokens = []
parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers)
span_context = OpenTelemetry::Trace.current_span(parent_context).context
Expand All @@ -117,68 +123,44 @@ def start(name, _id, payload)

tokens.concat([consumer_context, aj_context].map { |context| OpenTelemetry::Context.attach(context) })

payload.merge!(__otel: { span: span, ctx_tokens: tokens })
{ span: span, ctx_tokens: tokens }
end
end

# Custom subscriber that handles ActiveJob notifications
class Subscriber < ::ActiveSupport::Subscriber
attr_reader :tracer
class Subscriber
def self.install
return unless Array(@subscriptions).empty?

def initialize(...)
super
tracer = Instrumentation.instance.tracer
mapper = AttributeMapper.new

default_handler = DefaultHandler.new(tracer, mapper)
enqueue_handler = EnqueueHandler.new(tracer, mapper)

@handlers_by_pattern = {
'enqueue.active_job' => enqueue_handler,
'enqueue_at.active_job' => enqueue_handler,
'perform.active_job' => PerformHandler.new(tracer, mapper)
perform_handler = PerformHandler.new(tracer, mapper)

# Why no perform_start?
# This event causes much heartache as it is the first in a series of events that is triggered.
# It should not be the ingress span because it does not measure anything.
# https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14
# https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19
handlers_by_pattern = {
'enqueue' => enqueue_handler,
'enqueue_at' => enqueue_handler,
'enqueue_retry' => default_handler,
'perform' => perform_handler,
'retry_stopped' => default_handler,
'discard' => default_handler
}
@handlers_by_pattern.default = default_handler
@call_super if ::ActiveJob.version < Gem::Version.new('7.1')
end

# The methods below are the events the Subscriber is interested in.
def enqueue_at(...); end
def enqueue(...); end
def enqueue_retry(...); end
# This event causes much heartache as it is the first in a series of events that is triggered.
# It should not be the ingress span because it does not measure anything.
# https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14
# https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19
# def perform_start(...); end
def perform(...); end
def retry_stopped(...); end
# def discard(...); end

def start(name, id, payload)
@handlers_by_pattern[name].start(name, id, payload)
# This is nuts
super if @call_super
end

def finish(name, id, payload)
@handlers_by_pattern[name].finish(name, id, payload)
# This is equally nuts
super if @call_super
end

def self.install
attach_to :active_job
tracer = Instrumentation.instance.tracer
mapper = AttributeMapper.new
default_handler = DefaultHandler.new(tracer, mapper)
@subscriptions = %w[discard.active_job].map do |key|
ActiveSupport::Notifications.subscribe(key, default_handler)
@subscriptions = handlers_by_pattern.map do |key, handler|
ActiveSupport::Notifications.subscribe("#{key}.active_job", handler)
end
end

def self.uninstall
detach_from :active_job
@subscriptions&.each { |subscriber| ActiveSupport::Notifications.unsubscribe(subscriber) }
@subscriptions = nil
end
end
end
Expand Down

0 comments on commit eba4ba0

Please sign in to comment.