From b799cd6f3da77bfa51b74a7485a8a9b396479eec Mon Sep 17 00:00:00 2001 From: Christian Bruckmayer Date: Tue, 23 Jan 2024 11:18:14 +0000 Subject: [PATCH] Add heartbeat --- ruby/lib/ci/queue/redis/base.rb | 49 +++++++++++ ruby/lib/ci/queue/redis/monitor.rb | 129 +++++++++++++++++++++++++++++ ruby/lib/ci/queue/redis/worker.rb | 11 +-- ruby/lib/ci/queue/static.rb | 12 +++ ruby/lib/minitest/queue.rb | 65 ++++++++++++++- ruby/lib/minitest/queue/runner.rb | 1 + 6 files changed, 261 insertions(+), 6 deletions(-) create mode 100755 ruby/lib/ci/queue/redis/monitor.rb diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 7453ac50..8be02f9b 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -44,6 +44,55 @@ def initialize(redis_url, config) end end + class Monitor + def initialize(redis_url, zset_key, processed_key) + @redis_url = redis_url + @zset_key = zset_key + @processed_key = processed_key + end + + def boot! + child_read, @pipe = IO.pipe + ready_pipe, child_write = IO.pipe + @pipe.binmode + @pid = Process.spawn( + RbConfig.ruby, + ::File.join(__dir__, "monitor.rb"), + @redis_url, + @zset_key, + @processed_key, + in: child_read, + out: child_write, + ) + child_write.close + child_read.close + + # Check the process is alive. + ready_pipe.wait_readable(2) + Process.kill(0, @pid) + @pipe + end + + def shutdown! + @pipe.close + end + + def tick!(id) + send_message(:tick!, id: id) + end + + private + + def send_message(*message) + payload = Marshal.dump(message) + @pipe.write([payload.bytesize].pack("L").b, payload) + end + end + + def monitor + @monitor ||= Monitor.new(@redis_url, key('running'), key('processed')) + end + def custom_config return unless config.debug_log diff --git a/ruby/lib/ci/queue/redis/monitor.rb b/ruby/lib/ci/queue/redis/monitor.rb new file mode 100755 index 00000000..3059cb7e --- /dev/null +++ b/ruby/lib/ci/queue/redis/monitor.rb @@ -0,0 +1,129 @@ +#!/usr/bin/env -S ruby --disable-gems +# typed: false +# frozen_string_literal: true + +require 'logger' +require 'redis' + +module CI + module Queue + module Redis + class Monitor + + def initialize(pipe, logger, redis_url, zset_key, processed_key) + @zset_key = zset_key + @processed_key = processed_key + @logger = logger + @redis = ::Redis.new(url: redis_url) + @shutdown = false + @pipe = pipe + @self_pipe_reader, @self_pipe_writer = IO.pipe + @self_pipe_writer.sync = true + @queue = [] + @deadlines = {} + %i[TERM INT USR1].each do |sig| + Signal.trap(sig) { soft_signal(sig) } + end + end + + def soft_signal(sig) + @queue << sig + @self_pipe_writer << '.' + end + + HEARTBEAT_SCRIPT = <<~LUA + local zset_key = KEYS[1] + local processed_key = KEYS[2] + + local current_time = ARGV[1] + local test = ARGV[2] + + if redis.call('sismember', processed_key, test) ~= 1 then + redis.call('zadd', zset_key, current_time, test) + end + LUA + def process_tick!(id:) + @script ||= @redis.script(:load, HEARTBEAT_SCRIPT) + @redis.evalsha(@script, keys: [@zset_key, @processed_key], argv: [Time.now.to_f, id]) + end + + HEADER = 'L' + HEADER_SIZE = [0].pack(HEADER).bytesize + def read_message(io) + case header = io.read_nonblock(HEADER_SIZE, exception: false) + when :wait_readable + nil + when nil + @logger.debug('Broken pipe, exiting') + @shutdown = 0 + false + else + Marshal.load(io.read(header.unpack1(HEADER))) + end + end + + def process_messages(io) + while (message = read_message(io)) + type, kwargs = message + public_send("process_#{type}", **kwargs) + end + end + + def wait_for_events(ios) + return if @shutdown + + return unless (ready = IO.select(ios, nil, nil, 10)) + + ready[0].each do |io| + case io + when @self_pipe_reader + io.read_nonblock(512, exception: false) # Just flush the pipe, the information is in the @queue + when @pipe + process_messages(@pipe) + else + raise "Unknown reader: #{io.inspect}" + end + end + end + + def monitor + ios = [@self_pipe_reader, @pipe] + + until @shutdown + while (sig = @queue.shift) + case sig + when :INT, :TERM + @logger.debug("Received #{sig}, exiting") + @shutdown = 0 + break + else + raise "Unknown signal: #{sig.inspect}" + end + end + + wait_for_events(ios) + end + + @logger.debug('Done') + @shutdown + end + end + end + end +end + +logger = Logger.new($stderr) +if ARGV.include?('-v') + logger.level = Logger::DEBUG +else + logger.level = Logger::INFO + logger.formatter = ->(_severity, _timestamp, _progname, msg) { "#{msg}\n" } +end + +redis_url = ARGV[0] +zset_key = ARGV[1] +processed_key = ARGV[2] +manager = CI::Queue::Redis::Monitor.new($stdin, logger, redis_url, zset_key, processed_key) +$stdout << "." +$stdout.close +exit(manager.monitor) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 1fa7df88..d77a1429 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -136,6 +136,10 @@ def release! nil end + def timeout + config.timeout + end + private attr_reader :index @@ -144,10 +148,6 @@ def worker_id config.worker_id end - def timeout - config.timeout - end - def raise_on_mismatching_test(test) if @reserved_test == test @reserved_test = nil @@ -179,6 +179,7 @@ def try_to_reserve_test ) end + MAX_MISSED_HEARTBEATS = 10 def try_to_reserve_lost_test lost_test = eval_script( :reserve_lost, @@ -188,7 +189,7 @@ def try_to_reserve_lost_test key('worker', worker_id, 'queue'), key('owners'), ], - argv: [CI::Queue.time_now.to_f, timeout], + argv: [CI::Queue.time_now.to_f, MAX_MISSED_HEARTBEATS], ) if lost_test diff --git a/ruby/lib/ci/queue/static.rb b/ruby/lib/ci/queue/static.rb index 9d470d33..4d1981fd 100644 --- a/ruby/lib/ci/queue/static.rb +++ b/ruby/lib/ci/queue/static.rb @@ -48,6 +48,18 @@ def populate(tests, random: nil) self end + def timeout + config.timeout + end + + Monitor = Struct.new(:boot!, :shutdown!) do + def tick!(_) + end + end + def monitor + @monitor ||= Monitor.new + end + def created_at=(timestamp) @created_at ||= timestamp end diff --git a/ruby/lib/minitest/queue.rb b/ruby/lib/minitest/queue.rb index c326668d..a44c876a 100644 --- a/ruby/lib/minitest/queue.rb +++ b/ruby/lib/minitest/queue.rb @@ -224,9 +224,69 @@ def __run(*args) end end + class State + def initialize + @state = nil + @mutex = Mutex.new + @cond = ConditionVariable.new + end + + def set(*state) + @state = state + @mutex.synchronize do + @cond.broadcast + end + end + + def wait(timeout) + @mutex.synchronize do + @cond.wait(@mutex, timeout) + end + @state + end + end + + def state + @state ||= State.new + end + + def heartbeat + Thread.current.name = "CI::Queue#heartbeat" + + timeout = queue.config.timeout.to_i + loop do + command = nil + command = state.wait(1) # waits for max 1 second but wakes up immediately if we receive a command + + case command.first + when :tick + if timeout > 0 + queue.monitor.tick!(command.last) + timeout -= 1 + end + when :reset + timeout = queue.config.timeout.to_i + when :stop + break + end + end + end + + def with_heartbeat(example) + state.set(:tick, example) + yield + ensure + state.set(:reset) + end + def run_from_queue(reporter, *) + heartbeat = Thread.start { heartbeat } + queue.poll do |example| - result = example.run + result = with_heartbeat(example) do + example.run + end + failed = !(result.passed? || result.skipped?) if example.flaky? @@ -256,6 +316,9 @@ def run_from_queue(reporter, *) reporter.record(result) end end + + state.set(:stop) + queue.monitor.shutdown! end end end diff --git a/ruby/lib/minitest/queue/runner.rb b/ruby/lib/minitest/queue/runner.rb index d1a9e230..c5a4d33f 100644 --- a/ruby/lib/minitest/queue/runner.rb +++ b/ruby/lib/minitest/queue/runner.rb @@ -64,6 +64,7 @@ def run_command end queue.rescue_connection_errors { queue.created_at = CI::Queue.time_now.to_f } + queue.monitor.boot! set_load_path Minitest.queue = queue