Skip to content

Commit

Permalink
Recreate queues when TTL changes
Browse files Browse the repository at this point in the history
  • Loading branch information
cviolbarbosa committed May 27, 2024
1 parent 820b0a9 commit 44fbe42
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ function handleBrokerMessages(channelOrQueue, message) {
if (objMsg.obj.body){
return queryDataBase(uuid, objMsg.obj, msgProps);
} else {
logData.addLog('agent', objMsg.obj, 'error', `agent data rquest: input body not found`);
logData.addLog('agent', objMsg.obj, 'error', `agent data request: input body not found`);
}
break;

Expand Down
2 changes: 1 addition & 1 deletion helyos_server/src/initialization.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ async function configureRabbitMQSchema(dataChannels) {
await secondaryChannel.bindQueue(YARD_VISUALIZATION_QUEUE, AGENTS_UL_EXCHANGE, "yard.*.visualization");
await secondaryChannel.bindQueue(YARD_VISUALIZATION_QUEUE, AGENT_MQTT_EXCHANGE, "yard.*.visualization");

await rbmqServices.assertOrSubstituteQueue(mainChannel, AGENT_STATE_QUEUE, false, true);
await rbmqServices.assertOrSubstituteQueue(mainChannel, AGENT_STATE_QUEUE, false, true, {"x-message-ttl" : TTL_STATE_MSG});
await mainChannel.bindQueue(AGENT_STATE_QUEUE, AGENTS_UL_EXCHANGE, "*.*.state" );
await mainChannel.bindQueue(AGENT_STATE_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.state" );

Expand Down
11 changes: 6 additions & 5 deletions helyos_server/src/services/message_broker/rabbitMQ_services.js
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,14 @@ function createDebugQueues(agent) {

async function assertOrSubstituteQueue(channel, queueName, exclusive, durable, arguments) {
try {
const queueInfo = await rbmqServices.getQueueInfo(queueName);
const ttl = queueInfo.arguments['x-message-ttl'];
if (arguments && ttl !== arguments['x-message-ttl']) {
console.log("Queue TTL will be altered...");
const queueInfo = await rbmqAccessLayer.getQueueInfo(queueName);
const ttl = parseInt(queueInfo.arguments['x-message-ttl']);

if (arguments && arguments['x-message-ttl'] && ttl !== arguments['x-message-ttl']) {
console.log(`${queueName} queue TTL will be altered from ${ttl} to ${arguments['x-message-ttl']}.`);
await channel.deleteQueue(queueName).catch(e => console.log(e));
console.log(`Queue ${queueName} deleted.`);
logData.addLog('helyos_core', null, 'warn', `Queue ${queueName} message-time-to-live was changed.`);
logData.addLog('helyos_core', null, 'warn', `Queue ${queueName} message-time-to-live was changed to ${arguments['x-message-ttl']}.`);
}

await channel.assertQueue(queueName, {exclusive: exclusive, durable: durable, arguments: arguments});
Expand Down

0 comments on commit 44fbe42

Please sign in to comment.