Skip to content

Commit

Permalink
refactor... maybe a little overengineering
Browse files Browse the repository at this point in the history
  • Loading branch information
arielvalentin committed Oct 10, 2023
1 parent 38bbe0b commit 4a24c08
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ module ActiveJob
module Handlers
# Handles `enqueue.active_job` and `enqueue_at.active_job` to generate egress spans
class Enqueue < Default
def initialize(...)
super
@span_name_formatter = if @config[:span_naming] == :job_class
->(job) { "#{job.class.name} publish" }
else
->(job) { "#{job.queue_name} publish" }
end
end

# Overrides the `Default#start_span` method to create an egress span
# and registers it with the current context
#
Expand All @@ -19,16 +28,11 @@ class Enqueue < Default
# @return [Hash] with the span and generated context tokens
def start_span(name, _id, payload)
job = payload.fetch(:job)
span = @tracer.start_span(span_name_from(job), kind: :producer, attributes: @mapper.call(payload))
span = @tracer.start_span(@span_name_formatter.call(job), kind: :producer, attributes: @mapper.call(payload))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(job.__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_tokens: tokens }
end

# TODO: extract strategy
def span_name_from(job)
"#{@config[:span_naming] == :job_class ? job.class.name : job.queue_name} publish"
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ module ActiveJob
module Handlers
# Handles perform.active_job to geenrate ingress spans
class Perform < Default
def initialize(...)
super
@span_name_formatter = if @config[:span_naming] == :job_class
->(job) { "#{job.class.name} process" }
else
->(job) { "#{job.queue_name} process" }
end
end

# Overrides the `Default#start_span` method to create an ingress span
# and registers it with the current context
#
Expand All @@ -22,7 +31,7 @@ def start_span(name, _id, payload)
job = payload.fetch(:job)
parent_context = OpenTelemetry.propagation.extract(job.__otel_headers)

span_name = span_name_from(job)
span_name = @span_name_formatter.call(job)

# TODO: Refactor into a propagation strategy
propagation_style = @config[:propagation_style]
Expand Down Expand Up @@ -52,11 +61,6 @@ def attach_consumer_context(span)

[consumer_context, internal_context].map { |context| OpenTelemetry::Context.attach(context) }
end

# TODO: refactor into a strategy
def span_name_from(job)
"#{@config[:span_naming] == :job_class ? job.class.name : job.queue_name} process"
end
end
end
end
Expand Down

0 comments on commit 4a24c08

Please sign in to comment.