Skip to content

Commit

Permalink
add some docs
Browse files Browse the repository at this point in the history
  • Loading branch information
arielvalentin committed Oct 9, 2023
1 parent 2134016 commit 4f46f12
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,29 @@
module OpenTelemetry
module Instrumentation
module ActiveJob
# Custom subscriber that handles ActiveJob notifications
# Module that contains custom event handlers, which are used to generate spans per event
module Handlers
module_function

# Subscribes Event Handlers to relevant ActiveJob notifications
#
# The following events are recorded as spans:
# - enqueue
# - enqueue_at
# - enqueue_retry
# - perform
# - retry_stopped
# - discard
#
# Ingress and Egress spans (perform, enqueue, enqueue_at) use Messaging semantic conventions for naming the span,
# while internal spans keep their ActiveSupport event name.
#
# @note this method is not thread safe and should not be used in a multi-threaded context
# @note Why no perform_start?
# This event causes much heartache as it is the first in a series of events that is triggered.
# It should not be the ingress span because it does not measure anything.
# https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14
# https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19
def install
return unless Array(@subscriptions).empty?

Expand All @@ -26,11 +45,6 @@ def install
enqueue_handler = Handlers::Enqueue.new(tracer, mapper)
perform_handler = Handlers::Perform.new(tracer, mapper)

# Why no perform_start?
# This event causes much heartache as it is the first in a series of events that is triggered.
# It should not be the ingress span because it does not measure anything.
# https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14
# https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19
handlers_by_pattern = {
'enqueue' => enqueue_handler,
'enqueue_at' => enqueue_handler,
Expand All @@ -45,6 +59,8 @@ def install
end
end

# Removes Event Handler Subscriptions for ActiveJob notifications
# @note this method is not thread safe and sholud not be used in a multi-threaded context
def uninstall
@subscriptions&.each { |subscriber| ActiveSupport::Notifications.unsubscribe(subscriber) }
@subscriptions = nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,45 @@ module Instrumentation
module ActiveJob
module Handlers
# Default handler to creates internal spans for events
# This class provides default template methods that derived classes may override to generate spans and register contexts.
class Default
def initialize(tracer, mapper)
@tracer = tracer
@mapper = mapper
end

# Invoked by ActiveSupport::Notifications at the start of the instrumentation block
# It amends the otel context of a Span and Context tokens to the payload
#
# @param name [String] of the Event
# @param id [String] of the event
# @param payload [Hash] containing job run information
# @return [Hash] the payload passed as a method argument
def start(name, id, payload)
payload.merge!(__otel: on_start(name, id, payload))
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
end

def on_start(name, _id, payload)
# Creates a span and registers it with the current context
#
# @param name [String] of the Event
# @param id [String] of the event
# @param payload [Hash] containing job run information
# @return [Hash] with the span and generated context tokens
def start_span(name, _id, payload)
span = @tracer.start_span(name, attributes: @mapper.call(payload))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]

{ span: span, ctx_tokens: tokens }
end

# Creates a span and registers it with the current context
#
# @param _name [String] of the Event (unused)
# @param _id [String] of the event (unused)
# @param payload [Hash] containing job run information
# @return [Hash] with the span and generated context tokens
def finish(_name, _id, payload)
otel = payload.delete(:__otel)
span = otel&.fetch(:span)
Expand All @@ -41,6 +61,9 @@ def finish(_name, _id, payload)
finish_span(span, tokens)
end

# Finishes the provided spans and also detaches the associated contexts
# @param span [OpenTelemetry::Trace::Span]
# @param tokens [Array] to unregister
def finish_span(span, tokens)
# closes the span after all attributes have been finalized
begin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module ActiveJob
module Handlers
# Handles enqueue.active_job
class Enqueue < Default
def on_start(name, _id, payload)
def start_span(name, _id, payload)
otel_config = ActiveJob::Instrumentation.instance.config
span_name = "#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} publish"
span = @tracer.start_span(span_name, kind: :producer, attributes: @mapper.call(payload))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module ActiveJob
module Handlers
# Handles perform.active_job
class Perform < Default
def on_start(name, _id, payload)
def start_span(name, _id, payload)
tokens = []
parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers)

Expand Down

0 comments on commit 4f46f12

Please sign in to comment.