Skip to content

Commit

Permalink
Add sharding (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrkamel committed Sep 21, 2024
1 parent 2eb82b5 commit 96e7f3b
Show file tree
Hide file tree
Showing 14 changed files with 305 additions and 39 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
ruby: ['2.6', '2.7', '3.0']
ruby: ['2.7', '3.0', '3.2']
redis:
- redis:5.0
- redis:6.0
Expand All @@ -26,7 +26,6 @@ jobs:
path: vendor/bundler
key: ${{ hashFiles('Gemfile.lock') }}-${{ matrix.ruby }}
- run: |
gem install bundler
bundle install --path=vendor/bundler
bundle exec rspec
bundle exec rubocop
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# CHANGELOG

## v0.6.0
* Allow sharding records into multiple streams

## v0.5.0
* No longer delete delay messages

Expand Down
1 change: 0 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ gem "bundler"
gem "concurrent-ruby"
gem "database_cleaner"
gem "factory_bot"
gem "mocha"
gem "rake"
gem "rspec"
gem "rspec-instafail"
Expand Down
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,33 @@ array of records to `Redstream::Producer#bulk`, like shown above. If you pass
an `ActiveRecord::Relation`, the `#bulk` method will convert it to an array,
i.e. load the whole result set into memory.

## Sharding

When you want to attach multiple consumers to a single stream, you may want
to add sharding. This can be accomplished by specifying a dynamic stream name
where you compute the shard key by hashing the primary key.

```ruby
class Product < ActiveRecord::Base
include Redstream::Model

NUM_SHARDS = 4

def self.redstream_name(shard)
"products-#{shard}"
end

def redstream_name
self.class.redstream_name(Digest::SHA1.hexdigest(id.to_s)[0, 4].to_i(16) % NUM_SHARDS)
end
end
```

The sharding via hashing the primary key is necessary, because we want each
change of a specific object to end up in the same stream. Otherwise the order
of changes for a specific object gets mixed up. Subsequently, you can add
consumers, etc. for each individual stream name.

## Namespacing

In case you are using a shared redis, where multiple applications read/write
Expand Down
2 changes: 1 addition & 1 deletion lib/redstream/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def run_once(&block)
commit offset
end

sleep(5) unless got_lock
@lock.wait(5) unless got_lock
rescue StandardError => e
@logger.error e

Expand Down
74 changes: 61 additions & 13 deletions lib/redstream/lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,40 +21,67 @@ module Redstream
# end

class Lock
class Signal
def initialize
@mutex = Mutex.new
@condition_variable = ConditionVariable.new
end

def wait(timeout)
@mutex.synchronize { @condition_variable.wait(@mutex, timeout) }
end

def signal
@condition_variable.signal
end
end

def initialize(name:)
@name = name
@id = SecureRandom.hex
end

def acquire(&block)
got_lock = get_lock
keep_lock(&block) if got_lock

if got_lock
keep_lock(&block)
release_lock
end

got_lock
end

def wait(timeout)
@wait_redis ||= Redstream.connection_pool.with(&:dup)
@wait_redis.brpop("#{Redstream.lock_key_name(@name)}.notify", timeout: timeout)
end

private

def keep_lock(&block)
stop = false
mutex = Mutex.new
stopped = false
signal = Signal.new

Thread.new do
until mutex.synchronize { stop }
Redstream.connection_pool.with { |redis| redis.expire(Redstream.lock_key_name(@name), 5) }
thread = Thread.new do
until stopped
Redstream.connection_pool.with do |redis|
redis.expire(Redstream.lock_key_name(@name), 5)
end

sleep 3
signal.wait(3)
end
end

block.call
ensure
mutex.synchronize do
stop = true
end
stopped = true
signal&.signal
thread&.join
end

def get_lock
@get_lock_script = <<~GET_LOCK_SCRIPT
@get_lock_script = <<~SCRIPT
local lock_key_name, id = ARGV[1], ARGV[2]
local cur = redis.call('get', lock_key_name)
Expand All @@ -70,9 +97,30 @@ def get_lock
end
return false
GET_LOCK_SCRIPT
SCRIPT

Redstream.connection_pool.with do |redis|
redis.eval(@get_lock_script, argv: [Redstream.lock_key_name(@name), @id])
end
end

def release_lock
@release_lock_script = <<~SCRIPT
local lock_key_name, id = ARGV[1], ARGV[2]
Redstream.connection_pool.with { |redis| redis.eval(@get_lock_script, argv: [Redstream.lock_key_name(@name), @id]) }
local cur = redis.call('get', lock_key_name)
if cur and cur == id then
redis.call('del', lock_key_name)
end
redis.call('del', lock_key_name .. '.notify')
redis.call('rpush', lock_key_name .. '.notify', '1')
SCRIPT

Redstream.connection_pool.with do |redis|
redis.eval(@release_lock_script, argv: [Redstream.lock_key_name(@name), @id])
end
end
end
end
23 changes: 23 additions & 0 deletions lib/redstream/model.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,28 @@ def redstream_name
def redstream_payload
{ id: id }
end

# Override to customize the stream name. By default, the stream name
# is determined by the class name. If you override the instance method,
# please also override the class method.
#
# @example Sharding
# class Product
# include Redstream::Model
#
# NUM_SHARDS = 4
#
# def redstream_name
# self.class.redstream_name(Digest::SHA1.hexdigest(id.to_s)[0, 4].to_i(16) % NUM_SHARDS)
# end
#
# def self.redstream_name(shard)
#       "products-#{shard}"
# end
# end

def redstream_name
self.class.redstream_name
end
end
end
17 changes: 4 additions & 13 deletions lib/redstream/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ class Producer

def initialize(wait: false)
@wait = wait
@stream_name_cache = {}

super()
end
Expand Down Expand Up @@ -72,7 +71,7 @@ def bulk_delay(records)
Redstream.connection_pool.with do |redis|
redis.pipelined do |pipeline|
slice.each do |object|
pipeline.xadd(Redstream.stream_key_name("#{stream_name(object)}.delay"), { payload: JSON.dump(object.redstream_payload) })
pipeline.xadd(Redstream.stream_key_name("#{object.redstream_name}.delay"), { payload: JSON.dump(object.redstream_payload) })
end
end
end
Expand All @@ -96,7 +95,7 @@ def bulk_queue(records)
Redstream.connection_pool.with do |redis|
redis.pipelined do |pipeline|
slice.each do |object|
pipeline.xadd(Redstream.stream_key_name(stream_name(object)), { payload: JSON.dump(object.redstream_payload) })
pipeline.xadd(Redstream.stream_key_name(object.redstream_name), { payload: JSON.dump(object.redstream_payload) })
end
end
end
Expand All @@ -115,7 +114,7 @@ def bulk_queue(records)

def delay(object)
Redstream.connection_pool.with do |redis|
res = redis.xadd(Redstream.stream_key_name("#{stream_name(object)}.delay"), { payload: JSON.dump(object.redstream_payload) })
res = redis.xadd(Redstream.stream_key_name("#{object.redstream_name}.delay"), { payload: JSON.dump(object.redstream_payload) })
redis.wait(@wait, 0) if @wait
res
end
Expand All @@ -129,18 +128,10 @@ def delay(object)

def queue(object)
Redstream.connection_pool.with do |redis|
redis.xadd(Redstream.stream_key_name(stream_name(object)), { payload: JSON.dump(object.redstream_payload) })
redis.xadd(Redstream.stream_key_name(object.redstream_name), { payload: JSON.dump(object.redstream_payload) })
end

true
end

private

def stream_name(object)
synchronize do
@stream_name_cache[object.class] ||= object.class.redstream_name
end
end
end
end
2 changes: 1 addition & 1 deletion lib/redstream/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Redstream
VERSION = "0.5.0"
VERSION = "0.6.0"
end
39 changes: 39 additions & 0 deletions spec/redstream/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,44 @@

expect(redis.get(Redstream.offset_key_name(stream_name: "products", consumer_name: "consumer"))).to eq(all_messages.last[0])
end

it "does not starve" do
product = create(:product)
stopped = false
results = Concurrent::Array.new

thread1 = Thread.new do
until stopped
Redstream::Consumer.new(name: "consumer", stream_name: "products").run_once do
results.push("thread1")

product.touch

sleep 0.1
end
end
end

thread2 = Thread.new do
until stopped
Redstream::Consumer.new(name: "consumer", stream_name: "products").run_once do
results.push("thread2")

product.touch

sleep 0.1
end
end
end

sleep 6

stopped = true
[thread1, thread2].each(&:join)

expect(results.size).to be > 10
expect(results.count("thread1").to_f / results.size).to be > 0.2
expect(results.count("thread2").to_f / results.size).to be > 0.2
end
end
end
Loading

0 comments on commit 96e7f3b

Please sign in to comment.