Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle clients going away without relinquishing the lock #5

Merged
merged 6 commits into from
Mar 18, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.project
*.gem
.rspec
.bundle
.rbenv-version
3 changes: 2 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ group :test do
gem 'redis'
gem 'rspec'
gem 'rake'
end
gem 'pry'
end
8 changes: 8 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
GEM
remote: http://rubygems.org/
specs:
coderay (1.0.7)
diff-lcs (1.1.2)
method_source (0.8)
pry (0.9.10)
coderay (~> 1.0.5)
method_source (~> 0.8)
slop (~> 3.3.1)
rake (0.9.2)
redis (2.2.2)
rspec (2.6.0)
Expand All @@ -12,11 +18,13 @@ GEM
rspec-expectations (2.6.0)
diff-lcs (~> 1.1.2)
rspec-mocks (2.6.0)
slop (3.3.2)

PLATFORMS
ruby

DEPENDENCIES
pry
rake
redis
rspec
133 changes: 84 additions & 49 deletions lib/redis/semaphore.rb
Original file line number Diff line number Diff line change
@@ -1,89 +1,124 @@
require 'redis'

class Redis
class Semaphore
class InconsistentStateError < StandardError
end

attr_reader :resources
class Semaphore

# RedisSempahore.new(:my_semaphore, 5, myRedis)
# RedisSemaphore.new(:my_semaphore, myRedis)
#stale_client_timeout is the threshold of time before we assume
#that something has gone terribly wrong with a client and we
#invalidate it's lock.
#Default is nil for which we don't check for stale clients
# RedisSemaphore.new(:my_semaphore, :stale_client_timeout => 30, :redis => myRedis)
# RedisSempahore.new(:my_semaphore, :redis => myRedis)
# RedisSemaphore.new(:my_semaphore, :resources => 1, :redis => myRedis)
# RedisSemaphore.new(:my_semaphore, :connection => "", :port => "")
# RedisSemaphore.new(:my_semaphore, :path => "bla")
def initialize(*args)
raise "Need at least two arguments" if args.size < 2

@locked = false
@name = args.shift.to_s
@redis = args.pop
if !(@redis.is_a?(Redis) || (defined?(Redis::Namespace) && @redis.is_a?(Redis::Namespace)))
@redis = Redis.new(@redis)
end
@resources = args.pop || 1

def initialize(name, opts={})
@name = name
@resources = opts.delete(:resources)
@resources ||= 1
@stale_client_timeout = opts.delete(:stale_client_timeout)
@redis = opts.delete(:redis)
@redis ||= Redis.new(opts)
@namespace = opts.delete(:namespace)
@namespace ||= @redis.namespace if @redis.respond_to? :namespace #this fixes the Redis::Namespace issue
@namespace ||= 'SEMAPHORE' #fall back to original name
@namespace_delim = opts.delete(:namespace_delim) #this allows Redis::Namespace users to pass in ':' as the delimiter
@namespace_delim ||= '::'
end

def available
@redis.llen(list_name)
end

def exists?
@redis.exists(exists_name)
@redis.llen(available_name)
end

def delete!
@redis.del(list_name)
@redis.del(available_name)
@redis.del(grabbed_name)
@redis.del(exists_name)
end

def lock(timeout = 0)
exists_or_create!

token = @redis.blpop(available_name, timeout)
return false if token.nil?

resource_index = @redis.blpop(list_name, timeout)
return false if resource_index.nil?
token = token[1].to_i
@redis.hset grabbed_name, token, DateTime.now.strftime('%s')
token
end

@locked = resource_index[1].to_i
if block_given?
begin
yield @locked
ensure
unlock
end
def with_locked_resource(timeout = 0)
token = lock(timeout)
return false unless token
begin
yield token
ensure
unlock(token)
end

true
end

def unlock
return false unless locked?

@redis.lpush(list_name, @locked)
@locked = false
def unlock(token=0)
raise InconsistentStateError.new('Invalid Unlock') unless token && locked?(token)
@redis.multi do
@redis.lpush available_name, token
@redis.hdel grabbed_name, token
end
end

def locked?
!!@locked
def locked?(token=nil)
if token
@redis.hexists grabbed_name, token
else
@redis.hlen( grabbed_name ) > 0
end
end


private
def list_name
(defined?(Redis::Namespace) && @redis.is_a?(Redis::Namespace)) ? "#{@name}:LIST" : "SEMAPHORE:#{@name}:LIST"
def available_name
@available_name ||= namespaced_key_name('AVAILABLE')
end

def exists_name
(defined?(Redis::Namespace) && @redis.is_a?(Redis::Namespace)) ? "#{@name}:EXISTS" : "SEMAPHORE:#{@name}:EXISTS"
@exists_name ||= namespaced_key_name('EXISTS')
end

def exists_or_create!
exists = @redis.getset(exists_name, 1)
def grabbed_name
@grabbed_name ||= namespaced_key_name('GRABBED')
end

if "1" != exists
@resources.times do |index|
@redis.rpush(list_name, index)
def namespaced_key_name(key_name)
[@namespace,@name,key_name].join(@namespace_delim)
end

def exists_or_create!
old = @redis.get(exists_name)
raise InconsistentStateError.new('Code does not match data') if old && old.to_i != @resources
if @redis.getset(exists_name, @resources)
if @stale_client_timeout
#fix missing clients
@redis.hgetall(grabbed_name).each do |resource_index, last_held_at|
if (last_held_at.to_i + @stale_client_timeout) < DateTime.now.strftime('%s').to_i
@redis.multi do
@redis.hdel(grabbed_name, resource_index)
#in case of race condition, remove the resource that the other process added
@redis.lrem(available_name, 0, resource_index)
@redis.lpush(available_name, resource_index)
end
end
end
end
else
@redis.multi do
@redis.del(grabbed_name)
@redis.del(available_name)
@resources.times do |index|
@redis.rpush(available_name, index)
end
end
end
end

end
end
66 changes: 42 additions & 24 deletions spec/redis_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,10 @@
before(:all) do
# use database 15 for testing so we dont accidentally step on you real data
@redis = Redis.new :db => 15
@semaphore = Redis::Semaphore.new(:my_semaphore, @redis)
end

before(:each) do
@redis.flushdb
end

after(:each) do
@semaphore = Redis::Semaphore.new(:my_semaphore, :redis => @redis)
@redis.flushdb
end

Expand All @@ -24,83 +20,105 @@
end

it "should lock and unlock" do
@semaphore.lock
@semaphore.lock(1)
@semaphore.locked?.should == true
@semaphore.unlock
@semaphore.locked?.should == false
end

it "should not lock twice as a mutex" do
@semaphore.lock
@semaphore.lock(1)
@semaphore.lock(1).should == false
end

it "should not lock three times when only two available" do
multisem = Redis::Semaphore.new(:my_semaphore2, 2, @redis)
multisem.lock.should == true
multisem.lock(1).should == true
multisem = Redis::Semaphore.new(:my_semaphore2, :resources => 2, :redis => @redis)
(!!multisem.lock(1)).should == true
(!!multisem.lock(1)).should == true
multisem.lock(1).should == false
end

it "should reuse the same index for 5 calls in serial" do
multisem = Redis::Semaphore.new(:my_semaphore5_serial, 5, @redis)
multisem = Redis::Semaphore.new(:my_semaphore5_serial, :resources => 5, :redis => @redis)
ids = []
5.times do
multisem.lock(1) do |i|
multisem.with_locked_resource(1) do |i|
ids << i
end
end
ids.size.should == 5
ids.uniq.size.should == 1
end

it "should have 5 different indexes for 5 parallel calls" do
multisem = Redis::Semaphore.new(:my_semaphore5_parallel, 5, @redis)
it "should have 5 different indexes for 5 parallel calls and it should unlock all properly" do
multisem = Redis::Semaphore.new(:my_semaphore5_parallel, :resources => 5, :redis => @redis)
ids = []
multisem.lock(1) do |i|
multisem.with_locked_resource(1) do |i|
ids << i
multisem.lock(1) do |i|
multisem.with_locked_resource(1) do |i|
ids << i
multisem.lock(1) do |i|
multisem.with_locked_resource(1) do |i|
ids << i
multisem.lock(1) do |i|
multisem.with_locked_resource(1) do |i|
ids << i
multisem.lock(1) do |i|
multisem.with_locked_resource(1) do |i|
ids << i
multisem.lock(1) do |i|
multisem.with_locked_resource(1) do |i|
ids << i
end.should == false
end
end
end
end
end
(0..4).to_a.should == ids
ids.should == (0..4).to_a

@redis.lrange(multisem.send(:available_name),0,10).sort.should == %w(0 1 2 3 4)
end

it "should execute the given code block" do
code_executed = false
@semaphore.lock do
@semaphore.with_locked_resource(1) do
code_executed = true
end
code_executed.should == true
end

it "should pass an exception right through" do
lambda do
@semaphore.lock do
@semaphore.with_locked_resource(1) do
raise Exception, "redis semaphore exception"
end
end.should raise_error(Exception, "redis semaphore exception")
end

it "should not leave the semaphore locked after raising an exception" do
lambda do
@semaphore.lock do
@semaphore.with_locked_resource(1) do
raise Exception
end
end.should raise_error

@semaphore.locked?.should == false
end

it "should blow up if the data in redis is off" do
(!!@semaphore.lock(1)).should == true
@semaphore.unlock
@semaphore = Redis::Semaphore.new(:my_semaphore, :resources=>9, :redis => @redis)
lambda do
@semaphore.lock(1)
end.should raise_error(Redis::InconsistentStateError)
@semaphore = Redis::Semaphore.new(:my_semaphore, :resources=>1, :redis => @redis)
(!!@semaphore.lock(1)).should == true
end

it "should restore resources of stale clients" do
hyper_aggressive_sem = Redis::Semaphore.new(:hyper_aggressive_sem, :resources => 1, :redis => @redis, :stale_client_timeout => 1)
(!!hyper_aggressive_sem.lock(1)).should == true
hyper_aggressive_sem.lock(1).should == false #because it is locked as expected
(!!hyper_aggressive_sem.lock(1)).should == true #becuase it is assumed that the first
#lock is no longer valid since the
#client could've been killed
end
end