Skip to content

Commit

Permalink
Merge pull request #16 from helyOSFramework/improve_rbmq_dataflow
Browse files Browse the repository at this point in the history
Improve RabbitMQ messages dataflow
  • Loading branch information
cviolbarbosa authored May 16, 2024
2 parents bc66e98 + b03a065 commit b1b7343
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ const databaseServices = require('../../services/database/database_services.js')


async function createAssignment(workProcess, servResponse, serviceRequest){
console.log("servResponse")
console.log(servResponse)

const agentIds = workProcess.agent_ids;
const serviceRequestId = serviceRequest? serviceRequest.id:null
console.log("workProcess",workProcess)
logData.addLog('helyos_core', {wproc_id: workProcess.id}, 'info', `Create assignment(s) using the response of ${serviceRequest.service_url}`);
console.log(`WORKPROCESS ${workProcess.id}: Create assignment(s) using the response of ${serviceRequest.service_url}`);

const yardId = workProcess.yard_id;


if (!workProcess.sched_start_at) {
workProcess.sched_start_at = new Date();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ async function agentAutoUpdate(objMsg, uuid, bufferPeriod=0) {
async function connectFollowersToLeader (leaderUUID, followerUUIDs) {
databaseServices.connectAgents(leaderUUID, followerUUIDs)
.then((newConnectionIds) => {
logData.addLog('agent', {uuid: leaderUUID}, 'normal', `new connected followers # : ${newConnectionIds}` );
logData.addLog('agent', {uuid: leaderUUID}, 'normal', `Followers connected to this agent # : ${newConnectionIds.length? newConnectionIds:'None'}` );
// Allow follower agents to be updated by the leader agent user account.
const followerPatchs = followerUUIDs.map( uuid => ({uuid, rbmq_username: leaderUUID}));
return databaseServices.agents.updateMany(followerPatchs,'uuid');
Expand Down
23 changes: 14 additions & 9 deletions helyos_server/src/event_handlers/rabbitmq_event_subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,12 @@ async function validateMessageSender(registeredAgent, uuid, objMsg, msgProps, ex
console.log(`Agent ${uuid} is using the leader account ${possibleLeaderUUID}`);
inMemDB.update('agents', 'uuid', {uuid, rbmq_username:possibleLeaderUUID}, new Date(), 'realtime');
} else { // OK, we did our best to validate you and you will be disconnected.
logData.addLog('agent', {uuid}, 'error', `Agent disconnected: RabbitMQ username ${agentAccount} does not match agent uuid or agent leader!`)
deleteConnections(agentAccount);
logData.addLog('agent', {uuid}, 'error',
`helyOS disconnected the agent: An agent is trying to publish a message for another agent. The RabbitMQ username ${agentAccount} does not match either its UUID or its leader's UUID, in case of connected agents.`)
inMemDB.delete('agents', 'uuid', uuid);
inMemDB.delete('agents', 'uuid', agentAccount);
throw Error(`RabbitMQ username ${agentAccount} does not match agent uuid or agent leader!`);
deleteConnections(agentAccount);
throw Error(`RabbitMQ username ${agentAccount} does not match either the agent's UUID or its leader's UUID.`);
}
}
}
Expand All @@ -112,7 +113,7 @@ async function validateMessageSender(registeredAgent, uuid, objMsg, msgProps, ex
}
} else {
deleteConnections(uuid);
throw ({msg:`signature or public key-absent; ${uuid}`, code: 'AGENT-403'});
throw ({msg:`Signature or public key-absent; ${uuid}`, code: 'AGENT-403'});
}
}
}
Expand Down Expand Up @@ -241,7 +242,9 @@ function handleBrokerMessages(channelOrQueue, message) {
// SENDER IS INDENTIFIED, CHECKED-IN, REGISTERED AND VALIDATED, LET'S NOW PROCESS THE MESSAGE...

// Check if the agent is sending too many messages per second.
inMemDB.agents_stats[uuid]['msgPerSecond'].countMessage();
if (inMemDB.agents_stats[uuid]) {
inMemDB.agents_stats[uuid]['msgPerSecond'].countMessage();
}
const avgRates = inMemDB.getHistoricalCountRateAverage('agents', uuid, 20);
let closeConnection = false;

Expand All @@ -251,15 +254,16 @@ function handleBrokerMessages(channelOrQueue, message) {
}

if (avgRates.avgUpdtPerSecond > MESSAGE_UPDATE_LIMIT) {
logData.addLog('agent', {uuid}, 'error', `Agent disconnected: high db updates per second. Check the publish
rate for agent.{uuid}.update, agent.{uuid}.state, agent.{uuid}.database_req routes`);
logData.addLog('agent', {uuid}, 'error',
`Agent disconnected: high number of database updates per second. Please check the publish rate for the routes agent.{uuid}.update, agent.{uuid}.state, and agent.{uuid}.database_req.`);

closeConnection = true;
}

if (closeConnection) {
deleteConnections(agentAccount);
inMemDB.delete('agents', 'uuid', uuid);
inMemDB.delete('agents', 'uuid', agentAccount);
deleteConnections(agentAccount).catch(e => console.log(e));
return;
}

Expand Down Expand Up @@ -325,7 +329,8 @@ function handleBrokerMessages(channelOrQueue, message) {
console.log(error);
}
})().catch(error => {
logData.addLog('agent', {uuid}, 'error', JSON.stringify(error, Object.getOwnPropertyNames(error)));
const errorMsg = error.message? {message:error.message} : error;
logData.addLog('agent', {uuid}, 'error', JSON.stringify(errorMsg, Object.getOwnPropertyNames(errorMsg)));
});
};

Expand Down
103 changes: 57 additions & 46 deletions helyos_server/src/initialization.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ const microserviceWatcher = require('./event_handlers/microservice_event_watcher
const fs = require('fs');
const readYML = require('./modules/read_config_yml.js');

const AGENT_IDLE_TIME_OFFLINE = process.env.AGENT_IDLE_TIME_OFFLINE || 10;
const AGENT_IDLE_TIME_OFFLINE = process.env.AGENT_IDLE_TIME_OFFLINE || 10; // Time of inactivity in seconds to consider an agent offline.
const PREFETCH_COUNT = parseInt(process.env.PREFETCH_COUNT) || 100; // Number of messages to prefetch from the broker.
const TTL_VISUAL_MSG = parseInt(process.env.TTL_VISUAL_MSG) || 2000; // Time to live for visualization messages in ms.
const TTL_STATE_MSG = parseInt(process.env.TTL_STATE_MSG) || 360000; // Time to live for state messages in ms.

const CREATE_RBMQ_ACCOUNTS = process.env.CREATE_RBMQ_ACCOUNTS || "True";
const { AGENTS_UL_EXCHANGE, AGENTS_DL_EXCHANGE, ANONYMOUS_EXCHANGE, AGENT_MQTT_EXCHANGE } = require('./services/message_broker/rabbitMQ_services.js');
const { CHECK_IN_QUEUE, AGENT_MISSION_QUEUE,AGENT_VISUALIZATION_QUEUE, AGENT_UPDATE_QUEUE,
Expand Down Expand Up @@ -122,72 +126,79 @@ function initializeRabbitMQAccounts() {
configureRabbitMQSchema()
helyOS defines the topic exchanges and queues in the rabbitMQ schema.
*/
async function configureRabbitMQSchema(dataChannel) {
async function configureRabbitMQSchema(dataChannels) {
const mainChannel = dataChannels[0];
const secondaryChannel = dataChannels[1];
await mainChannel.prefetch(PREFETCH_COUNT);
await secondaryChannel.prefetch(PREFETCH_COUNT);

console.log("===> Setting RabbitMQ Schema");
// SET EXCHANGE ANONYMOUS TO RECEIVE/SEND MESSAGES FROM/TO AGENT
await dataChannel.assertExchange(ANONYMOUS_EXCHANGE, 'topic', { durable: true });
await dataChannel.assertQueue(CHECK_IN_QUEUE, {exclusive: false, durable: true } );
await dataChannel.bindQueue(CHECK_IN_QUEUE, ANONYMOUS_EXCHANGE, "*.*.checkin" );
await mainChannel.assertExchange(ANONYMOUS_EXCHANGE, 'topic', { durable: true });
await rbmqServices.assertOrSubstituteQueue(mainChannel, CHECK_IN_QUEUE, false, true);
await mainChannel.bindQueue(CHECK_IN_QUEUE, ANONYMOUS_EXCHANGE, "*.*.checkin" );

// SET EXCHANGE "DOWN LINK" (DL) TO SEND MESSAGES TO AGENT
await dataChannel.assertExchange(AGENTS_DL_EXCHANGE, 'topic', { durable: true });
await mainChannel.assertExchange(AGENTS_DL_EXCHANGE, 'topic', { durable: true });

// SET EXCHANGE "UP LINK" (UL) AND QUEUES TO RECEIVE MESSAGES FROM AGENT
await dataChannel.assertExchange(AGENTS_UL_EXCHANGE, 'topic', { durable: true });
await mainChannel.assertExchange(AGENTS_UL_EXCHANGE, 'topic', { durable: true });

// SET EXCHANGE FOR "MQTT" AGENTS AND QUEUES TO RECEIVE AND SEND MESSAGES TO AGENT
// ONE CANNOT ASCRIBE SEPARATED EXCHANGES FOR UL AND DL WHEN USE MQTT.
await dataChannel.assertExchange(AGENT_MQTT_EXCHANGE, 'topic', { durable: true });
// SET EXCHANGE FOR "MQTT" AGENTS AND QUEUES TO RECEIVE AND SEND MESSAGES TO AGENT. No exchange is used for MQTT
await mainChannel.assertExchange(AGENT_MQTT_EXCHANGE, 'topic', { durable: true });

await dataChannel.assertQueue(AGENT_UPDATE_QUEUE, {exclusive: false, durable:true, arguments: {"x-message-ttl" : 3600000}});
await dataChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENTS_UL_EXCHANGE, "*.*.update");
await dataChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENTS_UL_EXCHANGE, "*.*.fact_sheet");
await dataChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.update");
await dataChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.fact_sheet"); // VDA-5050 COMPATIBLE
await rbmqServices.assertOrSubstituteQueue(mainChannel, AGENT_UPDATE_QUEUE, false, true, {"x-message-ttl" : TTL_STATE_MSG});
await mainChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENTS_UL_EXCHANGE, "*.*.update");
await mainChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENTS_UL_EXCHANGE, "*.*.fact_sheet");
await mainChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.update");
await mainChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.fact_sheet");

await dataChannel.assertQueue(AGENT_VISUALIZATION_QUEUE, {exclusive: false, durable:false, arguments: {"x-message-ttl" : 2000}});
await dataChannel.bindQueue(AGENT_VISUALIZATION_QUEUE, AGENTS_UL_EXCHANGE, "agent.*.visualization");
await dataChannel.bindQueue(AGENT_VISUALIZATION_QUEUE, AGENT_MQTT_EXCHANGE, "agent.*.visualization"); // VDA-5050 COMPATIBLE
await rbmqServices.assertOrSubstituteQueue(secondaryChannel, AGENT_VISUALIZATION_QUEUE, false, false, {"x-message-ttl" : TTL_VISUAL_MSG});
await secondaryChannel.bindQueue(AGENT_VISUALIZATION_QUEUE, AGENTS_UL_EXCHANGE, "agent.*.visualization");
await secondaryChannel.bindQueue(AGENT_VISUALIZATION_QUEUE, AGENT_MQTT_EXCHANGE, "agent.*.visualization");

await dataChannel.assertQueue(YARD_VISUALIZATION_QUEUE, {exclusive: false, durable:false, arguments: {"x-message-ttl" : 2000}});
await dataChannel.bindQueue(YARD_VISUALIZATION_QUEUE, AGENTS_UL_EXCHANGE, "yard.*.visualization");
await dataChannel.bindQueue(YARD_VISUALIZATION_QUEUE, AGENT_MQTT_EXCHANGE, "yard.*.visualization");
await rbmqServices.assertOrSubstituteQueue(secondaryChannel, YARD_VISUALIZATION_QUEUE, false, false, {"x-message-ttl" : TTL_VISUAL_MSG});
await secondaryChannel.bindQueue(YARD_VISUALIZATION_QUEUE, AGENTS_UL_EXCHANGE, "yard.*.visualization");
await secondaryChannel.bindQueue(YARD_VISUALIZATION_QUEUE, AGENT_MQTT_EXCHANGE, "yard.*.visualization");

await dataChannel.assertQueue(AGENT_STATE_QUEUE, {exclusive: false, durable:true});
await dataChannel.bindQueue(AGENT_STATE_QUEUE, AGENTS_UL_EXCHANGE, "*.*.state" );
await dataChannel.bindQueue(AGENT_STATE_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.state" ); // VDA-5050 COMPATIBLE
await rbmqServices.assertOrSubstituteQueue(mainChannel, AGENT_STATE_QUEUE, false, true);
await mainChannel.bindQueue(AGENT_STATE_QUEUE, AGENTS_UL_EXCHANGE, "*.*.state" );
await mainChannel.bindQueue(AGENT_STATE_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.state" );

await dataChannel.assertQueue(AGENT_MISSION_QUEUE, {exclusive: false, durable:true});
await dataChannel.bindQueue(AGENT_MISSION_QUEUE, AGENTS_UL_EXCHANGE, "*.*.mission_req" );
await dataChannel.bindQueue(AGENT_MISSION_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.mission_req" );
await rbmqServices.assertOrSubstituteQueue(mainChannel, AGENT_MISSION_QUEUE, false, true);
await mainChannel.bindQueue(AGENT_MISSION_QUEUE, AGENTS_UL_EXCHANGE, "*.*.mission_req" );
await mainChannel.bindQueue(AGENT_MISSION_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.mission_req" );

await dataChannel.bindQueue(CHECK_IN_QUEUE, AGENTS_UL_EXCHANGE, "*.*.checkin" );
await dataChannel.bindQueue(CHECK_IN_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.checkin" );
await mainChannel.bindQueue(CHECK_IN_QUEUE, AGENTS_UL_EXCHANGE, "*.*.checkin" );
await mainChannel.bindQueue(CHECK_IN_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.checkin" );

await dataChannel.assertQueue(SUMMARY_REQUESTS_QUEUE, {exclusive: false, durable:true});
await dataChannel.bindQueue(SUMMARY_REQUESTS_QUEUE, AGENTS_UL_EXCHANGE, "*.*.database_req");
await dataChannel.bindQueue(SUMMARY_REQUESTS_QUEUE, AGENTS_UL_EXCHANGE, "*.*.summary_req");
await dataChannel.bindQueue(SUMMARY_REQUESTS_QUEUE, AGENTS_UL_EXCHANGE, "*.*.summary"); // MAGPIE COMPATIBLE
await rbmqServices.assertOrSubstituteQueue(mainChannel, SUMMARY_REQUESTS_QUEUE, false, true);
await mainChannel.bindQueue(SUMMARY_REQUESTS_QUEUE, AGENTS_UL_EXCHANGE, "*.*.database_req");
await mainChannel.bindQueue(SUMMARY_REQUESTS_QUEUE, AGENTS_UL_EXCHANGE, "*.*.summary_req");
await mainChannel.bindQueue(SUMMARY_REQUESTS_QUEUE, AGENTS_UL_EXCHANGE, "*.*.summary"); // MAGPIE COMPATIBLE
console.log("===> RabbitMQ Schema Completed");

return dataChannel;
return dataChannels;
}


function helyosConsumingMessages (dataChannel) {
console.log(" ================================================================")
console.log(" ================= SUBSCRIBE TO HELYOS' QUEUES ==================")
console.log(" ================================================================")
dataChannel.consume(CHECK_IN_QUEUE, (message) => handleBrokerMessages(CHECK_IN_QUEUE, message), { noAck: true});
dataChannel.consume(AGENT_STATE_QUEUE, (message) => handleBrokerMessages(AGENT_STATE_QUEUE, message), { noAck: true});
dataChannel.consume(AGENT_UPDATE_QUEUE, (message) => handleBrokerMessages(AGENT_UPDATE_QUEUE, message), { noAck: true});
dataChannel.consume(AGENT_MISSION_QUEUE, (message) => handleBrokerMessages(AGENT_MISSION_QUEUE, message), { noAck: true});
dataChannel.consume(AGENT_VISUALIZATION_QUEUE, (message) => handleBrokerMessages(AGENT_VISUALIZATION_QUEUE, message), { noAck: true});
dataChannel.consume(YARD_VISUALIZATION_QUEUE, (message) => handleBrokerMessages(YARD_VISUALIZATION_QUEUE, message), { noAck: true});
function helyosConsumingMessages (dataChannels) {
const mainChannel = dataChannels[0];
const secondaryChannel = dataChannels[1];
console.log(`\n ================================================================`+
`\n ================= SUBSCRIBE TO HELYOS' QUEUES ==================`+
`\n ================================================================`);

mainChannel.consume(CHECK_IN_QUEUE, (message) => handleBrokerMessages(CHECK_IN_QUEUE, message), { noAck: true});
mainChannel.consume(AGENT_STATE_QUEUE, (message) => handleBrokerMessages(AGENT_STATE_QUEUE, message), { noAck: true});
mainChannel.consume(AGENT_UPDATE_QUEUE, (message) => handleBrokerMessages(AGENT_UPDATE_QUEUE, message), { noAck: true});
mainChannel.consume(AGENT_MISSION_QUEUE, (message) => handleBrokerMessages(AGENT_MISSION_QUEUE, message), { noAck: true});
secondaryChannel.consume(AGENT_VISUALIZATION_QUEUE, (message) => handleBrokerMessages(AGENT_VISUALIZATION_QUEUE, message), { noAck: true});
secondaryChannel.consume(YARD_VISUALIZATION_QUEUE, (message) => handleBrokerMessages(YARD_VISUALIZATION_QUEUE, message), { noAck: true});


dataChannel.consume(SUMMARY_REQUESTS_QUEUE, (message) => handleBrokerMessages(SUMMARY_REQUESTS_QUEUE, message), { noAck: true});
return dataChannel;
mainChannel.consume(SUMMARY_REQUESTS_QUEUE, (message) => handleBrokerMessages(SUMMARY_REQUESTS_QUEUE, message), { noAck: true});
return dataChannels;
}


Expand Down
8 changes: 4 additions & 4 deletions helyos_server/src/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ const RabbitMQServices = require('./services/message_broker/rabbitMQ_services.js

function connectToRabbitMQ () {
return initialization.initializeRabbitMQAccounts()
.then(() => RabbitMQServices.connectAndOpenChannel({subscribe: false, connect: true, recoverCallback: initialization.helyosConsumingMessages}) )
.then( dataChannel => {
.then(() => RabbitMQServices.connectAndOpenChannels({subscribe: false, connect: true, recoverCallback: initialization.helyosConsumingMessages}) )
.then( dataChannels => {
// SET RABBITMQ EXCHANGE/QUEUES SCHEMA AND THEN SUBSCRIBE TO QUEUES
initialization.configureRabbitMQSchema(dataChannel)
.then( dataChannel => initialization.helyosConsumingMessages(dataChannel));
return initialization.configureRabbitMQSchema(dataChannels)
.then( dataChannels => initialization.helyosConsumingMessages(dataChannels));
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ async function sendReleaseFromWorkProcessRequest(agentId, wpId) {
body: {
work_process_id: parseInt(wpId, 10),
operation_types_required: [], // to be used in the future
work_process_id: parseInt(wpId),
reserved: false
},
_version: MESSAGE_VERSION
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const processMicroserviceRequest = (servRequestId) => {
})
.catch(e => {
const servResponse = e.data;
logData.addLog('microservice', servRequest, 'error', e );
logData.addLog('microservice', servRequest, 'error', e.message );
return databaseServices.service_requests.updateByConditions({ 'id': servRequestId,
'status__in': [ SERVICE_STATUS.DISPATCHING_SERVICE,
SERVICE_STATUS.WAIT_DEPENDENCIES,
Expand Down
11 changes: 9 additions & 2 deletions helyos_server/src/modules/microservice_orchestration.js
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,15 @@ function createServiceRequestsForWorkProcessType(processType, request, agentIds,
delete s['__depends_on_steps'];
});

console.log("\n\n(1)========= Create service requests for the process type "+ processType +" ==========" );
console.log("service Requests from service matrix", serviceRequests);
console.log("\n\n========= Preparing microservice requests for the process type "+ processType +" ==========" );
serviceRequests.forEach(s => {
console.log(`Request UID: ${s.request_uid}`);
console.log(`Step: ${s.step}`);
console.log(`Service Type: ${s.service_type}`);
console.log(`Service URL: ${s.service_url}`);
console.log(`Next Request UID: ${s.next_request_to_dispatch_uid}`);
console.log("--------------------");
});
console.log("================================================================================");

return serviceRequests;
Expand Down
1 change: 0 additions & 1 deletion helyos_server/src/services/database/postg_access_layer.js
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ class DatabaseLayer {
let colNames = [], colValues = [...condColValues], valueMasks = [];
delete patch['id'];

console.log('colValues', colValues);
Object.keys(patch).forEach((key, idx) => {
colNames.push(key);
colValues.push(patch[key]);
Expand Down
Loading

0 comments on commit b1b7343

Please sign in to comment.