Skip to content

Commit

Permalink
Add an option to control the maximum number of workers to fork
Browse files Browse the repository at this point in the history
  • Loading branch information
jefmathiot committed Jan 1, 2015
1 parent c01f751 commit f96c8fe
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 29 deletions.
17 changes: 10 additions & 7 deletions lib/electric_sheep/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ class CLI < Thor
def self.startup_options
run_options
process_options
option :workers, aliases: %w(-w), type: :numeric,
desc: 'Maximum number of parallel workers', default: 1
end

def self.run_options
Expand Down Expand Up @@ -53,11 +55,8 @@ def encrypt(secret)

desc "start", "Start a master process in the background"
startup_options

def start
rescued(true) do
master(config: configuration).start!
end
launch_master(:start!)
end

desc "stop", "Stop the master process"
Expand All @@ -74,9 +73,7 @@ def stop
startup_options

def restart
rescued(true) do
master(config: configuration).restart!
end
launch_master(:restart!)
end

default_task :work
Expand Down Expand Up @@ -104,6 +101,12 @@ def logger
@logger ||= stdout_logger
end

def launch_master(method)
rescued(true) do
master(config: configuration, workers: options[:workers]).send(method)
end
end

def master(opts={})
@logger=file_logger
Master.new({
Expand Down
23 changes: 13 additions & 10 deletions lib/electric_sheep/master.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ class Master
def initialize(options)
@config = options[:config]
@logger = options[:logger]
@pidfile=File.expand_path(options[:pidfile])
@workers = [1, options[:workers]].compact.max
@pidfile=File.expand_path(options[:pidfile]) if options[:pidfile]
end

def start!
Expand Down Expand Up @@ -94,16 +95,18 @@ def daemonize(&block)

def run_scheduled
@config.iterate do |project|
project.on_schedule do
@logger.info "Forking a new worker to handle project " +
"\"#{project.id}\""
# Turn children into daemons to let them run on master stop
worker=daemonize do
Runner::SingleRun.new(@config, @logger, project).run!
if worker_pids.size < @workers
project.on_schedule do
# Turn children into daemons to let them run on master stop
@logger.info "Forking a new worker to handle project " +
"\"#{project.id}\""
worker=daemonize do
Runner::SingleRun.new(@config, @logger, project).run!
end
worker_pids[worker]=project.id
@logger.debug "Forked a worker for project \"#{project.id}\", " +
"pid: #{worker}"
end
worker_pids[worker]=project.id
@logger.debug "Forked a worker for project \"#{project.id}\", pid: " +
"#{worker}"
end
end
end
Expand Down
15 changes: 11 additions & 4 deletions spec/electric_sheep/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ def self.ensure_exception_handling(&block)
concise do

it 'encrypts secrets' do
ElectricSheep::Crypto.expects(:encrypt).with('SECRET', '/some/key').returns('CIPHER')
ElectricSheep::Crypto.expects(:encrypt).with('SECRET', '/some/key').
returns('CIPHER')
logger.expects(:info).with("CIPHER")
subject.new([], key: '/some/key').encrypt('SECRET')
end
Expand All @@ -132,10 +133,10 @@ def expects_master(options)
ElectricSheep::Master.expects(:new).with(options).returns(master)
end

def expects_startup(method, master_options, config_file=nil)
def expects_startup(method, master_options, config_file=nil, workers=nil)
expects_evaluator(config_file || 'Sheepfile')
expects_control(method,
{config: config}.merge(master_options)
{config: config, workers: workers}.merge(master_options),
)
end

Expand All @@ -160,7 +161,13 @@ def self.ensure_startup(action)

it 'overrides the path to pidfile' do
expects_startup("#{action}!", {pidfile: '/tmp/es.lock'})
subject.new([], config: 'Sheepfile', pidfile: '/tmp/es.lock').send(action)
subject.new([], config: 'Sheepfile', pidfile: '/tmp/es.lock').
send(action)
end

it 'overrides the maximum number of workers' do
expects_startup("#{action}!", {workers: 2})
subject.new([], config: 'Sheepfile', workers: 2).send(action)
end

end
Expand Down
39 changes: 31 additions & 8 deletions spec/electric_sheep/master_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@
@pidfile.unlink
end

it 'defaults the number of workers to 1' do
subject.new({}).instance_variable_get(:@workers).must_equal 1
end

it 'uses the provided number of workers' do
subject.new(workers: 2).instance_variable_get(:@workers).must_equal 2
end

it 'ignores a non-positive number of workers' do
subject.new(workers: 0).instance_variable_get(:@workers).must_equal 1
end

describe 'starting' do

before do
Expand Down Expand Up @@ -85,26 +97,29 @@ def expects_child_worker(&block)
end
logger.expects(:debug).in_sequence(seq).
with("Forked a worker for project \"some-project\", pid: 10001")
yield if block_given?
yield if block_given?
end
end

def launch
master.start!
expects_pidfile
end

it 'forks' do
config.stubs(:iterate)
expects_startup do
logger.expects(:debug).in_sequence(seq).with("Active workers: 0")
end
master.start!
expects_pidfile
launch
end

it 'forks then launches a worker' do
expects_child_worker do
Process.expects(:kill).with(0, 10001).in_sequence(seq).returns(true)
logger.expects(:debug).in_sequence(seq).with("Active workers: 1")
end
master.start!
expects_pidfile
launch
end

it 'forks then flushes a completed worker' do
Expand All @@ -114,12 +129,20 @@ def expects_child_worker(&block)
with("Worker for project \"some-project\" completed, pid: 10001")
logger.expects(:debug).in_sequence(seq).with("Active workers: 0")
end
master.start!
expects_pidfile
launch
end

end
it 'does not fork when the max number of workers has been reached' do
config.stubs(:iterate).yields(project=mock)
master.instance_variable_set(:@workers, 0)
expects_startup do
project.expects(:on_schedule).never
logger.expects(:debug).in_sequence(seq).with("Active workers: 0")
end
launch
end

end

it 'restarts' do
# Definitely enjoyed writing this test
Expand Down

0 comments on commit f96c8fe

Please sign in to comment.