Skip to content

Commit

Permalink
Merge pull request #378 from zendesk/rajesh.sharma/fix-producer-config
Browse files Browse the repository at this point in the history
Ensure partitioner is correctly passed to internal producer
  • Loading branch information
deepredsky authored Jul 4, 2024
2 parents 097eb35 + fe905df commit e62b329
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
1 change: 1 addition & 0 deletions lib/racecar/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def init_internal_producer(config)
"client.id" => config.client_id,
"statistics.interval.ms" => config.statistics_interval_ms,
"message.timeout.ms" => config.message_timeout * 1000,
"partitioner" => config.partitioner.to_s
}
producer_config["compression.codec"] = config.producer_compression_codec.to_s unless config.producer_compression_codec.nil?
producer_config.merge!(config.rdkafka_producer)
Expand Down
23 changes: 21 additions & 2 deletions spec/producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
allow(producer).to receive(:internal_producer).and_return(double("Rdkafka::Producer", :produce => delivery_handle))
end

after do
Racecar::Producer.shutdown!
Racecar::Producer.class_variable_set(:@@init_internal_producer, nil)
end


describe "#produce_async" do
it "sends the message without waiting for feedback or guarantees" do
expect(producer.produce_async(value: value, topic: topic)).to be_nil
Expand All @@ -31,7 +37,7 @@
describe "#wait_for_delivery" do
it "sends the message and waits for a delivery handle" do
expect(delivery_handle).to receive(:wait).exactly(5).times

producer.wait_for_delivery do
5.times do |message|
producer.produce_async(value: message.to_s, topic: topic)
Expand All @@ -46,4 +52,17 @@
expect {producer.produce_sync(value: value, topic: topic) }.to raise_error(Racecar::MessageDeliveryError)
end
end
end

context "for producer config" do
let(:config) do
test_config = Racecar::Config.new()
test_config.partitioner = "murmur2_random"
test_config
end

it "sets the partitioner corretly" do
internal_producer = producer.instance_variable_get("@internal_producer")
expect(internal_producer.instance_variable_get("@partitioner_name")).to eq("murmur2_random")
end
end
end

0 comments on commit e62b329

Please sign in to comment.