Skip to content

Commit

Permalink
fix: remove dependencies on thread-local storage (#250)
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal authored Sep 2, 2023
1 parent daad2f0 commit ace76d3
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 123 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ jobs:
- name: Print disk info
run: df -h
- name: Run minitest
run: bundle exec rake bench
run: bundle exec rake bench | grep BenchCommand | grep -v 'Envoy#bench_pipeline_echo\|Envoy#bench_single_echo' | sort
- name: Reset qdisc
run: |
for i in {5..9..2}
Expand Down
84 changes: 43 additions & 41 deletions lib/redis_client/cluster/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,28 +94,19 @@ def load_info(options, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/Cycl
startup_options = options.to_a.sample(MAX_STARTUP_SAMPLE).to_h
startup_nodes = ::RedisClient::Cluster::Node.new(startup_options, **kwargs)
startup_nodes.each_slice(MAX_THREADS).with_index do |chuncked_startup_nodes, chuncked_idx|
threads = chuncked_startup_nodes.each_with_index.map do |raw_client, idx|
Thread.new(raw_client, (MAX_THREADS * chuncked_idx) + idx) do |cli, i|
Thread.current[:index] = i
reply = cli.call('CLUSTER', 'NODES')
Thread.current[:info] = parse_cluster_node_reply(reply)
rescue StandardError => e
Thread.current[:error] = e
ensure
cli&.close
chuncked_startup_nodes
.each_with_index
.map { |raw_client, idx| [(MAX_THREADS * chuncked_idx) + idx, build_thread_for_cluster_node(raw_client)] }
.each do |i, t|
case v = t.value
when StandardError
errors ||= Array.new(startup_size)
errors[i] = v
else
node_info_list ||= Array.new(startup_size)
node_info_list[i] = v
end
end
end

threads.each do |t|
t.join
if t.key?(:info)
node_info_list ||= Array.new(startup_size)
node_info_list[t[:index]] = t[:info]
elsif t.key?(:error)
errors ||= Array.new(startup_size)
errors[t[:index]] = t[:error]
end
end
end

raise ::RedisClient::Cluster::InitialSetupError, errors if node_info_list.nil?
Expand All @@ -132,6 +123,17 @@ def load_info(options, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/Cycl

private

# Spawns a worker thread that asks one startup node for the cluster
# topology. The thread's value is either the parsed `CLUSTER NODES`
# reply or the StandardError that was raised while fetching it; the
# client connection is always closed before the thread finishes.
def build_thread_for_cluster_node(raw_client)
  Thread.new(raw_client) do |client|
    begin
      parse_cluster_node_reply(client.call('CLUSTER', 'NODES'))
    rescue StandardError => err
      # Hand the error back as the thread value instead of letting it escape.
      err
    ensure
      client&.close
    end
  end
end

# @see https://redis.io/commands/cluster-nodes/
# @see https://github.com/redis/redis/blob/78960ad57b8a5e6af743d789ed8fd767e37d42b8/src/cluster.c#L4660-L4683
def parse_cluster_node_reply(reply) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
Expand Down Expand Up @@ -331,33 +333,33 @@ def call_multiple_nodes!(clients, method, command, args, &block)
raise ::RedisClient::Cluster::ErrorCollection, errors
end

def try_map(clients) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
def try_map(clients, &block)
results = errors = nil
clients.each_slice(MAX_THREADS) do |chuncked_clients|
threads = chuncked_clients.map do |k, v|
Thread.new(k, v) do |node_key, client|
Thread.current[:node_key] = node_key
reply = yield(node_key, client)
Thread.current[:result] = reply
rescue StandardError => e
Thread.current[:error] = e
end
end

threads.each do |t|
t.join
if t.key?(:result)
results ||= {}
results[t[:node_key]] = t[:result]
elsif t.key?(:error)
errors ||= {}
errors[t[:node_key]] = t[:error]
chuncked_clients
.map { |node_key, client| [node_key, build_thread_for_command(node_key, client, &block)] }
.each do |node_key, thread|
case v = thread.value
when StandardError
errors ||= {}
errors[node_key] = v
else
results ||= {}
results[node_key] = v
end
end
end
end

[results, errors]
end

# Runs the given block against a single node on a background thread.
# The thread's value is the block's result, or the StandardError it
# raised — callers distinguish the two cases via Thread#value.
def build_thread_for_command(node_key, client)
  Thread.new(node_key, client) do |key, conn|
    begin
      yield(key, conn)
    rescue StandardError => error
      # Surface the failure as the thread value rather than aborting the thread.
      error
    end
  end
end
end
end
end
39 changes: 18 additions & 21 deletions lib/redis_client/cluster/node/latency_replica.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,30 +39,27 @@ def any_replica_node_key(seed: nil)

private

def measure_latencies(clients) # rubocop:disable Metrics/AbcSize
def measure_latencies(clients)
clients.each_slice(::RedisClient::Cluster::Node::MAX_THREADS).each_with_object({}) do |chuncked_clients, acc|
threads = chuncked_clients.map do |k, v|
Thread.new(k, v) do |node_key, client|
Thread.current[:node_key] = node_key

min = DUMMY_LATENCY_MSEC
MEASURE_ATTEMPT_COUNT.times do
starting = Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond)
client.call_once('PING')
duration = Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond) - starting
min = duration if duration < min
end

Thread.current[:latency] = min
rescue StandardError
Thread.current[:latency] = DUMMY_LATENCY_MSEC
end
end
chuncked_clients
.map { |node_key, client| [node_key, build_thread_for_measuring_latency(client)] }
.each { |node_key, thread| acc[node_key] = thread.value }
end
end

threads.each do |t|
t.join
acc[t[:node_key]] = t[:latency]
# Spawns a thread that pings the node MEASURE_ATTEMPT_COUNT times and
# reports the fastest observed round trip (monotonic clock, microseconds).
# If any attempt fails, the sentinel DUMMY_LATENCY_MSEC is returned so the
# node sorts last when replicas are ranked by latency.
def build_thread_for_measuring_latency(client)
  Thread.new(client) do |conn|
    begin
      samples = Array.new(MEASURE_ATTEMPT_COUNT) do
        started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond)
        conn.call_once('PING')
        Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond) - started_at
      end
      # Same result as tracking a running minimum seeded with the sentinel.
      [DUMMY_LATENCY_MSEC, *samples].min
    rescue StandardError
      DUMMY_LATENCY_MSEC
    end
  end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/redis_client/cluster/node/replica_mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ def any_primary_node_key(seed: nil)
private

def build_clients(primary_node_keys, options, pool, **kwargs)
options.filter_map do |node_key, option|
options.to_h do |node_key, option|
option = option.merge(kwargs.reject { |k, _| ::RedisClient::Cluster::Node::IGNORE_GENERIC_CONFIG_KEYS.include?(k) })
config = ::RedisClient::Cluster::Node::Config.new(scale_read: !primary_node_keys.include?(node_key), **option)
client = pool.nil? ? config.new_client : config.new_pool(**pool)
[node_key, client]
end.to_h
end
end
end
end
Expand Down
58 changes: 27 additions & 31 deletions lib/redis_client/cluster/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -148,38 +148,23 @@ def empty?
def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
all_replies = errors = nil
@pipelines&.each_slice(MAX_THREADS) do |chuncked_pipelines|
threads = chuncked_pipelines.map do |node_key, pipeline|
Thread.new(node_key, pipeline) do |nk, pl|
Thread.current[:node_key] = nk
replies = do_pipelining(@router.find_node(nk), pl)
raise ReplySizeError, "commands: #{pl._size}, replies: #{replies.size}" if pl._size != replies.size

Thread.current[:replies] = replies
rescue ::RedisClient::Cluster::Pipeline::RedirectionNeeded => e
Thread.current[:redirection_needed] = e
rescue StandardError => e
Thread.current[:error] = e
end
end

threads.each(&:join)
threads.each do |t|
if t.key?(:replies)
all_replies ||= Array.new(@size)
@pipelines[t[:node_key]]
.outer_indices
.each_with_index { |outer, inner| all_replies[outer] = t[:replies][inner] }
elsif t.key?(:redirection_needed)
all_replies ||= Array.new(@size)
pipeline = @pipelines[t[:node_key]]
err = t[:redirection_needed]
err.indices.each { |i| err.replies[i] = handle_redirection(err.replies[i], pipeline, i) }
pipeline.outer_indices.each_with_index { |outer, inner| all_replies[outer] = err.replies[inner] }
elsif t.key?(:error)
errors ||= {}
errors[t[:node_key]] = t[:error]
chuncked_pipelines
.map { |node_key, pipeline| [node_key, build_thread_for_pipeline(@router, node_key, pipeline)] }
.each do |node_key, thread|
case v = thread.value
when ::RedisClient::Cluster::Pipeline::RedirectionNeeded
all_replies ||= Array.new(@size)
pipeline = @pipelines[node_key]
v.indices.each { |i| v.replies[i] = handle_redirection(v.replies[i], pipeline, i) }
pipeline.outer_indices.each_with_index { |outer, inner| all_replies[outer] = v.replies[inner] }
when StandardError
errors ||= {}
errors[node_key] = v
else
all_replies ||= Array.new(@size)
@pipelines[node_key].outer_indices.each_with_index { |outer, inner| all_replies[outer] = v[inner] }
end
end
end
end

raise ::RedisClient::Cluster::ErrorCollection, errors unless errors.nil?
Expand All @@ -197,6 +182,17 @@ def append_pipeline(node_key)
@pipelines[node_key]
end

# Executes one node's pipelined commands on a background thread.
# The thread's value is the reply array on success, or the StandardError
# (including RedirectionNeeded, a StandardError subclass) on failure —
# the caller dispatches on the value's class.
def build_thread_for_pipeline(router, node_key, pipeline)
  Thread.new(router, node_key, pipeline) do |routing, key, pl|
    begin
      node = routing.find_node(key)
      replies = do_pipelining(node, pl)
      unless replies.size == pl._size
        raise ReplySizeError, "commands: #{pl._size}, replies: #{replies.size}"
      end
      replies
    rescue StandardError => error
      error
    end
  end
end

def do_pipelining(client, pipeline)
case client
when ::RedisClient then send_pipeline(client, pipeline)
Expand Down
6 changes: 3 additions & 3 deletions lib/redis_client/cluster/pub_sub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def take_message(timeout)
@worker = subscribe(@client, timeout) if @worker.nil?
return if @worker.alive?

message = @worker[:reply]
message = @worker.value
@worker = nil
message
end
Expand All @@ -33,9 +33,9 @@ def take_message(timeout)

def subscribe(client, timeout)
Thread.new(client, timeout) do |pubsub, to|
Thread.current[:reply] = pubsub.next_event(to)
pubsub.next_event(to)
rescue StandardError => e
Thread.current[:reply] = e
e
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion test/bench_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class Envoy < BenchmarkWrapper
include BenchmarkMixinForProxy

# https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/other_protocols/redis#supported-commands
def bench_echo
def bench_single_echo
skip('Envoy does not support ECHO command.')
end

Expand Down
6 changes: 3 additions & 3 deletions test/benchmark_mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@ def teardown
@client&.close
end

def bench_echo
def bench_single_echo
assert_performance_linear(MIN_THRESHOLD) do |n|
n.times do
@client.call('ECHO', 'Hello world')
end
end
end

def bench_set
def bench_single_set
assert_performance_linear(MIN_THRESHOLD) do |n|
n.times do |i|
@client.call('SET', "key#{i}", i)
end
end
end

def bench_get
def bench_single_get
assert_performance_linear(MIN_THRESHOLD) do |n|
n.times do |i|
@client.call('GET', "key#{i}")
Expand Down
22 changes: 8 additions & 14 deletions test/redis_client/cluster/test_normalized_cmd_name.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,18 @@ def test_thread_safety
attempts = Array.new(100, 'dummy')

threads = attempts.each_with_index.map do |_, i|
Thread.new do
Thread.current[:index] = i
got = if i.even?
@subject.get_by_command(%w[SET foo bar])
else
@subject.clear ? 'set' : 'clear failed'
end
Thread.current[:got] = got
Thread.new(i) do
if i.even?
@subject.get_by_command(%w[SET foo bar])
else
@subject.clear ? 'set' : 'clear failed'
end
rescue StandardError => e
Thread.current[:got] = "#{e.class.name}: #{e.message}"
"#{e.class.name}: #{e.message}"
end
end

threads.each do |t|
t.join
attempts[t[:index]] = t[:got]
end

threads.each_with_index { |t, i| attempts[i] = t.value }
attempts.each { |got| assert_equal('set', got) }
end
end
Expand Down
12 changes: 6 additions & 6 deletions test/test_concurrency.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ def test_threading
Thread.new do
ATTEMPTS.times { MAX_THREADS.times { |i| @client.call('INCR', "key#{i}") } }
ATTEMPTS.times { MAX_THREADS.times { |i| @client.call('DECR', "key#{i}") } }
nil
rescue StandardError => e
Thread.current[:error] = e
e
end
end

threads.each(&:join)
threads.each { |t| assert_nil(t[:error]) }
threads.each { |t| assert_nil(t.value) }
MAX_THREADS.times { |i| assert_equal(WANT, @client.call('GET', "key#{i}")) }
end

Expand All @@ -74,13 +74,13 @@ def test_threading_with_pipelining
Thread.new do
@client.pipelined { |pi| ATTEMPTS.times { MAX_THREADS.times { |i| pi.call('INCR', "key#{i}") } } }
@client.pipelined { |pi| ATTEMPTS.times { MAX_THREADS.times { |i| pi.call('DECR', "key#{i}") } } }
nil
rescue StandardError => e
Thread.current[:error] = e
e
end
end

threads.each(&:join)
threads.each { |t| assert_nil(t[:error]) }
threads.each { |t| assert_nil(t.value) }
MAX_THREADS.times { |i| assert_equal(WANT, @client.call('GET', "key#{i}")) }
end

Expand Down

0 comments on commit ace76d3

Please sign in to comment.