Skip to content

Commit

Permalink
Merge pull request #21 from Kilemonn/defer-index-to-underlying-storag…
Browse files Browse the repository at this point in the history
…e-medium

Remove internal map for queue indexes. Use the underlying medium to d…
  • Loading branch information
Kilemonn authored Sep 5, 2023
2 parents 61aa067 + c96580a commit 3e2dade
Show file tree
Hide file tree
Showing 13 changed files with 99 additions and 278 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,6 @@ class QueueConfiguration : HasLogger
}
}

if (!messageQueueSettings.multiQueueLazyInitialise.toBoolean())
{
LOG.trace("Lazy initialising is disabled, initialising queue indexes for existing messages now.")
queue.initialiseQueueIndex()
}
else
{
LOG.debug("Lazy initialise enabled with provided argument [{}], delaying initialisation.", messageQueueSettings.multiQueueLazyInitialise)
}

LOG.info("Initialising [{}] queue as the [{}] is set to [{}].", queue::class.java.name, MessageQueueSettings.MULTI_QUEUE_TYPE, messageQueueSettings.multiQueueType)

return queue
Expand Down
55 changes: 3 additions & 52 deletions src/main/kotlin/au/kilemon/messagequeue/queue/MultiQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,35 +51,7 @@ interface MultiQueue: Queue<QueueMessage>, HasLogger
*/

/**
* A holder for the current max [QueueMessage] index per sub-queue type.
* This is used to track the next ID of the queue index that should be set and gives order to the
* received messages as they are created in storage mechanisms that don't guarantee order.
*/
var maxQueueIndex: HashMap<String, AtomicLong>?

/**
* Lazy initialise the index for use in tests when the backing mechanism is not ready, but we are creating
* the [MultiQueue].
*
* This could be extended to provide a property to force lazy initialisation.
*/
fun getMaxQueueMap(): HashMap<String, AtomicLong>

/**
* Initialise the [maxQueueIndex] based on any existing [QueueMessage]s in the storage mechanism.
*/
fun initialiseQueueIndex()
{
maxQueueIndex = HashMap()
keys(false).forEach{ queueType ->
val queueForType = getQueueForType(queueType)
val maxIndex = queueForType.last().id
maxQueueIndex!![queueType] = AtomicLong(maxIndex?.plus(1) ?: 1)
}
}

/**
* Get the [maxQueueIndex] then increment it.
* Get the next queue index.
* If it does not exist yet, a default value of 1 will be set and returned.
*
* This can be overridden to return [Optional.EMPTY] to not override the ID of the
Expand All @@ -88,16 +60,7 @@ interface MultiQueue: Queue<QueueMessage>, HasLogger
*
* @return the current value of the index before it was incremented
*/
fun getAndIncrementQueueIndex(queueType: String): Optional<Long>
{
var index = getMaxQueueMap()[queueType]
if (index == null)
{
index = AtomicLong(1)
getMaxQueueMap()[queueType] = index
}
return Optional.of(index.getAndIncrement())
}
fun getNextQueueIndex(queueType: String): Optional<Long>

/**
* Used to persist the updated [QueueMessage] to the storage mechanism.
Expand Down Expand Up @@ -271,20 +234,9 @@ interface MultiQueue: Queue<QueueMessage>, HasLogger
*/
fun clearForType(queueType: String): Int
{
clearQueueIndexForType(queueType)
return clearForTypeInternal(queueType)
}

/**
* Clear the [MultiQueue.maxQueueIndex] entry matching the provided key [queueType].
*
* @param queueType the [String] of the [Queue] to clear
*/
fun clearQueueIndexForType(queueType: String)
{
getMaxQueueMap().remove(queueType)
}

/**
* Indicates whether the underlying [Queue] for the provided [String] is empty. By calling [Queue.isEmpty].
*
Expand Down Expand Up @@ -381,7 +333,7 @@ interface MultiQueue: Queue<QueueMessage>, HasLogger
{
if (element.id == null)
{
val index = getAndIncrementQueueIndex(element.type)
val index = getNextQueueIndex(element.type)
if (index.isPresent)
{
element.id = index.get()
Expand Down Expand Up @@ -531,7 +483,6 @@ interface MultiQueue: Queue<QueueMessage>, HasLogger
val amountRemovedForQueue = clearForType(key)
removedEntryCount += amountRemovedForQueue
}
getMaxQueueMap().clear()
LOG.debug("Cleared multi-queue, removed [{}] message entries over [{}] queue types.", removedEntryCount, keys)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,6 @@ class RedisMultiQueue(private val prefix: String = "", private val redisTemplate
{
override val LOG: Logger = initialiseLogger()

override var maxQueueIndex: HashMap<String, AtomicLong>? = null

override fun getMaxQueueMap(): HashMap<String, AtomicLong>
{
if (maxQueueIndex == null)
{
initialiseQueueIndex()
}

return maxQueueIndex!!
}

/**
* Append the [MessageQueueSettings.redisPrefix] to the provided [queueType] [String].
*
Expand Down Expand Up @@ -109,9 +97,17 @@ class RedisMultiQueue(private val prefix: String = "", private val redisTemplate
/**
* Overriding to pass in the [queueType] into [appendPrefix].
*/
override fun getAndIncrementQueueIndex(queueType: String): Optional<Long>
override fun getNextQueueIndex(queueType: String): Optional<Long>
{
return super.getAndIncrementQueueIndex(appendPrefix(queueType))
val queueForType = getQueueForType(appendPrefix(queueType))
return if (queueForType.isNotEmpty())
{
Optional.ofNullable(queueForType.last().id?.plus(1) ?: 1)
}
else
{
Optional.of(1)
}
}

override fun removeInternal(element: QueueMessage): Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,23 @@ open class InMemoryMultiQueue : MultiQueue, HasLogger
*/
private val messageQueue: ConcurrentHashMap<String, Queue<QueueMessage>> = ConcurrentHashMap()

override var maxQueueIndex: HashMap<String, AtomicLong>? = null
private val maxQueueIndex: HashMap<String, AtomicLong> = HashMap()

override fun getMaxQueueMap(): HashMap<String, AtomicLong>
/**
* This index is special compared to the other types it will be incremented once retrieved. So we could be skipping
* indexes, but it should be fine since it's only used for message ordering.
*/
override fun getNextQueueIndex(queueType: String): Optional<Long>
{
if (maxQueueIndex == null)
var index = maxQueueIndex[queueType]
if (index == null)
{
initialiseQueueIndex()
index = AtomicLong(1)
maxQueueIndex[queueType] = index
}

return maxQueueIndex!!
return Optional.of(index.getAndIncrement())
}


override fun getQueueForType(queueType: String): Queue<QueueMessage>
{
var queueForType: Queue<QueueMessage>? = messageQueue[queueType]
Expand Down Expand Up @@ -82,6 +86,7 @@ open class InMemoryMultiQueue : MultiQueue, HasLogger
{
var amountRemoved = 0
val queueForType: Queue<QueueMessage>? = messageQueue[queueType]
maxQueueIndex.remove(queueType)
if (queueForType != null)
{
amountRemoved = queueForType.size
Expand All @@ -97,6 +102,12 @@ open class InMemoryMultiQueue : MultiQueue, HasLogger
return amountRemoved
}

override fun clear()
{
super.clear()
maxQueueIndex.clear()
}

@Throws(DuplicateMessageException::class)
override fun add(element: QueueMessage): Boolean
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,52 +28,10 @@ class MongoMultiQueue : MultiQueue, HasLogger

override val LOG: Logger = initialiseLogger()

override var maxQueueIndex: HashMap<String, AtomicLong>? = null

override fun getMaxQueueMap(): HashMap<String, AtomicLong>
{
if (maxQueueIndex == null)
{
initialiseQueueIndex()
}

return maxQueueIndex!!
}

@Lazy
@Autowired
private lateinit var queueMessageRepository: MongoQueueMessageRepository

/**
* Special initialisation for mongo, since the max ID is shared for all sub queues we need to set the [INDEX_ID]
* to the highest ID of all elements in all the queues.
*/
override fun initialiseQueueIndex()
{
maxQueueIndex = HashMap()
var maxIndex: Long? = null
keys(false).forEach{ queueType ->
val queueForType = getQueueForType(queueType)
val index = queueForType.last().id
if (index != null)
{
maxIndex = if (maxIndex == null)
{
index
}
else
{
Math.max(maxIndex!!, index)
}
}
}

if (maxIndex != null)
{
maxQueueIndex!![INDEX_ID] = AtomicLong(maxIndex!!)
}
}

override fun persistMessage(message: QueueMessage)
{
val queueMessageDocument = QueueMessageDocument(message)
Expand Down Expand Up @@ -179,27 +137,16 @@ class MongoMultiQueue : MultiQueue, HasLogger
* Overriding to use the constant [INDEX_ID] for all look-ups since the ID is shared and needs to be assigned to
* the [QueueMessageDocument] before it is created.
*/
override fun getAndIncrementQueueIndex(queueType: String): Optional<Long>
override fun getNextQueueIndex(queueType: String): Optional<Long>
{
return super.getAndIncrementQueueIndex(INDEX_ID)
}

/**
* Override to never clear the queue index for the type, since it's a shared index map.
*/
override fun clearQueueIndexForType(queueType: String)
{

}

/**
* Clear the [maxQueueIndex] if the entire map is cleared.
*
* Since [MongoMultiQueue.clearQueueIndexForType] is not clearing any of map entries.
*/
override fun clear()
{
super.clear()
getMaxQueueMap().clear()
val largestIdMessage = queueMessageRepository.findTopByOrderByIdDesc()
return if (largestIdMessage.isPresent)
{
Optional.ofNullable(largestIdMessage.get().id?.plus(1) ?: 1)
}
else
{
Optional.of(1)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,11 @@ interface MongoQueueMessageRepository: MongoRepository<QueueMessageDocument, Lon
*/
@Modifying
fun deleteByUuid(uuid: String): Int

/**
* Get the entry with the largest ID.
*
* @return the [QueueMessageDocument] with the largest ID, otherwise [Optional.empty]
*/
fun findTopByOrderByIdDesc(): Optional<QueueMessageDocument>
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,6 @@ class SqlMultiQueue : MultiQueue, HasLogger
@Autowired
private lateinit var queueMessageRepository: SQLQueueMessageRepository

override var maxQueueIndex: HashMap<String, AtomicLong>? = null

override fun getMaxQueueMap(): HashMap<String, AtomicLong>
{
if (maxQueueIndex == null)
{
initialiseQueueIndex()
}

return maxQueueIndex!!
}

/**
* Just initialise map, so it's not null, but the SQL [QueueMessage] ID is maintained by the database.
*/
override fun initialiseQueueIndex()
{
maxQueueIndex = HashMap()
}

override fun getQueueForType(queueType: String): Queue<QueueMessage>
{
val entries = queueMessageRepository.findByTypeOrderByIdAsc(queueType)
Expand Down Expand Up @@ -172,7 +152,7 @@ class SqlMultiQueue : MultiQueue, HasLogger
* Overriding to return [Optional.EMPTY] so that the [MultiQueue.add] does set an `id` into the [QueueMessage]
* even if the id is `null`.
*/
override fun getAndIncrementQueueIndex(queueType: String): Optional<Long>
override fun getNextQueueIndex(queueType: String): Optional<Long>
{
return Optional.empty()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ class MessageQueueSettings
{
const val MULTI_QUEUE_TYPE: String = "MULTI_QUEUE_TYPE"
const val MULTI_QUEUE_TYPE_DEFAULT: String = "IN_MEMORY"
const val MULTI_QUEUE_LAZY_INITIALISE: String = "MULTI_QUEUE_LAZY_INITIALISE"

/**
* Start redis related properties
Expand Down Expand Up @@ -80,18 +79,6 @@ class MessageQueueSettings
@set:Generated
lateinit var multiQueueType: String

/**
* `Optional` uses the [MULTI_QUEUE_TYPE] environment variable to determine where
* the underlying multi queue is persisted. It can be any value of [MultiQueueType].
* Defaults to [MultiQueueType.IN_MEMORY] ([MULTI_QUEUE_TYPE_DEFAULT]).
*/
@SerializedName(MULTI_QUEUE_LAZY_INITIALISE)
@JsonProperty(MULTI_QUEUE_LAZY_INITIALISE)
@Value("\${$MULTI_QUEUE_LAZY_INITIALISE:false}")
@get:Generated
@set:Generated
lateinit var multiQueueLazyInitialise: String


/**
* `Optional` when [MULTI_QUEUE_TYPE] is set to [MultiQueueType.REDIS].
Expand Down
Loading

0 comments on commit 3e2dade

Please sign in to comment.