Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisBr committed Jan 25, 2024
1 parent c8f9bd9 commit 4500028
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 4 deletions.
2 changes: 1 addition & 1 deletion ruby/lib/ci/queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def initialize(
grind_count: nil, max_duration: nil, failure_file: nil, max_test_duration: nil,
max_test_duration_percentile: 0.5, track_test_duration: false, max_test_failed: nil,
queue_init_timeout: nil, redis_ttl: 8 * 60 * 60, report_timeout: nil, inactive_workers_timeout: nil,
export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: 10)
export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: 15)
@build_id = build_id
@circuit_breakers = [CircuitBreaker::Disabled]
@failure_file = failure_file
Expand Down
12 changes: 9 additions & 3 deletions ruby/lib/ci/queue/redis/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def initialize(pipe, logger, redis_url, zset_key, processed_key)
@zset_key = zset_key
@processed_key = processed_key
@logger = logger
@redis = ::Redis.new(url: redis_url)
@redis = ::Redis.new(url: redis_url, reconnect_attempts: [0, 0, 0.1, 0.5, 1, 3, 5])
@shutdown = false
@pipe = pipe
@self_pipe_reader, @self_pipe_writer = IO.pipe
Expand Down Expand Up @@ -43,8 +43,11 @@ def soft_signal(sig)
end
LUA
def process_tick!(id:)
logger.info("Tick: #{id}")
@script ||= @redis.script(:load, HEARTBEAT_SCRIPT)
@redis.evalsha(@script, keys: [@zset_key, @processed_key], argv: [Time.now.to_f, id])
rescue
logger.info("Tick: #{id}")
end

HEADER = 'L'
Expand All @@ -54,7 +57,7 @@ def read_message(io)
when :wait_readable
nil
when nil
@logger.debug('Broken pipe, exiting')
@logger.info('Broken pipe, exiting')
@shutdown = 0
false
else
Expand All @@ -81,19 +84,21 @@ def wait_for_events(ios)
when @pipe
process_messages(@pipe)
else
@logger.info("Unknown reader: #{io.inspect}")
raise "Unknown reader: #{io.inspect}"
end
end
end

def monitor
@logger.info("Starting monitor")
ios = [@self_pipe_reader, @pipe]

until @shutdown
while (sig = @queue.shift)
case sig
when :INT, :TERM
@logger.debug("Received #{sig}, exiting")
@logger.info("Received #{sig}, exiting")
@shutdown = 0
break
else
Expand Down Expand Up @@ -123,6 +128,7 @@ def monitor
redis_url = ARGV[0]
zset_key = ARGV[1]
processed_key = ARGV[2]
logger.info("Starting monitor: #{redis_url} #{zset_key} #{processed_key}")
manager = CI::Queue::Redis::Monitor.new($stdin, logger, redis_url, zset_key, processed_key)
$stdout << "."
$stdout.close
Expand Down
1 change: 1 addition & 0 deletions ruby/lib/minitest/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ def heartbeat
case command.first
when :tick
if timeout > 0
puts "Tick: #{command.last}"
queue.monitor.tick!(command.last)
timeout -= 1
end
Expand Down

0 comments on commit 4500028

Please sign in to comment.