Skip to content

Commit

Permalink
Merge pull request #369 from telefonicaid/task/mqtt_reconnect_when_cl…
Browse files Browse the repository at this point in the history
…ose_hotfix_1_7_6

Fix reconnect when close hotfix 1.7.6
  • Loading branch information
mrutid authored Jul 10, 2019
2 parents e45a610 + 093c3e9 commit 3e73b2f
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 14 deletions.
91 changes: 79 additions & 12 deletions lib/bindings/MQTTBinding.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var iotAgentLib = require('iotagent-node-lib'),
op: 'IOTAUL.MQTT.Binding'
},
mqttClient,
mqttConn,
config = require('../configService');

/**
Expand Down Expand Up @@ -68,7 +69,9 @@ function recreateSubscriptions(callback) {
} else {
iotAgentLib.alarms.release(constants.MQTTB_ALARM);
config.getLogger().debug('Successfully subscribed to the following topics:\n%j\n', topics);
callback(null);
if (callback) {
callback(null);
}
}
});
}
Expand Down Expand Up @@ -148,27 +151,88 @@ function start(callback) {
connectTimeout: 60 * 60 * 1000
};

config.getLogger().info(context, 'Starting MQTT binding');

if (config.getConfig().mqtt && config.getConfig().mqtt.username && config.getConfig().mqtt.password) {
options.username = config.getConfig().mqtt.username;
options.password = config.getConfig().mqtt.password;
}
if (config.getConfig().mqtt.keepalive) {
options.keepalive = parseInt(config.getConfig().mqtt.keepalive) || 0;
}
var retries, retryTime;

mqttClient = mqtt.connect('mqtt://' + config.getConfig().mqtt.host + ':' + config.getConfig().mqtt.port, options);
mqttClient.on('error', function(e) {
config.getLogger().fatal('GLOBAL-002: Couldn\'t connect with MQTT broker: %j', e);
callback(e);
});
if (config.getConfig() && config.getConfig().mqtt && config.getConfig().mqtt.retries) {
retries = config.getConfig().mqtt.retries;
} else {
retries = constants.MQTT_DEFAULT_RETRIES;
}
if (config.getConfig() && config.getConfig().mqtt && config.getConfig().mqtt.retrytime) {
retryTime = config.getConfig().mqtt.retryTime;
} else {
retryTime = constants.MQTT_DEFAULT_RETRY_TIME;
}
var isConnecting = false;
var numRetried = 0;
config.getLogger().info(context, 'Starting MQTT binding');

function createConnection(callback) {
config.getLogger().info(context, 'creating connection');
if (isConnecting) {
return;
}
isConnecting = true;
mqttClient = mqtt.connect(
'mqtt://' + config.getConfig().mqtt.host + ':' + config.getConfig().mqtt.port,
options
);
isConnecting = false;
// TDB: check if error
if (!mqttClient) {
config.getLogger().error(context, 'error mqttClient not created');
if (numRetried <= retries) {
numRetried++;
return setTimeout(createConnection, retryTime * 1000, callback);
}
}
mqttClient.on('error', function(e) {
/*jshint quotmark: double */
config.getLogger().fatal("GLOBAL-002: Couldn't connect with MQTT broker: %j", e);
/*jshint quotmark: single */
callback(e);
});
mqttClient.on('message', commonBindings.mqttMessageHandler);
mqttClient.on('connect', function(ack) {
config.getLogger().info(context, 'MQTT Client connected');
recreateSubscriptions();
});
mqttClient.on('close', function() {
// If mqttConn is null, the connection has been closed on purpose
if (mqttConn) {
if (numRetried <= retries) {
config.getLogger().error(context, 'reconnecting...');
numRetried++;
return setTimeout(createConnection, retryTime * 1000);
}
} else {
return;
}
});

mqttClient.on('message', commonBindings.mqttMessageHandler);
config.getLogger().info(context, 'connected');
mqttConn = mqttClient;
if (callback) {
callback();
}
} // function createConnection

mqttClient.on('connect', function(ack) {
config.getLogger().info(context, 'MQTT Client connected');
recreateSubscriptions(callback);
async.waterfall([createConnection], function(error) {
if (error) {
config.getLogger().debug('MQTT error %j', error);
}
callback();
});
}


/**
* Stops the IoT Agent and all the transport plugins.
*/
Expand All @@ -180,6 +244,9 @@ function stop(callback) {
mqttClient.end.bind(mqttClient, true)
], function() {
config.getLogger().info('MQTT Binding Stopped');
if (mqttConn) {
mqttConn = null;
}
callback();
});
}
Expand Down
20 changes: 19 additions & 1 deletion lib/configService.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ function processEnvironmentVariables() {
'IOTA_MQTT_PASSWORD',
'IOTA_MQTT_QOS',
'IOTA_MQTT_RETAIN',
'IOTA_MQTT_RETRIES',
'IOTA_MQTT_RETRY_TIME',
'IOTA_MQTT_KEEPALIVE',
'IOTA_AMQP_HOST',
'IOTA_AMQP_PORT',
'IOTA_AMQP_USERNAME',
Expand All @@ -62,7 +65,10 @@ function processEnvironmentVariables() {
'IOTA_MQTT_USERNAME',
'IOTA_MQTT_PASSWORD',
'IOTA_MQTT_QOS',
'IOTA_MQTT_RETAIN'
'IOTA_MQTT_RETAIN',
'IOTA_MQTT_RETRIES',
'IOTA_MQTT_RETRY_TIME',
'IOTA_MQTT_KEEPALIVE'
],
amqpVariables = [
'IOTA_AMQP_HOST',
Expand Down Expand Up @@ -115,6 +121,18 @@ function processEnvironmentVariables() {
config.mqtt.retain = process.env.IOTA_MQTT_RETAIN === 'true';
}

if (process.env.IOTA_MQTT_RETRIES) {
config.mqtt.retries = process.env.IOTA_MQTT_RETRIES;
}

if (process.env.IOTA_MQTT_RETRY_TIME) {
config.mqtt.retryTime = process.env.IOTA_MQTT_RETRY_TIME;
}

if (process.env.IOTA_MQTT_KEEPALIVE) {
config.mqtt.keepalive = process.env.IOTA_MQTT_KEEPALIVE;
}

if (anyIsSet(amqpVariables)) {
config.amqp = {};
}
Expand Down
2 changes: 2 additions & 0 deletions lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ module.exports = {
COMMAND_STATUS_COMPLETED: 'OK',

MQTTB_ALARM: 'MQTTB-ALARM',
MQTT_DEFAULT_RETRIES: 5,
MQTT_DEFAULT_RETRY_TIME: 5,

AMQP_DEFAULT_EXCHANGE: 'amq.topic',
AMQP_DEFAULT_QUEUE: 'iotaqueue',
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "iotagent-ul",
"description": "IoT Agent for the Ultrlight 2.0 protocol",
"version": "1.7.50",
"version": "1.7.60",
"homepage": "https://github.com/telefonicaid/iotagent-ul",
"author": {
"name": "Daniel Moran",
Expand Down
4 changes: 4 additions & 0 deletions rpm/SPECS/iotaul.spec
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ fi
%{_install_dir}

%changelog

* Wed Jul 10 2019 Alvaro Vega Garcia <alvaro.vegagarcia@telefonica.com> 1.7.60
- Fix: reconnect when MQTT closes connection (including mqtt retries and keepalive conf options)

* Fri Apr 26 2019 Fermin Galan <fermin.galanmarquez@telefonica.com> 1.7.50

* Mon Aug 06 2018 Fermin Galan <fermin.galanmarquez@telefonica.com> 1.7.0
Expand Down

0 comments on commit 3e73b2f

Please sign in to comment.