diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 7453ac50..be72d1d3 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -44,6 +44,56 @@ def initialize(redis_url, config) end end + class Monitor + def initialize(redis_url, zset_key, processed_key) + @redis_url = redis_url + @zset_key = zset_key + @processed_key = processed_key + end + + def boot! + child_read, @pipe = IO.pipe + ready_pipe, child_write = IO.pipe + @pipe.binmode + + @pid = Process.spawn( + RbConfig.ruby, + ::File.join(__dir__, "monitor.rb"), + @redis_url, + @zset_key, + @processed_key, + in: child_read, + out: child_write, + ) + child_write.close + child_read.close + + # Check the process is alive. + ready_pipe.wait_readable(2) + Process.kill(0, @pid) + @pipe + end + + def shutdown! + @pipe.close + end + + def tick!(id) + send_message(:tick!, id: id) + end + + private + + def send_message(*message) + payload = Marshal.dump(message) + @pipe.write([payload.bytesize].pack("L").b, payload) + end + end + + def monitor + @monitor ||= Monitor.new(@redis_url, key('running'), key('processed')) + end + def custom_config return unless config.debug_log diff --git a/ruby/lib/ci/queue/redis/monitor.rb b/ruby/lib/ci/queue/redis/monitor.rb new file mode 100755 index 00000000..517304f9 --- /dev/null +++ b/ruby/lib/ci/queue/redis/monitor.rb @@ -0,0 +1,127 @@ +#!/usr/bin/env -S ruby --disable-gems +# typed: false +# frozen_string_literal: true + +require 'logger' +require 'redis' + +module CI + module Queue + module Redis + class Monitor + def initialize(pipe, logger, redis_url, zset_key, processed_key) + @zset_key = zset_key + @processed_key = processed_key + @redis = ::Redis.new(url: redis_url) + @shutdown = false + @logger = logger + @pipe = pipe + @self_pipe_reader, @self_pipe_writer = IO.pipe + @self_pipe_writer.sync = true + @queue = [] + @deadlines = {} + %i[TERM INT USR1].each do |sig| + Signal.trap(sig) { soft_signal(sig) } + end + end + + def soft_signal(sig) + @queue << sig + @self_pipe_writer << '.' + end + + HEARTBEAT_SCRIPT = <<~LUA + local zset_key = KEYS[1] + local processed_key = KEYS[2] + + local current_time = ARGV[1] + local test = ARGV[2] + + if redis.call('sismember', processed_key, test) ~= 1 then + redis.call('zadd', zset_key, current_time, test) + end + LUA + def process_tick!(id:) + @script ||= @redis.script(:load, HEARTBEAT_SCRIPT) + @redis.evalsha(@script, keys: [@zset_key, @processed_key], argv: [Time.now.to_f, id]) + end + + HEADER = 'L' + HEADER_SIZE = [0].pack(HEADER).bytesize + def read_message(io) + case header = io.read_nonblock(HEADER_SIZE, exception: false) + when :wait_readable + nil + when nil + @logger.debug('Broken pipe, exiting') + @shutdown = 0 + false + else + Marshal.load(io.read(header.unpack1(HEADER))) + end + end + + def process_messages(io) + while (message = read_message(io)) + type, kwargs = message + public_send("process_#{type}", **kwargs) + end + end + + def wait_for_events(ios) + return if @shutdown + + return unless (ready = IO.select(ios, nil, nil, 10)) + + ready[0].each do |io| + case io + when @self_pipe_reader + io.read_nonblock(512, exception: false) # Just flush the pipe, the information is in the @queue + when @pipe + process_messages(@pipe) + else + raise "Unknown reader: #{io.inspect}" + end + end + end + + def monitor + ios = [@self_pipe_reader, @pipe] + + until @shutdown + while (sig = @queue.shift) + case sig + when :INT, :TERM + @logger.debug("Received #{sig}, exiting") + @shutdown = 0 + break + else + raise "Unknown signal: #{sig.inspect}" + end + end + + wait_for_events(ios) + end + + @logger.debug('Done') + @shutdown + end + end + end + end +end + +logger = Logger.new($stderr) +if ARGV.include?('-v') + logger.level = Logger::DEBUG +else + logger.level = Logger::INFO + logger.formatter = ->(_severity, _timestamp, _progname, msg) { "#{msg}\n" } +end + +redis_url = ARGV[0] +zset_key = ARGV[1] +processed_key = ARGV[2] +manager = CI::Queue::Redis::Monitor.new($stdin, logger, redis_url, zset_key, processed_key) +$stdout.close +exit(manager.monitor) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 1fa7df88..d77a1429 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -136,6 +136,10 @@ def release! nil end + def timeout + config.timeout + end + private attr_reader :index @@ -144,10 +148,6 @@ def worker_id config.worker_id end - def timeout - config.timeout - end - def raise_on_mismatching_test(test) if @reserved_test == test @reserved_test = nil @@ -179,6 +179,7 @@ def try_to_reserve_test ) end + MAX_MISSED_HEARTBEATS = 10 def try_to_reserve_lost_test lost_test = eval_script( :reserve_lost, @@ -188,7 +189,7 @@ def try_to_reserve_lost_test key('worker', worker_id, 'queue'), key('owners'), ], - argv: [CI::Queue.time_now.to_f, timeout], + argv: [CI::Queue.time_now.to_f, MAX_MISSED_HEARTBEATS], ) if lost_test diff --git a/ruby/lib/ci/queue/static.rb b/ruby/lib/ci/queue/static.rb index 9d470d33..e8fc7f0c 100644 --- a/ruby/lib/ci/queue/static.rb +++ b/ruby/lib/ci/queue/static.rb @@ -48,6 +48,12 @@ def populate(tests, random: nil) self end + def timeout + config.timeout + end + + def heartbeat(_); end + def created_at=(timestamp) @created_at ||= timestamp end diff --git a/ruby/lib/minitest/queue.rb b/ruby/lib/minitest/queue.rb index c326668d..ab7d3374 100644 --- a/ruby/lib/minitest/queue.rb +++ b/ruby/lib/minitest/queue.rb @@ -226,7 +226,10 @@ def __run(*args) def run_from_queue(reporter, *) queue.poll do |example| - result = example.run + result = with_heartbeat(example) do + example.run + end + failed = !(result.passed? || result.skipped?) if example.flaky? @@ -256,6 +259,25 @@ def run_from_queue(reporter, *) reporter.record(result) end end + + queue.monitor.shutdown! + end + + def with_heartbeat(example) + finished = false + t = Thread.start do + heartbeats = queue.config.timeout.to_f * 1_000 # convert to milliseconds + while heartbeats > 0 && !finished + queue.monitor.tick!(example.id) + heartbeats -= 1 + sleep 0.001 + end + end + + yield + ensure + finished = true + t.join end end end diff --git a/ruby/lib/minitest/queue/runner.rb b/ruby/lib/minitest/queue/runner.rb index d1a9e230..c5a4d33f 100644 --- a/ruby/lib/minitest/queue/runner.rb +++ b/ruby/lib/minitest/queue/runner.rb @@ -64,6 +64,7 @@ def run_command end queue.rescue_connection_errors { queue.created_at = CI::Queue.time_now.to_f } + queue.monitor.boot! set_load_path Minitest.queue = queue