Skip to content

Commit

Permalink
Enhance SqsAutoConfiguration to use an available ObjectMapper for Sqs…
Browse files Browse the repository at this point in the history
…ContainerOptions

Fixes gh-697
  • Loading branch information
postalservice14 authored Oct 10, 2023
1 parent 2076908 commit d6d9dce
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import io.awspring.cloud.sqs.operations.SqsTemplateBuilder;
import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
Expand Down Expand Up @@ -88,7 +89,8 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
ObjectProvider<SqsAsyncClient> sqsAsyncClient, ObjectProvider<AsyncErrorHandler<Object>> asyncErrorHandler,
ObjectProvider<ErrorHandler<Object>> errorHandler,
ObjectProvider<AsyncMessageInterceptor<Object>> asyncInterceptors,
ObjectProvider<MessageInterceptor<Object>> interceptors) {
ObjectProvider<MessageInterceptor<Object>> interceptors,
ObjectProvider<ObjectMapper> objectMapperProvider) {

SqsMessageListenerContainerFactory<Object> factory = new SqsMessageListenerContainerFactory<>();
factory.configure(this::configureContainerOptions);
Expand All @@ -97,6 +99,11 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
errorHandler.ifAvailable(factory::setErrorHandler);
interceptors.forEach(factory::addMessageInterceptor);
asyncInterceptors.forEach(factory::addMessageInterceptor);
objectMapperProvider.ifAvailable(objectMapper -> {
var messageConverter = new SqsMessagingMessageConverter();
messageConverter.setObjectMapper(objectMapper);
factory.configure(options -> options.messageConverter(messageConverter));
});
return factory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.assertj.core.api.InstanceOfAssertFactories.type;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.awspring.cloud.autoconfigure.ConfiguredAwsClient;
import io.awspring.cloud.autoconfigure.core.AwsAutoConfiguration;
import io.awspring.cloud.autoconfigure.core.AwsClientCustomizer;
Expand All @@ -34,8 +35,10 @@
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
Expand All @@ -44,6 +47,8 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkClientOption;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
Expand Down Expand Up @@ -131,7 +136,7 @@ void configuresFactoryComponentsAndOptions() {
"spring.cloud.aws.sqs.listener.max-concurrent-messages:19",
"spring.cloud.aws.sqs.listener.max-messages-per-poll:8",
"spring.cloud.aws.sqs.listener.poll-timeout:6s")
.withUserConfiguration(CustomComponentsConfiguration.class).run(context -> {
.withUserConfiguration(CustomComponentsConfiguration.class, ObjectMapperConfiguration.class).run(context -> {
assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class);
SqsMessageListenerContainerFactory<?> factory = context
.getBean(SqsMessageListenerContainerFactory.class);
Expand All @@ -146,9 +151,46 @@ void configuresFactoryComponentsAndOptions() {
assertThat(options.getMaxConcurrentMessages()).isEqualTo(19);
assertThat(options.getMaxMessagesPerPoll()).isEqualTo(8);
assertThat(options.getPollTimeout()).isEqualTo(Duration.ofSeconds(6));
});
})
.extracting("messageConverter")
.asInstanceOf(type(SqsMessagingMessageConverter.class))
.extracting("payloadMessageConverter")
.asInstanceOf(type(CompositeMessageConverter.class))
.extracting(CompositeMessageConverter::getConverters)
.isInstanceOfSatisfying(List.class, converters ->
assertThat(converters.get(1)).isInstanceOfSatisfying(
MappingJackson2MessageConverter.class,
jackson2MessageConverter ->
assertThat(jackson2MessageConverter.getObjectMapper().getRegisteredModuleIds()).contains("jackson-datatype-jsr310")));
});
}

@Test
void configuresFactoryComponentsAndOptionsWithDefaults() {
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true").run(context -> {
assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class);
var factory = context.getBean(SqsMessageListenerContainerFactory.class);
assertThat(factory).hasFieldOrProperty("errorHandler").extracting("asyncMessageInterceptors").asList()
.isEmpty();
assertThat(factory).extracting("containerOptionsBuilder").asInstanceOf(type(ContainerOptionsBuilder.class))
.extracting(ContainerOptionsBuilder::build)
.isInstanceOfSatisfying(ContainerOptions.class, options -> {
assertThat(options.getMaxConcurrentMessages()).isEqualTo(10);
assertThat(options.getMaxMessagesPerPoll()).isEqualTo(10);
assertThat(options.getPollTimeout()).isEqualTo(Duration.ofSeconds(10));
})
.extracting("messageConverter")
.asInstanceOf(type(SqsMessagingMessageConverter.class))
.extracting("payloadMessageConverter")
.asInstanceOf(type(CompositeMessageConverter.class))
.extracting(CompositeMessageConverter::getConverters)
.isInstanceOfSatisfying(List.class, converters ->
assertThat(converters.get(1)).isInstanceOfSatisfying(
MappingJackson2MessageConverter.class,
jackson2MessageConverter ->
assertThat(jackson2MessageConverter.getObjectMapper().getRegisteredModuleIds()).isEmpty()));
});
}
// @formatter:on

@Test
Expand Down Expand Up @@ -185,7 +227,7 @@ static class ObjectMapperConfiguration {

@Bean(name = CUSTOM_OBJECT_MAPPER_BEAN_NAME)
ObjectMapper objectMapper() {
return new ObjectMapper();
return new ObjectMapper().registerModule(new JavaTimeModule());
}

}
Expand Down

0 comments on commit d6d9dce

Please sign in to comment.