Skip to content

Commit

Permalink
WIP: first attempt at #776 / #941
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin Sander authored and marvinthepa committed Dec 12, 2022
1 parent bcfe112 commit dac656d
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 93 deletions.
173 changes: 93 additions & 80 deletions lib/dalli/pipelined_getter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,68 +17,26 @@ def process(keys, &block)
return {} if keys.empty?

@ring.lock do
servers = setup_requests(keys)
start_time = Time.now
servers = fetch_responses(servers, start_time, @ring.socket_timeout, &block) until servers.empty?
requests = setup_requests(keys)
fetch_responses(requests, @ring.socket_timeout, &block)
end
rescue NetworkError => e
Dalli.logger.debug { e.inspect }
Dalli.logger.debug { 'retrying pipelined gets because of timeout' }
retry
end

def setup_requests(keys)
groups = groups_for_keys(keys)
make_getkq_requests(groups)

# TODO: How does this exit on a NetworkError
finish_queries(groups.keys)
end

##
# Loop through the server-grouped sets of keys, writing
# the corresponding getkq requests to the appropriate servers
#
# It's worth noting that we could potentially reduce bytes
# on the wire by switching from getkq to getq, and using
# the opaque value to match requests to responses.
##
def make_getkq_requests(groups)
groups.each do |server, keys_for_server|
server.request(:pipelined_get, keys_for_server)
rescue DalliError, NetworkError => e
Dalli.logger.debug { e.inspect }
Dalli.logger.debug { "unable to get keys for server #{server.name}" }
def setup_requests(all_keys)
groups_for_keys(all_keys).to_h do |server, keys|
# It's worth noting that we could potentially reduce bytes
# on the wire by switching from getkq to getq, and using
# the opaque value to match requests to responses.
[server, server.pipelined_get_request(keys)]
end
end

##
# This loops through the servers that have keys in
# our set, sending the noop to terminate the set of queries.
##
def finish_queries(servers)
deleted = []

servers.each do |server|
next unless server.alive?

begin
finish_query_for_server(server)
rescue Dalli::NetworkError
raise
rescue Dalli::DalliError
deleted.append(server)
end
end

servers.delete_if { |server| deleted.include?(server) }
rescue Dalli::NetworkError
abort_without_timeout(servers)
raise
end

def finish_query_for_server(server)
server.pipeline_response_setup
server.finish_pipeline_request
rescue Dalli::NetworkError
raise
rescue Dalli::DalliError => e
Expand All @@ -92,29 +50,101 @@ def abort_without_timeout(servers)
servers.each(&:pipeline_abort)
end

def fetch_responses(servers, start_time, timeout, &block)
def fetch_responses(requests, timeout, &block)
# FIXME: this was here. why. where should it go?
# Remove any servers which are not connected
servers.delete_if { |s| !s.connected? }
return [] if servers.empty?
# servers.delete_if { |s| !s.connected? }

start_time = Time.now
servers = requests.keys

# FIXME: this was executed before the finish request was sent. Why?
servers.delete_if { |s| !s.alive? }

# could be postponed to after the first write
servers.each(&:pipeline_response_setup)

time_left = remaining_time(start_time, timeout)
readable_servers = servers_with_response(servers, time_left)
if readable_servers.empty?
until servers.empty?
time_left = remaining_time(start_time, timeout)
servers = read_write_select(servers, requests, time_left, &block)
end
rescue NetworkError
# Abort and raise if we encountered a network error. This triggers
# a retry at the top level.
abort_without_timeout(servers)
raise
end

def read_write_select(servers, requests, time_left, &block)
# TODO: - This is a bit challenging. Essentially the PipelinedGetter
# is a reactor, but without the benefit of a Fiber or separate thread.
# My suspicion is that we may want to try and push this down into the
# individual servers, but I'm not sure. For now, we keep the
# mapping between the alerted object (the socket) and the
# corrresponding server here.
server_map = servers.each_with_object({}) { |s, h| h[s.sock] = s }

readable, writable, = IO.select(server_map.keys, server_map.keys,
nil, time_left)

if readable.nil?
abort_with_timeout(servers)
return []
end

# Loop through the servers with responses, and
# delete any from our list that are finished
readable_servers.each do |server|
writable.each do |socket|
server = server_map[socket]
process_writable(server, servers, requests)
end

readable.each do |socket|
server = server_map[socket]

servers.delete(server) if process_server(server, &block)
end

servers
rescue NetworkError
# Abort and raise if we encountered a network error. This triggers
# a retry at the top level.
end

def process_writable(server, servers, requests)
request = requests[server]
return unless request

new_request = server_pipelined_get(server, request)

if new_request.empty?
requests.delete(server)

begin
finish_query_for_server(server)
rescue Dalli::NetworkError
raise
rescue Dalli::DalliError
servers.delete(server)
end
else
requests[server] = new_request
end
rescue Dalli::NetworkError
abort_without_timeout(servers)
raise
rescue DalliError => e
Dalli.logger.debug { e.inspect }
Dalli.logger.debug { "unable to get keys for server #{server.name}" }
end

def server_pipelined_get(server, request)
buffer_size = server.socket_sndbuf
chunk = request[0..buffer_size]
written = server.request(:pipelined_get, chunk)
return if written == :wait_writable

request[written..]
rescue Dalli::NetworkError
raise
rescue DalliError => e
Dalli.logger.debug { e.inspect }
Dalli.logger.debug { "unable to get keys for server #{server.name}" }
end

def remaining_time(start, timeout)
Expand Down Expand Up @@ -144,23 +174,6 @@ def process_server(server)
server.pipeline_complete?
end

def servers_with_response(servers, timeout)
return [] if servers.empty?

# TODO: - This is a bit challenging. Essentially the PipelinedGetter
# is a reactor, but without the benefit of a Fiber or separate thread.
# My suspicion is that we may want to try and push this down into the
# individual servers, but I'm not sure. For now, we keep the
# mapping between the alerted object (the socket) and the
# corrresponding server here.
server_map = servers.each_with_object({}) { |s, h| h[s.sock] = s }

readable, = IO.select(server_map.keys, nil, nil, timeout)
return [] if readable.nil?

readable.map { |sock| server_map[sock] }
end

def groups_for_keys(*keys)
keys.flatten!
keys.map! { |a| @key_manager.validate_key(a.to_s) }
Expand Down
37 changes: 24 additions & 13 deletions lib/dalli/protocol/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ class Base

def_delegators :@value_marshaller, :serializer, :compressor, :compression_min_size, :compress_by_default?
def_delegators :@connection_manager, :name, :sock, :hostname, :port, :close, :connected?, :socket_timeout,
:socket_type, :up!, :down!, :write, :reconnect_down_server?, :raise_down_error
:socket_type, :socket_sndbuf, :up!, :down!, :write, :write_nonblock, :reconnect_down_server?,
:raise_down_error

def initialize(attribs, client_options = {})
hostname, port, socket_type, @weight, user_creds = ServerConfigParser.parse(attribs)
Expand Down Expand Up @@ -59,16 +60,17 @@ def lock!; end

def unlock!; end

# Start reading key/value pairs from this connection. This is usually called
# after a series of GETKQ commands. A NOOP is sent, and the server begins
# flushing responses for kv pairs that were found.
# Get ready to read key/value pairs from this connection.
# This is usually called before or after the first GETKQ command.
#
# Returns nothing.
def pipeline_response_setup
verify_state(:getkq)
write_noop
response_buffer.reset
@connection_manager.start_request!
end

def finish_pipeline_request
write_noop
end

# Attempt to receive and parse as many key/value pairs as possible
Expand Down Expand Up @@ -143,6 +145,14 @@ def quiet?
end
alias multi? quiet?

def pipelined_get_request(keys)
req = +String.new(capacity: pipelined_get_capacity(keys))
keys.each do |key|
req << quiet_get_request(key)
end
req
end

# NOTE: Additional public methods should be overridden in Dalli::Threadsafe

private
Expand Down Expand Up @@ -201,13 +211,14 @@ def connect
raise
end

def pipelined_get(keys)
req = +''
keys.each do |key|
req << quiet_get_request(key)
end
# Could send noop here instead of in pipeline_response_setup
write(req)
def pipelined_get(bytes)
write_nonblock(bytes)
rescue SystemCallError, Timeout::Error, EOFError => e
@connection_manager.error_on_request!(e)
end

def pipelined_get_capacity(keys)
(keys.size * request_header_size) + keys.reduce(0) { |acc, k| acc + k.size }
end

def response_buffer
Expand Down
4 changes: 4 additions & 0 deletions lib/dalli/protocol/binary.rb
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ def write_noop
write(req)
end

def request_header_size
24
end

require_relative 'binary/request_formatter'
require_relative 'binary/response_header'
require_relative 'binary/response_processor'
Expand Down
10 changes: 10 additions & 0 deletions lib/dalli/protocol/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ def read_nonblock
@sock.read_available
end

def write_nonblock(bytes)
@sock.write_nonblock(bytes, exception: false)
end

def max_allowed_failures
@max_allowed_failures ||= @options[:socket_max_failures] || 2
end
Expand Down Expand Up @@ -247,6 +251,12 @@ def log_up_detected
time = Time.now - @down_at
Dalli.logger.warn { format('%<name>s is back (downtime was %<time>.3f seconds)', name: name, time: time) }
end

def socket_sndbuf
@socket_sndbuf ||=
@sock&.getsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDBUF)&.int ||
32_768
end
end
end
end
4 changes: 4 additions & 0 deletions lib/dalli/protocol/meta.rb
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ def authenticate_connection
raise Dalli::DalliError, 'Authentication not supported for the meta protocol.'
end

def request_header_size
17
end

require_relative 'meta/key_regularizer'
require_relative 'meta/request_formatter'
require_relative 'meta/response_processor'
Expand Down
21 changes: 21 additions & 0 deletions test/integration/test_pipelined_get.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,27 @@
end
end

it 'does not block for a large number of existing keys' do
memcached_persistent(p) do |dc|
dc.close
dc.flush

key_count = 200_000
range = 0...key_count
dc.quiet do
range.each { |i| dc.set(i, "foobar_#{i}") }
end

Timeout.timeout 60 do
resp = dc.get_multi(range.to_a)

assert_equal key_count, resp.count
end
rescue Timeout::Error
flunk "timed out while getting #{key_count} keys with get_multi"
end
end

describe 'pipeline_next_responses' do
it 'raises NetworkError when called before pipeline_response_setup' do
memcached_persistent(p) do |dc|
Expand Down

0 comments on commit dac656d

Please sign in to comment.