Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sharding #11

Merged
merged 7 commits into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
ruby: ['2.6', '2.7', '3.0']
ruby: ['2.7', '3.0', '3.2']
redis:
- redis:5.0
- redis:6.0
Expand All @@ -26,7 +26,6 @@ jobs:
path: vendor/bundler
key: ${{ hashFiles('Gemfile.lock') }}-${{ matrix.ruby }}
- run: |
gem install bundler
bundle install --path=vendor/bundler
bundle exec rspec
bundle exec rubocop
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# CHANGELOG

## v0.6.0
* Allow sharding records into multiple streams

## v0.5.0
* No longer delete delay messages

Expand Down
1 change: 0 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ gem "bundler"
gem "concurrent-ruby"
gem "database_cleaner"
gem "factory_bot"
gem "mocha"
gem "rake"
gem "rspec"
gem "rspec-instafail"
Expand Down
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,33 @@ array of records to `Redstream::Producer#bulk`, like shown above. If you pass
an `ActiveRecord::Relation`, the `#bulk` method will convert it to an array,
i.e. load the whole result set into memory.

## Sharding

When you want to attach multiple consumers to a single stream, you maybe want
to add sharding. This can be accomplished by specifying a dynamic stream name
where you compute the shard key by hashing the primary key.

```ruby
class Product < ActiveRecord::Base
include Redstream::Model

NUM_SHARDS = 4

def self.redstream_name(shard)
"products-#{shard}"
end

def redstream_name
self.class.redstream_name(Digest::SHA1.hexdigest(id.to_s)[0, 4].to_i(16) % NUM_SHARDS)
end
end
```

The sharding via hashing the primary key is neccessary, because we want each
change of a specific object to end up in the same stream. Otherwise the order
of changes for a specific object gets mixed up. Subsequently, you can add
consumers, etc for each individual stream name.

## Namespacing

In case you are using a shared redis, where multiple appications read/write
Expand Down
2 changes: 1 addition & 1 deletion lib/redstream/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def run_once(&block)
commit offset
end

sleep(5) unless got_lock
@lock.wait(5) unless got_lock
rescue StandardError => e
@logger.error e

Expand Down
74 changes: 61 additions & 13 deletions lib/redstream/lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,40 +21,67 @@ module Redstream
# end

class Lock
class Signal
def initialize
@mutex = Mutex.new
@condition_variable = ConditionVariable.new
end

def wait(timeout)
@mutex.synchronize { @condition_variable.wait(@mutex, timeout) }
end

def signal
@condition_variable.signal
end
end

def initialize(name:)
@name = name
@id = SecureRandom.hex
end

def acquire(&block)
got_lock = get_lock
keep_lock(&block) if got_lock

if got_lock
keep_lock(&block)
release_lock
end

got_lock
end

def wait(timeout)
@wait_redis ||= Redstream.connection_pool.with(&:dup)
@wait_redis.brpop("#{Redstream.lock_key_name(@name)}.notify", timeout: timeout)
end

private

def keep_lock(&block)
stop = false
mutex = Mutex.new
stopped = false
signal = Signal.new

Thread.new do
until mutex.synchronize { stop }
Redstream.connection_pool.with { |redis| redis.expire(Redstream.lock_key_name(@name), 5) }
thread = Thread.new do
until stopped
Redstream.connection_pool.with do |redis|
redis.expire(Redstream.lock_key_name(@name), 5)
end

sleep 3
signal.wait(3)
end
end

block.call
ensure
mutex.synchronize do
stop = true
end
stopped = true
signal&.signal
thread&.join
end

def get_lock
@get_lock_script = <<~GET_LOCK_SCRIPT
@get_lock_script = <<~SCRIPT
local lock_key_name, id = ARGV[1], ARGV[2]

local cur = redis.call('get', lock_key_name)
Expand All @@ -70,9 +97,30 @@ def get_lock
end

return false
GET_LOCK_SCRIPT
SCRIPT

Redstream.connection_pool.with do |redis|
redis.eval(@get_lock_script, argv: [Redstream.lock_key_name(@name), @id])
end
end

def release_lock
@release_lock_script = <<~SCRIPT
local lock_key_name, id = ARGV[1], ARGV[2]

Redstream.connection_pool.with { |redis| redis.eval(@get_lock_script, argv: [Redstream.lock_key_name(@name), @id]) }
local cur = redis.call('get', lock_key_name)

if cur and cur == id then
redis.call('del', lock_key_name)
end

redis.call('del', lock_key_name .. '.notify')
redis.call('rpush', lock_key_name .. '.notify', '1')
SCRIPT

Redstream.connection_pool.with do |redis|
redis.eval(@release_lock_script, argv: [Redstream.lock_key_name(@name), @id])
end
end
end
end
23 changes: 23 additions & 0 deletions lib/redstream/model.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,28 @@ def redstream_name
def redstream_payload
{ id: id }
end

# Override to customize the stream name. By default, the stream name
# is determined by the class name. If you override the instance method,
# please also override the class method.
#
# @example Sharding
# class Product
# include Redstream::Model
#
# NUM_SHARDS = 4
#
# def redstream_name
# self.class.redstream_name(Digest::SHA1.hexdigest(id.to_s)[0, 4].to_i(16) % NUM_SHARDS)
# end
#
# def self.redstream_name(shard)
# "products-#{shard}
# end
# end

def redstream_name
self.class.redstream_name
end
end
end
17 changes: 4 additions & 13 deletions lib/redstream/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ class Producer

def initialize(wait: false)
@wait = wait
@stream_name_cache = {}

super()
end
Expand Down Expand Up @@ -72,7 +71,7 @@ def bulk_delay(records)
Redstream.connection_pool.with do |redis|
redis.pipelined do |pipeline|
slice.each do |object|
pipeline.xadd(Redstream.stream_key_name("#{stream_name(object)}.delay"), { payload: JSON.dump(object.redstream_payload) })
pipeline.xadd(Redstream.stream_key_name("#{object.redstream_name}.delay"), { payload: JSON.dump(object.redstream_payload) })
end
end
end
Expand All @@ -96,7 +95,7 @@ def bulk_queue(records)
Redstream.connection_pool.with do |redis|
redis.pipelined do |pipeline|
slice.each do |object|
pipeline.xadd(Redstream.stream_key_name(stream_name(object)), { payload: JSON.dump(object.redstream_payload) })
pipeline.xadd(Redstream.stream_key_name(object.redstream_name), { payload: JSON.dump(object.redstream_payload) })
end
end
end
Expand All @@ -115,7 +114,7 @@ def bulk_queue(records)

def delay(object)
Redstream.connection_pool.with do |redis|
res = redis.xadd(Redstream.stream_key_name("#{stream_name(object)}.delay"), { payload: JSON.dump(object.redstream_payload) })
res = redis.xadd(Redstream.stream_key_name("#{object.redstream_name}.delay"), { payload: JSON.dump(object.redstream_payload) })
redis.wait(@wait, 0) if @wait
res
end
Expand All @@ -129,18 +128,10 @@ def delay(object)

def queue(object)
Redstream.connection_pool.with do |redis|
redis.xadd(Redstream.stream_key_name(stream_name(object)), { payload: JSON.dump(object.redstream_payload) })
redis.xadd(Redstream.stream_key_name(object.redstream_name), { payload: JSON.dump(object.redstream_payload) })
end

true
end

private

def stream_name(object)
synchronize do
@stream_name_cache[object.class] ||= object.class.redstream_name
end
end
end
end
2 changes: 1 addition & 1 deletion lib/redstream/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Redstream
VERSION = "0.5.0"
VERSION = "0.6.0"
end
39 changes: 39 additions & 0 deletions spec/redstream/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,44 @@

expect(redis.get(Redstream.offset_key_name(stream_name: "products", consumer_name: "consumer"))).to eq(all_messages.last[0])
end

it "does not starve" do
product = create(:product)
stopped = false
results = Concurrent::Array.new

thread1 = Thread.new do
until stopped
Redstream::Consumer.new(name: "consumer", stream_name: "products").run_once do
results.push("thread1")

product.touch

sleep 0.1
end
end
end

thread2 = Thread.new do
until stopped
Redstream::Consumer.new(name: "consumer", stream_name: "products").run_once do
results.push("thread2")

product.touch

sleep 0.1
end
end
end

sleep 6

stopped = true
[thread1, thread2].each(&:join)

expect(results.size).to be > 10
expect(results.count("thread1").to_f / results.size).to be > 0.2
expect(results.count("thread2").to_f / results.size).to be > 0.2
end
end
end
Loading
Loading