diff --git a/lib/dalli/pipelined_getter.rb b/lib/dalli/pipelined_getter.rb index 5fbb8bb2..cc0485db 100644 --- a/lib/dalli/pipelined_getter.rb +++ b/lib/dalli/pipelined_getter.rb @@ -17,9 +17,8 @@ def process(keys, &block) return {} if keys.empty? @ring.lock do - servers = setup_requests(keys) - start_time = Time.now - servers = fetch_responses(servers, start_time, @ring.socket_timeout, &block) until servers.empty? + requests = setup_requests(keys) + fetch_responses(requests, @ring.socket_timeout, &block) end rescue NetworkError => e Dalli.logger.debug { e.inspect } @@ -27,58 +26,17 @@ def process(keys, &block) retry end - def setup_requests(keys) - groups = groups_for_keys(keys) - make_getkq_requests(groups) - - # TODO: How does this exit on a NetworkError - finish_queries(groups.keys) - end - - ## - # Loop through the server-grouped sets of keys, writing - # the corresponding getkq requests to the appropriate servers - # - # It's worth noting that we could potentially reduce bytes - # on the wire by switching from getkq to getq, and using - # the opaque value to match requests to responses. - ## - def make_getkq_requests(groups) - groups.each do |server, keys_for_server| - server.request(:pipelined_get, keys_for_server) - rescue DalliError, NetworkError => e - Dalli.logger.debug { e.inspect } - Dalli.logger.debug { "unable to get keys for server #{server.name}" } + def setup_requests(all_keys) + groups_for_keys(all_keys).to_h do |server, keys| + # It's worth noting that we could potentially reduce bytes + # on the wire by switching from getkq to getq, and using + # the opaque value to match requests to responses. + [server, server.pipelined_get_request(keys)] end end - ## - # This loops through the servers that have keys in - # our set, sending the noop to terminate the set of queries. - ## - def finish_queries(servers) - deleted = [] - - servers.each do |server| - next unless server.connected? - - begin - finish_query_for_server(server) - rescue Dalli::NetworkError - raise - rescue Dalli::DalliError - deleted.append(server) - end - end - - servers.delete_if { |server| deleted.include?(server) } - rescue Dalli::NetworkError - abort_without_timeout(servers) - raise - end - def finish_query_for_server(server) - server.pipeline_response_setup + server.finish_pipeline_request rescue Dalli::NetworkError raise rescue Dalli::DalliError => e @@ -92,29 +50,101 @@ def abort_without_timeout(servers) servers.each(&:pipeline_abort) end - def fetch_responses(servers, start_time, timeout, &block) + def fetch_responses(requests, timeout, &block) + # FIXME: this was here. why. where should it go? # Remove any servers which are not connected + # servers.delete_if { |s| !s.connected? } + + start_time = Time.now + servers = requests.keys + + # FIXME: this was executed before the finish request was sent. Why? servers.delete_if { |s| !s.connected? } - return [] if servers.empty? - time_left = remaining_time(start_time, timeout) - readable_servers = servers_with_response(servers, time_left) - if readable_servers.empty? + # could be postponed to after the first write + servers.each(&:pipeline_response_setup) + + until servers.empty? + time_left = remaining_time(start_time, timeout) + servers = read_write_select(servers, requests, time_left, &block) + end + rescue NetworkError + # Abort and raise if we encountered a network error. This triggers + # a retry at the top level. + abort_without_timeout(servers) + raise + end + + def read_write_select(servers, requests, time_left, &block) + # TODO: - This is a bit challenging. Essentially the PipelinedGetter + # is a reactor, but without the benefit of a Fiber or separate thread. + # My suspicion is that we may want to try and push this down into the + # individual servers, but I'm not sure. For now, we keep the + # mapping between the alerted object (the socket) and the + # corrresponding server here. + server_map = servers.each_with_object({}) { |s, h| h[s.sock] = s } + + readable, writable, = IO.select(server_map.keys, server_map.keys, + nil, time_left) + + if readable.nil? abort_with_timeout(servers) return [] end - # Loop through the servers with responses, and - # delete any from our list that are finished - readable_servers.each do |server| + writable.each do |socket| + server = server_map[socket] + process_writable(server, servers, requests) + end + + readable.each do |socket| + server = server_map[socket] + servers.delete(server) if process_server(server, &block) end + servers - rescue NetworkError - # Abort and raise if we encountered a network error. This triggers - # a retry at the top level. + end + + def process_writable(server, servers, requests) + request = requests[server] + return unless request + + new_request = server_pipelined_get(server, request) + + if new_request.empty? + requests.delete(server) + + begin + finish_query_for_server(server) + rescue Dalli::NetworkError + raise + rescue Dalli::DalliError + servers.delete(server) + end + else + requests[server] = new_request + end + rescue Dalli::NetworkError abort_without_timeout(servers) raise + rescue DalliError => e + Dalli.logger.debug { e.inspect } + Dalli.logger.debug { "unable to get keys for server #{server.name}" } + end + + def server_pipelined_get(server, request) + buffer_size = server.socket_sndbuf + chunk = request[0..buffer_size] + written = server.request(:pipelined_get, chunk) + return if written == :wait_writable + + request[written..] + rescue Dalli::NetworkError + raise + rescue DalliError => e + Dalli.logger.debug { e.inspect } + Dalli.logger.debug { "unable to get keys for server #{server.name}" } end def remaining_time(start, timeout) @@ -144,23 +174,6 @@ def process_server(server) server.pipeline_complete? end - def servers_with_response(servers, timeout) - return [] if servers.empty? - - # TODO: - This is a bit challenging. Essentially the PipelinedGetter - # is a reactor, but without the benefit of a Fiber or separate thread. - # My suspicion is that we may want to try and push this down into the - # individual servers, but I'm not sure. For now, we keep the - # mapping between the alerted object (the socket) and the - # corrresponding server here. - server_map = servers.each_with_object({}) { |s, h| h[s.sock] = s } - - readable, = IO.select(server_map.keys, nil, nil, timeout) - return [] if readable.nil? - - readable.map { |sock| server_map[sock] } - end - def groups_for_keys(*keys) keys.flatten! keys.map! { |a| @key_manager.validate_key(a.to_s) } diff --git a/lib/dalli/protocol/base.rb b/lib/dalli/protocol/base.rb index 74274f80..94defc2b 100644 --- a/lib/dalli/protocol/base.rb +++ b/lib/dalli/protocol/base.rb @@ -18,7 +18,8 @@ class Base def_delegators :@value_marshaller, :serializer, :compressor, :compression_min_size, :compress_by_default? def_delegators :@connection_manager, :name, :sock, :hostname, :port, :close, :connected?, :socket_timeout, - :socket_type, :up!, :down!, :write, :reconnect_down_server?, :raise_down_error + :socket_type, :socket_sndbuf, :up!, :down!, :write, :write_nonblock, :reconnect_down_server?, + :raise_down_error def initialize(attribs, client_options = {}) hostname, port, socket_type, @weight, user_creds = ServerConfigParser.parse(attribs) @@ -66,17 +67,19 @@ def lock!; end def unlock!; end - # Start reading key/value pairs from this connection. This is usually called - # after a series of GETKQ commands. A NOOP is sent, and the server begins - # flushing responses for kv pairs that were found. + # Get ready to read key/value pairs from this connection. + # This is usually called before or after the first GETKQ command. # # Returns nothing. def pipeline_response_setup verify_pipelined_state(:getkq) - write_noop response_buffer.reset end + def finish_pipeline_request + write_noop + end + # Attempt to receive and parse as many key/value pairs as possible # from this server. After #pipeline_response_setup, this should be invoked # repeatedly whenever this server's socket is readable until @@ -149,6 +152,14 @@ def quiet? end alias multi? quiet? + def pipelined_get_request(keys) + req = +String.new(capacity: pipelined_get_capacity(keys)) + keys.each do |key| + req << quiet_get_request(key) + end + req + end + # NOTE: Additional public methods should be overridden in Dalli::Threadsafe private @@ -210,13 +221,14 @@ def connect up! end - def pipelined_get(keys) - req = +'' - keys.each do |key| - req << quiet_get_request(key) - end - # Could send noop here instead of in pipeline_response_setup - write(req) + def pipelined_get(bytes) + write_nonblock(bytes) + rescue SystemCallError, Timeout::Error, EOFError => e + @connection_manager.error_on_request!(e) + end + + def pipelined_get_capacity(keys) + (keys.size * request_header_size) + keys.reduce(0) { |acc, k| acc + k.size } end def response_buffer diff --git a/lib/dalli/protocol/binary.rb b/lib/dalli/protocol/binary.rb index 66f71516..fe6c86e9 100644 --- a/lib/dalli/protocol/binary.rb +++ b/lib/dalli/protocol/binary.rb @@ -163,6 +163,10 @@ def write_noop write(req) end + def request_header_size + 24 + end + require_relative 'binary/request_formatter' require_relative 'binary/response_header' require_relative 'binary/response_processor' diff --git a/lib/dalli/protocol/connection_manager.rb b/lib/dalli/protocol/connection_manager.rb index fe8bd911..ae66d625 100644 --- a/lib/dalli/protocol/connection_manager.rb +++ b/lib/dalli/protocol/connection_manager.rb @@ -172,6 +172,10 @@ def read_nonblock @sock.read_available end + def write_nonblock(bytes) + @sock.write_nonblock(bytes, exception: false) + end + def max_allowed_failures @max_allowed_failures ||= @options[:socket_max_failures] || 2 end @@ -250,6 +254,12 @@ def log_up_detected time = Time.now - @down_at Dalli.logger.warn { format('%s is back (downtime was %