diff --git a/lib/bindings/MQTTBinding.js b/lib/bindings/MQTTBinding.js index 89a74ac0..f65844a2 100644 --- a/lib/bindings/MQTTBinding.js +++ b/lib/bindings/MQTTBinding.js @@ -34,6 +34,7 @@ var iotAgentLib = require('iotagent-node-lib'), op: 'IOTAUL.MQTT.Binding' }, mqttClient, + mqttConn, config = require('../configService'); /** @@ -68,7 +69,9 @@ function recreateSubscriptions(callback) { } else { iotAgentLib.alarms.release(constants.MQTTB_ALARM); config.getLogger().debug('Successfully subscribed to the following topics:\n%j\n', topics); - callback(null); + if (callback) { + callback(null); + } } }); } @@ -148,27 +151,88 @@ function start(callback) { connectTimeout: 60 * 60 * 1000 }; - config.getLogger().info(context, 'Starting MQTT binding'); - if (config.getConfig().mqtt && config.getConfig().mqtt.username && config.getConfig().mqtt.password) { options.username = config.getConfig().mqtt.username; options.password = config.getConfig().mqtt.password; } + if (config.getConfig().mqtt.keepalive) { + options.keepalive = parseInt(config.getConfig().mqtt.keepalive) || 0; + } + var retries, retryTime; - mqttClient = mqtt.connect('mqtt://' + config.getConfig().mqtt.host + ':' + config.getConfig().mqtt.port, options); - mqttClient.on('error', function(e) { - config.getLogger().fatal('GLOBAL-002: Couldn\'t connect with MQTT broker: %j', e); - callback(e); - }); + if (config.getConfig() && config.getConfig().mqtt && config.getConfig().mqtt.retries) { + retries = config.getConfig().mqtt.retries; + } else { + retries = constants.MQTT_DEFAULT_RETRIES; + } + if (config.getConfig() && config.getConfig().mqtt && config.getConfig().mqtt.retrytime) { + retryTime = config.getConfig().mqtt.retryTime; + } else { + retryTime = constants.MQTT_DEFAULT_RETRY_TIME; + } + var isConnecting = false; + var numRetried = 0; + config.getLogger().info(context, 'Starting MQTT binding'); + + function createConnection(callback) { + config.getLogger().info(context, 'creating connection'); + if (isConnecting) { + return; + } + isConnecting = true; + mqttClient = mqtt.connect( + 'mqtt://' + config.getConfig().mqtt.host + ':' + config.getConfig().mqtt.port, + options + ); + isConnecting = false; + // TDB: check if error + if (!mqttClient) { + config.getLogger().error(context, 'error mqttClient not created'); + if (numRetried <= retries) { + numRetried++; + return setTimeout(createConnection, retryTime * 1000, callback); + } + } + mqttClient.on('error', function(e) { + /*jshint quotmark: double */ + config.getLogger().fatal("GLOBAL-002: Couldn't connect with MQTT broker: %j", e); + /*jshint quotmark: single */ + callback(e); + }); + mqttClient.on('message', commonBindings.mqttMessageHandler); + mqttClient.on('connect', function(ack) { + config.getLogger().info(context, 'MQTT Client connected'); + recreateSubscriptions(); + }); + mqttClient.on('close', function() { + // If mqttConn is null, the connection has been closed on purpose + if (mqttConn) { + if (numRetried <= retries) { + config.getLogger().error(context, 'reconnecting...'); + numRetried++; + return setTimeout(createConnection, retryTime * 1000); + } + } else { + return; + } + }); - mqttClient.on('message', commonBindings.mqttMessageHandler); + config.getLogger().info(context, 'connected'); + mqttConn = mqttClient; + if (callback) { + callback(); + } + } // function createConnection - mqttClient.on('connect', function(ack) { - config.getLogger().info(context, 'MQTT Client connected'); - recreateSubscriptions(callback); + async.waterfall([createConnection], function(error) { + if (error) { + config.getLogger().debug('MQTT error %j', error); + } + callback(); }); } + /** * Stops the IoT Agent and all the transport plugins. */ @@ -180,6 +244,9 @@ function stop(callback) { mqttClient.end.bind(mqttClient, true) ], function() { config.getLogger().info('MQTT Binding Stopped'); + if (mqttConn) { + mqttConn = null; + } callback(); }); } diff --git a/lib/configService.js b/lib/configService.js index 4f1ab2ac..e8bf6c70 100644 --- a/lib/configService.js +++ b/lib/configService.js @@ -44,6 +44,9 @@ function processEnvironmentVariables() { 'IOTA_MQTT_PASSWORD', 'IOTA_MQTT_QOS', 'IOTA_MQTT_RETAIN', + 'IOTA_MQTT_RETRIES', + 'IOTA_MQTT_RETRY_TIME', + 'IOTA_MQTT_KEEPALIVE', 'IOTA_AMQP_HOST', 'IOTA_AMQP_PORT', 'IOTA_AMQP_USERNAME', @@ -62,7 +65,10 @@ function processEnvironmentVariables() { 'IOTA_MQTT_USERNAME', 'IOTA_MQTT_PASSWORD', 'IOTA_MQTT_QOS', - 'IOTA_MQTT_RETAIN' + 'IOTA_MQTT_RETAIN', + 'IOTA_MQTT_RETRIES', + 'IOTA_MQTT_RETRY_TIME', + 'IOTA_MQTT_KEEPALIVE' ], amqpVariables = [ 'IOTA_AMQP_HOST', @@ -115,6 +121,18 @@ function processEnvironmentVariables() { config.mqtt.retain = process.env.IOTA_MQTT_RETAIN === 'true'; } + if (process.env.IOTA_MQTT_RETRIES) { + config.mqtt.retries = process.env.IOTA_MQTT_RETRIES; + } + + if (process.env.IOTA_MQTT_RETRY_TIME) { + config.mqtt.retryTime = process.env.IOTA_MQTT_RETRY_TIME; + } + + if (process.env.IOTA_MQTT_KEEPALIVE) { + config.mqtt.keepalive = process.env.IOTA_MQTT_KEEPALIVE; + } + if (anyIsSet(amqpVariables)) { config.amqp = {}; } diff --git a/lib/constants.js b/lib/constants.js index 612769a5..45a15ac2 100644 --- a/lib/constants.js +++ b/lib/constants.js @@ -47,6 +47,8 @@ module.exports = { COMMAND_STATUS_COMPLETED: 'OK', MQTTB_ALARM: 'MQTTB-ALARM', + MQTT_DEFAULT_RETRIES: 5, + MQTT_DEFAULT_RETRY_TIME: 5, AMQP_DEFAULT_EXCHANGE: 'amq.topic', AMQP_DEFAULT_QUEUE: 'iotaqueue', diff --git a/package.json b/package.json index 94d0c705..4625abb2 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "iotagent-ul", "description": "IoT Agent for the Ultrlight 2.0 protocol", - "version": "1.7.50", + "version": "1.7.60", "homepage": "https://github.com/telefonicaid/iotagent-ul", "author": { "name": "Daniel Moran", diff --git a/rpm/SPECS/iotaul.spec b/rpm/SPECS/iotaul.spec index ff9859a1..c0c6cdd7 100644 --- a/rpm/SPECS/iotaul.spec +++ b/rpm/SPECS/iotaul.spec @@ -170,6 +170,10 @@ fi %{_install_dir} %changelog + +* Wed Jul 10 2019 Alvaro Vega Garcia 1.7.60 +- Fix: reconnect when MQTT closes connection (including mqtt retries and keepalive conf options) + * Fri Apr 26 2019 Fermin Galan 1.7.50 * Mon Aug 06 2018 Fermin Galan 1.7.0