Skip to content

Commit

Permalink
Merge pull request #1670 from telefonicaid/task/store_last_measure
Browse files Browse the repository at this point in the history
Task/store last measure
  • Loading branch information
fgalan authored Jan 13, 2025
2 parents 69042f9 + b8c5b6e commit 3775aa9
Show file tree
Hide file tree
Showing 17 changed files with 180 additions and 24 deletions.
2 changes: 2 additions & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
- Add: store last measure in device (by id, apikey, service and subservice) and new API field storeLastMeasure at group and device levels (#1669)
- Add: IOTA_STORE_LAST_MEASURE env var to set default store last measure behaviour at instance level (#1669)
- Upgrade express dep from 4.19.2 to 4.20.0
- Upgrade mongodb devdep from 4.17.1 to 4.17.2
- Upgrade mongoose dep from 5.13.20 to 8.8.4 (solving vulnerability CVE-2024-53900) (#1674)
3 changes: 2 additions & 1 deletion config.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ var config = {
providerUrl: 'http://192.168.56.1:4041',
deviceRegistrationDuration: 'P1M',
defaultType: 'Thing',
expressLimit: '1Mb'
expressLimit: '1Mb',
storeLastMeasure: false
};

module.exports = config;
38 changes: 33 additions & 5 deletions doc/admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ allowing the computer to interpret the rest of the data with more clarity and de
```

Under mixed mode, **NGSI v2** payloads are used for context broker communications by default, but this payload may also
be switched to **NGSI LD** at group or device provisioning time using the `ngsiVersion` field in the
provisioning API. The `ngsiVersion` field switch may be added at either group or device level, with the device level
overriding the group setting.
be switched to **NGSI LD** at group or device provisioning time using the `ngsiVersion` field in the provisioning API.
The `ngsiVersion` field switch may be added at either group or device level, with the device level overriding the group
setting.

#### `server`

Expand Down Expand Up @@ -306,7 +306,8 @@ added `agentPath`:

#### `types`

This parameter includes additional groups configuration as described into the [Config group API](api.md#config-group-api) section.
This parameter includes additional groups configuration as described into the
[Config group API](api.md#config-group-api) section.

#### `service`

Expand Down Expand Up @@ -415,7 +416,33 @@ IotAgents, as all Express applications that use the body-parser middleware, have
size that the application will handle. This default limit for ioiotagnets are 1Mb. So, if your IotAgent receives a
request with a body that exceeds this limit, the application will throw a “Error: Request entity too large”.

The 1Mb default can be changed setting the `expressLimit` configuration parameter (or equivalente `IOTA_EXPRESS_LIMIT` environment variable).
The 1Mb default can be changed setting the `expressLimit` configuration parameter (or equivalente `IOTA_EXPRESS_LIMIT`
environment variable).

#### `storeLastMeasure`

If this flag is activated, last measure arrived to Device IoTAgent without be processed will be stored in Device under
`lastMeasure` field (composed of sub-fields `timestamp` and `measure` for the measure itself, in multi-measure format). This flag is overwritten by `storeLastMeasure` flag in group or device. This flag
is disabled by default.

For example in a device document stored in MongoDB will be extended with a subdocument named lastMeasure like this:

```json
{
"lastMeasure": {
"timestamp": "2025-01-09T10:35:33.079Z",
"measure": [
[
{
"name": "level",
"type": "Text",
"value": 33
}
]
]
}
}
```

### Configuration using environment variables

Expand Down Expand Up @@ -479,6 +506,7 @@ overrides.
| IOTA_DEFAULT_ENTITY_NAME_CONJUNCTION | `defaultEntityNameConjunction` |
| IOTA_RELAX_TEMPLATE_VALIDATION | `relaxTemplateValidation` |
| IOTA_EXPRESS_LIMIT | `expressLimit` |
| IOTA_STORE_LAST_MEASURE | `storeLastMeasure` |

Note:

Expand Down
3 changes: 3 additions & 0 deletions doc/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1777,6 +1777,7 @@ Config group is represented by a JSON object with the following fields:
| `payloadType` || string | | optional string value used to switch between **IoTAgent**, **NGSI-v2** and **NGSI-LD** measure payloads types. Possible values are: `iotagent`, `ngsiv2` or `ngsild`. The default is `iotagent`. |
| `transport` || `string` | | Transport protocol used by the group of devices to send updates, for the IoT Agents with multiple transport protocols. |
| `endpoint` || `string` | | Endpoint where the group of device is going to receive commands, if any. |
| `storeLastMeasure` || `boolean` | | Store in device last measure received. See more info [in this section](admin.md#storelastmeasure). False by default |

### Config group operations

Expand Down Expand Up @@ -1998,6 +1999,8 @@ the API resource fields and the same fields in the database model.
| `ngsiVersion` || `string` | | string value used in mixed mode to switch between **NGSI-v2** and **NGSI-LD** payloads. The default is `v2`. When not running in mixed mode, this field is ignored. |
| `payloadType` || `string` | | optional string value used to switch between **IoTAgent**, **NGSI-v2** and **NGSI-LD** measure payloads types. Possible values are: `iotagent`, `ngsiv2` or `ngsild`. The default is `iotagent`. |

| `storeLastMeasure` || `boolean` | | Store in device last measure received. See more info [in this section](admin.md#storelastmeasure). False by default. |

### Device operations

#### Retrieve devices /iot/devices `GET /iot/devices`
Expand Down
11 changes: 9 additions & 2 deletions lib/commonConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ function processEnvironmentVariables() {
'IOTA_FALLBACK_PATH',
'IOTA_LD_SUPPORT_NULL',
'IOTA_LD_SUPPORT_DATASET_ID',
'IOTA_EXPRESS_LIMIT'
'IOTA_EXPRESS_LIMIT',
'IOTA_STORE_LAST_MEASURE'
];
const iotamVariables = [
'IOTA_IOTAM_URL',
Expand Down Expand Up @@ -474,6 +475,11 @@ function processEnvironmentVariables() {
} else {
config.expressLimit = config.expressLimit ? config.expressLimit : '1mb';
}
if (process.env.IOTA_STORE_LAST_MEASURE) {
config.storeLastMeasure = process.env.IOTA_STORE_LAST_MEASURE === 'true';
} else {
config.storeLastMeasure = config.storeLastMeasure === true;
}
}

function setConfig(newConfig) {
Expand Down Expand Up @@ -503,7 +509,8 @@ function getConfigForTypeInformation() {
multiCore: config.multiCore,
relaxTemplateValidation: config.relaxTemplateValidation,
defaultEntityNameConjunction: config.defaultEntityNameConjunction,
defaultType: config.defaultType
defaultType: config.defaultType,
storeLastMeasure: config.storeLastMeasure
};
return conf;
}
Expand Down
4 changes: 3 additions & 1 deletion lib/model/Device.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ const Device = new Schema({
autoprovision: Boolean,
explicitAttrs: Group.ExplicitAttrsType,
ngsiVersion: String,
payloadType: String
payloadType: String,
storeLastMeasure: Boolean,
lastMeasure: Object
});

function load() {
Expand Down
3 changes: 2 additions & 1 deletion lib/model/Group.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ const Group = new Schema({
defaultEntityNameConjunction: String,
ngsiVersion: String,
entityNameExp: String,
payloadType: String
payloadType: String,
storeLastMeasure: Boolean
});

function load() {
Expand Down
3 changes: 2 additions & 1 deletion lib/services/common/iotManagerService.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ function register(callback) {
entityNameExp: service.entityNameExp,
payloadType: service.payloadType,
endpoint: service.endpoint,
transport: service.transport
transport: service.transport,
storeLastMeasure: service.storeLastMeasure
};
}

Expand Down
32 changes: 32 additions & 0 deletions lib/services/devices/deviceRegistryMemory.js
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,40 @@ function getDevicesByAttribute(name, value, service, subservice, callback) {
}
}

function storeLastMeasure(measure, typeInformation, callback) {
if (
typeInformation &&
typeInformation.id &&
typeInformation.apikey &&
typeInformation.service &&
typeInformation.subservice
) {
getDevice(
typeInformation.id,
typeInformation.apikey,
typeInformation.service,
typeInformation.subservice,
function (error, data) {
if (!error) {
if (data) {
data.lastMeasure = measure;
}
if (callback) {
callback(null, data);
}
} else {
callback(error, null);
}
}
);
} else {
callback(null, null);
}
}

exports.getDevicesByAttribute = getDevicesByAttribute;
exports.store = storeDevice;
exports.storeLastMeasure = storeLastMeasure;
exports.update = update;
exports.remove = removeDevice;
exports.list = listDevices;
Expand Down
55 changes: 50 additions & 5 deletions lib/services/devices/deviceRegistryMongoDB.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ const attributeList = [
'explicitAttrs',
'ngsiVersion',
'subscriptions',
'payloadType'
'payloadType',
'storeLastMeasure'
];

/**
Expand Down Expand Up @@ -222,7 +223,7 @@ function findOneInMongoDB(queryParams, id, callback) {
* @param {String} subservice Division inside the service (optional).
*/
function getDeviceById(id, apikey, service, subservice, callback) {
let queryParams = {
const queryParams = {
id,
service,
subservice
Expand Down Expand Up @@ -254,9 +255,9 @@ function getDevice(id, apikey, service, subservice, callback) {

function getByNameAndType(name, type, service, servicepath, callback) {
context = fillService(context, { service, subservice: servicepath });
let optionsQuery = {
name: name,
service: service,
const optionsQuery = {
name,
service,
subservice: servicepath
};
if (type) {
Expand Down Expand Up @@ -319,6 +320,7 @@ function update(previousDevice, device, callback) {
data.timestamp = device.timestamp;
data.subscriptions = device.subscriptions;
data.payloadType = device.payloadType;
data.storeLastMeasure = device.storeLastMeasure;

/* eslint-disable-next-line new-cap */
const deviceObj = new Device.model(data);
Expand Down Expand Up @@ -392,8 +394,51 @@ function getDevicesByAttribute(name, value, service, subservice, callback) {
});
}

function storeLastMeasure(measure, typeInformation, callback) {
if (
typeInformation &&
typeInformation.id &&
typeInformation.apikey &&
typeInformation.service &&
typeInformation.subservice
) {
getDevice(
typeInformation.id,
typeInformation.apikey,
typeInformation.service,
typeInformation.subservice,
function (error, data) {
if (error) {
callback(error);
} else {
data.lastMeasure = { timestamp: new Date().toISOString(), measure };
/* eslint-disable-next-line new-cap */
const deviceObj = new Device.model(data);
deviceObj.isNew = false;
deviceObj
.save({})
.then((deviceDao) => {
callback(null, deviceDao.toObject());
})
.catch((error) => {
logger.debug(
fillService(context, deviceObj),
'Error storing device information: %s',
error
);
callback(new errors.InternalDbError(error));
});
}
}
);
} else {
callback(null, null);
}
}

exports.getDevicesByAttribute = alarmsInt(constants.MONGO_ALARM, getDevicesByAttribute);
exports.store = alarmsInt(constants.MONGO_ALARM, storeDevice);
exports.storeLastMeasure = alarmsInt(constants.MONGO_ALARM, storeLastMeasure);
exports.update = alarmsInt(constants.MONGO_ALARM, update);
exports.remove = alarmsInt(constants.MONGO_ALARM, removeDevice);
exports.list = alarmsInt(constants.MONGO_ALARM, listDevices);
Expand Down
12 changes: 10 additions & 2 deletions lib/services/devices/deviceService.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ function mergeDeviceWithConfiguration(fields, defaults, deviceData, configuratio
if (configuration && configuration.payloadType !== undefined && deviceData.payloadType === undefined) {
deviceData.payloadType = configuration.payloadType;
}
if (configuration && configuration.storeLastMeasure !== undefined && deviceData.storeLastMeasure === undefined) {
deviceData.storeLastMeasure = configuration.storeLastMeasure;
}
logger.debug(context, 'deviceData after merge with conf: %j', deviceData);
callback(null, deviceData);
}
Expand Down Expand Up @@ -285,7 +288,7 @@ function registerDevice(deviceObj, callback) {
let attrList = pluginUtils.getIdTypeServSubServiceFromDevice(deviceData);
attrList = deviceData.staticAttributes ? attrList.concat(deviceData.staticAttributes) : attrList;
attrList = configuration.staticAttributes ? attrList.concat(configuration.staticAttributes) : attrList;
let ctxt = expressionPlugin.extractContext(attrList);
const ctxt = expressionPlugin.extractContext(attrList);
try {
entityName = expressionPlugin.applyExpression(configuration.entityNameExp, ctxt, deviceData);
} catch (e) {
Expand Down Expand Up @@ -616,7 +619,7 @@ function findOrCreate(deviceId, apikey, group, callback) {
} else if (error.name === 'DEVICE_NOT_FOUND') {
const newDevice = {
id: deviceId,
apikey: apikey,
apikey,
service: group.service,
subservice: group.subservice,
type: group.type
Expand Down Expand Up @@ -692,6 +695,10 @@ function retrieveDevice(deviceId, apiKey, callback) {
}
}

function storeLastMeasure(measure, typeInformation, callback) {
config.getRegistry().storeLastMeasure(measure, typeInformation, callback);
}

exports.listDevices = intoTrans(context, checkRegistry)(listDevices);
exports.listDevicesWithType = intoTrans(context, checkRegistry)(listDevicesWithType);
exports.getDevice = intoTrans(context, checkRegistry)(getDevice);
Expand All @@ -708,4 +715,5 @@ exports.retrieveDevice = intoTrans(context, checkRegistry)(retrieveDevice);
exports.mergeDeviceWithConfiguration = mergeDeviceWithConfiguration;
exports.findOrCreate = findOrCreate;
exports.findConfigurationGroup = findConfigurationGroup;
exports.storeLastMeasure = storeLastMeasure;
exports.init = init;
4 changes: 3 additions & 1 deletion lib/services/devices/devices-NGSI-v2.js
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,9 @@ function updateRegisterDeviceNgsi2(deviceObj, previousDevice, entityInfoUpdated,
if ('transport' in newDevice && newDevice.transport !== undefined) {
oldDevice.transport = newDevice.transport;
}

if ('storeLastMeasure' in newDevice && newDevice.storeLastMeasure !== undefined) {
oldDevice.storeLastMeasure = newDevice.storeLastMeasure;
}
callback(null, oldDevice);
} else {
callback(new errors.DeviceNotFound(newDevice.id, newDevice));
Expand Down
3 changes: 2 additions & 1 deletion lib/services/groups/groupRegistryMongoDB.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ const attributeList = [
'defaultEntityNameConjunction',
'ngsiVersion',
'entityNameExp',
'payloadType'
'payloadType',
'storeLastMeasure'
];

function createGroup(group, callback) {
Expand Down
11 changes: 10 additions & 1 deletion lib/services/ngsi/ngsiService.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
const async = require('async');
const apply = async.apply;
const statsRegistry = require('../stats/statsRegistry');
const deviceService = require('../devices/deviceService');
const intoTrans = require('../common/domain').intoTrans;
const fillService = require('./../common/domain').fillService;
const errors = require('../../errors');
Expand Down Expand Up @@ -68,8 +69,16 @@ function init() {
* @param {String} token User token to identify against the PEP Proxies (optional).
*/
function sendUpdateValue(entityName, attributes, typeInformation, token, callback) {
// check config about store last measure
const newCallback = statsRegistry.withStats('updateEntityRequestsOk', 'updateEntityRequestsError', callback);
entityHandler.sendUpdateValue(entityName, attributes, typeInformation, token, newCallback);
if (typeInformation.storeLastMeasure) {
logger.debug(context, 'StoreLastMeasure for %j', typeInformation);
deviceService.storeLastMeasure(attributes, typeInformation, function () {
return entityHandler.sendUpdateValue(entityName, attributes, typeInformation, token, newCallback);
});
} else {
entityHandler.sendUpdateValue(entityName, attributes, typeInformation, token, newCallback);
}
}

/**
Expand Down
Loading

0 comments on commit 3775aa9

Please sign in to comment.