Skip to content

Commit

Permalink
Merge pull request #11 from PavelVanecek/master
Browse files Browse the repository at this point in the history
support for overriding request params
  • Loading branch information
harrisiirak committed Sep 2, 2015
2 parents ea24480 + d7ab9af commit 3bbb166
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 10 deletions.
20 changes: 11 additions & 9 deletions lib/webhdfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ var BufferStreamReader = require('buffer-stream-reader');
* Initializes new WebHDFS instance
*
* @param {Object} [opts]
* @param {Object} [requestParams]
* @returns {WebHDFS}
*
* @constructor
*/
function WebHDFS (opts) {
function WebHDFS (opts, requestParams) {
if (!(this instanceof WebHDFS)) {
return new WebHDFS(opts);
return new WebHDFS(opts, requestParams);
}

[ 'user', 'host', 'port', 'path' ].some(function iterate (property) {
Expand All @@ -30,6 +31,7 @@ function WebHDFS (opts) {
}
});

this._requestParams = requestParams;
this._opts = opts;
this._url = {
protocol: opts.protocol || 'http',
Expand Down Expand Up @@ -162,7 +164,7 @@ WebHDFS.prototype._sendRequest = function _sendRequest (method, url, opts, callb
method: method,
url: url,
json: true
}, opts), function onComplete(err, res, body) {
}, this._requestParams, opts), function onComplete(err, res, body) {
if (err) {
return callback && callback(err);
}
Expand Down Expand Up @@ -519,11 +521,11 @@ WebHDFS.prototype.createWriteStream = function createWriteStream (path, append,

var self = this;
var stream = null;
var params = {
var params = extend({
method: append ? 'POST' : 'PUT',
url: endpoint,
json: true
};
}, this._requestParams);

var req = request(params, function (err, res, body) {
// Handle redirect only if there was not an error (e.g. res is defined)
Expand Down Expand Up @@ -612,11 +614,11 @@ WebHDFS.prototype.createReadStream = function createReadStream (path, opts) {
var self = this;
var endpoint = this._getOperationEndpoint('open', path, opts);
var stream = null;
var params = {
var params = extend({
method: 'GET',
url: endpoint,
json: true
};
}, this._requestParams);

var req = request(params);
req.on('error', function (err) {
Expand Down Expand Up @@ -742,12 +744,12 @@ WebHDFS.prototype.unlink = function writeFile (path, recursive, callback) {
WebHDFS.prototype.rmdir = WebHDFS.prototype.unlink;

module.exports = {
createClient: function createClient (opts) {
createClient: function createClient (opts, requestParams) {
return new WebHDFS(extend({
user: 'webuser',
host: 'localhost',
port: '50070',
path: '/webhdfs/v1'
}, opts));
}, opts), requestParams);
}
};
82 changes: 81 additions & 1 deletion test/webhdfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,84 @@ describe('WebHDFS', function () {
done();
});
});
});

it('should support optional opts', function (done) {
var myOpts = {
"user.name": "testuser"
}
hdfs.writeFile(path + '/file-1', 'random data', myOpts, function (err) {
demand(err).be.null();
done();
});
});

});

describe('WebHDFS with requestParams', function() {
var path = '/files/' + Math.random();
var hdfs = WebHDFS.createClient({
user: process.env.USER,
port: 45001
}, {
headers: {
'X-My-Custom-Header': 'Kerberos'
}
});

this.timeout(10000);

before(function (done) {
var opts = {
path: '/webhdfs/v1',
http: {
port: 45001
}
};

WebHDFSProxy.createServer(opts, WebHDFSProxyMemoryStorage, done);
});

it('should override request() options', function (done) {
var localFileStream = fs.createReadStream(__filename);
var remoteFileStream = hdfs.createWriteStream(path + '/file-2');
var spy = sinon.spy();

localFileStream.pipe(remoteFileStream);
remoteFileStream.on('error', spy);

remoteFileStream.on('response', function(response) {
var customHeader = response.req.getHeader('X-My-Custom-Header');
demand(customHeader).equal('Kerberos');
demand(spy.called).be.falsy();
done();
})

});

it('should pass requestParams to _sendRequest', function (done) {
var req = hdfs.readdir('/');

req.on('response', function(response) {
var customHeader = response.req.getHeader('X-My-Custom-Header');
demand(customHeader).equal('Kerberos');
done();
});
});

it('should not override explicit opts with _sendRequest', function (done) {
var mostSpecificParams = {
headers: {
'X-My-Custom-Header': 'Bear'
}
}

var endpoint = hdfs._getOperationEndpoint('liststatus', '/file-2');

hdfs._sendRequest('GET', endpoint, mostSpecificParams, function(err, response, body) {
var customHeader = response.req.getHeader('X-My-Custom-Header');
demand(customHeader).equal('Bear');
done(err)
});
});

});

0 comments on commit 3bbb166

Please sign in to comment.