Skip to content

Commit

Permalink
Merge pull request #18 from Kilemonn/add-support-for-mongo
Browse files Browse the repository at this point in the history
Add support for mongo
  • Loading branch information
Kilemonn authored Aug 30, 2023
2 parents 835ebac + 9ecaf3c commit fbacbc4
Show file tree
Hide file tree
Showing 18 changed files with 593 additions and 30 deletions.
58 changes: 55 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ The message storage mechanisms supported are:
- In-memory (default)
- Redis (stand alone and sentinel support)
- SQL Database (MySQL, PostgreSQL)

With plans to add support for the following mechanisms:
- NoSQL
- No SQL Database (Mongo)

## Rest API Documentation

Expand Down Expand Up @@ -138,6 +136,60 @@ environment:
- spring.datasource.password=5up3r5tR0nG!
```

## NO SQL (Mongo DB)

The application can be set into `MONGO` mode to interface with a NoSQL database. Similarly to the others you can set the type with `MULTI_QUEUE_TYPE=MONGO`.

### NoSQL Environment Properties

You can either specify all properties individually via, `spring.data.mongodb.host`, `spring.data.mongodb.port`, `spring.data.mongodb.database`, `spring.data.mongodb.username`, `spring.data.mongodb.password`.
Or you can provide all together in a single property: `spring.data.mongodb.uri`.

#### spring.data.mongodb.host

***This property is required unless `spring.data.mongodb.uri` is provided***.

This is the host that the mongo DB is accessible from.

#### spring.data.mongodb.database

***This property is required unless `spring.data.mongodb.uri` is provided***.

This is the database that should be connected to and where the related documents will be created.

#### spring.data.mongodb.username

***This property is required unless `spring.data.mongodb.uri` is provided***.

This is the username/account name used to access the database at the configured endpoint.

#### spring.data.mongodb.password

***This property is required unless `spring.data.mongodb.uri` is provided***.

This is the password used to access the database at the configured endpoint.

#### spring.data.mongodb.port

***This property is required unless `spring.data.mongodb.uri` is provided***.

The port that the mongo db has exposed.

#### spring.data.mongodb.uri

***This property is required unless the above properties are already provided***.

The whole url can be provided in the following format: `mongodb://<username>:<password>@<host>:<port>/<database>` for example: `mongodb://root:password@localhost:27107/messagequeue`.

### Example:
***Note:** the use of `?authSource=admin` is to allow you to get up and running quickly, properly secured credentials and a non-admin account should always be used.*

```yaml
environment:
- MULTI_QUEUE_TYPE=MONGO
- spring.data.mongodb.uri=mongodb://root:password@mongo:27017/messagequeue?authSource=admin
```

---

## HTTPS
Expand Down
5 changes: 5 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ dependencies {
// https://mvnrepository.com/artifact/org.postgresql/postgresql
implementation("org.postgresql:postgresql:42.5.1")

// No SQL drivers
// https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-mongodb
implementation("org.springframework.boot:spring-boot-starter-data-mongodb:3.1.3")

// Test dependencies
testImplementation("org.springframework.boot:spring-boot-starter-test:3.0.6")
// Required to mock MultiQueue objects since they apparently override a final 'remove(Object)' method.
testImplementation("org.mockito:mockito-inline:5.1.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import au.kilemon.messagequeue.message.QueueMessage
import au.kilemon.messagequeue.queue.MultiQueue
import au.kilemon.messagequeue.queue.cache.redis.RedisMultiQueue
import au.kilemon.messagequeue.queue.inmemory.InMemoryMultiQueue
import au.kilemon.messagequeue.queue.nosql.mongo.MongoMultiQueue
import au.kilemon.messagequeue.queue.sql.SqlMultiQueue
import au.kilemon.messagequeue.settings.MessageQueueSettings
import au.kilemon.messagequeue.settings.MultiQueueType
Expand Down Expand Up @@ -57,13 +58,16 @@ class QueueConfiguration : HasLogger

// Default to in-memory
var queue: MultiQueue = InMemoryMultiQueue()
if (MultiQueueType.REDIS.toString() == messageQueueSettings.multiQueueType)
{
queue = RedisMultiQueue(messageQueueSettings.redisPrefix, redisTemplate)
}
else if (MultiQueueType.SQL.toString() == messageQueueSettings.multiQueueType)
{
queue = SqlMultiQueue()
when (messageQueueSettings.multiQueueType) {
MultiQueueType.REDIS.toString() -> {
queue = RedisMultiQueue(messageQueueSettings.redisPrefix, redisTemplate)
}
MultiQueueType.SQL.toString() -> {
queue = SqlMultiQueue()
}
MultiQueueType.MONGO.toString() -> {
queue = MongoMultiQueue()
}
}
queue.initialiseQueueIndex()
LOG.info("Initialising [{}] queue as the [{}] is set to [{}].", queue::class.java.name, MessageQueueSettings.MULTI_QUEUE_TYPE, messageQueueSettings.multiQueueType)
Expand Down
11 changes: 11 additions & 0 deletions src/main/kotlin/au/kilemon/messagequeue/message/QueueMessage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import javax.persistence.*
* This object wraps a [Any] type `T` which is the payload to be stored in the queue. (This is actually a [Serializable] but causes issues in initialisation
* if the type is an `interface`. This needs to be [Serializable] if you want to use it with `Redis` or anything else).
*
* This is used for `InMemory`, `Redis` and `SQL` queues.
*
* @author github.com/Kilemonn
*/
@Entity
Expand Down Expand Up @@ -48,6 +50,15 @@ class QueueMessage(payload: Any?, @Column(nullable = false) var type: String, @C
*/
constructor() : this(null, "")

constructor(queueMessageDocument: QueueMessageDocument) : this()
{
this.type = queueMessageDocument.type
this.uuid = queueMessageDocument.uuid
this.id = queueMessageDocument.id
this.payload = queueMessageDocument.payload
this.assignedTo = queueMessageDocument.assignedTo
}

/**
* When the [QueueMessage] is read back from a database serialised form, only the
* [QueueMessage.payloadBytes] will be persisted, [QueueMessage.payload] will still be `null` by default.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package au.kilemon.messagequeue.message

import com.fasterxml.jackson.annotation.JsonIgnore
import org.springframework.data.annotation.Id
import org.springframework.data.mongodb.core.mapping.Document
import org.springframework.util.SerializationUtils
import java.util.*
import javax.persistence.*

/**
* This is used for `No-SQL` queues.
*
* @author github.com/Kilemonn
*/
@Document(value = QueueMessageDocument.DOCUMENT_NAME)
class QueueMessageDocument(var payload: Any?, var type: String, var assignedTo: String? = null)
{
companion object
{
const val DOCUMENT_NAME: String = "multiqueuemessages"
}

var uuid: String = UUID.randomUUID().toString()

@JsonIgnore
@Id
var id: Long? = null

/**
* Required for JSON deserialisation.
*/
constructor() : this(null, "")

constructor(queueMessage: QueueMessage) : this()
{
val resolvedQueueMessage = queueMessage.resolvePayloadObject()
this.type = resolvedQueueMessage.type
this.uuid = resolvedQueueMessage.uuid
this.id = resolvedQueueMessage.id
this.payload = resolvedQueueMessage.payload
this.assignedTo = resolvedQueueMessage.assignedTo
}
}
12 changes: 11 additions & 1 deletion src/main/kotlin/au/kilemon/messagequeue/queue/MultiQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,20 @@ interface MultiQueue: Queue<QueueMessage>, HasLogger
*/
fun clearForType(queueType: String): Int
{
maxQueueIndex.remove(queueType)
clearQueueIndexForType(queueType)
return clearForTypeInternal(queueType)
}

/**
* Clear the [MultiQueue.maxQueueIndex] entry matching the provided key [queueType].
*
* @param queueType the [String] of the [Queue] to clear
*/
fun clearQueueIndexForType(queueType: String)
{
maxQueueIndex.remove(queueType)
}

/**
* Indicates whether the underlying [Queue] for the provided [String] is empty. By calling [Queue.isEmpty].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ package au.kilemon.messagequeue.queue.exception
*
* @author github.com/Kilemonn
*/
class MessageUpdateException(uuid: String) : Exception("Unable to update message with UUID [$uuid] as it either does not exist (and cannot be updated) or there was an underlying error in the storage mechanism.")
class MessageUpdateException(uuid: String, exception: Exception? = null) : Exception("Unable to update message with UUID [$uuid] as it either does not exist (and cannot be updated) or there was an underlying error in the storage mechanism.", exception)
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package au.kilemon.messagequeue.queue.nosql.mongo

import au.kilemon.messagequeue.logging.HasLogger
import au.kilemon.messagequeue.message.QueueMessage
import au.kilemon.messagequeue.message.QueueMessageDocument
import au.kilemon.messagequeue.queue.MultiQueue
import au.kilemon.messagequeue.queue.exception.MessageUpdateException
import au.kilemon.messagequeue.queue.nosql.mongo.repository.MongoQueueMessageRepository
import org.slf4j.Logger
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Lazy
import java.util.*
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong

/**
* A NoSql mongo backed [MultiQueue]. All operations are performed directly on the database it is the complete source of truth.
* It allows the messages to never go out of sync in a case where there are multiple [MultiQueue]s working on the same data source.
*
* @author github.com/Kilemonn
*/
class MongoMultiQueue : MultiQueue, HasLogger
{
companion object
{
const val INDEX_ID = "index_id"
}

override val LOG: Logger = initialiseLogger()

override lateinit var maxQueueIndex: HashMap<String, AtomicLong>

@Lazy
@Autowired
private lateinit var queueMessageRepository: MongoQueueMessageRepository

/**
* Just initialise map, so it's not null, but the SQL [QueueMessage] ID is maintained by the database.
*/
override fun initialiseQueueIndex()
{
maxQueueIndex = HashMap()
}

override fun persistMessage(message: QueueMessage)
{
val queueMessageDocument = QueueMessageDocument(message)
try
{
queueMessageRepository.save(queueMessageDocument)
}
catch (ex: Exception)
{
throw MessageUpdateException(message.uuid, ex)
}
}

override fun getQueueForType(queueType: String): Queue<QueueMessage>
{
val entries = queueMessageRepository.findByTypeOrderByIdAsc(queueType)
return ConcurrentLinkedQueue(entries.map { entry -> QueueMessage(entry) })
}

override fun performHealthCheckInternal()
{
queueMessageRepository.existsById(1)
}

override fun getMessageByUUID(uuid: String): Optional<QueueMessage>
{
val documentMessage = queueMessageRepository.findByUuid(uuid)
return if (documentMessage.isPresent)
{
Optional.of(QueueMessage(documentMessage.get()))
}
else
{
Optional.empty()
}
}

override fun clearForTypeInternal(queueType: String): Int
{
val amountCleared = queueMessageRepository.deleteByType(queueType)
LOG.debug("Cleared existing queue for type [{}]. Removed [{}] message entries.", queueType, amountCleared)
return amountCleared
}

override fun isEmptyForType(queueType: String): Boolean
{
return queueMessageRepository.findByTypeOrderByIdAsc(queueType).isEmpty()
}

override fun pollInternal(queueType: String): Optional<QueueMessage>
{
val messages = queueMessageRepository.findByTypeOrderByIdAsc(queueType)
return if (messages.isNotEmpty())
{
return Optional.of(QueueMessage(messages[0]))
}
else
{
Optional.empty()
}
}

/**
* The [includeEmpty] value makes no difference it is always effectively `false`.
*/
override fun keys(includeEmpty: Boolean): Set<String>
{
val keySet = queueMessageRepository.getDistinctTypes().toSet()
LOG.debug("Total amount of queue keys [{}].", keySet.size)
return keySet
}

override fun containsUUID(uuid: String): Optional<String>
{
val optionalMessage = queueMessageRepository.findByUuid(uuid)
return if (optionalMessage.isPresent)
{
val message = optionalMessage.get()
LOG.debug("Found queue type [{}] for UUID: [{}].", message.type, uuid)
Optional.of(message.type)
}
else
{
LOG.debug("No queue type exists for UUID: [{}].", uuid)
Optional.empty()
}
}

override fun addInternal(element: QueueMessage): Boolean
{
val queueMessageDocument = QueueMessageDocument(element)
val saved = queueMessageRepository.save(queueMessageDocument)
return saved.id != null
}

override fun removeInternal(element: QueueMessage): Boolean
{
val removedCount = queueMessageRepository.deleteByUuid(element.uuid)
return removedCount > 0
}

/**
* Overriding to use the constant [INDEX_ID] for all look-ups since the ID is shared and needs to be assigned to
* the [QueueMessageDocument] before it is created.
*/
override fun getAndIncrementQueueIndex(queueType: String): Optional<Long>
{
return super.getAndIncrementQueueIndex(INDEX_ID)
}

/**
* Override to never clear the queue index for the type, since it's a shared index map.
*/
override fun clearQueueIndexForType(queueType: String)
{

}

/**
* Clear the [maxQueueIndex] if the entire map is cleared.
*
*
* Since [clearQueueIndexForType] is not clearing any of map entries.
*/
override fun clear()
{
super.clear()
maxQueueIndex.clear()
}
}
Loading

0 comments on commit fbacbc4

Please sign in to comment.