From fd59d7b792924a0c2e2a2090be373a36969b516b Mon Sep 17 00:00:00 2001 From: cviolbarbosa Date: Wed, 8 May 2024 08:53:02 +0200 Subject: [PATCH 1/2] use separated rbmq channel for visualization messages ensure rabbitmq connection is not broken due to queue assertion failure --- .../rabbitmq_event_subscriber.js | 4 +- helyos_server/src/initialization.js | 96 ++++++++++--------- helyos_server/src/main.js | 8 +- .../message_broker/rabbitMQ_access_layer.js | 11 +++ .../message_broker/rabbitMQ_services.js | 90 +++++++++++------ 5 files changed, 132 insertions(+), 77 deletions(-) diff --git a/helyos_server/src/event_handlers/rabbitmq_event_subscriber.js b/helyos_server/src/event_handlers/rabbitmq_event_subscriber.js index fabbac3f..74aa247e 100644 --- a/helyos_server/src/event_handlers/rabbitmq_event_subscriber.js +++ b/helyos_server/src/event_handlers/rabbitmq_event_subscriber.js @@ -241,7 +241,9 @@ function handleBrokerMessages(channelOrQueue, message) { // SENDER IS INDENTIFIED, CHECKED-IN, REGISTERED AND VALIDATED, LET'S NOW PROCESS THE MESSAGE... // Check if the agent is sending too many messages per second. - inMemDB.agents_stats[uuid]['msgPerSecond'].countMessage(); + if (inMemDB.agents_stats[uuid]) { + inMemDB.agents_stats[uuid]['msgPerSecond'].countMessage(); + } const avgRates = inMemDB.getHistoricalCountRateAverage('agents', uuid, 20); let closeConnection = false; diff --git a/helyos_server/src/initialization.js b/helyos_server/src/initialization.js index 28904239..2e0d45d8 100644 --- a/helyos_server/src/initialization.js +++ b/helyos_server/src/initialization.js @@ -7,7 +7,11 @@ const microserviceWatcher = require('./event_handlers/microservice_event_watcher const fs = require('fs'); const readYML = require('./modules/read_config_yml.js'); -const AGENT_IDLE_TIME_OFFLINE = process.env.AGENT_IDLE_TIME_OFFLINE || 10; +const AGENT_IDLE_TIME_OFFLINE = process.env.AGENT_IDLE_TIME_OFFLINE || 10; // Time of inactivity in seconds to consider an agent offline. +const PREFETCH_COUNT = parseInt(process.env.PREFETCH_COUNT) || 100; // Number of messages to prefetch from the broker. +const TTL_VISUAL_MSG = parseInt(process.env.TTL_VISUAL_MSG) || 2000; // Time to live for visualization messages in ms. +const TTL_STATE_MSG = parseInt(process.env.TTL_STATE_MSG) || 360000; // Time to live for state messages in ms. + const CREATE_RBMQ_ACCOUNTS = process.env.CREATE_RBMQ_ACCOUNTS || "True"; const { AGENTS_UL_EXCHANGE, AGENTS_DL_EXCHANGE, ANONYMOUS_EXCHANGE, AGENT_MQTT_EXCHANGE } = require('./services/message_broker/rabbitMQ_services.js'); const { CHECK_IN_QUEUE, AGENT_MISSION_QUEUE,AGENT_VISUALIZATION_QUEUE, AGENT_UPDATE_QUEUE, @@ -122,72 +126,78 @@ function initializeRabbitMQAccounts() { configureRabbitMQSchema() helyOS defines the topic exchanges and queues in the rabbitMQ schema. */ -async function configureRabbitMQSchema(dataChannel) { +async function configureRabbitMQSchema(dataChannels) { + const mainChannel = dataChannels[0]; + const secondaryChannel = dataChannels[1]; + await mainChannel.prefetch(PREFETCH_COUNT); + await secondaryChannel.prefetch(PREFETCH_COUNT); + console.log("===> Setting RabbitMQ Schema"); // SET EXCHANGE ANONYMOUS TO RECEIVE/SEND MESSAGES FROM/TO AGENT - await dataChannel.assertExchange(ANONYMOUS_EXCHANGE, 'topic', { durable: true }); - await dataChannel.assertQueue(CHECK_IN_QUEUE, {exclusive: false, durable: true } ); - await dataChannel.bindQueue(CHECK_IN_QUEUE, ANONYMOUS_EXCHANGE, "*.*.checkin" ); + await mainChannel.assertExchange(ANONYMOUS_EXCHANGE, 'topic', { durable: true }); + await rbmqServices.assertOrSubstituteQueue(mainChannel, CHECK_IN_QUEUE, false, true); + await mainChannel.bindQueue(CHECK_IN_QUEUE, ANONYMOUS_EXCHANGE, "*.*.checkin" ); // SET EXCHANGE "DOWN LINK" (DL) TO SEND MESSAGES TO AGENT - await dataChannel.assertExchange(AGENTS_DL_EXCHANGE, 'topic', { durable: true }); + await mainChannel.assertExchange(AGENTS_DL_EXCHANGE, 'topic', { durable: true }); // SET EXCHANGE "UP LINK" (UL) AND QUEUES TO RECEIVE MESSAGES FROM AGENT - await dataChannel.assertExchange(AGENTS_UL_EXCHANGE, 'topic', { durable: true }); + await mainChannel.assertExchange(AGENTS_UL_EXCHANGE, 'topic', { durable: true }); - // SET EXCHANGE FOR "MQTT" AGENTS AND QUEUES TO RECEIVE AND SEND MESSAGES TO AGENT - // ONE CANNOT ASCRIBE SEPARATED EXCHANGES FOR UL AND DL WHEN USE MQTT. - await dataChannel.assertExchange(AGENT_MQTT_EXCHANGE, 'topic', { durable: true }); + // SET EXCHANGE FOR "MQTT" AGENTS AND QUEUES TO RECEIVE AND SEND MESSAGES TO AGENT. No exchange is used for MQTT + await mainChannel.assertExchange(AGENT_MQTT_EXCHANGE, 'topic', { durable: true }); - await dataChannel.assertQueue(AGENT_UPDATE_QUEUE, {exclusive: false, durable:true, arguments: {"x-message-ttl" : 3600000}}); - await dataChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENTS_UL_EXCHANGE, "*.*.update"); - await dataChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENTS_UL_EXCHANGE, "*.*.fact_sheet"); - await dataChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.update"); - await dataChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.fact_sheet"); // VDA-5050 COMPATIBLE + await rbmqServices.assertOrSubstituteQueue(mainChannel, AGENT_UPDATE_QUEUE, false, true, {"x-message-ttl" : TTL_STATE_MSG}); + await mainChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENTS_UL_EXCHANGE, "*.*.update"); + await mainChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENTS_UL_EXCHANGE, "*.*.fact_sheet"); + await mainChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.update"); + await mainChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.fact_sheet"); - await dataChannel.assertQueue(AGENT_VISUALIZATION_QUEUE, {exclusive: false, durable:false, arguments: {"x-message-ttl" : 2000}}); - await dataChannel.bindQueue(AGENT_VISUALIZATION_QUEUE, AGENTS_UL_EXCHANGE, "agent.*.visualization"); - await dataChannel.bindQueue(AGENT_VISUALIZATION_QUEUE, AGENT_MQTT_EXCHANGE, "agent.*.visualization"); // VDA-5050 COMPATIBLE + await rbmqServices.assertOrSubstituteQueue(secondaryChannel, AGENT_VISUALIZATION_QUEUE, false, false, {"x-message-ttl" : TTL_VISUAL_MSG}); + await secondaryChannel.bindQueue(AGENT_VISUALIZATION_QUEUE, AGENTS_UL_EXCHANGE, "agent.*.visualization"); + await secondaryChannel.bindQueue(AGENT_VISUALIZATION_QUEUE, AGENT_MQTT_EXCHANGE, "agent.*.visualization"); - await dataChannel.assertQueue(YARD_VISUALIZATION_QUEUE, {exclusive: false, durable:false, arguments: {"x-message-ttl" : 2000}}); - await dataChannel.bindQueue(YARD_VISUALIZATION_QUEUE, AGENTS_UL_EXCHANGE, "yard.*.visualization"); - await dataChannel.bindQueue(YARD_VISUALIZATION_QUEUE, AGENT_MQTT_EXCHANGE, "yard.*.visualization"); + await rbmqServices.assertOrSubstituteQueue(secondaryChannel, YARD_VISUALIZATION_QUEUE, false, false, {"x-message-ttl" : TTL_VISUAL_MSG}); + await secondaryChannel.bindQueue(YARD_VISUALIZATION_QUEUE, AGENTS_UL_EXCHANGE, "yard.*.visualization"); + await secondaryChannel.bindQueue(YARD_VISUALIZATION_QUEUE, AGENT_MQTT_EXCHANGE, "yard.*.visualization"); - await dataChannel.assertQueue(AGENT_STATE_QUEUE, {exclusive: false, durable:true}); - await dataChannel.bindQueue(AGENT_STATE_QUEUE, AGENTS_UL_EXCHANGE, "*.*.state" ); - await dataChannel.bindQueue(AGENT_STATE_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.state" ); // VDA-5050 COMPATIBLE + await rbmqServices.assertOrSubstituteQueue(mainChannel, AGENT_STATE_QUEUE, false, true); + await mainChannel.bindQueue(AGENT_STATE_QUEUE, AGENTS_UL_EXCHANGE, "*.*.state" ); + await mainChannel.bindQueue(AGENT_STATE_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.state" ); - await dataChannel.assertQueue(AGENT_MISSION_QUEUE, {exclusive: false, durable:true}); - await dataChannel.bindQueue(AGENT_MISSION_QUEUE, AGENTS_UL_EXCHANGE, "*.*.mission_req" ); - await dataChannel.bindQueue(AGENT_MISSION_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.mission_req" ); + await rbmqServices.assertOrSubstituteQueue(mainChannel, AGENT_MISSION_QUEUE, false, true); + await mainChannel.bindQueue(AGENT_MISSION_QUEUE, AGENTS_UL_EXCHANGE, "*.*.mission_req" ); + await mainChannel.bindQueue(AGENT_MISSION_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.mission_req" ); - await dataChannel.bindQueue(CHECK_IN_QUEUE, AGENTS_UL_EXCHANGE, "*.*.checkin" ); - await dataChannel.bindQueue(CHECK_IN_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.checkin" ); + await mainChannel.bindQueue(CHECK_IN_QUEUE, AGENTS_UL_EXCHANGE, "*.*.checkin" ); + await mainChannel.bindQueue(CHECK_IN_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.checkin" ); - await dataChannel.assertQueue(SUMMARY_REQUESTS_QUEUE, {exclusive: false, durable:true}); - await dataChannel.bindQueue(SUMMARY_REQUESTS_QUEUE, AGENTS_UL_EXCHANGE, "*.*.database_req"); - await dataChannel.bindQueue(SUMMARY_REQUESTS_QUEUE, AGENTS_UL_EXCHANGE, "*.*.summary_req"); - await dataChannel.bindQueue(SUMMARY_REQUESTS_QUEUE, AGENTS_UL_EXCHANGE, "*.*.summary"); // MAGPIE COMPATIBLE + await rbmqServices.assertOrSubstituteQueue(mainChannel, SUMMARY_REQUESTS_QUEUE, false, true); + await mainChannel.bindQueue(SUMMARY_REQUESTS_QUEUE, AGENTS_UL_EXCHANGE, "*.*.database_req"); + await mainChannel.bindQueue(SUMMARY_REQUESTS_QUEUE, AGENTS_UL_EXCHANGE, "*.*.summary_req"); + await mainChannel.bindQueue(SUMMARY_REQUESTS_QUEUE, AGENTS_UL_EXCHANGE, "*.*.summary"); // MAGPIE COMPATIBLE console.log("===> RabbitMQ Schema Completed"); - return dataChannel; + return dataChannels; } - function helyosConsumingMessages (dataChannel) { + function helyosConsumingMessages (dataChannels) { + const mainChannel = dataChannels[0]; + const secondaryChannel = dataChannels[1]; console.log(" ================================================================") console.log(" ================= SUBSCRIBE TO HELYOS' QUEUES ==================") console.log(" ================================================================") - dataChannel.consume(CHECK_IN_QUEUE, (message) => handleBrokerMessages(CHECK_IN_QUEUE, message), { noAck: true}); - dataChannel.consume(AGENT_STATE_QUEUE, (message) => handleBrokerMessages(AGENT_STATE_QUEUE, message), { noAck: true}); - dataChannel.consume(AGENT_UPDATE_QUEUE, (message) => handleBrokerMessages(AGENT_UPDATE_QUEUE, message), { noAck: true}); - dataChannel.consume(AGENT_MISSION_QUEUE, (message) => handleBrokerMessages(AGENT_MISSION_QUEUE, message), { noAck: true}); - dataChannel.consume(AGENT_VISUALIZATION_QUEUE, (message) => handleBrokerMessages(AGENT_VISUALIZATION_QUEUE, message), { noAck: true}); - dataChannel.consume(YARD_VISUALIZATION_QUEUE, (message) => handleBrokerMessages(YARD_VISUALIZATION_QUEUE, message), { noAck: true}); + mainChannel.consume(CHECK_IN_QUEUE, (message) => handleBrokerMessages(CHECK_IN_QUEUE, message), { noAck: true}); + mainChannel.consume(AGENT_STATE_QUEUE, (message) => handleBrokerMessages(AGENT_STATE_QUEUE, message), { noAck: true}); + mainChannel.consume(AGENT_UPDATE_QUEUE, (message) => handleBrokerMessages(AGENT_UPDATE_QUEUE, message), { noAck: true}); + mainChannel.consume(AGENT_MISSION_QUEUE, (message) => handleBrokerMessages(AGENT_MISSION_QUEUE, message), { noAck: true}); + secondaryChannel.consume(AGENT_VISUALIZATION_QUEUE, (message) => handleBrokerMessages(AGENT_VISUALIZATION_QUEUE, message), { noAck: true}); + secondaryChannel.consume(YARD_VISUALIZATION_QUEUE, (message) => handleBrokerMessages(YARD_VISUALIZATION_QUEUE, message), { noAck: true}); - dataChannel.consume(SUMMARY_REQUESTS_QUEUE, (message) => handleBrokerMessages(SUMMARY_REQUESTS_QUEUE, message), { noAck: true}); - return dataChannel; + mainChannel.consume(SUMMARY_REQUESTS_QUEUE, (message) => handleBrokerMessages(SUMMARY_REQUESTS_QUEUE, message), { noAck: true}); + return dataChannels; } diff --git a/helyos_server/src/main.js b/helyos_server/src/main.js index 20aa0b2c..7ec89fba 100644 --- a/helyos_server/src/main.js +++ b/helyos_server/src/main.js @@ -97,11 +97,11 @@ const RabbitMQServices = require('./services/message_broker/rabbitMQ_services.js function connectToRabbitMQ () { return initialization.initializeRabbitMQAccounts() - .then(() => RabbitMQServices.connectAndOpenChannel({subscribe: false, connect: true, recoverCallback: initialization.helyosConsumingMessages}) ) - .then( dataChannel => { + .then(() => RabbitMQServices.connectAndOpenChannels({subscribe: false, connect: true, recoverCallback: initialization.helyosConsumingMessages}) ) + .then( dataChannels => { // SET RABBITMQ EXCHANGE/QUEUES SCHEMA AND THEN SUBSCRIBE TO QUEUES - initialization.configureRabbitMQSchema(dataChannel) - .then( dataChannel => initialization.helyosConsumingMessages(dataChannel)); + return initialization.configureRabbitMQSchema(dataChannels) + .then( dataChannels => initialization.helyosConsumingMessages(dataChannels)); }) } diff --git a/helyos_server/src/services/message_broker/rabbitMQ_access_layer.js b/helyos_server/src/services/message_broker/rabbitMQ_access_layer.js index e2bc759f..52dc0da2 100644 --- a/helyos_server/src/services/message_broker/rabbitMQ_access_layer.js +++ b/helyos_server/src/services/message_broker/rabbitMQ_access_layer.js @@ -110,6 +110,16 @@ const update_guest_account_permissions = (username) => !username? Promise.resolv }); +const getQueueInfo = (queueName) => { + return requestXHTTP + .get(`${API_PROTOCOL}://${RBMQ_HOST}:${RBMQ_API_PORT}/api/queues/%2F/${queueName}`) + .set( {'content-type': 'application/json' }).ca(RBMQ_CERTIFICATE) + .auth(RBMQ_ADMIN_USERNAME, RBMQ_ADMIN_PASSWORD) + .then(r => r.body); +} + + + const connect = amqp.connect; @@ -120,6 +130,7 @@ module.exports.create_rbmq_admin = create_rbmq_admin; module.exports.add_rbmq_user_vhost = add_rbmq_user_vhost; module.exports.listConnections = listConnections; module.exports.deleteConnections = deleteConnections; +module.exports.getQueueInfo = getQueueInfo; module.exports.connect = connect; diff --git a/helyos_server/src/services/message_broker/rabbitMQ_services.js b/helyos_server/src/services/message_broker/rabbitMQ_services.js index 07dac007..3a482015 100644 --- a/helyos_server/src/services/message_broker/rabbitMQ_services.js +++ b/helyos_server/src/services/message_broker/rabbitMQ_services.js @@ -65,18 +65,6 @@ try { -// function to sign string messages using crypto module and PSS padding: -function sign_message(message, privateKey) { - const sign = crypto.createSign('SHA256'); - sign.update(message); - sign.end(); - return sign.sign({ - key: privateKey, - padding: crypto.constants.RSA_PSS_SALTLEN_MAX_SIGN, - }); -} - - function verifyMessageSignature(message, publicKey, signature) { const verifier = crypto.createVerify('RSA-SHA256'); verifier.update(message, 'utf8'); @@ -125,17 +113,25 @@ const connect_as_guest_and_create_admin = () => rbmqAccessLayer.connect(urlObj(' // Initialize a RabbitMQ client and create channel -let mainChannelPromise; -function connectAndOpenChannel(options={}) { - console.log('connectAndOpenChannel', options) +let mainChannelPromise, secondaryChannelPromise; + +function getMainChannel(options={}) { if (mainChannelPromise && !options.connect) { - return mainChannelPromise.then(ch => { - if (options.subscribe) { - ch = options.recoverCallback(ch); - } - return ch - }); + return mainChannelPromise.then(ch => { + if (options.subscribe) { + ch = options.recoverCallback(ch); + } + return ch + }); + + } else { + return connectAndOpenChannels(options).then( channels => channels[0]); } +} + + +function connectAndOpenChannels(options={}) { + console.log('connectAndOpenChannels', options) return rbmqAccessLayer.connect(urlObj(), sslOptions) .then(conn => { @@ -148,10 +144,19 @@ function connectAndOpenChannel(options={}) { console.error("====================RABITMQ CONNECTION LOST ============="); console.error(err); console.error("========================================================="); - connectAndOpenChannel({subscribe: true, connect: true, recoverCallback:options.recoverCallback}); + connectAndOpenChannels({subscribe: true, connect: true, recoverCallback:options.recoverCallback}); }); - mainChannelPromise = conn.createChannel() // This is a Promise! + mainChannelPromise = conn.createChannel() + .then(ch => { + if (options.subscribe) { + ch = options.recoverCallback(ch); + } + return ch; + }); + + + secondaryChannelPromise = conn.createChannel() .then(ch => { if (options.subscribe) { ch = options.recoverCallback(ch); @@ -159,12 +164,12 @@ function connectAndOpenChannel(options={}) { return ch; }); - return mainChannelPromise; + return Promise.all([mainChannelPromise, secondaryChannelPromise]); }) .catch(e => { console.warn("\nWaiting AMQP server...\n\n"); const promiseSetTimeout = util.promisify(setTimeout); - return promiseSetTimeout(3000).then( () => connectAndOpenChannel({subscribe: false, connect: true})) + return promiseSetTimeout(3000).then( () => connectAndOpenChannels({subscribe: false, connect: true})) }); } @@ -206,7 +211,7 @@ function sendEncriptedMsg(queue, message, publicKey='', routingKey=null, exchang } - connectAndOpenChannel() + getMainChannel() .then( dataChannel => { const sign = crypto.createSign('SHA256'); sign.update(encryptedMsg); @@ -237,7 +242,7 @@ function sendEncriptedMsg(queue, message, publicKey='', routingKey=null, exchang function createDebugQueues(agent) { - connectAndOpenChannel({subscribe: false, connect: false} ).then( dataChannel => { + getMainChannel({subscribe: false, connect: false} ).then( dataChannel => { dataChannel.assertQueue(`tap-${agent.name}`, {durable:false, maxLength: 10}); dataChannel.bindQueue(`tap-${agent.name}`, AGENTS_DL_EXCHANGE, `*.${agent.uuid}.*` ); dataChannel.bindQueue(`tap-${agent.name}`, AGENTS_UL_EXCHANGE, `*.${agent.uuid}.*` ); @@ -252,8 +257,33 @@ function createDebugQueues(agent) { } +async function assertOrSubstituteQueue(channel, queueName, exclusive, durable, arguments) { + try { + const queueInfo = await rbmqServices.getQueueInfo(queueName); + const ttl = queueInfo.arguments['x-message-ttl']; + if (arguments && ttl !== arguments['x-message-ttl']) { + console.log("Queue TTL will be altered..."); + await channel.deleteQueue(queueName).catch(e => console.log(e)); + console.log(`Queue ${queueName} deleted.`); + logData.addLog('helyos_core', null, 'warn', `Queue ${queueName} message-time-to-live was changed.`); + } + + await channel.assertQueue(queueName, {exclusive: exclusive, durable: durable, arguments: arguments}); + console.log(`Queue ${queueName} asserted.`); + return true; + + } catch (error) { + console.log(`Queue ${queueName} not found. Creating...`); + await channel.assertQueue(queueName, {exclusive: exclusive, durable: durable, arguments: arguments}); + logData.addLog('helyos_core', null, 'info', `Queue ${queueName} was created.`); + return false; + + } +} + + function removeDebugQueues(agent) { - connectAndOpenChannel({subscribe: false, connect: false} ).then( dataChannel => { + getMainChannel({subscribe: false, connect: false} ).then( dataChannel => { dataChannel.deleteQueue(`tap-${agent.name}`); dataChannel.deleteQueue(`tap-cmd_to_${agent.name}`); }); @@ -270,7 +300,7 @@ function dispatchAllBufferedMessages(bufferPayload){ module.exports.connect = connect; -module.exports.connectAndOpenChannel = connectAndOpenChannel; +module.exports.connectAndOpenChannels = connectAndOpenChannels; module.exports.dispatchAllBufferedMessages = dispatchAllBufferedMessages; module.exports.sendEncriptedMsg = sendEncriptedMsg; module.exports.create_rbmq_user = rbmqAccessLayer.createUser; @@ -280,6 +310,8 @@ module.exports.connect_as_admin_and_create_accounts = connect_as_admin_and_creat module.exports.connect_as_guest_and_create_admin = connect_as_guest_and_create_admin; module.exports.createDebugQueues = createDebugQueues; module.exports.removeDebugQueues = removeDebugQueues; +module.exports.assertOrSubstituteQueue = assertOrSubstituteQueue; +module.exports.getQueueInfo = rbmqAccessLayer.getQueueInfo; module.exports.CHECK_IN_QUEUE = CHECK_IN_QUEUE; From b03a065f75100b09b67800343bcabdff27a81a2b Mon Sep 17 00:00:00 2001 From: cviolbarbosa Date: Thu, 16 May 2024 13:26:22 +0200 Subject: [PATCH 2/2] improving log messages --- .../microservice_assignment_result.js | 7 ++++--- .../update_event_handler.js | 2 +- .../rabbitmq_event_subscriber.js | 19 +++++++++++-------- helyos_server/src/initialization.js | 7 ++++--- .../communication/agent_communication.js | 1 - .../microservice_communication.js | 2 +- .../src/modules/microservice_orchestration.js | 11 +++++++++-- .../services/database/postg_access_layer.js | 1 - .../message_broker/rabbitMQ_services.js | 2 +- .../src/services/microservice_services.js | 10 +++++----- packaging/build.sh | 2 +- 11 files changed, 37 insertions(+), 27 deletions(-) diff --git a/helyos_server/src/event_handlers/microservice_event_handlers/microservice_assignment_result.js b/helyos_server/src/event_handlers/microservice_event_handlers/microservice_assignment_result.js index f0df220b..d9307d87 100644 --- a/helyos_server/src/event_handlers/microservice_event_handlers/microservice_assignment_result.js +++ b/helyos_server/src/event_handlers/microservice_event_handlers/microservice_assignment_result.js @@ -9,14 +9,15 @@ const databaseServices = require('../../services/database/database_services.js') async function createAssignment(workProcess, servResponse, serviceRequest){ - console.log("servResponse") - console.log(servResponse) const agentIds = workProcess.agent_ids; const serviceRequestId = serviceRequest? serviceRequest.id:null - console.log("workProcess",workProcess) + logData.addLog('helyos_core', {wproc_id: workProcess.id}, 'info', `Create assignment(s) using the response of ${serviceRequest.service_url}`); + console.log(`WORKPROCESS ${workProcess.id}: Create assignment(s) using the response of ${serviceRequest.service_url}`); + const yardId = workProcess.yard_id; + if (!workProcess.sched_start_at) { workProcess.sched_start_at = new Date(); } diff --git a/helyos_server/src/event_handlers/rabbitmq_event_handlers/update_event_handler.js b/helyos_server/src/event_handlers/rabbitmq_event_handlers/update_event_handler.js index 3e22a58c..23c6483b 100644 --- a/helyos_server/src/event_handlers/rabbitmq_event_handlers/update_event_handler.js +++ b/helyos_server/src/event_handlers/rabbitmq_event_handlers/update_event_handler.js @@ -136,7 +136,7 @@ async function agentAutoUpdate(objMsg, uuid, bufferPeriod=0) { async function connectFollowersToLeader (leaderUUID, followerUUIDs) { databaseServices.connectAgents(leaderUUID, followerUUIDs) .then((newConnectionIds) => { - logData.addLog('agent', {uuid: leaderUUID}, 'normal', `new connected followers # : ${newConnectionIds}` ); + logData.addLog('agent', {uuid: leaderUUID}, 'normal', `Followers connected to this agent # : ${newConnectionIds.length? newConnectionIds:'None'}` ); // Allow follower agents to be updated by the leader agent user account. const followerPatchs = followerUUIDs.map( uuid => ({uuid, rbmq_username: leaderUUID})); return databaseServices.agents.updateMany(followerPatchs,'uuid'); diff --git a/helyos_server/src/event_handlers/rabbitmq_event_subscriber.js b/helyos_server/src/event_handlers/rabbitmq_event_subscriber.js index 74aa247e..b1ee1e39 100644 --- a/helyos_server/src/event_handlers/rabbitmq_event_subscriber.js +++ b/helyos_server/src/event_handlers/rabbitmq_event_subscriber.js @@ -93,11 +93,12 @@ async function validateMessageSender(registeredAgent, uuid, objMsg, msgProps, ex console.log(`Agent ${uuid} is using the leader account ${possibleLeaderUUID}`); inMemDB.update('agents', 'uuid', {uuid, rbmq_username:possibleLeaderUUID}, new Date(), 'realtime'); } else { // OK, we did our best to validate you and you will be disconnected. - logData.addLog('agent', {uuid}, 'error', `Agent disconnected: RabbitMQ username ${agentAccount} does not match agent uuid or agent leader!`) - deleteConnections(agentAccount); + logData.addLog('agent', {uuid}, 'error', + `helyOS disconnected the agent: An agent is trying to publish a message for another agent. The RabbitMQ username ${agentAccount} does not match either its UUID or its leader's UUID, in case of connected agents.`) inMemDB.delete('agents', 'uuid', uuid); inMemDB.delete('agents', 'uuid', agentAccount); - throw Error(`RabbitMQ username ${agentAccount} does not match agent uuid or agent leader!`); + deleteConnections(agentAccount); + throw Error(`RabbitMQ username ${agentAccount} does not match either the agent's UUID or its leader's UUID.`); } } } @@ -112,7 +113,7 @@ async function validateMessageSender(registeredAgent, uuid, objMsg, msgProps, ex } } else { deleteConnections(uuid); - throw ({msg:`signature or public key-absent; ${uuid}`, code: 'AGENT-403'}); + throw ({msg:`Signature or public key-absent; ${uuid}`, code: 'AGENT-403'}); } } } @@ -253,15 +254,16 @@ function handleBrokerMessages(channelOrQueue, message) { } if (avgRates.avgUpdtPerSecond > MESSAGE_UPDATE_LIMIT) { - logData.addLog('agent', {uuid}, 'error', `Agent disconnected: high db updates per second. Check the publish - rate for agent.{uuid}.update, agent.{uuid}.state, agent.{uuid}.database_req routes`); + logData.addLog('agent', {uuid}, 'error', + `Agent disconnected: high number of database updates per second. Please check the publish rate for the routes agent.{uuid}.update, agent.{uuid}.state, and agent.{uuid}.database_req.`); + closeConnection = true; } if (closeConnection) { - deleteConnections(agentAccount); inMemDB.delete('agents', 'uuid', uuid); inMemDB.delete('agents', 'uuid', agentAccount); + deleteConnections(agentAccount).catch(e => console.log(e)); return; } @@ -327,7 +329,8 @@ function handleBrokerMessages(channelOrQueue, message) { console.log(error); } })().catch(error => { - logData.addLog('agent', {uuid}, 'error', JSON.stringify(error, Object.getOwnPropertyNames(error))); + const errorMsg = error.message? {message:error.message} : error; + logData.addLog('agent', {uuid}, 'error', JSON.stringify(errorMsg, Object.getOwnPropertyNames(errorMsg))); }); }; diff --git a/helyos_server/src/initialization.js b/helyos_server/src/initialization.js index 2e0d45d8..9500e147 100644 --- a/helyos_server/src/initialization.js +++ b/helyos_server/src/initialization.js @@ -185,9 +185,10 @@ async function configureRabbitMQSchema(dataChannels) { function helyosConsumingMessages (dataChannels) { const mainChannel = dataChannels[0]; const secondaryChannel = dataChannels[1]; - console.log(" ================================================================") - console.log(" ================= SUBSCRIBE TO HELYOS' QUEUES ==================") - console.log(" ================================================================") + console.log(`\n ================================================================`+ + `\n ================= SUBSCRIBE TO HELYOS' QUEUES ==================`+ + `\n ================================================================`); + mainChannel.consume(CHECK_IN_QUEUE, (message) => handleBrokerMessages(CHECK_IN_QUEUE, message), { noAck: true}); mainChannel.consume(AGENT_STATE_QUEUE, (message) => handleBrokerMessages(AGENT_STATE_QUEUE, message), { noAck: true}); mainChannel.consume(AGENT_UPDATE_QUEUE, (message) => handleBrokerMessages(AGENT_UPDATE_QUEUE, message), { noAck: true}); diff --git a/helyos_server/src/modules/communication/agent_communication.js b/helyos_server/src/modules/communication/agent_communication.js index f8eb61dd..897ec5a5 100644 --- a/helyos_server/src/modules/communication/agent_communication.js +++ b/helyos_server/src/modules/communication/agent_communication.js @@ -135,7 +135,6 @@ async function sendReleaseFromWorkProcessRequest(agentId, wpId) { body: { work_process_id: parseInt(wpId, 10), operation_types_required: [], // to be used in the future - work_process_id: parseInt(wpId), reserved: false }, _version: MESSAGE_VERSION diff --git a/helyos_server/src/modules/communication/microservice_communication.js b/helyos_server/src/modules/communication/microservice_communication.js index 2636c37e..1a6cdca5 100644 --- a/helyos_server/src/modules/communication/microservice_communication.js +++ b/helyos_server/src/modules/communication/microservice_communication.js @@ -57,7 +57,7 @@ const processMicroserviceRequest = (servRequestId) => { }) .catch(e => { const servResponse = e.data; - logData.addLog('microservice', servRequest, 'error', e ); + logData.addLog('microservice', servRequest, 'error', e.message ); return databaseServices.service_requests.updateByConditions({ 'id': servRequestId, 'status__in': [ SERVICE_STATUS.DISPATCHING_SERVICE, SERVICE_STATUS.WAIT_DEPENDENCIES, diff --git a/helyos_server/src/modules/microservice_orchestration.js b/helyos_server/src/modules/microservice_orchestration.js index 55ad8fc9..ea6f87a6 100644 --- a/helyos_server/src/modules/microservice_orchestration.js +++ b/helyos_server/src/modules/microservice_orchestration.js @@ -187,8 +187,15 @@ function createServiceRequestsForWorkProcessType(processType, request, agentIds, delete s['__depends_on_steps']; }); - console.log("\n\n(1)========= Create service requests for the process type "+ processType +" ==========" ); - console.log("service Requests from service matrix", serviceRequests); + console.log("\n\n========= Preparing microservice requests for the process type "+ processType +" ==========" ); + serviceRequests.forEach(s => { + console.log(`Request UID: ${s.request_uid}`); + console.log(`Step: ${s.step}`); + console.log(`Service Type: ${s.service_type}`); + console.log(`Service URL: ${s.service_url}`); + console.log(`Next Request UID: ${s.next_request_to_dispatch_uid}`); + console.log("--------------------"); + }); console.log("================================================================================"); return serviceRequests; diff --git a/helyos_server/src/services/database/postg_access_layer.js b/helyos_server/src/services/database/postg_access_layer.js index 8bf578af..06a24d6e 100644 --- a/helyos_server/src/services/database/postg_access_layer.js +++ b/helyos_server/src/services/database/postg_access_layer.js @@ -215,7 +215,6 @@ class DatabaseLayer { let colNames = [], colValues = [...condColValues], valueMasks = []; delete patch['id']; - console.log('colValues', colValues); Object.keys(patch).forEach((key, idx) => { colNames.push(key); colValues.push(patch[key]); diff --git a/helyos_server/src/services/message_broker/rabbitMQ_services.js b/helyos_server/src/services/message_broker/rabbitMQ_services.js index 3a482015..b528e984 100644 --- a/helyos_server/src/services/message_broker/rabbitMQ_services.js +++ b/helyos_server/src/services/message_broker/rabbitMQ_services.js @@ -176,7 +176,7 @@ function connectAndOpenChannels(options={}) { function sendEncriptedMsg(queue, message, publicKey='', routingKey=null, exchange=null, correlationId=null) { let encryptedMsg; - console.log(" =============== HelyOS is sending a message in rabbitMQ ===================") + console.log(" \n=============== helyOS core is sending a message in RabbitMQ ===================") switch (ENCRYPT) { case "agent": diff --git a/helyos_server/src/services/microservice_services.js b/helyos_server/src/services/microservice_services.js index 9b6d201b..67d0cc38 100644 --- a/helyos_server/src/services/microservice_services.js +++ b/helyos_server/src/services/microservice_services.js @@ -63,13 +63,13 @@ const sendRequestToService = (service_url, service_licence_key, request, context .send({request, context, config}) .then((res) => parseResponseToJson(res)) .catch((err)=> { - console.log("\n*************** Error - send request to external microservice *************\n"); - console.log(err) + console.log(`\n*************** Error - microservice ${service_url} *************\n`); + console.log(err.message) if(err.response) { err.data = parseResponseToJson(err.response); } else { if (err.code === 'ENOTFOUND') { - err.message = 'microservice is unreachable.'; + err.message = 'Microservice is unreachable.'; } } throw err; @@ -92,7 +92,7 @@ const getServiceResponse = (service_url, service_licence_key, jobId) => { } return data; }) - .catch((err)=> console.log('\n==== error in getResultPathPlanner===', err)); + .catch((err)=> console.log('\n==== Error in getting microservice response ===', err.message)); } @@ -106,7 +106,7 @@ const cancelService = (service_url, service_licence_key, jobId) => { .set('Accept', 'application/json') .set('Content-Type', 'application/json') .then((res) => res.body.toString()) - .catch((err)=> console.log('error in cancelPathPlanner', err)); + .catch((err)=> console.log('Error in canceling microservice', err.message)); } diff --git a/packaging/build.sh b/packaging/build.sh index c3275d0d..2c65abb2 100644 --- a/packaging/build.sh +++ b/packaging/build.sh @@ -1 +1 @@ -docker build --no-cache -f ./Dockerfile -t helyosframework/helyos_core:test .. \ No newline at end of file +docker build -f ./Dockerfile -t helyosframework/helyos_core:test .. \ No newline at end of file