Skip to content

Commit

Permalink
refactor: Clean up Handler and MessageHelper
Browse files Browse the repository at this point in the history
  • Loading branch information
jterapin committed Sep 5, 2024
1 parent 7fb750c commit 32c0aa8
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,32 @@ module Instrumentation
module AwsSdk
# Generates Spans for all interactions with AwsSdk
class Handler < Seahorse::Client::Handler
SQS_SEND_MESSAGE = 'SQS.SendMessage'
SQS_SEND_MESSAGE_BATCH = 'SQS.SendMessageBatch'
SQS_RECEIVE_MESSAGE = 'SQS.ReceiveMessage'
SNS_PUBLISH = 'SNS.Publish'

def call(context)
return super unless context

service_name = service_name(context)
service_id = service_name(context)
operation = context.operation&.name
client_method = "#{service_name}.#{operation}"
attributes = {
'aws.region' => context.config.region,
OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api',
OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => operation,
OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => service_name
}
attributes[SemanticConventions::Trace::DB_SYSTEM] = 'dynamodb' if service_name == 'DynamoDB'
MessagingHelper.apply_sqs_attributes(attributes, context, client_method) if service_name == 'SQS'
MessagingHelper.apply_sns_attributes(attributes, context, client_method) if service_name == 'SNS'
client_method = "#{service_id}.#{operation}"

tracer.in_span(
span_name(context, client_method, service_id),
attributes: attributes(context, client_method, service_id, operation),
kind: span_kind(client_method, service_id)
) do |span|
if instrumentation_config[:inject_messaging_context] &&
%w[SQS SNS].include?(service_id)
MessagingHelper.inject_context(context, client_method)
end

tracer.in_span(span_name(context, client_method), attributes: attributes, kind: span_kind(client_method)) do |span|
inject_context(context, client_method)
if instrumentation_config[:suppress_internal_instrumentation]
OpenTelemetry::Common::Utilities.untraced { super }
else
super
end.tap do |response|
span.set_attribute(OpenTelemetry::SemanticConventions::Trace::HTTP_STATUS_CODE,
context.http_response.status_code)
span.set_attribute(
OpenTelemetry::SemanticConventions::Trace::HTTP_STATUS_CODE,
context.http_response.status_code
)

if (err = response.error)
span.record_exception(err)
Expand All @@ -65,48 +61,40 @@ def service_name(context)
context.client.class.api.metadata['serviceId'] || context.client.class.to_s.split('::')[1]
end

SEND_MESSAGE_CLIENT_METHODS = [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].freeze
def inject_context(context, client_method)
return unless SEND_MESSAGE_CLIENT_METHODS.include? client_method
return unless instrumentation_config[:inject_messaging_context]

if client_method == SQS_SEND_MESSAGE_BATCH
context.params[:entries].each do |entry|
entry[:message_attributes] ||= {}
OpenTelemetry.propagation.inject(entry[:message_attributes], setter: MessageAttributeSetter)
end
def span_kind(client_method, service_id)
case service_id
when 'SQS', 'SNS'
MessagingHelper.span_kind(client_method)
else
context.params[:message_attributes] ||= {}
OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter)
OpenTelemetry::Trace::SpanKind::CLIENT
end
end

def span_kind(client_method)
case client_method
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
OpenTelemetry::Trace::SpanKind::PRODUCER
when SQS_RECEIVE_MESSAGE
OpenTelemetry::Trace::SpanKind::CONSUMER
def span_name(context, client_method, service_id)
case service_id
when 'SQS', 'SNS'
MessagingHelper.legacy_span_name(context, client_method)
else
OpenTelemetry::Trace::SpanKind::CLIENT
client_method
end
end

def span_name(context, client_method)
case client_method
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
"#{MessagingHelper.queue_name(context)} publish"
when SQS_RECEIVE_MESSAGE
"#{MessagingHelper.queue_name(context)} receive"
else
client_method
def attributes(context, client_method, service_id, operation)
{
'aws.region' => context.config.region,
OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api',
OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => operation,
OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => service_id
}.tap do |attrs|
attrs[SemanticConventions::Trace::DB_SYSTEM] = 'dynamodb' if service_id == 'DynamoDB'
MessagingHelper.apply_span_attributes(context, attrs, client_method, service_id) if %w[SQS SNS].include?(service_id)
end
end
end

# A Seahorse::Client::Plugin that enables instrumentation for all AWS services
class Plugin < Seahorse::Client::Plugin
def add_handlers(handlers, config)
def add_handlers(handlers, _config)
# run before Seahorse::Client::Plugin::ParamValidator (priority 50)
handlers.add Handler, step: :validate, priority: 49
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@
module OpenTelemetry
module Instrumentation
module AwsSdk
# MessagingHelper class provides methods for calculating messaging span attributes
# An utility class to help SQS/SNS-related span attributes/context injection
class MessagingHelper
class << self
SQS_SEND_MESSAGE = 'SQS.SendMessage'
SQS_SEND_MESSAGE_BATCH = 'SQS.SendMessageBatch'
SQS_RECEIVE_MESSAGE = 'SQS.ReceiveMessage'
SNS_PUBLISH = 'SNS.Publish'
SEND_MESSAGE_CLIENT_METHODS = [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].freeze

def queue_name(context)
topic_arn = context.params[:topic_arn]
target_arn = context.params[:target_arn]
Expand All @@ -28,19 +34,65 @@ def queue_name(context)
'unknown'
end

def legacy_span_name(context, client_method)
case client_method
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
"#{MessagingHelper.queue_name(context)} publish"
when SQS_RECEIVE_MESSAGE
"#{MessagingHelper.queue_name(context)} receive"
else
client_method
end
end

# def apply_span_attributes(attrs, context, service_id, client_method)
def apply_span_attributes(context, attrs, client_method, service_id)
case service_id
when 'SQS'
apply_sqs_attributes(attrs, context, client_method)
when 'SNS'
apply_sns_attributes(attrs, context, client_method)
end
end

def span_kind(client_method)
case client_method
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
OpenTelemetry::Trace::SpanKind::PRODUCER
when SQS_RECEIVE_MESSAGE
OpenTelemetry::Trace::SpanKind::CONSUMER
else
OpenTelemetry::Trace::SpanKind::CLIENT
end
end

def inject_context(context, client_method)
return unless SEND_MESSAGE_CLIENT_METHODS.include?(client_method)

if client_method == SQS_SEND_MESSAGE_BATCH
context.params[:entries].each do |entry|
entry[:message_attributes] ||= {}
OpenTelemetry.propagation.inject(entry[:message_attributes], setter: MessageAttributeSetter)
end
else
context.params[:message_attributes] ||= {}
OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter)
end
end

private

def apply_sqs_attributes(attributes, context, client_method)
attributes[SemanticConventions::Trace::MESSAGING_SYSTEM] = 'aws.sqs'
attributes[SemanticConventions::Trace::MESSAGING_DESTINATION_KIND] = 'queue'
attributes[SemanticConventions::Trace::MESSAGING_DESTINATION] = queue_name(context)
attributes[SemanticConventions::Trace::MESSAGING_URL] = context.params[:queue_url] if context.params[:queue_url]

attributes[SemanticConventions::Trace::MESSAGING_OPERATION] = 'receive' if client_method == 'SQS.ReceiveMessage'
attributes[SemanticConventions::Trace::MESSAGING_OPERATION] = 'receive' if client_method == SQS_RECEIVE_MESSAGE
end

def apply_sns_attributes(attributes, context, client_method)
attributes[SemanticConventions::Trace::MESSAGING_SYSTEM] = 'aws.sns'

return unless client_method == 'SNS.Publish'
return unless client_method == SNS_PUBLISH

attributes[SemanticConventions::Trace::MESSAGING_DESTINATION_KIND] = 'topic'
attributes[SemanticConventions::Trace::MESSAGING_DESTINATION] = queue_name(context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ Gem::Specification.new do |spec|
spec.homepage = 'https://github.com/open-telemetry/opentelemetry-ruby-contrib'
spec.license = 'Apache-2.0'

spec.files = Dir.glob('lib/**/*.rb') +
spec.files =
Dir.glob('lib/**/*.rb') +
Dir.glob('*.md') +
['LICENSE', '.yardopts']
spec.require_paths = ['lib']
Expand Down

0 comments on commit 32c0aa8

Please sign in to comment.