diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 56bf5a2..16fb18b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,7 +5,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - ruby: ['2.6', '2.7', '3.0'] + ruby: ['2.7', '3.0', '3.2'] redis: - redis:5.0 - redis:6.0 @@ -26,7 +26,6 @@ jobs: path: vendor/bundler key: ${{ hashFiles('Gemfile.lock') }}-${{ matrix.ruby }} - run: | - gem install bundler bundle install --path=vendor/bundler bundle exec rspec bundle exec rubocop diff --git a/CHANGELOG.md b/CHANGELOG.md index a021cbc..1114681 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # CHANGELOG +## v0.6.0 +* Allow sharding records into multiple streams + ## v0.5.0 * No longer delete delay messages diff --git a/Gemfile b/Gemfile index 57b0b58..15a89d2 100644 --- a/Gemfile +++ b/Gemfile @@ -7,7 +7,6 @@ gem "bundler" gem "concurrent-ruby" gem "database_cleaner" gem "factory_bot" -gem "mocha" gem "rake" gem "rspec" gem "rspec-instafail" diff --git a/README.md b/README.md index 3907f3d..6f72f7a 100644 --- a/README.md +++ b/README.md @@ -231,6 +231,33 @@ array of records to `Redstream::Producer#bulk`, like shown above. If you pass an `ActiveRecord::Relation`, the `#bulk` method will convert it to an array, i.e. load the whole result set into memory. +## Sharding + +When you want to attach multiple consumers to a single stream, you may want +to add sharding. This can be accomplished by specifying a dynamic stream name +where you compute the shard key by hashing the primary key. + +```ruby +class Product < ActiveRecord::Base + include Redstream::Model + + NUM_SHARDS = 4 + + def self.redstream_name(shard) + "products-#{shard}" + end + + def redstream_name + self.class.redstream_name(Digest::SHA1.hexdigest(id.to_s)[0, 4].to_i(16) % NUM_SHARDS) + end +end +``` + +The sharding via hashing the primary key is necessary, because we want each +change of a specific object to end up in the same stream. 
Otherwise the order +of changes for a specific object gets mixed up. Subsequently, you can add +consumers, etc for each individual stream name. + ## Namespacing In case you are using a shared redis, where multiple appications read/write diff --git a/lib/redstream/consumer.rb b/lib/redstream/consumer.rb index d08335a..c029ab4 100644 --- a/lib/redstream/consumer.rb +++ b/lib/redstream/consumer.rb @@ -90,7 +90,7 @@ def run_once(&block) commit offset end - sleep(5) unless got_lock + @lock.wait(5) unless got_lock rescue StandardError => e @logger.error e diff --git a/lib/redstream/lock.rb b/lib/redstream/lock.rb index 2f81ac3..85a356b 100644 --- a/lib/redstream/lock.rb +++ b/lib/redstream/lock.rb @@ -21,6 +21,21 @@ module Redstream # end class Lock + class Signal + def initialize + @mutex = Mutex.new + @condition_variable = ConditionVariable.new + end + + def wait(timeout) + @mutex.synchronize { @condition_variable.wait(@mutex, timeout) } + end + + def signal + @condition_variable.signal + end + end + def initialize(name:) @name = name @id = SecureRandom.hex @@ -28,33 +43,45 @@ def initialize(name:) def acquire(&block) got_lock = get_lock - keep_lock(&block) if got_lock + + if got_lock + keep_lock(&block) + release_lock + end + got_lock end + def wait(timeout) + @wait_redis ||= Redstream.connection_pool.with(&:dup) + @wait_redis.brpop("#{Redstream.lock_key_name(@name)}.notify", timeout: timeout) + end + private def keep_lock(&block) - stop = false - mutex = Mutex.new + stopped = false + signal = Signal.new - Thread.new do - until mutex.synchronize { stop } - Redstream.connection_pool.with { |redis| redis.expire(Redstream.lock_key_name(@name), 5) } + thread = Thread.new do + until stopped + Redstream.connection_pool.with do |redis| + redis.expire(Redstream.lock_key_name(@name), 5) + end - sleep 3 + signal.wait(3) end end block.call ensure - mutex.synchronize do - stop = true - end + stopped = true + signal&.signal + thread&.join end def get_lock - @get_lock_script = 
<<~GET_LOCK_SCRIPT + @get_lock_script = <<~SCRIPT local lock_key_name, id = ARGV[1], ARGV[2] local cur = redis.call('get', lock_key_name) @@ -70,9 +97,30 @@ def get_lock end return false - GET_LOCK_SCRIPT + SCRIPT + + Redstream.connection_pool.with do |redis| + redis.eval(@get_lock_script, argv: [Redstream.lock_key_name(@name), @id]) + end + end + + def release_lock + @release_lock_script = <<~SCRIPT + local lock_key_name, id = ARGV[1], ARGV[2] - Redstream.connection_pool.with { |redis| redis.eval(@get_lock_script, argv: [Redstream.lock_key_name(@name), @id]) } + local cur = redis.call('get', lock_key_name) + + if cur and cur == id then + redis.call('del', lock_key_name) + end + + redis.call('del', lock_key_name .. '.notify') + redis.call('rpush', lock_key_name .. '.notify', '1') + SCRIPT + + Redstream.connection_pool.with do |redis| + redis.eval(@release_lock_script, argv: [Redstream.lock_key_name(@name), @id]) + end end end end diff --git a/lib/redstream/model.rb b/lib/redstream/model.rb index 51acf75..800d15c 100644 --- a/lib/redstream/model.rb +++ b/lib/redstream/model.rb @@ -58,5 +58,28 @@ def redstream_name def redstream_payload { id: id } end + + # Override to customize the stream name. By default, the stream name + # is determined by the class name. If you override the instance method, + # please also override the class method. 
+ # + # @example Sharding + # class Product + # include Redstream::Model + # + # NUM_SHARDS = 4 + # + # def redstream_name + # self.class.redstream_name(Digest::SHA1.hexdigest(id.to_s)[0, 4].to_i(16) % NUM_SHARDS) + # end + # + # def self.redstream_name(shard) + # "products-#{shard}" + # end + # end + + def redstream_name + self.class.redstream_name + end end end diff --git a/lib/redstream/producer.rb b/lib/redstream/producer.rb index 08c6843..663d4a6 100644 --- a/lib/redstream/producer.rb +++ b/lib/redstream/producer.rb @@ -31,7 +31,6 @@ class Producer def initialize(wait: false) @wait = wait - @stream_name_cache = {} super() end @@ -72,7 +71,7 @@ def bulk_delay(records) Redstream.connection_pool.with do |redis| redis.pipelined do |pipeline| slice.each do |object| - pipeline.xadd(Redstream.stream_key_name("#{stream_name(object)}.delay"), { payload: JSON.dump(object.redstream_payload) }) + pipeline.xadd(Redstream.stream_key_name("#{object.redstream_name}.delay"), { payload: JSON.dump(object.redstream_payload) }) end end end @@ -96,7 +95,7 @@ def bulk_queue(records) Redstream.connection_pool.with do |redis| redis.pipelined do |pipeline| slice.each do |object| - pipeline.xadd(Redstream.stream_key_name(stream_name(object)), { payload: JSON.dump(object.redstream_payload) }) + pipeline.xadd(Redstream.stream_key_name(object.redstream_name), { payload: JSON.dump(object.redstream_payload) }) end end end @@ -115,7 +114,7 @@ def bulk_queue(records) def delay(object) Redstream.connection_pool.with do |redis| - res = redis.xadd(Redstream.stream_key_name("#{stream_name(object)}.delay"), { payload: JSON.dump(object.redstream_payload) }) + res = redis.xadd(Redstream.stream_key_name("#{object.redstream_name}.delay"), { payload: JSON.dump(object.redstream_payload) }) redis.wait(@wait, 0) if @wait res end @@ -129,18 +128,10 @@ def delay(object) def queue(object) Redstream.connection_pool.with do |redis| - redis.xadd(Redstream.stream_key_name(stream_name(object)), { payload: 
JSON.dump(object.redstream_payload) }) + redis.xadd(Redstream.stream_key_name(object.redstream_name), { payload: JSON.dump(object.redstream_payload) }) end true end - - private - - def stream_name(object) - synchronize do - @stream_name_cache[object.class] ||= object.class.redstream_name - end - end end end diff --git a/lib/redstream/version.rb b/lib/redstream/version.rb index ee58c4f..6108d60 100644 --- a/lib/redstream/version.rb +++ b/lib/redstream/version.rb @@ -1,3 +1,3 @@ module Redstream - VERSION = "0.5.0" + VERSION = "0.6.0" end diff --git a/spec/redstream/consumer_spec.rb b/spec/redstream/consumer_spec.rb index f3a8d5d..672bb46 100644 --- a/spec/redstream/consumer_spec.rb +++ b/spec/redstream/consumer_spec.rb @@ -86,5 +86,44 @@ expect(redis.get(Redstream.offset_key_name(stream_name: "products", consumer_name: "consumer"))).to eq(all_messages.last[0]) end + + it "does not starve" do + product = create(:product) + stopped = false + results = Concurrent::Array.new + + thread1 = Thread.new do + until stopped + Redstream::Consumer.new(name: "consumer", stream_name: "products").run_once do + results.push("thread1") + + product.touch + + sleep 0.1 + end + end + end + + thread2 = Thread.new do + until stopped + Redstream::Consumer.new(name: "consumer", stream_name: "products").run_once do + results.push("thread2") + + product.touch + + sleep 0.1 + end + end + end + + sleep 6 + + stopped = true + [thread1, thread2].each(&:join) + + expect(results.size).to be > 10 + expect(results.count("thread1").to_f / results.size).to be > 0.2 + expect(results.count("thread2").to_f / results.size).to be > 0.2 + end end end diff --git a/spec/redstream/lock_spec.rb b/spec/redstream/lock_spec.rb index 5be3368..b89268e 100644 --- a/spec/redstream/lock_spec.rb +++ b/spec/redstream/lock_spec.rb @@ -34,7 +34,7 @@ end end - sleep 6 + sleep 5 threads << Thread.new do Redstream::Lock.new(name: "lock").acquire do @@ -62,5 +62,87 @@ expect(calls).to eq(2) expect(lock_results).to eq([1, 1]) 
end + + it "releases the lock and notifies" do + lock = Redstream::Lock.new(name: "lock") + + expect(redis.llen("#{Redstream.lock_key_name("lock")}.notify")).to eq(0) + + lock.acquire do + # nothing + end + + expect(redis.exists?(Redstream.lock_key_name("lock"))).to eq(false) + expect(redis.llen("#{Redstream.lock_key_name("lock")}.notify")).to eq(1) + end + + it "does not release the lock when the lock is already taken again" do + lock = Redstream::Lock.new(name: "lock") + + lock.acquire do + redis.set(Redstream.lock_key_name("lock"), "other") + end + + expect(redis.get(Redstream.lock_key_name("lock"))).to eq("other") + end + + it "acquires the lock as soon as it gets released" do + time = nil + + thread = Thread.new do + Redstream::Lock.new(name: "lock").acquire do + time = Time.now.to_f + + sleep 2 + end + end + + sleep 1 + + lock = Redstream::Lock.new(name: "lock") + lock.wait(10) until lock.acquire { "nothing" } + + thread.join + + expect(Time.now.to_f - time).to be < 3 + end + end + + describe "#wait" do + it "blocks for the specified time max" do + stopped = false + + thread = Thread.new do + Redstream::Lock.new(name: "lock").acquire do + sleep 0.1 until stopped + end + end + + time = Time.now.to_f + + Redstream::Lock.new(name: "lock").wait(2) + + expect(Time.now.to_f - time).to be < 3 + + stopped = true + + thread.join + end + + it "wakes up when the lock gets released" do + thread = Thread.new do + Redstream::Lock.new(name: "lock").acquire do + sleep 2 + end + end + + time = Time.now.to_f + + Redstream::Lock.new(name: "lock").wait(10) + + expect(Time.now.to_f - time).to be < 3 + + thread.join + end end end diff --git a/spec/redstream/producer_spec.rb b/spec/redstream/producer_spec.rb index 6c1fd64..04d5cb3 100644 --- a/spec/redstream/producer_spec.rb +++ b/spec/redstream/producer_spec.rb @@ -10,6 +10,17 @@ expect { Redstream::Producer.new.queue(product) }.to change { redis.xlen(stream_key_name) }.by(1) expect(redis.xrange(stream_key_name, "-", 
"+").last[1]).to eq("payload" => JSON.dump(product.redstream_payload)) end + + it "uses a custom stream name when specified" do + product = create(:product) + + allow(product).to receive(:redstream_name).and_return("stream-name") + + stream_key_name = Redstream.stream_key_name("stream-name") + + expect { Redstream::Producer.new.queue(product) }.to change { redis.xlen(stream_key_name) }.by(1) + expect(redis.xrange(stream_key_name, "-", "+").last[1]).to eq("payload" => JSON.dump(product.redstream_payload)) + end end describe "#delay" do @@ -22,7 +33,18 @@ expect(redis.xrange(stream_key_name, "-", "+").last[1]).to eq("payload" => JSON.dump(product.redstream_payload)) end - it "resepects wait" do + it "uses a custom stream name when specified" do + product = create(:product) + + allow(product).to receive(:redstream_name).and_return("stream-name") + + stream_key_name = Redstream.stream_key_name("stream-name.delay") + + expect { Redstream::Producer.new.delay(product) }.to change { redis.xlen(stream_key_name) }.by(1) + expect(redis.xrange(stream_key_name, "-", "+").last[1]).to eq("payload" => JSON.dump(product.redstream_payload)) + end + + it "respects wait" do product = create(:product) stream_key_name = Redstream.stream_key_name("products.delay") @@ -51,6 +73,24 @@ end end + it "uses a custom stream name when specified" do + allow_any_instance_of(Product).to receive(:redstream_name).and_return("stream-name") + + create_list(:product, 2) + + stream_key_name = Redstream.stream_key_name("stream-name") + + expect(redis.xlen(stream_key_name)).to eq(2) + expect(redis.xlen("#{stream_key_name}.delay")).to eq(2) + + Redstream::Producer.new.bulk(Product.all) do + # nothing + end + + expect(redis.xlen(stream_key_name)).to eq(4) + expect(redis.xlen("#{stream_key_name}.delay")).to eq(4) + end + it "adds bulk queue messages for scopes" do products = create_list(:product, 2) @@ -86,6 +126,16 @@ { "payload" => JSON.dump(products[1].redstream_payload) } ]) end + + it "uses a custom 
stream nameadds bulk queue messages for scopes" do + allow_any_instance_of(Product).to receive(:redstream_name).and_return("stream-name") + + create_list(:product, 2) + + stream_key_name = Redstream.stream_key_name("stream-name") + + expect { Redstream::Producer.new.bulk_queue(Product.all) }.to change { redis.xlen(stream_key_name) }.by(2) + end end describe "#bulk_delay" do @@ -104,6 +154,16 @@ ]) end + it "uses a custom stream name when specified" do + allow_any_instance_of(Product).to receive(:redstream_name).and_return("stream-name") + + create_list(:product, 2) + + stream_key_name = Redstream.stream_key_name("stream-name.delay") + + expect { Redstream::Producer.new.bulk_delay(Product.all) }.to change { redis.xlen(stream_key_name) }.by(2) + end + it "should respect wait for delay" do create(:product) diff --git a/spec/redstream/trimmer_spec.rb b/spec/redstream/trimmer_spec.rb index 7cc9eac..332b945 100644 --- a/spec/redstream/trimmer_spec.rb +++ b/spec/redstream/trimmer_spec.rb @@ -23,7 +23,7 @@ it "sleeps for the specified time if there's nothing to trim" do trimmer = Redstream::Trimmer.new(interval: 1, stream_name: "default", consumer_names: ["unknown_consumer"]) - trimmer.expects(:sleep).with(1).returns(true) + allow(trimmer).to receive(:sleep).with(1).and_return(true) trimmer.run_once end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 674759c..ccc769c 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -4,13 +4,8 @@ require "database_cleaner" require "timecop" require "concurrent" -require "mocha" require "rspec/instafail" -RSpec.configure do |config| - config.mock_with :mocha -end - ActiveRecord::Base.establish_connection(adapter: "sqlite3", database: "/tmp/redstream.sqlite3") ActiveRecord::Base.connection.execute "DROP TABLE IF EXISTS products"