diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 1cd54270..6d0fdce6 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -63,7 +63,7 @@ jobs: - name: Setup Ruby uses: ruby/setup-ruby@v1 with: - ruby-version: 2.7 + ruby-version: 3.2 bundler-cache: true - name: Starting up MySQL diff --git a/.github/workflows/tests_5.7.yml b/.github/workflows/tests_5.7.yml index 62aa65b3..19eb45c6 100644 --- a/.github/workflows/tests_5.7.yml +++ b/.github/workflows/tests_5.7.yml @@ -63,7 +63,7 @@ jobs: - name: Setup Ruby uses: ruby/setup-ruby@v1 with: - ruby-version: 2.7 + ruby-version: 3.2 bundler-cache: true - name: Starting up MySQL diff --git a/Gemfile b/Gemfile index 3ef9386a..b3cdbf46 100644 --- a/Gemfile +++ b/Gemfile @@ -3,9 +3,11 @@ source "https://rubygems.org" group :test do gem "minitest" gem "mysql2" + gem "webrick" gem "minitest-hooks" gem "minitest-reporters", "~> 1.4" + gem "minitest-retry" gem "minitest-fail-fast", "~> 0.1.0" end diff --git a/Gemfile.lock b/Gemfile.lock index d4898b6d..c83f95e1 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -6,25 +6,28 @@ GEM byebug (11.1.3) coderay (1.1.3) method_source (1.0.0) - minitest (5.14.4) + minitest (5.20.0) minitest-fail-fast (0.1.0) minitest (~> 5) - minitest-hooks (1.5.0) + minitest-hooks (1.5.1) minitest (> 5.3) - minitest-reporters (1.4.3) + minitest-reporters (1.6.1) ansi builder minitest (>= 5.0) ruby-progressbar - mysql2 (0.5.3) - pry (0.13.1) + minitest-retry (0.2.2) + minitest (>= 5.0) + mysql2 (0.5.5) + pry (0.14.2) coderay (~> 1.1) method_source (~> 1.0) - pry-byebug (3.9.0) + pry-byebug (3.10.1) byebug (~> 11.0) - pry (~> 0.13.0) - ruby-progressbar (1.11.0) - tqdm (0.3.0) + pry (>= 0.13, < 0.15) + ruby-progressbar (1.13.0) + tqdm (0.4.1) + webrick (1.8.1) PLATFORMS ruby @@ -34,9 +37,11 @@ DEPENDENCIES minitest-fail-fast (~> 0.1.0) minitest-hooks minitest-reporters (~> 1.4) + minitest-retry mysql2 pry-byebug tqdm + webrick BUNDLED WITH 2.2.22 diff --git a/Makefile b/Makefile index 4dc22e85..29416f0d 100644 --- a/Makefile +++ b/Makefile @@ -67,7 +67,7 @@ test-go: test-ruby: bundle install - ruby test/main.rb + bundle exec ruby test/main.rb test: test-go test-ruby diff --git a/dev.yml b/dev.yml index 283830b5..d533c53e 100644 --- a/dev.yml +++ b/dev.yml @@ -9,7 +9,7 @@ up: or: [mysql@5.7] conflicts: [mysql-connector-c, mysql, mysql-client] - - ruby: "2.7.3" + - ruby: "3.2.2" - bundler - go: version: "1.16" diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index 4b9295bc..cfd473dd 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -14,8 +14,6 @@ def self.remove_all_binaries FileUtils.remove_entry(GHOSTFERRY_TEMPDIR) if Dir.exist?(GHOSTFERRY_TEMPDIR) end - class GhostferryExitFailure < StandardError - end class Ghostferry # Manages compiling, running, and communicating with Ghostferry. @@ -23,7 +21,7 @@ class Ghostferry # # To use this class: # - # ghostferry = Ghostferry.new("path/to/main.go") + # ghostferry = Ghostferry.new("path/to/main.go", logger: Logger.new(STDOUT)) # ghostferry.on_status(Ghostferry::Status::BEFORE_ROW_COPY) do # # do custom work here, such as injecting data into the database # end @@ -32,6 +30,10 @@ class Ghostferry # Keep these in sync with integrationferry.go ENV_KEY_PORT = "GHOSTFERRY_INTEGRATION_PORT" + Error = Class.new(StandardError) + ExitError = Class.new(Error) + TimeoutError = Class.new(Error) + module Status # This should be in sync with integrationferry.go READY = "READY" @@ -49,12 +51,8 @@ module Status attr_reader :stdout, :stderr, :logrus_lines, :exit_status, :pid, :error, :error_lines - def initialize(main_path, config: {}, logger: nil, message_timeout: 30, port: 39393) + def initialize(main_path, config: {}, logger:, message_timeout: 30, port: 39393) @logger = logger - if @logger.nil? - @logger = Logger.new(STDOUT) - @logger.level = Logger::DEBUG - end @main_path = main_path @config = config @@ -116,7 +114,7 @@ def run(resuming_state = nil) # stopped properly (if you're using stop_datawriter_during_cutover). def run_expecting_interrupt(resuming_state = nil) run(resuming_state) - rescue GhostferryExitFailure + rescue ExitError dumped_state = @stdout.join("") JSON.parse(dumped_state) else @@ -127,7 +125,7 @@ def run_expecting_interrupt(resuming_state = nil) # stopped properly (if you're using stop_datawriter_during_cutover). def run_expecting_failure(resuming_state = nil) run(resuming_state) - rescue GhostferryExitFailure + rescue ExitError else raise "Ghostferry did not fail" end @@ -156,11 +154,12 @@ def compile_binary def start_server @server_last_error = nil - @last_message_time = Time.now + @last_message_time = now @server = WEBrick::HTTPServer.new( BindAddress: "127.0.0.1", Port: @server_port, Logger: @logger, + MaxClients: 1024, AccessLog: [], ) @@ -174,18 +173,16 @@ def start_server query = CGI::parse(req.body) - status = query["status"] + status = Array(query["status"]).first data = query["data"] - unless status + if status.nil? @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send a status") resp.status = 400 @server.shutdown end - status = status.first - - @last_message_time = Time.now + @last_message_time = now @status_handlers[status].each { |f| f.call(*data) } unless @status_handlers[status].nil? rescue StandardError => e # errors are not reported from WEBrick but the server should fail early @@ -317,7 +314,7 @@ def start_ghostferry(resuming_state = nil) @logger.debug("ghostferry test binary exitted: #{@exit_status}") if @exit_status.exitstatus != 0 - raise GhostferryExitFailure, "ghostferry test binary returned non-zero status: #{@exit_status}" + raise ExitError, "ghostferry test binary returned non-zero status: #{@exit_status}" end end end @@ -328,9 +325,9 @@ def start_server_watchdog # HTTP server to free up the port. @server_watchdog_thread = Thread.new do while @subprocess_thread.alive? do - if Time.now - @last_message_time > @message_timeout + if (now - @last_message_time) > @message_timeout @server.shutdown - raise "ghostferry did not report to the integration test server for the last #{@message_timeout}s" + raise TimeoutError, "ghostferry did not report to the integration test server for the last #{@message_timeout}s" end sleep 1 @@ -387,7 +384,7 @@ def kill begin @subprocess_thread.join if @subprocess_thread - rescue GhostferryExitFailure + rescue ExitError # ignore end end @@ -405,5 +402,9 @@ def with_env(key, value) ensure ENV[key] = previous_value end + + def now + ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + end end end diff --git a/test/integration/interrupt_resume_test.rb b/test/integration/interrupt_resume_test.rb index 4af99294..0da17264 100644 --- a/test/integration/interrupt_resume_test.rb +++ b/test/integration/interrupt_resume_test.rb @@ -50,18 +50,10 @@ def test_interrupt_and_resume_without_last_known_schema_cache def test_interrupt_resume_with_writes_to_source # Start a ghostferry run expecting it to be interrupted. datawriter = new_source_datawriter - ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + ghostferry = new_ghostferry_with_interrupt_after_row_copy(MINIMAL_GHOSTFERRY, after_batches_written: 2) start_datawriter_with_ghostferry(datawriter, ghostferry) - batches_written = 0 - ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do - batches_written += 1 - if batches_written >= 2 - ghostferry.send_signal("TERM") - end - end - dumped_state = ghostferry.run_expecting_interrupt assert_basic_fields_exist_in_dumped_state(dumped_state) @@ -465,10 +457,12 @@ def test_interrupt_resume_idempotence_with_multiple_interrupts_and_writes_to_sou assert_basic_fields_exist_in_dumped_state(dumped_state) ghostferry = new_ghostferry_with_interrupt_after_row_copy(MINIMAL_GHOSTFERRY, after_batches_written: 2) + ghostferry.run_expecting_interrupt(dumped_state) ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) stop_datawriter_during_cutover(datawriter, ghostferry) + ghostferry.run_with_logs(dumped_state) assert_test_table_is_identical diff --git a/test/main.rb b/test/main.rb index 7c28ab28..eaad9d0c 100644 --- a/test/main.rb +++ b/test/main.rb @@ -15,10 +15,12 @@ require "minitest" require "minitest/reporters" +require "minitest/retry" require "minitest/fail_fast" require "minitest/hooks/test" Minitest::Reporters.use! Minitest::Reporters::SpecReporter.new +Minitest::Retry.use!(exceptions_to_retry: [GhostferryHelper::Ghostferry::TimeoutError]) test_files.each do |f| require f diff --git a/test/test_helper.rb b/test/test_helper.rb index 59427bb2..9b9f8b4f 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -55,11 +55,12 @@ def new_ghostferry(filepath, config: {}) end def new_ghostferry_with_interrupt_after_row_copy(filepath, config: {}, after_batches_written: 0) - g = new_ghostferry(filepath, config) + g = new_ghostferry(filepath, config: config) batches_written = 0 g.on_status(Ghostferry::Status::AFTER_ROW_COPY) do batches_written += 1 + if batches_written >= after_batches_written g.send_signal("TERM") end