From 24b0c424b34162b8d52ceb0ad62d78cf91c0c3e4 Mon Sep 17 00:00:00 2001
From: Dong Wook Koo
Date: Mon, 17 Apr 2017 10:45:24 -0400
Subject: [PATCH 1/8] Implementation of exponential backoff retry.

---
 lib/sneakers/handlers/expbackoff.rb | 225 ++++++++++++++++++++++++++++
 1 file changed, 225 insertions(+)
 create mode 100644 lib/sneakers/handlers/expbackoff.rb

diff --git a/lib/sneakers/handlers/expbackoff.rb b/lib/sneakers/handlers/expbackoff.rb
new file mode 100644
index 00000000..2735ccdc
--- /dev/null
+++ b/lib/sneakers/handlers/expbackoff.rb
@@ -0,0 +1,225 @@
+require 'base64'
+require 'json'
+
+module Sneakers
+  module Handlers
+    #
+    # Maxretry uses dead letter policies on RabbitMQ to requeue and retry
+    # messages after failure (rejections, errors and timeouts). When the maximum
+    # number of retries is reached it will put the message on an error queue.
+    # This handler will only retry at the queue level. To accomplish that, the
+    # setup is a bit complex.
+    #
+    # Input:
+    #   worker_exchange (eXchange)
+    #   worker_queue (Queue)
+    # We create:
+    #   worker_queue-retry - (X) where we set up the worker queue to dead-letter.
+    #   worker_queue-retry - (Q) queue bound to ^ exchange, dead-letters to
+    #                        worker_queue-retry-requeue.
+    #   worker_queue-error - (X) where to send max-retry failures
+    #   worker_queue-error - (Q) bound to worker_queue-error.
+    #   worker_queue-retry-requeue - (X) exchange to bind worker_queue to for
+    #                                requeuing directly to the worker_queue.
+    #
+    # This requires that you set up arguments to the worker queue to line up the
+    # dead letter queue. See the example for more information.
+    #
+    # Many of these can be overridden with options:
+    # - retry_exchange - sets retry exchange & queue
+    # - retry_error_exchange - sets error exchange and queue
+    # - retry_requeue_exchange - sets the exchange created to re-queue things
+    #   back to the worker queue.
+    #
+
+    # initial_delay - 60 sec, etc
+    # backoff_factor - 2, x2, x3, x4 etc
+    #
+
+    # - retry_backoff_base = 0, 30, 60, 120, 180, etc defaults to 0
+    # - retry_backoff_step = 1, 2, 3, etc defaults to 1
+
+    class Expbackoff
+
+      def initialize(channel, queue, opts)
+        @worker_queue_name = queue.name
+        Sneakers.logger.debug do
+          "#{log_prefix} creating handler, opts=#{opts}"
+        end
+
+        @channel = channel
+        @opts = opts
+
+        # Construct names, defaulting where suitable
+        retry_name = @opts[:retry_backoff_exchange] || "#{@worker_queue_name}-backoff"
+        error_name = @opts[:retry_error_exchange] || "#{@worker_queue_name}-error"
+        requeue_name = @opts[:retry_requeue_exchange] || "#{@worker_queue_name}-retry-requeue"
+        retry_routing_key = @opts[:retry_routing_key] || "#"
+
+        @max_retries = @opts[:retry_max_times] || 5
+        @backoff_base = @opts[:retry_backoff_base] || 0
+        @backoff_step = @opts[:retry_backoff_step] || 1
+
+        backoffs = Expbackoff.backoff_periods(@max_retries, @backoff_base, @backoff_step)
+
+        # Create the exchanges
+        Sneakers.logger.debug { "#{log_prefix} creating exchange=#{retry_name}" }
+        @retry_exchange = @channel.exchange(retry_name, :type => 'headers', :durable => exchange_durable?)
+        @error_exchange, @requeue_exchange = [error_name, requeue_name].map do |name|
+          Sneakers.logger.debug { "#{log_prefix} creating exchange=#{name}" }
+          @channel.exchange(name,
+                            :type => 'topic',
+                            :durable => exchange_durable?)
+        end
+
+        backoffs.each do |bo|
+          # Create the queues and bindings
+          Sneakers.logger.debug do
+            "#{log_prefix} creating queue=#{retry_name}-#{bo} x-dead-letter-exchange=#{requeue_name}"
+          end
+          retry_queue = @channel.queue("#{retry_name}-#{bo}",
+                                       :durable => queue_durable?,
+                                       :arguments => {
+                                         :'x-dead-letter-exchange' => requeue_name,
+                                         :'x-message-ttl' => bo * 1000
+                                       })
+          retry_queue.bind(@retry_exchange, :arguments => { :backoff => bo })
+        end
+
+        Sneakers.logger.debug do
+          "#{log_prefix} creating queue=#{error_name}"
+        end
+        @error_queue = @channel.queue(error_name,
+                                      :durable => queue_durable?)
+        @error_queue.bind(@error_exchange, :routing_key => '#')
+
+        # Finally, bind the worker queue to our requeue exchange
+        queue.bind(@requeue_exchange, :routing_key => retry_routing_key)
+
+      end
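+
+      # Illustration (not executed here): a worker opts into this handler and
+      # its options through Sneakers configuration, e.g.
+      #
+      #   Sneakers.configure(:handler => Sneakers::Handlers::Expbackoff,
+      #                      :retry_max_times => 5,
+      #                      :retry_backoff_base => 0)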
+
+      def acknowledge(hdr, props, msg)
+        @channel.acknowledge(hdr.delivery_tag, false)
+      end
+
+      def reject(hdr, props, msg, requeue = false)
+        if requeue
+          # This was explicitly rejected specifying it be requeued, so we do
+          # not want it to pass through our retry logic.
+          @channel.reject(hdr.delivery_tag, requeue)
+        else
+          handle_retry(hdr, props, msg, :reject)
+        end
+      end
+
+      def error(hdr, props, msg, err)
+        handle_retry(hdr, props, msg, err)
+      end
+
+      def timeout(hdr, props, msg)
+        handle_retry(hdr, props, msg, :timeout)
+      end
+
+      def noop(hdr, props, msg)
+      end
+
+      #####################################################
+      # formula
+      # base X = 0, 30, 60, 120, 180, etc defaults to 0
+      # step Y = 1, 2, 3, etc defaults to 1
+      # (X + 15) * 2 ** (count + Y)
+      def self.backoff_periods(max_retries, backoff_base, backoff_step)
+        (1..max_retries).map{ |c| next_ttl(c, backoff_base, backoff_step) }
+      end
+
+      def self.next_ttl(count, backoff_base, backoff_step)
+        (backoff_base + 15) * 2 ** (count + backoff_step)
+      end
+
+      #####################################################
+
+      # Helper logic for retry handling. This will republish the message to the
+      # backoff exchange if there are retries remaining, otherwise it will
+      # publish it to the error exchange along with the reason.
+      # @param hdr [Bunny::DeliveryInfo]
+      # @param props [Bunny::MessageProperties]
+      # @param msg [String] The message
+      # @param reason [String, Symbol, Exception] Reason for the retry, included
+      #   in the JSON we put on the error exchange.
+      def handle_retry(hdr, props, msg, reason)
+        # +1 for the current attempt
+        num_attempts = failure_count(props[:headers]) + 1
+        if num_attempts <= @max_retries
+          # Republish the message to the backoff exchange (routed by the
+          # :backoff header) rather than rejecting it; the matching backoff
+          # queue dead-letters it back to the worker queue once its TTL expires.
+          Sneakers.logger.info do
+            "#{log_prefix} msg=retrying, count=#{num_attempts}, headers=#{props[:headers]}"
+          end
+          backoff_ttl = Expbackoff.next_ttl(num_attempts, @backoff_base, @backoff_step)
+          @retry_exchange.publish(msg, :routing_key => hdr.routing_key, :headers => { :backoff => backoff_ttl, :count => num_attempts })
+          @channel.acknowledge(hdr.delivery_tag, false)
+          # TODO: metrics
+        else
+          # Retried more than the max times
+          # Publish the original message with the routing_key to the error exchange
+          Sneakers.logger.info do
+            "#{log_prefix} msg=failing, retry_count=#{num_attempts}, reason=#{reason}"
+          end
+          data = {
+            error: reason,
+            num_attempts: num_attempts,
+            failed_at: Time.now.iso8601,
+            payload: Base64.encode64(msg.to_s),
+            properties: Base64.encode64(props.to_json)
+          }.tap do |hash|
+            if reason.is_a?(Exception)
+              hash[:error_class] = reason.class.to_s
+              hash[:error_message] = "#{reason}"
+              if reason.backtrace
+                hash[:backtrace] = reason.backtrace.take(10).join(', ')
+              end
+            end
+          end.to_json
+          @error_exchange.publish(data, :routing_key => hdr.routing_key)
+          @channel.acknowledge(hdr.delivery_tag, false)
+          # TODO: metrics
+        end
+      end
+      private :handle_retry
+
+      # Uses the 'count' header to determine the number of failures this job
+      # has seen in the past. This does not count the current failure. So for
+      # instance, the first time the job fails, this will return 0, the second
+      # time, 1, etc.
+      # @param headers [Hash] Hash of headers that Rabbit delivers as part of
+      #   the message
+      # @return [Integer] Count of number of failures.
+      def failure_count(headers)
+        if headers.nil? || headers['count'].nil?
+          0
+        else
+          headers['count']
+        end
+      end
+      private :failure_count
+
+      # Prefix all of our log messages so they are easier to find. We don't
+      # have the worker, so the next best thing is the queue name.
+      def log_prefix
+        "Expbackoff handler [queue=#{@worker_queue_name}]"
+      end
+      private :log_prefix
+
+      private
+
+      def queue_durable?
+        @opts.fetch(:queue_options, {}).fetch(:durable, false)
+      end
+
+      def exchange_durable?
+        queue_durable?
+      end
+    end
+  end
+end

From 2e62f1752e1e03db4f392e0b921d4266c3342c05 Mon Sep 17 00:00:00 2001
From: Dong Wook Koo
Date: Mon, 17 Apr 2017 13:27:49 -0400
Subject: [PATCH 2/8] Example for Expbackoff handler.

---
 examples/exp_backoff_handler.rb     | 69 +++++++++++++++++++++++++++++
 lib/sneakers/handlers/expbackoff.rb |  5 ++-
 2 files changed, 73 insertions(+), 1 deletion(-)
 create mode 100644 examples/exp_backoff_handler.rb

diff --git a/examples/exp_backoff_handler.rb b/examples/exp_backoff_handler.rb
new file mode 100644
index 00000000..4808ebc4
--- /dev/null
+++ b/examples/exp_backoff_handler.rb
@@ -0,0 +1,69 @@
+$: << File.expand_path('../lib', File.dirname(__FILE__))
+require 'sneakers'
+require 'sneakers/runner'
+require 'sneakers/handlers/expbackoff'
+require 'logger'
+
+Sneakers.configure(:handler => Sneakers::Handlers::Expbackoff,
+                   :workers => 1,
+                   :threads => 1,
+                   :prefetch => 1,
+                   :exchange => 'sneakers-exp',
+                   :exchange_options => { :type => 'topic', durable: true },
+                   :routing_key => ['#', 'something'],
+                   :retry_max_times => 3,
+                   :retry_backoff_multiplier => 100
+                  )
+Sneakers.logger.level = Logger::DEBUG
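+
+# A sketch of the math behind the options above: retry_max_times 3 with the
+# default backoff base and step gives backoff periods [60, 120, 240], and
+# retry_backoff_multiplier 100 shortens each backoff queue's x-message-ttl
+# from bo * 1000 ms to bo * 100 ms (6, 12 and 24 seconds), so the demo stays
+# quick.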
+
+WORKER_OPTIONS = {
+  :ack => true,
+  :threads => 1,
+  :prefetch => 1,
+  :timeout_job_after => 60,
+  :heartbeat => 5,
+  :amqp_heartbeat => 10
+}
+
+# Example of how to write a retry worker. If your RabbitMQ system is empty,
+# you must run this twice: once to set up the exchanges, queues and bindings,
+# and a second time to have the sent message end up on the downloads queue.
+#
+# Run this via:
+# bundle exec ruby examples/exp_backoff_handler.rb
+#
+class ExpBackoffWorker
+  include Sneakers::Worker
+  from_queue 'downloads-exp', WORKER_OPTIONS
+
+  def work(msg)
+    logger.info("ExpBackoffWorker rejecting msg: #{msg.inspect}")
+
+    # We always want to reject to see if we do the proper timeout
+    reject!
+  end
+end
+
+# Example of a worker on the same exchange that does not fail, so it should
+# only see the message once.
+class SucceedingWorker
+  include Sneakers::Worker
+  from_queue 'uploads-exp', WORKER_OPTIONS
+
+  def work(msg)
+    logger.info("SucceedingWorker succeeding on msg: #{msg.inspect}")
+    ack!
+  end
+end
+
+messages = 1
+puts "feeding messages in"
+messages.times {
+  Sneakers.publish(" -- message -- ",
+                   :to_queue => 'anywhere',
+                   :persistence => true)
+}
+puts "done"
+
+r = Sneakers::Runner.new([ExpBackoffWorker, SucceedingWorker])
+r.run

diff --git a/lib/sneakers/handlers/expbackoff.rb b/lib/sneakers/handlers/expbackoff.rb
index 2735ccdc..9f5af041 100644
--- a/lib/sneakers/handlers/expbackoff.rb
+++ b/lib/sneakers/handlers/expbackoff.rb
@@ -59,6 +59,9 @@ def initialize(channel, queue, opts)
         @backoff_base = @opts[:retry_backoff_base] || 0
         @backoff_step = @opts[:retry_backoff_step] || 1
 
+        # This is for example/dev/test
+        @backoff_multiplier = @opts[:retry_backoff_multiplier] || 1000
+
         backoffs = Expbackoff.backoff_periods(@max_retries, @backoff_base, @backoff_step)
 
         # Create the exchanges
@@ -80,7 +83,7 @@ def initialize(channel, queue, opts)
                                        :durable => queue_durable?,
                                        :arguments => {
                                          :'x-dead-letter-exchange' => requeue_name,
-                                         :'x-message-ttl' => bo * 1000
+                                         :'x-message-ttl' => bo * @backoff_multiplier
                                        })
           retry_queue.bind(@retry_exchange, :arguments => { :backoff => bo })
         end

From e5a2e202268b7485c283936bc22caf0153c0b8c4 Mon Sep 17 00:00:00 2001
From: Dong Wook Koo
Date: Mon, 17 Apr 2017 18:09:00 -0400
Subject: [PATCH 3/8] Spec for Expbackoff.

---
 spec/sneakers/worker_handlers_spec.rb | 301 ++++++++++++++++++++++++++
 1 file changed, 301 insertions(+)

diff --git a/spec/sneakers/worker_handlers_spec.rb b/spec/sneakers/worker_handlers_spec.rb
index 5e4377f4..743f9d32 100644
--- a/spec/sneakers/worker_handlers_spec.rb
+++ b/spec/sneakers/worker_handlers_spec.rb
@@ -3,6 +3,7 @@
 require 'timeout'
 require 'sneakers/handlers/oneshot'
 require 'sneakers/handlers/maxretry'
+require 'sneakers/handlers/expbackoff'
 require 'json'
 
 
@@ -394,4 +395,304 @@ def publish(data, opts)
     end
   end
 
+
+  describe 'Expbackoff' do
+    let(:max_retries) { 3 }
+    let(:props_with_x_death_count) {
+      {
+        :headers => {
+          'count' => 3,
+          :backoff => 240,
+          "x-death" => [
+            {
+              "count" => 1,
+              "reason" => "expired",
+              "queue" => "downloads-backoff-240",
+              "time" => Time.now,
+              "exchange" => "downloads-backoff",
+              "routing-keys" => ["downloads"]
+            }
+          ]
+        },
+        :delivery_mode => 1
+      }
+    }
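+
+    # Note: the handler tracks attempts via the 'count' header it publishes on
+    # each retry, not via x-death. With 'count' => 3, the next failure is
+    # attempt 4, which exceeds max_retries (3) and routes the message to the
+    # error exchange.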
+
+    before(:each) do
+      @opts = {
+        :exchange => 'sneakers',
+        :queue_options => {
+          :durable => 'true',
+        }
+      }.tap do |opts|
+        opts[:retry_max_times] = max_retries unless max_retries.nil?
+      end
+
+      mock(queue).name { 'downloads' }
+
+      @backoff_exchange = Object.new
+      @error_exchange = Object.new
+      @requeue_exchange = Object.new
+
+      @backoff_60_queue = Object.new
+      @backoff_120_queue = Object.new
+      @backoff_240_queue = Object.new
+      @error_queue = Object.new
+
+      mock(channel).exchange('downloads-backoff',
+                             :type => 'headers',
+                             :durable => 'true').once { @backoff_exchange }
+      mock(channel).exchange('downloads-error',
+                             :type => 'topic',
+                             :durable => 'true').once { @error_exchange }
+      mock(channel).exchange('downloads-retry-requeue',
+                             :type => 'topic',
+                             :durable => 'true').once { @requeue_exchange }
+
+      mock(channel).queue('downloads-backoff-60',
+                          :durable => 'true',
+                          :arguments => {
+                            :'x-dead-letter-exchange' => 'downloads-retry-requeue',
+                            :'x-message-ttl' => 60000
+                          }
+                         ).once { @backoff_60_queue }
+      mock(@backoff_60_queue).bind(@backoff_exchange, :arguments => {:backoff => 60})
+
+      mock(channel).queue('downloads-backoff-120',
+                          :durable => 'true',
+                          :arguments => {
+                            :'x-dead-letter-exchange' => 'downloads-retry-requeue',
+                            :'x-message-ttl' => 120000
+                          }
+                         ).once { @backoff_120_queue }
+      mock(@backoff_120_queue).bind(@backoff_exchange, :arguments => {:backoff => 120})
+
+      mock(channel).queue('downloads-backoff-240',
+                          :durable => 'true',
+                          :arguments => {
+                            :'x-dead-letter-exchange' => 'downloads-retry-requeue',
+                            :'x-message-ttl' => 240000
+                          }
+                         ).once { @backoff_240_queue }
+      mock(@backoff_240_queue).bind(@backoff_exchange, :arguments => {:backoff => 240})
+
+      mock(channel).queue('downloads-error',
+                          :durable => 'true').once { @error_queue }
+      mock(@error_queue).bind(@error_exchange, :routing_key => '#')
+
+      mock(queue).bind(@requeue_exchange, :routing_key => '#')
+
+      @handler = Sneakers::Handlers::Expbackoff.new(channel, queue, @opts)
+
+      @header = Object.new
+      stub(@header).delivery_tag { 37 }
+
+      @props = {}
+      @props_with_x_death = {
+        :headers => {
+          'count' => 1,
+          :backoff => 60,
+          "x-death" => [
+            {
+              "count" => 1,
+              "reason" => "expired",
+              "queue" => "downloads-backoff-60",
+              "time" => Time.now,
+              "exchange" => "downloads-backoff",
+              "routing-keys" => ["downloads"]
+            }
+          ]
+        },
+        :delivery_mode => 1}
+    end
+
+    # it 'allows overriding the retry exchange name'
+    # it 'allows overriding the error exchange name'
+    # it 'allows overriding the retry timeout'
+
+    describe '#do_work' do
+      before do
+        @now = Time.now
+      end
+
+      # Used to stub out the publish method args. Sadly RR doesn't support
+      # this, only proxying existing methods.
+      module MockPublish
+        attr_reader :data, :opts, :called
+
+        def publish(data, opts)
+          @data = data
+          @opts = opts
+          @called = true
+        end
+      end
+
+      it 'should work and handle acks' do
+        mock(channel).acknowledge(37, false)
+
+        worker.do_work(@header, @props, :ack, @handler)
+      end
+
+      describe 'rejects' do
+        describe 'more retries ahead' do
+          it 'should work and handle rejects' do
+            mock(@header).routing_key { '#' }
+            mock(channel).acknowledge(37, false)
+
+            @backoff_exchange.extend MockPublish
+            worker.do_work(@header, @props_with_x_death, :reject, @handler)
+          end
+        end
+
+        describe 'no more retries' do
+          let(:max_retries) { 3 }
+
+          it 'sends the rejection to the error queue' do
+            mock(@header).routing_key { '#' }
+            mock(channel).acknowledge(37, false)
+
+            @error_exchange.extend MockPublish
+            @backoff_exchange.extend MockPublish
+            worker.do_work(@header, props_with_x_death_count, :reject, @handler)
+            @error_exchange.called.must_equal(true)
+            @error_exchange.opts.must_equal({ :routing_key => '#' })
+            data = JSON.parse(@error_exchange.data)
+            data['error'].must_equal('reject')
+            data['num_attempts'].must_equal(4)
+            data['payload'].must_equal(Base64.encode64(:reject.to_s))
+            data['properties'].must_equal(Base64.encode64(props_with_x_death_count.to_json))
+            Time.parse(data['failed_at']).wont_be_nil
+          end
+        end
+      end
+
+      describe 'requeues' do
+        it 'should work and handle requeues' do
+          mock(channel).reject(37, true)
+
+          worker.do_work(@header, @props_with_x_death, :requeue, @handler)
+        end
+
+        describe 'no more retries left' do
+          let(:max_retries) { 3 }
+
+          it 'continues to reject with requeue' do
+            mock(channel).reject(37, true)
+
+            worker.do_work(@header, @props_with_x_death, :requeue, @handler)
+          end
+        end
+      end
+
+      describe 'timeouts' do
+        describe 'more retries ahead' do
+          it 'should reject the message' do
+            mock(@header).routing_key { '#' }
+            mock(channel).acknowledge(37, false)
+
+            @backoff_exchange.extend MockPublish
+            worker.do_work(@header, @props_with_x_death, :timeout, @handler)
+          end
+        end
+
+        describe 'no more retries left' do
+          let(:max_retries) { 3 }
+
+          it 'sends the rejection to the error queue' do
+            mock(@header).routing_key { '#' }
+            mock(channel).acknowledge(37, false)
+            @error_exchange.extend MockPublish
+            @backoff_exchange.extend MockPublish
+
+            worker.do_work(@header, props_with_x_death_count, :timeout, @handler)
+            @error_exchange.called.must_equal(true)
+            @error_exchange.opts.must_equal({ :routing_key => '#' })
+            data = JSON.parse(@error_exchange.data)
+            data['error'].must_equal('timeout')
+            data['num_attempts'].must_equal(4)
+            data['payload'].must_equal(Base64.encode64(:timeout.to_s))
+            data['properties'].must_equal(Base64.encode64(props_with_x_death_count.to_json))
+            Time.parse(data['failed_at']).wont_be_nil
+          end
+        end
+      end
+
+      describe 'exceptions' do
+        describe 'more retries ahead' do
+          it 'should reject the message' do
+            mock(@header).routing_key { '#' }
+            mock(channel).acknowledge(37, false)
+            @backoff_exchange.extend MockPublish
+
+            worker.do_work(@header, @props_with_x_death, StandardError.new('boom!'), @handler)
+          end
+        end
+
+        describe 'no more retries left' do
+          let(:max_retries) { 3 }
+
+          it 'sends the rejection to the error queue' do
+            mock(@header).routing_key { '#' }
+            mock(channel).acknowledge(37, false)
+            @error_exchange.extend MockPublish
+            @backoff_exchange.extend MockPublish
+
+            worker.do_work(@header, props_with_x_death_count, StandardError.new('boom!'), @handler)
+            @error_exchange.called.must_equal(true)
+            @error_exchange.opts.must_equal({ :routing_key => '#' })
+            data = JSON.parse(@error_exchange.data)
+            data['error'].must_equal('boom!')
+            data['error_class'].must_equal(StandardError.to_s)
+            data['backtrace'].wont_be_nil
+            data['num_attempts'].must_equal(4)
+            data['payload'].must_equal(Base64.encode64('boom!'))
+            data['properties'].must_equal(Base64.encode64(props_with_x_death_count.to_json))
+            Time.parse(data['failed_at']).wont_be_nil
+          end
+        end
+      end
+
+      it 'should work and handle user-land error' do
+        mock(@header).routing_key { '#' }
+        mock(channel).acknowledge(37, false)
+        @backoff_exchange.extend MockPublish
+
+        worker.do_work(@header, @props, StandardError.new('boom!'), @handler)
+      end
+
+      it 'should work and handle noops' do
+        @backoff_exchange.extend MockPublish
+        worker.do_work(@header, @props, :wait, @handler)
+      end
+
+      # Since we encode in JSON, we want to make sure that if the actual
+      # payload is JSON, it's something you can get back out.
+      describe 'JSON payloads' do
+        let(:max_retries) { 3 }
+
+        it 'properly encodes the json payload' do
+          mock(@header).routing_key { '#' }
+          mock(channel).acknowledge(37, false)
+          @error_exchange.extend MockPublish
+
+          payload = {
+            data: 'hello',
+            response: :timeout
+          }
+          worker.do_work(@header, props_with_x_death_count, payload.to_json, @handler)
+          @error_exchange.called.must_equal(true)
+          @error_exchange.opts.must_equal({ :routing_key => '#' })
+          data = JSON.parse(@error_exchange.data)
+          data['error'].must_equal('timeout')
+          data['num_attempts'].must_equal(4)
+          data['payload'].must_equal(Base64.encode64(payload.to_json))
+          data['properties'].must_equal(Base64.encode64(props_with_x_death_count.to_json))
+        end
+
+      end
+
+    end
+  end
 end

From ba1a2f161fcd27e49ee052975ddc128e91f52909 Mon Sep 17 00:00:00 2001
From: Dong Wook Koo
Date: Tue, 18 Apr 2017 11:42:18 -0400
Subject: [PATCH 4/8] Add header for last execution to error log.

---
 lib/sneakers/handlers/expbackoff.rb | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/sneakers/handlers/expbackoff.rb b/lib/sneakers/handlers/expbackoff.rb
index 9f5af041..01803738 100644
--- a/lib/sneakers/handlers/expbackoff.rb
+++ b/lib/sneakers/handlers/expbackoff.rb
@@ -167,7 +167,7 @@ def handle_retry(hdr, props, msg, reason)
         # Retried more than the max times
         # Publish the original message with the routing_key to the error exchange
         Sneakers.logger.info do
-          "#{log_prefix} msg=failing, retry_count=#{num_attempts}, reason=#{reason}"
+          "#{log_prefix} msg=failing, retry_count=#{num_attempts}, headers=#{props[:headers]}, reason=#{reason}"
         end
         data = {
           error: reason,

From f589aab33db29f0cf228265f4987ebc295d7af47 Mon Sep 17 00:00:00 2001
From: Dong Wook Koo
Date: Tue, 18 Apr 2017 11:43:45 -0400
Subject: [PATCH 5/8] Change queue names to match the max retry handler.

Also keep the x-dead-letter-exchange from the max retry handler so this
handler stays interchangeable with it.
---
 examples/exp_backoff_handler.rb | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/examples/exp_backoff_handler.rb b/examples/exp_backoff_handler.rb
index 4808ebc4..06afd33c 100644
--- a/examples/exp_backoff_handler.rb
+++ b/examples/exp_backoff_handler.rb
@@ -8,7 +8,7 @@
                    :workers => 1,
                    :threads => 1,
                    :prefetch => 1,
-                   :exchange => 'sneakers-exp',
+                   :exchange => 'sneakers',
                    :exchange_options => { :type => 'topic', durable: true },
                    :routing_key => ['#', 'something'],
                    :retry_max_times => 3,
@@ -29,12 +29,20 @@
 # you must run this twice: once to set up the exchanges, queues and bindings,
 # and a second time to have the sent message end up on the downloads queue.
 #
+# The x-dead-letter-exchange argument is not needed, but we keep it so that
+# this handler stays interchangeable with the Maxretry handler for the queue.
+#
 # Run this via:
 # bundle exec ruby examples/exp_backoff_handler.rb
 #
 class ExpBackoffWorker
   include Sneakers::Worker
-  from_queue 'downloads-exp', WORKER_OPTIONS
+  from_queue 'downloads',
+             WORKER_OPTIONS.merge({
+               :arguments => {
+                 :'x-dead-letter-exchange' => 'downloads-retry'
+               },
+             })
 
   def work(msg)
     logger.info("ExpBackoffWorker rejecting msg: #{msg.inspect}")
@@ -48,7 +56,12 @@ def work(msg)
 # see the message once.
 class SucceedingWorker
   include Sneakers::Worker
-  from_queue 'uploads-exp', WORKER_OPTIONS
+  from_queue 'uploads',
+             WORKER_OPTIONS.merge({
+               :arguments => {
+                 :'x-dead-letter-exchange' => 'uploads-retry'
+               },
+             })
 
   def work(msg)
     logger.info("SucceedingWorker succeeding on msg: #{msg.inspect}")

From 18fb2e42a1861475400acdfb6665409c87f12d48 Mon Sep 17 00:00:00 2001
From: Dong Wook Koo
Date: Tue, 18 Apr 2017 15:54:57 -0400
Subject: [PATCH 6/8] Simplify by eliminating an unnecessary parameter.

---
 lib/sneakers/handlers/expbackoff.rb | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)

diff --git a/lib/sneakers/handlers/expbackoff.rb b/lib/sneakers/handlers/expbackoff.rb
index 01803738..3870e53f 100644
--- a/lib/sneakers/handlers/expbackoff.rb
+++ b/lib/sneakers/handlers/expbackoff.rb
@@ -36,7 +36,7 @@ module Handlers
     #
 
     # - retry_backoff_base = 0, 30, 60, 120, 180, etc defaults to 0
-    # - retry_backoff_step = 1, 2, 3, etc defaults to 1
+    #
 
     class Expbackoff
@@ -57,12 +57,11 @@ def initialize(channel, queue, opts)
 
         @max_retries = @opts[:retry_max_times] || 5
         @backoff_base = @opts[:retry_backoff_base] || 0
-        @backoff_step = @opts[:retry_backoff_step] || 1
 
         # This is for example/dev/test
         @backoff_multiplier = @opts[:retry_backoff_multiplier] || 1000
 
-        backoffs = Expbackoff.backoff_periods(@max_retries, @backoff_base, @backoff_step)
+        backoffs = Expbackoff.backoff_periods(@max_retries, @backoff_base)
 
         # Create the exchanges
         Sneakers.logger.debug { "#{log_prefix} creating exchange=#{retry_name}" }
@@ -130,14 +129,13 @@ def noop(hdr, props, msg)
 
       #####################################################
       # formula
       # base X = 0, 30, 60, 120, 180, etc defaults to 0
-      # step Y = 1, 2, 3, etc defaults to 1
-      # (X + 15) * 2 ** (count + Y)
-      def self.backoff_periods(max_retries, backoff_base, backoff_step)
-        (1..max_retries).map{ |c| next_ttl(c, backoff_base, backoff_step) }
+      # (X + 15) * 2 ** (count + 1)
+      def self.backoff_periods(max_retries, backoff_base)
+        (1..max_retries).map{ |c| next_ttl(c, backoff_base) }
       end
 
-      def self.next_ttl(count, backoff_base, backoff_step)
-        (backoff_base + 15) * 2 ** (count + backoff_step)
+      def self.next_ttl(count, backoff_base)
+        (backoff_base + 15) * 2 ** (count + 1)
       end
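+
+      # For example: backoff_base 0 gives (0 + 15) * 2 ** (1 + 1) = 60 for the
+      # first retry, then 120, 240, 480, 960; backoff_base 30 starts at
+      # (30 + 15) * 2 ** 2 = 180 and doubles from there.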
 
       #####################################################
 
@@ -159,7 +157,7 @@ def handle_retry(hdr, props, msg, reason)
         Sneakers.logger.info do
           "#{log_prefix} msg=retrying, count=#{num_attempts}, headers=#{props[:headers]}"
         end
-        backoff_ttl = Expbackoff.next_ttl(num_attempts, @backoff_base, @backoff_step)
+        backoff_ttl = Expbackoff.next_ttl(num_attempts, @backoff_base)
         @retry_exchange.publish(msg, :routing_key => hdr.routing_key, :headers => { :backoff => backoff_ttl, :count => num_attempts })
         @channel.acknowledge(hdr.delivery_tag, false)
         # TODO: metrics

From 1e322df88380def7ed088d3c8b053eb0689235cd Mon Sep 17 00:00:00 2001
From: Dong Wook Koo
Date: Fri, 21 Apr 2017 10:47:04 -0400
Subject: [PATCH 7/8] Update comments.

---
 lib/sneakers/handlers/expbackoff.rb | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/lib/sneakers/handlers/expbackoff.rb b/lib/sneakers/handlers/expbackoff.rb
index 3870e53f..3b577ced 100644
--- a/lib/sneakers/handlers/expbackoff.rb
+++ b/lib/sneakers/handlers/expbackoff.rb
@@ -3,10 +3,14 @@
 
 module Sneakers
   module Handlers
+    #
+    # Expbackoff is based on Maxretry and modified to set up a different set of
+    # queues and exchanges to achieve exponential backoff with the dead letter policies.
     #
     # Maxretry uses dead letter policies on RabbitMQ to requeue and retry
     # messages after failure (rejections, errors and timeouts). When the maximum
     # number of retries is reached it will put the message on an error queue.
+    #
     # This handler will only retry at the queue level. To accomplish that, the
     # setup is a bit complex.
     #
@@ -14,9 +18,10 @@ module Handlers
     #   worker_exchange (eXchange)
     #   worker_queue (Queue)
     # We create:
-    #   worker_queue-retry - (X) where we set up the worker queue to dead-letter.
-    #   worker_queue-retry - (Q) queue bound to ^ exchange, dead-letters to
-    #                        worker_queue-retry-requeue.
+    #   worker_queue-backoff - (X) where we set up the worker queue to dead-letter.
+    #   worker_queue-backoff-### - (Q) queues bound to ^ exchange with ttl specified,
+    #                              where ### denotes the backoff period;
+    #                              dead-letters to worker_queue-retry-requeue.
     #   worker_queue-error - (X) where to send max-retry failures
     #   worker_queue-error - (Q) bound to worker_queue-error.
     #   worker_queue-retry-requeue - (X) exchange to bind worker_queue to for
@@ -26,16 +31,17 @@ module Handlers
     # dead letter queue. See the example for more information.
     #
     # Many of these can be overridden with options:
-    # - retry_exchange - sets retry exchange & queue
+    # - retry_backoff_exchange - sets retry exchange & queues
     # - retry_error_exchange - sets error exchange and queue
     # - retry_requeue_exchange - sets the exchange created to re-queue things
     #   back to the worker queue.
     #
-
-    # initial_delay - 60 sec, etc
-    # backoff_factor - 2, x2, x3, x4 etc
+    # Backoff periods can be calculated using
+    # Sneakers::Handlers::Expbackoff.backoff_periods(retry_max_times, retry_backoff_base)
     #
-
-    # - retry_backoff_base = 0, 30, 60, 120, 180, etc defaults to 0
+    # retry_max_times = 5, retry_backoff_base = 0 yields [60, 120, 240, 480, 960]
+    #
+    # - retry_backoff_base = 0, 15, 30, 45, 60, ... 120, 180, etc defaults to 0
     #
 
     class Expbackoff

From 6b97125ef68783132c0fa4fe7f8a7dee06c340ec Mon Sep 17 00:00:00 2001
From: Dong Wook Koo
Date: Tue, 28 Nov 2017 17:35:43 -0500
Subject: [PATCH 8/8] Add examples in comment.

---
 lib/sneakers/handlers/expbackoff.rb | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/lib/sneakers/handlers/expbackoff.rb b/lib/sneakers/handlers/expbackoff.rb
index 3b577ced..884fa522 100644
--- a/lib/sneakers/handlers/expbackoff.rb
+++ b/lib/sneakers/handlers/expbackoff.rb
@@ -7,7 +7,7 @@ module Handlers
     # Expbackoff is based on Maxretry and modified to set up a different set of
     # queues and exchanges to achieve exponential backoff with the dead letter policies.
     #
-    # Maxretry uses dead letter policies on RabbitMQ to requeue and retry
+    # Expbackoff/Maxretry uses dead letter policies on RabbitMQ to requeue and retry
     # messages after failure (rejections, errors and timeouts). When the maximum
     # number of retries is reached it will put the message on an error queue.
     #
@@ -41,8 +41,18 @@ module Handlers
     #
     # retry_max_times = 5, retry_backoff_base = 0 yields [60, 120, 240, 480, 960]
     #
-    # - retry_backoff_base = 0, 15, 30, 45, 60, ... 120, 180, etc defaults to 0
+    # - retry_backoff_base = one of 0, 15, 30, 45, 60, ... 120, 180, etc.; defaults to 0
     #
+    # With queue name 'carrot' and backoff periods of [60, 120, 240, 480, 960],
+    # the following exchanges and queues are created:
+    # (X) carrot-backoff
+    # (X) carrot-error
+    # (X) carrot-retry-requeue
+    # (Q) carrot-backoff-60
+    # (Q) carrot-backoff-120
+    # (Q) carrot-backoff-240
+    # (Q) carrot-backoff-480
+    # (Q) carrot-backoff-960
+    # (Q) carrot-error
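+    #
+    # One retry pass (sketch): a failed message from 'carrot' is published to
+    # carrot-backoff with headers { :backoff => 60, :count => 1 }, waits in
+    # carrot-backoff-60 until its TTL expires, dead-letters to
+    # carrot-retry-requeue, and is routed back onto carrot for the next attempt.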
 
     class Expbackoff