Skip to content

Commit

Permalink
add more listners before warning
Browse files Browse the repository at this point in the history
  • Loading branch information
evantahler committed May 29, 2016
1 parent 7a9a9a5 commit 7637f2d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 17 deletions.
32 changes: 16 additions & 16 deletions lib/multiWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ var eventLoopDelay = require(__dirname + '/eventLoopDelay');

var multiWorker = function(options, jobs){
var self = this;

var defaults = self.defaults();
for(var i in defaults){
if(options[i] === null || options[i] === undefined){
Expand All @@ -15,7 +15,7 @@ var multiWorker = function(options, jobs){
}

if(options.connection.redis && typeof options.connection.redis.setMaxListeners === 'function'){
options.connection.redis.setMaxListeners(options.maxTaskProcessors + 1);
options.connection.redis.setMaxListeners(options.connection.redis.getMaxListeners() + options.maxTaskProcessors);
}

self.workers = [];
Expand All @@ -27,10 +27,10 @@ var multiWorker = function(options, jobs){
self.eventLoopBlocked = true;
self.eventLoopDelay = Infinity;
self.eventLoopCheckCounter = 0;

eventLoopDelay(
self.options.maxEventLoopDelay,
self.options.checkTimeout,
self.options.maxEventLoopDelay,
self.options.checkTimeout,
function(blocked, ms){
self.eventLoopBlocked = blocked;
self.eventLoopDelay = ms;
Expand Down Expand Up @@ -58,14 +58,14 @@ multiWorker.prototype.startWorker = function(callback){
var self = this;
var id = (self.workers.length + 1);
var worker = new Worker({
connection: self.options.connection,
connection: self.options.connection,
queues: self.options.queues,
timeout: self.options.timeout,
name: self.options.name + ":" + process.pid + '+' + id
}, self.jobs);
worker.connect(function(){
worker.start();
worker.workerCleanup(function(error){
worker.workerCleanup(function(error){
if(error){ self.emit('error', error); }
process.nextTick(callback);
});
Expand Down Expand Up @@ -109,16 +109,16 @@ multiWorker.prototype.checkWorkers = function(callback){
else if(self.eventLoopBlocked && self.workers.length > self.options.minTaskProcessors){ verb = '-'; }
else if(self.eventLoopBlocked && self.workers.length == self.options.minTaskProcessors){ verb = 'x'; }
else if(
!self.eventLoopBlocked &&
self.workers.length < self.options.maxTaskProcessors &&
!self.eventLoopBlocked &&
self.workers.length < self.options.maxTaskProcessors &&
(
self.workers.length === 0 ||
self.workers.length === 0 ||
workingCount / self.workers.length > 0.5
)
)
){ verb = '+'; }
else if(
!self.eventLoopBlocked &&
self.workers.length > self.options.minTaskProcessors &&
!self.eventLoopBlocked &&
self.workers.length > self.options.minTaskProcessors &&
workingCount / self.workers.length < 0.5
){
verb = '-';
Expand All @@ -141,9 +141,9 @@ multiWorker.prototype.checkWorkers = function(callback){
worker.end(function(err){
touched--;
self.cleanupWorker(worker);
if(touched === 0){
if(touched === 0){
self.workers = [];
callback(err, verb, self.eventLoopDelay);
callback(err, verb, self.eventLoopDelay);
}
});
});
Expand Down Expand Up @@ -215,7 +215,7 @@ multiWorker.prototype.end = function(callback){

multiWorker.prototype.stopWait = function(callback){
var self = this;
if(self.workers.length === 0 && self.working === false){
if(self.workers.length === 0 && self.working === false){
clearTimeout(self.checkTimer);
setTimeout(function(){
if(typeof callback === 'function'){ callback(); }
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"name": "node-resque",
"description": "an opinionated implementation of resque in node",
"license": "Apache-2.0",
"version": "2.0.5",
"version": "2.0.6",
"homepage": "http://github.com/taskrabbit/node-resque",
"repository": {
"type": "git",
Expand Down

0 comments on commit 7637f2d

Please sign in to comment.