Skip to content

Commit

Permalink
queue.locks() and queue.delLock()
Browse files Browse the repository at this point in the history
  • Loading branch information
evantahler committed Jul 8, 2016
1 parent ffc51d4 commit 515ef63
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 7 deletions.
11 changes: 6 additions & 5 deletions lib/plugins/jobLock.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ jobLock.prototype.before_perform = function(callback){
var now = Math.round(new Date().getTime() / 1000);
var timeout = now + self.lock_timeout() + 1;
self.queueObject.connection.redis.setnx(key, timeout, function(err, setCallback){
if(err){ return callback(err); }
if(setCallback === true || setCallback === 1){
self.queueObject.connection.redis.expire(key, self.lock_timeout());
callback(null, true);
}else{
self.reEnqueue(function(){
callback(null, false);
self.reEnqueue(function(err){
callback(err, false);
});
}
});
Expand All @@ -44,7 +45,7 @@ jobLock.prototype.after_perform = function(callback){
var self = this;
var key = self.key();
self.queueObject.connection.redis.del(key, function(err){
callback(null, true);
callback(null, err);
});
};

Expand All @@ -55,8 +56,8 @@ jobLock.prototype.after_perform = function(callback){
jobLock.prototype.reEnqueue = function(callback){
var self = this;
setTimeout(function(){
self.queueObject.enqueue(self.queue, self.func, self.args, function(){
callback();
self.queueObject.enqueue(self.queue, self.func, self.args, function(err){
callback(err);
});
}, self.enqueue_timeout() );
};
Expand Down
5 changes: 4 additions & 1 deletion lib/plugins/queueLock.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@ queueLock.prototype.before_enqueue = function(callback){
var now = Math.round(new Date().getTime() / 1000);
var timeout = now + self.lock_timeout() + 1;
self.queueObject.connection.redis.setnx(key, timeout, function(err, setCallback){
if(err){ return callback(err); }
if(setCallback === true || setCallback === 1){
callback(null, true);
}else{
self.queueObject.connection.redis.get(key, function(err, redisTimeout){
if(err){ return callback(err); }
redisTimeout = parseInt(redisTimeout);
if(now <= redisTimeout){
callback(null, false);
}else{
self.queueObject.connection.redis.set(key, timeout, function(err){
self.queueObject.connection.redis.expire(key, timeout);
callback(err, !err);
});
}
Expand All @@ -50,7 +53,7 @@ queueLock.prototype.after_perform = function(callback){
var self = this;
var key = self.key();
self.queueObject.connection.redis.del(key, function(err){
callback(null, true);
callback(err, true);
});
};

Expand Down
33 changes: 33 additions & 0 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,39 @@ queue.prototype.allDelayed = function(callback){
});
};

queue.prototype.locks = function(callback){
var self = this;
var keys = [];
var data = {};

self.connection.redis.keys(self.connection.key('lock:*'), function(err, _keys){
if(err){ return callback(err); }
keys = keys.concat(_keys);
self.connection.redis.keys(self.connection.key('workerslock:*'), function(err, _keys){
if(err){ return callback(err); }
keys = keys.concat(_keys);

if(keys.length === 0){ return callback(null, data); }

self.connection.redis.mget(keys, function(err, values){
if(err){return callback(err); }
for (var i = 0; i < keys.length; i++){
var k = keys[i];
k = k.replace(self.connection.key(''), '');
data[k] = values[i];
}
callback(null, data);
});

});
});
};

queue.prototype.delLock = function(key, callback){
var self = this;
self.connection.redis.del(self.connection.key(key), callback);
};

queue.prototype.workers = function(callback){
var self = this;
var workers = {};
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"name": "node-resque",
"description": "an opinionated implementation of resque in node",
"license": "Apache-2.0",
"version": "2.0.8",
"version": "2.0.9",
"homepage": "http://github.com/taskrabbit/node-resque",
"repository": {
"type": "git",
Expand Down
30 changes: 30 additions & 0 deletions test/core/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,36 @@ describe('queue', function(){
});
});

describe('locks', function(){
beforeEach(function(done){ queue.connection.redis.set(queue.connection.key('lock:lists:queueName:jobName:[{}]'), 123, done); });
beforeEach(function(done){ queue.connection.redis.set(queue.connection.key('workerslock:lists:queueName:jobName:[{}]'), 456, done); });

afterEach(function(done){ queue.connection.redis.del(queue.connection.key('lock:lists:queueName:jobName:[{}]'), done); });
afterEach(function(done){ queue.connection.redis.del(queue.connection.key('workerslock:lists:queueName:jobName:[{}]'), done); });

it('can get locks', function(done){
queue.locks(function(err, locks){
should.not.exist(err);
Object.keys(locks).length.should.equal(2);
locks['lock:lists:queueName:jobName:[{}]'].should.equal('123');
locks['workerslock:lists:queueName:jobName:[{}]'].should.equal('456');
done();
});
});

it('can remove locks', function(done){
queue.locks(function(err, locks){
should.not.exist(err);
Object.keys(locks).length.should.equal(2);
queue.delLock('workerslock:lists:queueName:jobName:[{}]', function(err, count){
should.not.exist(err);
count.should.equal(1);
done();
});
});
});
});

describe('failed job managment', function(){

beforeEach(function(done){
Expand Down

0 comments on commit 515ef63

Please sign in to comment.