diff --git a/lib/bindConnectors/index.js b/lib/bindConnectors/index.js index 230b2c93..39ea2ba1 100644 --- a/lib/bindConnectors/index.js +++ b/lib/bindConnectors/index.js @@ -26,24 +26,38 @@ module.exports = function (config, options) { // Dont wait for event loop to be empty when callback is called context.callbackWaitsForEmptyEventLoop = false; + + /* + If context.getRemainingTimeInMillis is available, set it's value + minus 5 seconds, and make sure it is positive; else set the + timeout to 0 (such that it is falsy) + */ + var timeRemaining = ( + _.isPlainObject(context) && _.isFunction(context.getRemainingTimeInMillis) ? + context.getRemainingTimeInMillis() : + 0 + ); + var promiseTimeout = timeRemaining - 5000; + promiseTimeout = ( promiseTimeout < 1 ? 0 : promiseTimeout ); + + // The array of events should all be executed in parallel var promises = _.map(events, function (event) { - return when.promise(function (resolve, reject) { + + var messagePromise = when.promise(function (resolve, reject) { // Handle error invalid payload if (!event.header || !event.header.message) { - return resolve( - formatError( - { - headers: {}, - body: { - code: 'invalid_input', - message: 'Invalid message received.' - } - }, - event - ) - ); + return resolve(formatError( + { + headers: {}, + body: { + code: 'invalid_input', + message: 'Invalid message received.' + } + }, + event + )); } @@ -63,18 +77,16 @@ module.exports = function (config, options) { // Handle message not existing if (!_.isFunction(hooks[event.header.message])) { - return resolve( - formatError( - { - headers: {}, - body: { - code: 'not_implemented', - message: 'Could not find a valid message handler for ' + event.header.message - } - }, - event - ) - ); + return resolve(formatError( + { + headers: {}, + body: { + code: 'not_implemented', + message: 'Could not find a valid message handler for ' + event.header.message + } + }, + event + )); } // Got this far? All good, let's run @@ -87,19 +99,52 @@ module.exports = function (config, options) { .done( function (response) { - resolve( - ( _.isError(response.body) ? - formatError(response, event) : - formatMessage(event, response, response.version) - ) - ); - }, function (response) { + resolve(( + _.isError(response.body) ? + formatError(response, event) : + formatMessage(event, response, response.version) + )); + }, + function (response) { resolve(formatError(response, event)); } ); - }); + + //If a non-0 value is present, add a timeout condition to the promise + if (promiseTimeout) { + + return messagePromise + + .timeout(promiseTimeout) + + //Perform this catch only if it is due to timeout + .catch(when.TimeoutError, function () { + // eslint-disable-next-line no-console + console.warn('The promise has not been closed within the time limit.', event.header.message); + //NOTE: this reject will force this lambda function to error (not the operation only) + return when.reject(formatError( + { + headers: {}, + body: { + code: '#fatal_error', + message: 'The operation timed out.', + payload: { + reason: 'The operation has timed out due to the promise not closing (resolving/rejecting) within the time limit.', + operation: event.header.message + } + } + }, + event + )); + }); + + } + + + return messagePromise; + }); // Wait for all of the above to finish, then resolve diff --git a/package.json b/package.json index 91c8f84e..e212e0e3 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@trayio/falafel", - "version": "1.14.1", - "description": "", + "version": "1.15.0", + "description": "A framework for developing and running connectors.", "main": "lib/index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" diff --git a/test/bindConnectors/index.js b/test/bindConnectors/index.js index ba5b355f..3c48021d 100644 --- a/test/bindConnectors/index.js +++ b/test/bindConnectors/index.js @@ -1,10 +1,329 @@ var assert = require('assert'); -var _ = require('lodash'); +global._ = require('lodash'); +global.when = require('when'); var bindConnectors = require('../../lib/bindConnectors'); +/* eslint-disable no-console */ +describe('#bindConnectors', function () { -describe.skip('#bindConnectors', function () { + global.falafel = {}; + var config = [], + options = {}; + config.push({ + name: 'test_connector', + globalModel: {}, + globalSchema: {}, + messages: [ + { + name: 'test_op', + schema: { + name: 'test_op' + }, + model: function (params) { + return params; + }, + }, + { + name: 'test_op_promise', + schema: { + name: 'test_op_promise' + }, + model: function (params) { + return when.resolve(params); + }, + }, + { + name: 'test_fail_op', + schema: { + name: 'test_fail_op' + }, + model: function (params) { + throw new Error('Throw error.'); + }, + }, + { + name: 'test_fail_op_promise', + schema: { + name: 'test_fail_op_promise' + }, + model: function (params) { + return when.reject({ + code: '#connector_error', + message: 'Reject error' + }); + }, + } + ] -}); \ No newline at end of file + }); + + var boundConnectors = bindConnectors(config, options); + + it('should pass simple function operation run', function (done) { + + boundConnectors( + [ + { + id: 'testID', + header: { + message: 'test_op' + }, + body: { + hello: 'world' + } + } + ], + { + + }, + function (err, resArr) { + if (err) { + assert.fail(err); + } else { + assert.deepEqual(resArr[0].body, { hello: 'world' }); + } + done(); + } + ); + + }); + + it('should pass simple promise function operation run', function (done) { + + boundConnectors( + [ + { + id: 'testID', + header: { + message: 'test_op_promise' + }, + body: { + hello: 'world' + } + } + ], + { + + }, + function (err, resArr) { + if (err) { + assert.fail(err); + } else { + assert.deepEqual(resArr[0].body, { hello: 'world' }); + } + done(); + } + ); + + }); + + it('should operation fail simple function operation run erring', function (done) { + + boundConnectors( + [ + { + id: 'testID', + header: { + message: 'test_fail_op' + }, + body: { + hello: 'world' + } + } + ], + { + + }, + function (err, resArr) { + if (err) { + assert.fail(resArr); + } else { + assert(resArr[0].header.error); + assert.deepEqual(resArr[0].body.message, 'Throw error.'); + } + done(); + } + ); + + }); + + it('should operation fail simple promise function operation run rejecting', function (done) { + + boundConnectors( + [ + { + id: 'testID', + header: { + message: 'test_fail_op_promise' + }, + body: { + hello: 'world' + } + } + ], + { + + }, + function (err, resArr) { + if (err) { + assert.fail(resArr); + } else { + assert(resArr[0].header.error); + assert.deepEqual( + resArr[0].body, + { + code: '#connector_error', + message: 'Reject error' + } + ); + } + done(); + } + ); + + }); + + + //Timeout tests + var timeoutConnectors = bindConnectors( + [ + { + name: 'timeout_connector', + globalModel: {}, + globalSchema: {}, + messages: [ + { + name: 'timeout_op', + schema: { + name: 'timeout_op' + }, + model: function (params) { + return params; + }, + }, + { + name: 'timeout_op_promise', + schema: { + name: 'timeout_op_promise' + }, + model: function (params) { + return when.resolve(params); + }, + }, + { + name: 'long_timeout_op_promise', + schema: { + name: 'timeout_op_promise' + }, + model: function (params) { + return when.promise(function (resolve, reject) { + + setTimeout(function () { + resolve(params); + }, 21000); + + }); + }, + } + ] + + } + ], + options + ); + + it('should pass simple function operation run within timeout limit', function (done) { + + timeoutConnectors( + [ + { + id: 'testID', + header: { + message: 'timeout_op' + }, + body: { + hello: 'world' + } + } + ], + { + + }, + function (err, resArr) { + if (err) { + assert.fail(err); + } else { + assert.deepEqual(resArr[0].body, { hello: 'world' }); + } + done(); + } + ); + + }); + + it('should pass simple function promise operation run within timeout limit', function (done) { + + timeoutConnectors( + [ + { + id: 'testID', + header: { + message: 'timeout_op_promise' + }, + body: { + hello: 'world' + } + } + ], + { + + }, + function (err, resArr) { + if (err) { + assert.fail(err); + } else { + assert.deepEqual(resArr[0].body, { hello: 'world' }); + } + done(); + } + ); + + }); + + it('should error on lambda function level for operation not ending within time limit', function (done) { + + this.timeout(25000); + + timeoutConnectors( + [ + { + id: 'testID', + header: { + message: 'long_timeout_op_promise' + }, + body: { + hello: 'world' + } + } + ], + { + getRemainingTimeInMillis: function () { + return 20000; //20s + } + }, + function (err, resArr) { + if (err) { + assert(err); + } else { + console.log(resArr); + assert.fail(resArr); + } + done(); + } + ); + + }); + +}); diff --git a/test/falafel.js b/test/falafel.js index ee994d65..44ff31f4 100644 --- a/test/falafel.js +++ b/test/falafel.js @@ -19,9 +19,9 @@ describe('falafel', function () { directory: __dirname+'/sample' }); - assert(_.isObject(GLOBAL.falafel)); - assert(GLOBAL._); - assert(GLOBAL.when); + assert(_.isObject(global.falafel)); + assert(global._); + assert(global.when); }); it('should create the connectors.json in dev mode', function () {