From ccf469a07df2fa234d3eb38b3a2670c67d031b11 Mon Sep 17 00:00:00 2001 From: Whitney O'Meara Date: Sun, 22 Sep 2024 01:50:59 +0000 Subject: [PATCH] 1.0.7 --- pom.xml | 4 +- .../config/QueryStarterConfiguration.java | 14 +- .../messaging/config/MessagingProperties.java | 160 +++++++++++++++++- .../kafka/KafkaQueryResultsManager.java | 19 +-- .../config/KafkaMessagingConfiguration.java | 117 +++++++++++++ .../rabbitmq/RabbitMQQueryResultsManager.java | 15 +- .../RabbitMQMessagingConfiguration.java | 75 ++++++++ 7 files changed, 375 insertions(+), 29 deletions(-) create mode 100644 src/main/java/datawave/microservice/query/messaging/kafka/config/KafkaMessagingConfiguration.java create mode 100644 src/main/java/datawave/microservice/query/messaging/rabbitmq/config/RabbitMQMessagingConfiguration.java diff --git a/pom.xml b/pom.xml index 758ff769..00362400 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ ../../microservice-service-parent/pom.xml spring-boot-starter-datawave-query - 1.0.7-SNAPSHOT + 1.0.7 https://github.com/NationalSecurityAgency/datawave-spring-boot-starter-query @@ -27,7 +27,7 @@ 7.3.0 31.0.1-jre 4.0.2 - 4.0.2 + 4.0.3 3.0.2 3.0.3 diff --git a/src/main/java/datawave/microservice/query/config/QueryStarterConfiguration.java b/src/main/java/datawave/microservice/query/config/QueryStarterConfiguration.java index 461dc721..89c870bd 100644 --- a/src/main/java/datawave/microservice/query/config/QueryStarterConfiguration.java +++ b/src/main/java/datawave/microservice/query/config/QueryStarterConfiguration.java @@ -1,12 +1,24 @@ package datawave.microservice.query.config; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cache.caffeine.CaffeineCacheManager; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import com.github.benmanes.caffeine.cache.CaffeineSpec; + import datawave.microservice.query.edge.config.EdgeDictionaryProviderProperties; import datawave.microservice.query.mapreduce.config.MapReduceQueryProperties; import datawave.microservice.query.stream.StreamingProperties; @Configuration @EnableConfigurationProperties({QueryProperties.class, MapReduceQueryProperties.class, StreamingProperties.class, EdgeDictionaryProviderProperties.class}) -public class QueryStarterConfiguration {} +public class QueryStarterConfiguration { + + @Bean + public CaffeineCacheManager dateIndexHelperCacheManager() { + CaffeineCacheManager caffeineCacheManager = new CaffeineCacheManager(); + caffeineCacheManager.setCaffeineSpec(CaffeineSpec.parse("maximumSize=1000, expireAfterAccess=24h, expireAfterWrite=24h")); + return caffeineCacheManager; + } +} diff --git a/src/main/java/datawave/microservice/query/messaging/config/MessagingProperties.java b/src/main/java/datawave/microservice/query/messaging/config/MessagingProperties.java index a6b54ef3..c4c01a3f 100644 --- a/src/main/java/datawave/microservice/query/messaging/config/MessagingProperties.java +++ b/src/main/java/datawave/microservice/query/messaging/config/MessagingProperties.java @@ -1,16 +1,16 @@ package datawave.microservice.query.messaging.config; +import static datawave.microservice.query.messaging.config.MessagingProperties.AutoOffsetReset.EARLIEST; import static datawave.microservice.query.messaging.hazelcast.HazelcastQueryResultsManager.HAZELCAST; import static datawave.microservice.query.messaging.kafka.KafkaQueryResultsManager.KAFKA; -import java.util.concurrent.TimeUnit; - import javax.validation.Valid; import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; import javax.validation.constraints.Positive; import javax.validation.constraints.PositiveOrZero; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.validation.annotation.Validated; @@ -97,6 +97,9 @@ public static final class KafkaProperties { private int replicas = -1; + private boolean useDedicatedInstance = true; + private KafkaInstanceSettings instanceSettings = new KafkaInstanceSettings(); + public long getPollTimeoutMillis() { return pollTimeoutMillis; } @@ -128,6 +131,22 @@ public int getReplicas() { public void setReplicas(int replicas) { this.replicas = replicas; } + + public boolean isUseDedicatedInstance() { + return useDedicatedInstance; + } + + public void setUseDedicatedInstance(boolean useDedicatedInstance) { + this.useDedicatedInstance = useDedicatedInstance; + } + + public KafkaInstanceSettings getInstanceSettings() { + return instanceSettings; + } + + public void setInstanceSettings(KafkaInstanceSettings instanceSettings) { + this.instanceSettings = instanceSettings; + } } public final static class RabbitMQProperties { @@ -139,6 +158,9 @@ public final static class RabbitMQProperties { @Positive private long maxMessageSizeBytes = 536870912L; + private boolean useDedicatedInstance = false; + private RabbitMQInstanceSettings instanceSettings = new RabbitMQInstanceSettings(); + public boolean isDurable() { return durable; } @@ -154,6 +176,140 @@ public long getMaxMessageSizeBytes() { public void setMaxMessageSizeBytes(long maxMessageSizeBytes) { this.maxMessageSizeBytes = maxMessageSizeBytes; } + + public boolean isUseDedicatedInstance() { + return useDedicatedInstance; + } + + public void setUseDedicatedInstance(boolean useDedicatedInstance) { + this.useDedicatedInstance = useDedicatedInstance; + } + + public RabbitMQInstanceSettings getInstanceSettings() { + return instanceSettings; + } + + public void setInstanceSettings(RabbitMQInstanceSettings instanceSettings) { + this.instanceSettings = instanceSettings; + } + } + + public final static class RabbitMQInstanceSettings { + private String host = null; + private int port = 5672; + private String username = null; + private String password = null; + private String virtualHost = null; + private CachingConnectionFactory.ConfirmType publisherConfirmType = CachingConnectionFactory.ConfirmType.SIMPLE; + private boolean publisherConfirms = true; + private boolean publisherReturns = true; + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getVirtualHost() { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) { + this.virtualHost = virtualHost; + } + + public CachingConnectionFactory.ConfirmType getPublisherConfirmType() { + return publisherConfirmType; + } + + public void setPublisherConfirmType(CachingConnectionFactory.ConfirmType publisherConfirmType) { + this.publisherConfirmType = publisherConfirmType; + } + + public boolean isPublisherConfirms() { + return publisherConfirms; + } + + public void setPublisherConfirms(boolean publisherConfirms) { + this.publisherConfirms = publisherConfirms; + } + + public boolean isPublisherReturns() { + return publisherReturns; + } + + public void setPublisherReturns(boolean publisherReturns) { + this.publisherReturns = publisherReturns; + } + } + + public final static class KafkaInstanceSettings { + private String bootstrapServers = null; + private AutoOffsetReset autoOffsetReset = EARLIEST; + private Boolean enableAutoCommit = false; + private Boolean allowAutoCreateTopics = false; + + public String getBootstrapServers() { + return bootstrapServers; + } + + public void setBootstrapServers(String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + } + + public AutoOffsetReset getAutoOffsetReset() { + return autoOffsetReset; + } + + public void setAutoOffsetReset(AutoOffsetReset autoOffsetReset) { + this.autoOffsetReset = autoOffsetReset; + } + + public Boolean isEnableAutoCommit() { + return enableAutoCommit; + } + + public void setEnableAutoCommit(Boolean enableAutoCommit) { + this.enableAutoCommit = enableAutoCommit; + } + + public Boolean isAllowAutoCreateTopics() { + return allowAutoCreateTopics; + } + + public void setAllowAutoCreateTopics(Boolean allowAutoCreateTopics) { + this.allowAutoCreateTopics = allowAutoCreateTopics; + } + } + + public enum AutoOffsetReset { + EARLIEST, LATEST, NONE, ANYTHING } public final static class HazelcastProperties { diff --git a/src/main/java/datawave/microservice/query/messaging/kafka/KafkaQueryResultsManager.java b/src/main/java/datawave/microservice/query/messaging/kafka/KafkaQueryResultsManager.java index 8b624823..f82c249e 100644 --- a/src/main/java/datawave/microservice/query/messaging/kafka/KafkaQueryResultsManager.java +++ b/src/main/java/datawave/microservice/query/messaging/kafka/KafkaQueryResultsManager.java @@ -1,7 +1,5 @@ package datawave.microservice.query.messaging.kafka; -import static datawave.microservice.query.messaging.kafka.KafkaQueryResultsManager.KAFKA; - import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -16,21 +14,16 @@ import org.apache.kafka.common.TopicPartitionInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.config.TopicBuilder; import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; -import org.springframework.stereotype.Component; import datawave.microservice.query.messaging.QueryResultsListener; import datawave.microservice.query.messaging.QueryResultsManager; import datawave.microservice.query.messaging.QueryResultsPublisher; import datawave.microservice.query.messaging.config.MessagingProperties; -@Component -@ConditionalOnProperty(name = "datawave.query.messaging.backend", havingValue = KAFKA) public class KafkaQueryResultsManager implements QueryResultsManager { private final Logger log = LoggerFactory.getLogger(this.getClass()); @@ -43,12 +36,12 @@ public class KafkaQueryResultsManager implements QueryResultsManager { private final ProducerFactory kafkaProducerFactory; private final ConsumerFactory kafkaConsumerFactory; - public KafkaQueryResultsManager(MessagingProperties messagingProperties, KafkaAdmin adminClient, ProducerFactory kafkaProducerFactory, - ConsumerFactory kafkaConsumerFactory) { + public KafkaQueryResultsManager(MessagingProperties messagingProperties, AdminClient queryKafkaAdminClient, + ProducerFactory queryKafkaProducerFactory, ConsumerFactory queryKafkaConsumerFactory) { this.messagingProperties = messagingProperties; - this.adminClient = AdminClient.create(adminClient.getConfigurationProperties()); - this.kafkaProducerFactory = kafkaProducerFactory; - this.kafkaConsumerFactory = kafkaConsumerFactory; + this.adminClient = queryKafkaAdminClient; + this.kafkaProducerFactory = queryKafkaProducerFactory; + this.kafkaConsumerFactory = queryKafkaConsumerFactory; } /** @@ -66,7 +59,7 @@ public QueryResultsListener createListener(String listenerId, String queryId) { /** * Create a publisher for a specific query id. - * + * * @param queryId * The query ID to publish to * @return a query result publisher diff --git a/src/main/java/datawave/microservice/query/messaging/kafka/config/KafkaMessagingConfiguration.java b/src/main/java/datawave/microservice/query/messaging/kafka/config/KafkaMessagingConfiguration.java new file mode 100644 index 00000000..9fd6fd28 --- /dev/null +++ b/src/main/java/datawave/microservice/query/messaging/kafka/config/KafkaMessagingConfiguration.java @@ -0,0 +1,117 @@ +package datawave.microservice.query.messaging.kafka.config; + +import static datawave.microservice.query.messaging.kafka.KafkaQueryResultsManager.KAFKA; +import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.kafka.core.ProducerFactory; + +import datawave.microservice.query.messaging.QueryResultsManager; +import datawave.microservice.query.messaging.config.MessagingProperties; +import datawave.microservice.query.messaging.kafka.KafkaQueryResultsManager; + +@Configuration +@ConditionalOnProperty(name = "datawave.query.messaging.backend", havingValue = KAFKA) +public class KafkaMessagingConfiguration { + + @Bean + public QueryResultsManager kafkaQueryResultsManager(MessagingProperties messagingProperties, @Autowired(required = false) KafkaAdmin kafkaAdmin, + @Autowired(required = false) ProducerFactory kafkaProducerFactory, + @Autowired(required = false) ConsumerFactory kafkaConsumerFactory) { + Map kafkaConfigProps = createKafkaConfigProps(messagingProperties); + // @formatter:off + return new KafkaQueryResultsManager( + messagingProperties, + createAdminClient(messagingProperties, kafkaConfigProps, kafkaAdmin), + createProducerFactory(messagingProperties, kafkaConfigProps, kafkaProducerFactory), + createConsumerFactory(messagingProperties, kafkaConfigProps, kafkaConsumerFactory)); + // @formatter:on + } + + public Map createKafkaConfigProps(MessagingProperties messagingProperties) { + MessagingProperties.KafkaInstanceSettings instanceSettings = messagingProperties.getKafka().getInstanceSettings(); + + Map configProps = new HashMap<>(); + if (instanceSettings.getBootstrapServers() != null) { + configProps.put(BOOTSTRAP_SERVERS_CONFIG, instanceSettings.getBootstrapServers()); + } + + if (instanceSettings.getAutoOffsetReset() != null) { + configProps.put(AUTO_OFFSET_RESET_CONFIG, instanceSettings.getAutoOffsetReset().name().toLowerCase()); + } + + if (instanceSettings.isEnableAutoCommit() != null) { + configProps.put(ENABLE_AUTO_COMMIT_CONFIG, instanceSettings.isEnableAutoCommit()); + } + + if (instanceSettings.isAllowAutoCreateTopics() != null) { + configProps.put(ALLOW_AUTO_CREATE_TOPICS_CONFIG, instanceSettings.isAllowAutoCreateTopics()); + } + + return configProps; + } + + public AdminClient createAdminClient(MessagingProperties messagingProperties, Map queryKafkaConfigProps, KafkaAdmin kafkaAdmin) { + AdminClient finalClient = null; + + if (messagingProperties.getKafka().isUseDedicatedInstance()) { + finalClient = AdminClient.create(queryKafkaConfigProps); + } else { + finalClient = AdminClient.create(kafkaAdmin.getConfigurationProperties()); + } + + return finalClient; + } + + public ProducerFactory createProducerFactory(MessagingProperties messagingProperties, Map queryKafkaConfigProps, + ProducerFactory kafkaProducerFactory) { + ProducerFactory finalKafkaProducerFactory = null; + + if (messagingProperties.getKafka().isUseDedicatedInstance()) { + // @formatter:off + finalKafkaProducerFactory = new DefaultKafkaProducerFactory<>( + queryKafkaConfigProps, + new StringSerializer(), + new StringSerializer()); + // @formatter:on + } else { + finalKafkaProducerFactory = kafkaProducerFactory; + } + + return finalKafkaProducerFactory; + } + + public ConsumerFactory createConsumerFactory(MessagingProperties messagingProperties, Map queryKafkaConfigProps, + ConsumerFactory kafkaConsumerFactory) { + ConsumerFactory finalKafkaConsumerFactory = null; + + if (messagingProperties.getKafka().isUseDedicatedInstance()) { + // @formatter:off + finalKafkaConsumerFactory = new DefaultKafkaConsumerFactory<>( + queryKafkaConfigProps, + new StringDeserializer(), + new StringDeserializer()); + // @formatter:on + } else { + finalKafkaConsumerFactory = kafkaConsumerFactory; + } + + return finalKafkaConsumerFactory; + } +} diff --git a/src/main/java/datawave/microservice/query/messaging/rabbitmq/RabbitMQQueryResultsManager.java b/src/main/java/datawave/microservice/query/messaging/rabbitmq/RabbitMQQueryResultsManager.java index d0a91ac0..1850c62b 100644 --- a/src/main/java/datawave/microservice/query/messaging/rabbitmq/RabbitMQQueryResultsManager.java +++ b/src/main/java/datawave/microservice/query/messaging/rabbitmq/RabbitMQQueryResultsManager.java @@ -1,7 +1,5 @@ package datawave.microservice.query.messaging.rabbitmq; -import static datawave.microservice.query.messaging.rabbitmq.RabbitMQQueryResultsManager.RABBITMQ; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpIOException; @@ -15,9 +13,6 @@ import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; import datawave.microservice.query.messaging.ClaimCheck; import datawave.microservice.query.messaging.QueryResultsListener; @@ -25,8 +20,6 @@ import datawave.microservice.query.messaging.QueryResultsPublisher; import datawave.microservice.query.messaging.config.MessagingProperties; -@Component -@ConditionalOnProperty(name = "datawave.query.messaging.backend", havingValue = RABBITMQ) public class RabbitMQQueryResultsManager implements QueryResultsManager { private final Logger log = LoggerFactory.getLogger(this.getClass()); @@ -44,15 +37,15 @@ public class RabbitMQQueryResultsManager implements QueryResultsManager { private final DirectRabbitListenerContainerFactory listenerContainerFactory; public RabbitMQQueryResultsManager(MessagingProperties messagingProperties, RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry, - CachingConnectionFactory connectionFactory, @Autowired(required = false) ClaimCheck claimCheck) { + CachingConnectionFactory cachingConnectionFactory, ClaimCheck claimCheck) { this.messagingProperties = messagingProperties; this.rabbitListenerEndpointRegistry = rabbitListenerEndpointRegistry; - this.connectionFactory = connectionFactory; + this.connectionFactory = cachingConnectionFactory; this.claimCheck = claimCheck; - rabbitAdmin = new RabbitAdmin(connectionFactory); + rabbitAdmin = new RabbitAdmin(cachingConnectionFactory); listenerContainerFactory = new DirectRabbitListenerContainerFactory(); - listenerContainerFactory.setConnectionFactory(connectionFactory); + listenerContainerFactory.setConnectionFactory(cachingConnectionFactory); listenerContainerFactory.setConsumersPerQueue(messagingProperties.getConcurrency()); } diff --git a/src/main/java/datawave/microservice/query/messaging/rabbitmq/config/RabbitMQMessagingConfiguration.java b/src/main/java/datawave/microservice/query/messaging/rabbitmq/config/RabbitMQMessagingConfiguration.java new file mode 100644 index 00000000..2caa752f --- /dev/null +++ b/src/main/java/datawave/microservice/query/messaging/rabbitmq/config/RabbitMQMessagingConfiguration.java @@ -0,0 +1,75 @@ +package datawave.microservice.query.messaging.rabbitmq.config; + +import static datawave.microservice.query.messaging.rabbitmq.RabbitMQQueryResultsManager.RABBITMQ; + +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.rabbitmq.client.ConnectionFactory; + +import datawave.microservice.query.messaging.ClaimCheck; +import datawave.microservice.query.messaging.QueryResultsManager; +import datawave.microservice.query.messaging.config.MessagingProperties; +import datawave.microservice.query.messaging.rabbitmq.RabbitMQQueryResultsManager; + +@Configuration +@ConditionalOnProperty(name = "datawave.query.messaging.backend", havingValue = RABBITMQ) +public class RabbitMQMessagingConfiguration { + + @Bean + public QueryResultsManager rabbitMQQueryResultsManager(MessagingProperties messagingProperties, + RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry, @Autowired(required = false) CachingConnectionFactory connectionFactory, + @Autowired(required = false) ClaimCheck claimCheck) { + // @formatter:off + return new RabbitMQQueryResultsManager( + messagingProperties, + rabbitListenerEndpointRegistry, + createCachingConnectionFactory(messagingProperties, connectionFactory), + claimCheck); + // @formatter:on + } + + public CachingConnectionFactory createCachingConnectionFactory(MessagingProperties messagingProperties, CachingConnectionFactory connectionFactory) { + CachingConnectionFactory finalConnectionFactory = connectionFactory; + + if (messagingProperties.getRabbitmq().isUseDedicatedInstance()) { + MessagingProperties.RabbitMQInstanceSettings rabbitMqProperties = messagingProperties.getRabbitmq().getInstanceSettings(); + + ConnectionFactory dedicatedConnectionFactory = new ConnectionFactory(); + + if (rabbitMqProperties.getHost() != null) { + dedicatedConnectionFactory.setHost(rabbitMqProperties.getHost()); + } + + if (rabbitMqProperties.getPassword() != null) { + dedicatedConnectionFactory.setPort(rabbitMqProperties.getPort()); + } + + if (rabbitMqProperties.getUsername() != null) { + dedicatedConnectionFactory.setUsername(rabbitMqProperties.getUsername()); + } + + if (rabbitMqProperties.getPassword() != null) { + dedicatedConnectionFactory.setPassword(rabbitMqProperties.getPassword()); + } + + if (rabbitMqProperties.getVirtualHost() != null) { + dedicatedConnectionFactory.setVirtualHost(rabbitMqProperties.getVirtualHost()); + } + + finalConnectionFactory = new CachingConnectionFactory(dedicatedConnectionFactory); + if (rabbitMqProperties.getPublisherConfirmType() != null) { + finalConnectionFactory.setPublisherConfirmType(rabbitMqProperties.getPublisherConfirmType()); + } + + finalConnectionFactory.setPublisherConfirms(CachingConnectionFactory.ConfirmType.SIMPLE != rabbitMqProperties.getPublisherConfirmType() + && rabbitMqProperties.isPublisherConfirms()); + finalConnectionFactory.setPublisherReturns(rabbitMqProperties.isPublisherReturns()); + } + return finalConnectionFactory; + } +}