Skip to content

Commit

Permalink
increasing msg ttl
Browse files Browse the repository at this point in the history
  • Loading branch information
cviolbarbosa committed May 6, 2024
1 parent c79bfa9 commit 29dce91
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 9 deletions.
6 changes: 3 additions & 3 deletions helyos_server/src/initialization.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,17 @@ async function configureRabbitMQSchema(dataChannel) {
// ONE CANNOT ASCRIBE SEPARATED EXCHANGES FOR UL AND DL WHEN USE MQTT.
await dataChannel.assertExchange(AGENT_MQTT_EXCHANGE, 'topic', { durable: true });

await dataChannel.assertQueue(AGENT_UPDATE_QUEUE, {exclusive: false, durable:true, arguments: {"x-message-ttl" : 10000}});
await dataChannel.assertQueue(AGENT_UPDATE_QUEUE, {exclusive: false, durable:true, arguments: {"x-message-ttl" : 3600000}});
await dataChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENTS_UL_EXCHANGE, "*.*.update");
await dataChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENTS_UL_EXCHANGE, "*.*.fact_sheet");
await dataChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.update");
await dataChannel.bindQueue(AGENT_UPDATE_QUEUE, AGENT_MQTT_EXCHANGE, "*.*.fact_sheet"); // VDA-5050 COMPATIBLE

await dataChannel.assertQueue(AGENT_VISUALIZATION_QUEUE, {exclusive: false, durable:false, arguments: {"x-message-ttl" : 10}});
await dataChannel.assertQueue(AGENT_VISUALIZATION_QUEUE, {exclusive: false, durable:false, arguments: {"x-message-ttl" : 2000}});
await dataChannel.bindQueue(AGENT_VISUALIZATION_QUEUE, AGENTS_UL_EXCHANGE, "agent.*.visualization");
await dataChannel.bindQueue(AGENT_VISUALIZATION_QUEUE, AGENT_MQTT_EXCHANGE, "agent.*.visualization"); // VDA-5050 COMPATIBLE

await dataChannel.assertQueue(YARD_VISUALIZATION_QUEUE, {exclusive: false, durable:false, arguments: {"x-message-ttl" : 10}});
await dataChannel.assertQueue(YARD_VISUALIZATION_QUEUE, {exclusive: false, durable:false, arguments: {"x-message-ttl" : 2000}});
await dataChannel.bindQueue(YARD_VISUALIZATION_QUEUE, AGENTS_UL_EXCHANGE, "yard.*.visualization");
await dataChannel.bindQueue(YARD_VISUALIZATION_QUEUE, AGENT_MQTT_EXCHANGE, "yard.*.visualization");

Expand Down
16 changes: 10 additions & 6 deletions helyos_server/src/services/in_mem_database/mem_database_service.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
const { logData } = require("../../modules/systemlog");
const { setDBTimeout } = require("../database/database_services");

MAX_PENDING_UPDATES = 20;
SHORT_TIMEOUT = Math.round(parseInt(process.env.DB_BUFFER_TIME || 1000)/2);
LONG_TIMEOUT = SHORT_TIMEOUT * MAX_PENDING_UPDATES;
const DB_BUFFER_TIME = parseInt(process.env.DB_BUFFER_TIME || 1000);
const LONG_TIMEOUT = 2000; // Maximum time for the database update

let MAX_PENDING_UPDATES = LONG_TIMEOUT / DB_BUFFER_TIME;
MAX_PENDING_UPDATES = MAX_PENDING_UPDATES > 5 ? MAX_PENDING_UPDATES : 5;
const SHORT_TIMEOUT = DB_BUFFER_TIME / 2;


/**
Expand All @@ -22,12 +25,12 @@ LONG_TIMEOUT = SHORT_TIMEOUT * MAX_PENDING_UPDATES;
*
* shortTimeout and longTimeout are used to dynamically adjust the update timeout (this.updateTimeout) based on the number of pending promisses updates.
*
* If the number of pending promisses is too big, the update timeout is reduced to shortTimeout.
* If the number of pending promisses is higher than MAX_PENDING_UPDATES, the update timeout is reduced to shortTimeout.
* If the number of pending promisses keeps increasing to higher then 2 x MAX_PENDING_UPDATES, the system will additionally block new updates.
* The smaller timeout causes the the database update promises to expire which will reduce the number of pending promisses, decresing the system load.
* Therefore, we are accepting to lose some updates in order to avoid the system to be blocked.
*
* If the number of pending promisses is small, the update timeout is increased to longTimeout.
* Such that the database has enough time to process all updates.
*
*
*/
Expand Down Expand Up @@ -229,7 +232,8 @@ class InMemDB {


dispatchUpdatePromise(promiseTrigger, numberUpdates=1) {
if (this.pendingPromises > this.limitWaitingFlushes) {
// Damaging control: if the number of pending promisses is too big, block new updates and accept the losts.
if (this.pendingPromises > 2 * this.limitWaitingFlushes) {
this.lostUpdates += numberUpdates;
return Promise.resolve();
}
Expand Down

0 comments on commit 29dce91

Please sign in to comment.