Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configuration options for consumer subscribing to a queue #402

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ source 'https://rubygems.org'

gem 'ruby-prof', platforms: [:ruby_22, :ruby_23, :ruby_24]

gem 'rake', '>= 12.3', '< 14.0'

gemspec
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ class Processor
include Sneakers::Worker
from_queue :logs


def work(msg)
err = JSON.parse(msg)
if err["type"] == "error"
Expand Down
2 changes: 1 addition & 1 deletion examples/benchmark_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class BenchmarkWorker
from_queue 'downloads',
exchange_options: { durable: false },
queue_options: { durable: false },
:ack => true,
consumer_options: { manual_ack: true },
:threads => 50,
:prefetch => 50,
:timeout_job_after => 1,
Expand Down
2 changes: 1 addition & 1 deletion examples/max_retry_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
Sneakers.logger.level = Logger::DEBUG

WORKER_OPTIONS = {
:ack => true,
:consumer_options => { :manual_ack => true },
:threads => 1,
:prefetch => 1,
:timeout_job_after => 60,
Expand Down
4 changes: 3 additions & 1 deletion examples/middleware_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ def initialize(app, *args)

def call(deserialized_msg, delivery_info, metadata, handler)
puts "******** DemoMiddleware - before; args #{@args}"
@app.call(deserialized_msg, delivery_info, metadata, handler)
res = @app.call(deserialized_msg, delivery_info, metadata, handler)
puts "******** DemoMiddleware - after"

res
end
end

Expand Down
2 changes: 1 addition & 1 deletion examples/profiling_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
class ProfilingWorker
include Sneakers::Worker
from_queue 'downloads',
:ack => true,
:consumer_options => { :manual_ack => true },
:threads => 50,
:prefetch => 50,
:timeout_job_after => 1,
Expand Down
2 changes: 1 addition & 1 deletion examples/workflow_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class WorkflowWorker
from_queue 'downloads',
exchange_options: { durable: false },
queue_options: { durable: false },
:ack => true,
consumer_options: { manual_ack: true },
:threads => 50,
:prefetch => 50,
:timeout_job_after => 1,
Expand Down
11 changes: 9 additions & 2 deletions lib/sneakers/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ class Configuration
:arguments => {}
}.freeze

CONSUMER_OPTION_DEFAULTS = {
:block => false,
:manual_ack => true,
:arguments => {}
}.freeze

DEFAULTS = {
# Set up default handler which just logs the error.
# Remove this in production if you don't want sensitive data logged.
Expand All @@ -40,12 +46,12 @@ class Configuration
:prefetch => 10,
:threads => 10,
:share_threads => false,
:ack => true,
:heartbeat => 30,
:hooks => {},
:exchange => 'sneakers',
:exchange_options => EXCHANGE_OPTION_DEFAULTS,
:queue_options => QUEUE_OPTION_DEFAULTS
:queue_options => QUEUE_OPTION_DEFAULTS,
:consumer_options => CONSUMER_OPTION_DEFAULTS
}.freeze


Expand Down Expand Up @@ -107,6 +113,7 @@ def map_all_deprecated_options(hash)
hash = map_deprecated_options_key(:exchange_options, :durable, :durable, false, hash)
hash = map_deprecated_options_key(:queue_options, :durable, :durable, true, hash)
hash = map_deprecated_options_key(:queue_options, :arguments, :arguments, true, hash)
hash = map_deprecated_options_key(:consumer_options, :ack, :manual_ack, true, hash)
hash
end

Expand Down
2 changes: 1 addition & 1 deletion lib/sneakers/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def subscribe(worker)
# retry queues, etc).
handler = handler_klass.new(@channel, queue, worker.opts)

@consumer = queue.subscribe(block: false, manual_ack: @opts[:ack]) do | delivery_info, metadata, msg |
@consumer = queue.subscribe(@opts[:consumer_options]) do | delivery_info, metadata, msg |
worker.do_work(delivery_info, metadata, msg, handler)
end
nil
Expand Down
6 changes: 3 additions & 3 deletions lib/sneakers/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ def run
@se.run
end

def stop
@se.stop
def stop(stop_graceful=true)
@se.stop(stop_graceful)
end
end

Expand All @@ -40,7 +40,7 @@ def to_h


def reload_config!
Sneakers.logger.warn("Loading runner configuration...")
Sneakers.logger.info("Loading runner configuration...")
config_file = Sneakers::CONFIG[:runner_config_file]

if config_file
Expand Down
3 changes: 2 additions & 1 deletion lib/sneakers/spawner.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'yaml'
require 'erb'

module Sneakers
class Spawner
Expand All @@ -12,7 +13,7 @@ def self.spawn
end
@pids = []
@exec_string = "bundle exec rake sneakers:run"
worker_config = YAML.load(File.read(worker_group_config_file))
worker_config = YAML.load(ERB.new(File.read(worker_group_config_file)).result)
worker_config.keys.each do |group_name|
workers = worker_config[group_name]['classes']
workers = workers.join "," if workers.is_a?(Array)
Expand Down
6 changes: 5 additions & 1 deletion lib/sneakers/tasks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
Rake::Task['environment'].invoke

if defined?(::Rails)
::Rails.application.eager_load!
if defined?(::Zeitwerk)
::Zeitwerk::Loader.eager_load_all
else
::Rails.application.eager_load!
end
end

if ENV["WORKERS"].nil?
Expand Down
2 changes: 1 addition & 1 deletion lib/sneakers/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Sneakers
VERSION = "2.12.0.pre"
VERSION = "2.13.0.pre"
end
38 changes: 19 additions & 19 deletions lib/sneakers/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def initialize(queue = nil, pool = nil, opts = {})
queue_name = self.class.queue_name
opts = Sneakers::CONFIG.merge(opts)

@should_ack = opts[:ack]
@should_ack = opts[:consumer_options][:manual_ack]
@pool = pool || Concurrent::FixedThreadPool.new(opts[:threads] || Sneakers::Configuration::DEFAULTS[:threads])
@call_with_params = respond_to?(:work_with_params)
@content_type = opts[:content_type]
Expand Down Expand Up @@ -72,31 +72,31 @@ def process_work(delivery_info, metadata, msg, handler)
end
res = block_to_call.call(deserialized_msg, delivery_info, metadata, handler)
end
rescue StandardError, ScriptError => ex
rescue SignalException, SystemExit
# ServerEngine handles these exceptions, so they are not expected to be raised within the worker.
# Nevertheless, they are listed here to ensure that they are not caught by the rescue block below.
raise
rescue Exception => ex
res = :error
error = ex
worker_error(ex, log_msg: log_msg(msg), class: self.class.name,
message: msg, delivery_info: delivery_info, metadata: metadata)
end

if @should_ack

if res == :ack
ensure
if @should_ack
case res
# note to future-self. never acknowledge multiple (multiple=true) messages under threads.
handler.acknowledge(delivery_info, metadata, msg)
elsif res == :error
handler.error(delivery_info, metadata, msg, error)
elsif res == :reject
handler.reject(delivery_info, metadata, msg)
elsif res == :requeue
handler.reject(delivery_info, metadata, msg, true)
else
handler.noop(delivery_info, metadata, msg)
when :ack then handler.acknowledge(delivery_info, metadata, msg)
when :error then handler.error(delivery_info, metadata, msg, error)
when :reject then handler.reject(delivery_info, metadata, msg)
when :requeue then handler.reject(delivery_info, metadata, msg, true)
else
handler.noop(delivery_info, metadata, msg)
end
metrics.increment("work.#{self.class.name}.handled.#{res || 'noop'}")
end
metrics.increment("work.#{self.class.name}.handled.#{res || 'noop'}")
end

metrics.increment("work.#{self.class.name}.ended")
metrics.increment("work.#{self.class.name}.ended")
end
end

def stop
Expand Down
5 changes: 2 additions & 3 deletions sneakers.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Gem::Specification.new do |gem|
gem.email = ['jondotan@gmail.com']
gem.description = %q( Fast background processing framework for Ruby and RabbitMQ )
gem.summary = %q( Fast background processing framework for Ruby and RabbitMQ )
gem.homepage = 'http://sneakers.io'
gem.homepage = 'https://github.com/jondot/sneakers'
gem.license = 'MIT'
gem.required_ruby_version = Gem::Requirement.new(">= 2.2")

Expand All @@ -27,13 +27,12 @@ Gem::Specification.new do |gem|
gem.add_dependency 'bunny', '~> 2.14'
gem.add_dependency 'concurrent-ruby', '~> 1.0'
gem.add_dependency 'thor'
gem.add_dependency 'rake', '~> 12.3'
gem.add_dependency 'rake', '>= 12.3', '< 14.0'

# for integration environment (see .travis.yml and integration_spec)
gem.add_development_dependency 'rabbitmq_http_api_client'
gem.add_development_dependency 'redis'

gem.add_development_dependency 'rake', '~> 12.3'
gem.add_development_dependency 'minitest', '~> 5.11'
gem.add_development_dependency 'rr', '~> 1.2.1'
gem.add_development_dependency 'unparser', '0.2.2' # keep below 0.2.5 for ruby 2.0 compat.
Expand Down
4 changes: 3 additions & 1 deletion spec/sneakers/publisher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
{
:prefetch => 25,
:durable => true,
:ack => true,
:consumer_options => {
:manual_ack => true
},
:heartbeat => 2,
:vhost => '/',
:exchange => "sneakers",
Expand Down
5 changes: 4 additions & 1 deletion spec/sneakers/queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
let :queue_vars do
{
:prefetch => 25,
:ack => true,
:heartbeat => 2,
:vhost => '/',
:exchange => "sneakers",
Expand All @@ -16,6 +15,10 @@
},
queue_options: {
durable: true
},
:consumer_options => {
:block => false,
:manual_ack => true
}
}
end
Expand Down
5 changes: 4 additions & 1 deletion spec/sneakers/worker_handlers_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
class HandlerTestWorker
include Sneakers::Worker
from_queue 'defaults',
:ack => true
:consumer_options => {
:manual_ack => true
}


def work(msg)
if msg.is_a?(StandardError)
Expand Down
Loading