Skip to content

Commit

Permalink
Merge pull request #401 from telefonicaid/task/use_mqtt_share_subscri…
Browse files Browse the repository at this point in the history
…ptions

Task/use mqtt share subscriptions  and protocol in topic
  • Loading branch information
mrutid authored Nov 19, 2019
2 parents 951d771 + def3c76 commit 44f647d
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 63 deletions.
12 changes: 6 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ jobs:
node_js: 8

before_install:
- docker pull ansi/mosquitto
- docker run -d -p 1883:1883 -l mosquitto ansi/mosquitto
- docker pull eclipse-mosquitto:1.6.7
- docker run -d -p 1883:1883 -l mosquitto eclipse-mosquitto:1.6.7

before_script:
- npm run lint
Expand All @@ -43,8 +43,8 @@ jobs:
node_js: 10

before_install:
- docker pull ansi/mosquitto
- docker run -d -p 1883:1883 -l mosquitto ansi/mosquitto
- docker pull eclipse-mosquitto:1.6.7
- docker run -d -p 1883:1883 -l mosquitto eclipse-mosquitto:1.6.7

before_script:
- npm run lint
Expand All @@ -57,8 +57,8 @@ jobs:
node_js: 12

before_install:
- docker pull ansi/mosquitto
- docker run -d -p 1883:1883 -l mosquitto ansi/mosquitto
- docker pull eclipse-mosquitto:1.6.7
- docker run -d -p 1883:1883 -l mosquitto eclipse-mosquitto:1.6.7

before_script:
- npm run lint
Expand Down
2 changes: 2 additions & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
- Allow use protocol ("/ul") in all mqtt topics subscribed by the agent (#287)
- Use MQTT v5 shared subscriptions to avoid dupplicated messages per agent type (upgrade mqtt dep from 2.18.8 to 3.0.0). Needs MQTT v5 broker like mosquitto 1.6+
- Use AMQP durable option in assertExchange
- Use device apikey if exists in getEffectiveApiKey for command handling
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ This project is part of [FIWARE](https://www.fiware.org/). For more information
[IoT Agents](https://github.com/Fiware/catalogue/tree/master/iot-agents).

| :books: [Documentation](https://fiware-iotagent-ul.readthedocs.io) | :mortar_board: [Academy](https://fiware-academy.readthedocs.io/en/latest/iot-agents/idas) | :whale: [Docker Hub](https://hub.docker.com/r/fiware/iotagent-ul/) | :dart: [Roadmap](https://github.com/telefonicaid/iotagent-ul/blob/master/docs/roadmap.md) |
| ----------------------------------------------------------- | ----------------------------------------------------------------------------------------- | ------------------------------------------------------------------ | ----------------------------------------------------------------------------------------- |
| ------------------------------------------------------------------ | ----------------------------------------------------------------------------------------- | ------------------------------------------------------------------ | ----------------------------------------------------------------------------------------- |


## Contents
Expand Down Expand Up @@ -68,8 +68,8 @@ The following features are listed as [deprecated](docs/deprecated.md).
## API

Apiary reference for the Configuration API can be found
[here](https://telefonicaiotiotagents.docs.apiary.io/#reference/configuration-api). More information about IoT Agents and
their APIs can be found in the IoT Agent Library [documentation](https://iotagent-node-lib.readthedocs.io/).
[here](https://telefonicaiotiotagents.docs.apiary.io/#reference/configuration-api). More information about IoT Agents
and their APIs can be found in the IoT Agent Library [documentation](https://iotagent-node-lib.readthedocs.io/).

The latest IoT Agent for Ultralight documentation is also available on
[ReadtheDocs](https://fiware-iotagent-ul.readthedocs.io/en/latest/)
Expand All @@ -84,7 +84,7 @@ Module mocking during testing can be done with [proxyquire](https://github.com/t

In order to successfuly run the tests, on the local machine three services must be running:

- **mosquitto** MQTT broker;
- **mosquitto** MQTT v5 broker;
- **mongo** Database;
- **rabbitmq** AMQP broker/server;

Expand Down
30 changes: 17 additions & 13 deletions docs/usermanual.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,23 +193,27 @@ Some additional remarks regarding polling commands:
MQTT is a machine-to-machine (M2M)/IoT connectivity protocol, focused on a lightweight interaction between peers. MQTT
is based on publish-subscribe mechanisms over a hierarchical set of topics defined by the user.

This section specifies the topics and messages allowed when using MQTT as the transport protocol for Ultralight 2.0. All
the topics used with the MQTT protocol contain the same prefix:
This section specifies the topics and messages allowed when using MQTT as the transport protocol for Ultralight 2.0. All
the topics subscribed by the agent (to send measures, to configuration command retrieval or to get result
of a command) are prefixed with the agent procotol:

```text
<apiKey>/<deviceId>
/ul/<apiKey>/<deviceId>
```

where `<apiKey>` is the API Key assigned to the service and `<deviceId>` is the ID of the device.

All topis published by the agent (to send a comamnd or to send configuration information) to a device are not prefixed
by the protocol, in this case '/ul', just include apikey and deviceid (e.g: `/FF957A98/MydeviceId/cmd` and
`/FF957A98/MyDeviceId/configuration/values` ).

This transport protocol binding is still under development.

##### Sending a single measure in one message

In order to send a single measure value to the server, the device must publish the plain value to the following topic:

```text
<apiKey>/<deviceId>/attrs/<attrName>
/ul/<apiKey>/<deviceId>/attrs/<attrName>
```

Where `<apiKey>` and `<deviceId>` have the typical meaning and `<attrName>` is the name of the measure the device is
Expand All @@ -219,15 +223,15 @@ or instance, if using [Mosquitto](https://mosquitto.org/) with a device with ID
attribute IDs `h` and `t`, then humidity measures are reported this way:

```bash
$ mosquitto_pub -t /ABCDEF/id_sen1/attrs/h -m 70 -h <mosquitto_broker> -p <mosquitto_port> -u <user> -P <password>
$ mosquitto_pub -t /ul/ABCDEF/id_sen1/attrs/h -m 70 -h <mosquitto_broker> -p <mosquitto_port> -u <user> -P <password>
```

##### Sending multiple measures in one message

In order to send multiple measures in a single message, a device must publish a message in the following topic:

```text
<apiKey>/<deviceId>/attrs
/ul/<apiKey>/<deviceId>/attrs
```

Where `<apiKey>` and `<deviceId>` have the typical meaning. The payload of such message should be a legal Ultralight 2.0
Expand All @@ -237,7 +241,7 @@ For instance, if using [Mosquitto](https://mosquitto.org/) with a device with ID
attribute IDs `h` and `t`, then all measures (humidity and temperature) are reported this way:

```bash
$ mosquitto_pub -t /ABCDEF/id_sen1/attrs -m 'h|70|t|15' -h <mosquitto_broker> -p <mosquitto_port> -u <user> -P <password>
$ mosquitto_pub -t /ul/ABCDEF/id_sen1/attrs -m 'h|70|t|15' -h <mosquitto_broker> -p <mosquitto_port> -u <user> -P <password>
```

##### Configuration retrieval
Expand All @@ -252,7 +256,7 @@ This mechanism and the bidirectionality plugin cannot be simultaneously activate
##### Configuration command topic

```text
/{{apikey}}/{{deviceid}}/configuration/commands
/ul/{{apikey}}/{{deviceid}}/configuration/commands
```

The IoT Agent listens in this topic for requests coming from the device. The messages must contain an Ultralight 2.0
Expand Down Expand Up @@ -313,13 +317,13 @@ result to another topic.
The _commands topic_, where the client will be subscribed has the following format:

```text
<apiKey>/<deviceId>/cmd
/<apiKey>/<deviceId>/cmd
```

The result of the command must be reported in the following topic:

```text
<apiKey>/<deviceId>/cmdexe
/ul/<apiKey>/<deviceId>/cmdexe
```

The command execution and command reporting payload format is specified under the Ultralight 2.0 Commands Syntax, above.
Expand Down Expand Up @@ -363,7 +367,7 @@ $ mosquitto_sub -v -t /# -h <mosquitto_broker> -p <mosquitto_port> -u <user> -P
At this point, Context Broker will have updated the value of `ping_status` to `PENDING` for `sen1` entity. Neither
`ping_info` nor `ping` are updated.

Once the device has executed the command, it can publish its results in the `/ABCDEF/id_sen1/cmdexe` topic with a
Once the device has executed the command, it can publish its results in the `/ul/ABCDEF/id_sen1/cmdexe` topic with a
payload with the following format:

```text
Expand All @@ -373,7 +377,7 @@ id_sen1@ping|1234567890
If using [Mosquitto](https://mosquitto.org/), such command result is sent by running the `mosquitto_pub` script:

```bash
$ mosquitto_pub -t /ABCDEF/id_sen1/cmdexe -m 'id_sen1@ping|1234567890' -h <mosquitto_broker> -p <mosquitto_port> -u <user> -P <password>
$ mosquitto_pub -t /ul/ABCDEF/id_sen1/cmdexe -m 'id_sen1@ping|1234567890' -h <mosquitto_broker> -p <mosquitto_port> -u <user> -P <password>
```

In the end, Context Broker will have updated the values of `ping_info` and `ping_status` to `1234567890` and `OK`,
Expand Down
64 changes: 45 additions & 19 deletions lib/bindings/MQTTBinding.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,40 @@ function generateTopics(callback) {
var topics = [];

config.getLogger().debug(context, 'Generating topics');
topics.push('/+/+/' + constants.MEASURES_SUFIX + '/+');
topics.push('/+/+/' + constants.MEASURES_SUFIX);
topics.push('/+/+/' + constants.CONFIGURATION_SUFIX + '/' + constants.CONFIGURATION_COMMAND_SUFIX);
topics.push('/+/+/' + constants.CONFIGURATION_COMMAND_UPDATE);
topics.push(constants.MQTT_SHARE_SUBSCRIPTION_GROUP + '/+/+/' + constants.MEASURES_SUFIX + '/+');
topics.push(
constants.MQTT_SHARE_SUBSCRIPTION_GROUP +
constants.MQTT_TOPIC_PROTOCOL +
'/+/+/' +
constants.MEASURES_SUFIX +
'/+'
);
topics.push(constants.MQTT_SHARE_SUBSCRIPTION_GROUP + '/+/+/' + constants.MEASURES_SUFIX);
topics.push(
constants.MQTT_SHARE_SUBSCRIPTION_GROUP + constants.MQTT_TOPIC_PROTOCOL + '/+/+/' + constants.MEASURES_SUFIX
);
topics.push(
constants.MQTT_SHARE_SUBSCRIPTION_GROUP +
'/+/+/' +
constants.CONFIGURATION_SUFIX +
'/' +
constants.CONFIGURATION_COMMAND_SUFIX
);
topics.push(
constants.MQTT_SHARE_SUBSCRIPTION_GROUP +
constants.MQTT_TOPIC_PROTOCOL +
'/+/+/' +
constants.CONFIGURATION_SUFIX +
'/' +
constants.CONFIGURATION_COMMAND_SUFIX
);
topics.push(constants.MQTT_SHARE_SUBSCRIPTION_GROUP + '/+/+/' + constants.CONFIGURATION_COMMAND_UPDATE);
topics.push(
constants.MQTT_SHARE_SUBSCRIPTION_GROUP +
constants.MQTT_TOPIC_PROTOCOL +
'/+/+/' +
constants.CONFIGURATION_COMMAND_UPDATE
);

callback(null, topics);
}
Expand Down Expand Up @@ -120,8 +150,8 @@ function generateCommandExecution(apiKey, device, attribute) {
device.id,
payload
);

return mqttClient.publish.bind(mqttClient, '/' + apiKey + '/' + device.id + '/cmd', payload, options);
var commandTopic = '/' + apiKey + '/' + device.id + '/cmd';
return mqttClient.publish.bind(mqttClient, commandTopic, payload, options);
}

/**
Expand Down Expand Up @@ -172,20 +202,16 @@ function sendConfigurationToDevice(apiKey, deviceId, results, callback) {
apiKey,
payload
);

mqttClient.publish(
var commandTopic =
'/' +
apiKey +
'/' +
deviceId +
'/' +
constants.CONFIGURATION_SUFIX +
'/' +
constants.CONFIGURATION_VALUES_SUFIX,
payload,
options,
callback
);
apiKey +
'/' +
deviceId +
'/' +
constants.CONFIGURATION_SUFIX +
'/' +
constants.CONFIGURATION_VALUES_SUFIX;
mqttClient.publish(commandTopic, payload, options, callback);
}

/**
Expand Down
14 changes: 11 additions & 3 deletions lib/commonBindings.js
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,18 @@ function singleMeasure(apiKey, attribute, device, message) {
*/
function messageHandler(topic, message, protocol) {
var topicInformation = topic.split('/'),
apiKey = topicInformation[1],
deviceId = topicInformation[2],
messageStr = message.toString(),
apiKey,
deviceId,
messageStr,
parsedMessage;

if ('ul' === topicInformation[1].toLowerCase()) {
topicInformation.splice(1, 1);
}
apiKey = topicInformation[1];
deviceId = topicInformation[2];
messageStr = message.toString();

function processMessageForDevice(device, apiKey, topicInformation) {
iotAgentLib.alarms.release(constants.MQTTB_ALARM);

Expand Down Expand Up @@ -322,6 +329,7 @@ function amqpMessageHandler(topic, message) {
* @param {Object} message MQTT message body (Object or Buffer, depending on the value).
*/
function mqttMessageHandler(topic, message) {
config.getLogger().debug(context, 'message topic: %s', topic);
messageHandler(topic, message, 'MQTT');
}

Expand Down
2 changes: 2 additions & 0 deletions lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ module.exports = {
MQTTB_ALARM: 'MQTTB-ALARM',
MQTT_DEFAULT_RETRIES: 5,
MQTT_DEFAULT_RETRY_TIME: 5,
MQTT_SHARE_SUBSCRIPTION_GROUP: '$share/ul/',
MQTT_TOPIC_PROTOCOL: '/ul',

AMQP_DEFAULT_EXCHANGE: 'amq.topic',
AMQP_DEFAULT_QUEUE: 'iotaqueue',
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"express": "~4.16.4",
"iotagent-node-lib": "git://github.com/telefonicaid/iotagent-node-lib.git#master",
"logops": "2.1.0",
"mqtt": "2.18.8",
"mqtt": "3.0.0",
"request": "2.88.0",
"underscore": "1.9.1"
},
Expand Down
16 changes: 8 additions & 8 deletions test/unit/mqttBinding-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ describe('MQTT Transport binding: measures', function() {
});

it('should send a new update context request to the Context Broker with just that attribute', function(done) {
mqttClient.publish('/1234/MQTT_2/attrs/a', '23', null, function(error) {
mqttClient.publish('/ul/1234/MQTT_2/attrs/a', '23', null, function(error) {
setTimeout(function() {
contextBrokerMock.done();
done();
Expand Down Expand Up @@ -126,7 +126,7 @@ describe('MQTT Transport binding: measures', function() {
});

it('should send a new update context request to the Context Broker with just that attribute', function(done) {
mqttClient.publish('/80K09H324HV8732/MQTT_UNPROVISIONED/attrs/a', '23', null, function(error) {
mqttClient.publish('/ul/80K09H324HV8732/MQTT_UNPROVISIONED/attrs/a', '23', null, function(error) {
setTimeout(function() {
contextBrokerUnprovMock.done();
done();
Expand All @@ -145,7 +145,7 @@ describe('MQTT Transport binding: measures', function() {
});

it('should send a single update context request with all the attributes', function(done) {
mqttClient.publish('/1234/MQTT_2/attrs', 'a|23', null, function(error) {
mqttClient.publish('/ul/1234/MQTT_2/attrs', 'a|23', null, function(error) {
setTimeout(function() {
contextBrokerMock.done();
done();
Expand All @@ -164,7 +164,7 @@ describe('MQTT Transport binding: measures', function() {
});

it('should silently ignore the error (without crashing)', function(done) {
mqttClient.publish('/1234/MQTT_2/attrs', 'notAULPayload ', null, function(error) {
mqttClient.publish('/ul/1234/MQTT_2/attrs', 'notAULPayload ', null, function(error) {
setTimeout(function() {
done();
}, 100);
Expand All @@ -182,7 +182,7 @@ describe('MQTT Transport binding: measures', function() {
});

it('should send one update context per measure group to the Contet Broker', function(done) {
mqttClient.publish('/1234/MQTT_2/attrs', 'a|23|b|98', null, function(error) {
mqttClient.publish('/ul/1234/MQTT_2/attrs', 'a|23|b|98', null, function(error) {
setTimeout(function() {
contextBrokerMock.done();
done();
Expand All @@ -207,7 +207,7 @@ describe('MQTT Transport binding: measures', function() {
});

it('should send a two update context requests to the Context Broker one with each attribute', function(done) {
mqttClient.publish('/1234/MQTT_2/attrs', 'a|23#b|98', null, function(error) {
mqttClient.publish('/ul/1234/MQTT_2/attrs', 'a|23#b|98', null, function(error) {
setTimeout(function() {
contextBrokerMock.done();
done();
Expand All @@ -231,7 +231,7 @@ describe('MQTT Transport binding: measures', function() {
});

it('should send a two update context requests to the Context Broker one with each attribute', function(done) {
mqttClient.publish('/1234/MQTT_2/attrs', 'a|23|b|98#a|16|b|34', null, function(error) {
mqttClient.publish('/ul/1234/MQTT_2/attrs', 'a|23|b|98#a|16|b|34', null, function(error) {
setTimeout(function() {
contextBrokerMock.done();
done();
Expand Down Expand Up @@ -281,7 +281,7 @@ describe('MQTT Transport binding: measures', function() {

it('should use the provided TimeInstant as the general timestamp for the measures', function(done) {
mqttClient.publish(
'/1234/timestampedDevice/attrs',
'/ul/1234/timestampedDevice/attrs',
'tmp|24.4|tt|2016-09-26T12:19:26.476659Z',
null,
function(error) {
Expand Down
Loading

0 comments on commit 44f647d

Please sign in to comment.