Skip to content

Commit

Permalink
Ensure that readable stream gets correctly paused. Fixes #8
Browse files Browse the repository at this point in the history
  • Loading branch information
harrisiirak committed Sep 3, 2015
1 parent 5348cfb commit dc1891d
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 8 deletions.
21 changes: 17 additions & 4 deletions lib/webhdfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ WebHDFS.prototype.createWriteStream = function createWriteStream (path, append,

var self = this;
var stream = null;
var canResume = true;
var params = extend({
method: append ? 'POST' : 'PUT',
url: endpoint,
Expand All @@ -544,6 +545,8 @@ WebHDFS.prototype.createWriteStream = function createWriteStream (path, append,
}
});

canResume = true; // Enable resume

stream.pipe(upload);
stream.resume();
}
Expand All @@ -563,13 +566,23 @@ WebHDFS.prototype.createWriteStream = function createWriteStream (path, append,
});

req.on('pipe', function onPipe (src) {
// Unpipe initial request
src.unpipe(req);
req.end();

// Pause read stream
stream = src;
stream.pause();

// This is not an elegant solution but here we go
// Basically we don't allow pipe() method to resume reading input
// and set internal _readableState.flowing to false
canResume = false;
stream.on('resume', function () {
if (!canResume) {
stream._readableState.flowing = false;
}
});

// Unpipe initial request
src.unpipe(req);
req.end();
});

return req;
Expand Down
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "webhdfs",
"version": "0.2.0",
"version": "0.2.1",
"description": "Node.js WebHDFS REST API client",
"main": "lib/webhdfs.js",
"scripts": {
Expand All @@ -27,8 +27,9 @@
"webhdfs-proxy-memory": "^0.1.2"
},
"dependencies": {
"request": "~2.40.0",
"extend": "~1.3.0",
"buffer-stream-reader": "~0.1.1"
"buffer-stream-reader": "~0.1.1",
"extend": "^3.0.0",
"mocha": "^2.3.0",
"request": "^2.61.0"
}
}

0 comments on commit dc1891d

Please sign in to comment.