Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade ruby to v3.2 as 2.7 is not maintained anymore. #345

Merged
merged 12 commits into from
Oct 18, 2023
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
- name: Setup Ruby
uses: ruby/setup-ruby@v1
with:
ruby-version: 2.7
ruby-version: 3.2
bundler-cache: true

- name: Starting up MySQL
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/tests_5.7.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
- name: Setup Ruby
uses: ruby/setup-ruby@v1
with:
ruby-version: 2.7
ruby-version: 3.2
bundler-cache: true

- name: Starting up MySQL
Expand Down
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ source "https://rubygems.org"
group :test do
gem "minitest"
gem "mysql2"
gem "webrick"

gem "minitest-hooks"
gem "minitest-reporters", "~> 1.4"
gem "minitest-retry"
gem "minitest-fail-fast", "~> 0.1.0"
end

Expand Down
23 changes: 14 additions & 9 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,28 @@ GEM
byebug (11.1.3)
coderay (1.1.3)
method_source (1.0.0)
minitest (5.14.4)
minitest (5.20.0)
minitest-fail-fast (0.1.0)
minitest (~> 5)
minitest-hooks (1.5.0)
minitest-hooks (1.5.1)
minitest (> 5.3)
minitest-reporters (1.4.3)
minitest-reporters (1.6.1)
ansi
builder
minitest (>= 5.0)
ruby-progressbar
mysql2 (0.5.3)
pry (0.13.1)
minitest-retry (0.2.2)
minitest (>= 5.0)
mysql2 (0.5.5)
pry (0.14.2)
coderay (~> 1.1)
method_source (~> 1.0)
pry-byebug (3.9.0)
pry-byebug (3.10.1)
byebug (~> 11.0)
pry (~> 0.13.0)
ruby-progressbar (1.11.0)
tqdm (0.3.0)
pry (>= 0.13, < 0.15)
ruby-progressbar (1.13.0)
tqdm (0.4.1)
webrick (1.8.1)

PLATFORMS
ruby
Expand All @@ -34,9 +37,11 @@ DEPENDENCIES
minitest-fail-fast (~> 0.1.0)
minitest-hooks
minitest-reporters (~> 1.4)
minitest-retry
mysql2
pry-byebug
tqdm
webrick

BUNDLED WITH
2.2.22
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ test-go:

test-ruby:
bundle install
ruby test/main.rb
bundle exec ruby test/main.rb

test: test-go test-ruby

Expand Down
2 changes: 1 addition & 1 deletion dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ up:
or: [mysql@5.7]
conflicts: [mysql-connector-c, mysql, mysql-client]

- ruby: "2.7.3"
- ruby: "3.2.2"
- bundler
- go:
version: "1.16"
Expand Down
1 change: 1 addition & 0 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,7 @@ func (f *Ferry) WaitUntilBinlogStreamerCatchesUp() {
// You will know that the BinlogStreamer finished when .Run() returns.
func (f *Ferry) FlushBinlogAndStopStreaming() {
if f.WaitUntilReplicaIsCaughtUpToMaster != nil {
f.logger.Info("flush binlog and stop streaming: wait until replica is caught up to master")
isReplica, err := CheckDbIsAReplica(f.WaitUntilReplicaIsCaughtUpToMaster.MasterDB)
if err != nil {
f.ErrorHandler.Fatal("wait_replica", err)
Expand Down
91 changes: 59 additions & 32 deletions test/helpers/ghostferry_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require "tmpdir"
require "webrick"
require "cgi"
require "securerandom"

module GhostferryHelper
GHOSTFERRY_TEMPDIR = File.join(Dir.tmpdir, "ghostferry-integration")
Expand All @@ -14,8 +15,6 @@ def self.remove_all_binaries
FileUtils.remove_entry(GHOSTFERRY_TEMPDIR) if Dir.exist?(GHOSTFERRY_TEMPDIR)
end

class GhostferryExitFailure < StandardError
end

class Ghostferry
# Manages compiling, running, and communicating with Ghostferry.
Expand All @@ -32,6 +31,10 @@ class Ghostferry
# Keep these in sync with integrationferry.go
ENV_KEY_PORT = "GHOSTFERRY_INTEGRATION_PORT"

Error = Class.new(StandardError)
ExitError = Class.new(Error)
TimeoutError = Class.new(Error)

module Status
# This should be in sync with integrationferry.go
READY = "READY"
Expand All @@ -47,14 +50,12 @@ module Status
AFTER_BINLOG_APPLY = "AFTER_BINLOG_APPLY"
end

attr_reader :stdout, :stderr, :logrus_lines, :exit_status, :pid, :error, :error_lines
attr_reader :stdout, :stderr, :logrus_lines, :exit_status, :pid, :error, :error_lines, :tag

def initialize(main_path, config: {}, logger: nil, message_timeout: 30, port: 39393)
@logger = logger
if @logger.nil?
@logger = Logger.new(STDOUT)
@logger.level = Logger::DEBUG
end
def initialize(main_path, config: {}, log_capturer:, message_timeout: 30, port: 39393)
@log_capturer = log_capturer
@logger = log_capturer.logger
@tag = SecureRandom.hex[0..3]

@main_path = main_path
@config = config
Expand Down Expand Up @@ -97,6 +98,7 @@ def initialize(main_path, config: {}, logger: nil, message_timeout: 30, port: 39

# The main method to call to run a Ghostferry subprocess.
def run(resuming_state = nil)
@logger.info("[#{@tag}] ghostferry#run(state:#{(!resuming_state.nil?).inspect})")
resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash)

compile_binary
Expand All @@ -115,21 +117,26 @@ def run(resuming_state = nil)
# When using this method, you need to ensure that the datawriter has been
# stopped properly (if you're using stop_datawriter_during_cutover).
def run_expecting_interrupt(resuming_state = nil)
@logger.info("[#{@tag}] ghostferry#run_expecting_interrupt(state:#{(!resuming_state.nil?).inspect})")
run(resuming_state)
rescue GhostferryExitFailure
rescue ExitError
@logger.info("[#{@tag}] ghostferry#run_expecting_interrupt: got Ghostferry::ExitError")
dumped_state = @stdout.join("")
JSON.parse(dumped_state)
else
@logger.error("[#{@tag}] ghostferry#run_expecting_interrupt: something's wrong")
raise "Ghostferry did not get interrupted"
end

# Same as above - ensure that the datawriter has been
# stopped properly (if you're using stop_datawriter_during_cutover).
def run_expecting_failure(resuming_state = nil)
@logger.info("[#{@tag}] ghostferry#run_expecting_failure(state:#{(!resuming_state.nil?).inspect})")
run(resuming_state)
rescue GhostferryExitFailure
rescue ExitError
@logger.info("[#{@tag}] ghostferry#run_expecting_failure: got Ghostferry::ExitError")
else
raise "Ghostferry did not fail"
raise "[#{@tag}] Ghostferry did not fail"
end

def run_with_logs(resuming_state = nil)
Expand All @@ -143,24 +150,25 @@ def run_with_logs(resuming_state = nil)
def compile_binary
return if File.exist?(@compiled_binary_path)

@logger.debug("compiling test binary to #{@compiled_binary_path}")
@logger.debug("[#{@tag}] compiling test binary to #{@compiled_binary_path}")
rc = system(
"go", "build",
"-o", @compiled_binary_path,
@main_path
)

raise "could not compile ghostferry" unless rc
raise "[#{@tag}] could not compile ghostferry" unless rc
end

def start_server
@server_last_error = nil

@last_message_time = Time.now
@last_message_time = now
@server = WEBrick::HTTPServer.new(
BindAddress: "127.0.0.1",
Port: @server_port,
Logger: @logger,
MaxClients: 1024,
AccessLog: [],
)

Expand All @@ -174,19 +182,27 @@ def start_server

query = CGI::parse(req.body)

status = query["status"]
data = query["data"]
statuses = Array(query["status"])

unless status
if statuses.empty?
@server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send a status")
resp.status = 400
@server.shutdown
elsif statuses.size > 1
@logger.warn("[#{@tag}] Got multiple statuses at once: #{statuses.inspect}")
puts "Got multiple statuses at once: #{statuses.inspect}"
end

status = status.first
@last_message_time = now

data = query["data"]

@last_message_time = Time.now
@status_handlers[status].each { |f| f.call(*data) } unless @status_handlers[status].nil?
@logger.info("[#{@tag}] server: got / with #{statuses.inspect}")
statuses.each do |status|
next if @status_handlers[status].nil?

@status_handlers[status].each { |f| f.call(*data) }
end
rescue StandardError => e
# errors are not reported from WEBrick but the server should fail early
# as this indicates there is likely a programming error.
Expand All @@ -197,6 +213,7 @@ def start_server

@server.mount_proc "/callbacks/progress" do |req, resp|
begin
@logger.info("[#{@tag}] server: got /callbacks/progress")
unless req.body
@server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send data")
resp.status = 400
Expand All @@ -211,6 +228,7 @@ def start_server

@server.mount_proc "/callbacks/state" do |req, resp|
begin
@logger.info("[#{@tag}] server: got /callbacks/state")
unless req.body
@server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send data")
resp.status = 400
Expand All @@ -223,14 +241,15 @@ def start_server
end

@server.mount_proc "/callbacks/error" do |req, resp|
@logger.info("[#{@tag}] server: got /callbacks/error")
@error = JSON.parse(JSON.parse(req.body)["Payload"])
@callback_handlers["error"].each { |f| f.call(@error) } unless @callback_handlers["error"].nil?
end

@server_thread = Thread.new do
@logger.debug("starting server thread")
@logger.debug("[#{@tag}] starting server thread")
@server.start
@logger.debug("server thread stopped")
@logger.debug("[#{@tag}] server thread stopped")
end
end

Expand Down Expand Up @@ -270,7 +289,7 @@ def start_ghostferry(resuming_state = nil)
environment["GHOSTFERRY_MARGINALIA"] = @config[:marginalia]
end

@logger.debug("starting ghostferry test binary #{@compiled_binary_path}")
@logger.debug("[#{@tag}] starting ghostferry test binary #{@compiled_binary_path}")
Open3.popen3(environment, @compiled_binary_path) do |stdin, stdout, stderr, wait_thr|
stdin.puts(resuming_state) unless resuming_state.nil?
stdin.close
Expand All @@ -292,7 +311,7 @@ def start_ghostferry(resuming_state = nil)

if reader == stdout
@stdout << line
@logger.debug("stdout: #{line}")
@logger.debug("[#{@tag}] stdout: #{line}")
elsif reader == stderr
@stderr << line
if json_log_line?(line)
Expand All @@ -305,8 +324,11 @@ def start_ghostferry(resuming_state = nil)
if logline["level"] == "error"
@error_lines << logline
end

@logger.debug("[#{@tag}] stderr: #{line}") unless tag.end_with?("_binlog_streamer")
else
@logger.debug("[#{@tag}] stderr: #{line}")
end
@logger.debug("stderr: #{line}")
end
end
end
Expand All @@ -315,9 +337,9 @@ def start_ghostferry(resuming_state = nil)
@pid = 0
end

@logger.debug("ghostferry test binary exitted: #{@exit_status}")
@logger.debug("[#{@tag}] ghostferry test binary exitted: #{@exit_status}")
if @exit_status.exitstatus != 0
raise GhostferryExitFailure, "ghostferry test binary returned non-zero status: #{@exit_status}"
raise ExitError, "[#{@tag}] ghostferry test binary returned non-zero status: #{@exit_status}"
end
end
end
Expand All @@ -328,16 +350,17 @@ def start_server_watchdog
# HTTP server to free up the port.
@server_watchdog_thread = Thread.new do
while @subprocess_thread.alive? do
if Time.now - @last_message_time > @message_timeout
if (now - @last_message_time) > @message_timeout
@server.shutdown
raise "ghostferry did not report to the integration test server for the last #{@message_timeout}s"
@log_capturer.print_output
raise TimeoutError, "[#{@tag}] ghostferry did not report to the integration test server for the last #{@message_timeout}s"
end

sleep 1
end

@server.shutdown
@logger.debug("server watchdog thread stopped")
@logger.debug("[#{@tag}] server watchdog thread stopped")
end

@server_watchdog_thread.abort_on_exception = true
Expand Down Expand Up @@ -387,7 +410,7 @@ def kill

begin
@subprocess_thread.join if @subprocess_thread
rescue GhostferryExitFailure
rescue ExitError
# ignore
end
end
Expand All @@ -405,5 +428,9 @@ def with_env(key, value)
ensure
ENV[key] = previous_value
end

def now
::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
end
end
end
Loading
Loading