Skip to content

Commit

Permalink
update MQTTBindings.js
Browse files Browse the repository at this point in the history
  • Loading branch information
fbuedding committed Sep 27, 2023
1 parent 0c5c256 commit 46d46c6
Showing 1 changed file with 85 additions and 106 deletions.
191 changes: 85 additions & 106 deletions lib/bindings/MQTTBinding.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,105 +45,84 @@ function generateTopics(callback) {
const topics = [];

config.getLogger().debug(context, 'Generating topics');

let MQTT_SHARE_SUBSCRIPTION_GROUP_VAR = '';
if (config.getConfig().mqtt.sharedSubscriptionsDisabled === true) {
// With leading slashes
topics.push('/+/+/' + constants.MEASURES_SUFIX + '/+');
topics.push('/' + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX + '/+');
topics.push('/+/+/' + constants.MEASURES_SUFIX);
topics.push('/' + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX);
topics.push('/+/+/' + constants.CONFIGURATION_SUFIX + '/' + constants.CONFIGURATION_COMMAND_SUFIX);
topics.push(
'/' +
constants.MQTT_TOPIC_PROTOCOL +
'/+/+/' +
constants.CONFIGURATION_SUFIX +
'/' +
constants.CONFIGURATION_COMMAND_SUFIX
);
topics.push('/+/+/' + constants.CONFIGURATION_COMMAND_UPDATE);
topics.push('/' + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.CONFIGURATION_COMMAND_UPDATE);

//Without leading slashes
topics.push('+/+/' + constants.MEASURES_SUFIX + '/+');
topics.push(constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX + '/+');
topics.push('+/+/' + constants.MEASURES_SUFIX);
topics.push(constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX);
topics.push('+/+/' + constants.CONFIGURATION_SUFIX + '/' + constants.CONFIGURATION_COMMAND_SUFIX);
topics.push(
constants.MQTT_TOPIC_PROTOCOL +
'/+/+/' +
constants.CONFIGURATION_SUFIX +
'/' +
constants.CONFIGURATION_COMMAND_SUFIX
);
topics.push('+/+/' + constants.CONFIGURATION_COMMAND_UPDATE);
topics.push(constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.CONFIGURATION_COMMAND_UPDATE);
} else {
let shareSubscriptionGroup = constants.MQTT_SHARE_SUBSCRIPTION_GROUP;
MQTT_SHARE_SUBSCRIPTION_GROUP_VAR = constants.MQTT_SHARE_SUBSCRIPTION_GROUP;
if (config.getConfig().mqtt.groupIdSufix !== undefined) {
shareSubscriptionGroup =
shareSubscriptionGroup.slice(0, -1) +
MQTT_SHARE_SUBSCRIPTION_GROUP_VAR =
MQTT_SHARE_SUBSCRIPTION_GROUP_VAR.slice(0, -1) +
config.getConfig().mqtt.groupIdSufix +
shareSubscriptionGroup.slice(-1);
MQTT_SHARE_SUBSCRIPTION_GROUP_VAR.slice(-1);
}
// With leading slashes
topics.push(shareSubscriptionGroup + '/+/+/' + constants.MEASURES_SUFIX + '/+');
topics.push(
shareSubscriptionGroup + '/' + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX + '/+'
);
topics.push(shareSubscriptionGroup + '/+/+/' + constants.MEASURES_SUFIX);
topics.push(shareSubscriptionGroup + '/' + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX);
topics.push(
shareSubscriptionGroup +
'/+/+/' +
constants.CONFIGURATION_SUFIX +
'/' +
constants.CONFIGURATION_COMMAND_SUFIX
);
topics.push(
shareSubscriptionGroup +
'/' +
constants.MQTT_TOPIC_PROTOCOL +
'/+/+/' +
constants.CONFIGURATION_SUFIX +
'/' +
constants.CONFIGURATION_COMMAND_SUFIX
);
topics.push(shareSubscriptionGroup + '/+/+/' + constants.CONFIGURATION_COMMAND_UPDATE);
topics.push(
shareSubscriptionGroup +
'/' +
constants.MQTT_TOPIC_PROTOCOL +
'/+/+/' +
constants.CONFIGURATION_COMMAND_UPDATE
);

//Without leading slashes
topics.push(shareSubscriptionGroup + '+/+/' + constants.MEASURES_SUFIX + '/+');
topics.push(shareSubscriptionGroup + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX + '/+');
topics.push(shareSubscriptionGroup + '+/+/' + constants.MEASURES_SUFIX);
topics.push(shareSubscriptionGroup + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX);
topics.push(
shareSubscriptionGroup +
'+/+/' +
constants.CONFIGURATION_SUFIX +
'/' +
constants.CONFIGURATION_COMMAND_SUFIX
);
topics.push(
shareSubscriptionGroup +
constants.MQTT_TOPIC_PROTOCOL +
'/+/+/' +
constants.CONFIGURATION_SUFIX +
'/' +
constants.CONFIGURATION_COMMAND_SUFIX
);
topics.push(shareSubscriptionGroup + '+/+/' + constants.CONFIGURATION_COMMAND_UPDATE);
topics.push(
shareSubscriptionGroup + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.CONFIGURATION_COMMAND_UPDATE
);
}
// With leading slashes
topics.push(MQTT_SHARE_SUBSCRIPTION_GROUP_VAR + '/+/+/' + constants.MEASURES_SUFIX + '/+');
topics.push(
MQTT_SHARE_SUBSCRIPTION_GROUP_VAR +
'/' +
constants.MQTT_TOPIC_PROTOCOL +
'/+/+/' +
constants.MEASURES_SUFIX +
'/+'
);
topics.push(MQTT_SHARE_SUBSCRIPTION_GROUP_VAR + '/+/+/' + constants.MEASURES_SUFIX);
topics.push(
MQTT_SHARE_SUBSCRIPTION_GROUP_VAR + '/' + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX
);
topics.push(
MQTT_SHARE_SUBSCRIPTION_GROUP_VAR +
'/+/+/' +
constants.CONFIGURATION_SUFIX +
'/' +
constants.CONFIGURATION_COMMAND_SUFIX
);
topics.push(
MQTT_SHARE_SUBSCRIPTION_GROUP_VAR +
'/' +
constants.MQTT_TOPIC_PROTOCOL +
'/+/+/' +
constants.CONFIGURATION_SUFIX +
'/' +
constants.CONFIGURATION_COMMAND_SUFIX
);
topics.push(MQTT_SHARE_SUBSCRIPTION_GROUP_VAR + '/+/+/' + constants.CONFIGURATION_COMMAND_UPDATE);
topics.push(
MQTT_SHARE_SUBSCRIPTION_GROUP_VAR +
'/' +
constants.MQTT_TOPIC_PROTOCOL +
'/+/+/' +
constants.CONFIGURATION_COMMAND_UPDATE
);

//Without leading slashes
topics.push(MQTT_SHARE_SUBSCRIPTION_GROUP_VAR + '+/+/' + constants.MEASURES_SUFIX + '/+');
topics.push(
MQTT_SHARE_SUBSCRIPTION_GROUP_VAR + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX + '/+'
);
topics.push(MQTT_SHARE_SUBSCRIPTION_GROUP_VAR + '+/+/' + constants.MEASURES_SUFIX);
topics.push(MQTT_SHARE_SUBSCRIPTION_GROUP_VAR + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX);
topics.push(
MQTT_SHARE_SUBSCRIPTION_GROUP_VAR +
'+/+/' +
constants.CONFIGURATION_SUFIX +
'/' +
constants.CONFIGURATION_COMMAND_SUFIX
);
topics.push(
MQTT_SHARE_SUBSCRIPTION_GROUP_VAR +
constants.MQTT_TOPIC_PROTOCOL +
'/+/+/' +
constants.CONFIGURATION_SUFIX +
'/' +
constants.CONFIGURATION_COMMAND_SUFIX
);
topics.push(MQTT_SHARE_SUBSCRIPTION_GROUP_VAR + '+/+/' + constants.CONFIGURATION_COMMAND_UPDATE);
topics.push(
MQTT_SHARE_SUBSCRIPTION_GROUP_VAR +
constants.MQTT_TOPIC_PROTOCOL +
'/+/+/' +
constants.CONFIGURATION_COMMAND_UPDATE
);

callback(null, topics);
}
Expand All @@ -157,7 +136,7 @@ function recreateSubscriptions(callback) {
function subscribeToTopics(topics, callback) {
config.getLogger().debug(context, 'Subscribing to topics: %j', topics);
const options = {};
mqttClient.subscribe(topics, options, function (error) {
mqttClient.subscribe(topics, options, function(error) {
if (error) {
iotAgentLib.alarms.raise(constants.MQTTB_ALARM, error);
config.getLogger().error(context, 'GLOBAL-001: Error subscribing to topics: %s', error);
Expand Down Expand Up @@ -300,25 +279,25 @@ function start(callback) {
return setTimeout(createConnection, retryTime * 1000, callback);
}
}
mqttClient.on('error', function (e) {
mqttClient.on('error', function(e) {
/*jshint quotmark: double */
config.getLogger().fatal("GLOBAL-002: Couldn't connect with MQTT broker: %j", e);
/*jshint quotmark: single */
mqttClient.end();
});
mqttClient.on('message', commonBindings.mqttMessageHandler);
mqttClient.on('connect', function (ack) {
mqttClient.on('connect', function(ack) {
config.getLogger().info(context, 'MQTT Client connected');
recreateSubscriptions();
numRetried = 1;
});
mqttClient.on('reconnect', function () {
mqttClient.on('reconnect', function() {
config.getLogger().debug(context, 'MQTT Client reconnect');
});
mqttClient.on('offline', function () {
mqttClient.on('offline', function() {
config.getLogger().debug(context, 'MQTT Client offline');
});
mqttClient.on('close', function () {
mqttClient.on('close', function() {
config.getLogger().info(context, 'MQTT Client closed');
// If mqttConn is null, the connection has been closed on purpose
if (mqttConn) {
Expand All @@ -339,7 +318,7 @@ function start(callback) {
}
} // function createConnection

async.waterfall([createConnection], function (error) {
async.waterfall([createConnection], function(error) {
if (error) {
config.getLogger().info('MQTT error %j', error);
}
Expand Down Expand Up @@ -371,7 +350,7 @@ function deviceUpdatingHandler(device, callback) {
function stop(callback) {
config.getLogger().info('Stopping MQTT Binding');

async.series([unsubscribeAll, mqttClient.end.bind(mqttClient, true)], function () {
async.series([unsubscribeAll, mqttClient.end.bind(mqttClient, true)], function() {
config.getLogger().info('MQTT Binding Stopped');
if (mqttConn) {
mqttConn = null;
Expand All @@ -397,14 +376,14 @@ function executeCommand(apiKey, device, cmdName, serializedPayload, contentType,
commands[cmdName].mqtt && commands[cmdName].mqtt.qos
? parseInt(commands[cmdName].mqtt.qos)
: config.getConfig().mqtt.qos
? parseInt(config.getConfig().mqtt.qos)
: 0;
? parseInt(config.getConfig().mqtt.qos)
: 0;
options.retain =
commands[cmdName].mqtt && commands[cmdName].mqtt.retain
? commands[cmdName].mqtt.retain
: config.getConfig().mqtt.retain
? config.getConfig().mqtt.retain
: false;
? config.getConfig().mqtt.retain
: false;

const commandTopic = '/' + apiKey + '/' + device.id + '/cmd';
config
Expand Down

0 comments on commit 46d46c6

Please sign in to comment.