Skip to content

Commit

Permalink
fix: make no-concurrency mode default (#411)
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal authored Nov 19, 2024
1 parent b293513 commit 1512a0d
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 13 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ gem 'redis-cluster-client'
| `:replica_affinity` | Symbol or String | `:random` | scale reading strategy, `:random`, `random_with_primary` or `:latency` are valid |
| `:fixed_hostname` | String | `nil` | required if client should connect to single endpoint with SSL |
| `:slow_command_timeout` | Integer | `-1` | timeout used for "slow" queries that fetch metdata e.g. CLUSTER NODES, COMMAND |
| `:concurrency` | Hash | `{ model: :on_demand, size: 5}` | concurrency settings, `:on_demand`, `:pooled` and `:none` are valid models, size is a max number of workers, `:none` model is no concurrency, Please choose the one suited your environment if needed. |
| `:concurrency` | Hash | `{ model: :none }` | concurrency settings, `:on_demand`, `:pooled` and `:none` are valid models, size is a max number of workers, `:none` model is no concurrency, Please choose the one suited your environment if needed. |
| `:connect_with_original_config` | Boolean | `false` | `true` if client should retry the connection using the original endpoint that was passed in |
| `:max_startup_sample` | Integer | `3` | maximum number of nodes to fetch `CLUSTER NODES` information for startup |

Expand Down
10 changes: 4 additions & 6 deletions lib/redis_client/cluster/concurrent_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,12 @@ def inspect

module_function

def create(model: :on_demand, size: 5)
size = size.positive? ? size : 5

def create(model: :none, size: 5)
case model
when :on_demand, nil then ::RedisClient::Cluster::ConcurrentWorker::OnDemand.new(size: size)
when :pooled then ::RedisClient::Cluster::ConcurrentWorker::Pooled.new(size: size)
when :none then ::RedisClient::Cluster::ConcurrentWorker::None.new
else raise ArgumentError, "Unknown model: #{model}"
when :on_demand then ::RedisClient::Cluster::ConcurrentWorker::OnDemand.new(size: size)
when :pooled then ::RedisClient::Cluster::ConcurrentWorker::Pooled.new(size: size)
else raise ArgumentError, "unknown model: #{model}"
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions lib/redis_client/cluster/concurrent_worker/on_demand.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ class Cluster
module ConcurrentWorker
class OnDemand
def initialize(size:)
raise ArgumentError, "size must be positive: #{size}" unless size.positive?

@q = SizedQueue.new(size)
end

Expand Down
2 changes: 2 additions & 0 deletions lib/redis_client/cluster/concurrent_worker/pooled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ module ConcurrentWorker
# So it consumes memory 1 MB multiplied a number of workers.
class Pooled
def initialize(size:)
raise ArgumentError, "size must be positive: #{size}" unless size.positive?

@size = size
setup
end
Expand Down
16 changes: 10 additions & 6 deletions lib/redis_client/cluster_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class ClusterConfig
VALID_NODES_KEYS = %i[ssl username password host port db].freeze
MERGE_CONFIG_KEYS = %i[ssl username password].freeze
IGNORE_GENERIC_CONFIG_KEYS = %i[url host port path].freeze
MAX_WORKERS = Integer(ENV.fetch('REDIS_CLIENT_MAX_THREADS', 5))
MAX_WORKERS = Integer(ENV.fetch('REDIS_CLIENT_MAX_THREADS', -1)) # for backward compatibility
# It's used with slow queries of fetching meta data like CLUSTER NODES, COMMAND and so on.
SLOW_COMMAND_TIMEOUT = Float(ENV.fetch('REDIS_CLIENT_SLOW_COMMAND_TIMEOUT', -1))
# It affects to strike a balance between load and stability in initialization or changed states.
Expand Down Expand Up @@ -110,12 +110,16 @@ def server_url
private

def merge_concurrency_option(option)
case option
when Hash
option = option.transform_keys(&:to_sym)
{ size: MAX_WORKERS }.merge(option)
else { size: MAX_WORKERS }
opts = {}

if MAX_WORKERS.positive?
opts[:model] = :on_demand
opts[:size] = MAX_WORKERS
end

opts.merge!(option.transform_keys(&:to_sym)) if option.is_a?(Hash)
opts[:model] = :none if opts.empty?
opts.freeze
end

def build_node_configs(addrs)
Expand Down
12 changes: 12 additions & 0 deletions test/redis_client/test_cluster_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ def test_command_builder
assert_equal(::RedisClient::CommandBuilder, ::RedisClient::ClusterConfig.new.command_builder)
end

def test_concurrency
[
{ value: nil, want: { model: :none } },
{ value: { model: :none }, want: { model: :none } },
{ value: { model: :on_demand, size: 3 }, want: { model: :on_demand, size: 3 } },
{ value: { model: :pooled, size: 6 }, want: { model: :pooled, size: 6 } }
].each do |c|
cfg = ::RedisClient::ClusterConfig.new(concurrency: c[:value])
assert_equal(c[:want], cfg.instance_variable_get(:@concurrency))
end
end

def test_build_node_configs
config = ::RedisClient::ClusterConfig.new
[
Expand Down

0 comments on commit 1512a0d

Please sign in to comment.