diff --git a/lib/multiWorker.js b/lib/multiWorker.js index cc68db2a..de4e4fa6 100644 --- a/lib/multiWorker.js +++ b/lib/multiWorker.js @@ -6,7 +6,7 @@ var eventLoopDelay = require(__dirname + '/eventLoopDelay'); var multiWorker = function(options, jobs){ var self = this; - + var defaults = self.defaults(); for(var i in defaults){ if(options[i] === null || options[i] === undefined){ @@ -15,7 +15,7 @@ var multiWorker = function(options, jobs){ } if(options.connection.redis && typeof options.connection.redis.setMaxListeners === 'function'){ - options.connection.redis.setMaxListeners(options.maxTaskProcessors + 1); + options.connection.redis.setMaxListeners(options.connection.redis.getMaxListeners() + options.maxTaskProcessors); } self.workers = []; @@ -27,10 +27,10 @@ var multiWorker = function(options, jobs){ self.eventLoopBlocked = true; self.eventLoopDelay = Infinity; self.eventLoopCheckCounter = 0; - + eventLoopDelay( - self.options.maxEventLoopDelay, - self.options.checkTimeout, + self.options.maxEventLoopDelay, + self.options.checkTimeout, function(blocked, ms){ self.eventLoopBlocked = blocked; self.eventLoopDelay = ms; @@ -58,14 +58,14 @@ multiWorker.prototype.startWorker = function(callback){ var self = this; var id = (self.workers.length + 1); var worker = new Worker({ - connection: self.options.connection, + connection: self.options.connection, queues: self.options.queues, timeout: self.options.timeout, name: self.options.name + ":" + process.pid + '+' + id }, self.jobs); worker.connect(function(){ worker.start(); - worker.workerCleanup(function(error){ + worker.workerCleanup(function(error){ if(error){ self.emit('error', error); } process.nextTick(callback); }); @@ -109,16 +109,16 @@ multiWorker.prototype.checkWorkers = function(callback){ else if(self.eventLoopBlocked && self.workers.length > self.options.minTaskProcessors){ verb = '-'; } else if(self.eventLoopBlocked && self.workers.length == self.options.minTaskProcessors){ verb = 'x'; } else if( - !self.eventLoopBlocked && - self.workers.length < self.options.maxTaskProcessors && + !self.eventLoopBlocked && + self.workers.length < self.options.maxTaskProcessors && ( - self.workers.length === 0 || + self.workers.length === 0 || workingCount / self.workers.length > 0.5 - ) + ) ){ verb = '+'; } else if( - !self.eventLoopBlocked && - self.workers.length > self.options.minTaskProcessors && + !self.eventLoopBlocked && + self.workers.length > self.options.minTaskProcessors && workingCount / self.workers.length < 0.5 ){ verb = '-'; @@ -141,9 +141,9 @@ multiWorker.prototype.checkWorkers = function(callback){ worker.end(function(err){ touched--; self.cleanupWorker(worker); - if(touched === 0){ + if(touched === 0){ self.workers = []; - callback(err, verb, self.eventLoopDelay); + callback(err, verb, self.eventLoopDelay); } }); }); @@ -215,7 +215,7 @@ multiWorker.prototype.end = function(callback){ multiWorker.prototype.stopWait = function(callback){ var self = this; - if(self.workers.length === 0 && self.working === false){ + if(self.workers.length === 0 && self.working === false){ clearTimeout(self.checkTimer); setTimeout(function(){ if(typeof callback === 'function'){ callback(); } diff --git a/package.json b/package.json index 54dd9527..65fb2fbd 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,7 @@ "name": "node-resque", "description": "an opinionated implementation of resque in node", "license": "Apache-2.0", - "version": "2.0.5", + "version": "2.0.6", "homepage": "http://github.com/taskrabbit/node-resque", "repository": { "type": "git",