Skip to content

Commit

Permalink
refactor... maybe a little overengineering
Browse files Browse the repository at this point in the history
  • Loading branch information
arielvalentin committed Oct 11, 2023
1 parent f20551a commit 4deba54
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 28 deletions.
8 changes: 0 additions & 8 deletions instrumentation/active_job/example/Gemfile

This file was deleted.

93 changes: 88 additions & 5 deletions instrumentation/active_job/example/active_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,70 @@
#
# SPDX-License-Identifier: Apache-2.0

ENV['OTEL_SERVICE_NAME'] ||= 'otel-active-job-demo'
require 'rubygems'
require 'bundler/setup'
require 'active_job'
require 'bundler/inline'

Bundler.require
gemfile do
source 'https://rubygems.org'
gem 'activejob', '~> 7.0.0', require: 'active_job'
gem 'opentelemetry-instrumentation-active_job', path: '../'
gem 'opentelemetry-sdk'
gem 'opentelemetry-exporter-otlp'
end

ENV['OTEL_LOG_LEVEL'] ||= 'fatal'
ENV['OTEL_TRACES_EXPORTER'] ||= 'console'
OpenTelemetry::SDK.configure do |c|
c.use 'OpenTelemetry::Instrumentation::ActiveJob'
at_exit { OpenTelemetry.tracer_provider.shutdown }
end

class FailingJob < ::ActiveJob::Base
queue_as :demo
def perform
raise 'this job failed'
end
end

class FailingRetryJob < ::ActiveJob::Base
queue_as :demo

retry_on StandardError, attempts: 2, wait: 0
def perform
raise 'this job failed'
end
end

class RetryJob < ::ActiveJob::Base
queue_as :demo

retry_on StandardError, attempts: 3, wait: 0
def perform
if executions < 3
raise 'this job failed'
else
puts <<~EOS
--------------------------------------------------
Done Retrying!
--------------------------------------------------
EOS
end
end
end

class DiscardJob < ::ActiveJob::Base
queue_as :demo

class DiscardError < StandardError; end

discard_on DiscardError

def perform
raise DiscardError, 'this job failed'
end
end

class TestJob < ::ActiveJob::Base
Expand All @@ -27,7 +82,35 @@ def perform
end
end

class DoItNowJob < ::ActiveJob::Base
def perform
$stderr.puts <<~EOS
--------------------------------------------------
Called with perform_now!
--------------------------------------------------
EOS
end
end

class BatchJob < ::ActiveJob::Base
def perform
TestJob.perform_later
FailingJob.perform_later
FailingRetryJob.perform_later
RetryJob.perform_later
DiscardJob.perform_later
end
end

::ActiveJob::Base.queue_adapter = :async

TestJob.perform_later
sleep 0.1 # To ensure we see both spans!
tracer = OpenTelemetry.tracer_provider.tracer('example', '0.1.0')

tracer.in_span('run-jobs') do
DoItNowJob.perform_now
BatchJob.perform_later
end

sleep 5 # allow the job to complete
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ module Handlers
def subscribe
return unless Array(@subscriptions).empty?

tracer = Instrumentation.instance.tracer
tracer = OpenTelemetry.tracer_provider.tracer(ActiveJob.name, ActiveJob::VERSION)
mapper = Mappers::Attribute.new
config = ActiveJob::Instrumentation.instance.config
parent_span_provider = OpenTelemetry::Instrumentation::ActiveJob
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ def finish(_name, _id, payload)
def finish_span(span, tokens)
# closes the span after all attributes have been finalized
begin
span&.status = OpenTelemetry::Trace::Status.ok if span&.status&.code == OpenTelemetry::Trace::Status::UNSET
span&.finish
if span&.recording?
span&.status = OpenTelemetry::Trace::Status.ok if span&.status&.code == OpenTelemetry::Trace::Status::UNSET
span&.finish
end
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ module ActiveJob
module Handlers
# Handles `enqueue.active_job` and `enqueue_at.active_job` to generate egress spans
class Enqueue < Default
def initialize(...)
super
@span_name_formatter = if @config[:span_naming] == :job_class
->(job) { "#{job.class.name} publish" }
else
->(job) { "#{job.queue_name} publish" }
end
end

# Overrides the `Default#start_span` method to create an egress span
# and registers it with the current context
#
Expand All @@ -19,16 +28,11 @@ class Enqueue < Default
# @return [Hash] with the span and generated context tokens
def start_span(name, _id, payload)
job = payload.fetch(:job)
span = @tracer.start_span(span_name_from(job), kind: :producer, attributes: @mapper.call(payload))
span = @tracer.start_span(@span_name_formatter.call(job), kind: :producer, attributes: @mapper.call(payload))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(job.__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_tokens: tokens }
end

# TODO: extract strategy
def span_name_from(job)
"#{@config[:span_naming] == :job_class ? job.class.name : job.queue_name} publish"
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ module ActiveJob
module Handlers
# Handles perform.active_job to geenrate ingress spans
class Perform < Default
def initialize(...)
super
@span_name_formatter = if @config[:span_naming] == :job_class
->(job) { "#{job.class.name} process" }
else
->(job) { "#{job.queue_name} process" }
end
end

# Overrides the `Default#start_span` method to create an ingress span
# and registers it with the current context
#
Expand All @@ -22,7 +31,7 @@ def start_span(name, _id, payload)
job = payload.fetch(:job)
parent_context = OpenTelemetry.propagation.extract(job.__otel_headers)

span_name = span_name_from(job)
span_name = @span_name_formatter.call(job)

# TODO: Refactor into a propagation strategy
propagation_style = @config[:propagation_style]
Expand Down Expand Up @@ -52,11 +61,6 @@ def attach_consumer_context(span)

[consumer_context, internal_context].map { |context| OpenTelemetry::Context.attach(context) }
end

# TODO: refactor into a strategy
def span_name_from(job)
"#{@config[:span_naming] == :job_class ? job.class.name : job.queue_name} process"
end
end
end
end
Expand Down

0 comments on commit 4deba54

Please sign in to comment.