diff --git a/pom.xml b/pom.xml
index 758ff769..00362400 100644
--- a/pom.xml
+++ b/pom.xml
@@ -8,7 +8,7 @@
../../microservice-service-parent/pom.xml
spring-boot-starter-datawave-query
- 1.0.7-SNAPSHOT
+ 1.0.7
https://github.com/NationalSecurityAgency/datawave-spring-boot-starter-query
@@ -27,7 +27,7 @@
7.3.0
31.0.1-jre
4.0.2
- 4.0.2
+ 4.0.3
3.0.2
3.0.3
diff --git a/src/main/java/datawave/microservice/query/config/QueryStarterConfiguration.java b/src/main/java/datawave/microservice/query/config/QueryStarterConfiguration.java
index 461dc721..89c870bd 100644
--- a/src/main/java/datawave/microservice/query/config/QueryStarterConfiguration.java
+++ b/src/main/java/datawave/microservice/query/config/QueryStarterConfiguration.java
@@ -1,12 +1,24 @@
package datawave.microservice.query.config;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.cache.caffeine.CaffeineCacheManager;
+import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import com.github.benmanes.caffeine.cache.CaffeineSpec;
+
import datawave.microservice.query.edge.config.EdgeDictionaryProviderProperties;
import datawave.microservice.query.mapreduce.config.MapReduceQueryProperties;
import datawave.microservice.query.stream.StreamingProperties;
@Configuration
@EnableConfigurationProperties({QueryProperties.class, MapReduceQueryProperties.class, StreamingProperties.class, EdgeDictionaryProviderProperties.class})
-public class QueryStarterConfiguration {}
+public class QueryStarterConfiguration {
+
+ @Bean
+ public CaffeineCacheManager dateIndexHelperCacheManager() {
+ CaffeineCacheManager caffeineCacheManager = new CaffeineCacheManager();
+ caffeineCacheManager.setCaffeineSpec(CaffeineSpec.parse("maximumSize=1000, expireAfterAccess=24h, expireAfterWrite=24h"));
+ return caffeineCacheManager;
+ }
+}
diff --git a/src/main/java/datawave/microservice/query/messaging/config/MessagingProperties.java b/src/main/java/datawave/microservice/query/messaging/config/MessagingProperties.java
index a6b54ef3..c4c01a3f 100644
--- a/src/main/java/datawave/microservice/query/messaging/config/MessagingProperties.java
+++ b/src/main/java/datawave/microservice/query/messaging/config/MessagingProperties.java
@@ -1,16 +1,16 @@
package datawave.microservice.query.messaging.config;
+import static datawave.microservice.query.messaging.config.MessagingProperties.AutoOffsetReset.EARLIEST;
import static datawave.microservice.query.messaging.hazelcast.HazelcastQueryResultsManager.HAZELCAST;
import static datawave.microservice.query.messaging.kafka.KafkaQueryResultsManager.KAFKA;
-import java.util.concurrent.TimeUnit;
-
import javax.validation.Valid;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Positive;
import javax.validation.constraints.PositiveOrZero;
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
@@ -97,6 +97,9 @@ public static final class KafkaProperties {
private int replicas = -1;
+ private boolean useDedicatedInstance = true;
+ private KafkaInstanceSettings instanceSettings = new KafkaInstanceSettings();
+
public long getPollTimeoutMillis() {
return pollTimeoutMillis;
}
@@ -128,6 +131,22 @@ public int getReplicas() {
public void setReplicas(int replicas) {
this.replicas = replicas;
}
+
+ public boolean isUseDedicatedInstance() {
+ return useDedicatedInstance;
+ }
+
+ public void setUseDedicatedInstance(boolean useDedicatedInstance) {
+ this.useDedicatedInstance = useDedicatedInstance;
+ }
+
+ public KafkaInstanceSettings getInstanceSettings() {
+ return instanceSettings;
+ }
+
+ public void setInstanceSettings(KafkaInstanceSettings instanceSettings) {
+ this.instanceSettings = instanceSettings;
+ }
}
public final static class RabbitMQProperties {
@@ -139,6 +158,9 @@ public final static class RabbitMQProperties {
@Positive
private long maxMessageSizeBytes = 536870912L;
+ private boolean useDedicatedInstance = false;
+ private RabbitMQInstanceSettings instanceSettings = new RabbitMQInstanceSettings();
+
public boolean isDurable() {
return durable;
}
@@ -154,6 +176,140 @@ public long getMaxMessageSizeBytes() {
public void setMaxMessageSizeBytes(long maxMessageSizeBytes) {
this.maxMessageSizeBytes = maxMessageSizeBytes;
}
+
+ public boolean isUseDedicatedInstance() {
+ return useDedicatedInstance;
+ }
+
+ public void setUseDedicatedInstance(boolean useDedicatedInstance) {
+ this.useDedicatedInstance = useDedicatedInstance;
+ }
+
+ public RabbitMQInstanceSettings getInstanceSettings() {
+ return instanceSettings;
+ }
+
+ public void setInstanceSettings(RabbitMQInstanceSettings instanceSettings) {
+ this.instanceSettings = instanceSettings;
+ }
+ }
+
+ public final static class RabbitMQInstanceSettings {
+ private String host = null;
+ private int port = 5672;
+ private String username = null;
+ private String password = null;
+ private String virtualHost = null;
+ private CachingConnectionFactory.ConfirmType publisherConfirmType = CachingConnectionFactory.ConfirmType.SIMPLE;
+ private boolean publisherConfirms = true;
+ private boolean publisherReturns = true;
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getVirtualHost() {
+ return virtualHost;
+ }
+
+ public void setVirtualHost(String virtualHost) {
+ this.virtualHost = virtualHost;
+ }
+
+ public CachingConnectionFactory.ConfirmType getPublisherConfirmType() {
+ return publisherConfirmType;
+ }
+
+ public void setPublisherConfirmType(CachingConnectionFactory.ConfirmType publisherConfirmType) {
+ this.publisherConfirmType = publisherConfirmType;
+ }
+
+ public boolean isPublisherConfirms() {
+ return publisherConfirms;
+ }
+
+ public void setPublisherConfirms(boolean publisherConfirms) {
+ this.publisherConfirms = publisherConfirms;
+ }
+
+ public boolean isPublisherReturns() {
+ return publisherReturns;
+ }
+
+ public void setPublisherReturns(boolean publisherReturns) {
+ this.publisherReturns = publisherReturns;
+ }
+ }
+
+ public final static class KafkaInstanceSettings {
+ private String bootstrapServers = null;
+ private AutoOffsetReset autoOffsetReset = EARLIEST;
+ private Boolean enableAutoCommit = false;
+ private Boolean allowAutoCreateTopics = false;
+
+ public String getBootstrapServers() {
+ return bootstrapServers;
+ }
+
+ public void setBootstrapServers(String bootstrapServers) {
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ public AutoOffsetReset getAutoOffsetReset() {
+ return autoOffsetReset;
+ }
+
+ public void setAutoOffsetReset(AutoOffsetReset autoOffsetReset) {
+ this.autoOffsetReset = autoOffsetReset;
+ }
+
+ public Boolean isEnableAutoCommit() {
+ return enableAutoCommit;
+ }
+
+ public void setEnableAutoCommit(Boolean enableAutoCommit) {
+ this.enableAutoCommit = enableAutoCommit;
+ }
+
+ public Boolean isAllowAutoCreateTopics() {
+ return allowAutoCreateTopics;
+ }
+
+ public void setAllowAutoCreateTopics(Boolean allowAutoCreateTopics) {
+ this.allowAutoCreateTopics = allowAutoCreateTopics;
+ }
+ }
+
+ public enum AutoOffsetReset {
+ EARLIEST, LATEST, NONE, ANYTHING
}
public final static class HazelcastProperties {
diff --git a/src/main/java/datawave/microservice/query/messaging/kafka/KafkaQueryResultsManager.java b/src/main/java/datawave/microservice/query/messaging/kafka/KafkaQueryResultsManager.java
index 8b624823..f82c249e 100644
--- a/src/main/java/datawave/microservice/query/messaging/kafka/KafkaQueryResultsManager.java
+++ b/src/main/java/datawave/microservice/query/messaging/kafka/KafkaQueryResultsManager.java
@@ -1,7 +1,5 @@
package datawave.microservice.query.messaging.kafka;
-import static datawave.microservice.query.messaging.kafka.KafkaQueryResultsManager.KAFKA;
-
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -16,21 +14,16 @@
import org.apache.kafka.common.TopicPartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
-import org.springframework.stereotype.Component;
import datawave.microservice.query.messaging.QueryResultsListener;
import datawave.microservice.query.messaging.QueryResultsManager;
import datawave.microservice.query.messaging.QueryResultsPublisher;
import datawave.microservice.query.messaging.config.MessagingProperties;
-@Component
-@ConditionalOnProperty(name = "datawave.query.messaging.backend", havingValue = KAFKA)
public class KafkaQueryResultsManager implements QueryResultsManager {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -43,12 +36,12 @@ public class KafkaQueryResultsManager implements QueryResultsManager {
private final ProducerFactory kafkaProducerFactory;
private final ConsumerFactory kafkaConsumerFactory;
- public KafkaQueryResultsManager(MessagingProperties messagingProperties, KafkaAdmin adminClient, ProducerFactory kafkaProducerFactory,
- ConsumerFactory kafkaConsumerFactory) {
+ public KafkaQueryResultsManager(MessagingProperties messagingProperties, AdminClient queryKafkaAdminClient,
+ ProducerFactory queryKafkaProducerFactory, ConsumerFactory queryKafkaConsumerFactory) {
this.messagingProperties = messagingProperties;
- this.adminClient = AdminClient.create(adminClient.getConfigurationProperties());
- this.kafkaProducerFactory = kafkaProducerFactory;
- this.kafkaConsumerFactory = kafkaConsumerFactory;
+ this.adminClient = queryKafkaAdminClient;
+ this.kafkaProducerFactory = queryKafkaProducerFactory;
+ this.kafkaConsumerFactory = queryKafkaConsumerFactory;
}
/**
@@ -66,7 +59,7 @@ public QueryResultsListener createListener(String listenerId, String queryId) {
/**
* Create a publisher for a specific query id.
- *
+ *
* @param queryId
* The query ID to publish to
* @return a query result publisher
diff --git a/src/main/java/datawave/microservice/query/messaging/kafka/config/KafkaMessagingConfiguration.java b/src/main/java/datawave/microservice/query/messaging/kafka/config/KafkaMessagingConfiguration.java
new file mode 100644
index 00000000..9fd6fd28
--- /dev/null
+++ b/src/main/java/datawave/microservice/query/messaging/kafka/config/KafkaMessagingConfiguration.java
@@ -0,0 +1,117 @@
+package datawave.microservice.query.messaging.kafka.config;
+
+import static datawave.microservice.query.messaging.kafka.KafkaQueryResultsManager.KAFKA;
+import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaAdmin;
+import org.springframework.kafka.core.ProducerFactory;
+
+import datawave.microservice.query.messaging.QueryResultsManager;
+import datawave.microservice.query.messaging.config.MessagingProperties;
+import datawave.microservice.query.messaging.kafka.KafkaQueryResultsManager;
+
+@Configuration
+@ConditionalOnProperty(name = "datawave.query.messaging.backend", havingValue = KAFKA)
+public class KafkaMessagingConfiguration {
+
+ @Bean
+ public QueryResultsManager kafkaQueryResultsManager(MessagingProperties messagingProperties, @Autowired(required = false) KafkaAdmin kafkaAdmin,
+ @Autowired(required = false) ProducerFactory kafkaProducerFactory,
+ @Autowired(required = false) ConsumerFactory kafkaConsumerFactory) {
+ Map kafkaConfigProps = createKafkaConfigProps(messagingProperties);
+ // @formatter:off
+ return new KafkaQueryResultsManager(
+ messagingProperties,
+ createAdminClient(messagingProperties, kafkaConfigProps, kafkaAdmin),
+ createProducerFactory(messagingProperties, kafkaConfigProps, kafkaProducerFactory),
+ createConsumerFactory(messagingProperties, kafkaConfigProps, kafkaConsumerFactory));
+ // @formatter:on
+ }
+
+ public Map createKafkaConfigProps(MessagingProperties messagingProperties) {
+ MessagingProperties.KafkaInstanceSettings instanceSettings = messagingProperties.getKafka().getInstanceSettings();
+
+ Map configProps = new HashMap<>();
+ if (instanceSettings.getBootstrapServers() != null) {
+ configProps.put(BOOTSTRAP_SERVERS_CONFIG, instanceSettings.getBootstrapServers());
+ }
+
+ if (instanceSettings.getAutoOffsetReset() != null) {
+ configProps.put(AUTO_OFFSET_RESET_CONFIG, instanceSettings.getAutoOffsetReset().name().toLowerCase());
+ }
+
+ if (instanceSettings.isEnableAutoCommit() != null) {
+ configProps.put(ENABLE_AUTO_COMMIT_CONFIG, instanceSettings.isEnableAutoCommit());
+ }
+
+ if (instanceSettings.isAllowAutoCreateTopics() != null) {
+ configProps.put(ALLOW_AUTO_CREATE_TOPICS_CONFIG, instanceSettings.isAllowAutoCreateTopics());
+ }
+
+ return configProps;
+ }
+
+ public AdminClient createAdminClient(MessagingProperties messagingProperties, Map queryKafkaConfigProps, KafkaAdmin kafkaAdmin) {
+ AdminClient finalClient = null;
+
+ if (messagingProperties.getKafka().isUseDedicatedInstance()) {
+ finalClient = AdminClient.create(queryKafkaConfigProps);
+ } else {
+ finalClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());
+ }
+
+ return finalClient;
+ }
+
+ public ProducerFactory createProducerFactory(MessagingProperties messagingProperties, Map queryKafkaConfigProps,
+ ProducerFactory kafkaProducerFactory) {
+ ProducerFactory finalKafkaProducerFactory = null;
+
+ if (messagingProperties.getKafka().isUseDedicatedInstance()) {
+ // @formatter:off
+ finalKafkaProducerFactory = new DefaultKafkaProducerFactory<>(
+ queryKafkaConfigProps,
+ new StringSerializer(),
+ new StringSerializer());
+ // @formatter:on
+ } else {
+ finalKafkaProducerFactory = kafkaProducerFactory;
+ }
+
+ return finalKafkaProducerFactory;
+ }
+
+ public ConsumerFactory createConsumerFactory(MessagingProperties messagingProperties, Map queryKafkaConfigProps,
+ ConsumerFactory kafkaConsumerFactory) {
+ ConsumerFactory finalKafkaConsumerFactory = null;
+
+ if (messagingProperties.getKafka().isUseDedicatedInstance()) {
+ // @formatter:off
+ finalKafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
+ queryKafkaConfigProps,
+ new StringDeserializer(),
+ new StringDeserializer());
+ // @formatter:on
+ } else {
+ finalKafkaConsumerFactory = kafkaConsumerFactory;
+ }
+
+ return finalKafkaConsumerFactory;
+ }
+}
diff --git a/src/main/java/datawave/microservice/query/messaging/rabbitmq/RabbitMQQueryResultsManager.java b/src/main/java/datawave/microservice/query/messaging/rabbitmq/RabbitMQQueryResultsManager.java
index d0a91ac0..1850c62b 100644
--- a/src/main/java/datawave/microservice/query/messaging/rabbitmq/RabbitMQQueryResultsManager.java
+++ b/src/main/java/datawave/microservice/query/messaging/rabbitmq/RabbitMQQueryResultsManager.java
@@ -1,7 +1,5 @@
package datawave.microservice.query.messaging.rabbitmq;
-import static datawave.microservice.query.messaging.rabbitmq.RabbitMQQueryResultsManager.RABBITMQ;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpIOException;
@@ -15,9 +13,6 @@
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.stereotype.Component;
import datawave.microservice.query.messaging.ClaimCheck;
import datawave.microservice.query.messaging.QueryResultsListener;
@@ -25,8 +20,6 @@
import datawave.microservice.query.messaging.QueryResultsPublisher;
import datawave.microservice.query.messaging.config.MessagingProperties;
-@Component
-@ConditionalOnProperty(name = "datawave.query.messaging.backend", havingValue = RABBITMQ)
public class RabbitMQQueryResultsManager implements QueryResultsManager {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -44,15 +37,15 @@ public class RabbitMQQueryResultsManager implements QueryResultsManager {
private final DirectRabbitListenerContainerFactory listenerContainerFactory;
public RabbitMQQueryResultsManager(MessagingProperties messagingProperties, RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry,
- CachingConnectionFactory connectionFactory, @Autowired(required = false) ClaimCheck claimCheck) {
+ CachingConnectionFactory cachingConnectionFactory, ClaimCheck claimCheck) {
this.messagingProperties = messagingProperties;
this.rabbitListenerEndpointRegistry = rabbitListenerEndpointRegistry;
- this.connectionFactory = connectionFactory;
+ this.connectionFactory = cachingConnectionFactory;
this.claimCheck = claimCheck;
- rabbitAdmin = new RabbitAdmin(connectionFactory);
+ rabbitAdmin = new RabbitAdmin(cachingConnectionFactory);
listenerContainerFactory = new DirectRabbitListenerContainerFactory();
- listenerContainerFactory.setConnectionFactory(connectionFactory);
+ listenerContainerFactory.setConnectionFactory(cachingConnectionFactory);
listenerContainerFactory.setConsumersPerQueue(messagingProperties.getConcurrency());
}
diff --git a/src/main/java/datawave/microservice/query/messaging/rabbitmq/config/RabbitMQMessagingConfiguration.java b/src/main/java/datawave/microservice/query/messaging/rabbitmq/config/RabbitMQMessagingConfiguration.java
new file mode 100644
index 00000000..2caa752f
--- /dev/null
+++ b/src/main/java/datawave/microservice/query/messaging/rabbitmq/config/RabbitMQMessagingConfiguration.java
@@ -0,0 +1,75 @@
+package datawave.microservice.query.messaging.rabbitmq.config;
+
+import static datawave.microservice.query.messaging.rabbitmq.RabbitMQQueryResultsManager.RABBITMQ;
+
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
+import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import com.rabbitmq.client.ConnectionFactory;
+
+import datawave.microservice.query.messaging.ClaimCheck;
+import datawave.microservice.query.messaging.QueryResultsManager;
+import datawave.microservice.query.messaging.config.MessagingProperties;
+import datawave.microservice.query.messaging.rabbitmq.RabbitMQQueryResultsManager;
+
+@Configuration
+@ConditionalOnProperty(name = "datawave.query.messaging.backend", havingValue = RABBITMQ)
+public class RabbitMQMessagingConfiguration {
+
+ @Bean
+ public QueryResultsManager rabbitMQQueryResultsManager(MessagingProperties messagingProperties,
+ RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry, @Autowired(required = false) CachingConnectionFactory connectionFactory,
+ @Autowired(required = false) ClaimCheck claimCheck) {
+ // @formatter:off
+ return new RabbitMQQueryResultsManager(
+ messagingProperties,
+ rabbitListenerEndpointRegistry,
+ createCachingConnectionFactory(messagingProperties, connectionFactory),
+ claimCheck);
+ // @formatter:on
+ }
+
+ public CachingConnectionFactory createCachingConnectionFactory(MessagingProperties messagingProperties, CachingConnectionFactory connectionFactory) {
+ CachingConnectionFactory finalConnectionFactory = connectionFactory;
+
+ if (messagingProperties.getRabbitmq().isUseDedicatedInstance()) {
+ MessagingProperties.RabbitMQInstanceSettings rabbitMqProperties = messagingProperties.getRabbitmq().getInstanceSettings();
+
+ ConnectionFactory dedicatedConnectionFactory = new ConnectionFactory();
+
+ if (rabbitMqProperties.getHost() != null) {
+ dedicatedConnectionFactory.setHost(rabbitMqProperties.getHost());
+ }
+
+ if (rabbitMqProperties.getPassword() != null) {
+ dedicatedConnectionFactory.setPort(rabbitMqProperties.getPort());
+ }
+
+ if (rabbitMqProperties.getUsername() != null) {
+ dedicatedConnectionFactory.setUsername(rabbitMqProperties.getUsername());
+ }
+
+ if (rabbitMqProperties.getPassword() != null) {
+ dedicatedConnectionFactory.setPassword(rabbitMqProperties.getPassword());
+ }
+
+ if (rabbitMqProperties.getVirtualHost() != null) {
+ dedicatedConnectionFactory.setVirtualHost(rabbitMqProperties.getVirtualHost());
+ }
+
+ finalConnectionFactory = new CachingConnectionFactory(dedicatedConnectionFactory);
+ if (rabbitMqProperties.getPublisherConfirmType() != null) {
+ finalConnectionFactory.setPublisherConfirmType(rabbitMqProperties.getPublisherConfirmType());
+ }
+
+ finalConnectionFactory.setPublisherConfirms(CachingConnectionFactory.ConfirmType.SIMPLE != rabbitMqProperties.getPublisherConfirmType()
+ && rabbitMqProperties.isPublisherConfirms());
+ finalConnectionFactory.setPublisherReturns(rabbitMqProperties.isPublisherReturns());
+ }
+ return finalConnectionFactory;
+ }
+}