Skip to content

Commit

Permalink
Add support for Micrometer's Observation API for the SQS
Browse files Browse the repository at this point in the history
  • Loading branch information
sondemar committed Aug 29, 2024
1 parent 57e94c2 commit 804f8cb
Show file tree
Hide file tree
Showing 68 changed files with 2,849 additions and 330 deletions.
110 changes: 110 additions & 0 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,37 @@ Optional<Message<SampleRecord>> receivedMessage = template
.receive(queue, SampleRecord.class);
```

==== Observability Support

The framework instruments the codebase for sending messages to SQS to publish observations.
This instrumentation will create 2 types of observations:

* `sqs.single.message.publish` when a single SQS message is published by the service. It propagates the outbound tracing information by setting it up in `MessageHeaders`.
* `sqs.batch.message.publish` when multiple SQS messages are published by the service. It propagates the outbound tracing information by setting it up in `MessageHeaders` of each SQS message.

Additionally, both types of observations measure the time taken to publish SQS messages and create the following `KeyValues`:

.Low cardinality Keys
[cols="2"]
|===
| Name | Description
| `messaging.operation` |The mode of SQS message publishing (values: `single message publish` or `batch message publish`).
|===

.High cardinality Keys
[cols="2"]
|===
| Name | Description
| `messaging.message.id` |The ID of either a single published SQS message or concatenation IDs of all SQS messages in the entire published batch.
|===

When using a `Spring Boot` application with `auto-configuration` and the default `SqsTemplate`, enabling this functionality requires only the `ObservationRegistry` bean to be available. In any other case, the `ObservationRegistry` needs to be set in the `SqsTemplate` builder:

[source, java]
----
SqsTemplate.builder().observationRegistry(registry);
----

=== Receiving Messages

The framework offers the following options to receive messages from a queue.
Expand Down Expand Up @@ -1723,3 +1754,82 @@ Sample IAM policy granting access to SQS:
"Resource": "yourARN"
}
----

=== Observability Support

The framework offers instrumentation for the SQS processing codebase to publish observations.
This instrumentation is available with SQS polling and `SqsTemplate`.
The observations measure the time taken to process SQS messages and create the following `KeyValues`:

.High cardinality Keys
[cols="2"]
|===
| Name | Description
| `messaging.message.id` |The ID of either a single processed SQS message or the concatenation IDs of all SQS messages in the entire processed batch.
|===

.Low cardinality Keys for SQS polling
[cols="2"]
|===
| Name | Description
| `messaging.operation` |The mode of SQS message processing (values: `single message polling process` or `batch message polling process`).
|===

.Low cardinality Keys for `SqsTemplate`
[cols="2"]
|===
| Name | Description
| `messaging.operation` |The mode of SQS message processing (values: `single message manual process` or `batch message manual process`).
|===


==== SQS polling

The instrumentation will create two types of observations:

* `sqs.single.message.polling.process` when a SQS messages from the batch are processed by the service. They propagate the inbound tracing information by looking it up in `MessageHeaders`.
* `sqs.batch.message.polling.process` when the entire received batch of SQS messages is processed by the service. They propagate the inbound tracing information in the form the of a list of tracing `Link` by looking it up in the `MessageHeaders` of incoming traces in the entire received batch.

When using a `Spring Boot` application with `auto-configuration` and the default `SqsMessageListenerContainerFactory`, enabling this functionality requires only the `ObservationRegistry` bean to be available. In any other case, the `ObservationRegistry` needs to be set in the `MessageListenerContainerFactory`:

[source, java]
----
@Bean
SqsMessageListenerContainerFactory<Object> sqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient, ObservationRegistry observationRegistry) {
return SqsMessageListenerContainerFactory
.builder()
.sqsAsyncClientSupplier(sqsAsyncClient)
.observationRegistry(observationRegistry)
.build();
}
----

Or directly in the `MessageListenerContainer`:

[source, java]
----
@Bean
MessageListenerContainer<Object> listenerContainer(SqsAsyncClient sqsAsyncClient, ObservationRegistry observationRegistry) {
return SqsMessageListenerContainer
.builder()
.sqsAsyncClient(sqsAsyncClient)
.observationRegistry(observationRegistry)
.build();
}
----

==== SqsTemplate

The instrumentation will create two types of observations:

* `sqs.single.message.manual.process` when an individual SQS message from the batch is processed by the service. The inbound tracing information is not propagated from messages, and a new tracing context is created instead.
* `sqs.batch.message.manual.process` when the entire batch of received SQS messages is processed by the service. The inbound tracing information is not propagated from messages, and a new tracing context is created instead.

*Note*: The inbound tracing information is not propagated, and a new tracing context is created because the user of SqsTemplate can invoke this API within the scope of an existing tracing context, where their trace ID should be reused. Additionally, the headers of received messages contain the required tracing information, allowing the user to decide whether to use the inbound tracing or not.

When using a `Spring Boot` application with `auto-configuration` and the default `SqsTemplate`, enabling this functionality requires only the `ObservationRegistry` bean to be available. In any other case, the `ObservationRegistry` needs to be set in the `SqsTemplate` builder:

[source, java]
----
SqsTemplate.builder().observationRegistry(registry);
----
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,40 @@
*/
package io.awspring.cloud.autoconfigure.sqs;

import static org.springframework.boot.actuate.autoconfigure.tracing.MicrometerTracingAutoConfiguration.RECEIVER_TRACING_OBSERVATION_HANDLER_ORDER;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer;
import io.awspring.cloud.autoconfigure.core.AwsClientCustomizer;
import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails;
import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration;
import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration;
import io.awspring.cloud.autoconfigure.core.*;
import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration;
import io.awspring.cloud.sqs.config.SqsListenerConfigurer;
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor;
import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder;
import io.awspring.cloud.sqs.observation.BatchMessageProcessTracingObservationHandler;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import io.awspring.cloud.sqs.operations.SqsTemplateBuilder;
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.propagation.Propagator;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.annotation.Order;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder;
import software.amazon.awssdk.services.sqs.model.Message;
Expand All @@ -55,6 +60,7 @@
* @author Maciej Walkowiak
* @author Wei Jiang
* @author Dongha Kim
* @author Mariusz Sondecki
* @since 3.0
*/
@AutoConfiguration
Expand Down Expand Up @@ -82,12 +88,16 @@ public SqsAsyncClient sqsAsyncClient(AwsClientBuilderConfigurer awsClientBuilder

@ConditionalOnMissingBean
@Bean
public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider<ObjectMapper> objectMapperProvider, MessagingMessageConverter<Message> messageConverter) {
SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient).messageConverter(messageConverter);
public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider<ObjectMapper> objectMapperProvider,
MessagingMessageConverter<Message> messageConverter,
ObjectProvider<ObservationRegistry> observationRegistry) {
SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient)
.messageConverter(messageConverter);
objectMapperProvider.ifAvailable(om -> setMapperToConverter(messageConverter, om));
if (sqsProperties.getQueueNotFoundStrategy() != null) {
builder.configure((options) -> options.queueNotFoundStrategy(sqsProperties.getQueueNotFoundStrategy()));
}
observationRegistry.ifAvailable(builder::observationRegistry);
return builder.build();
}

Expand All @@ -97,9 +107,9 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
ObjectProvider<SqsAsyncClient> sqsAsyncClient, ObjectProvider<AsyncErrorHandler<Object>> asyncErrorHandler,
ObjectProvider<ErrorHandler<Object>> errorHandler,
ObjectProvider<AsyncMessageInterceptor<Object>> asyncInterceptors,
ObjectProvider<MessageInterceptor<Object>> interceptors,
ObjectProvider<ObjectMapper> objectMapperProvider,
MessagingMessageConverter<?> messagingMessageConverter) {
ObjectProvider<MessageInterceptor<Object>> interceptors, ObjectProvider<ObjectMapper> objectMapperProvider,
MessagingMessageConverter<?> messagingMessageConverter,
ObjectProvider<ObservationRegistry> observationRegistry) {

SqsMessageListenerContainerFactory<Object> factory = new SqsMessageListenerContainerFactory<>();
factory.configure(this::configureProperties);
Expand All @@ -110,6 +120,8 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
asyncInterceptors.forEach(factory::addMessageInterceptor);
objectMapperProvider.ifAvailable(om -> setMapperToConverter(messagingMessageConverter, om));
factory.configure(options -> options.messageConverter(messagingMessageConverter));
observationRegistry.ifAvailable(factory::setObservationRegistry);

return factory;
}

Expand Down Expand Up @@ -144,4 +156,17 @@ public SqsListenerConfigurer objectMapperCustomizer(ObjectProvider<ObjectMapper>
};
}

@Configuration(proxyBeanMethods = false)
@ConditionalOnBean({ Tracer.class, Propagator.class })
public static class SqsTracingConfiguration {

@Bean
@ConditionalOnMissingBean
@Order(RECEIVER_TRACING_OBSERVATION_HANDLER_ORDER - 100)
public BatchMessageProcessTracingObservationHandler batchMessageProcessTracingObservationHandler(Tracer tracer,
Propagator propagator) {
return new BatchMessageProcessTracingObservationHandler(tracer, propagator);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.awspring.cloud.autoconfigure.AwsClientProperties;
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;

import java.time.Duration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.lang.Nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
import io.awspring.cloud.sqs.QueueAttributesResolvingException;
import io.awspring.cloud.sqs.annotation.SqsListener;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import org.testcontainers.shaded.org.bouncycastle.util.Arrays;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;

import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -41,7 +38,9 @@
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.shaded.org.bouncycastle.util.Arrays;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;

/**
* Integration tests for {@link SqsAutoConfiguration}.
Expand All @@ -59,28 +58,26 @@ class SqsAutoConfigurationIntegrationTest {

@Container
static LocalStackContainer localstack = new LocalStackContainer(
DockerImageName.parse("localstack/localstack:3.2.0"));
DockerImageName.parse("localstack/localstack:3.2.0"));

static {
localstack.start();
}

private static final String[] BASE_PARAMS = {"spring.cloud.aws.sqs.region=eu-west-1",
"spring.cloud.aws.sqs.endpoint=" + localstack.getEndpoint(),
"spring.cloud.aws.credentials.access-key=noop", "spring.cloud.aws.credentials.secret-key=noop",
"spring.cloud.aws.region.static=eu-west-1"};
private static final String[] BASE_PARAMS = { "spring.cloud.aws.sqs.region=eu-west-1",
"spring.cloud.aws.sqs.endpoint=" + localstack.getEndpoint(), "spring.cloud.aws.credentials.access-key=noop",
"spring.cloud.aws.credentials.secret-key=noop", "spring.cloud.aws.region.static=eu-west-1" };

private static final AutoConfigurations BASE_CONFIGURATIONS = AutoConfigurations.of(RegionProviderAutoConfiguration.class,
CredentialsProviderAutoConfiguration.class, SqsAutoConfiguration.class, AwsAutoConfiguration.class,
ListenerConfiguration.class);
private static final AutoConfigurations BASE_CONFIGURATIONS = AutoConfigurations.of(
RegionProviderAutoConfiguration.class, CredentialsProviderAutoConfiguration.class,
SqsAutoConfiguration.class, AwsAutoConfiguration.class, ListenerConfiguration.class);

private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withPropertyValues(BASE_PARAMS)
.withConfiguration(BASE_CONFIGURATIONS);
.withPropertyValues(BASE_PARAMS).withConfiguration(BASE_CONFIGURATIONS);

private final ApplicationContextRunner applicationContextRunnerWithFailStrategy = new ApplicationContextRunner()
.withPropertyValues(Arrays.append(BASE_PARAMS, "spring.cloud.aws.sqs.queue-not-found-strategy=fail"))
.withConfiguration(BASE_CONFIGURATIONS);
.withPropertyValues(Arrays.append(BASE_PARAMS, "spring.cloud.aws.sqs.queue-not-found-strategy=fail"))
.withConfiguration(BASE_CONFIGURATIONS);

@SuppressWarnings("unchecked")
@Test
Expand All @@ -95,21 +92,22 @@ void sendsAndReceivesMessage() {

@Test
void containerReceivesMessageWithFailQueueNotFoundStrategy() {
applicationContextRunnerWithFailStrategy.run(context ->
assertThatThrownBy(() -> context.getBean(SqsTemplate.class).send(to -> to.queue("QUEUE_DOES_NOT_EXISTS").payload(PAYLOAD)))
.isInstanceOf(IllegalStateException.class).cause().isInstanceOf(ApplicationContextException.class).cause()
.isInstanceOf(CompletionException.class).cause().isInstanceOf(QueueAttributesResolvingException.class)
.cause().isInstanceOf(QueueDoesNotExistException.class));
applicationContextRunnerWithFailStrategy.run(context -> assertThatThrownBy(
() -> context.getBean(SqsTemplate.class).send(to -> to.queue("QUEUE_DOES_NOT_EXISTS").payload(PAYLOAD)))
.isInstanceOf(IllegalStateException.class).cause().isInstanceOf(ApplicationContextException.class)
.cause().isInstanceOf(CompletionException.class).cause()
.isInstanceOf(QueueAttributesResolvingException.class).cause()
.isInstanceOf(QueueDoesNotExistException.class));
}

@Test
void templateReceivesMessageWithFailQueueNotFoundStrategy() {
applicationContextRunnerWithFailStrategy
.run(context ->
assertThatThrownBy(() -> context.getBean(SqsTemplate.class).receive("QUEUE_DOES_NOT_EXIST", String.class))
.isInstanceOf(IllegalStateException.class).cause().isInstanceOf(ApplicationContextException.class).cause()
.isInstanceOf(CompletionException.class).cause().isInstanceOf(QueueAttributesResolvingException.class)
.cause().isInstanceOf(QueueDoesNotExistException.class));
applicationContextRunnerWithFailStrategy.run(context -> assertThatThrownBy(
() -> context.getBean(SqsTemplate.class).receive("QUEUE_DOES_NOT_EXIST", String.class))
.isInstanceOf(IllegalStateException.class).cause().isInstanceOf(ApplicationContextException.class)
.cause().isInstanceOf(CompletionException.class).cause()
.isInstanceOf(QueueAttributesResolvingException.class).cause()
.isInstanceOf(QueueDoesNotExistException.class));
}

static class Listener {
Expand Down
Loading

0 comments on commit 804f8cb

Please sign in to comment.