Skip to content

Commit

Permalink
Enable SqsMessagingMessageConverter to handle JSON Strings with JSON …
Browse files Browse the repository at this point in the history
…Mime Type (#1195)

* Enable SqsMessagingMessageConverter to handle JSON strings configured with the Application_JSON header

* Polishing

Closes #1144

---------

Co-authored-by: daniel kim <daniel.d.kim@polarissharetech.com>
  • Loading branch information
tomazfernandes and DanielDonghaKim authored Aug 20, 2024
1 parent 8581ffa commit 57e94c2
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ void configuresFactoryComponentsAndOptions() {
.asInstanceOf(type(CompositeMessageConverter.class))
.extracting(CompositeMessageConverter::getConverters)
.isInstanceOfSatisfying(List.class, converters ->
assertThat(converters.get(1)).isInstanceOfSatisfying(
assertThat(converters.get(2)).isInstanceOfSatisfying(
MappingJackson2MessageConverter.class,
jackson2MessageConverter ->
assertThat(jackson2MessageConverter.getObjectMapper().getRegisteredModuleIds()).contains("jackson-datatype-jsr310")));
Expand All @@ -205,7 +205,7 @@ void configuresFactoryComponentsAndOptionsWithDefaults() {
.asInstanceOf(type(CompositeMessageConverter.class))
.extracting(CompositeMessageConverter::getConverters)
.isInstanceOfSatisfying(List.class, converters ->
assertThat(converters.get(1)).isInstanceOfSatisfying(
assertThat(converters.get(2)).isInstanceOfSatisfying(
MappingJackson2MessageConverter.class,
jackson2MessageConverter ->
assertThat(jackson2MessageConverter.getObjectMapper().getRegisteredModuleIds()).isEmpty()));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2022 the original author or authors.
* Copyright 2013-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,6 +31,8 @@
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SimpleMessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
Expand All @@ -39,6 +41,8 @@
* Base {@link MessagingMessageConverter} implementation.
*
* @author Tomaz Fernandes
* @author Dongha Kim
*
* @since 3.0
* @see SqsHeaderMapper
* @see SqsMessageConversionContext
Expand Down Expand Up @@ -215,14 +219,17 @@ public MessageConversionContext createMessageConversionContext() {
public S fromMessagingMessage(Message<?> message, @Nullable MessageConversionContext context) {
// We must make sure the message id stays consistent throughout this process
MessageHeaders headers = getMessageHeaders(message);
Message<?> convertedMessage = Objects.requireNonNull(
this.payloadMessageConverter.toMessage(message.getPayload(), message.getHeaders()),
() -> "payloadMessageConverter returned null message for message " + message);
Message<?> convertedMessage = convertPayload(message, message.getPayload());
MessageHeaders completeHeaders = MessageHeaderUtils.addHeadersIfAbsent(headers, convertedMessage.getHeaders());
S messageWithHeaders = this.headerMapper.fromHeaders(completeHeaders);
return doConvertMessage(messageWithHeaders, convertedMessage.getPayload());
}

private Message<?> convertPayload(Message<?> message, Object payload) {
return Objects.requireNonNull(this.payloadMessageConverter.toMessage(payload, message.getHeaders()),
() -> "payloadMessageConverter returned null message for message " + message);
}

private MessageHeaders getMessageHeaders(Message<?> message) {
String typeHeaderName = this.payloadTypeHeaderFunction.apply(message);
return typeHeaderName != null
Expand All @@ -234,11 +241,18 @@ private MessageHeaders getMessageHeaders(Message<?> message) {

private CompositeMessageConverter createDefaultCompositeMessageConverter() {
List<MessageConverter> messageConverters = new ArrayList<>();
messageConverters.add(createClassMatchingMessageConverter());
messageConverters.add(createStringMessageConverter());
messageConverters.add(createDefaultMappingJackson2MessageConverter());
return new CompositeMessageConverter(messageConverters);
}

private SimpleClassMatchingMessageConverter createClassMatchingMessageConverter() {
SimpleClassMatchingMessageConverter matchingMessageConverter = new SimpleClassMatchingMessageConverter();
matchingMessageConverter.setSerializedPayloadClass(String.class);
return matchingMessageConverter;
}

private StringMessageConverter createStringMessageConverter() {
StringMessageConverter stringMessageConverter = new StringMessageConverter();
stringMessageConverter.setSerializedPayloadClass(String.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.awspring.cloud.sqs.support.converter;

import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;

/**
* {@link SmartMessageConverter} implementation that returns the payload unchanged if the target class
* for Serialization / Deserialization matches the payload class.
*
* @author Tomaz Fernandes
* @since 3.3
*/
public class SimpleClassMatchingMessageConverter extends AbstractMessageConverter {

@Override
protected boolean supports(Class<?> clazz) {
return true;
}

@Nullable
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
Object payload = message.getPayload();
return payload.getClass().isAssignableFrom(targetClass) ? payload : null;
}

@Nullable
@Override
protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) {
return payload.getClass().isAssignableFrom(getSerializedPayloadClass()) ? payload : null;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2022 the original author or authors.
* Copyright 2013-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
* {@link software.amazon.awssdk.services.sqs.model.Message} instances to Spring Messaging {@link Message} instances.
*
* @author Tomaz Fernandes
* @author Dongha kim
* @since 3.0
* @see SqsHeaderMapper
* @see SqsMessageConversionContext
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2022 the original author or authors.
* Copyright 2013-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,6 +46,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
Expand All @@ -58,6 +59,7 @@
*
* @author Tomaz Fernandes
* @author Mikhail Strokov
* @author Dongha Kim
* @author Wei Jiang
*/
@SpringBootTest
Expand Down Expand Up @@ -172,6 +174,20 @@ private Map<String, Object> getHeaderMapping(Class<?> clazz) {
return Collections.singletonMap(SqsHeaders.SQS_DEFAULT_TYPE_HEADER, clazz.getName());
}

@Test
void shouldSendAndReceiveJsonString() throws Exception {
String messageBody = """
{
"firstField": "hello",
"secondField": "sqs!"
}
""";
sqsTemplate.send(to -> to.queue(RESOLVES_POJO_TYPES_QUEUE_NAME).payload(messageBody).header(MessageHeaders.CONTENT_TYPE, "application/json"));
logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_POJO_TYPES_QUEUE_NAME, messageBody);
assertThat(latchContainer.resolvesPojoLatch.await(10, TimeUnit.SECONDS)).isTrue();
}


static class ResolvesPojoListener {

@Autowired
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2023 the original author or authors.
* Copyright 2013-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,24 +15,10 @@
*/
package io.awspring.cloud.sqs.integration;

import static org.assertj.core.api.Assertions.assertThat;

import io.awspring.cloud.sqs.listener.SqsHeaders;
import io.awspring.cloud.sqs.listener.acknowledgement.Acknowledgement;
import io.awspring.cloud.sqs.operations.SendResult;
import io.awspring.cloud.sqs.operations.SqsOperations;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import io.awspring.cloud.sqs.operations.SqsTemplateParameters;
import io.awspring.cloud.sqs.operations.TemplateAcknowledgementMode;
import io.awspring.cloud.sqs.operations.*;
import io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand All @@ -41,13 +27,22 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StopWatch;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;

/**
* @author Tomaz Fernandes
* @author Dongha Kim
*/
@SpringBootTest
public class SqsTemplateIntegrationTests extends BaseSqsIntegrationTest {
Expand Down Expand Up @@ -78,6 +73,8 @@ public class SqsTemplateIntegrationTests extends BaseSqsIntegrationTest {

private static final String HANDLES_CONTENT_DEDUPLICATION_QUEUE_NAME = "handles-content-deduplication-queue.fifo";

private static final String SENDS_AND_RECEIVES_JSON_MESSAGE_QUEUE_NAME = "send-receive-json-message-queue";

@Autowired
private SqsAsyncClient asyncClient;

Expand All @@ -92,7 +89,9 @@ static void beforeTests() {
createQueue(client, RECORD_WITHOUT_TYPE_HEADER_QUEUE_NAME),
createQueue(client, RETURNS_ON_PARTIAL_BATCH_QUEUE_NAME),
createQueue(client, THROWS_ON_PARTIAL_BATCH_QUEUE_NAME),
createQueue(client, SENDS_AND_RECEIVES_MANUAL_ACK_QUEUE_NAME), createQueue(client, EMPTY_QUEUE_NAME),
createQueue(client, SENDS_AND_RECEIVES_JSON_MESSAGE_QUEUE_NAME),
createQueue(client, SENDS_AND_RECEIVES_MANUAL_ACK_QUEUE_NAME),
createQueue(client, EMPTY_QUEUE_NAME),
createFifoQueue(client, SENDS_AND_RECEIVES_MESSAGE_FIFO_QUEUE_NAME),
createFifoQueue(client, SENDS_AND_RECEIVES_BATCH_FIFO_QUEUE_NAME),
createFifoQueue(client, HANDLES_CONTENT_DEDUPLICATION_QUEUE_NAME,
Expand Down Expand Up @@ -187,6 +186,27 @@ void shouldSendAndReceiveWithManualAcknowledgement() {
assertThat(receivedMessage3).isEmpty();
}

@Test
void shouldSendAndReceiveJsonString() {
SqsOperations template = SqsTemplate.builder()
.sqsAsyncClient(this.asyncClient)
.configureDefaultConverter(AbstractMessagingMessageConverter::doNotSendPayloadTypeHeader)
.buildSyncTemplate();
String jsonString = """
{
"propertyOne": "hello",
"propertyTwo": "sqs!"
}
""";
SampleRecord expectedPayload = new SampleRecord("hello", "sqs!");
SendResult<Object> result = template.send(to -> to.queue(SENDS_AND_RECEIVES_JSON_MESSAGE_QUEUE_NAME)
.payload(jsonString).header(MessageHeaders.CONTENT_TYPE, "application/json"));
assertThat(result).isNotNull();
Optional<Message<SampleRecord>> receivedMessage = template
.receive(from -> from.queue(SENDS_AND_RECEIVES_JSON_MESSAGE_QUEUE_NAME), SampleRecord.class);
assertThat(receivedMessage).isPresent().get().extracting(Message::getPayload).isEqualTo(expectedPayload);
}

@Test
void shouldSendAndReceiveBatch() {
SqsOperations template = SqsTemplate.builder().sqsAsyncClient(this.asyncClient)
Expand Down

0 comments on commit 57e94c2

Please sign in to comment.