diff --git a/instrumentation/active_job/example/Gemfile b/instrumentation/active_job/example/Gemfile deleted file mode 100644 index 1038266125..0000000000 --- a/instrumentation/active_job/example/Gemfile +++ /dev/null @@ -1,8 +0,0 @@ -# frozen_string_literal: true - -source 'https://rubygems.org' - -gem 'activejob' -gem 'opentelemetry-api' -gem 'opentelemetry-instrumentation-active_job' -gem 'opentelemetry-sdk' diff --git a/instrumentation/active_job/example/active_job.rb b/instrumentation/active_job/example/active_job.rb index 808a17b44e..d7f13e7fdb 100644 --- a/instrumentation/active_job/example/active_job.rb +++ b/instrumentation/active_job/example/active_job.rb @@ -4,15 +4,70 @@ # # SPDX-License-Identifier: Apache-2.0 +ENV['OTEL_SERVICE_NAME'] ||= 'otel-active-job-demo' require 'rubygems' -require 'bundler/setup' -require 'active_job' +require 'bundler/inline' -Bundler.require +gemfile do + source 'https://rubygems.org' + gem 'activejob', '~> 7.0.0', require: 'active_job' + gem 'opentelemetry-instrumentation-active_job', path: '../' + gem 'opentelemetry-sdk' + gem 'opentelemetry-exporter-otlp' +end +ENV['OTEL_LOG_LEVEL'] ||= 'fatal' ENV['OTEL_TRACES_EXPORTER'] ||= 'console' OpenTelemetry::SDK.configure do |c| c.use 'OpenTelemetry::Instrumentation::ActiveJob' + at_exit { OpenTelemetry.tracer_provider.shutdown } +end + +class FailingJob < ::ActiveJob::Base + queue_as :demo + def perform + raise 'this job failed' + end +end + +class FailingRetryJob < ::ActiveJob::Base + queue_as :demo + + retry_on StandardError, attempts: 2, wait: 0 + def perform + raise 'this job failed' + end +end + +class RetryJob < ::ActiveJob::Base + queue_as :demo + + retry_on StandardError, attempts: 3, wait: 0 + def perform + if executions < 3 + raise 'this job failed' + else + puts <<~EOS + + -------------------------------------------------- + Done Retrying! + -------------------------------------------------- + + EOS + end + end +end + +class DiscardJob < ::ActiveJob::Base + queue_as :demo + + class DiscardError < StandardError; end + + discard_on DiscardError + + def perform + raise DiscardError, 'this job failed' + end end class TestJob < ::ActiveJob::Base @@ -27,7 +82,35 @@ def perform end end +class DoItNowJob < ::ActiveJob::Base + def perform + $stderr.puts <<~EOS + + -------------------------------------------------- + Called with perform_now! + -------------------------------------------------- + + EOS + end +end + +class BatchJob < ::ActiveJob::Base + def perform + TestJob.perform_later + FailingJob.perform_later + FailingRetryJob.perform_later + RetryJob.perform_later + DiscardJob.perform_later + end +end + ::ActiveJob::Base.queue_adapter = :async -TestJob.perform_later -sleep 0.1 # To ensure we see both spans! +tracer = OpenTelemetry.tracer_provider.tracer('example', '0.1.0') + +tracer.in_span('run-jobs') do + DoItNowJob.perform_now + BatchJob.perform_later +end + +sleep 5 # allow the job to complete diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb index 215fca970b..b174028d7d 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb @@ -38,7 +38,7 @@ module Handlers def subscribe return unless Array(@subscriptions).empty? - tracer = Instrumentation.instance.tracer + tracer = OpenTelemetry.tracer_provider.tracer(ActiveJob.name, ActiveJob::VERSION) mapper = Mappers::Attribute.new config = ActiveJob::Instrumentation.instance.config parent_span_provider = OpenTelemetry::Instrumentation::ActiveJob diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/default.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/default.rb index 76cb54ed17..b07ea1c2bd 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/default.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/default.rb @@ -74,8 +74,10 @@ def finish(_name, _id, payload) def finish_span(span, tokens) # closes the span after all attributes have been finalized begin - span&.status = OpenTelemetry::Trace::Status.ok if span&.status&.code == OpenTelemetry::Trace::Status::UNSET - span&.finish + if span&.recording? + span&.status = OpenTelemetry::Trace::Status.ok if span&.status&.code == OpenTelemetry::Trace::Status::UNSET + span&.finish + end rescue StandardError => e OpenTelemetry.handle_error(exception: e) end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb index 7d8fb8281a..1a8e413bf9 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb @@ -10,6 +10,15 @@ module ActiveJob module Handlers # Handles `enqueue.active_job` and `enqueue_at.active_job` to generate egress spans class Enqueue < Default + def initialize(...) + super + @span_name_formatter = if @config[:span_naming] == :job_class + ->(job) { "#{job.class.name} publish" } + else + ->(job) { "#{job.queue_name} publish" } + end + end + # Overrides the `Default#start_span` method to create an egress span # and registers it with the current context # @@ -19,16 +28,11 @@ class Enqueue < Default # @return [Hash] with the span and generated context tokens def start_span(name, _id, payload) job = payload.fetch(:job) - span = @tracer.start_span(span_name_from(job), kind: :producer, attributes: @mapper.call(payload)) + span = @tracer.start_span(@span_name_formatter.call(job), kind: :producer, attributes: @mapper.call(payload)) tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] OpenTelemetry.propagation.inject(job.__otel_headers) # This must be transmitted over the wire { span: span, ctx_tokens: tokens } end - - # TODO: extract strategy - def span_name_from(job) - "#{@config[:span_naming] == :job_class ? job.class.name : job.queue_name} publish" - end end end end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb index b32a7b9980..c777486616 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb @@ -10,6 +10,15 @@ module ActiveJob module Handlers # Handles perform.active_job to geenrate ingress spans class Perform < Default + def initialize(...) + super + @span_name_formatter = if @config[:span_naming] == :job_class + ->(job) { "#{job.class.name} process" } + else + ->(job) { "#{job.queue_name} process" } + end + end + # Overrides the `Default#start_span` method to create an ingress span # and registers it with the current context # @@ -22,7 +31,7 @@ def start_span(name, _id, payload) job = payload.fetch(:job) parent_context = OpenTelemetry.propagation.extract(job.__otel_headers) - span_name = span_name_from(job) + span_name = @span_name_formatter.call(job) # TODO: Refactor into a propagation strategy propagation_style = @config[:propagation_style] @@ -52,11 +61,6 @@ def attach_consumer_context(span) [consumer_context, internal_context].map { |context| OpenTelemetry::Context.attach(context) } end - - # TODO: refactor into a strategy - def span_name_from(job) - "#{@config[:span_naming] == :job_class ? job.class.name : job.queue_name} process" - end end end end