From 973e52c0450a86dddbe6a57f5341b2dd81ac081d Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Sat, 26 Aug 2023 10:25:10 +1000 Subject: [PATCH 01/10] Start working on mongo db support, creating models and operation repository. --- build.gradle.kts | 5 + .../message/PayloadResolvableMessage.kt | 10 + .../messagequeue/message/QueueMessage.kt | 8 +- .../message/QueueMessageDocument.kt | 30 +++ .../queue/nosql/NoSqlMultiQueue.kt | 85 +++++++++ .../repository/MongoQueueMessageRepository.kt | 11 ++ .../repository/QueueMessageRepository.kt | 175 ++++++++---------- .../messagequeue/queue/sql/SqlMultiQueue.kt | 12 +- .../repository/SQLQueueMessageRepository.kt | 50 +++++ .../rest/controller/MessageQueueController.kt | 4 +- ...IdFilter.kt => CorrelationIdFilterTest.kt} | 2 +- ...kt => RestResponseExceptionHandlerTest.kt} | 2 +- 12 files changed, 284 insertions(+), 110 deletions(-) create mode 100644 src/main/kotlin/au/kilemon/messagequeue/message/PayloadResolvableMessage.kt create mode 100644 src/main/kotlin/au/kilemon/messagequeue/message/QueueMessageDocument.kt create mode 100644 src/main/kotlin/au/kilemon/messagequeue/queue/nosql/NoSqlMultiQueue.kt create mode 100644 src/main/kotlin/au/kilemon/messagequeue/queue/nosql/repository/MongoQueueMessageRepository.kt rename src/main/kotlin/au/kilemon/messagequeue/queue/{sql => }/repository/QueueMessageRepository.kt (77%) create mode 100644 src/main/kotlin/au/kilemon/messagequeue/queue/sql/repository/SQLQueueMessageRepository.kt rename src/test/kotlin/au/kilemon/messagequeue/filter/{TestCorrelationIdFilter.kt => CorrelationIdFilterTest.kt} (94%) rename src/test/kotlin/au/kilemon/messagequeue/rest/response/{TestRestResponseExceptionHandler.kt => RestResponseExceptionHandlerTest.kt} (94%) diff --git a/build.gradle.kts b/build.gradle.kts index 2845e52..9acf857 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -42,6 +42,11 @@ dependencies { // https://mvnrepository.com/artifact/org.postgresql/postgresql implementation("org.postgresql:postgresql:42.5.1") + // No SQL drivers + // https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-mongodb + implementation("org.springframework.boot:spring-boot-starter-data-mongodb:3.1.3") + + // Test dependencies testImplementation("org.springframework.boot:spring-boot-starter-test:3.0.6") // Required to mock MultiQueue objects since they apparently override a final 'remove(Object)' method. testImplementation("org.mockito:mockito-inline:5.1.0") diff --git a/src/main/kotlin/au/kilemon/messagequeue/message/PayloadResolvableMessage.kt b/src/main/kotlin/au/kilemon/messagequeue/message/PayloadResolvableMessage.kt new file mode 100644 index 0000000..a6e162c --- /dev/null +++ b/src/main/kotlin/au/kilemon/messagequeue/message/PayloadResolvableMessage.kt @@ -0,0 +1,10 @@ +package au.kilemon.messagequeue.message + +import java.io.Serializable + +interface PayloadResolvableMessage: Serializable +{ + fun resolvePayloadObject(): T + + fun removePayload(detailed: Boolean?): T +} diff --git a/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessage.kt b/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessage.kt index efae459..33bb974 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessage.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessage.kt @@ -11,11 +11,13 @@ import javax.persistence.* * This object wraps a [Any] type `T` which is the payload to be stored in the queue. (This is actually a [Serializable] but causes issues in initialisation * if the type is an `interface`. This needs to be [Serializable] if you want to use it with `Redis` or anything else). * + * This is used for `InMemory`, `Redis` and `SQL` queues. + * * @author github.com/Kilemonn */ @Entity @Table(name = QueueMessage.TABLE_NAME) // TODO: Schema configuration schema = "\${${MessageQueueSettings.SQL_SCHEMA}:${MessageQueueSettings.SQL_SCHEMA_DEFAULT}}") -class QueueMessage(payload: Any?, @Column(nullable = false) var type: String, @Column(name = "assignedto") var assignedTo: String? = null): Serializable +class QueueMessage(payload: Any?, @Column(nullable = false) var type: String, @Column(name = "assignedto") var assignedTo: String? = null): Serializable, PayloadResolvableMessage { companion object { @@ -61,7 +63,7 @@ class QueueMessage(payload: Any?, @Column(nullable = false) var type: String, @C * * @return the current instance with the conditionally modified [QueueMessage.payload] member based on the points above */ - fun resolvePayloadObject(): QueueMessage + override fun resolvePayloadObject(): QueueMessage { if (payloadBytes != null && payload == null) { @@ -77,7 +79,7 @@ class QueueMessage(payload: Any?, @Column(nullable = false) var type: String, @C * @param detailed when `true` the [payload] object will be logged as well, otherwise the [payload] will not be contained in the response or `null`. * @return [QueueMessage] that is either a copy of `this` without the payload, or `this` with a resolved payload */ - fun removePayload(detailed: Boolean?): QueueMessage + override fun removePayload(detailed: Boolean?): QueueMessage { if (detailed == false) { diff --git a/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessageDocument.kt b/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessageDocument.kt new file mode 100644 index 0000000..103f1d5 --- /dev/null +++ b/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessageDocument.kt @@ -0,0 +1,30 @@ +package au.kilemon.messagequeue.message + +import com.fasterxml.jackson.annotation.JsonIgnore +import org.springframework.data.annotation.Id +import org.springframework.data.mongodb.core.mapping.Document +import org.springframework.util.SerializationUtils +import java.util.* +import javax.persistence.* + +/** + * This is used for `No-SQL` queues. + * + * @author github.com/Kilemonn + */ +@Document +class QueueMessageDocument(var payload: Any?, @Column(nullable = false) var type: String, @Column(name = "assignedto") var assignedTo: String? = null) +{ + @Column(nullable = false, unique = true) + var uuid: String = UUID.randomUUID().toString() + + @JsonIgnore + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + var id: Long? = null + + /** + * Required for JSON deserialisation. + */ + constructor() : this(null, "") +} diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/NoSqlMultiQueue.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/NoSqlMultiQueue.kt new file mode 100644 index 0000000..fcbb32b --- /dev/null +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/NoSqlMultiQueue.kt @@ -0,0 +1,85 @@ +package au.kilemon.messagequeue.queue.nosql + +import au.kilemon.messagequeue.logging.HasLogger +import au.kilemon.messagequeue.message.QueueMessage +import au.kilemon.messagequeue.queue.MultiQueue +import au.kilemon.messagequeue.queue.nosql.repository.MongoQueueMessageRepository +import org.slf4j.Logger +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.context.annotation.Lazy +import java.util.* +import java.util.concurrent.atomic.AtomicLong + +class NoSqlMultiQueue : MultiQueue, HasLogger +{ + override val LOG: Logger = initialiseLogger() + + override lateinit var maxQueueIndex: HashMap + + @Lazy + @Autowired + private lateinit var queueMessageRepository: MongoQueueMessageRepository + + /** + * Just initialise map, so it's not null, but the SQL [QueueMessage] ID is maintained by the database. + */ + override fun initialiseQueueIndex() + { + maxQueueIndex = HashMap() + } + + override fun persistMessage(message: QueueMessage) + { + TODO("Not yet implemented") + } + + override fun getQueueForType(queueType: String): Queue + { + TODO("Not yet implemented") + } + + override fun performHealthCheckInternal() + { + TODO("Not yet implemented") + } + + override fun getMessageByUUID(uuid: String): Optional + { + return queueMessageRepository.findByUuid(uuid) + } + + override fun clearForTypeInternal(queueType: String): Int + { + TODO("Not yet implemented") + } + + override fun isEmptyForType(queueType: String): Boolean + { + TODO("Not yet implemented") + } + + override fun pollInternal(queueType: String): Optional + { + TODO("Not yet implemented") + } + + override fun keys(includeEmpty: Boolean): Set + { + TODO("Not yet implemented") + } + + override fun containsUUID(uuid: String): Optional + { + TODO("Not yet implemented") + } + + override fun addInternal(element: QueueMessage): Boolean + { + TODO("Not yet implemented") + } + + override fun removeInternal(element: QueueMessage): Boolean + { + TODO("Not yet implemented") + } +} diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/repository/MongoQueueMessageRepository.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/repository/MongoQueueMessageRepository.kt new file mode 100644 index 0000000..fc86034 --- /dev/null +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/repository/MongoQueueMessageRepository.kt @@ -0,0 +1,11 @@ +package au.kilemon.messagequeue.queue.nosql.repository + +import au.kilemon.messagequeue.message.QueueMessageDocument +import au.kilemon.messagequeue.queue.repository.QueueMessageRepository +import org.springframework.data.mongodb.repository.MongoRepository + + +interface MongoQueueMessageRepository: MongoRepository, QueueMessageRepository +{ + +} diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/sql/repository/QueueMessageRepository.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/repository/QueueMessageRepository.kt similarity index 77% rename from src/main/kotlin/au/kilemon/messagequeue/queue/sql/repository/QueueMessageRepository.kt rename to src/main/kotlin/au/kilemon/messagequeue/queue/repository/QueueMessageRepository.kt index d9a4864..fc7f353 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/sql/repository/QueueMessageRepository.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/repository/QueueMessageRepository.kt @@ -1,97 +1,78 @@ -package au.kilemon.messagequeue.queue.sql.repository - -import au.kilemon.messagequeue.message.QueueMessage -import org.springframework.data.jpa.repository.JpaRepository -import org.springframework.data.jpa.repository.Modifying -import org.springframework.data.jpa.repository.Query -import org.springframework.stereotype.Repository -import org.springframework.transaction.annotation.Transactional -import java.util.* - -/** - * A [JpaRepository] specific for [QueueMessage] and queries made against them. - * Defines additional specific queries required for interacting with [QueueMessage]s. - * - * Reference: https://docs.spring.io/spring-data/jpa/docs/current/reference/html/#jpa.query-methods.query-creation - * - * @author github.com/Kilemonn - */ -@Repository -interface QueueMessageRepository: JpaRepository -{ - /** - * Delete a [QueueMessage] by the provided [QueueMessage.type] [String]. - * - * @param type the [QueueMessage.type] to remove entries by - * @return the number of deleted entities - */ - @Modifying - @Transactional - @Query("DELETE FROM QueueMessage WHERE type = ?1") - fun deleteByType(type: String): Int - - /** - * Get a distinct [List] of [String] [QueueMessage.type] that currently exist. - * - * @return a [List] of all the existing [QueueMessage.type] as [String]s - */ - @Transactional - @Query("SELECT DISTINCT type FROM QueueMessage") - fun findDistinctType(): List - - /** - * Get a list of [QueueMessage] which have [QueueMessage.type] matching the provided [type]. - * - * @param type the type to match [QueueMessage.type] with - * @return a [List] of [QueueMessage] who have a matching [QueueMessage.type] with the provided [type] - */ - @Transactional - fun findByTypeOrderByIdAsc(type: String): List - - /** - * Find the entity with the matching [QueueMessage.type] and that has a non-null [QueueMessage.assignedTo]. Sorted by ID ascending. - * - * @param type the type to match [QueueMessage.type] with - * @return a [List] of [QueueMessage] who have a matching [QueueMessage.type] with the provided [type] and non-null [QueueMessage.assignedTo] - */ - @Transactional - fun findByTypeAndAssignedToIsNotNullOrderByIdAsc(type: String): List - - /** - * Find the entity with the matching [QueueMessage.type] and that has [QueueMessage.assignedTo] set to `null`. Sorted by ID ascending. - * - * @param type the type to match [QueueMessage.type] with - * @return a [List] of [QueueMessage] who have a matching [QueueMessage.type] with the provided [type] and `null` [QueueMessage.assignedTo] - */ - @Transactional - fun findByTypeAndAssignedToIsNullOrderByIdAsc(type: String): List - - /** - * Find the entity with the matching [QueueMessage.type] and [QueueMessage.assignedTo]. Sorted by ID ascending. - * - * @param type the type to match [QueueMessage.type] with - * @param assignedTo the identifier to match [QueueMessage.assignedTo] with - * @return a [List] of [QueueMessage] who have a matching [QueueMessage.type] and [QueueMessage.assignedTo] - */ - @Transactional - fun findByTypeAndAssignedToOrderByIdAsc(type: String, assignedTo: String): List - - /** - * Find the entity which has a [QueueMessage.uuid] matching the provided [uuid]. - * - * @param uuid the [QueueMessage.uuid] of the message to find - * @return the [Optional] that may contain the found [QueueMessage] - */ - @Transactional - fun findByUuid(uuid: String): Optional - - /** - * Delete a [QueueMessage] by `uuid`. - * - * @param uuid the UUID of the [QueueMessage.uuid] to remove - * @return the number of removed entries (most likely one since the UUID is unique) - */ - @Modifying - @Transactional - fun deleteByUuid(uuid: String): Int -} +package au.kilemon.messagequeue.queue.repository + +import au.kilemon.messagequeue.message.QueueMessage +import java.util.* + +/** + * A collection of Repository method specific for [QueueMessage] and queries made against them. + * + * Reference: https://docs.spring.io/spring-data/jpa/docs/current/reference/html/#jpa.query-methods.query-creation + * + * @author github.com/Kilemonn + */ +interface QueueMessageRepository +{ + /** + * Delete a [QueueMessage] by the provided [QueueMessage.type] [String]. + * + * @param type the [QueueMessage.type] to remove entries by + * @return the number of deleted entities + */ + fun deleteByType(type: String): Int + + /** + * Get a distinct [List] of [String] [QueueMessage.type] that currently exist. + * + * @return a [List] of all the existing [QueueMessage.type] as [String]s + */ + fun findDistinctType(): List + + /** + * Get a list of [QueueMessage] which have [QueueMessage.type] matching the provided [type]. + * + * @param type the type to match [QueueMessage.type] with + * @return a [List] of [QueueMessage] who have a matching [QueueMessage.type] with the provided [type] + */ + fun findByTypeOrderByIdAsc(type: String): List + + /** + * Find the entity with the matching [QueueMessage.type] and that has a non-null [QueueMessage.assignedTo]. Sorted by ID ascending. + * + * @param type the type to match [QueueMessage.type] with + * @return a [List] of [QueueMessage] who have a matching [QueueMessage.type] with the provided [type] and non-null [QueueMessage.assignedTo] + */ + fun findByTypeAndAssignedToIsNotNullOrderByIdAsc(type: String): List + + /** + * Find the entity with the matching [QueueMessage.type] and that has [QueueMessage.assignedTo] set to `null`. Sorted by ID ascending. + * + * @param type the type to match [QueueMessage.type] with + * @return a [List] of [QueueMessage] who have a matching [QueueMessage.type] with the provided [type] and `null` [QueueMessage.assignedTo] + */ + fun findByTypeAndAssignedToIsNullOrderByIdAsc(type: String): List + + /** + * Find the entity with the matching [QueueMessage.type] and [QueueMessage.assignedTo]. Sorted by ID ascending. + * + * @param type the type to match [QueueMessage.type] with + * @param assignedTo the identifier to match [QueueMessage.assignedTo] with + * @return a [List] of [QueueMessage] who have a matching [QueueMessage.type] and [QueueMessage.assignedTo] + */ + fun findByTypeAndAssignedToOrderByIdAsc(type: String, assignedTo: String): List + + /** + * Find the entity which has a [QueueMessage.uuid] matching the provided [uuid]. + * + * @param uuid the [QueueMessage.uuid] of the message to find + * @return the [Optional] that may contain the found [QueueMessage] + */ + fun findByUuid(uuid: String): Optional + + /** + * Delete a [QueueMessage] by `uuid`. + * + * @param uuid the UUID of the [QueueMessage.uuid] to remove + * @return the number of removed entries (most likely one since the UUID is unique) + */ + fun deleteByUuid(uuid: String): Int +} diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/sql/SqlMultiQueue.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/sql/SqlMultiQueue.kt index 9c317ba..0e8c4ca 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/sql/SqlMultiQueue.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/sql/SqlMultiQueue.kt @@ -4,7 +4,7 @@ import au.kilemon.messagequeue.logging.HasLogger import au.kilemon.messagequeue.message.QueueMessage import au.kilemon.messagequeue.queue.MultiQueue import au.kilemon.messagequeue.queue.exception.MessageUpdateException -import au.kilemon.messagequeue.queue.sql.repository.QueueMessageRepository +import au.kilemon.messagequeue.queue.sql.repository.SQLQueueMessageRepository import org.slf4j.Logger import org.springframework.beans.factory.annotation.Autowired import org.springframework.context.annotation.Lazy @@ -24,7 +24,7 @@ class SqlMultiQueue : MultiQueue, HasLogger @Lazy @Autowired - private lateinit var queueMessageRepository: QueueMessageRepository + private lateinit var queueMessageRepository: SQLQueueMessageRepository override lateinit var maxQueueIndex: HashMap @@ -39,7 +39,7 @@ class SqlMultiQueue : MultiQueue, HasLogger override fun getQueueForType(queueType: String): Queue { val entries = queueMessageRepository.findByTypeOrderByIdAsc(queueType) - return ConcurrentLinkedQueue(entries.map { entry -> entry.resolvePayloadObject() }) + return ConcurrentLinkedQueue(entries.map { entry -> entry.resolvePayloadObject() as QueueMessage }) } /** @@ -56,7 +56,7 @@ class SqlMultiQueue : MultiQueue, HasLogger queueMessageRepository.findByTypeAndAssignedToOrderByIdAsc(queueType, assignedTo) } - return ConcurrentLinkedQueue(entries.map { entry -> entry.resolvePayloadObject() }) + return ConcurrentLinkedQueue(entries.map { entry -> entry.resolvePayloadObject() as QueueMessage }) } /** @@ -65,7 +65,7 @@ class SqlMultiQueue : MultiQueue, HasLogger override fun getUnassignedMessagesForType(queueType: String): Queue { val entries = queueMessageRepository.findByTypeAndAssignedToIsNullOrderByIdAsc(queueType) - return ConcurrentLinkedQueue(entries.map { entry -> entry.resolvePayloadObject() }) + return ConcurrentLinkedQueue(entries.map { entry -> entry.resolvePayloadObject() as QueueMessage }) } override fun performHealthCheckInternal() @@ -95,7 +95,7 @@ class SqlMultiQueue : MultiQueue, HasLogger val messages = queueMessageRepository.findByTypeOrderByIdAsc(queueType) return if (messages.isNotEmpty()) { - return Optional.of(messages[0].resolvePayloadObject()) + return Optional.of(messages[0].resolvePayloadObject() as QueueMessage) } else { diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/sql/repository/SQLQueueMessageRepository.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/sql/repository/SQLQueueMessageRepository.kt new file mode 100644 index 0000000..2a62984 --- /dev/null +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/sql/repository/SQLQueueMessageRepository.kt @@ -0,0 +1,50 @@ +package au.kilemon.messagequeue.queue.sql.repository + +import au.kilemon.messagequeue.message.QueueMessage +import au.kilemon.messagequeue.queue.repository.QueueMessageRepository +import org.springframework.data.jpa.repository.JpaRepository +import org.springframework.data.jpa.repository.Modifying +import org.springframework.data.jpa.repository.Query +import org.springframework.stereotype.Repository +import org.springframework.transaction.annotation.Transactional +import java.util.* + +/** + * A [JpaRepository] specific for [QueueMessage] and queries made against them. + * Defines additional specific queries required for interacting with [QueueMessage]s. + * + * Reference: https://docs.spring.io/spring-data/jpa/docs/current/reference/html/#jpa.query-methods.query-creation + * + * @author github.com/Kilemonn + */ +@Repository +interface SQLQueueMessageRepository: JpaRepository, QueueMessageRepository +{ + @Modifying + @Transactional + @Query("DELETE FROM QueueMessage WHERE type = ?1") + override fun deleteByType(type: String): Int + + @Transactional + @Query("SELECT DISTINCT type FROM QueueMessage") + override fun findDistinctType(): List + + @Transactional + override fun findByTypeOrderByIdAsc(type: String): List + + @Transactional + override fun findByTypeAndAssignedToIsNotNullOrderByIdAsc(type: String): List + + @Transactional + override fun findByTypeAndAssignedToIsNullOrderByIdAsc(type: String): List + + @Transactional + override fun findByTypeAndAssignedToOrderByIdAsc(type: String, assignedTo: String): List + + @Transactional + override fun findByUuid(uuid: String): Optional + + @Modifying + @Transactional + override fun deleteByUuid(uuid: String): Int +} diff --git a/src/main/kotlin/au/kilemon/messagequeue/rest/controller/MessageQueueController.kt b/src/main/kotlin/au/kilemon/messagequeue/rest/controller/MessageQueueController.kt index 83ee25d..3b0b438 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/rest/controller/MessageQueueController.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/rest/controller/MessageQueueController.kt @@ -269,7 +269,7 @@ open class MessageQueueController : HasLogger { LOG.debug("Retrieving all entry details from queue with type [{}].", queueType) val queueForType: Queue = messageQueue.getQueueForType(queueType) - val queueDetails = queueForType.stream().map { message -> message.removePayload(detailed) }.collect(Collectors.toList()) + val queueDetails = queueForType.stream().map { message -> message.removePayload(detailed) as QueueMessage }.collect(Collectors.toList()) responseMap[queueType] = queueDetails } else @@ -279,7 +279,7 @@ open class MessageQueueController : HasLogger { // No need to empty check since we passed `false` to `keys()` above val queueForType: Queue = messageQueue.getQueueForType(key) - val queueDetails = queueForType.stream().map { message -> message.removePayload(detailed) }.collect(Collectors.toList()) + val queueDetails = queueForType.stream().map { message -> message.removePayload(detailed) as QueueMessage }.collect(Collectors.toList()) responseMap[key] = queueDetails } } diff --git a/src/test/kotlin/au/kilemon/messagequeue/filter/TestCorrelationIdFilter.kt b/src/test/kotlin/au/kilemon/messagequeue/filter/CorrelationIdFilterTest.kt similarity index 94% rename from src/test/kotlin/au/kilemon/messagequeue/filter/TestCorrelationIdFilter.kt rename to src/test/kotlin/au/kilemon/messagequeue/filter/CorrelationIdFilterTest.kt index 8d72269..7b849c9 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/filter/TestCorrelationIdFilter.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/filter/CorrelationIdFilterTest.kt @@ -11,7 +11,7 @@ import java.util.* * * @author github.com/Kilemonn */ -class TestCorrelationIdFilter +class CorrelationIdFilterTest { private val correlationIdFilter = CorrelationIdFilter() diff --git a/src/test/kotlin/au/kilemon/messagequeue/rest/response/TestRestResponseExceptionHandler.kt b/src/test/kotlin/au/kilemon/messagequeue/rest/response/RestResponseExceptionHandlerTest.kt similarity index 94% rename from src/test/kotlin/au/kilemon/messagequeue/rest/response/TestRestResponseExceptionHandler.kt rename to src/test/kotlin/au/kilemon/messagequeue/rest/response/RestResponseExceptionHandlerTest.kt index 02e3bea..70a9b95 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/rest/response/TestRestResponseExceptionHandler.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/rest/response/RestResponseExceptionHandlerTest.kt @@ -14,7 +14,7 @@ import java.util.* * * @author github.com/Kilemonn */ -class TestRestResponseExceptionHandler +class RestResponseExceptionHandlerTest { @BeforeEach fun setUp() From 2ea657ab481db8fa11d741d5e65b84677397b3fd Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Sat, 26 Aug 2023 10:44:41 +1000 Subject: [PATCH 02/10] Rename to "mongo" instead of "no sql". Create test class to work through configuration errors. --- .../configuration/QueueConfiguration.kt | 18 ++-- .../MongoMultiQueue.kt} | 6 +- .../repository/MongoQueueMessageRepository.kt | 2 +- .../messagequeue/settings/MultiQueueType.kt | 7 +- .../queue/nosql/mongo/MongoMultiQueueTest.kt | 91 +++++++++++++++++++ 5 files changed, 112 insertions(+), 12 deletions(-) rename src/main/kotlin/au/kilemon/messagequeue/queue/nosql/{NoSqlMultiQueue.kt => mongo/MongoMultiQueue.kt} (88%) rename src/main/kotlin/au/kilemon/messagequeue/queue/nosql/{ => mongo}/repository/MongoQueueMessageRepository.kt (81%) create mode 100644 src/test/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueueTest.kt diff --git a/src/main/kotlin/au/kilemon/messagequeue/configuration/QueueConfiguration.kt b/src/main/kotlin/au/kilemon/messagequeue/configuration/QueueConfiguration.kt index f0883b8..de569b0 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/configuration/QueueConfiguration.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/configuration/QueueConfiguration.kt @@ -7,6 +7,7 @@ import au.kilemon.messagequeue.message.QueueMessage import au.kilemon.messagequeue.queue.MultiQueue import au.kilemon.messagequeue.queue.cache.redis.RedisMultiQueue import au.kilemon.messagequeue.queue.inmemory.InMemoryMultiQueue +import au.kilemon.messagequeue.queue.nosql.mongo.MongoMultiQueue import au.kilemon.messagequeue.queue.sql.SqlMultiQueue import au.kilemon.messagequeue.settings.MessageQueueSettings import au.kilemon.messagequeue.settings.MultiQueueType @@ -57,13 +58,16 @@ class QueueConfiguration : HasLogger // Default to in-memory var queue: MultiQueue = InMemoryMultiQueue() - if (MultiQueueType.REDIS.toString() == messageQueueSettings.multiQueueType) - { - queue = RedisMultiQueue(messageQueueSettings.redisPrefix, redisTemplate) - } - else if (MultiQueueType.SQL.toString() == messageQueueSettings.multiQueueType) - { - queue = SqlMultiQueue() + when (messageQueueSettings.multiQueueType) { + MultiQueueType.REDIS.toString() -> { + queue = RedisMultiQueue(messageQueueSettings.redisPrefix, redisTemplate) + } + MultiQueueType.SQL.toString() -> { + queue = SqlMultiQueue() + } + MultiQueueType.MONGO.toString() -> { + queue = MongoMultiQueue() + } } queue.initialiseQueueIndex() LOG.info("Initialising [{}] queue as the [{}] is set to [{}].", queue::class.java.name, MessageQueueSettings.MULTI_QUEUE_TYPE, messageQueueSettings.multiQueueType) diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/NoSqlMultiQueue.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt similarity index 88% rename from src/main/kotlin/au/kilemon/messagequeue/queue/nosql/NoSqlMultiQueue.kt rename to src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt index fcbb32b..15ff35d 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/NoSqlMultiQueue.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt @@ -1,16 +1,16 @@ -package au.kilemon.messagequeue.queue.nosql +package au.kilemon.messagequeue.queue.nosql.mongo import au.kilemon.messagequeue.logging.HasLogger import au.kilemon.messagequeue.message.QueueMessage import au.kilemon.messagequeue.queue.MultiQueue -import au.kilemon.messagequeue.queue.nosql.repository.MongoQueueMessageRepository +import au.kilemon.messagequeue.queue.nosql.mongo.repository.MongoQueueMessageRepository import org.slf4j.Logger import org.springframework.beans.factory.annotation.Autowired import org.springframework.context.annotation.Lazy import java.util.* import java.util.concurrent.atomic.AtomicLong -class NoSqlMultiQueue : MultiQueue, HasLogger +class MongoMultiQueue : MultiQueue, HasLogger { override val LOG: Logger = initialiseLogger() diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/repository/MongoQueueMessageRepository.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/repository/MongoQueueMessageRepository.kt similarity index 81% rename from src/main/kotlin/au/kilemon/messagequeue/queue/nosql/repository/MongoQueueMessageRepository.kt rename to src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/repository/MongoQueueMessageRepository.kt index fc86034..6b8d6a6 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/repository/MongoQueueMessageRepository.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/repository/MongoQueueMessageRepository.kt @@ -1,4 +1,4 @@ -package au.kilemon.messagequeue.queue.nosql.repository +package au.kilemon.messagequeue.queue.nosql.mongo.repository import au.kilemon.messagequeue.message.QueueMessageDocument import au.kilemon.messagequeue.queue.repository.QueueMessageRepository diff --git a/src/main/kotlin/au/kilemon/messagequeue/settings/MultiQueueType.kt b/src/main/kotlin/au/kilemon/messagequeue/settings/MultiQueueType.kt index c31e3fc..4e1ff15 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/settings/MultiQueueType.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/settings/MultiQueueType.kt @@ -21,5 +21,10 @@ enum class MultiQueueType /** * Will initialise and connect to a defined SQL database instance to store queue messages against. */ - SQL; + SQL, + + /** + * Initialise and connect to the defined `mongo` store. + */ + MONGO; } diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueueTest.kt new file mode 100644 index 0000000..732f76b --- /dev/null +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueueTest.kt @@ -0,0 +1,91 @@ +package au.kilemon.messagequeue.queue.nosql.mongo + +import au.kilemon.messagequeue.configuration.QueueConfiguration +import au.kilemon.messagequeue.logging.LoggingConfiguration +import au.kilemon.messagequeue.queue.AbstractMultiQueueTest +import au.kilemon.messagequeue.queue.sql.MySqlMultiQueueTest +import au.kilemon.messagequeue.settings.MessageQueueSettings +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest +import org.springframework.boot.test.util.TestPropertyValues +import org.springframework.context.ApplicationContextInitializer +import org.springframework.context.ConfigurableApplicationContext +import org.springframework.context.annotation.Import +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.junit.jupiter.SpringExtension +import org.testcontainers.containers.GenericContainer +import org.testcontainers.junit.jupiter.Testcontainers +import org.testcontainers.utility.DockerImageName + +@ExtendWith(SpringExtension::class) +@Testcontainers +@DataJpaTest(properties = ["${MessageQueueSettings.MULTI_QUEUE_TYPE}=MONGO"]) +@ContextConfiguration(initializers = [MongoMultiQueueTest.Initializer::class]) +@Import( *[QueueConfiguration::class, LoggingConfiguration::class, AbstractMultiQueueTest.AbstractMultiQueueTestConfiguration::class] ) +class MongoMultiQueueTest: AbstractMultiQueueTest() +{ + companion object + { + lateinit var mongoDb: GenericContainer<*> + + private const val MONGO_CONTAINER = "mongo:7.0.0" + private const val MONGO_PORT = 27017 + + /** + * Stop the container at the end of all the tests. + */ + @AfterAll + @JvmStatic + fun afterClass() + { + mongoDb.stop() + } + } + + /** + * The test initialiser for [MongoMultiQueueTest] to initialise the container and test properties. + * + * @author github.com/Kilemonn + */ + internal class Initializer : ApplicationContextInitializer + { + /** + * Force start the container, so we can place its host and dynamic ports into the system properties. + * + * Set the environment variables before any of the beans are initialised. + */ + override fun initialize(configurableApplicationContext: ConfigurableApplicationContext) + { + val password = "password" + val username = "root" + val envMap = HashMap() + envMap["ME_CONFIG_MONGODB_ADMINPASSWORD"] = password + envMap["ME_CONFIG_MONGODB_ADMINUSERNAME"] = username + + mongoDb = GenericContainer(DockerImageName.parse(MONGO_CONTAINER)) + .withExposedPorts(MONGO_PORT).withReuse(false).withEnv(envMap) + mongoDb.start() + + val endpoint = "mongodb://${mongoDb.host}:${mongoDb.getMappedPort(MONGO_PORT)}" + + TestPropertyValues.of( + "spring.datasource.url=$endpoint", + "spring.datasource.username=$username", + "spring.datasource.password=$password", + ).applyTo(configurableApplicationContext.environment) + } + } + + /** + * Check the container is running before each test as it's required for the methods to access the [MongoMultiQueue]. + */ + @BeforeEach + fun beforeEach() + { + Assertions.assertTrue(mongoDb.isRunning) + multiQueue.clear() + } +} From fee0cf383894649c21dcae1fb98469c53bd76de8 Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Sat, 26 Aug 2023 10:50:29 +1000 Subject: [PATCH 03/10] Fix mongo test configuration. --- .../queue/nosql/mongo/MongoMultiQueueTest.kt | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueueTest.kt index 732f76b..bc80099 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueueTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueueTest.kt @@ -3,13 +3,13 @@ package au.kilemon.messagequeue.queue.nosql.mongo import au.kilemon.messagequeue.configuration.QueueConfiguration import au.kilemon.messagequeue.logging.LoggingConfiguration import au.kilemon.messagequeue.queue.AbstractMultiQueueTest -import au.kilemon.messagequeue.queue.sql.MySqlMultiQueueTest import au.kilemon.messagequeue.settings.MessageQueueSettings import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.extension.ExtendWith -import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest +import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest +import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase import org.springframework.boot.test.util.TestPropertyValues import org.springframework.context.ApplicationContextInitializer import org.springframework.context.ConfigurableApplicationContext @@ -22,8 +22,9 @@ import org.testcontainers.utility.DockerImageName @ExtendWith(SpringExtension::class) @Testcontainers -@DataJpaTest(properties = ["${MessageQueueSettings.MULTI_QUEUE_TYPE}=MONGO"]) +@DataMongoTest(properties = ["${MessageQueueSettings.MULTI_QUEUE_TYPE}=MONGO"]) @ContextConfiguration(initializers = [MongoMultiQueueTest.Initializer::class]) +@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE) @Import( *[QueueConfiguration::class, LoggingConfiguration::class, AbstractMultiQueueTest.AbstractMultiQueueTestConfiguration::class] ) class MongoMultiQueueTest: AbstractMultiQueueTest() { @@ -72,9 +73,9 @@ class MongoMultiQueueTest: AbstractMultiQueueTest() val endpoint = "mongodb://${mongoDb.host}:${mongoDb.getMappedPort(MONGO_PORT)}" TestPropertyValues.of( - "spring.datasource.url=$endpoint", - "spring.datasource.username=$username", - "spring.datasource.password=$password", + "spring.data.mongo.host=$endpoint", + "spring.data.mongo.username=$username", + "spring.data.mongo.password=$password", ).applyTo(configurableApplicationContext.environment) } } From 4315531cb5f6500329c2921d17a259a87d0951a8 Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Sat, 26 Aug 2023 11:19:02 +1000 Subject: [PATCH 04/10] Working through mongo db support. --- .../message/PayloadResolvableMessage.kt | 10 --- .../messagequeue/message/QueueMessage.kt | 15 +++- .../message/QueueMessageDocument.kt | 14 +++- .../queue/nosql/mongo/MongoMultiQueue.kt | 26 +++++-- .../repository/MongoQueueMessageRepository.kt | 24 +++++- .../repository/QueueMessageRepository.kt | 78 ------------------- .../repository/SQLQueueMessageRepository.kt | 67 +++++++++++++--- 7 files changed, 123 insertions(+), 111 deletions(-) delete mode 100644 src/main/kotlin/au/kilemon/messagequeue/message/PayloadResolvableMessage.kt delete mode 100644 src/main/kotlin/au/kilemon/messagequeue/queue/repository/QueueMessageRepository.kt diff --git a/src/main/kotlin/au/kilemon/messagequeue/message/PayloadResolvableMessage.kt b/src/main/kotlin/au/kilemon/messagequeue/message/PayloadResolvableMessage.kt deleted file mode 100644 index a6e162c..0000000 --- a/src/main/kotlin/au/kilemon/messagequeue/message/PayloadResolvableMessage.kt +++ /dev/null @@ -1,10 +0,0 @@ -package au.kilemon.messagequeue.message - -import java.io.Serializable - -interface PayloadResolvableMessage: Serializable -{ - fun resolvePayloadObject(): T - - fun removePayload(detailed: Boolean?): T -} diff --git a/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessage.kt b/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessage.kt index 33bb974..3c15fdc 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessage.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessage.kt @@ -17,7 +17,7 @@ import javax.persistence.* */ @Entity @Table(name = QueueMessage.TABLE_NAME) // TODO: Schema configuration schema = "\${${MessageQueueSettings.SQL_SCHEMA}:${MessageQueueSettings.SQL_SCHEMA_DEFAULT}}") -class QueueMessage(payload: Any?, @Column(nullable = false) var type: String, @Column(name = "assignedto") var assignedTo: String? = null): Serializable, PayloadResolvableMessage +class QueueMessage(payload: Any?, @Column(nullable = false) var type: String, @Column(name = "assignedto") var assignedTo: String? = null): Serializable { companion object { @@ -50,6 +50,15 @@ class QueueMessage(payload: Any?, @Column(nullable = false) var type: String, @C */ constructor() : this(null, "") + constructor(queueMessageDocument: QueueMessageDocument) : this() + { + this.type = queueMessageDocument.type + this.uuid = queueMessageDocument.uuid + this.id = queueMessageDocument.id + this.payload = queueMessageDocument.payload + this.assignedTo = queueMessageDocument.assignedTo + } + /** * When the [QueueMessage] is read back from a database serialised form, only the * [QueueMessage.payloadBytes] will be persisted, [QueueMessage.payload] will still be `null` by default. @@ -63,7 +72,7 @@ class QueueMessage(payload: Any?, @Column(nullable = false) var type: String, @C * * @return the current instance with the conditionally modified [QueueMessage.payload] member based on the points above */ - override fun resolvePayloadObject(): QueueMessage + fun resolvePayloadObject(): QueueMessage { if (payloadBytes != null && payload == null) { @@ -79,7 +88,7 @@ class QueueMessage(payload: Any?, @Column(nullable = false) var type: String, @C * @param detailed when `true` the [payload] object will be logged as well, otherwise the [payload] will not be contained in the response or `null`. * @return [QueueMessage] that is either a copy of `this` without the payload, or `this` with a resolved payload */ - override fun removePayload(detailed: Boolean?): QueueMessage + fun removePayload(detailed: Boolean?): QueueMessage { if (detailed == false) { diff --git a/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessageDocument.kt b/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessageDocument.kt index 103f1d5..dc64857 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessageDocument.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessageDocument.kt @@ -13,18 +13,26 @@ import javax.persistence.* * @author github.com/Kilemonn */ @Document -class QueueMessageDocument(var payload: Any?, @Column(nullable = false) var type: String, @Column(name = "assignedto") var assignedTo: String? = null) +class QueueMessageDocument(var payload: Any?, var type: String, var assignedTo: String? = null) { - @Column(nullable = false, unique = true) var uuid: String = UUID.randomUUID().toString() @JsonIgnore @Id - @GeneratedValue(strategy = GenerationType.IDENTITY) var id: Long? = null /** * Required for JSON deserialisation. */ constructor() : this(null, "") + + constructor(queueMessage: QueueMessage) : this() + { + val resolvedQueueMessage = queueMessage.resolvePayloadObject() + this.type = resolvedQueueMessage.type + this.uuid = resolvedQueueMessage.uuid + this.id = resolvedQueueMessage.id + this.payload = resolvedQueueMessage.payload + this.assignedTo = resolvedQueueMessage.assignedTo + } } diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt index 15ff35d..89beca6 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt @@ -2,6 +2,7 @@ package au.kilemon.messagequeue.queue.nosql.mongo import au.kilemon.messagequeue.logging.HasLogger import au.kilemon.messagequeue.message.QueueMessage +import au.kilemon.messagequeue.message.QueueMessageDocument import au.kilemon.messagequeue.queue.MultiQueue import au.kilemon.messagequeue.queue.nosql.mongo.repository.MongoQueueMessageRepository import org.slf4j.Logger @@ -30,7 +31,8 @@ class MongoMultiQueue : MultiQueue, HasLogger override fun persistMessage(message: QueueMessage) { - TODO("Not yet implemented") + val queueMessageDocument = QueueMessageDocument(message) + queueMessageRepository.save(queueMessageDocument) } override fun getQueueForType(queueType: String): Queue @@ -40,7 +42,7 @@ class MongoMultiQueue : MultiQueue, HasLogger override fun performHealthCheckInternal() { - TODO("Not yet implemented") + queueMessageRepository.existsById(1) } override fun getMessageByUUID(uuid: String): Optional @@ -70,16 +72,30 @@ class MongoMultiQueue : MultiQueue, HasLogger override fun containsUUID(uuid: String): Optional { - TODO("Not yet implemented") + val optionalMessage = queueMessageRepository.findByUuid(uuid) + return if (optionalMessage.isPresent) + { + val message = optionalMessage.get() + LOG.debug("Found queue type [{}] for UUID: [{}].", message.type, uuid) + Optional.of(message.type) + } + else + { + LOG.debug("No queue type exists for UUID: [{}].", uuid) + Optional.empty() + } } override fun addInternal(element: QueueMessage): Boolean { - TODO("Not yet implemented") + val queueMessageDocument = QueueMessageDocument(element) + val saved = queueMessageRepository.save(queueMessageDocument) + return saved.id != null } override fun removeInternal(element: QueueMessage): Boolean { - TODO("Not yet implemented") + val removedCount = queueMessageRepository.deleteByUuid(element.uuid) + return removedCount > 0 } } diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/repository/MongoQueueMessageRepository.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/repository/MongoQueueMessageRepository.kt index 6b8d6a6..2a8dafc 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/repository/MongoQueueMessageRepository.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/repository/MongoQueueMessageRepository.kt @@ -1,11 +1,31 @@ package au.kilemon.messagequeue.queue.nosql.mongo.repository +import au.kilemon.messagequeue.message.QueueMessage import au.kilemon.messagequeue.message.QueueMessageDocument -import au.kilemon.messagequeue.queue.repository.QueueMessageRepository +import org.springframework.data.jpa.repository.Modifying import org.springframework.data.mongodb.repository.MongoRepository +import org.springframework.transaction.annotation.Transactional +import java.util.* -interface MongoQueueMessageRepository: MongoRepository, QueueMessageRepository +interface MongoQueueMessageRepository: MongoRepository { + /** + * Find the entity which has a [QueueMessage.uuid] matching the provided [uuid]. + * + * @param uuid the [QueueMessage.uuid] of the message to find + * @return the [Optional] that may contain the found [QueueMessage] + */ + @Transactional + fun findByUuid(uuid: String): Optional + /** + * Delete a [QueueMessage] by `uuid`. + * + * @param uuid the UUID of the [QueueMessage.uuid] to remove + * @return the number of removed entries (most likely one since the UUID is unique) + */ + @Modifying + @Transactional + fun deleteByUuid(uuid: String): Int } diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/repository/QueueMessageRepository.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/repository/QueueMessageRepository.kt deleted file mode 100644 index fc7f353..0000000 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/repository/QueueMessageRepository.kt +++ /dev/null @@ -1,78 +0,0 @@ -package au.kilemon.messagequeue.queue.repository - -import au.kilemon.messagequeue.message.QueueMessage -import java.util.* - -/** - * A collection of Repository method specific for [QueueMessage] and queries made against them. - * - * Reference: https://docs.spring.io/spring-data/jpa/docs/current/reference/html/#jpa.query-methods.query-creation - * - * @author github.com/Kilemonn - */ -interface QueueMessageRepository -{ - /** - * Delete a [QueueMessage] by the provided [QueueMessage.type] [String]. - * - * @param type the [QueueMessage.type] to remove entries by - * @return the number of deleted entities - */ - fun deleteByType(type: String): Int - - /** - * Get a distinct [List] of [String] [QueueMessage.type] that currently exist. - * - * @return a [List] of all the existing [QueueMessage.type] as [String]s - */ - fun findDistinctType(): List - - /** - * Get a list of [QueueMessage] which have [QueueMessage.type] matching the provided [type]. - * - * @param type the type to match [QueueMessage.type] with - * @return a [List] of [QueueMessage] who have a matching [QueueMessage.type] with the provided [type] - */ - fun findByTypeOrderByIdAsc(type: String): List - - /** - * Find the entity with the matching [QueueMessage.type] and that has a non-null [QueueMessage.assignedTo]. Sorted by ID ascending. - * - * @param type the type to match [QueueMessage.type] with - * @return a [List] of [QueueMessage] who have a matching [QueueMessage.type] with the provided [type] and non-null [QueueMessage.assignedTo] - */ - fun findByTypeAndAssignedToIsNotNullOrderByIdAsc(type: String): List - - /** - * Find the entity with the matching [QueueMessage.type] and that has [QueueMessage.assignedTo] set to `null`. Sorted by ID ascending. - * - * @param type the type to match [QueueMessage.type] with - * @return a [List] of [QueueMessage] who have a matching [QueueMessage.type] with the provided [type] and `null` [QueueMessage.assignedTo] - */ - fun findByTypeAndAssignedToIsNullOrderByIdAsc(type: String): List - - /** - * Find the entity with the matching [QueueMessage.type] and [QueueMessage.assignedTo]. Sorted by ID ascending. - * - * @param type the type to match [QueueMessage.type] with - * @param assignedTo the identifier to match [QueueMessage.assignedTo] with - * @return a [List] of [QueueMessage] who have a matching [QueueMessage.type] and [QueueMessage.assignedTo] - */ - fun findByTypeAndAssignedToOrderByIdAsc(type: String, assignedTo: String): List - - /** - * Find the entity which has a [QueueMessage.uuid] matching the provided [uuid]. - * - * @param uuid the [QueueMessage.uuid] of the message to find - * @return the [Optional] that may contain the found [QueueMessage] - */ - fun findByUuid(uuid: String): Optional - - /** - * Delete a [QueueMessage] by `uuid`. - * - * @param uuid the UUID of the [QueueMessage.uuid] to remove - * @return the number of removed entries (most likely one since the UUID is unique) - */ - fun deleteByUuid(uuid: String): Int -} diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/sql/repository/SQLQueueMessageRepository.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/sql/repository/SQLQueueMessageRepository.kt index 2a62984..9f06a4c 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/sql/repository/SQLQueueMessageRepository.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/sql/repository/SQLQueueMessageRepository.kt @@ -1,7 +1,6 @@ package au.kilemon.messagequeue.queue.sql.repository import au.kilemon.messagequeue.message.QueueMessage -import au.kilemon.messagequeue.queue.repository.QueueMessageRepository import org.springframework.data.jpa.repository.JpaRepository import org.springframework.data.jpa.repository.Modifying import org.springframework.data.jpa.repository.Query @@ -18,33 +17,81 @@ import java.util.* * @author github.com/Kilemonn */ @Repository -interface SQLQueueMessageRepository: JpaRepository, QueueMessageRepository +interface SQLQueueMessageRepository: JpaRepository { + /** + * Delete a [QueueMessage] by the provided [QueueMessage.type] [String]. + * + * @param type the [QueueMessage.type] to remove entries by + * @return the number of deleted entities + */ @Modifying @Transactional @Query("DELETE FROM QueueMessage WHERE type = ?1") - override fun deleteByType(type: String): Int + fun deleteByType(type: String): Int + /** + * Get a distinct [List] of [String] [QueueMessage.type] that currently exist. + * + * @return a [List] of all the existing [QueueMessage.type] as [String]s + */ @Transactional @Query("SELECT DISTINCT type FROM QueueMessage") - override fun findDistinctType(): List + fun findDistinctType(): List + /** + * Get a list of [QueueMessage] which have [QueueMessage.type] matching the provided [type]. + * + * @param type the type to match [QueueMessage.type] with + * @return a [List] of [QueueMessage] who have a matching [QueueMessage.type] with the provided [type] + */ @Transactional - override fun findByTypeOrderByIdAsc(type: String): List + fun findByTypeOrderByIdAsc(type: String): List + /** + * Find the entity with the matching [QueueMessage.type] and that has a non-null [QueueMessage.assignedTo]. Sorted by ID ascending. + * + * @param type the type to match [QueueMessage.type] with + * @return a [List] of [QueueMessage] who have a matching [QueueMessage.type] with the provided [type] and non-null [QueueMessage.assignedTo] + */ @Transactional - override fun findByTypeAndAssignedToIsNotNullOrderByIdAsc(type: String): List + fun findByTypeAndAssignedToIsNotNullOrderByIdAsc(type: String): List + /** + * Find the entity with the matching [QueueMessage.type] and that has [QueueMessage.assignedTo] set to `null`. Sorted by ID ascending. + * + * @param type the type to match [QueueMessage.type] with + * @return a [List] of [QueueMessage] who have a matching [QueueMessage.type] with the provided [type] and `null` [QueueMessage.assignedTo] + */ @Transactional - override fun findByTypeAndAssignedToIsNullOrderByIdAsc(type: String): List + fun findByTypeAndAssignedToIsNullOrderByIdAsc(type: String): List + /** + * Find the entity with the matching [QueueMessage.type] and [QueueMessage.assignedTo]. Sorted by ID ascending. + * + * @param type the type to match [QueueMessage.type] with + * @param assignedTo the identifier to match [QueueMessage.assignedTo] with + * @return a [List] of [QueueMessage] who have a matching [QueueMessage.type] and [QueueMessage.assignedTo] + */ @Transactional - override fun findByTypeAndAssignedToOrderByIdAsc(type: String, assignedTo: String): List + fun findByTypeAndAssignedToOrderByIdAsc(type: String, assignedTo: String): List + /** + * Find the entity which has a [QueueMessage.uuid] matching the provided [uuid]. + * + * @param uuid the [QueueMessage.uuid] of the message to find + * @return the [Optional] that may contain the found [QueueMessage] + */ @Transactional - override fun findByUuid(uuid: String): Optional + fun findByUuid(uuid: String): Optional + /** + * Delete a [QueueMessage] by `uuid`. + * + * @param uuid the UUID of the [QueueMessage.uuid] to remove + * @return the number of removed entries (most likely one since the UUID is unique) + */ @Modifying @Transactional - override fun deleteByUuid(uuid: String): Int + fun deleteByUuid(uuid: String): Int } From f9e66c3b9944dedd134a0a78fd8e72909851bf0b Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Wed, 30 Aug 2023 21:48:09 +1000 Subject: [PATCH 05/10] Work on mongo method implementation to satisfy tests. --- .../message/QueueMessageDocument.kt | 7 ++- .../queue/exception/MessageUpdateException.kt | 2 +- .../queue/nosql/mongo/MongoMultiQueue.kt | 47 ++++++++++++++++--- .../repository/MongoQueueMessageRepository.kt | 30 +++++++----- .../messagequeue/queue/sql/SqlMultiQueue.kt | 8 ++-- .../rest/controller/MessageQueueController.kt | 4 +- .../queue/AbstractMultiQueueTest.kt | 3 +- .../queue/nosql/mongo/MongoMultiQueueTest.kt | 17 ++++--- 8 files changed, 85 insertions(+), 33 deletions(-) diff --git a/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessageDocument.kt b/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessageDocument.kt index dc64857..749deed 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessageDocument.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/message/QueueMessageDocument.kt @@ -12,9 +12,14 @@ import javax.persistence.* * * @author github.com/Kilemonn */ -@Document +@Document(value = QueueMessageDocument.DOCUMENT_NAME) class QueueMessageDocument(var payload: Any?, var type: String, var assignedTo: String? = null) { + companion object + { + const val DOCUMENT_NAME: String = "multiqueuemessages" + } + var uuid: String = UUID.randomUUID().toString() @JsonIgnore diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/exception/MessageUpdateException.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/exception/MessageUpdateException.kt index 044620a..3e09607 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/exception/MessageUpdateException.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/exception/MessageUpdateException.kt @@ -5,4 +5,4 @@ package au.kilemon.messagequeue.queue.exception * * @author github.com/Kilemonn */ -class MessageUpdateException(uuid: String) : Exception("Unable to update message with UUID [$uuid] as it either does not exist (and cannot be updated) or there was an underlying error in the storage mechanism.") +class MessageUpdateException(uuid: String, exception: Exception? = null) : Exception("Unable to update message with UUID [$uuid] as it either does not exist (and cannot be updated) or there was an underlying error in the storage mechanism.", exception) diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt index 89beca6..aac67c3 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt @@ -4,11 +4,13 @@ import au.kilemon.messagequeue.logging.HasLogger import au.kilemon.messagequeue.message.QueueMessage import au.kilemon.messagequeue.message.QueueMessageDocument import au.kilemon.messagequeue.queue.MultiQueue +import au.kilemon.messagequeue.queue.exception.MessageUpdateException import au.kilemon.messagequeue.queue.nosql.mongo.repository.MongoQueueMessageRepository import org.slf4j.Logger import org.springframework.beans.factory.annotation.Autowired import org.springframework.context.annotation.Lazy import java.util.* +import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicLong class MongoMultiQueue : MultiQueue, HasLogger @@ -32,12 +34,20 @@ class MongoMultiQueue : MultiQueue, HasLogger override fun persistMessage(message: QueueMessage) { val queueMessageDocument = QueueMessageDocument(message) - queueMessageRepository.save(queueMessageDocument) + try + { + queueMessageRepository.save(queueMessageDocument) + } + catch (ex: Exception) + { + throw MessageUpdateException(message.uuid, ex) + } } override fun getQueueForType(queueType: String): Queue { - TODO("Not yet implemented") + val entries = queueMessageRepository.findByTypeOrderByIdAsc(queueType) + return ConcurrentLinkedQueue(entries.map { entry -> QueueMessage(entry) }) } override fun performHealthCheckInternal() @@ -47,27 +57,50 @@ class MongoMultiQueue : MultiQueue, HasLogger override fun getMessageByUUID(uuid: String): Optional { - return queueMessageRepository.findByUuid(uuid) + val documentMessage = queueMessageRepository.findByUuid(uuid) + return if (documentMessage.isPresent) + { + Optional.of(QueueMessage(documentMessage.get())) + } + else + { + Optional.empty() + } } override fun clearForTypeInternal(queueType: String): Int { - TODO("Not yet implemented") + val amountCleared = queueMessageRepository.deleteByType(queueType) + LOG.debug("Cleared existing queue for type [{}]. Removed [{}] message entries.", queueType, amountCleared) + return amountCleared } override fun isEmptyForType(queueType: String): Boolean { - TODO("Not yet implemented") + return queueMessageRepository.findByTypeOrderByIdAsc(queueType).isEmpty() } override fun pollInternal(queueType: String): Optional { - TODO("Not yet implemented") + val messages = queueMessageRepository.findByTypeOrderByIdAsc(queueType) + return if (messages.isNotEmpty()) + { + return Optional.of(QueueMessage(messages[0])) + } + else + { + Optional.empty() + } } + /** + * The [includeEmpty] value makes no difference it is always effectively `false`. + */ override fun keys(includeEmpty: Boolean): Set { - TODO("Not yet implemented") + val keySet = queueMessageRepository.getDistinctTypes().toSet() + LOG.debug("Total amount of queue keys [{}].", keySet.size) + return keySet } override fun containsUUID(uuid: String): Optional diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/repository/MongoQueueMessageRepository.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/repository/MongoQueueMessageRepository.kt index 2a8dafc..55a3958 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/repository/MongoQueueMessageRepository.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/repository/MongoQueueMessageRepository.kt @@ -1,31 +1,39 @@ package au.kilemon.messagequeue.queue.nosql.mongo.repository -import au.kilemon.messagequeue.message.QueueMessage import au.kilemon.messagequeue.message.QueueMessageDocument import org.springframework.data.jpa.repository.Modifying +import org.springframework.data.mongodb.repository.Aggregation import org.springframework.data.mongodb.repository.MongoRepository -import org.springframework.transaction.annotation.Transactional import java.util.* interface MongoQueueMessageRepository: MongoRepository { /** - * Find the entity which has a [QueueMessage.uuid] matching the provided [uuid]. * - * @param uuid the [QueueMessage.uuid] of the message to find - * @return the [Optional] that may contain the found [QueueMessage] */ - @Transactional - fun findByUuid(uuid: String): Optional + @Aggregation(pipeline = [ "{ '\$group': { '_id' : '\$type' } }" ]) + fun getDistinctTypes(): List + + /** + * + */ + fun findByTypeOrderByIdAsc(type: String): List + + /** + * + */ + fun findByUuid(uuid: String): Optional + + /** + * + */ + @Modifying + fun deleteByType(type:String): Int /** - * Delete a [QueueMessage] by `uuid`. * - * @param uuid the UUID of the [QueueMessage.uuid] to remove - * @return the number of removed entries (most likely one since the UUID is unique) */ @Modifying - @Transactional fun deleteByUuid(uuid: String): Int } diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/sql/SqlMultiQueue.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/sql/SqlMultiQueue.kt index 0e8c4ca..f45de96 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/sql/SqlMultiQueue.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/sql/SqlMultiQueue.kt @@ -39,7 +39,7 @@ class SqlMultiQueue : MultiQueue, HasLogger override fun getQueueForType(queueType: String): Queue { val entries = queueMessageRepository.findByTypeOrderByIdAsc(queueType) - return ConcurrentLinkedQueue(entries.map { entry -> entry.resolvePayloadObject() as QueueMessage }) + return ConcurrentLinkedQueue(entries.map { entry -> entry.resolvePayloadObject() }) } /** @@ -56,7 +56,7 @@ class SqlMultiQueue : MultiQueue, HasLogger queueMessageRepository.findByTypeAndAssignedToOrderByIdAsc(queueType, assignedTo) } - return ConcurrentLinkedQueue(entries.map { entry -> entry.resolvePayloadObject() as QueueMessage }) + return ConcurrentLinkedQueue(entries.map { entry -> entry.resolvePayloadObject() }) } /** @@ -65,7 +65,7 @@ class SqlMultiQueue : MultiQueue, HasLogger override fun getUnassignedMessagesForType(queueType: String): Queue { val entries = queueMessageRepository.findByTypeAndAssignedToIsNullOrderByIdAsc(queueType) - return ConcurrentLinkedQueue(entries.map { entry -> entry.resolvePayloadObject() as QueueMessage }) + return ConcurrentLinkedQueue(entries.map { entry -> entry.resolvePayloadObject() }) } override fun performHealthCheckInternal() @@ -95,7 +95,7 @@ class SqlMultiQueue : MultiQueue, HasLogger val messages = queueMessageRepository.findByTypeOrderByIdAsc(queueType) return if (messages.isNotEmpty()) { - return Optional.of(messages[0].resolvePayloadObject() as QueueMessage) + return Optional.of(messages[0].resolvePayloadObject()) } else { diff --git a/src/main/kotlin/au/kilemon/messagequeue/rest/controller/MessageQueueController.kt b/src/main/kotlin/au/kilemon/messagequeue/rest/controller/MessageQueueController.kt index 3b0b438..83ee25d 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/rest/controller/MessageQueueController.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/rest/controller/MessageQueueController.kt @@ -269,7 +269,7 @@ open class MessageQueueController : HasLogger { LOG.debug("Retrieving all entry details from queue with type [{}].", queueType) val queueForType: Queue = messageQueue.getQueueForType(queueType) - val queueDetails = queueForType.stream().map { message -> message.removePayload(detailed) as QueueMessage }.collect(Collectors.toList()) + val queueDetails = queueForType.stream().map { message -> message.removePayload(detailed) }.collect(Collectors.toList()) responseMap[queueType] = queueDetails } else @@ -279,7 +279,7 @@ open class MessageQueueController : HasLogger { // No need to empty check since we passed `false` to `keys()` above val queueForType: Queue = messageQueue.getQueueForType(key) - val queueDetails = queueForType.stream().map { message -> message.removePayload(detailed) as QueueMessage }.collect(Collectors.toList()) + val queueDetails = queueForType.stream().map { message -> message.removePayload(detailed) }.collect(Collectors.toList()) responseMap[key] = queueDetails } } diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/AbstractMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/AbstractMultiQueueTest.kt index a4f0b5d..6de86fd 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/queue/AbstractMultiQueueTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/AbstractMultiQueueTest.kt @@ -4,6 +4,7 @@ import au.kilemon.messagequeue.message.QueueMessage import au.kilemon.messagequeue.queue.exception.DuplicateMessageException import au.kilemon.messagequeue.queue.exception.MessageUpdateException import au.kilemon.messagequeue.queue.inmemory.InMemoryMultiQueue +import au.kilemon.messagequeue.queue.nosql.mongo.MongoMultiQueue import au.kilemon.messagequeue.queue.sql.SqlMultiQueue import au.kilemon.messagequeue.rest.model.Payload import au.kilemon.messagequeue.rest.model.PayloadEnum @@ -248,7 +249,7 @@ abstract class AbstractMultiQueueTest @Test fun testInitialiseQueueIndex_reInitialise() { - if (multiQueue is SqlMultiQueue) + if (multiQueue is SqlMultiQueue || multiQueue is MongoMultiQueue) { return } diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueueTest.kt index bc80099..8e0e7fd 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueueTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueueTest.kt @@ -63,19 +63,24 @@ class MongoMultiQueueTest: AbstractMultiQueueTest() val password = "password" val username = "root" val envMap = HashMap() - envMap["ME_CONFIG_MONGODB_ADMINPASSWORD"] = password - envMap["ME_CONFIG_MONGODB_ADMINUSERNAME"] = username + envMap["MONGO_INITDB_ROOT_PASSWORD"] = password + envMap["MONGO_INITDB_ROOT_USERNAME"] = username mongoDb = GenericContainer(DockerImageName.parse(MONGO_CONTAINER)) .withExposedPorts(MONGO_PORT).withReuse(false).withEnv(envMap) mongoDb.start() - val endpoint = "mongodb://${mongoDb.host}:${mongoDb.getMappedPort(MONGO_PORT)}" + val databaseName = "MultiQueue" + // mongodb://:@:/ + val endpoint = "mongodb://$username:$password@${mongoDb.host}:${mongoDb.getMappedPort(MONGO_PORT)}/$databaseName?authSource=admin" TestPropertyValues.of( - "spring.data.mongo.host=$endpoint", - "spring.data.mongo.username=$username", - "spring.data.mongo.password=$password", +// "spring.data.mongodb.host=${mongoDb.host}", +// "spring.data.mongodb.database=$databaseName", +// "spring.data.mongodb.username=$username", +// "spring.data.mongodb.password=$password", +// "spring.data.mongodb.port=${mongoDb.getMappedPort(MONGO_PORT)}", + "spring.data.mongodb.uri=$endpoint" ).applyTo(configurableApplicationContext.environment) } } From b5a2f96f2081060fb74835e3f35ddfdd4579d05a Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Wed, 30 Aug 2023 22:28:07 +1000 Subject: [PATCH 06/10] Add custom index handling for mongo since the ID is not generated by mongo DB and needs to be managed by us. The index is shared across all multiqueues. --- .../kilemon/messagequeue/queue/MultiQueue.kt | 12 ++++++- .../queue/nosql/mongo/MongoMultiQueue.kt | 34 +++++++++++++++++++ .../queue/AbstractMultiQueueTest.kt | 23 +++++++++++++ 3 files changed, 68 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/MultiQueue.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/MultiQueue.kt index 3ca0352..15ef212 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/MultiQueue.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/MultiQueue.kt @@ -262,10 +262,20 @@ interface MultiQueue: Queue, HasLogger */ fun clearForType(queueType: String): Int { - maxQueueIndex.remove(queueType) + clearQueueIndexForType(queueType) return clearForTypeInternal(queueType) } + /** + * Clear the [MultiQueue.maxQueueIndex] entry matching the provided key [queueType]. + * + * @param queueType the [String] of the [Queue] to clear + */ + fun clearQueueIndexForType(queueType: String) + { + maxQueueIndex.remove(queueType) + } + /** * Indicates whether the underlying [Queue] for the provided [String] is empty. By calling [Queue.isEmpty]. * diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt index aac67c3..5428f50 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt @@ -15,6 +15,11 @@ import java.util.concurrent.atomic.AtomicLong class MongoMultiQueue : MultiQueue, HasLogger { + companion object + { + const val INDEX_ID = "index_id" + } + override val LOG: Logger = initialiseLogger() override lateinit var maxQueueIndex: HashMap @@ -131,4 +136,33 @@ class MongoMultiQueue : MultiQueue, HasLogger val removedCount = queueMessageRepository.deleteByUuid(element.uuid) return removedCount > 0 } + + /** + * Overriding to use the constant [INDEX_ID] for all look-ups since the ID is shared and needs to be assigned to + * the [QueueMessageDocument] before it is created. + */ + override fun getAndIncrementQueueIndex(queueType: String): Optional + { + return super.getAndIncrementQueueIndex(INDEX_ID) + } + + /** + * Override to never clear the queue index for the type, since it's a shared index map. + */ + override fun clearQueueIndexForType(queueType: String) + { + + } + + /** + * Clear the [maxQueueIndex] if the entire map is cleared. + * + * + * Since [clearQueueIndexForType] is not clearing any of map entries. + */ + override fun clear() + { + super.clear() + maxQueueIndex.clear() + } } diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/AbstractMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/AbstractMultiQueueTest.kt index 6de86fd..54ba2c2 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/queue/AbstractMultiQueueTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/AbstractMultiQueueTest.kt @@ -288,8 +288,31 @@ abstract class AbstractMultiQueueTest if (multiQueue is SqlMultiQueue) { + // Ensure that we always return an empty optional Assertions.assertFalse(multiQueue.getAndIncrementQueueIndex(queueType).isPresent) } + else if (multiQueue is MongoMultiQueue) + { + // Ensure that no matter the provided queueType argument, that the single entry is always incremented + Assertions.assertNull(multiQueue.maxQueueIndex[queueType]) + Assertions.assertNull(multiQueue.maxQueueIndex[MongoMultiQueue.INDEX_ID]) + + Assertions.assertEquals(1, multiQueue.getAndIncrementQueueIndex(queueType).get()) + Assertions.assertEquals(2, multiQueue.getAndIncrementQueueIndex(MongoMultiQueue.INDEX_ID).get()) + Assertions.assertEquals(3, multiQueue.getAndIncrementQueueIndex(queueType).get()) + Assertions.assertEquals(4, multiQueue.getAndIncrementQueueIndex(MongoMultiQueue.INDEX_ID).get()) + Assertions.assertEquals(5, multiQueue.getAndIncrementQueueIndex(queueType).get()) + + multiQueue.clearForType(queueType) + Assertions.assertFalse(multiQueue.maxQueueIndex.isEmpty()) + Assertions.assertNull(multiQueue.maxQueueIndex[queueType]) + Assertions.assertNotNull(multiQueue.maxQueueIndex[MongoMultiQueue.INDEX_ID]) + + multiQueue.clear() + Assertions.assertTrue(multiQueue.maxQueueIndex.isEmpty()) + Assertions.assertNull(multiQueue.maxQueueIndex[queueType]) + Assertions.assertNull(multiQueue.maxQueueIndex[MongoMultiQueue.INDEX_ID]) + } else { From 4a31fa6deaa666918c66262f3e0856144b678e01 Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Wed, 30 Aug 2023 22:46:13 +1000 Subject: [PATCH 07/10] Update readme, add to settings endpoint response. --- README.md | 49 +++++++++++- .../settings/MessageQueueSettings.kt | 76 +++++++++++++++++++ .../rest/controller/SettingsControllerTest.kt | 8 ++ 3 files changed, 130 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index c3b9de4..2390d08 100644 --- a/README.md +++ b/README.md @@ -8,9 +8,7 @@ The message storage mechanisms supported are: - In-memory (default) - Redis (stand alone and sentinel support) - SQL Database (MySQL, PostgreSQL) - -With plans to add support for the following mechanisms: -- NoSQL +- No SQL Database (Mongo) ## Rest API Documentation @@ -138,6 +136,51 @@ environment: - spring.datasource.password=5up3r5tR0nG! ``` +## NO SQL (Mongo DB) + +The application can be set into `MONGO` mode to interface with a NoSQL database. Similarly to the others you can set the type with `MULTI_QUEUE_TYPE=MONGO`. + +### NoSQL Environment Properties + +You can either specify all properties individually via, `spring.data.mongodb.host`, `spring.data.mongodb.port`, `spring.data.mongodb.database`, `spring.data.mongodb.username`, `spring.data.mongodb.password`. +Or you can provide all together in a single property: `spring.data.mongodb.uri`. + +#### spring.data.mongodb.host + +***This property is required unless `spring.data.mongodb.uri` is provided***. + +This is the host that the mongo DB is accessible from. + +#### spring.data.mongodb.database + +***This property is required unless `spring.data.mongodb.uri` is provided***. + +This is the database that should be connected to and where the related documents will be created. + +#### spring.data.mongodb.username + +***This property is required unless `spring.data.mongodb.uri` is provided***. + +This is the username/account name used to access the database at the configured endpoint. + +#### spring.data.mongodb.password + +***This property is required unless `spring.data.mongodb.uri` is provided***. + +This is the password used to access the database at the configured endpoint. + +#### spring.data.mongodb.port + +***This property is required unless `spring.data.mongodb.uri` is provided***. + +The port that the mongo db has exposed. + +#### spring.data.mongodb.uri + +***This property is required unless the above properties are already provided***. + +The whole url can be provided in the following format: `mongodb://:@:/` for example: `mongodb://root:password@localhost:27107/messagequeue`. + --- ## HTTPS diff --git a/src/main/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettings.kt b/src/main/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettings.kt index 7749127..2338128 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettings.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettings.kt @@ -61,6 +61,16 @@ class MessageQueueSettings const val SQL_USERNAME: String = "spring.datasource.username" const val SQL_PASSWORD: String = "spring.datasource.password" + /** + * Start MONGO related properties + */ + const val MONGO_HOST: String = "spring.data.mongodb.host" + const val MONGO_DATABASE: String = "spring.data.mongodb.database" + const val MONGO_PORT: String = "spring.data.mongodb.port" + const val MONGO_USERNAME: String = "spring.data.mongodb.username" + const val MONGO_PASSWORD: String = "spring.data.mongodb.password" + const val MONGO_URI: String = "spring.data.mongodb.uri" + /** * SQL Schema properties */ @@ -167,4 +177,70 @@ class MessageQueueSettings // @JsonProperty(SQL_PASSWORD) // @Value("\${$SQL_PASSWORD:}") // lateinit var sqlPassword: String + + /** + * Required when [MultiQueueType.MONGO] is used and [mongoUri] is empty. + * It specifies the host name that the mongo db is available at. + */ + @SerializedName(MONGO_HOST) + @JsonProperty(MONGO_HOST) + @Value("\${$MONGO_HOST:}") + @get:Generated + @set:Generated + lateinit var mongoHost: String + + /** + * Required when [MultiQueueType.MONGO] is used and [mongoUri] is empty. + * It specifies the port that the mongo db is available on. + */ + @SerializedName(MONGO_PORT) + @JsonProperty(MONGO_PORT) + @Value("\${$MONGO_PORT:}") + @get:Generated + @set:Generated + lateinit var mongoPort: String + + /** + * Required when [MultiQueueType.MONGO] is used and [mongoUri] is empty. + * It specifies the database you wish to connect to. + */ + @SerializedName(MONGO_DATABASE) + @JsonProperty(MONGO_DATABASE) + @Value("\${$MONGO_DATABASE:}") + @get:Generated + @set:Generated + lateinit var mongoDatabase: String + + /** + * Required when [MultiQueueType.MONGO] is used and [mongoUri] is empty. + * It specifies the username that you wish to connect with. + */ + @SerializedName(MONGO_USERNAME) + @JsonProperty(MONGO_USERNAME) + @Value("\${$MONGO_USERNAME:}") + @get:Generated + @set:Generated + lateinit var mongoUsername: String + + /** + * Required when [MultiQueueType.MONGO] is used and [mongoUri] is empty. + * It specifies the password for the user that you wish to connect with. + */ + // TODO: Commenting out since it is unused and returned in the settings endpoint without masking + // @JsonIgnore + // @SerializedName(MONGO_PASSWORD) + // @JsonProperty(MONGO_PASSWORD) + // @Value("\${MONGO_PASSWORD:}") + // lateinit var mongoPassword: String + + /** + * Required when [MultiQueueType.MONGO] is used and the above mongo properties are empty. + * It specifies all properties of the mongo connection in the format of `mongodb://:@:/`. + */ + @SerializedName(MONGO_URI) + @JsonProperty(MONGO_URI) + @Value("\${$MONGO_URI:}") + @get:Generated + @set:Generated + lateinit var mongoUri: String } diff --git a/src/test/kotlin/au/kilemon/messagequeue/rest/controller/SettingsControllerTest.kt b/src/test/kotlin/au/kilemon/messagequeue/rest/controller/SettingsControllerTest.kt index 650b3b4..9c6d294 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/rest/controller/SettingsControllerTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/rest/controller/SettingsControllerTest.kt @@ -67,11 +67,19 @@ class SettingsControllerTest .andReturn() val settings = gson.fromJson(mvcResult.response.contentAsString, MessageQueueSettings::class.java) Assertions.assertEquals(MultiQueueType.IN_MEMORY.toString(), settings.multiQueueType) + Assertions.assertTrue(settings.redisPrefix.isEmpty()) Assertions.assertEquals(MessageQueueSettings.REDIS_ENDPOINT_DEFAULT, settings.redisEndpoint) Assertions.assertEquals("false", settings.redisUseSentinels) Assertions.assertEquals(MessageQueueSettings.REDIS_MASTER_NAME_DEFAULT, settings.redisMasterName) + Assertions.assertTrue(settings.sqlEndpoint.isEmpty()) Assertions.assertTrue(settings.sqlUsername.isEmpty()) + + Assertions.assertTrue(settings.mongoHost.isEmpty()) + Assertions.assertTrue(settings.mongoPort.isEmpty()) + Assertions.assertTrue(settings.mongoDatabase.isEmpty()) + Assertions.assertTrue(settings.mongoUsername.isEmpty()) + Assertions.assertTrue(settings.mongoUri.isEmpty()) } } From 92e65c33728881ffa929a6318c725aeef80a762f Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Wed, 30 Aug 2023 22:48:01 +1000 Subject: [PATCH 08/10] Update comments. --- .../messagequeue/settings/MessageQueueSettings.kt | 11 ----------- .../rest/controller/SettingsControllerTest.kt | 1 + 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/src/main/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettings.kt b/src/main/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettings.kt index 2338128..50587e1 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettings.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettings.kt @@ -15,17 +15,6 @@ import org.springframework.stereotype.Component * - The type of `MultiQueue` being used * - Other utility configuration for the application to use. * - * This does not hold the dialect and driver information for the database mode. - * This is the dialect that hibernate will use when interacting with the underlying database. - * Supported dialects are listed below: - * - MySQL (e.g. `org.hibernate.dialect.MySQLDialect`) - * - Postgresql (e.g. `org.hibernate.dialect.PostgreSQLDialect`) - * - * Defines the underlying driver which is used to connect to the requested database. - * Currently supports: - * - MySQL (e.g. `com.mysql.jdbc.Driver`) - * - Postgresql (e.g. `org.postgresql.Driver`) - * * When `SQL` is used, the following property must be provided: * `spring.jpa.hibernate.ddl-auto=create` * This will ensure the underlying tables will be created on start up if they do not exist. diff --git a/src/test/kotlin/au/kilemon/messagequeue/rest/controller/SettingsControllerTest.kt b/src/test/kotlin/au/kilemon/messagequeue/rest/controller/SettingsControllerTest.kt index 9c6d294..4a2fd76 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/rest/controller/SettingsControllerTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/rest/controller/SettingsControllerTest.kt @@ -66,6 +66,7 @@ class SettingsControllerTest .andExpect(MockMvcResultMatchers.status().isOk) .andReturn() val settings = gson.fromJson(mvcResult.response.contentAsString, MessageQueueSettings::class.java) + Assertions.assertEquals(MultiQueueType.IN_MEMORY.toString(), settings.multiQueueType) Assertions.assertTrue(settings.redisPrefix.isEmpty()) From e00842cee0cca04e6794099dc85b7a9597bbb692 Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Wed, 30 Aug 2023 22:58:45 +1000 Subject: [PATCH 09/10] Add docs. --- .../queue/nosql/mongo/MongoMultiQueue.kt | 6 +++++ .../repository/MongoQueueMessageRepository.kt | 23 +++++++++++++++++-- .../queue/nosql/mongo/MongoMultiQueueTest.kt | 6 +++++ 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt index 5428f50..53ba212 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueue.kt @@ -13,6 +13,12 @@ import java.util.* import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicLong +/** + * A NoSql mongo backed [MultiQueue]. All operations are performed directly on the database it is the complete source of truth. + * It allows the messages to never go out of sync in a case where there are multiple [MultiQueue]s working on the same data source. + * + * @author github.com/Kilemonn + */ class MongoMultiQueue : MultiQueue, HasLogger { companion object diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/repository/MongoQueueMessageRepository.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/repository/MongoQueueMessageRepository.kt index 55a3958..edd998f 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/repository/MongoQueueMessageRepository.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/repository/MongoQueueMessageRepository.kt @@ -6,33 +6,52 @@ import org.springframework.data.mongodb.repository.Aggregation import org.springframework.data.mongodb.repository.MongoRepository import java.util.* - +/** + * A [MongoRepository] specific for [QueueMessageDocument] and queries made against them. + * Defines additional specific queries required for interacting with [QueueMessageDocument]s. + * + * @author github.com/Kilemonn + */ interface MongoQueueMessageRepository: MongoRepository { /** + * Get a distinct [List] of [String] [QueueMessageDocument.type] that currently exist. * + * @return a [List] of all the existing [QueueMessageDocument.type] as [String]s */ @Aggregation(pipeline = [ "{ '\$group': { '_id' : '\$type' } }" ]) fun getDistinctTypes(): List /** + * Get a list of [QueueMessageDocument] which have [QueueMessageDocument.type] matching the provided [type]. * + * @param type the type to match [QueueMessageDocument.type] with + * @return a [List] of [QueueMessageDocument] who have a matching [QueueMessageDocument.type] with the provided [type] */ fun findByTypeOrderByIdAsc(type: String): List /** + * Find and return a [QueueMessageDocument] that matches the provided [uuid]. * + * @param uuid the uuid that the found [QueueMessageDocument] should match + * @return the matching [QueueMessageDocument] */ fun findByUuid(uuid: String): Optional /** + * Delete all [QueueMessageDocument] that have a [QueueMessageDocument.type] that matches the provided [type]. * + * @param type messages that are assigned this queue type will be removed + * @return the [Int] number of deleted entries */ @Modifying - fun deleteByType(type:String): Int + fun deleteByType(type: String): Int /** + * Delete a [QueueMessageDocument] by `uuid`. * + * @param uuid the UUID of the [QueueMessageDocument.uuid] to remove + * @return the number of removed entries (most likely one since the UUID is unique) */ @Modifying fun deleteByUuid(uuid: String): Int diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueueTest.kt index 8e0e7fd..6e4eca7 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueueTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/nosql/mongo/MongoMultiQueueTest.kt @@ -3,6 +3,7 @@ package au.kilemon.messagequeue.queue.nosql.mongo import au.kilemon.messagequeue.configuration.QueueConfiguration import au.kilemon.messagequeue.logging.LoggingConfiguration import au.kilemon.messagequeue.queue.AbstractMultiQueueTest +import au.kilemon.messagequeue.queue.nosql.mongo.MongoMultiQueueTest.Companion.MONGO_CONTAINER import au.kilemon.messagequeue.settings.MessageQueueSettings import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.Assertions @@ -20,6 +21,11 @@ import org.testcontainers.containers.GenericContainer import org.testcontainers.junit.jupiter.Testcontainers import org.testcontainers.utility.DockerImageName +/** + * A test class for the [MONGO_CONTAINER] to ensure the [MongoMultiQueue] works as expected with this underlying data storage DB. + * + * @author github.com/Kilemonn + */ @ExtendWith(SpringExtension::class) @Testcontainers @DataMongoTest(properties = ["${MessageQueueSettings.MULTI_QUEUE_TYPE}=MONGO"]) From 9ecaf3c5422c2ff7b20dc3dd96ff32113c201422 Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Wed, 30 Aug 2023 23:23:42 +1000 Subject: [PATCH 10/10] Add example and credential disclaimer. --- README.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/README.md b/README.md index 2390d08..1625b75 100644 --- a/README.md +++ b/README.md @@ -181,6 +181,15 @@ The port that the mongo db has exposed. The whole url can be provided in the following format: `mongodb://:@:/` for example: `mongodb://root:password@localhost:27107/messagequeue`. +### Example: +***Note:** the use of `?authSource=admin` is to allow you to get up and running quickly, properly secured credentials and a non-admin account should always be used.* + +```yaml +environment: +- MULTI_QUEUE_TYPE=MONGO +- spring.data.mongodb.uri=mongodb://root:password@mongo:27017/messagequeue?authSource=admin +``` + --- ## HTTPS