Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Micrometer's Observation API for the SQS pipeline #1164

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,37 @@ Optional<Message<SampleRecord>> receivedMessage = template
.receive(queue, SampleRecord.class);
```

==== Observability Support

The framework instruments the codebase for sending messages to SQS to publish observations.
This instrumentation will create 2 types of observations:

* `sqs.single.message.publish` when a single SQS message is published by the service. It propagates the outbound tracing information by setting it up in `MessageHeaders`.
* `sqs.batch.message.publish` when multiple SQS messages are published by the service. It propagates the outbound tracing information by setting it up in `MessageHeaders` of each SQS message.

Additionally, both types of observations measure the time taken to publish SQS messages and create the following `KeyValues`:

.Low cardinality Keys
[cols="2"]
|===
| Name | Description
| `messaging.operation` |The mode of SQS message publishing (values: `single message publish` or `batch message publish`).
|===

.High cardinality Keys
[cols="2"]
|===
| Name | Description
| `messaging.message.id` |The ID of either a single published SQS message or concatenation IDs of all SQS messages in the entire published batch.
|===

When using a `Spring Boot` application with `auto-configuration` and the default `SqsTemplate`, enabling this functionality requires only the `ObservationRegistry` bean to be available. In any other case, the `ObservationRegistry` needs to be set in the `SqsTemplate` builder:

[source, java]
----
SqsTemplate.builder().observationRegistry(registry);
----

=== Receiving Messages

The framework offers the following options to receive messages from a queue.
Expand Down Expand Up @@ -1751,3 +1782,82 @@ Sample IAM policy granting access to SQS:
"Resource": "yourARN"
}
----

=== Observability Support

The framework offers instrumentation for the SQS processing codebase to publish observations.
This instrumentation is available with SQS polling and `SqsTemplate`.
The observations measure the time taken to process SQS messages and create the following `KeyValues`:

.High cardinality Keys
[cols="2"]
|===
| Name | Description
| `messaging.message.id` |The ID of either a single processed SQS message or the concatenation IDs of all SQS messages in the entire processed batch.
|===

.Low cardinality Keys for SQS polling
[cols="2"]
|===
| Name | Description
| `messaging.operation` |The mode of SQS message processing (values: `single message polling process` or `batch message polling process`).
|===

.Low cardinality Keys for `SqsTemplate`
[cols="2"]
|===
| Name | Description
| `messaging.operation` |The mode of SQS message processing (values: `single message manual process` or `batch message manual process`).
|===


==== SQS polling

The instrumentation will create two types of observations:

* `sqs.single.message.polling.process` when a SQS messages from the batch are processed by the service. They propagate the inbound tracing information by looking it up in `MessageHeaders`.
* `sqs.batch.message.polling.process` when the entire received batch of SQS messages is processed by the service. They propagate the inbound tracing information in the form the of a list of tracing `Link` by looking it up in the `MessageHeaders` of incoming traces in the entire received batch.

When using a `Spring Boot` application with `auto-configuration` and the default `SqsMessageListenerContainerFactory`, enabling this functionality requires only the `ObservationRegistry` bean to be available. In any other case, the `ObservationRegistry` needs to be set in the `MessageListenerContainerFactory`:

[source, java]
----
@Bean
SqsMessageListenerContainerFactory<Object> sqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient, ObservationRegistry observationRegistry) {
return SqsMessageListenerContainerFactory
.builder()
.sqsAsyncClientSupplier(sqsAsyncClient)
.observationRegistry(observationRegistry)
.build();
}
----

Or directly in the `MessageListenerContainer`:

[source, java]
----
@Bean
MessageListenerContainer<Object> listenerContainer(SqsAsyncClient sqsAsyncClient, ObservationRegistry observationRegistry) {
return SqsMessageListenerContainer
.builder()
.sqsAsyncClient(sqsAsyncClient)
.observationRegistry(observationRegistry)
.build();
}
----

==== SqsTemplate

The instrumentation will create two types of observations:

* `sqs.single.message.manual.process` when an individual SQS message from the batch is processed by the service. The inbound tracing information is not propagated from messages, and a new tracing context is created instead.
* `sqs.batch.message.manual.process` when the entire batch of received SQS messages is processed by the service. The inbound tracing information is not propagated from messages, and a new tracing context is created instead.

*Note*: The inbound tracing information is not propagated, and a new tracing context is created because the user of SqsTemplate can invoke this API within the scope of an existing tracing context, where their trace ID should be reused. Additionally, the headers of received messages contain the required tracing information, allowing the user to decide whether to use the inbound tracing or not.

When using a `Spring Boot` application with `auto-configuration` and the default `SqsTemplate`, enabling this functionality requires only the `ObservationRegistry` bean to be available. In any other case, the `ObservationRegistry` needs to be set in the `SqsTemplate` builder:

[source, java]
----
SqsTemplate.builder().observationRegistry(registry);
----
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.awspring.cloud.autoconfigure.sqs;

import static org.springframework.boot.actuate.autoconfigure.tracing.MicrometerTracingAutoConfiguration.RECEIVER_TRACING_OBSERVATION_HANDLER_ORDER;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.awspring.cloud.autoconfigure.AwsAsyncClientCustomizer;
import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer;
Expand All @@ -30,21 +32,28 @@
import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor;
import io.awspring.cloud.sqs.observation.BatchMessageProcessTracingObservationHandler;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import io.awspring.cloud.sqs.operations.SqsTemplateBuilder;
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.propagation.Propagator;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.annotation.Order;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder;
import software.amazon.awssdk.services.sqs.model.Message;
Expand All @@ -56,6 +65,7 @@
* @author Maciej Walkowiak
* @author Wei Jiang
* @author Dongha Kim
* @author Mariusz Sondecki
* @since 3.0
*/
@AutoConfiguration
Expand Down Expand Up @@ -87,13 +97,15 @@ public SqsAsyncClient sqsAsyncClient(AwsClientBuilderConfigurer awsClientBuilder
@ConditionalOnMissingBean
@Bean
public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider<ObjectMapper> objectMapperProvider,
MessagingMessageConverter<Message> messageConverter) {
MessagingMessageConverter<Message> messageConverter,
ObjectProvider<ObservationRegistry> observationRegistry) {
SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient)
.messageConverter(messageConverter);
objectMapperProvider.ifAvailable(om -> setMapperToConverter(messageConverter, om));
if (sqsProperties.getQueueNotFoundStrategy() != null) {
builder.configure((options) -> options.queueNotFoundStrategy(sqsProperties.getQueueNotFoundStrategy()));
}
observationRegistry.ifAvailable(builder::observationRegistry);
return builder.build();
}

Expand All @@ -104,7 +116,8 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
ObjectProvider<ErrorHandler<Object>> errorHandler,
ObjectProvider<AsyncMessageInterceptor<Object>> asyncInterceptors,
ObjectProvider<MessageInterceptor<Object>> interceptors, ObjectProvider<ObjectMapper> objectMapperProvider,
MessagingMessageConverter<?> messagingMessageConverter) {
MessagingMessageConverter<?> messagingMessageConverter,
ObjectProvider<ObservationRegistry> observationRegistry) {

SqsMessageListenerContainerFactory<Object> factory = new SqsMessageListenerContainerFactory<>();
factory.configure(this::configureProperties);
Expand All @@ -115,6 +128,8 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
asyncInterceptors.forEach(factory::addMessageInterceptor);
objectMapperProvider.ifAvailable(om -> setMapperToConverter(messagingMessageConverter, om));
factory.configure(options -> options.messageConverter(messagingMessageConverter));
observationRegistry.ifAvailable(factory::setObservationRegistry);

return factory;
}

Expand Down Expand Up @@ -149,4 +164,17 @@ public SqsListenerConfigurer objectMapperCustomizer(ObjectProvider<ObjectMapper>
};
}

@Configuration(proxyBeanMethods = false)
@ConditionalOnBean({ Tracer.class, Propagator.class })
public static class SqsTracingConfiguration {

@Bean
@ConditionalOnMissingBean
@Order(RECEIVER_TRACING_OBSERVATION_HANDLER_ORDER - 100)
public BatchMessageProcessTracingObservationHandler batchMessageProcessTracingObservationHandler(Tracer tracer,
Propagator propagator) {
return new BatchMessageProcessTracingObservationHandler(tracer, propagator);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.observation.BatchMessageProcessTracingObservationHandler;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.propagation.Propagator;
import java.net.URI;
import java.time.Duration;
import java.util.List;
Expand Down Expand Up @@ -242,6 +245,13 @@ void configuresMessageConverter() {
});
}

@Test
void configureSqsObservation() {
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true")
.withUserConfiguration(TracingConfiguration.class)
.run(context -> assertThat(context).hasSingleBean(BatchMessageProcessTracingObservationHandler.class));
}

@Configuration(proxyBeanMethods = false)
static class CustomComponentsConfiguration {

Expand Down Expand Up @@ -300,4 +310,19 @@ public SdkAsyncHttpClient asyncHttpClient() {
}
}

@Configuration(proxyBeanMethods = false)
static class TracingConfiguration {

@Bean
Propagator propagator() {
return Propagator.NOOP;
}

@Bean
Tracer tracer() {
return Tracer.NOOP;
}

}

}
25 changes: 24 additions & 1 deletion spring-cloud-aws-sqs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand All @@ -57,7 +61,26 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-observation-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-tests</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Loading