From fb3194560df5fdff0f7ca5c443fe0888904c87f5 Mon Sep 17 00:00:00 2001 From: Ariel Valentin Date: Tue, 21 Nov 2023 21:01:40 -0600 Subject: [PATCH] feat!(active_job): Use ActiveSupport instead of patches (#677) * feat!(active_job): Use ActiveSupport instead of patches * refactor: Register discard job * refactor: independent registration * refactor: Use template methods for children * refactor: some more small fixes * refactor: a little clean up * refactor: a little more clarity * refactor: try to make sense of a few things * refactor: fix test directory structure * refactor: extract to files * add some docs * refactor: clean up and renames * refactor: split tests out per handler * refactor... maybe a little overengineering * fix: add examples of in-line instrumentation * Update README.md Co-authored-by: Kayla Reopelle (she/her) <87386821+kaylareopelle@users.noreply.github.com> * Update README.md Co-authored-by: Kayla Reopelle (she/her) <87386821+kaylareopelle@users.noreply.github.com> * Update default.rb Co-authored-by: Kayla Reopelle (she/her) <87386821+kaylareopelle@users.noreply.github.com> * Update perform.rb Co-authored-by: Kayla Reopelle (she/her) <87386821+kaylareopelle@users.noreply.github.com> * squash: Use Singleton Tracer * fix: Use top level namespaces When used in conjunction with the `OpenTelemetry::Instrumenation::ActiveSupport`, the classloader would mistakenly use the wrong namespace and raises a `NameError`. This change updates references to ensure we use top level namespaces to load the appropriate classes. * squash: Update instrumentation/active_job/README.md Co-authored-by: Robb Kidd * squash: Update docs Co-authored-by: Kayla Reopelle (she/her) <87386821+kaylareopelle@users.noreply.github.com> * squash: Reduce Local Variable Allocation Co-authored-by: Robb Kidd * squash: Safe navigation no longer required * squash: fix bad commit * squash: Remove possible PII from status message * squash: PR Feedback * squash: Use up to date semconv and remove unused attrs * squash: remove unused file * squash: fix test * squash: fix messaging id * squash: More semconv fixes * squash: fix ids * squash: feedback from Ruby SIG --------- Co-authored-by: Kayla Reopelle (she/her) <87386821+kaylareopelle@users.noreply.github.com> Co-authored-by: Robb Kidd --- instrumentation/active_job/README.md | 58 +++ instrumentation/active_job/example/Gemfile | 8 - .../active_job/example/active_job.rb | 99 +++- .../instrumentation/active_job.rb | 33 ++ .../instrumentation/active_job/handlers.rb | 73 +++ .../active_job/handlers/default.rb | 114 +++++ .../active_job/handlers/enqueue.rb | 40 ++ .../active_job/handlers/perform.rb | 68 +++ .../active_job/instrumentation.rb | 9 +- .../active_job/mappers/attribute.rb | 43 ++ .../patches/active_job_callbacks.rb | 96 ---- .../active_job/patches/base.rb | 23 +- ...lemetry-instrumentation-active_job.gemspec | 1 - .../patches/active_job_callbacks_test.rb | 421 ------------------ .../active_job/handlers/discard_test.rb | 55 +++ .../active_job/handlers/perform_test.rb | 264 +++++++++++ .../active_job/handlers/retry_stopped_test.rb | 57 +++ .../active_job/handlers_test.rb | 53 +++ .../active_job/instrumentation_test.rb | 2 +- .../active_job/mappers/attribute_test.rb | 77 ++++ .../active_job/patches/base_test.rb | 17 +- .../active_job/test/test_helper.rb | 26 +- 22 files changed, 1077 insertions(+), 560 deletions(-) delete mode 100644 instrumentation/active_job/example/Gemfile create mode 100644 instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb create mode 100644 instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/default.rb create mode 100644 instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb create mode 100644 instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb create mode 100644 instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/mappers/attribute.rb delete mode 100644 instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/patches/active_job_callbacks.rb delete mode 100644 instrumentation/active_job/test/instrumentation/active_job/patches/active_job_callbacks_test.rb create mode 100644 instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/discard_test.rb create mode 100644 instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/perform_test.rb create mode 100644 instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/retry_stopped_test.rb create mode 100644 instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers_test.rb rename instrumentation/active_job/test/{ => opentelemetry}/instrumentation/active_job/instrumentation_test.rb (93%) create mode 100644 instrumentation/active_job/test/opentelemetry/instrumentation/active_job/mappers/attribute_test.rb rename instrumentation/active_job/test/{ => opentelemetry}/instrumentation/active_job/patches/base_test.rb (63%) diff --git a/instrumentation/active_job/README.md b/instrumentation/active_job/README.md index 7e0c4ba6d..686247ab9 100644 --- a/instrumentation/active_job/README.md +++ b/instrumentation/active_job/README.md @@ -30,6 +30,64 @@ OpenTelemetry::SDK.configure do |c| end ``` +## Active Support Instrumentation + +Earlier versions of this instrumentation relied on registering custom `around_perform` hooks in order to deal with limitations +in `ActiveSupport::Notifications`, however those patches resulted in error reports and inconsistent behavior when combined with other gems. + +This instrumentation now relies entirely on `ActiveSupport::Notifications` and registers a custom Subscriber that listens to relevant events to report as spans. + +See the table below for details of what [Rails Framework Hook Events](https://guides.rubyonrails.org/active_support_instrumentation.html#active-job) are recorded by this instrumentation: + +| Event Name | Creates Span? | Notes | +| - | - | - | +| `enqueue_at.active_job` | :white_check_mark: | Creates an egress span with kind `producer` | +| `enqueue.active_job` | :white_check_mark: | Creates an egress span with kind `producer` | +| `enqueue_retry.active_job` | :white_check_mark: | Creates an `internal` span | +| `perform_start.active_job` | :x: | This is invoked prior to the appropriate ingress point and is therefore ignored | +| `perform.active_job` | :white_check_mark: | Creates an ingress span with kind `consumer` | +| `retry_stopped.active_job` | :white_check_mark: | Creates and `internal` span with an `exception` event | +| `discard.active_job` | :white_check_mark: | Creates and `internal` span with an `exception` event | + +## Semantic Conventions + +This instrumentation generally uses [Messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/) by treating job enqueuers as `producers` and workers as `consumers`. + +Internal spans are named using the name of the `ActiveSupport` event that was provided. + +Attributes that are specific to this instrumentation are recorded under `messaging.active_job.*`: + +| Attribute Name | Type | Notes | +| - | - | - | +| `code.namespace` | String | `ActiveJob` class name | +| `messaging.system` | String | Static value set to `active_job` | +| `messaging.destination` | String | Set from `ActiveJob#queue_name` | +| `messaging.message.id` | String | Set from `ActiveJob#job_id` | +| `messaging.active_job.adapter.name` | String | The name of the `ActiveJob` adapter implementation | +| `messaging.active_job.message.priority` | String | Present when set by the client from `ActiveJob#priority` | +| `messaging.active_job.message.provider_job_id` | String | Present if the underlying adapter has backend specific message ids | + +## Differences between ActiveJob versions + +### ActiveJob 6.1 + +`perform.active_job` events do not include timings for `ActiveJob` callbacks therefore time spent in `before` and `after` hooks will be missing + +### ActiveJob 7+ + +`perform.active_job` no longer includes exceptions handled using `rescue_from` in the payload. + +In order to preserve this behavior you will have to update the span yourself, e.g. + +```ruby + rescue_from MyCustomError do |e| + # Custom code to handle the error + span = OpenTelemetry::Instrumentation::ActiveJob.current_span + span.record_exception(e) + span.status = OpenTelemetry::Trace::Status.error('Job failed') + end +``` + ## Examples Example usage can be seen in the `./example/active_job.rb` file [here](https://github.com/open-telemetry/opentelemetry-ruby-contrib/blob/main/instrumentation/active_job/example/active_job.rb) diff --git a/instrumentation/active_job/example/Gemfile b/instrumentation/active_job/example/Gemfile deleted file mode 100644 index 103826612..000000000 --- a/instrumentation/active_job/example/Gemfile +++ /dev/null @@ -1,8 +0,0 @@ -# frozen_string_literal: true - -source 'https://rubygems.org' - -gem 'activejob' -gem 'opentelemetry-api' -gem 'opentelemetry-instrumentation-active_job' -gem 'opentelemetry-sdk' diff --git a/instrumentation/active_job/example/active_job.rb b/instrumentation/active_job/example/active_job.rb index 808a17b44..71cd31646 100644 --- a/instrumentation/active_job/example/active_job.rb +++ b/instrumentation/active_job/example/active_job.rb @@ -4,30 +4,115 @@ # # SPDX-License-Identifier: Apache-2.0 +ENV['OTEL_SERVICE_NAME'] ||= 'otel-active-job-demo' require 'rubygems' -require 'bundler/setup' -require 'active_job' +require 'bundler/inline' -Bundler.require +gemfile do + source 'https://rubygems.org' + gem 'activejob', '~> 7.0.0', require: 'active_job' + gem 'opentelemetry-instrumentation-active_job', path: '../' + gem 'opentelemetry-sdk' + gem 'opentelemetry-exporter-otlp' +end +ENV['OTEL_LOG_LEVEL'] ||= 'fatal' ENV['OTEL_TRACES_EXPORTER'] ||= 'console' OpenTelemetry::SDK.configure do |c| c.use 'OpenTelemetry::Instrumentation::ActiveJob' + at_exit { OpenTelemetry.tracer_provider.shutdown } +end + +class FailingJob < ::ActiveJob::Base + queue_as :demo + def perform + raise 'this job failed' + end +end + +class FailingRetryJob < ::ActiveJob::Base + queue_as :demo + + retry_on StandardError, attempts: 2, wait: 0 + def perform + raise 'this job failed' + end end +class RetryJob < ::ActiveJob::Base + queue_as :demo + + retry_on StandardError, attempts: 3, wait: 0 + def perform + if executions < 3 + raise 'this job failed' + else + puts <<~EOS + + -------------------------------------------------- + Done Retrying! + -------------------------------------------------- + + EOS + end + end +end + +class DiscardJob < ::ActiveJob::Base + queue_as :demo + + class DiscardError < StandardError; end + + discard_on DiscardError + + def perform + raise DiscardError, 'this job failed' + end +end + +EXAMPLE_TRACER = OpenTelemetry.tracer_provider.tracer('activejob-example', '1.0') + class TestJob < ::ActiveJob::Base def perform - puts <<~EOS + EXAMPLE_TRACER.in_span("custom span") do + puts <<~EOS + + -------------------------------------------------- + The computer is doing some work, beep beep boop. + -------------------------------------------------- + + EOS + end + end +end + +class DoItNowJob < ::ActiveJob::Base + def perform + $stderr.puts <<~EOS -------------------------------------------------- - The computer is doing some work, beep beep boop. + Called with perform_now! -------------------------------------------------- EOS end end +class BatchJob < ::ActiveJob::Base + def perform + TestJob.perform_later + FailingJob.perform_later + FailingRetryJob.perform_later + RetryJob.perform_later + DiscardJob.perform_later + end +end + ::ActiveJob::Base.queue_adapter = :async -TestJob.perform_later -sleep 0.1 # To ensure we see both spans! +EXAMPLE_TRACER.in_span('run-jobs') do + DoItNowJob.perform_now + BatchJob.perform_later +end + +sleep 5 # allow the job to complete diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job.rb index 259425a9c..b158de4ca 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job.rb @@ -11,6 +11,39 @@ module OpenTelemetry module Instrumentation # Contains the OpenTelemetry instrumentation for the ActiveJob gem module ActiveJob + extend self + + CURRENT_SPAN_KEY = Context.create_key('current-span') + private_constant :CURRENT_SPAN_KEY + + # Returns the current span from the current or provided context + # + # @param [optional Context] context The context to lookup the current + # {Span} from. Defaults to Context.current + def current_span(context = nil) + context ||= Context.current + context.value(CURRENT_SPAN_KEY) || OpenTelemetry::Trace::Span::INVALID + end + + # Returns a context containing the span, derived from the optional parent + # context, or the current context if one was not provided. + # + # @param [optional Context] context The context to use as the parent for + # the returned context + def context_with_span(span, parent_context: Context.current) + parent_context.set_value(CURRENT_SPAN_KEY, span) + end + + # Activates/deactivates the Span within the current Context, which makes the "current span" + # available implicitly. + # + # On exit, the Span that was active before calling this method will be reactivated. + # + # @param [Span] span the span to activate + # @yield [span, context] yields span and a context containing the span to the block. + def with_span(span) + Context.with_value(CURRENT_SPAN_KEY, span) { |c, s| yield s, c } + end end end end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb new file mode 100644 index 000000000..7eafb9425 --- /dev/null +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require_relative 'mappers/attribute' +require_relative 'handlers/default' +require_relative 'handlers/enqueue' +require_relative 'handlers/perform' + +module OpenTelemetry + module Instrumentation + module ActiveJob + # Module that contains custom event handlers, which are used to generate spans per event + module Handlers + module_function + + # Subscribes Event Handlers to relevant ActiveJob notifications + # + # The following events are recorded as spans: + # - enqueue + # - enqueue_at + # - enqueue_retry + # - perform + # - retry_stopped + # - discard + # + # Ingress and Egress spans (perform, enqueue, enqueue_at) use Messaging semantic conventions for naming the span, + # while internal spans keep their ActiveSupport event name. + # + # @note this method is not thread safe and should not be used in a multi-threaded context + # @note Why no perform_start? + # This event causes much heartache as it is the first in a series of events that is triggered. + # It should not be the ingress span because it does not measure anything. + # https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14 + # https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19 + def subscribe + return unless Array(@subscriptions).empty? + + mapper = Mappers::Attribute.new + config = ActiveJob::Instrumentation.instance.config + parent_span_provider = OpenTelemetry::Instrumentation::ActiveJob + + # TODO, use delegation instead of inheritance + default_handler = Handlers::Default.new(parent_span_provider, mapper, config) + enqueue_handler = Handlers::Enqueue.new(parent_span_provider, mapper, config) + perform_handler = Handlers::Perform.new(parent_span_provider, mapper, config) + + handlers_by_pattern = { + 'enqueue' => enqueue_handler, + 'enqueue_at' => enqueue_handler, + 'enqueue_retry' => default_handler, + 'perform' => perform_handler, + 'retry_stopped' => default_handler, + 'discard' => default_handler + } + + @subscriptions = handlers_by_pattern.map do |key, handler| + ::ActiveSupport::Notifications.subscribe("#{key}.active_job", handler) + end + end + + # Removes Event Handler Subscriptions for ActiveJob notifications + # @note this method is not thread-safe and should not be used in a multi-threaded context + def unsubscribe + @subscriptions&.each { |subscriber| ActiveSupport::Notifications.unsubscribe(subscriber) } + @subscriptions = nil + end + end + end + end +end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/default.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/default.rb new file mode 100644 index 000000000..3c6b6aa7d --- /dev/null +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/default.rb @@ -0,0 +1,114 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module Instrumentation + module ActiveJob + module Handlers + # Default handler to create internal spans for events + # This class provides default template methods that derived classes may override to generate spans and register contexts. + class Default + # @param parent_span_provider [Object] provides access to the top most parent span (usually the ingress span) + # @param mapper [Callable] converts ActiveSupport::Notifications payloads to span attributes + # @param config [Hash] of instrumentation options + def initialize(parent_span_provider, mapper, config) + @mapper = mapper + @config = config + @parent_span_provider = parent_span_provider + end + + # Invoked by ActiveSupport::Notifications at the start of the instrumentation block + # It amends the otel context of a Span and Context tokens to the payload + # + # @param name [String] of the Event + # @param id [String] of the event + # @param payload [Hash] containing job run information + # @return [Hash] the payload passed as a method argument + def start(name, id, payload) + payload.merge!(__otel: start_span(name, id, payload)) + rescue StandardError => e + OpenTelemetry.handle_error(exception: e) + end + + # Creates a span and registers it with the current context + # + # @param name [String] of the Event + # @param id [String] of the event + # @param payload [Hash] containing job run information + # @return [Hash] with the span and generated context tokens + def start_span(name, _id, payload) + span = tracer.start_span(name, attributes: @mapper.call(payload)) + tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] + + { span: span, ctx_tokens: tokens } + end + + # Creates a span and registers it with the current context + # + # @param _name [String] of the Event (unused) + # @param _id [String] of the event (unused) + # @param payload [Hash] containing job run information + # @return [Hash] with the span and generated context tokens + def finish(_name, _id, payload) + otel = payload.delete(:__otel) + span = otel&.fetch(:span) + tokens = otel&.fetch(:ctx_tokens) + + on_exception((payload[:error] || payload[:exception_object]), span) + rescue StandardError => e + OpenTelemetry.handle_error(exception: e) + ensure + finish_span(span, tokens) + end + + # Finishes the provided spans and also detaches the associated contexts + # + # @param span [OpenTelemetry::Trace::Span] + # @param tokens [Array] to unregister + def finish_span(span, tokens) + # closes the span after all attributes have been finalized + begin + if span&.recording? + span.status = OpenTelemetry::Trace::Status.ok if span.status.code == OpenTelemetry::Trace::Status::UNSET + span.finish + end + rescue StandardError => e + OpenTelemetry.handle_error(exception: e) + end + + # pops the context stack + tokens&.reverse&.each do |token| + OpenTelemetry::Context.detach(token) + rescue StandardError => e + OpenTelemetry.handle_error(exception: e) + end + end + + # Records exceptions on spans and sets Span statuses to `Error` + # + # Handled exceptions are recorded on internal spans related to the event. E.g. `discard` events are recorded on the `discard.active_job` span + # Handled exceptions _are not_ copied to the ingress span, but it does set the status to `Error` making it easier to know that a job has failed + # Unhandled exceptions bubble up to the ingress span and are recorded there. + # + # @param [Exception] exception to report as a Span Event + # @param [OpenTelemetry::Trace::Span] the currently active span used to record the exception and set the status + def on_exception(exception, span) + return unless exception && span + + span.record_exception(exception) + span.status = + @parent_span_provider.current_span.status = + OpenTelemetry::Trace::Status.error("Unexpected ActiveJob Error #{exception.class.name}") + end + + def tracer + OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance.tracer + end + end + end + end + end +end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb new file mode 100644 index 000000000..7dace8a95 --- /dev/null +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module Instrumentation + module ActiveJob + module Handlers + # Handles `enqueue.active_job` and `enqueue_at.active_job` to generate egress spans + class Enqueue < Default + def initialize(...) + super + @span_name_formatter = if @config[:span_naming] == :job_class + ->(job) { "#{job.class.name} publish" } + else + ->(job) { "#{job.queue_name} publish" } + end + end + + # Overrides the `Default#start_span` method to create an egress span + # and registers it with the current context + # + # @param name [String] of the Event + # @param id [String] of the event + # @param payload [Hash] containing job run information + # @return [Hash] with the span and generated context tokens + def start_span(name, _id, payload) + job = payload.fetch(:job) + span = tracer.start_span(@span_name_formatter.call(job), kind: :producer, attributes: @mapper.call(payload)) + tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] + OpenTelemetry.propagation.inject(job.__otel_headers) # This must be transmitted over the wire + { span: span, ctx_tokens: tokens } + end + end + end + end + end +end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb new file mode 100644 index 000000000..6dd7e2e28 --- /dev/null +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb @@ -0,0 +1,68 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module Instrumentation + module ActiveJob + module Handlers + # Handles perform.active_job to generate ingress spans + class Perform < Default + def initialize(...) + super + @span_name_formatter = if @config[:span_naming] == :job_class + ->(job) { "#{job.class.name} process" } + else + ->(job) { "#{job.queue_name} process" } + end + end + + # Overrides the `Default#start_span` method to create an ingress span + # and registers it with the current context + # + # @param name [String] of the Event + # @param id [String] of the event + # @param payload [Hash] containing job run information + # @return [Hash] with the span and generated context tokens + def start_span(name, _id, payload) + tokens = [] + job = payload.fetch(:job) + parent_context = OpenTelemetry.propagation.extract(job.__otel_headers) + + span_name = @span_name_formatter.call(job) + + # TODO: Refactor into a propagation strategy + propagation_style = @config[:propagation_style] + if propagation_style == :child + tokens << OpenTelemetry::Context.attach(parent_context) + span = tracer.start_span(span_name, kind: :consumer, attributes: @mapper.call(payload)) + else + span_context = OpenTelemetry::Trace.current_span(parent_context).context + links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid? && propagation_style == :link + span = tracer.start_root_span(span_name, kind: :consumer, attributes: @mapper.call(payload), links: links) + end + + tokens.concat(attach_consumer_context(span)) + + { span: span, ctx_tokens: tokens } + end + + # This method attaches a span to multiple contexts: + # 1. Registers the ingress span as the top level ActiveJob span. + # This is used later to enrich the ingress span in children, e.g. setting span status to error when a child event like `discard` terminates due to an error + # 2. Registers the ingress span as the "active" span, which is the default behavior of the SDK. + # @param span [OpenTelemetry::Trace::Span] the currently active span used to record the exception and set the status + # @return [Array] Context tokens that must be detached when finished + def attach_consumer_context(span) + consumer_context = OpenTelemetry::Trace.context_with_span(span) + internal_context = OpenTelemetry::Instrumentation::ActiveJob.context_with_span(span, parent_context: consumer_context) + + [consumer_context, internal_context].map { |context| OpenTelemetry::Context.attach(context) } + end + end + end + end + end +end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/instrumentation.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/instrumentation.rb index 20f3cac57..e475247f8 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/instrumentation.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/instrumentation.rb @@ -17,7 +17,7 @@ class Instrumentation < OpenTelemetry::Instrumentation::Base end present do - defined?(::ActiveJob) + defined?(::ActiveJob) && defined?(::ActiveSupport) end compatible do @@ -64,12 +64,13 @@ def gem_version def require_dependencies require_relative 'patches/base' - require_relative 'patches/active_job_callbacks' + require_relative 'handlers' end def patch_activejob - ::ActiveJob::Base.prepend(Patches::Base) - ::ActiveJob::Base.prepend(Patches::ActiveJobCallbacks) + ::ActiveJob::Base.prepend(Patches::Base) unless ::ActiveJob::Base.ancestors.include?(Patches::Base) + + Handlers.subscribe end end end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/mappers/attribute.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/mappers/attribute.rb new file mode 100644 index 000000000..d62405b46 --- /dev/null +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/mappers/attribute.rb @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module Instrumentation + module ActiveJob + module Mappers + # Maps ActiveJob Attributes to Semantic Conventions + class Attribute + # Generates a set of attributes to add to a span using + # general and messaging semantic conventions as well as + # using `rails.active_job.*` namespace for custom attributes + # + # @param payload [Hash] of an ActiveSupport::Notifications payload + # @return [Hash] of semantic attributes + def call(payload) + job = payload.fetch(:job) + + otel_attributes = { + 'code.namespace' => job.class.name, + 'messaging.system' => 'active_job', + 'messaging.destination' => job.queue_name, + 'messaging.message.id' => job.job_id, + 'messaging.active_job.adapter.name' => job.class.queue_adapter_name + } + + # Not all adapters generate or provide back end specific ids for messages + otel_attributes['messaging.active_job.message.provider_job_id'] = job.provider_job_id.to_s if job.provider_job_id + # This can be problematic if programs use invalid attribute types like Symbols for priority instead of using Integers. + otel_attributes['messaging.active_job.message.priority'] = job.priority.to_s if job.priority + + otel_attributes.compact! + + otel_attributes + end + end + end + end + end +end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/patches/active_job_callbacks.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/patches/active_job_callbacks.rb deleted file mode 100644 index 973025352..000000000 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/patches/active_job_callbacks.rb +++ /dev/null @@ -1,96 +0,0 @@ -# frozen_string_literal: true - -# Copyright The OpenTelemetry Authors -# -# SPDX-License-Identifier: Apache-2.0 - -module OpenTelemetry - module Instrumentation - module ActiveJob - module Patches - # Module to prepend to ActiveJob::Base for instrumentation. - module ActiveJobCallbacks - def self.prepended(base) - base.class_eval do - around_enqueue do |job, block| - span_kind = job.class.queue_adapter_name == 'inline' ? :client : :producer - span_name = "#{otel_config[:span_naming] == :job_class ? job.class : job.queue_name} publish" - span_attributes = job_attributes(job) - otel_tracer.in_span(span_name, attributes: span_attributes, kind: span_kind) do - OpenTelemetry.propagation.inject(job.metadata) - block.call - end - end - end - end - - def perform_now - span_kind = self.class.queue_adapter_name == 'inline' ? :server : :consumer - span_name = "#{otel_config[:span_naming] == :job_class ? self.class : queue_name} process" - span_attributes = job_attributes(self).merge('messaging.operation' => 'process', 'code.function' => 'perform_now') - executions_count = (executions || 0) + 1 # because we run before the count is incremented in ActiveJob::Execution - - extracted_context = OpenTelemetry.propagation.extract(metadata) - OpenTelemetry::Context.with_current(extracted_context) do - if otel_config[:propagation_style] == :child - otel_tracer.in_span(span_name, attributes: span_attributes, kind: span_kind) do |span| - span.set_attribute('messaging.active_job.executions', executions_count) - super - end - else - span_links = [] - if otel_config[:propagation_style] == :link - span_context = OpenTelemetry::Trace.current_span(extracted_context).context - span_links << OpenTelemetry::Trace::Link.new(span_context) if span_context.valid? - end - - root_span = otel_tracer.start_root_span(span_name, attributes: span_attributes, links: span_links, kind: span_kind) - OpenTelemetry::Trace.with_span(root_span) do |span| - span.set_attribute('messaging.active_job.executions', executions_count) - super - rescue Exception => e # rubocop:disable Lint/RescueException - span.record_exception(e) - span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}") - raise e - ensure - root_span.finish - end - end - end - ensure - # We may be in a job system (eg: resque) that forks and kills worker processes often. - # We don't want to lose spans by not flushing any span processors, so we optionally force it here. - OpenTelemetry.tracer_provider.force_flush if otel_config[:force_flush] - end - - private - - def job_attributes(job) - otel_attributes = { - 'code.namespace' => job.class.name, - 'messaging.destination_kind' => 'queue', - 'messaging.system' => job.class.queue_adapter_name, - 'messaging.destination' => job.queue_name, - 'messaging.message_id' => job.job_id, - 'messaging.active_job.provider_job_id' => job.provider_job_id, - 'messaging.active_job.scheduled_at' => job.scheduled_at&.to_f, - 'messaging.active_job.priority' => job.priority - } - - otel_attributes['net.transport'] = 'inproc' if %w[async inline].include?(job.class.queue_adapter_name) - - otel_attributes.compact - end - - def otel_tracer - ActiveJob::Instrumentation.instance.tracer - end - - def otel_config - ActiveJob::Instrumentation.instance.config - end - end - end - end - end -end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/patches/base.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/patches/base.rb index 42a3626d6..fc61f34aa 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/patches/base.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/patches/base.rb @@ -12,22 +12,33 @@ module Patches module Base def self.prepended(base) base.class_eval do - attr_accessor :metadata + attr_accessor :__otel_headers end end - def initialize(*args) - @metadata = {} + def initialize(...) + @__otel_headers = {} super end - ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true) def serialize - super.merge('metadata' => serialize_arguments(metadata)) + message = super + + begin + message.merge!('__otel_headers' => serialize_arguments(@__otel_headers)) + rescue StandardError => e + OpenTelemetry.handle_error(exception: e) + end + + message end def deserialize(job_data) - self.metadata = deserialize_arguments(job_data['metadata'] || []).to_h + begin + @__otel_headers = deserialize_arguments(job_data.delete('__otel_headers') || []).to_h + rescue StandardError => e + OpenTelemetry.handle_error(exception: e) + end super end end diff --git a/instrumentation/active_job/opentelemetry-instrumentation-active_job.gemspec b/instrumentation/active_job/opentelemetry-instrumentation-active_job.gemspec index 0699b8621..8488ed675 100644 --- a/instrumentation/active_job/opentelemetry-instrumentation-active_job.gemspec +++ b/instrumentation/active_job/opentelemetry-instrumentation-active_job.gemspec @@ -34,7 +34,6 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'minitest', '~> 5.0' spec.add_development_dependency 'opentelemetry-sdk', '~> 1.1' spec.add_development_dependency 'opentelemetry-test-helpers', '~> 0.3' - spec.add_development_dependency 'pry' spec.add_development_dependency 'rake', '~> 13.0' spec.add_development_dependency 'rubocop', '~> 1.56.1' spec.add_development_dependency 'simplecov', '~> 0.17.1' diff --git a/instrumentation/active_job/test/instrumentation/active_job/patches/active_job_callbacks_test.rb b/instrumentation/active_job/test/instrumentation/active_job/patches/active_job_callbacks_test.rb deleted file mode 100644 index c5c59d95e..000000000 --- a/instrumentation/active_job/test/instrumentation/active_job/patches/active_job_callbacks_test.rb +++ /dev/null @@ -1,421 +0,0 @@ -# frozen_string_literal: true - -# Copyright The OpenTelemetry Authors -# -# SPDX-License-Identifier: Apache-2.0 - -require 'test_helper' - -require_relative '../../../../lib/opentelemetry/instrumentation/active_job' - -describe OpenTelemetry::Instrumentation::ActiveJob::Patches::ActiveJobCallbacks do - let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance } - # Technically these are the defaults. But ActiveJob seems to act oddly if you re-install - # the instrumentation over and over again - so we manipulate instance variables to - # reset between tests, and that means we should set the defaults here. - let(:config) { { propagation_style: :link, force_flush: false, span_naming: :queue } } - let(:exporter) { EXPORTER } - let(:spans) { exporter.finished_spans } - let(:publish_span) { spans.find { |s| s.name == 'default publish' } } - let(:process_span) { spans.find { |s| s.name == 'default process' } } - - before do - instrumentation.instance_variable_set(:@config, config) - exporter.reset - - ActiveJob::Base.queue_adapter = :async - ActiveJob::Base.queue_adapter.immediate = true - end - - after do - begin - ActiveJob::Base.queue_adapter.shutdown - rescue StandardError - nil - end - ActiveJob::Base.queue_adapter = :inline - instrumentation.instance_variable_set(:@config, config) - end - - describe 'perform_later' do - it 'traces enqueuing and processing the job' do - TestJob.perform_later - - _(publish_span).wont_be_nil - _(process_span).wont_be_nil - end - end - - describe 'perform_now' do - it 'only traces processing the job' do - TestJob.perform_now - - _(publish_span).must_be_nil - _(process_span).wont_be_nil - _(process_span.attributes['code.namespace']).must_equal('TestJob') - _(process_span.attributes['code.function']).must_equal('perform_now') - end - end - - describe 'compatibility' do - it 'works with positional args' do - _(PositionalOnlyArgsJob.perform_now('arg1')).must_be_nil # Make sure this runs without raising an error - end - - it 'works with keyword args' do - _(KeywordOnlyArgsJob.perform_now(keyword2: :keyword2)).must_be_nil # Make sure this runs without raising an error - end - - it 'works with mixed args' do - _(MixedArgsJob.perform_now('arg1', 'arg2', keyword2: :keyword2)).must_be_nil # Make sure this runs without raising an error - end - end - - describe 'exception handling' do - it 'sets span status to error' do - _ { ExceptionJob.perform_now }.must_raise StandardError, 'This job raises an exception' - _(process_span.status.code).must_equal OpenTelemetry::Trace::Status::ERROR - _(process_span.status.description).must_equal 'Unhandled exception of type: StandardError' - end - - it 'records the exception' do - _ { ExceptionJob.perform_now }.must_raise StandardError, 'This job raises an exception' - _(process_span.events.first.name).must_equal 'exception' - _(process_span.events.first.attributes['exception.type']).must_equal 'StandardError' - _(process_span.events.first.attributes['exception.message']).must_equal 'This job raises an exception' - end - end - - describe 'span kind' do - it 'sets correct span kinds for inline jobs' do - begin - ActiveJob::Base.queue_adapter.shutdown - rescue StandardError - nil - end - ActiveJob::Base.queue_adapter = :inline - - TestJob.perform_later - - _(publish_span.kind).must_equal(:client) - _(process_span.kind).must_equal(:server) - end - - it 'sets correct span kinds for all other jobs' do - TestJob.perform_later - - _(publish_span.kind).must_equal(:producer) - _(process_span.kind).must_equal(:consumer) - end - end - - describe 'attributes' do - it 'sets the messaging.operation attribute only when processing the job' do - TestJob.perform_later - - _(publish_span.attributes['messaging.operation']).must_be_nil - _(process_span.attributes['messaging.operation']).must_equal('process') - end - - describe 'net.transport' do - it 'is sets correctly for inline jobs' do - TestJob.perform_later - - [publish_span, process_span].each do |span| - _(span.attributes['net.transport']).must_equal('inproc') - end - end - - it 'is set correctly for async jobs' do - TestJob.perform_later - - [publish_span, process_span].each do |span| - _(span.attributes['net.transport']).must_equal('inproc') - end - end - end - - describe 'messaging.active_job.priority' do - it 'is unset for unprioritized jobs' do - TestJob.perform_later - - [publish_span, process_span].each do |span| - _(span.attributes['messaging.active_job.priority']).must_be_nil - end - end - - it 'is set for jobs with a priority' do - TestJob.set(priority: 1).perform_later - - [publish_span, process_span].each do |span| - _(span.attributes['messaging.active_job.priority']).must_equal(1) - end - end - end - - describe 'messaging.active_job.scheduled_at' do - it 'is unset for jobs that do not specify a wait time' do - TestJob.perform_later - - [publish_span, process_span].each do |span| - _(span.attributes['messaging.active_job.scheduled_at']).must_be_nil - end - end - - it 'records the scheduled at time for apps running Rails 7.1 and newer' do - skip 'scheduled jobs behave differently in Rails 7.1+' if ActiveJob.version < Gem::Version.new('7.1') - - job = TestJob.set(wait: 0.second).perform_later - - _(publish_span.attributes['messaging.active_job.scheduled_at']).must_equal(job.scheduled_at.to_f) - _(process_span.attributes['messaging.active_job.scheduled_at']).must_equal(job.scheduled_at.to_f) - end - - it 'records the scheduled at time for apps running Rails 7.0 or older' do - skip 'scheduled jobs behave differently in Rails 7.1+' if ActiveJob.version >= Gem::Version.new('7.1') - - job = TestJob.set(wait: 0.second).perform_later - - _(publish_span.attributes['messaging.active_job.scheduled_at']).must_equal(job.scheduled_at.to_f) - _(process_span.attributes['messaging.active_job.scheduled_at']).must_be_nil - end - end - - describe 'messaging.system' do - it 'is set correctly for the inline adapter' do - begin - ActiveJob::Base.queue_adapter.shutdown - rescue StandardError - nil - end - - ActiveJob::Base.queue_adapter = :inline - TestJob.perform_later - - [publish_span, process_span].each do |span| - _(span.attributes['messaging.system']).must_equal('inline') - end - end - - it 'is set correctly for the async adapter' do - TestJob.perform_later - - [publish_span, process_span].each do |span| - _(span.attributes['messaging.system']).must_equal('async') - end - end - end - - describe 'messaging.active_job.executions' do - it 'is 1 for a normal job that does not retry' do - TestJob.perform_now - _(process_span.attributes['messaging.active_job.executions']).must_equal(1) - end - - it 'tracks correctly for jobs that do retry' do - begin - RetryJob.perform_later - rescue StandardError - nil - end - - executions = spans.filter { |s| s.kind == :consumer }.sum { |s| s.attributes['messaging.active_job.executions'] } - _(executions).must_equal(3) # total of 3 runs. The initial and 2 retries. - end - end - - describe 'messaging.active_job.provider_job_id' do - it 'is empty for a job that do not sets provider_job_id' do - TestJob.perform_now - _(process_span.attributes['messaging.active_job.provider_job_id']).must_be_nil - end - - it 'sets the correct value if provider_job_id is provided' do - job = TestJob.perform_later - _(process_span.attributes['messaging.active_job.provider_job_id']).must_equal(job.provider_job_id) - end - end - - it 'generally sets other attributes as expected' do - job = TestJob.perform_later - - [publish_span, process_span].each do |span| - _(span.attributes['code.namespace']).must_equal('TestJob') - _(span.attributes['messaging.destination_kind']).must_equal('queue') - _(span.attributes['messaging.system']).must_equal('async') - _(span.attributes['messaging.message_id']).must_equal(job.job_id) - end - end - end - - describe 'span_naming option' do - describe 'when queue - default' do - it 'names spans according to the job queue' do - TestJob.set(queue: :foo).perform_later - publish_span = exporter.finished_spans.find { |s| s.name == 'foo publish' } - _(publish_span).wont_be_nil - - process_span = exporter.finished_spans.find { |s| s.name == 'foo process' } - _(process_span).wont_be_nil - end - end - - describe 'when job_class' do - let(:config) { { propagation_style: :link, span_naming: :job_class } } - - it 'names span according to the job class' do - TestJob.set(queue: :foo).perform_later - publish_span = exporter.finished_spans.find { |s| s.name == 'TestJob publish' } - _(publish_span).wont_be_nil - - process_span = exporter.finished_spans.find { |s| s.name == 'TestJob process' } - _(process_span).wont_be_nil - end - end - end - - describe 'force_flush option' do - let(:mock_tracer_provider) do - mock_tracer_provider = Minitest::Mock.new - mock_tracer_provider.expect(:force_flush, true) - - mock_tracer_provider - end - - describe 'false - default' do - it 'does not forcibly flush the tracer' do - OpenTelemetry.stub(:tracer_provider, mock_tracer_provider) do - TestJob.perform_later - end - - # We *do not* actually force flush in this case, so we expect the mock - # to fail validation - we will not actually call the mocked force_flush method. - expect { mock_tracer_provider.verify }.must_raise MockExpectationError - end - end - - describe 'true' do - let(:config) { { propagation_style: :link, force_flush: true, span_naming: :job_class } } - it 'does forcibly flush the tracer' do - OpenTelemetry.stub(:tracer_provider, mock_tracer_provider) do - TestJob.perform_later - end - - # Nothing should raise, the mock should be successful, we should have flushed. - mock_tracer_provider.verify - end - end - end - - describe 'propagation_style option' do - describe 'link - default' do - # The inline job adapter executes the job immediately upon enqueuing it - # so we can't actually use that in these tests - the actual Context.current at time - # of execution *will* be the context where the job was enqueued, because rails - # ends up doing job.around_enqueue { job.around_perform { block } } inline. - it 'creates span links in separate traces' do - TestJob.perform_later - - _(publish_span.trace_id).wont_equal(process_span.trace_id) - - _(process_span.total_recorded_links).must_equal(1) - _(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id) - _(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id) - end - - it 'propagates baggage' do - ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked') - OpenTelemetry::Context.with_current(ctx) do - BaggageJob.perform_later - end - - _(publish_span.trace_id).wont_equal(process_span.trace_id) - - _(process_span.total_recorded_links).must_equal(1) - _(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id) - _(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id) - _(process_span.attributes['success']).must_equal(true) - end - end - - describe 'when configured to do parent/child spans' do - let(:config) { { propagation_style: :child, span_naming: :queue } } - - it 'creates a parent/child relationship' do - TestJob.perform_later - - _(process_span.total_recorded_links).must_equal(0) - - _(publish_span.trace_id).must_equal(process_span.trace_id) - _(process_span.parent_span_id).must_equal(publish_span.span_id) - end - - it 'propagates baggage' do - ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked') - OpenTelemetry::Context.with_current(ctx) do - BaggageJob.perform_later - end - _(process_span.total_recorded_links).must_equal(0) - - _(publish_span.trace_id).must_equal(process_span.trace_id) - _(process_span.parent_span_id).must_equal(publish_span.span_id) - _(process_span.attributes['success']).must_equal(true) - end - end - - describe 'when explicitly configure for no propagation' do - let(:config) { { propagation_style: :none, span_naming: :queue } } - - it 'skips link creation and does not create parent/child relationship' do - TestJob.perform_later - - _(process_span.total_recorded_links).must_equal(0) - - _(publish_span.trace_id).wont_equal(process_span.trace_id) - _(process_span.parent_span_id).wont_equal(publish_span.span_id) - end - - it 'still propagates baggage' do - ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked') - OpenTelemetry::Context.with_current(ctx) do - BaggageJob.perform_later - end - - _(process_span.total_recorded_links).must_equal(0) - - _(publish_span.trace_id).wont_equal(process_span.trace_id) - _(process_span.parent_span_id).wont_equal(publish_span.span_id) - _(process_span.attributes['success']).must_equal(true) - end - end - end - - describe 'active_job callbacks' do - it 'makes the tracing context available in before_perform callbacks' do - CallbacksJob.perform_now - - _(CallbacksJob.context_before).wont_be_nil - _(CallbacksJob.context_before).must_be :valid? - end - - it 'makes the tracing context available in after_perform callbacks' do - CallbacksJob.perform_now - - _(CallbacksJob.context_after).wont_be_nil - _(CallbacksJob.context_after).must_be :valid? - end - end - - describe 'perform.active_job notifications' do - it 'makes the tracing context available in notifications' do - context = nil - callback = proc { context = OpenTelemetry::Trace.current_span.context } - ActiveSupport::Notifications.subscribed(callback, 'perform.active_job') do - TestJob.perform_now - end - - _(context).wont_be_nil - _(context).must_be :valid? - end - end -end diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/discard_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/discard_test.rb new file mode 100644 index 000000000..89a32b6c6 --- /dev/null +++ b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/discard_test.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +require_relative '../../../../../lib/opentelemetry/instrumentation/active_job' + +describe 'OpenTelemetry::Instrumentation::ActiveJob::Handlers::Discard' do + let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance } + let(:config) { { propagation_style: :link, span_naming: :queue } } + let(:exporter) { EXPORTER } + let(:spans) { exporter.finished_spans } + let(:publish_span) { spans.find { |s| s.name == 'default publish' } } + let(:process_span) { spans.find { |s| s.name == 'default process' } } + let(:discard_span) { spans.find { |s| s.name == 'discard.active_job' } } + + before do + OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe + instrumentation.instance_variable_set(:@config, config) + instrumentation.instance_variable_set(:@installed, false) + + instrumentation.install(config) + ActiveJob::Base.queue_adapter = :async + ActiveJob::Base.queue_adapter.immediate = true + + exporter.reset + end + + after do + begin + ActiveJob::Base.queue_adapter.shutdown + rescue StandardError + nil + end + ActiveJob::Base.queue_adapter = :inline + end + + describe 'exception handling' do + it 'sets discard span status to error' do + DiscardJob.perform_later + + _(process_span.status.code).must_equal OpenTelemetry::Trace::Status::ERROR + _(process_span.status.description).must_equal 'Unexpected ActiveJob Error DiscardJob::DiscardError' + + _(discard_span.status.code).must_equal OpenTelemetry::Trace::Status::ERROR + _(discard_span.status.description).must_equal 'Unexpected ActiveJob Error DiscardJob::DiscardError' + _(discard_span.events.first.name).must_equal 'exception' + _(discard_span.events.first.attributes['exception.type']).must_equal 'DiscardJob::DiscardError' + _(discard_span.events.first.attributes['exception.message']).must_equal 'discard me' + end + end +end diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/perform_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/perform_test.rb new file mode 100644 index 000000000..cf4c9b976 --- /dev/null +++ b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/perform_test.rb @@ -0,0 +1,264 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +require_relative '../../../../../lib/opentelemetry/instrumentation/active_job' + +describe OpenTelemetry::Instrumentation::ActiveJob::Handlers::Perform do + let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance } + let(:config) { { propagation_style: :link, span_naming: :queue } } + let(:exporter) { EXPORTER } + let(:spans) { exporter.finished_spans } + let(:publish_span) { spans.find { |s| s.name == 'default publish' } } + let(:process_span) { spans.find { |s| s.name == 'default process' } } + + before do + OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe + instrumentation.instance_variable_set(:@config, config) + instrumentation.instance_variable_set(:@installed, false) + + instrumentation.install(config) + ActiveJob::Base.queue_adapter = :async + ActiveJob::Base.queue_adapter.immediate = true + + exporter.reset + end + + after do + begin + ActiveJob::Base.queue_adapter.shutdown + rescue StandardError + nil + end + ActiveJob::Base.queue_adapter = :inline + end + + describe 'perform_later' do + it 'traces enqueuing and processing the job' do + TestJob.perform_later + + _(publish_span).wont_be_nil + _(process_span).wont_be_nil + end + end + + describe 'perform_now' do + it 'only traces processing the job' do + TestJob.perform_now + + _(publish_span).must_be_nil + _(process_span).wont_be_nil + _(process_span.attributes['code.namespace']).must_equal('TestJob') + end + end + + describe 'exception handling' do + it 'sets span status to error' do + _ { ExceptionJob.perform_later }.must_raise StandardError, 'This job raises an exception' + + _(process_span.status.code).must_equal OpenTelemetry::Trace::Status::ERROR + _(process_span.status.description).must_equal 'Unexpected ActiveJob Error StandardError' + + _(process_span.events.first.name).must_equal 'exception' + _(process_span.events.first.attributes['exception.type']).must_equal 'StandardError' + _(process_span.events.first.attributes['exception.message']).must_equal 'This job raises an exception' + end + + it 'captures errors that were handled by rescue_from in versions earlier than Rails 7' do + skip 'rescue_from jobs behave differently in Rails 7 and newer' if ActiveJob.version >= Gem::Version.new('7') + RescueFromJob.perform_later + + _(process_span.status.code).must_equal OpenTelemetry::Trace::Status::ERROR + _(process_span.status.description).must_equal 'Unexpected ActiveJob Error RescueFromJob::RescueFromError' + + _(process_span.events.first.name).must_equal 'exception' + _(process_span.events.first.attributes['exception.type']).must_equal 'RescueFromJob::RescueFromError' + _(process_span.events.first.attributes['exception.message']).must_equal 'I was handled by rescue_from' + end + + it 'ignores errors that were handled by rescue_from in versions of Rails 7 or newer' do + skip 'rescue_from jobs behave differently in Rails 7 and newer' if ActiveJob.version < Gem::Version.new('7') + RescueFromJob.perform_later + + _(process_span.status.code).must_equal OpenTelemetry::Trace::Status::OK + + _(process_span.events).must_be_nil + end + end + + describe 'span kind' do + it 'sets correct span kinds for inline jobs' do + begin + ActiveJob::Base.queue_adapter.shutdown + rescue StandardError + nil + end + ActiveJob::Base.queue_adapter = :inline + + TestJob.perform_later + + _(publish_span.kind).must_equal(:producer) + _(process_span.kind).must_equal(:consumer) + end + + it 'sets correct span kinds for all other jobs' do + TestJob.perform_later + + _(publish_span.kind).must_equal(:producer) + _(process_span.kind).must_equal(:consumer) + end + end + + describe 'attributes' do + describe 'active_job.priority' do + it 'is unset for unprioritized jobs' do + TestJob.perform_later + + [publish_span, process_span].each do |span| + _(span.attributes['messaging.active_job.message.priority']).must_be_nil + end + end + + it 'is set for jobs with a priority' do + TestJob.set(priority: 1).perform_later + + [publish_span, process_span].each do |span| + _(span.attributes['messaging.active_job.message.priority']).must_equal('1') + end + end + end + end + + describe 'span_naming option' do + describe 'when queue - default' do + it 'names spans according to the job queue' do + TestJob.set(queue: :foo).perform_later + publish_span = exporter.finished_spans.find { |s| s.name == 'foo publish' } + _(publish_span).wont_be_nil + + process_span = exporter.finished_spans.find { |s| s.name == 'foo process' } + _(process_span).wont_be_nil + end + end + + describe 'when job_class' do + let(:config) { { propagation_style: :link, span_naming: :job_class } } + + it 'names span according to the job class' do + TestJob.set(queue: :foo).perform_later + + publish_span = exporter.finished_spans.find { |s| s.name == 'TestJob publish' } + _(publish_span).wont_be_nil + + process_span = exporter.finished_spans.find { |s| s.name == 'TestJob process' } + _(process_span).wont_be_nil + end + end + end + + describe 'propagation_style option' do + describe 'link - default' do + # The inline job adapter executes the job immediately upon enqueuing it + # so we can't actually use that in these tests - the actual Context.current at time + # of execution *will* be the context where the job was enqueued, because rails + # ends up doing job.around_enqueue { job.around_perform { block } } inline. + it 'creates span links in separate traces' do + TestJob.perform_later + + _(publish_span.trace_id).wont_equal(process_span.trace_id) + + _(process_span.total_recorded_links).must_equal(1) + _(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id) + _(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id) + end + + it 'propagates baggage' do + ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked') + OpenTelemetry::Context.with_current(ctx) do + BaggageJob.perform_later + end + + _(publish_span.trace_id).wont_equal(process_span.trace_id) + + _(process_span.total_recorded_links).must_equal(1) + _(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id) + _(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id) + + _(process_span.attributes['success']).must_equal(true) + end + end + + describe 'when configured to do parent/child spans' do + let(:config) { { propagation_style: :child, span_naming: :queue } } + + it 'creates a parent/child relationship' do + TestJob.perform_later + + _(process_span.total_recorded_links).must_equal(0) + + _(publish_span.trace_id).must_equal(process_span.trace_id) + _(process_span.parent_span_id).must_equal(publish_span.span_id) + end + + it 'propagates baggage' do + ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked') + OpenTelemetry::Context.with_current(ctx) do + BaggageJob.perform_later + end + _(process_span.total_recorded_links).must_equal(0) + + _(publish_span.trace_id).must_equal(process_span.trace_id) + _(process_span.parent_span_id).must_equal(publish_span.span_id) + _(process_span.attributes['success']).must_equal(true) + end + end + + describe 'when explicitly configure for no propagation' do + let(:config) { { propagation_style: :none, span_naming: :queue } } + + it 'skips link creation and does not create parent/child relationship' do + TestJob.perform_later + + _(process_span.total_recorded_links).must_equal(0) + + _(publish_span.trace_id).wont_equal(process_span.trace_id) + _(process_span.parent_span_id).wont_equal(publish_span.span_id) + end + + it 'still propagates baggage' do + ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked') + OpenTelemetry::Context.with_current(ctx) do + BaggageJob.perform_later + end + + _(process_span.total_recorded_links).must_equal(0) + + _(publish_span.trace_id).wont_equal(process_span.trace_id) + _(process_span.parent_span_id).wont_equal(publish_span.span_id) + _(process_span.attributes['success']).must_equal(true) + end + end + end + + describe 'active_job callbacks' do + it 'makes the tracing context available in before_perform callbacks' do + skip "ActiveJob #{ActiveJob.version} subscribers do not include timing information for callbacks" if ActiveJob.version < Gem::Version.new('7') + CallbacksJob.perform_now + + _(CallbacksJob.context_before).wont_be_nil + _(CallbacksJob.context_before).must_be :valid? + end + + it 'makes the tracing context available in after_perform callbacks' do + skip "ActiveJob #{ActiveJob.version} subscribers do not include timing information for callbacks" if ActiveJob.version < Gem::Version.new('7') + CallbacksJob.perform_now + + _(CallbacksJob.context_after).wont_be_nil + _(CallbacksJob.context_after).must_be :valid? + end + end +end diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/retry_stopped_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/retry_stopped_test.rb new file mode 100644 index 000000000..5b4eb774c --- /dev/null +++ b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/retry_stopped_test.rb @@ -0,0 +1,57 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +require_relative '../../../../../lib/opentelemetry/instrumentation/active_job' + +describe 'OpenTelemetry::Instrumentation::ActiveJob::Handlers::RetryStopped' do + let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance } + let(:config) { { propagation_style: :link, span_naming: :queue } } + let(:exporter) { EXPORTER } + let(:spans) { exporter.finished_spans } + let(:publish_span) { spans.find { |s| s.name == 'default publish' } } + let(:process_span) { spans.find { |s| s.name == 'default process' } } + let(:retry_span) { spans.find { |s| s.name == 'retry_stopped.active_job' } } + + before do + OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe + instrumentation.instance_variable_set(:@config, config) + instrumentation.instance_variable_set(:@installed, false) + + instrumentation.install(config) + ActiveJob::Base.queue_adapter = :async + ActiveJob::Base.queue_adapter.immediate = true + + exporter.reset + end + + after do + begin + ActiveJob::Base.queue_adapter.shutdown + rescue StandardError + nil + end + ActiveJob::Base.queue_adapter = :inline + end + + describe 'attributes' do + describe 'active_job.executions' do + it 'records retry errors' do + _ { RetryJob.perform_later }.must_raise StandardError + + _(process_span.status.code).must_equal OpenTelemetry::Trace::Status::ERROR + _(process_span.status.description).must_equal 'Unexpected ActiveJob Error StandardError' + + _(retry_span.status.code).must_equal OpenTelemetry::Trace::Status::ERROR + _(retry_span.status.description).must_equal 'Unexpected ActiveJob Error StandardError' + _(retry_span.events.first.name).must_equal 'exception' + _(retry_span.events.first.attributes['exception.type']).must_equal 'StandardError' + _(retry_span.events.first.attributes['exception.message']).must_equal 'from retry job' + end + end + end +end diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers_test.rb new file mode 100644 index 000000000..790ace52c --- /dev/null +++ b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers_test.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +require_relative '../../../../lib/opentelemetry/instrumentation/active_job' + +describe OpenTelemetry::Instrumentation::ActiveJob::Handlers do + let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance } + let(:config) { { propagation_style: :link, span_naming: :queue } } + let(:exporter) { EXPORTER } + let(:spans) { exporter.finished_spans } + let(:publish_span) { spans.find { |s| s.name == 'default publish' } } + let(:process_span) { spans.find { |s| s.name == 'default process' } } + + before do + OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe + instrumentation.instance_variable_set(:@config, config) + instrumentation.instance_variable_set(:@installed, false) + + instrumentation.install(config) + ActiveJob::Base.queue_adapter = :async + ActiveJob::Base.queue_adapter.immediate = true + + exporter.reset + end + + after do + begin + ActiveJob::Base.queue_adapter.shutdown + rescue StandardError + nil + end + ActiveJob::Base.queue_adapter = :inline + end + + describe 'compatibility' do + it 'works with positional args' do + _(PositionalOnlyArgsJob.perform_now('arg1')).must_be_nil # Make sure this runs without raising an error + end + + it 'works with keyword args' do + _(KeywordOnlyArgsJob.perform_now(keyword2: :keyword2)).must_be_nil # Make sure this runs without raising an error + end + + it 'works with mixed args' do + _(MixedArgsJob.perform_now('arg1', 'arg2', keyword2: :keyword2)).must_be_nil # Make sure this runs without raising an error + end + end +end diff --git a/instrumentation/active_job/test/instrumentation/active_job/instrumentation_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/instrumentation_test.rb similarity index 93% rename from instrumentation/active_job/test/instrumentation/active_job/instrumentation_test.rb rename to instrumentation/active_job/test/opentelemetry/instrumentation/active_job/instrumentation_test.rb index c1c2381ea..e41b7eb5c 100644 --- a/instrumentation/active_job/test/instrumentation/active_job/instrumentation_test.rb +++ b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/instrumentation_test.rb @@ -6,7 +6,7 @@ require 'test_helper' -require_relative '../../../lib/opentelemetry/instrumentation/active_job' +require_relative '../../../../lib/opentelemetry/instrumentation/active_job' describe OpenTelemetry::Instrumentation::ActiveJob do let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance } diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/mappers/attribute_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/mappers/attribute_test.rb new file mode 100644 index 000000000..060e93f67 --- /dev/null +++ b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/mappers/attribute_test.rb @@ -0,0 +1,77 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +require_relative '../../../../../lib/opentelemetry/instrumentation/active_job' + +describe OpenTelemetry::Instrumentation::ActiveJob::Mappers::Attribute do + let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance } + let(:config) { { propagation_style: :link, span_naming: :queue } } + let(:exporter) { EXPORTER } + let(:spans) { exporter.finished_spans } + let(:publish_span) { spans.find { |s| s.name == 'default publish' } } + let(:process_span) { spans.find { |s| s.name == 'default process' } } + + before do + OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe + instrumentation.instance_variable_set(:@config, config) + instrumentation.instance_variable_set(:@installed, false) + + instrumentation.install(config) + ActiveJob::Base.queue_adapter = :async + ActiveJob::Base.queue_adapter.immediate = true + + exporter.reset + end + + after do + begin + ActiveJob::Base.queue_adapter.shutdown + rescue StandardError + nil + end + ActiveJob::Base.queue_adapter = :inline + end + + it 'uses trace semantic conventions and Rails specific attributes' do + job = TestJob.perform_later + + [publish_span, process_span].each do |span| + _(span.attributes['code.namespace']).must_equal('TestJob') + _(span.attributes['messaging.system']).must_equal('active_job') + _(span.attributes['messaging.active_job.adapter.name']).must_equal('async') + _(span.attributes['messaging.destination']).must_equal('default') + _(span.attributes['messaging.message.id']).must_equal(job.job_id) + _(span.attributes['messaging.active_job.message.priority']).must_be_nil + end + + _(process_span.attributes['messaging.active_job.message.provider_job_id']).must_equal(job.provider_job_id) + end + + it 'tracks the job priority' do + TestJob.set(priority: 5).perform_later + + [publish_span, process_span].each do |span| + _(span.attributes['messaging.active_job.message.priority']).must_equal('5') + end + end + + it 'is set correctly for the inline adapter' do + begin + ActiveJob::Base.queue_adapter.shutdown + rescue StandardError + nil + end + + ActiveJob::Base.queue_adapter = :inline + TestJob.perform_later + + [publish_span, process_span].each do |span| + _(span.attributes['messaging.active_job.adapter.name']).must_equal('inline') + end + end +end diff --git a/instrumentation/active_job/test/instrumentation/active_job/patches/base_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/patches/base_test.rb similarity index 63% rename from instrumentation/active_job/test/instrumentation/active_job/patches/base_test.rb rename to instrumentation/active_job/test/opentelemetry/instrumentation/active_job/patches/base_test.rb index c22dc0b8a..d8c5cc22e 100644 --- a/instrumentation/active_job/test/instrumentation/active_job/patches/base_test.rb +++ b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/patches/base_test.rb @@ -6,29 +6,20 @@ require 'test_helper' -require_relative '../../../../lib/opentelemetry/instrumentation/active_job' +require_relative '../../../../../lib/opentelemetry/instrumentation/active_job' describe OpenTelemetry::Instrumentation::ActiveJob::Patches::Base do - describe 'attr_accessor' do - it 'adds a "metadata" accessor' do - job = TestJob.new - - _(job).must_respond_to :metadata - _(job).must_respond_to :metadata= - end - end - describe 'serialization / deserialization' do it 'must handle metadata' do job = TestJob.new - job.metadata = { 'foo' => 'bar' } + job.__otel_headers = { 'foo' => 'bar' } serialized_job = job.serialize - _(serialized_job.keys).must_include 'metadata' + _(serialized_job.keys).must_include '__otel_headers' job = TestJob.new job.deserialize(serialized_job) - _(job.metadata).must_equal('foo' => 'bar') + _(job.__otel_headers).must_equal('foo' => 'bar') end it 'handles jobs queued without instrumentation' do # e.g. during a rolling deployment diff --git a/instrumentation/active_job/test/test_helper.rb b/instrumentation/active_job/test/test_helper.rb index 1af04821d..dd134b0ff 100644 --- a/instrumentation/active_job/test/test_helper.rb +++ b/instrumentation/active_job/test/test_helper.rb @@ -11,7 +11,6 @@ require 'active_job' require 'opentelemetry-instrumentation-active_job' require 'minitest/autorun' -require 'webmock/minitest' class TestJob < ActiveJob::Base def perform; end @@ -21,7 +20,16 @@ class RetryJob < ActiveJob::Base retry_on StandardError, wait: 0, attempts: 2 def perform - raise StandardError + raise StandardError, 'from retry job' + end +end + +class DiscardJob < ActiveJob::Base + class DiscardError < StandardError; end + discard_on DiscardError + + def perform + raise DiscardError, 'discard me' end end @@ -33,7 +41,7 @@ def perform class BaggageJob < ActiveJob::Base def perform - OpenTelemetry::Trace.current_span['success'] = true if OpenTelemetry::Baggage.value('testing_baggage') == 'it_worked' + OpenTelemetry::Trace.current_span['success'] = OpenTelemetry::Baggage.value('testing_baggage') == 'it_worked' end end @@ -68,6 +76,18 @@ def initialize(*) end end +class RescueFromJob < ActiveJob::Base + class RescueFromError < StandardError; end + + rescue_from RescueFromError do + # do nothing + end + + def perform + raise RescueFromError, 'I was handled by rescue_from' + end +end + ActiveJob::Base.queue_adapter = :inline ActiveJob::Base.logger = Logger.new($stderr, level: ENV.fetch('OTEL_LOG_LEVEL', 'fatal').to_sym)