Skip to content

Commit

Permalink
Merge pull request #136 from gcoonrod/middleware
Browse files Browse the repository at this point in the history
Add basic support for AH Task Middleware
  • Loading branch information
evantahler committed May 17, 2016
2 parents 21434f6 + da7919f commit 850f4d8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
21 changes: 12 additions & 9 deletions lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var worker = function(options, jobs){
self.name = self.options.name;
self.queues = self.options.queues;
self.error = null;
self.result = null;
self.ready = false;
self.running = false;
self.working = false;
Expand Down Expand Up @@ -107,6 +108,7 @@ worker.prototype.poll = function(nQueue, callback) {
if(!err && resp){
var currentJob = JSON.parse(resp.toString());
if(self.options.looping){
self.result = null;
self.perform(currentJob);
}else{
callback(currentJob);
Expand Down Expand Up @@ -141,13 +143,13 @@ worker.prototype.perform = function(job, callback) {
self.job = job;
d.on('error', function(err){
self.error = err;
self.completeJob(null, true, callback);
self.completeJob(true, callback);
});
d.run(function(){
self.error = null;
if (!self.jobs[job["class"]]){
self.error = new Error("No job defined for class '"+job["class"]+"'");
self.completeJob(null, true, callback);
self.completeJob(true, callback);
}else{
var cb = self.jobs[job["class"]].perform;
self.emit('job', self.queue, job);
Expand All @@ -159,7 +161,7 @@ worker.prototype.perform = function(job, callback) {
if(returnCounter !== 1){
self.emit('failure', self.queue, job, callbackError);
}else if(toRun === false){
self.completeJob(null, false, callback);
self.completeJob(false, callback);
}else{
self.error = err;
self.workingOn(job);
Expand All @@ -176,13 +178,14 @@ worker.prototype.perform = function(job, callback) {
self.emit('failure', self.queue, job, callbackError);
}else{
self.error = err;
self.result = result;
pluginRunner.runPlugins(self, 'after_perform', job["class"], self.queue, self.jobs[job["class"]], job.args, function(e, toRun){
if(self.error === undefined && e){ self.error = e; }
returnCounter++;
if(returnCounter !== 3){
self.emit('failure', self.queue, job, callbackError);
}else{
self.completeJob(result, true, callback);
self.completeJob(true, callback);
}
});
}
Expand All @@ -203,19 +206,19 @@ worker.prototype.perform = function(job, callback) {
});
}else{
self.error = new Error("Missing Job: " + job["class"]);
self.completeJob(null, true, callback);
self.completeJob(true, callback);
}
}
});
};

worker.prototype.completeJob = function(result, toRespond, callback){
worker.prototype.completeJob = function(toRespond, callback){
var self = this;
var job = self.job;
if(self.error){
self.fail(self.error, job);
}else if(toRespond){
self.succeed(result, job);
self.succeed(job);
}
self.doneWorking();
self.job = null;
Expand All @@ -228,11 +231,11 @@ worker.prototype.completeJob = function(result, toRespond, callback){
}));
};

worker.prototype.succeed = function(result, job) {
worker.prototype.succeed = function(job) {
var self = this;
self.connection.redis.incr(self.connection.key('stat', 'processed'));
self.connection.redis.incr(self.connection.key('stat', 'processed', self.name));
self.emit('success', self.queue, job, result);
self.emit('success', self.queue, job, self.result);
};

worker.prototype.fail = function(err, job) {
Expand Down
3 changes: 3 additions & 0 deletions test/core/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ describe('worker', function(){
q.should.equal(specHelper.queue);
job.class.should.equal('add');
result.should.equal(3);
worker.result.should.equal(result);

worker.removeAllListeners('success');
done();
Expand All @@ -169,6 +170,8 @@ describe('worker', function(){
it('job arguments are immutable', function(done){
var listener = worker.on('success', function(q, job, result){
result.a.should.equal('starting value');
worker.result.should.equal(result);

worker.removeAllListeners('success');
done();
});
Expand Down

0 comments on commit 850f4d8

Please sign in to comment.