diff --git a/lib/sneakers/handlers/routing_maxretry.rb b/lib/sneakers/handlers/routing_maxretry.rb new file mode 100644 index 00000000..40972ca9 --- /dev/null +++ b/lib/sneakers/handlers/routing_maxretry.rb @@ -0,0 +1,208 @@ +require 'json' + +module Sneakers + module Handlers + # This handler does basically the same as MaxRetry handler. But it does not + # create additional exchanges. Instead it uses dead-letter routing keys to + # create the bindings to the retry and error queues. + class RoutingMaxretry # rubocop:disable Metrics/ClassLength + attr_reader :opts, :exchanges, :channel, :queue + + # @param channel [Bunny::Channel] + # @param queue [Bunny::Queue] + # @param worker_opts [Hash] + def initialize(channel, queue, worker_opts) + @channel = channel + @queue = queue + @opts = init_opts(worker_opts) + + Sneakers.logger.debug { "#{log_prefix} creating handler, opts=#{worker_opts}" } + + create_queues_and_bindings + end + + # @param delivery_info [Bunny::DeliveryInfo] + def acknowledge(delivery_info, _, _) + channel.acknowledge(delivery_info.delivery_tag) + end + + # @param delivery_info [Bunny::DeliveryInfo] + # @param message_properties [Bunny::MessageProperties] + # @param message [String] + # @param requeue [Boolean] + def reject(delivery_info, message_properties, message, requeue = false) + if requeue + # This was explicitly rejected specifying it be requeued so we do not + # want it to pass through our retry logic. + channel.reject(delivery_info.delivery_tag, requeue) + else + handle_retry(delivery_info, message_properties, message, :reject) + end + end + + # @param delivery_info [Bunny::DeliveryInfo] + # @param message_properties [Bunny::MessageProperties] + # @param message [String] + # @param error [String, Symbol, Exception] + def error(delivery_info, message_properties, message, error) + handle_retry(delivery_info, message_properties, message, error) + end + + # @param delivery_info [Bunny::DeliveryInfo] + # @param message_properties [Bunny::MessageProperties] + # @param message [String] + def timeout(delivery_info, message_properties, message) + handle_retry(delivery_info, message_properties, message, :timeout) + end + + def noop(_, _, _); end + + private + + def init_opts(worker_opts) + { + error_queue_name: "#{queue.name}.error", + error_routing_key: "queue.#{queue.name}.error", + requeue_routing_key: "queue.#{queue.name}.requeue", + retry_max_times: 5, + retry_queue_name: "#{queue.name}.retry", + retry_routing_key: "queue.#{queue.name}.retry", + retry_timeout: 6000, + worker_queue_name: queue.name + }.merge!(worker_opts) + end + + def create_queues_and_bindings + create_retry_queue_and_binding + create_error_queue_and_binding + + # Route retry messages to worker queue + queue.bind( + opts[:exchange], + routing_key: opts[:requeue_routing_key] + ) + end + + def create_error_queue_and_binding + create_queue_and_binding( + opts[:error_queue_name], + opts[:error_routing_key] + ) + end + + def create_retry_queue_and_binding + create_queue_and_binding( + opts[:retry_queue_name], + opts[:retry_routing_key], + arguments: retry_queue_arguments + ) + end + + def retry_queue_arguments + { + 'x-dead-letter-exchange' => opts[:exchange], + 'x-message-ttl' => opts[:retry_timeout], + 'x-dead-letter-routing-key' => opts[:requeue_routing_key] + } + end + + def create_queue_and_binding(queue_name, routing_key, arguments = {}) + Sneakers.logger.debug do + "#{log_prefix} creating queue=#{queue_name}, arguments=#{arguments}" + end + + created_queue = channel.queue( + queue_name, + { durable: queue_durable? }.merge!(arguments) + ) + created_queue.bind(opts[:exchange], routing_key: routing_key) + end + + def handle_retry(delivery_info, message_properties, message, reason) + num_attempts = failure_count(message_properties.headers) + 1 + if num_attempts <= opts[:retry_max_times] + reject_to_retry(delivery_info, message_properties, num_attempts) + else + publish_to_error_queue(delivery_info, message_properties, message, reason, num_attempts) + end + end + + def publish_to_error_queue(delivery_info, message_properties, message, reason, num_attempts) + Sneakers.logger.info do + "#{log_prefix} message=failing, retry_count=#{num_attempts}, reason=#{reason}" + end + + channel.basic_publish( + error_payload(delivery_info, message_properties, message, reason, num_attempts), + opts[:exchange], + opts[:error_routing_key], + content_type: 'application/json' + ) + + channel.acknowledge(delivery_info.delivery_tag) + end + + def reject_to_retry(delivery_info, message_properties, num_attempts) + Sneakers.logger.info do + "#{log_prefix} msg=retrying, count=#{num_attempts}, headers=#{message_properties.headers}" + end + + channel.reject(delivery_info.delivery_tag) + end + + def error_payload(delivery_info, message_properties, payload, reason, num_attempts) + { + _error: { + reason: reason.to_s, + num_attempts: num_attempts, + failed_at: Time.now.iso8601, + delivery_info: delivery_info.to_hash, + message_properties: message_properties.to_hash, + payload: payload.to_s + }.merge!(exception_payload(reason)) + }.to_json + end + + def exception_payload(reason) + return {} unless reason.is_a?(Exception) + + { + error_class: reason.class.to_s, + error_message: reason.to_s + }.merge!(exception_backtrace(reason)) + end + + def exception_backtrace(reason) + return {} unless reason.backtrace + + { backtrace: reason.backtrace.take(10).join(', ') } + end + + def failure_count(headers) + x_death_array = x_death_array(headers) + + return 0 if x_death_array.count == 0 + + return x_death_array.count unless x_death_array.first['count'] + + x_death_array.first['count'].to_i + end + + def x_death_array(headers) + return [] unless headers && headers['x-death'] + + headers['x-death'].select do |x_death| + x_death['queue'] == opts[:worker_queue_name] + end + end + + def log_prefix + "#{self.class} handler [queue=#{opts[:worker_queue_name]}]" + end + + def queue_durable? + opts.fetch(:queue_options, {}).fetch(:durable, false) + end + end + end +end diff --git a/spec/fixtures/maxretry_worker.rb b/spec/fixtures/maxretry_worker.rb new file mode 100644 index 00000000..11fd064f --- /dev/null +++ b/spec/fixtures/maxretry_worker.rb @@ -0,0 +1,45 @@ +require 'sneakers' +require 'thread' +require 'redis' + +require 'sneakers/handlers/routing_maxretry' + +# This worker ... works +class AlwaysAckWorker + include Sneakers::Worker + + def work(_) + ack! + end +end + +# This worker fails +class AlwaysRejectWorker + include Sneakers::Worker + + def work(_) + reject! + end +end + +# This worker fails once +class RejectOnceWorker + include Sneakers::Worker + + def work_with_params(_, delivery_info, message_properties) + if message_properties[:headers].nil? || + message_properties[:headers]['x-death'].nil? + reject! + else + dump = JSON.dump( + 'delivery_info' => delivery_info.to_hash, + 'message_properties' => message_properties.to_hash + ) + Redis.new.set( + self.class.queue_name, + dump + ) + ack! + end + end +end diff --git a/spec/sneakers/handlers/routing_maxretry_spec.rb b/spec/sneakers/handlers/routing_maxretry_spec.rb new file mode 100644 index 00000000..9074b5ac --- /dev/null +++ b/spec/sneakers/handlers/routing_maxretry_spec.rb @@ -0,0 +1,1001 @@ +require 'spec_helper' +require 'sneakers' +require 'sneakers/handlers/routing_maxretry' + +describe Sneakers::Handlers::RoutingMaxretry do + let(:channel) { Object.new } + let(:queue) { Object.new } + let(:worker_opts) { {} } + let(:opts) { {} } + let(:log) { StringIO.new } + let(:logger) { Logger.new(log) } + + subject do + Sneakers::Handlers::RoutingMaxretry.new(channel, queue, worker_opts) + end + + before do + stub(Sneakers).logger { logger } + end + + describe '#initialize' do + let(:log_prefix) { 'BOOM!' } + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + mock(handler).log_prefix { log_prefix } + mock(handler).init_opts(worker_opts) { opts } + mock(handler).create_queues_and_bindings + end + end + + it 'assigns channel variable' do + assert_equal(channel, subject.channel) + end + + it 'assigns queue variable' do + assert_equal(queue, subject.queue) + end + + it 'assigns opts variable' do + assert_equal(opts, subject.opts) + end + + it 'writes log' do + subject + + log.rewind + + assert_match('BOOM! creating handler, opts={}', log.first) + end + end + + describe '#acknowledge' do + let(:channel) { Minitest::Mock.new } + let(:delivery_tag) { Object.new } + let(:delivery_info) { Minitest::Mock.new } + let(:message_properties) { Object.new } + let(:payload) { Object.new } + + before do + stub(delivery_info).delivery_tag { delivery_tag } + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + end + + it 'acknowledges message' do + channel.expect(:acknowledge, true, [delivery_tag]) + + subject.acknowledge(delivery_info, message_properties, payload) + + channel.verify + end + end + + describe '#reject' do + let(:channel) { Minitest::Mock.new } + let(:delivery_tag) { Object.new } + let(:delivery_info) { Minitest::Mock.new } + + let(:message_properties) { Object.new } + let(:payload) { 'payload' } + + let(:reject) do + subject.reject(delivery_info, message_properties, payload, requeue) + end + + before do + stub(delivery_info).delivery_tag { delivery_tag } + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + end + + describe 'when reject requeueing is enabled' do + let(:requeue) { true } + + it 'rejects message' do + channel.expect(:reject, true, [delivery_tag, requeue]) + + reject + + channel.verify + end + end + + describe 'when reject requeueing is disabled' do + let(:requeue) { false } + let(:handle_retry) { Minitest::Mock.new } + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + end + + it 'calls "#handle_retry"' do + handle_retry.expect( + :call, + true, + [delivery_info, message_properties, payload, :reject] + ) + + subject.stub(:handle_retry, handle_retry) do + reject + end + + handle_retry.verify + end + end + end + + describe '#error' do + let(:delivery_info) { Object.new } + let(:message_properties) { Object.new } + let(:payload) { 'payload' } + let(:handle_retry) { Minitest::Mock.new } + let(:error) { Object.new } + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + end + + it 'calls "#handle_retry"' do + handle_retry.expect( + :call, + true, + [delivery_info, message_properties, payload, error] + ) + + subject.stub(:handle_retry, handle_retry) do + subject.error(delivery_info, message_properties, payload, error) + end + + handle_retry.verify + end + end + + describe '#timeout' do + let(:delivery_info) { Object.new } + let(:message_properties) { Object.new } + let(:payload) { 'payload' } + let(:handle_retry) { Minitest::Mock.new } + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + end + + it 'calls "#handle_retry"' do + handle_retry.expect( + :call, + true, + [delivery_info, message_properties, payload, :timeout] + ) + + subject.stub(:handle_retry, handle_retry) do + subject.timeout(delivery_info, message_properties, payload) + end + + handle_retry.verify + end + end + + describe '#init_opts' do + let(:queue) { Minitest::Mock.new } + + before do + stub(queue).name { 'foo' } + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).create_queues_and_bindings + end + end + + describe 'when given options are empty hash' do + let(:worker_opts) { {} } + it 'builds the options hash with defaults' do + expected = { + error_queue_name: 'foo.error', + error_routing_key: 'queue.foo.error', + requeue_routing_key: 'queue.foo.requeue', + retry_max_times: 5, + retry_queue_name: 'foo.retry', + retry_routing_key: 'queue.foo.retry', + retry_timeout: 6000, + worker_queue_name: 'foo' + } + + assert_equal(expected, subject.send(:init_opts, worker_opts)) + end + end + + describe 'when given options are not empty' do + let(:worker_opts) do + { + error_queue_name: 'bar.error', + error_routing_key: 'queue.bar.error', + requeue_routing_key: 'queue.bar.retry', + retry_queue_name: 'bar.delayed', + retry_routing_key: 'queue.bar.delayed', + worker_queue_name: 'bar' + } + end + + it 'builds the options hash with defaults' do + expected = { + error_queue_name: 'bar.error', + error_routing_key: 'queue.bar.error', + requeue_routing_key: 'queue.bar.retry', + retry_max_times: 5, + retry_queue_name: 'bar.delayed', + retry_routing_key: 'queue.bar.delayed', + retry_timeout: 6000, + worker_queue_name: 'bar' + } + assert_equal(expected, subject.send(:init_opts, worker_opts)) + end + end + end + + describe '#create_queues_and_bindings' do + let(:exchange_name) { 'name' } + let(:routing_key) { 'routing_key' } + let(:opts) { { requeue_routing_key: routing_key, exchange: exchange_name } } + let(:create_queues_and_bindings) do + subject.send(:create_queues_and_bindings) + end + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_retry_queue_and_binding + stub(handler).create_error_queue_and_binding + end + end + + it 'calls "#create_retry_queue_and_binding"' do + stub(queue).bind(exchange_name, routing_key: routing_key) + + create_retry_queue_and_binding = Minitest::Mock.new + create_retry_queue_and_binding.expect(:call, true) + + subject.stub( + :create_retry_queue_and_binding, + create_retry_queue_and_binding + ) { create_queues_and_bindings } + + create_retry_queue_and_binding.verify + end + + it 'calls "#create_error_queue_and_binding"' do + stub(queue).bind(exchange_name, routing_key: routing_key) + + create_error_queue_and_binding = Minitest::Mock.new + create_error_queue_and_binding.expect(:call, true) + + subject.stub( + :create_error_queue_and_binding, + create_error_queue_and_binding + ) { create_queues_and_bindings } + + create_error_queue_and_binding.verify + end + + it 'creates binding' do + stub(queue).bind + + mock_queue = Minitest::Mock.new + mock_queue.expect(:bind, nil, [exchange_name, routing_key: routing_key]) + + stub(subject).queue { mock_queue } + + create_queues_and_bindings + + mock_queue.verify + end + end + + describe '#create_error_queue_and_binding' do + let(:queue_name) { 'queue_name' } + let(:routing_key) { 'routing_key' } + let(:opts) do + { error_queue_name: queue_name, error_routing_key: routing_key } + end + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + end + + it 'calls "#create_queue_and_binding"' do + create_queue_and_binding = Minitest::Mock.new + create_queue_and_binding.expect(:call, nil, [queue_name, routing_key]) + + subject.stub(:create_queue_and_binding, create_queue_and_binding) do + subject.send(:create_error_queue_and_binding) + end + + create_queue_and_binding.verify + end + end + + describe '#create_retry_queue_and_binding' do + let(:queue_name) { 'queue_name' } + let(:routing_key) { 'routing_key' } + let(:retry_queue_arguments) { Object.new } + let(:opts) do + { retry_queue_name: queue_name, retry_routing_key: routing_key } + end + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + + stub(subject).retry_queue_arguments { retry_queue_arguments } + end + + it 'calls "#create_queue_and_binding"' do + create_queue_and_binding = Minitest::Mock.new + create_queue_and_binding.expect( + :call, + nil, + [queue_name, routing_key, arguments: retry_queue_arguments] + ) + + subject.stub(:create_queue_and_binding, create_queue_and_binding) do + subject.send(:create_retry_queue_and_binding) + end + + create_queue_and_binding.verify + end + end + + describe '#retry_queue_arguments' do + let(:exchange_name) { 'name' } + let(:retry_timeout) { 42 } + let(:requeue_routing_key) { 'foo' } + let(:opts) do + { + exchange: exchange_name, + retry_timeout: retry_timeout, + requeue_routing_key: requeue_routing_key + } + end + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + end + + it 'returns arguments hash' do + expected = { + 'x-dead-letter-exchange' => exchange_name, + 'x-message-ttl' => retry_timeout, + 'x-dead-letter-routing-key' => requeue_routing_key + } + + assert_equal(expected, subject.send(:retry_queue_arguments)) + end + end + + describe '#create_queue_and_binding' do + let(:queue_name) { 'queue_name' } + let(:routing_key) { 'routing_key' } + + let(:log_prefix) { 'BAAAM!!!' } + let(:created_queue) { Object.new } + let(:durable) { true } + let(:exchange_name) { 'name' } + let(:opts) { { exchange: exchange_name } } + + let(:create_queue_and_binding) do + subject.send(:create_queue_and_binding, queue_name, routing_key) + end + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + stub(subject).queue_durable? { durable } + end + + it 'writes log' do + stub(channel).queue { created_queue } + stub(subject).channel { channel } + stub(created_queue).bind + stub(subject).log_prefix { log_prefix } + + create_queue_and_binding + + log.rewind + + assert_match( + 'BAAAM!!! creating queue=queue_name, arguments={}', + log.readlines.last + ) + end + + it 'creates queue' do + stub(created_queue).bind + + mock_channel = Minitest::Mock.new + mock_channel.expect(:queue, created_queue, [queue_name, durable: durable]) + + stub(subject).channel { mock_channel } + + create_queue_and_binding + + mock_channel.verify + end + + it 'creates binding' do + mock_queue = Minitest::Mock.new + mock_queue.expect(:bind, nil, [exchange_name, routing_key: routing_key]) + + stub(channel).queue { mock_queue } + stub(subject).channel { channel } + + create_queue_and_binding + + mock_queue.verify + end + end + + describe '#handle_retry' do + let(:delivery_info) { Object.new } + let(:headers) { Object.new } + let(:message_properties) { Object.new } + let(:payload) { 'payload' } + let(:reason) { Object.new } + let(:failure_count) { 42 } + let(:retry_max_times) { 23 } + let(:opts) { { retry_max_times: retry_max_times } } + + let(:handle_retry) do + subject.send( + :handle_retry, + delivery_info, + message_properties, + payload, + reason + ) + end + + before do + stub(message_properties).headers { headers } + + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + + stub(handler).failure_count { failure_count } + stub(handler).reject_to_retry + stub(handler).publish_to_error_queue + end + end + + it 'calls "#failure_count"' do + mock_failure_count = Minitest::Mock.new + mock_failure_count.expect(:call, failure_count, [headers]) + + subject.stub(:failure_count, mock_failure_count) do + handle_retry + end + + mock_failure_count.verify + end + + describe 'when max retries not reached' do + let(:failure_count) { 22 } + + it 'calls "#reject_to_retry"' do + mock_reject_to_retry = Minitest::Mock.new + mock_reject_to_retry.expect( + :call, + nil, + [delivery_info, message_properties, failure_count + 1] + ) + + subject.stub(:reject_to_retry, mock_reject_to_retry) do + handle_retry + end + + mock_reject_to_retry.verify + end + end + + describe 'when max retries reached' do + let(:failure_count) { 23 } + + it 'calls "#publish_to_error_queue"' do + mock_publish_to_error_queue = Minitest::Mock.new + mock_publish_to_error_queue.expect( + :call, + nil, + [ + delivery_info, + message_properties, + payload, + reason, + failure_count + 1 + ] + ) + + subject.stub(:publish_to_error_queue, mock_publish_to_error_queue) do + handle_retry + end + + mock_publish_to_error_queue.verify + end + end + end + + describe '#publish_to_error_queue' do + let(:exchange_name) { 'name' } + let(:error_routing_key) { 'routing_key' } + let(:opts) do + { error_routing_key: error_routing_key, exchange: exchange_name } + end + let(:delivery_tag) { Object.new } + let(:delivery_info) { Object.new } + let(:payload) { 'payload' } + let(:num_attempts) { 5 } + let(:reason) { :reason } + let(:log_prefix) { 'BOOM!!!' } + let(:error_payload) { Object.new } + let(:message_properties) { Object.new } + + let(:publish_to_error_queue) do + subject.send( + :publish_to_error_queue, + delivery_info, + message_properties, + message, + reason, + num_attempts + ) + end + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + + stub(delivery_info).delivery_tag { delivery_tag } + stub(subject).error_payload { error_payload } + end + + it 'writes log' do + stub(subject).log_prefix { log_prefix } + stub(channel).basic_publish + stub(channel).acknowledge + + publish_to_error_queue + + log.rewind + + assert_match( + 'BOOM!!! '\ + "message=failing, retry_count=#{num_attempts}, reason=#{reason}", + log.readlines.last + ) + end + + it 'calls channel.basic_publish' do + mock_channel = Minitest::Mock.new + mock_channel.expect( + :basic_publish, + nil, + [error_payload, exchange_name, error_routing_key, content_type: 'application/json'] + ) + + stub(mock_channel).acknowledge + stub(subject).channel { mock_channel } + + publish_to_error_queue + + mock_channel.verify + end + + it 'calls channel.acknowledge' do + mock_channel = Minitest::Mock.new + mock_channel.expect(:acknowledge, nil, [delivery_tag]) + + stub(mock_channel).basic_publish + stub(subject).channel { mock_channel } + + publish_to_error_queue + + mock_channel.verify + end + end + + describe '#reject_to_retry' do + let(:headers) { Object.new } + let(:message_properties) { Object.new } + let(:delivery_tag) { Object.new } + let(:delivery_info) { Object.new } + let(:num_attempts) { 5 } + let(:log_prefix) { 'FOO!!!' } + + let(:reject_to_retry) do + subject.send( + :reject_to_retry, + delivery_info, + message_properties, + num_attempts + ) + end + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + + stub(message_properties).headers { headers } + stub(delivery_info).delivery_tag { delivery_tag } + end + + it 'writes log' do + stub(subject).log_prefix { log_prefix } + stub(channel).reject + + reject_to_retry + + log.rewind + + assert_match( + "FOO!!! msg=retrying, count=#{num_attempts}, " \ + "headers=#{message_properties.headers}", + log.readlines.last + ) + end + + it 'calls channel.acknowledge' do + mock_channel = Minitest::Mock.new + mock_channel.expect(:reject, nil, [delivery_tag]) + + stub(subject).channel { mock_channel } + + reject_to_retry + + mock_channel.verify + end + end + + describe '#error_payload' do + let(:reason) { :reason } + let(:num_attempts) { 5 } + let(:payload) { 'ABCD' } + let(:timestamp) { '2016-02-11T17:44:55+01:00' } + let(:delivery_info) { Object.new } + let(:message_properties) { Object.new } + + let(:error_payload) do + JSON.parse( + subject.send( + :error_payload, + delivery_info, + message_properties, + payload, + reason, + num_attempts + ) + ) + end + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + + any_instance_of(Time) do |time| + stub(time).iso8601 { timestamp } + end + + stub(delivery_info).to_hash { {} } + stub(message_properties).to_hash { {} } + end + + it 'creates JSON hash with _error key' do + assert_equal(error_payload.keys, ['_error']) + end + + it 'created the error_payload json including the reason' do + expected = { 'reason' => reason.to_s } + + assert_equal( + error_payload['_error'].merge(expected), + error_payload['_error'] + ) + end + + it 'created the error_payload json including the number of attempts' do + expected = { 'num_attempts' => num_attempts } + + assert_equal( + error_payload['_error'].merge(expected), + error_payload['_error'] + ) + end + + it 'created the error_payload json including the failed_at timestamp' do + expected = { 'failed_at' => timestamp } + + assert_equal( + error_payload['_error'].merge(expected), + error_payload['_error'] + ) + end + + it 'created the error_payload json including the payload' do + expected = { 'payload' => payload } + + assert_equal( + error_payload['_error'].merge(expected), + error_payload['_error'] + ) + end + + it 'created the error_payload json including the ' \ + '"#exception_payload" result' do + stub(subject).exception_payload(reason) { { foo: 'bar' } } + + expected = { 'foo' => 'bar' } + assert_equal( + error_payload['_error'].merge(expected), + error_payload['_error'] + ) + end + end + + describe '#exception_payload' do + let(:exception_payload) { subject.send(:exception_payload, reason) } + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + end + + describe 'when reason is not an exception' do + let(:reason) { 'reason' } + + it 'returns empty hash' do + assert_equal({}, exception_payload) + end + end + + describe 'when reason is an exception' do + let(:reason) { RuntimeError.new('Foo') } + + it 'returns hash including error class and message' do + expected = { + error_class: 'RuntimeError', + error_message: 'Foo' + } + assert_equal(exception_payload.merge(expected), exception_payload) + end + + it 'returns hash including "#exception_backtrace" result' do + stub(subject).exception_backtrace(reason) { { foo: 'bar' } } + + expected = { foo: 'bar' } + + assert_equal(exception_payload.merge(expected), exception_payload) + end + end + end + + describe '#exception_backtrace' do + let(:reason) { Object.new } + + let(:exception_backtrace) { subject.send(:exception_backtrace, reason) } + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + + stub(reason).backtrace { backtrace } + end + + describe 'when backtrace is nil' do + let(:backtrace) { nil } + + it 'returns empty hash' do + assert_equal({}, exception_backtrace) + end + end + + describe 'when backtrace exists' do + let(:backtrace) { (1..11).to_a.map { |a| "line#{a}" } } + + it 'returns hash including first 10 lines of backtrace' do + expected = { + backtrace: 'line1, line2, line3, line4, line5, line6, line7, ' \ + 'line8, line9, line10' + } + + assert_equal(exception_backtrace.merge(expected), exception_backtrace) + end + end + end + + describe '#failure_count' do + let(:headers) { Object.new } + let(:x_death_array) { [] } + + let(:failure_count) { subject.send(:failure_count, headers) } + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + + stub(subject).x_death_array(headers) { x_death_array } + end + + it 'calls "#x_death_array"' do + mock_x_death_array = Minitest::Mock.new + mock_x_death_array.expect(:call, x_death_array, [headers]) + + subject.stub(:x_death_array, mock_x_death_array) do + failure_count + end + + mock_x_death_array.verify + end + + describe 'when x_death array is empty' do + it 'returns 0' do + assert_equal(0, failure_count) + end + end + + describe 'when first x_death array element has no count key' do + let(:x_death_array) { [{ one: 1 }, { two: 2 }, { three: 3 }] } + + it 'returns x_death array length' do + assert_equal(3, failure_count) + end + end + + describe 'when first x_death array element has a count key' do + let(:x_death_array) { [{ 'count' => '23' }, { 'count' => '2' }] } + + it 'returns first x_death array elements count key' do + assert_equal(23, failure_count) + end + end + end + + describe '#x_death_array' do + let(:x_death_array) { subject.send(:x_death_array, headers) } + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + end + + describe 'when headers is nil' do + let(:headers) { nil } + + it 'returns empty array' do + assert_equal([], x_death_array) + end + end + + describe 'when no x-death header exists' do + let(:headers) { { foo: 'bar' } } + + it 'returns empty array' do + assert_equal([], x_death_array) + end + end + + describe 'when x-death header exists' do + let(:opts) { { worker_queue_name: 'foo' } } + let(:headers) do + { + 'x-death' => [ + { 'queue' => 'foo', 'xyz' => 1 }, + { 'queue' => 'bar', 'xyz' => 2 }, + { 'queue' => 'foo', 'xyz' => 3 }, + { 'queue' => 'bar', 'xyz' => 4 }, + { 'queue' => 'baz', 'xyz' => 5 } + ] + } + end + + it 'returns x_death header for worker queue' do + expected = [ + { 'queue' => 'foo', 'xyz' => 1 }, + { 'queue' => 'foo', 'xyz' => 3 } + ] + + assert_equal(expected, x_death_array) + end + end + end + + describe '#log_prefix' do + let(:opts) { { worker_queue_name: 'bamm' } } + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + end + + it 'returns the log prefix' do + assert_equal( + 'Sneakers::Handlers::RoutingMaxretry handler [queue=bamm]', + subject.send(:log_prefix) + ) + end + end + + describe '#queue_durable?' do + let(:queue_durable?) { subject.send(:queue_durable?) } + + before do + any_instance_of(Sneakers::Handlers::RoutingMaxretry) do |handler| + stub(handler).init_opts(worker_opts) { opts } + stub(handler).create_queues_and_bindings + end + end + + describe 'when no queue options exists' do + let(:opts) { {} } + + it 'returns false' do + refute(queue_durable?) + end + end + + describe 'when queue options exists' do + describe 'when durable is not set' do + let(:opts) { { queue_options: {} } } + + it 'returns false' do + refute(queue_durable?) + end + end + + describe 'when durable is set' do + let(:durable) { true } + let(:opts) { { queue_options: { durable: durable } } } + + it 'returns durable value' do + assert_equal(durable, queue_durable?) + end + end + end + end +end diff --git a/spec/sneakers/integration_spec.rb b/spec/sneakers/integration_spec.rb index 13fe6053..1e6aa80b 100644 --- a/spec/sneakers/integration_spec.rb +++ b/spec/sneakers/integration_spec.rb @@ -2,9 +2,11 @@ require 'sneakers' require 'sneakers/runner' require 'fixtures/integration_worker' +require 'fixtures/maxretry_worker' require "rabbitmq/http/client" +REQUEUING_WAIT_TIME = 1 describe "integration" do describe 'first' do @@ -13,10 +15,6 @@ prepare end - def integration_log(msg) - puts msg if ENV['INTEGRATION_LOG'] - end - def prepare # clean up all integration queues; admin interface must be installed # in integration env @@ -65,26 +63,13 @@ def assert_all_accounted_for(opts) return end end - + integration_log "failed test. killing off workers." Process.kill("TERM", pid) sleep 1 fail "incomplete!" end - def start_worker(w) - integration_log "starting workers." - r = Sneakers::Runner.new([w]) - pid = fork { - r.run - } - - integration_log "waiting for workers to stabilize (5s)." - sleep 5 - - pid - end - it 'should pull down 100 jobs from a real queue' do job_count = 100 @@ -95,7 +80,7 @@ def start_worker(w) job_count.times do |i| p.publish("m #{i}", to_queue: IntegrationWorker.queue_name) end - + assert_all_accounted_for( queue: IntegrationWorker.queue_name, pid: pid, @@ -103,8 +88,310 @@ def start_worker(w) jobs: job_count, ) end - + end -end - + describe 'worker with routing maxretry handler' do + let(:rabbitmq_client) do + RabbitMQ::HTTP::Client.new('http://guest:guest@127.0.0.1:15672/') + end + let(:redis) { Redis.new } + let(:queue_name) { "integration_#{rand(10**36).to_s(36)}" } + let(:exchange_name) { "integration_#{rand(10**36).to_s(36)}" } + + before :each do + skip unless ENV['INTEGRATION'] + + begin + rabbitmq_client.overview + rescue + puts 'Rabbitmq admin seems to not exist? You better be running this on'\ + "Travis or Docker. proceeding.\n#{$ERROR_INFO}" + skip + end + + begin + redis.info + rescue + puts 'Redis seems to not exist? You better be running this on'\ + 'Travis or Docker.' + skip + end + + cleanup_rabbitmq(rabbitmq_client) + cleanup_redis(redis) + prepare_sneakers(exchange: exchange_name) + + worker.from_queue(queue_name, worker_opts) + @worker_pid = start_worker(worker) + end + + after :each do + Process.kill('TERM', @worker_pid) + + cleanup_rabbitmq(rabbitmq_client) + cleanup_redis(redis) + end + + describe 'with defaults for routing maxretry handler' do + let(:worker) { AlwaysAckWorker } + let(:worker_opts) do + { + handler: Sneakers::Handlers::RoutingMaxretry, + retry_max_times: 2, + retry_timeout: 100, + arguments: { + 'x-dead-letter-exchange' => exchange_name, + 'x-dead-letter-routing-key' => "queue.#{queue_name}.retry" + } + } + end + + it 'creates the required queues' do + expected_queue_names = [ + queue_name, + "#{queue_name}.error", + "#{queue_name}.retry" + ] + queue_names = rabbitmq_client.list_queues.map(&:name) + + expected_queue_names.each do |expected_queue_name| + assert_includes(queue_names, expected_queue_name) + end + end + + describe 'when worker allways fails' do + let(:worker) { AlwaysRejectWorker } + + before do + Sneakers::Publisher.new( + exchange: exchange_name + ).publish( + 'foo', + routing_key: queue_name + ) + end + + it 'has a message in the error queue' do + # wait for failing message + sleep REQUEUING_WAIT_TIME + + message = get_message_from_queue("#{queue_name}.error") + + refute_nil(message.first) + end + + it 'has been retried twice' do + # wait for failing message + sleep REQUEUING_WAIT_TIME + + message = get_message_from_queue("#{queue_name}.error") + + assert_equal(JSON.load(message[2])['_error']['num_attempts'], 3) + end + end + + describe 'when worker fails once' do + let(:worker) { RejectOnceWorker } + + before do + Sneakers::Publisher.new( + exchange: exchange_name + ).publish( + 'foo', + routing_key: queue_name + ) + end + + it 'has no message in the error queue' do + # wait for failing message + sleep REQUEUING_WAIT_TIME + + message = get_message_from_queue("#{queue_name}.error") + + assert_nil(message.first) + end + + it 'consumes the requeued message' do + # wait for failing message + sleep REQUEUING_WAIT_TIME + + message = redis.get(queue_name) + + refute_nil(message) + end + + it 'it has been rejected in original queue once' do + # wait for failing message + sleep REQUEUING_WAIT_TIME + + message = JSON.load(redis.get(queue_name)) + message_headers = message['message_properties']['headers'] + consumer_x_death_array = x_death_array(message_headers, queue_name) + + assert_equal( + 1, + consumer_x_death_array.first['count'] || + consumer_x_death_array.count + ) + assert_equal('rejected', consumer_x_death_array.first['reason']) + end + + it 'it has been expired in retry queue once' do + # wait for failing message + sleep REQUEUING_WAIT_TIME + + message = JSON.load(redis.get(queue_name)) + message_headers = message['message_properties']['headers'] + consumer_x_death_array = x_death_array( + message_headers, + "#{queue_name}.retry" + ) + + assert_equal( + 1, + consumer_x_death_array.first['count'] || + consumer_x_death_array.count + ) + assert_equal('expired', consumer_x_death_array.first['reason']) + end + end + end + + describe 'with customized config for maxretry handler' do + let(:worker) { AlwaysAckWorker } + let(:retry_routing_key) { 'foo' } + let(:requeue_routing_key) { 'bar' } + let(:error_routing_key) { 'baz' } + let(:retry_queue_name) { 'integration_retry_queue' } + let(:error_queue_name) { 'integration_error_queue' } + + let(:worker_opts) do + { + handler: Sneakers::Handlers::RoutingMaxretry, + retry_max_times: 2, + retry_timeout: 100, + retry_routing_key: retry_routing_key, + requeue_routing_key: requeue_routing_key, + error_routing_key: error_routing_key, + retry_queue_name: retry_queue_name, + error_queue_name: error_queue_name, + arguments: { + 'x-dead-letter-exchange' => exchange_name, + 'x-dead-letter-routing-key' => retry_routing_key + } + } + end + + it 'creates the required queues' do + expected_queue_names = [ + queue_name, + retry_queue_name, + error_queue_name + ] + queue_names = rabbitmq_client.list_queues.map(&:name) + + expected_queue_names.each do |expected_queue_name| + assert_includes(queue_names, expected_queue_name) + end + end + + describe 'when worker allways fails' do + let(:worker) { AlwaysRejectWorker } + + before do + Sneakers::Publisher.new( + exchange: exchange_name + ).publish( + 'foo', + routing_key: queue_name + ) + end + + it 'has a message in the error queue' do + # wait for failing message + sleep REQUEUING_WAIT_TIME + + message = get_message_from_queue(error_queue_name) + + refute_nil(message.first) + end + + it 'has been retried twice' do + # wait for failing message + sleep REQUEUING_WAIT_TIME + + message = get_message_from_queue(error_queue_name) + + assert_equal(3, JSON.load(message[2])['_error']['num_attempts']) + end + end + + describe 'when worker fails once' do + let(:worker) { RejectOnceWorker } + + before do + Sneakers::Publisher.new( + exchange: exchange_name + ).publish( + 'foo', + routing_key: queue_name + ) + end + + it 'has no message in the error queue' do + # wait for failing message + sleep REQUEUING_WAIT_TIME + + message = get_message_from_queue(error_queue_name) + + assert_nil(message.first) + end + + it 'consumes the requeued message' do + # wait for failing message + sleep REQUEUING_WAIT_TIME + + message = redis.get(queue_name) + + refute_nil(message) + end + + it 'it has been rejected in original queue once' do + # wait for failing message + sleep REQUEUING_WAIT_TIME + + message = JSON.load(redis.get(queue_name)) + message_headers = message['message_properties']['headers'] + consumer_x_death_array = x_death_array(message_headers, queue_name) + + assert_equal( + 1, + consumer_x_death_array.first['count'] || + consumer_x_death_array.count + ) + assert_equal('rejected', consumer_x_death_array.first['reason']) + end + + it 'it has been expired in retry queue once' do + # wait for failing message + sleep REQUEUING_WAIT_TIME + + message = JSON.load(redis.get(queue_name)) + message_headers = message['message_properties']['headers'] + consumer_x_death_array = x_death_array( + message_headers, + retry_queue_name + ) + + assert_equal( + 1, + consumer_x_death_array.first['count'] || + consumer_x_death_array.count + ) + assert_equal('expired', consumer_x_death_array.first['reason']) + end + end + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 301a8640..0b568fb3 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -11,9 +11,8 @@ def compose_or_localhost(key) Resolv::DNS.new.getaddress(key) -rescue +rescue "localhost" end - - +Dir[File.join(File.dirname(__FILE__), 'support/**/*.rb')].each { |f| require f } diff --git a/spec/support/intergration_helper.rb b/spec/support/intergration_helper.rb new file mode 100644 index 00000000..ac12b249 --- /dev/null +++ b/spec/support/intergration_helper.rb @@ -0,0 +1,75 @@ +def cleanup_rabbitmq(client) + # clean up all integration queues; admin interface must be installed + # in integration env + integration_log 'cleaning up RabbitMQ' + + cleanup_queues(client) + cleanup_exchanges(client) +end + +def cleanup_exchanges(client) + exchanges = client.list_exchanges + exchanges.each do |exchange| + name = exchange.name + if name.start_with? 'integration_' + client.delete_exchange('/', name) + integration_log "delete exchange #{name}." + end + end +end + +def cleanup_queues(client) + queues = client.list_queues + queues.each do |q| + name = q.name + if name.start_with? 'integration_' + client.delete_queue('/', name) + integration_log "delete queue #{name}." + end + end +end + +def x_death_array(message_headers, queue_name) + message_headers['x-death'].select do |x| + x['queue'] == queue_name + end +end + +def cleanup_redis(client) + keys = client.keys('integration_*') + integration_log 'cleaning up redis' + client.del(keys) unless keys.empty? +end + +def prepare_sneakers(opts = {}) + Sneakers.clear! + Sneakers.configure(opts) + Sneakers.logger.level = Logger::ERROR +end + +def get_message_from_queue(queue_name) + connection = Bunny.new + connection.start + channel = connection.create_channel + message = channel.basic_get(queue_name) + channel.acknowledge(message.first.delivery_tag) if message.first + + message +end + +def start_worker(worker) + integration_log 'starting workers.' + runner = Sneakers::Runner.new([worker]) + pid = fork do + runner.run + end + + integration_log 'waiting for workers to stabilize (5s).' + sleep 5 + + pid +end + +def integration_log(msg) + puts msg if ENV['INTEGRATION_LOG'] +end