diff --git a/lib/hosting/gcp_apis.rb b/lib/hosting/gcp_apis.rb
index e227ac6a4..ae177500d 100644
--- a/lib/hosting/gcp_apis.rb
+++ b/lib/hosting/gcp_apis.rb
@@ -149,9 +149,8 @@ def get_vm(vm_name, zone)
     JSON.parse(response.body)
   end
 
-  def create_static_ipv4(vm_name, region)
+  def create_static_ipv4(address_name, region)
     connection = Excon.new(@host[:connection_string], headers: @host[:headers])
-    address_name = "#{vm_name}-addr"
     body = {
       name: address_name,
       networkTier: "PREMIUM",
@@ -161,9 +160,8 @@ def create_static_ipv4(vm_name, region)
     Hosting::GcpApis.check_errors(response)
   end
 
-  def get_static_ipv4(vm_name, region)
+  def get_static_ipv4(address_name, region)
     connection = Excon.new(@host[:connection_string], headers: @host[:headers])
-    address_name = "#{vm_name}-addr"
     response = connection.get(path: "/compute/v1/projects/#{@project}/regions/#{region}/addresses/#{address_name}", expects: 200)
     JSON.parse(response.body)
   end
@@ -194,9 +192,8 @@ def assign_static_ipv4(vm_name, addr, zone)
     wait_for_operation(zone, data["id"])
   end
 
-  def release_ipv4(vm_name, region)
+  def release_ipv4(address_name, region)
     connection = Excon.new(@host[:connection_string], headers: @host[:headers])
-    address_name = "#{vm_name}-addr"
     connection.delete(path: "/compute/v1/projects/#{@project}/regions/#{region}/addresses/#{address_name}", expects: [200, 404])
   end
 
diff --git a/migrate/20240618_gcp_vm_address_name.rb b/migrate/20240618_gcp_vm_address_name.rb
new file mode 100644
index 000000000..7617144ba
--- /dev/null
+++ b/migrate/20240618_gcp_vm_address_name.rb
@@ -0,0 +1,10 @@
+# frozen_string_literal: true
+
+Sequel.migration do
+  change do
+    alter_table(:gcp_vm) do
+      add_column :address_name, :text, collate: '"C"'
+    end
+    run "UPDATE gcp_vm SET address_name=name || '-addr'"
+  end
+end
diff --git a/model/gcp_vm.rb b/model/gcp_vm.rb
index dc18c7553..df53a3e98 100644
--- a/model/gcp_vm.rb
+++ b/model/gcp_vm.rb
@@ -66,4 +66,30 @@ def is_stopped?
   def self.redacted_columns
     super + [:public_key]
   end
+
+  def swap_ip(vm)
+    # swap ips in gcp
+    gcp_client = Hosting::GcpApis.new
+    zone1 = "#{location}-a"
+    zone2 = "#{vm.location}-a"
+    gcp_client.delete_ephermal_ipv4(name, zone1)
+    gcp_client.delete_ephermal_ipv4(vm.name, zone2)
+    gcp_client.assign_static_ipv4(name, vm.sshable.host, zone1)
+    gcp_client.assign_static_ipv4(vm.name, sshable.host, zone2)
+
+    # update sshable hosts
+    current_host = sshable.host
+    new_host = vm.sshable.host
+    sshable.update(host: "temp_#{name}")
+    vm.sshable.update(host: current_host)
+    sshable.update(host: new_host)
+    current_address_name = address_name
+
+    # update address names
+    update(address_name: vm.address_name)
+    vm.update(address_name: current_address_name)
+
+    sshable.invalidate_cache_entry
+    vm.sshable.invalidate_cache_entry
+  end
 end
diff --git a/model/lantern/lantern_doctor.rb b/model/lantern/lantern_doctor.rb
index 278057542..c5e664859 100644
--- a/model/lantern/lantern_doctor.rb
+++ b/model/lantern/lantern_doctor.rb
@@ -25,7 +25,7 @@ def has_system_query?(queries, query)
 
   def should_run?
     return false unless resource
-    resource.representative_server.display_state == "running" && resource.representative_server.strand.label == "wait"
+    resource.display_state == "running" && resource.representative_server.strand.label == "wait"
   end
 
   def sync_system_queries
diff --git a/model/lantern/lantern_resource.rb b/model/lantern/lantern_resource.rb
index fda3d96b4..a9eae65d5 100644
--- a/model/lantern/lantern_resource.rb
+++ b/model/lantern/lantern_resource.rb
@@ -51,6 +51,7 @@ def label
   end
 
   def display_state
+    return "failover" if servers.find { _1.display_state == "failover" }
     super || representative_server&.display_state || "unavailable"
   end
 
diff --git a/model/lantern/lantern_server.rb b/model/lantern/lantern_server.rb
index 2e498d15d..259d5ccf5 100644
--- a/model/lantern/lantern_server.rb
+++ b/model/lantern/lantern_server.rb
@@ -66,6 +66,7 @@ def display_state
     return "updating" if vm.display_state == "updating" || strand.label.include?("update") || strand.label == "init_sql"
     return "unavailable" if strand.label == "wait_db_available"
     return "running" if strand.label == "wait"
+    return "failover" if ["take_over", "wait_swap_ip", "promote_server"].include?(strand.label)
 
     "creating"
   end
@@ -114,8 +115,8 @@ def configure_hash
       db_user: resource.db_user || "",
       db_user_password: resource.db_user_password || "",
       postgres_password: resource.superuser_password || "",
-      # master_host: lantern_server.master_host,
-      # master_port: lantern_server.master_port,
+      master_host: resource.representative_server.hostname,
+      master_port: 5432,
       prom_password: Config.prom_password,
       gcp_creds_gcr_b64: Config.gcp_creds_gcr_b64,
       gcp_creds_coredumps_b64: Config.gcp_creds_coredumps_b64,
@@ -131,6 +132,15 @@ def configure_hash
     })
   end
 
+  def lazy_change_replication_mode(replication_mode)
+    update(timeline_access: (replication_mode == "master") ? "push" : "fetch", representative_at: (replication_mode == "master") ? Time.new : nil)
+    vm.sshable.cmd("sudo lantern/bin/lazy_update_env", stdin: JSON.generate([
+      ["POSTGRESQL_REPLICATION_MODE", replication_mode],
+      ["INSTANCE_TYPE", (replication_mode == "master") ? "writer" : "reader"],
+      ["POSTGRESQL_RECOVER_FROM_BACKUP", ""]
+    ]))
+  end
+
   def update_walg_creds
     walg_config = timeline.generate_walg_config
     vm.sshable.cmd("sudo lantern/bin/update_env", stdin: JSON.generate([
diff --git a/prog/gcp_vm/nexus.rb b/prog/gcp_vm/nexus.rb
index 8e9017925..ceab25701 100644
--- a/prog/gcp_vm/nexus.rb
+++ b/prog/gcp_vm/nexus.rb
@@ -66,7 +66,7 @@ def host
 
   label def wait_ipv4
     gcp_client = Hosting::GcpApis.new
-    addr_info = gcp_client.get_static_ipv4(gcp_vm.name, gcp_vm.location)
+    addr_info = gcp_client.get_static_ipv4(gcp_vm.address_name, gcp_vm.location)
     if addr_info["status"] == "RESERVED"
       gcp_client.delete_ephermal_ipv4(gcp_vm.name, "#{gcp_vm.location}-a")
       gcp_client.assign_static_ipv4(gcp_vm.name, addr_info["address"], "#{gcp_vm.location}-a")
@@ -82,7 +82,9 @@ def host
     gcp_client = Hosting::GcpApis.new
     vm = gcp_client.get_vm(gcp_vm.name, "#{gcp_vm.location}-a")
     if vm["status"] == "RUNNING"
-      gcp_client.create_static_ipv4(gcp_vm.name, gcp_vm.location)
+      address_name = "#{gcp_vm.name}-addr"
+      gcp_client.create_static_ipv4(address_name, gcp_vm.location)
+      gcp_vm.update(address_name: address_name)
       register_deadline(:wait, 5 * 60)
       hop_wait_ipv4
     else
@@ -229,7 +231,7 @@ def host
     gcp_client = Hosting::GcpApis.new
     gcp_client.delete_vm(gcp_vm.name, "#{gcp_vm.location}-a")
     if gcp_vm.has_static_ipv4
-      gcp_client.release_ipv4(gcp_vm.name, gcp_vm.location)
+      gcp_client.release_ipv4(gcp_vm.address_name, gcp_vm.location)
     end
     strand.children.each { _1.destroy }
     gcp_vm.projects.map { gcp_vm.dissociate_with_project(_1) }
diff --git a/prog/lantern/lantern_resource_nexus.rb b/prog/lantern/lantern_resource_nexus.rb
index b1d1afec1..c4df8b082 100644
--- a/prog/lantern/lantern_resource_nexus.rb
+++ b/prog/lantern/lantern_resource_nexus.rb
@@ -102,7 +102,16 @@ def self.assemble(project_id:, location:, name:, target_vm_size:, target_storage
     )
 
     lantern_resource.required_standby_count.times do
-      Prog::Lantern::LanternServerNexus.assemble(resource_id: lantern_resource.id, timeline_id: timeline_id, timeline_access: "fetch")
+      Prog::Lantern::LanternServerNexus.assemble(
+        resource_id: lantern_resource.id,
+        timeline_id: timeline_id,
+        timeline_access: "fetch",
+        lantern_version: lantern_version,
+        extras_version: extras_version,
+        minor_version: minor_version,
+        target_vm_size: target_vm_size,
+        target_storage_size_gib: target_storage_size_gib
+      )
     end
 
     Strand.create(prog: "Lantern::LanternResourceNexus", label: "start") { _1.id = lantern_resource.id }
@@ -152,7 +161,16 @@ def before_run
   label def wait
     # Create missing standbys
     (lantern_resource.required_standby_count + 1 - lantern_resource.servers.count).times do
-      Prog::Lantern::LanternServerNexus.assemble(resource_id: lantern_resource.id, timeline_id: lantern_resource.timeline.id, timeline_access: "fetch")
+      Prog::Lantern::LanternServerNexus.assemble(
+        resource_id: lantern_resource.id,
+        lantern_version: lantern_resource.representative_server.lantern_version,
+        extras_version: lantern_resource.representative_server.extras_version,
+        minor_version: lantern_resource.representative_server.minor_version,
+        target_vm_size: lantern_resource.representative_server.target_vm_size,
+        target_storage_size_gib: lantern_resource.representative_server.target_storage_size_gib,
+        timeline_id: lantern_resource.timeline.id,
+        timeline_access: "fetch"
+      )
     end
 
     if lantern_resource.display_state == "failed"
diff --git a/prog/lantern/lantern_server_nexus.rb b/prog/lantern/lantern_server_nexus.rb
index a22f2a637..05282ebb8 100644
--- a/prog/lantern/lantern_server_nexus.rb
+++ b/prog/lantern/lantern_server_nexus.rb
@@ -164,7 +164,7 @@ def before_run
   end
 
   label def wait_catch_up
-    query = "SELECT pg_current_wal_lsn() - replay_lsn FROM pg_stat_replication WHERE application_name = '#{lantern_server.ubid}'"
+    query = "SELECT pg_current_wal_lsn() - replay_lsn FROM pg_stat_replication WHERE application_name = 'walreceiver'"
    lag = lantern_server.resource.representative_server.run_query(query).chomp
 
     nap 30 if lag.empty? || lag.to_i > 80 * 1024 * 1024 # 80 MB or ~5 WAL files
@@ -175,7 +175,7 @@ def before_run
   end
 
   label def wait_synchronization
-    query = "SELECT sync_state FROM pg_stat_replication WHERE application_name = '#{lantern_server.ubid}'"
+    query = "SELECT sync_state FROM pg_stat_replication WHERE application_name = 'walreceiver'"
     sync_state = lantern_server.resource.representative_server.run_query(query).chomp
 
     hop_wait if ["quorum", "sync"].include?(sync_state)
@@ -442,9 +442,52 @@ def destroy_domain
       hop_update_vm_size
     end
 
+    when_take_over_set? do
+      hop_take_over
+    end
+
     nap 30
   end
 
+  label def promote_server
+    current_master = lantern_server.resource.representative_server
+    current_master_domain = current_master.domain
+    new_master_domain = lantern_server.domain
+
+    lantern_server.update(domain: current_master_domain)
+    current_master.update(domain: new_master_domain)
+
+    lantern_server.run_query("SELECT pg_promote(true, 120);")
+    current_master.lazy_change_replication_mode("slave")
+    lantern_server.lazy_change_replication_mode("master")
+
+    hop_wait
+  end
+
+  label def wait_swap_ip
+    # wait until the ip change propagates
+    begin
+      is_in_recovery = lantern_server.run_query("SELECT pg_is_in_recovery()").chomp == "t"
+      nap 5 if !is_in_recovery
+    rescue
+      nap 5
+    end
+
+    hop_promote_server
+  end
+
+  label def take_over
+    decr_take_over
+    if !lantern_server.standby?
+      hop_wait
+    end
+
+    lantern_server.vm.swap_ip(lantern_server.resource.representative_server.vm)
+
+    register_deadline(:promote_server, 5 * 60)
+    hop_wait_swap_ip
+  end
+
   label def unavailable
     # TODO
     # if postgres_server.primary? && (standby = postgres_server.failover_target)
diff --git a/rhizome/lantern/bin/lazy_update_env b/rhizome/lantern/bin/lazy_update_env
new file mode 100755
index 000000000..ab23f3992
--- /dev/null
+++ b/rhizome/lantern/bin/lazy_update_env
@@ -0,0 +1,11 @@
+#!/bin/env ruby
+# frozen_string_literal: true
+
+require "json"
+require "yaml"
+require_relative "../../common/lib/util"
+require_relative "../lib/common"
+
+env_arr = JSON.parse($stdin.read)
+
+append_env(env_arr)
diff --git a/spec/lib/hosting/gcp_apis_spec.rb b/spec/lib/hosting/gcp_apis_spec.rb
index b4b65f795..dd54f8cec 100644
--- a/spec/lib/hosting/gcp_apis_spec.rb
+++ b/spec/lib/hosting/gcp_apis_spec.rb
@@ -96,7 +96,7 @@
       stub_request(:post, "https://compute.googleapis.com/compute/v1/projects/test-project/regions/us-central1/addresses").to_return(status: 200, body: JSON.dump({}), headers: {"Content-Type" => "application/json"})
       stub_request(:get, "https://compute.googleapis.com/compute/v1/projects/test-project/regions/us-central1/addresses/dummy-vm-addr").to_return(status: 200, body: JSON.dump({status: "CREATING", address: "1.1.1.1"}), headers: {"Content-Type" => "application/json"})
       api = described_class.new
-      expect(api.get_static_ipv4("dummy-vm", "us-central1")).to eq({"status" => "CREATING", "address" => "1.1.1.1"})
+      expect(api.get_static_ipv4("dummy-vm-addr", "us-central1")).to eq({"status" => "CREATING", "address" => "1.1.1.1"})
     end
   end
 
@@ -155,7 +155,7 @@
       api = described_class.new
       stub_request(:post, "https://oauth2.googleapis.com/token").to_return(status: 200, body: JSON.dump({}), headers: {"Content-Type" => "application/json"})
       stub_request(:delete, "https://compute.googleapis.com/compute/v1/projects/test-project/regions/us-central1/addresses/dummy-vm-addr")
-      expect { api.release_ipv4("dummy-vm", "us-central1") }.not_to raise_error
+      expect { api.release_ipv4("dummy-vm-addr", "us-central1") }.not_to raise_error
     end
   end
 
diff --git a/spec/model/gcp_vm_spec.rb b/spec/model/gcp_vm_spec.rb
index 610dd3e6e..3acfd0584 100644
--- a/spec/model/gcp_vm_spec.rb
+++ b/spec/model/gcp_vm_spec.rb
@@ -6,7 +6,8 @@
   subject(:gcp_vm) {
     described_class.new(
       name: "vm1",
-      location: "us-central1"
+      location: "us-central1",
+      address_name: "vm1-addr"
     ) { _1.id = "c068cac7-ed45-82db-bf38-a003582b36ee" }
   }
 
@@ -70,4 +71,25 @@
       expect(gcp_vm.is_stopped?).to be(true)
     end
   end
+
+  describe "#swap_ip" do
+    it "swaps server ips" do
+      api = instance_double(Hosting::GcpApis)
+      expect(api).to receive(:delete_ephermal_ipv4).with("vm1", "us-central1-a")
+      expect(api).to receive(:delete_ephermal_ipv4).with("vm2", "us-central1-a")
+      expect(api).to receive(:assign_static_ipv4).with("vm1", "ip2", "us-central1-a")
+      expect(api).to receive(:assign_static_ipv4).with("vm2", "ip1", "us-central1-a")
+      expect(Hosting::GcpApis).to receive(:new).and_return(api)
+      vm2 = instance_double(described_class, name: "vm2", address_name: "vm2-addr", location: "us-central1", sshable: instance_double(Sshable, host: "ip2"))
+      expect(gcp_vm).to receive(:sshable).and_return(instance_double(Sshable, host: "ip1")).at_least(:once)
+      expect(gcp_vm.sshable).to receive(:invalidate_cache_entry)
+      expect(vm2.sshable).to receive(:invalidate_cache_entry)
+      expect(gcp_vm.sshable).to receive(:update).with(host: "temp_vm1")
+      expect(gcp_vm.sshable).to receive(:update).with(host: "ip2")
+      expect(vm2.sshable).to receive(:update).with(host: "ip1")
+      expect(gcp_vm).to receive(:update).with(address_name: "vm2-addr")
+      expect(vm2).to receive(:update).with(address_name: "vm1-addr")
+      expect { gcp_vm.swap_ip(vm2) }.not_to raise_error
+    end
+  end
 end
diff --git a/spec/model/lantern/lantern_doctor_spec.rb b/spec/model/lantern/lantern_doctor_spec.rb
index 6979438a5..0ce7f0586 100644
--- a/spec/model/lantern/lantern_doctor_spec.rb
+++ b/spec/model/lantern/lantern_doctor_spec.rb
@@ -53,17 +53,17 @@
 
   describe "#should_run" do
     it "returns true" do
-      expect(lantern_doctor).to receive(:resource).and_return(instance_double(LanternResource, representative_server: instance_double(LanternServer, display_state: "running", strand: instance_double(Strand, label: "wait")))).at_least(:once)
+      expect(lantern_doctor).to receive(:resource).and_return(instance_double(LanternResource, display_state: "running", representative_server: instance_double(LanternServer, strand: instance_double(Strand, label: "wait")))).at_least(:once)
       expect(lantern_doctor.should_run?).to be(true)
     end
 
     it "returns false if not running" do
-      expect(lantern_doctor).to receive(:resource).and_return(instance_double(LanternResource, representative_server: instance_double(LanternServer, display_state: "stopped", strand: instance_double(Strand, label: "start")))).at_least(:once)
+      expect(lantern_doctor).to receive(:resource).and_return(instance_double(LanternResource, display_state: "failover", representative_server: instance_double(LanternServer, strand: instance_double(Strand, label: "start")))).at_least(:once)
       expect(lantern_doctor.should_run?).to be(false)
     end
 
     it "returns false" do
-      expect(lantern_doctor).to receive(:resource).and_return(instance_double(LanternResource, representative_server: instance_double(LanternServer, display_state: "running", strand: instance_double(Strand, label: "start")))).at_least(:once)
+      expect(lantern_doctor).to receive(:resource).and_return(instance_double(LanternResource, display_state: "running", representative_server: instance_double(LanternServer, display_state: "running", strand: instance_double(Strand, label: "start")))).at_least(:once)
       expect(lantern_doctor.should_run?).to be(false)
     end
 
diff --git a/spec/model/lantern/lantern_resource_spec.rb b/spec/model/lantern/lantern_resource_spec.rb
index 32ac4e682..496ec839f 100644
--- a/spec/model/lantern/lantern_resource_spec.rb
+++ b/spec/model/lantern/lantern_resource_spec.rb
@@ -29,6 +29,16 @@
       expect(lantern_resource.display_state).to eq("running")
     end
 
+    it "returns failed as display state" do
+      expect(lantern_resource).to receive(:display_state).and_return("failed")
+      expect(lantern_resource.display_state).to eq("failed")
+    end
+
+    it "returns failover" do
+      expect(lantern_resource).to receive(:servers).and_return([instance_double(LanternServer, display_state: "running", strand: instance_double(Strand, label: "wait")), instance_double(LanternServer, display_state: "failover", strand: instance_double(Strand, label: "take_over"))])
+      expect(lantern_resource.display_state).to eq("failover")
+    end
+
     it "returns unavailable as display state if no representative_server" do
       expect(lantern_resource).to receive(:representative_server).and_return(nil)
       expect(lantern_resource.display_state).to eq("unavailable")
diff --git a/spec/model/lantern/lantern_server_spec.rb b/spec/model/lantern/lantern_server_spec.rb
index 4d74390ff..8bed14d8d 100644
--- a/spec/model/lantern/lantern_server_spec.rb
+++ b/spec/model/lantern/lantern_server_spec.rb
@@ -15,7 +15,7 @@
   let(:vm) {
     instance_double(
       GcpVm,
-      sshable: instance_double(Sshable),
+      sshable: instance_double(Sshable, host: "127.0.0.1"),
       mem_gib: 8
     )
   }
@@ -115,6 +115,24 @@
       expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "unknown")).at_least(:once)
       expect(lantern_server.display_state).to eq("failed")
     end
+
+    it "shows failover when label is take_over" do
+      expect(lantern_server.vm).to receive(:display_state).and_return("running").at_least(:once)
+      expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "take_over")).at_least(:once)
+      expect(lantern_server.display_state).to eq("failover")
+    end
+
+    it "shows failover when label is wait_swap_ip" do
+      expect(lantern_server.vm).to receive(:display_state).and_return("running").at_least(:once)
+      expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "wait_swap_ip")).at_least(:once)
+      expect(lantern_server.display_state).to eq("failover")
+    end
+
+    it "shows failover when label is promote_server" do
+      expect(lantern_server.vm).to receive(:display_state).and_return("running").at_least(:once)
+      expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "promote_server")).at_least(:once)
+      expect(lantern_server.display_state).to eq("failover")
+    end
   end
 
   it "returns name from ubid" do
@@ -248,6 +266,7 @@
       superuser_password: "pwd1234",
       gcp_creds_b64: "test-creds",
       recovery_target_lsn: nil,
+      representative_server: lantern_server,
       restore_target: nil)
     expect(Config).to receive(:prom_password).and_return("pwd123").at_least(:once)
     expect(Config).to receive(:gcp_creds_gcr_b64).and_return("test-creds").at_least(:once)
@@ -278,6 +297,8 @@
       db_user: resource.db_user || "",
       db_user_password: resource.db_user_password || "",
       postgres_password: resource.superuser_password || "",
+      master_host: resource.representative_server.hostname,
+      master_port: 5432,
       prom_password: Config.prom_password,
       gcp_creds_gcr_b64: Config.gcp_creds_gcr_b64,
       gcp_creds_coredumps_b64: Config.gcp_creds_coredumps_b64,
@@ -313,6 +334,7 @@
       superuser_password: "pwd1234",
       gcp_creds_b64: "test-creds",
       recovery_target_lsn: nil,
+      representative_server: lantern_server,
       restore_target: Time.now)
     expect(Config).to receive(:prom_password).and_return("pwd123").at_least(:once)
     expect(Config).to receive(:gcp_creds_gcr_b64).and_return("test-creds").at_least(:once)
@@ -344,6 +366,8 @@
       db_user: resource.db_user || "",
       db_user_password: resource.db_user_password || "",
       postgres_password: resource.superuser_password || "",
+      master_host: resource.representative_server.hostname,
+      master_port: 5432,
       prom_password: Config.prom_password,
       gcp_creds_gcr_b64: Config.gcp_creds_gcr_b64,
       gcp_creds_coredumps_b64: Config.gcp_creds_coredumps_b64,
@@ -378,6 +402,7 @@
       superuser_password: "pwd1234",
       gcp_creds_b64: "test-creds",
       recovery_target_lsn: "16/B374D848",
+      representative_server: lantern_server,
       restore_target: nil)
     expect(Config).to receive(:prom_password).and_return("pwd123").at_least(:once)
     expect(Config).to receive(:gcp_creds_gcr_b64).and_return("test-creds").at_least(:once)
@@ -408,6 +433,8 @@
       db_user: resource.db_user || "",
       db_user_password: resource.db_user_password || "",
       postgres_password: resource.superuser_password || "",
+      master_host: resource.representative_server.hostname,
+      master_port: 5432,
       prom_password: Config.prom_password,
      gcp_creds_gcr_b64: Config.gcp_creds_gcr_b64,
       gcp_creds_coredumps_b64: Config.gcp_creds_coredumps_b64,
@@ -442,6 +469,7 @@
       superuser_password: "pwd1234",
       gcp_creds_b64: "test-creds",
       recovery_target_lsn: "16/B374D848",
+      representative_server: lantern_server,
       restore_target: Time.now)
     expect(Config).to receive(:prom_password).and_return("pwd123").at_least(:once)
     expect(Config).to receive(:gcp_creds_gcr_b64).and_return("test-creds").at_least(:once)
@@ -473,6 +501,8 @@
       db_user: resource.db_user || "",
       db_user_password: resource.db_user_password || "",
       postgres_password: resource.superuser_password || "",
+      master_host: resource.representative_server.hostname,
+      master_port: 5432,
       prom_password: Config.prom_password,
       gcp_creds_gcr_b64: Config.gcp_creds_gcr_b64,
       gcp_creds_coredumps_b64: Config.gcp_creds_coredumps_b64,
@@ -519,7 +549,7 @@
 
   describe "Lsn monitor" do
     it "fails to initiate a new health monitor session" do
-      expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "setup domain")).at_least(:once)
+      expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "setup domain")).at_least(:once).at_least(:once)
       expect { lantern_server.init_health_monitor_session }.to raise_error "server is not ready to initialize session"
     end
 
@@ -539,8 +569,8 @@
     }
 
     expect(lantern_server).to receive(:destroy_set?).and_return(false)
-    expect(lantern_server).to receive(:display_state).and_return("running")
-    expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "wait"))
+    expect(lantern_server).to receive(:display_state).and_return("running").at_least(:once)
+    expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "wait")).at_least(:once)
     expect(lantern_server).not_to receive(:incr_checkup)
     lantern_server.check_pulse(session: session, previous_pulse: pulse)
   end
@@ -555,9 +585,9 @@
       reading_chg: Time.now - 30
     }
 
-    expect(lantern_server).to receive(:display_state).and_return("running")
+    expect(lantern_server).to receive(:display_state).and_return("running").at_least(:once)
     expect(lantern_server).to receive(:destroy_set?).and_return(false)
-    expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "wait"))
+    expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "wait")).at_least(:once)
     expect(lantern_server).to receive(:primary?).and_return(true)
     expect(lantern_server).not_to receive(:incr_checkup)
     lantern_server.check_pulse(session: session, previous_pulse: pulse)
@@ -573,9 +603,9 @@
       reading_chg: Time.now - 30
     }
 
-    expect(lantern_server).to receive(:display_state).and_return("running")
+    expect(lantern_server).to receive(:display_state).and_return("running").at_least(:once)
     expect(lantern_server).to receive(:destroy_set?).and_return(false)
-    expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "wait"))
+    expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "wait")).at_least(:once)
     expect(session[:db_connection]).to receive(:[]).and_raise(Sequel::DatabaseConnectionError)
     expect(lantern_server).to receive(:incr_checkup)
     lantern_server.check_pulse(session: session, previous_pulse: pulse)
@@ -677,4 +707,28 @@
       expect(described_class.get_vm_image("0.2.7", "0.1.5", "1")).to eq("custom-image")
     end
   end
+
+  describe "#lazy_change_replication_mode" do
+    it "changes to master" do
+      time = Time.new
+      expect(lantern_server.vm.sshable).to receive(:cmd).with("sudo lantern/bin/lazy_update_env", stdin: JSON.generate([
+        ["POSTGRESQL_REPLICATION_MODE", "master"],
+        ["INSTANCE_TYPE", "writer"],
+        ["POSTGRESQL_RECOVER_FROM_BACKUP", ""]
+      ]))
+      expect(Time).to receive(:new).and_return(time)
+      expect(lantern_server).to receive(:update).with(timeline_access: "push", representative_at: time)
+      lantern_server.lazy_change_replication_mode("master")
+    end
+
+    it "changes to slave" do
+      expect(lantern_server.vm.sshable).to receive(:cmd).with("sudo lantern/bin/lazy_update_env", stdin: JSON.generate([
+        ["POSTGRESQL_REPLICATION_MODE", "slave"],
+        ["INSTANCE_TYPE", "reader"],
+        ["POSTGRESQL_RECOVER_FROM_BACKUP", ""]
+      ]))
+      expect(lantern_server).to receive(:update).with(timeline_access: "fetch", representative_at: nil)
+      lantern_server.lazy_change_replication_mode("slave")
+    end
+  end
 end
diff --git a/spec/prog/gcp_vm/nexus_spec.rb b/spec/prog/gcp_vm/nexus_spec.rb
index aeae878d5..a401cb039 100644
--- a/spec/prog/gcp_vm/nexus_spec.rb
+++ b/spec/prog/gcp_vm/nexus_spec.rb
@@ -92,15 +92,17 @@
       gcp_api = instance_double(Hosting::GcpApis)
       expect(Hosting::GcpApis).to receive(:new).and_return(gcp_api)
       expect(gcp_api).to receive(:get_vm).with("dummy-vm", "us-central1-a").and_return({"status" => "RUNNING"})
-      expect(gcp_api).to receive(:create_static_ipv4).with("dummy-vm", "us-central1").and_return({})
+      expect(gcp_api).to receive(:create_static_ipv4).with("dummy-vm-addr", "us-central1").and_return({})
       expect(gcp_vm).to receive(:strand).and_return(instance_double(Strand, prog: "GcpVm", stack: [{}])).at_least(:once)
+      expect(gcp_vm).to receive(:update).with(address_name: "dummy-vm-addr")
       expect { nx.wait_create_vm }.to hop("wait_ipv4")
     end
 
     it "naps if ip4 is not yet reserved" do
       gcp_api = instance_double(Hosting::GcpApis)
       expect(Hosting::GcpApis).to receive(:new).and_return(gcp_api)
-      expect(gcp_api).to receive(:get_static_ipv4).with("dummy-vm", "us-central1").and_return({"status" => "CREATING", "address" => "1.1.1.1"})
+      expect(gcp_vm).to receive(:address_name).and_return("dummy-vm-addr")
+      expect(gcp_api).to receive(:get_static_ipv4).with("dummy-vm-addr", "us-central1").and_return({"status" => "CREATING", "address" => "1.1.1.1"})
       expect { nx.wait_ipv4 }.to nap(10)
     end
 
@@ -109,7 +111,8 @@
       expect(gcp_vm).to receive(:sshable).and_return(sshable)
       gcp_api = instance_double(Hosting::GcpApis)
       expect(Hosting::GcpApis).to receive(:new).and_return(gcp_api)
-      expect(gcp_api).to receive(:get_static_ipv4).with("dummy-vm", "us-central1").and_return({"status" => "RESERVED", "address" => "1.1.1.1"})
+      expect(gcp_vm).to receive(:address_name).and_return("dummy-vm-addr")
+      expect(gcp_api).to receive(:get_static_ipv4).with("dummy-vm-addr", "us-central1").and_return({"status" => "RESERVED", "address" => "1.1.1.1"})
       expect(gcp_api).to receive(:delete_ephermal_ipv4).with("dummy-vm", "us-central1-a")
       expect(gcp_api).to receive(:assign_static_ipv4).with("dummy-vm", "1.1.1.1", "us-central1-a")
       expect(gcp_vm).to receive(:update).with({has_static_ipv4: true})
@@ -174,7 +177,8 @@
       gcp_api = instance_double(Hosting::GcpApis)
       expect(Hosting::GcpApis).to receive(:new).and_return(gcp_api)
       expect(gcp_api).to receive(:delete_vm).with("dummy-vm", "us-central1-a")
-      expect(gcp_api).to receive(:release_ipv4).with("dummy-vm", "us-central1")
+      expect(gcp_vm).to receive(:address_name).and_return("dummy-vm-addr")
+      expect(gcp_api).to receive(:release_ipv4).with("dummy-vm-addr", "us-central1")
       expect { nx.destroy }.to exit({"msg" => "gcp vm deleted"})
     end
   end
diff --git a/spec/prog/lantern/lantern_resource_nexus_spec.rb b/spec/prog/lantern/lantern_resource_nexus_spec.rb
index d122c40e6..e1779f966 100644
--- a/spec/prog/lantern/lantern_resource_nexus_spec.rb
+++ b/spec/prog/lantern/lantern_resource_nexus_spec.rb
@@ -21,6 +21,12 @@
       )],
       representative_server: instance_double(
         LanternServer,
+        lantern_version: Config.lantern_default_version,
+        extras_version: Config.lantern_extras_default_version,
+        minor_version: Config.lantern_minor_default_version,
+        target_vm_size: "n1-standard-2",
+        target_storage_size_gib: 64,
+
         vm: instance_double(
           GcpVm,
           id: "104b0033-b3f6-8214-ae27-0cd3cef18ce5"
diff --git a/spec/prog/lantern/lantern_server_nexus_spec.rb b/spec/prog/lantern/lantern_server_nexus_spec.rb
index 9595237df..5c090d36a 100644
--- a/spec/prog/lantern/lantern_server_nexus_spec.rb
+++ b/spec/prog/lantern/lantern_server_nexus_spec.rb
@@ -703,6 +703,11 @@
       expect { nx.wait }.to hop("unavailable")
     end
 
+    it "hops to take_over" do
+      nx.incr_take_over
+      expect { nx.wait }.to hop("take_over")
+    end
+
     it "decrements checkup" do
       nx.incr_checkup
       expect(nx).to receive(:available?).and_return(true)
@@ -914,4 +919,51 @@
      expect { nx.wait_timeline_available }.to hop("wait_db_available")
     end
   end
+
+  describe "#take_over" do
+    it "returns if primary" do
+      expect(lantern_server).to receive(:standby?).and_return(false)
+      expect(lantern_server).not_to receive(:run_query)
+      expect { nx.take_over }.to hop("wait")
+    end
+
+    it "swaps ips" do
+      expect(lantern_server).to receive(:standby?).and_return(true)
+
+      current_master = instance_double(LanternServer, domain: "db1.lantern.dev", vm: instance_double(GcpVm, sshable: instance_double(Sshable, host: "127.0.0.1"), name: "old-master", location: "us-east1", address_name: "old-addr"))
+      expect(lantern_server.resource).to receive(:representative_server).and_return(current_master).at_least(:once)
+
+      expect(lantern_server.vm).to receive(:swap_ip).with(current_master.vm)
+
+      expect { nx.take_over }.to hop("wait_swap_ip")
+    end
+
+    it "waits until vm available" do
+      expect(lantern_server).to receive(:run_query).with("SELECT pg_is_in_recovery()").and_raise "test"
+      expect { nx.wait_swap_ip }.to nap 5
+    end
+
+    it "waits until ip swap done" do
+      expect(lantern_server).to receive(:run_query).with("SELECT pg_is_in_recovery()").and_return("f")
+      expect { nx.wait_swap_ip }.to nap 5
+    end
+
+    it "hops to promote" do
+      expect(lantern_server).to receive(:run_query).with("SELECT pg_is_in_recovery()").and_return("t")
+      expect { nx.wait_swap_ip }.to hop("promote_server")
+    end
+
+    it "promotes server" do
+      current_master = instance_double(LanternServer, domain: "db1.lantern.dev", vm: instance_double(GcpVm, sshable: instance_double(Sshable, host: "127.0.0.1"), name: "old-master", location: "us-east1", address_name: "old-addr"))
+      expect(lantern_server.resource).to receive(:representative_server).and_return(current_master).at_least(:once)
+
+      expect(current_master).to receive(:update).with(domain: lantern_server.domain).at_least(:once)
+      expect(lantern_server).to receive(:update).with(domain: current_master.domain).at_least(:once)
+
+      expect(lantern_server).to receive(:run_query).with("SELECT pg_promote(true, 120);")
+      expect(current_master).to receive(:lazy_change_replication_mode).with("slave")
+      expect(lantern_server).to receive(:lazy_change_replication_mode).with("master")
+      expect { nx.promote_server }.to hop("wait")
+    end
+  end
 end
diff --git a/spec/serializers/api/lantern_spec.rb b/spec/serializers/api/lantern_spec.rb
index cb7d605ea..c6a52860e 100644
--- a/spec/serializers/api/lantern_spec.rb
+++ b/spec/serializers/api/lantern_spec.rb
@@ -75,7 +75,8 @@
       display_state: "running",
       instance_type: "writer",
       hostname: "db.lantern.dev",
-      connection_string: "postgres://postgres:test123@db.lantern.dev:6432")
+      connection_string: "postgres://postgres:test123@db.lantern.dev:6432",
+      strand: instance_double(Strand, label: "wait"))
     expect(lantern).to receive(:representative_server).and_return(leader).at_least(:once)
     expect(lantern).to receive(:servers).and_return([leader]).at_least(:once)
     data = described_class.new(:detailed).serialize(lantern)
diff --git a/spec/serializers/web/lantern_spec.rb b/spec/serializers/web/lantern_spec.rb
index 07df261ae..2ad3f4d7a 100644
--- a/spec/serializers/web/lantern_spec.rb
+++ b/spec/serializers/web/lantern_spec.rb
@@ -77,6 +77,7 @@
       instance_type: "writer",
       hostname: "db.lantern.dev",
       primary?: true,
+      strand: instance_double(Strand, label: "wait"),
       connection_string: "postgres://postgres:test123@db.lantern.dev:6432")
     expect(lantern).to receive(:representative_server).and_return(leader).at_least(:once)
     expect(lantern).to receive(:servers).and_return([leader]).at_least(:once)