Skip to content

Commit

Permalink
Add heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisBr committed Jan 24, 2024
1 parent 8d3c83a commit b799cd6
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 6 deletions.
49 changes: 49 additions & 0 deletions ruby/lib/ci/queue/redis/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,55 @@ def initialize(redis_url, config)
end
end

# Parent-side handle on the heartbeat monitor child process (`monitor.rb`).
#
# `boot!` spawns the child with a pipe connected to its stdin; `tick!` then
# sends length-prefixed, Marshal-encoded messages down that pipe, and
# `shutdown!` closes the pipe.
class Monitor
  # Seconds to wait for the child to signal readiness on its stdout.
  BOOT_TIMEOUT = 2

  # @param redis_url [String] forwarded to the child as its first argument
  # @param zset_key [String] forwarded to the child as its second argument
  # @param processed_key [String] forwarded to the child as its third argument
  def initialize(redis_url, zset_key, processed_key)
    @redis_url = redis_url
    @zset_key = zset_key
    @processed_key = processed_key
  end

  # Spawns the monitor process and waits up to BOOT_TIMEOUT seconds for it to
  # report readiness (the child writes one byte to its stdout, then closes it).
  #
  # @return [IO] the write end of the pipe connected to the child's stdin
  # @raise [Errno::ESRCH] if the child exited before it became ready
  def boot!
    child_read, @pipe = IO.pipe
    ready_pipe, child_write = IO.pipe
    @pipe.binmode
    @pid = Process.spawn(
      RbConfig.ruby,
      ::File.join(__dir__, "monitor.rb"),
      @redis_url,
      @zset_key,
      @processed_key,
      in: child_read,
      out: child_write,
    )
    # Reap the child in the background. Without this a dead child stays a
    # zombie, and `Process.kill(0, pid)` keeps succeeding for zombies, so a
    # signal-0 probe cannot detect a monitor that crashed during boot.
    reaper = Process.detach(@pid)
    child_write.close
    child_read.close

    unless booted?(ready_pipe, reaper)
      @pipe.close
      raise Errno::ESRCH, "heartbeat monitor (pid #{@pid}) exited during boot"
    end

    @pipe
  ensure
    # Never leak the parent's copies of the pipe ends, even if spawn raised.
    [child_read, child_write, ready_pipe].each do |io|
      io.close if io && !io.closed?
    end
  end

  # Closes the pipe to the child. Safe to call when `boot!` never ran.
  def shutdown!
    @pipe.close if @pipe
  end

  # Sends a `:tick!` message carrying `id` to the monitor process.
  def tick!(id)
    send_message(:tick!, id: id)
  end

  private

  # True when the child wrote its ready byte, or is still running after the
  # boot timeout elapsed. False when its stdout hit EOF without a ready byte
  # or when the reaper thread has already collected it.
  def booted?(ready_pipe, reaper)
    if IO.select([ready_pipe], nil, nil, BOOT_TIMEOUT)
      # nil means EOF: the child's stdout was closed before anything was written.
      !ready_pipe.read_nonblock(1, exception: false).nil?
    else
      reaper.alive?
    end
  end

  # Frames the Marshal-dumped message with a native-endian 32-bit length
  # header (pack "L") and writes both in a single call.
  def send_message(*message)
    payload = Marshal.dump(message)
    @pipe.write([payload.bytesize].pack("L").b, payload)
  end
end

# Returns the memoized heartbeat Monitor, building it on first use from the
# redis URL and the 'running' / 'processed' keys.
def monitor
  return @monitor if @monitor

  running_key = key('running')
  processed_key = key('processed')
  @monitor = Monitor.new(@redis_url, running_key, processed_key)
end

def custom_config
return unless config.debug_log

Expand Down
129 changes: 129 additions & 0 deletions ruby/lib/ci/queue/redis/monitor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#!/usr/bin/env -S ruby --disable-gems
# typed: false
# frozen_string_literal: true

require 'logger'
require 'redis'

module CI
  module Queue
    module Redis
      # Heartbeat monitor meant to run in its own process (see the launcher
      # script at the bottom of this file).
      #
      # It reads length-prefixed, Marshal-encoded messages from `pipe` and
      # dispatches each `[type, kwargs]` pair to the matching `process_<type>`
      # method. It stops when the pipe reaches EOF or on INT/TERM.
      class Monitor
        # Adds `test` to the sorted set with `current_time` as its score,
        # unless `test` is already a member of the processed set.
        HEARTBEAT_SCRIPT = <<~LUA
          local zset_key = KEYS[1]
          local processed_key = KEYS[2]
          local current_time = ARGV[1]
          local test = ARGV[2]
          if redis.call('sismember', processed_key, test) ~= 1 then
            redis.call('zadd', zset_key, current_time, test)
          end
        LUA

        # Message framing: a native-endian 32-bit unsigned length (pack "L")
        # followed by that many bytes of Marshal data.
        HEADER = 'L'
        HEADER_SIZE = [0].pack(HEADER).bytesize

        # @param pipe [IO] stream the framed messages are read from
        # @param logger [#debug] receives debug traces
        # @param redis_url [String] URL handed to `::Redis.new`
        # @param zset_key [String] KEYS[1] of HEARTBEAT_SCRIPT
        # @param processed_key [String] KEYS[2] of HEARTBEAT_SCRIPT
        def initialize(pipe, logger, redis_url, zset_key, processed_key)
          @zset_key = zset_key
          @processed_key = processed_key
          @logger = logger
          @redis = ::Redis.new(url: redis_url)
          @shutdown = false
          @pipe = pipe
          # Self-pipe: signal handlers write a byte here to wake up IO.select.
          @self_pipe_reader, @self_pipe_writer = IO.pipe
          @self_pipe_writer.sync = true
          @queue = []
          @deadlines = {} # not read anywhere in this class
          %i[TERM INT USR1].each do |sig|
            Signal.trap(sig) { soft_signal(sig) }
          end
        end

        # Trap handler: records the signal and wakes the main loop.
        def soft_signal(sig)
          @queue << sig
          @self_pipe_writer << '.'
        end

        # Handles a `:tick!` message by running HEARTBEAT_SCRIPT for `id` with
        # the current wall-clock time. The script is loaded once and its SHA
        # is memoized.
        def process_tick!(id:)
          @script ||= @redis.script(:load, HEARTBEAT_SCRIPT)
          @redis.evalsha(@script, keys: [@zset_key, @processed_key], argv: [Time.now.to_f, id])
        end

        # Reads one framed message from `io`.
        #
        # @return [Object] the unmarshalled message
        # @return [nil] when no data is currently available
        # @return [false] when the pipe is at EOF or the frame is truncated
        #   (in which case shutdown is also requested)
        def read_message(io)
          case header = io.read_nonblock(HEADER_SIZE, exception: false)
          when :wait_readable
            nil
          when nil
            broken_pipe!
          else
            # A non-blocking read may return fewer bytes than requested. A
            # short header would unpack to nil and `io.read(nil)` would then
            # block until EOF, so complete the header with a blocking read.
            missing = HEADER_SIZE - header.bytesize
            header << io.read(missing).to_s if missing > 0
            return broken_pipe! if header.bytesize < HEADER_SIZE

            size = header.unpack1(HEADER)
            payload = io.read(size)
            # The writer went away mid-message: treat it like a closed pipe
            # instead of feeding partial data to Marshal.
            return broken_pipe! if payload.nil? || payload.bytesize < size

            # NOTE(review): Marshal.load is only safe because the pipe is fed
            # by the process that spawned this one.
            Marshal.load(payload)
          end
        end

        # Drains every message currently readable on `io` and dispatches each
        # to `process_<type>(**kwargs)`.
        def process_messages(io)
          while (message = read_message(io))
            type, kwargs = message
            public_send("process_#{type}", **kwargs)
          end
        end

        # Blocks for up to 10 seconds until one of `ios` is readable, then
        # handles it. Returns immediately when shutdown was requested.
        def wait_for_events(ios)
          return if @shutdown

          return unless (ready = IO.select(ios, nil, nil, 10))

          ready[0].each do |io|
            case io
            when @self_pipe_reader
              io.read_nonblock(512, exception: false) # Just flush the pipe, the information is in the @queue
            when @pipe
              process_messages(@pipe)
            else
              raise "Unknown reader: #{io.inspect}"
            end
          end
        end

        # Main loop: handles queued signals, then waits for events, until
        # shutdown is requested.
        #
        # @return [Integer] the value stored in @shutdown (0), which the
        #   launcher script passes to `exit`
        def monitor
          ios = [@self_pipe_reader, @pipe]

          until @shutdown
            while (sig = @queue.shift)
              case sig
              when :INT, :TERM
                @logger.debug("Received #{sig}, exiting")
                @shutdown = 0
                break
              else
                # NOTE(review): USR1 is trapped in #initialize but ends up
                # here, so it raises — confirm whether that is intended.
                raise "Unknown signal: #{sig.inspect}"
              end
            end

            wait_for_events(ios)
          end

          @logger.debug('Done')
          @shutdown
        end

        private

        # Logs the broken pipe, requests shutdown and returns false.
        def broken_pipe!
          @logger.debug('Broken pipe, exiting')
          @shutdown = 0
          false
        end
      end
    end
  end
end

# Launcher: builds a stderr logger, starts the Monitor on stdin and exits
# with the status returned by its main loop.
# Usage: monitor.rb REDIS_URL ZSET_KEY PROCESSED_KEY [-v]
verbose = ARGV.include?('-v')

logger = Logger.new($stderr)
logger.level = verbose ? Logger::DEBUG : Logger::INFO
# Outside of verbose mode, log the bare message only.
logger.formatter = ->(_severity, _timestamp, _progname, msg) { "#{msg}\n" } unless verbose

redis_url, zset_key, processed_key = ARGV
manager = CI::Queue::Redis::Monitor.new($stdin, logger, redis_url, zset_key, processed_key)

# Write a single byte to stdout and close it now that the monitor is built.
$stdout << "."
$stdout.close

exit(manager.monitor)
11 changes: 6 additions & 5 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ def release!
nil
end

# Returns the timeout taken from this queue's configuration (`config.timeout`).
def timeout
config.timeout
end

private

attr_reader :index
Expand All @@ -144,10 +148,6 @@ def worker_id
config.worker_id
end

# Returns the timeout taken from this queue's configuration (`config.timeout`).
# NOTE(review): this definition sits below `private`; confirm which visibility is intended.
def timeout
config.timeout
end

def raise_on_mismatching_test(test)
if @reserved_test == test
@reserved_test = nil
Expand Down Expand Up @@ -179,6 +179,7 @@ def try_to_reserve_test
)
end

# Passed as the second ARGV value to the :reserve_lost script.
# NOTE(review): presumably the number of missed heartbeats after which a
# running test is treated as lost — confirm against the Lua script.
MAX_MISSED_HEARTBEATS = 10
def try_to_reserve_lost_test
lost_test = eval_script(
:reserve_lost,
Expand All @@ -188,7 +189,7 @@ def try_to_reserve_lost_test
key('worker', worker_id, 'queue'),
key('owners'),
],
argv: [CI::Queue.time_now.to_f, timeout],
argv: [CI::Queue.time_now.to_f, MAX_MISSED_HEARTBEATS],
)

if lost_test
Expand Down
12 changes: 12 additions & 0 deletions ruby/lib/ci/queue/static.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ def populate(tests, random: nil)
self
end

# Returns the timeout taken from this queue's configuration (`config.timeout`).
def timeout
config.timeout
end

# Stand-in monitor for the static queue: `boot!` and `shutdown!` are plain
# Struct members (nil unless assigned) and `tick!` does nothing.
Monitor = Struct.new(:boot!, :shutdown!) do
  # Accepts and ignores a heartbeat tick.
  def tick!(_id)
    nil
  end
end
# Returns the memoized Monitor instance, creating it on first use.
def monitor
  return @monitor if @monitor

  @monitor = Monitor.new
end

# Records the creation timestamp once; later assignments keep the first
# (truthy) value.
def created_at=(timestamp)
  return @created_at if @created_at

  @created_at = timestamp
end
Expand Down
65 changes: 64 additions & 1 deletion ruby/lib/minitest/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,69 @@ def __run(*args)
end
end

# Holds the latest command as an array and lets another thread block until a
# new one is published or a timeout elapses.
class State
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @state = nil
  end

  # Stores the given arguments as the current state and wakes every waiter.
  def set(*state)
    @state = state
    @mutex.synchronize { @cond.broadcast }
  end

  # Blocks until `set` broadcasts or `timeout` seconds pass, then returns the
  # current state (nil if nothing was ever set).
  def wait(timeout)
    @mutex.synchronize { @cond.wait(@mutex, timeout) }
    @state
  end
end

# Returns the memoized State instance, creating it on first use.
def state
  return @state if @state

  @state = State.new
end

# Body of the heartbeat thread.
#
# Roughly once per second (or immediately when a command is set) it looks at
# the current command held by `state`:
#   [:tick, example] - sends a tick for `example` to the monitor, at most
#                      `queue.config.timeout` times until the next :reset
#   [:reset]         - restores the tick budget
#   [:stop]          - leaves the loop
def heartbeat
  Thread.current.name = "CI::Queue#heartbeat"

  remaining = queue.config.timeout.to_i
  loop do
    # Waits for at most 1 second but wakes up immediately if we receive a command.
    command = state.wait(1)

    # `state.wait` returns nil until the first command is set; calling
    # `command.first` on it would raise NoMethodError in this thread.
    next if command.nil?

    case command.first
    when :tick
      if remaining > 0
        queue.monitor.tick!(command.last)
        remaining -= 1
      end
    when :reset
      remaining = queue.config.timeout.to_i
    when :stop
      break
    end
  end
end

# Publishes [:tick, example] before yielding and always publishes [:reset]
# afterwards, even when the block raises. Returns the block's value.
# NOTE(review): `example` is forwarded untouched to `queue.monitor.tick!` by
# the heartbeat loop — confirm it is the identifier the monitor expects.
def with_heartbeat(example)
  begin
    state.set(:tick, example)
    yield
  ensure
    state.set(:reset)
  end
end

def run_from_queue(reporter, *)
heartbeat = Thread.start { heartbeat }

queue.poll do |example|
result = example.run
result = with_heartbeat(example) do
example.run
end

failed = !(result.passed? || result.skipped?)

if example.flaky?
Expand Down Expand Up @@ -256,6 +316,9 @@ def run_from_queue(reporter, *)
reporter.record(result)
end
end

state.set(:stop)
queue.monitor.shutdown!
end
end
end
Expand Down
1 change: 1 addition & 0 deletions ruby/lib/minitest/queue/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def run_command
end

queue.rescue_connection_errors { queue.created_at = CI::Queue.time_now.to_f }
queue.monitor.boot!

set_load_path
Minitest.queue = queue
Expand Down

0 comments on commit b799cd6

Please sign in to comment.