Skip to content

Commit

Permalink
1.0.7
Browse files Browse the repository at this point in the history
  • Loading branch information
jwomeara committed Sep 22, 2024
1 parent eb8ef50 commit ccf469a
Show file tree
Hide file tree
Showing 7 changed files with 375 additions and 29 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<relativePath>../../microservice-service-parent/pom.xml</relativePath>
</parent>
<artifactId>spring-boot-starter-datawave-query</artifactId>
<version>1.0.7-SNAPSHOT</version>
<version>1.0.7</version>
<url>https://github.com/NationalSecurityAgency/datawave-spring-boot-starter-query</url>
<licenses>
<license>
Expand All @@ -27,7 +27,7 @@
<version.datawave>7.3.0</version.datawave>
<version.guava>31.0.1-jre</version.guava>
<version.microservice.hazelcast-client>4.0.2</version.microservice.hazelcast-client>
<version.microservice.starter>4.0.2</version.microservice.starter>
<version.microservice.starter>4.0.3</version.microservice.starter>
<version.microservice.starter-metadata>3.0.2</version.microservice.starter-metadata>
<version.microservice.starter-metrics>3.0.3</version.microservice.starter-metrics>
</properties>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
package datawave.microservice.query.config;

import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.github.benmanes.caffeine.cache.CaffeineSpec;

import datawave.microservice.query.edge.config.EdgeDictionaryProviderProperties;
import datawave.microservice.query.mapreduce.config.MapReduceQueryProperties;
import datawave.microservice.query.stream.StreamingProperties;

@Configuration
@EnableConfigurationProperties({QueryProperties.class, MapReduceQueryProperties.class, StreamingProperties.class, EdgeDictionaryProviderProperties.class})
public class QueryStarterConfiguration {}
public class QueryStarterConfiguration {

@Bean
public CaffeineCacheManager dateIndexHelperCacheManager() {
CaffeineCacheManager caffeineCacheManager = new CaffeineCacheManager();
caffeineCacheManager.setCaffeineSpec(CaffeineSpec.parse("maximumSize=1000, expireAfterAccess=24h, expireAfterWrite=24h"));
return caffeineCacheManager;
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package datawave.microservice.query.messaging.config;

import static datawave.microservice.query.messaging.config.MessagingProperties.AutoOffsetReset.EARLIEST;
import static datawave.microservice.query.messaging.hazelcast.HazelcastQueryResultsManager.HAZELCAST;
import static datawave.microservice.query.messaging.kafka.KafkaQueryResultsManager.KAFKA;

import java.util.concurrent.TimeUnit;

import javax.validation.Valid;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Positive;
import javax.validation.constraints.PositiveOrZero;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;

Expand Down Expand Up @@ -97,6 +97,9 @@ public static final class KafkaProperties {

private int replicas = -1;

private boolean useDedicatedInstance = true;
private KafkaInstanceSettings instanceSettings = new KafkaInstanceSettings();

public long getPollTimeoutMillis() {
return pollTimeoutMillis;
}
Expand Down Expand Up @@ -128,6 +131,22 @@ public int getReplicas() {
public void setReplicas(int replicas) {
this.replicas = replicas;
}

public boolean isUseDedicatedInstance() {
return useDedicatedInstance;
}

public void setUseDedicatedInstance(boolean useDedicatedInstance) {
this.useDedicatedInstance = useDedicatedInstance;
}

public KafkaInstanceSettings getInstanceSettings() {
return instanceSettings;
}

public void setInstanceSettings(KafkaInstanceSettings instanceSettings) {
this.instanceSettings = instanceSettings;
}
}

public final static class RabbitMQProperties {
Expand All @@ -139,6 +158,9 @@ public final static class RabbitMQProperties {
@Positive
private long maxMessageSizeBytes = 536870912L;

private boolean useDedicatedInstance = false;
private RabbitMQInstanceSettings instanceSettings = new RabbitMQInstanceSettings();

public boolean isDurable() {
return durable;
}
Expand All @@ -154,6 +176,140 @@ public long getMaxMessageSizeBytes() {
public void setMaxMessageSizeBytes(long maxMessageSizeBytes) {
this.maxMessageSizeBytes = maxMessageSizeBytes;
}

public boolean isUseDedicatedInstance() {
return useDedicatedInstance;
}

public void setUseDedicatedInstance(boolean useDedicatedInstance) {
this.useDedicatedInstance = useDedicatedInstance;
}

public RabbitMQInstanceSettings getInstanceSettings() {
return instanceSettings;
}

public void setInstanceSettings(RabbitMQInstanceSettings instanceSettings) {
this.instanceSettings = instanceSettings;
}
}

public final static class RabbitMQInstanceSettings {
private String host = null;
private int port = 5672;
private String username = null;
private String password = null;
private String virtualHost = null;
private CachingConnectionFactory.ConfirmType publisherConfirmType = CachingConnectionFactory.ConfirmType.SIMPLE;
private boolean publisherConfirms = true;
private boolean publisherReturns = true;

public String getHost() {
return host;
}

public void setHost(String host) {
this.host = host;
}

public int getPort() {
return port;
}

public void setPort(int port) {
this.port = port;
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public String getVirtualHost() {
return virtualHost;
}

public void setVirtualHost(String virtualHost) {
this.virtualHost = virtualHost;
}

public CachingConnectionFactory.ConfirmType getPublisherConfirmType() {
return publisherConfirmType;
}

public void setPublisherConfirmType(CachingConnectionFactory.ConfirmType publisherConfirmType) {
this.publisherConfirmType = publisherConfirmType;
}

public boolean isPublisherConfirms() {
return publisherConfirms;
}

public void setPublisherConfirms(boolean publisherConfirms) {
this.publisherConfirms = publisherConfirms;
}

public boolean isPublisherReturns() {
return publisherReturns;
}

public void setPublisherReturns(boolean publisherReturns) {
this.publisherReturns = publisherReturns;
}
}

public final static class KafkaInstanceSettings {
private String bootstrapServers = null;
private AutoOffsetReset autoOffsetReset = EARLIEST;
private Boolean enableAutoCommit = false;
private Boolean allowAutoCreateTopics = false;

public String getBootstrapServers() {
return bootstrapServers;
}

public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}

public AutoOffsetReset getAutoOffsetReset() {
return autoOffsetReset;
}

public void setAutoOffsetReset(AutoOffsetReset autoOffsetReset) {
this.autoOffsetReset = autoOffsetReset;
}

public Boolean isEnableAutoCommit() {
return enableAutoCommit;
}

public void setEnableAutoCommit(Boolean enableAutoCommit) {
this.enableAutoCommit = enableAutoCommit;
}

public Boolean isAllowAutoCreateTopics() {
return allowAutoCreateTopics;
}

public void setAllowAutoCreateTopics(Boolean allowAutoCreateTopics) {
this.allowAutoCreateTopics = allowAutoCreateTopics;
}
}

public enum AutoOffsetReset {
EARLIEST, LATEST, NONE, ANYTHING
}

public final static class HazelcastProperties {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package datawave.microservice.query.messaging.kafka;

import static datawave.microservice.query.messaging.kafka.KafkaQueryResultsManager.KAFKA;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -16,21 +14,16 @@
import org.apache.kafka.common.TopicPartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;

import datawave.microservice.query.messaging.QueryResultsListener;
import datawave.microservice.query.messaging.QueryResultsManager;
import datawave.microservice.query.messaging.QueryResultsPublisher;
import datawave.microservice.query.messaging.config.MessagingProperties;

@Component
@ConditionalOnProperty(name = "datawave.query.messaging.backend", havingValue = KAFKA)
public class KafkaQueryResultsManager implements QueryResultsManager {
private final Logger log = LoggerFactory.getLogger(this.getClass());

Expand All @@ -43,12 +36,12 @@ public class KafkaQueryResultsManager implements QueryResultsManager {
private final ProducerFactory<String,String> kafkaProducerFactory;
private final ConsumerFactory<String,String> kafkaConsumerFactory;

public KafkaQueryResultsManager(MessagingProperties messagingProperties, KafkaAdmin adminClient, ProducerFactory<String,String> kafkaProducerFactory,
ConsumerFactory<String,String> kafkaConsumerFactory) {
public KafkaQueryResultsManager(MessagingProperties messagingProperties, AdminClient queryKafkaAdminClient,
ProducerFactory<String,String> queryKafkaProducerFactory, ConsumerFactory<String,String> queryKafkaConsumerFactory) {
this.messagingProperties = messagingProperties;
this.adminClient = AdminClient.create(adminClient.getConfigurationProperties());
this.kafkaProducerFactory = kafkaProducerFactory;
this.kafkaConsumerFactory = kafkaConsumerFactory;
this.adminClient = queryKafkaAdminClient;
this.kafkaProducerFactory = queryKafkaProducerFactory;
this.kafkaConsumerFactory = queryKafkaConsumerFactory;
}

/**
Expand All @@ -66,7 +59,7 @@ public QueryResultsListener createListener(String listenerId, String queryId) {

/**
* Create a publisher for a specific query id.
*
*
* @param queryId
* The query ID to publish to
* @return a query result publisher
Expand Down
Loading

0 comments on commit ccf469a

Please sign in to comment.