diff --git a/helyos_server/src/event_handlers/rabbitmq_event_subscriber.js b/helyos_server/src/event_handlers/rabbitmq_event_subscriber.js index 4a81c759..21095a94 100644 --- a/helyos_server/src/event_handlers/rabbitmq_event_subscriber.js +++ b/helyos_server/src/event_handlers/rabbitmq_event_subscriber.js @@ -276,7 +276,7 @@ function handleBrokerMessages(channelOrQueue, message) { if (objMsg.obj.body){ return queryDataBase(uuid, objMsg.obj, msgProps); } else { - logData.addLog('agent', objMsg.obj, 'error', `agent data rquest: input body not found`); + logData.addLog('agent', objMsg.obj, 'error', `agent data request: input body not found`); } break; diff --git a/helyos_server/src/initialization.js b/helyos_server/src/initialization.js index 9500e147..a06ae815 100644 --- a/helyos_server/src/initialization.js +++ b/helyos_server/src/initialization.js @@ -161,7 +161,7 @@ async function configureRabbitMQSchema(dataChannels) { await secondaryChannel.bindQueue(YARD_VISUALIZATION_QUEUE, AGENTS_UL_EXCHANGE, "yard.*.visualization"); await secondaryChannel.bindQueue(YARD_VISUALIZATION_QUEUE, AGENT_MQTT_EXCHANGE, "yard.*.visualization"); - await rbmqServices.assertOrSubstituteQueue(mainChannel, AGENT_STATE_QUEUE, false, true); + await rbmqServices.assertOrSubstituteQueue(mainChannel, AGENT_STATE_QUEUE, false, true, {"x-message-ttl" : TTL_STATE_MSG}); await mainChannel.bindQueue(AGENT_STATE_QUEUE, AGENTS_UL_EXCHANGE, "*.*.state" ); await mainChannel.bindQueue(AGENT_STATE_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.state" ); diff --git a/helyos_server/src/services/message_broker/rabbitMQ_services.js b/helyos_server/src/services/message_broker/rabbitMQ_services.js index b528e984..96955433 100644 --- a/helyos_server/src/services/message_broker/rabbitMQ_services.js +++ b/helyos_server/src/services/message_broker/rabbitMQ_services.js @@ -259,13 +259,14 @@ function createDebugQueues(agent) { async function assertOrSubstituteQueue(channel, queueName, exclusive, durable, arguments) { try { - const queueInfo = await rbmqServices.getQueueInfo(queueName); - const ttl = queueInfo.arguments['x-message-ttl']; - if (arguments && ttl !== arguments['x-message-ttl']) { - console.log("Queue TTL will be altered..."); + const queueInfo = await rbmqAccessLayer.getQueueInfo(queueName); + const ttl = parseInt(queueInfo.arguments['x-message-ttl']); + + if (arguments && arguments['x-message-ttl'] && ttl !== arguments['x-message-ttl']) { + console.log(`${queueName} queue TTL will be altered from ${ttl} to ${arguments['x-message-ttl']}.`); await channel.deleteQueue(queueName).catch(e => console.log(e)); console.log(`Queue ${queueName} deleted.`); - logData.addLog('helyos_core', null, 'warn', `Queue ${queueName} message-time-to-live was changed.`); + logData.addLog('helyos_core', null, 'warn', `Queue ${queueName} message-time-to-live was changed to ${arguments['x-message-ttl']}.`); } await channel.assertQueue(queueName, {exclusive: exclusive, durable: durable, arguments: arguments});