Skip to content

Commit

Permalink
fix failover logic and make fault tolerant
Browse files Browse the repository at this point in the history
  • Loading branch information
var77 committed Jun 19, 2024
1 parent 478a40e commit e297d72
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 80 deletions.
7 changes: 0 additions & 7 deletions lib/hosting/gcp_apis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -444,11 +444,4 @@ def get_image(name)
Hosting::GcpApis.check_errors(response)
JSON.parse(response.body).merge({"resource_name" => "projects/#{@project}/global/images/#{name}"})
end

# Exchanges the IPv4 addresses of two VMs.
#
# Calls delete_ephermal_ipv4 for both VMs first and only then calls
# assign_static_ipv4, handing ip2 to the first VM and ip1 to the second.
#
# @param vm_name1 name of the first VM
# @param vm_name2 name of the second VM
# @param zone1 zone of the first VM
# @param zone2 zone of the second VM
# @param ip1 address passed on to the second VM
# @param ip2 address passed on to the first VM
# @return whatever the final assign_static_ipv4 call returns
def swap_ips(vm_name1:, vm_name2:, zone1:, zone2:, ip1:, ip2:)
  [[vm_name1, zone1], [vm_name2, zone2]].each do |vm_name, zone|
    delete_ephermal_ipv4(vm_name, zone)
  end

  assign_static_ipv4(vm_name1, ip2, zone1)
  assign_static_ipv4(vm_name2, ip1, zone2)
end
end
26 changes: 26 additions & 0 deletions model/gcp_vm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,30 @@ def is_stopped?
# Extends the inherited list of redacted columns with :public_key.
#
# @return the parent's redacted columns followed by :public_key
def self.redacted_columns
  inherited_columns = super
  inherited_columns + [:public_key]
end

# Trades addresses with another VM: first through the GCP API, then in the
# local records (sshable hosts and address names).
#
# @param vm the other VM; it is asked for name, location, sshable and
#   address_name, and receives update(address_name:)
# @return whatever vm.sshable.invalidate_cache_entry returns
def swap_ip(vm)
  api = Hosting::GcpApis.new
  own_zone = "#{location}-a"
  peer_zone = "#{vm.location}-a"

  # GCP side: delete_ephermal_ipv4 for both VMs, then assign each VM the
  # host currently recorded for the other one.
  api.delete_ephermal_ipv4(name, own_zone)
  api.delete_ephermal_ipv4(vm.name, peer_zone)
  api.assign_static_ipv4(name, vm.sshable.host, own_zone)
  api.assign_static_ipv4(vm.name, sshable.host, peer_zone)

  # Record side: our sshable is parked on a placeholder host while the peer
  # takes our old host, and only then do we adopt the peer's old host.
  # NOTE(review): the placeholder presumably avoids two sshables holding the
  # same host at once; confirm against the sshable schema.
  own_host = sshable.host
  peer_host = vm.sshable.host
  sshable.update(host: "temp_#{name}")
  vm.sshable.update(host: own_host)
  sshable.update(host: peer_host)

  # Address names are exchanged the same way; ours is read before it is
  # overwritten.
  own_address_name = address_name
  update(address_name: vm.address_name)
  vm.update(address_name: own_address_name)

  sshable.invalidate_cache_entry
  vm.sshable.invalidate_cache_entry
end
end
2 changes: 1 addition & 1 deletion model/lantern/lantern_doctor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def has_system_query?(queries, query)

# Decides whether the doctor should run right now.
#
# @return [Boolean] false when there is no resource; otherwise true only when
#   the resource's display_state is "running" and the representative server's
#   strand label is "wait"
def should_run?
  return false unless resource

  # The state check is made on the resource itself; the representative server
  # is consulted only for its strand label.
  resource.display_state == "running" && resource.representative_server.strand.label == "wait"
end

def sync_system_queries
Expand Down
1 change: 1 addition & 0 deletions model/lantern/lantern_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def display_state
return "updating" if vm.display_state == "updating" || strand.label.include?("update") || strand.label == "init_sql"
return "unavailable" if strand.label == "wait_db_available"
return "running" if strand.label == "wait"
return "failover" if ["take_over", "wait_swap_ip", "promote_server"].include?(strand.label)
"creating"
end

Expand Down
52 changes: 26 additions & 26 deletions prog/lantern/lantern_server_nexus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -449,45 +449,45 @@ def destroy_domain
nap 30
end

label def take_over
decr_take_over
if !lantern_server.standby?
hop_wait
end

# Promotes this server to primary and demotes the current representative
# server.
#
# Only the domain exchange and the Postgres promotion happen here. The GCP
# address swap and the sshable/address_name bookkeeping are performed by
# GcpVm#swap_ip, which take_over calls before hopping here, so repeating them
# in this step would swap the addresses back.
label def promote_server
  current_master = lantern_server.resource.representative_server

  # Read both domains before writing either one so the exchange is symmetric.
  current_master_domain = current_master.domain
  new_master_domain = lantern_server.domain
  lantern_server.update(domain: current_master_domain)
  current_master.update(domain: new_master_domain)

  lantern_server.run_query("SELECT pg_promote(true, 120);")
  current_master.lazy_change_replication_mode("slave")
  lantern_server.lazy_change_replication_mode("master")

  hop_wait
end

# Polls this server with pg_is_in_recovery() and hops to promote_server once
# the answer is "t"; naps 5 while the query fails or returns anything else.
label def wait_swap_ip
  # Only the query sits inside the rescue, so the bare rescue can swallow
  # nothing but query/connection failures and never the nap/hop calls below.
  # NOTE(review): nap presumably signals by raising; confirm in the prog base
  # class.
  in_recovery = begin
    lantern_server.run_query("SELECT pg_is_in_recovery()").chomp == "t"
  rescue
    false
  end

  # Wait until the IP change has propagated.
  nap 5 unless in_recovery

  hop_promote_server
end

# First step of a failover onto this server: calls decr_take_over, returns to
# wait when the server is not a standby, swaps addresses with the current
# representative server's VM, registers a deadline for promote_server and
# hops to wait_swap_ip.
label def take_over
  decr_take_over
  hop_wait unless lantern_server.standby?

  own_vm = lantern_server.vm
  own_vm.swap_ip(lantern_server.resource.representative_server.vm)

  # 5 * 60 is passed as the deadline for reaching promote_server.
  register_deadline(:promote_server, 5 * 60)
  hop_wait_swap_ip
end

label def unavailable
# TODO
# if postgres_server.primary? && (standby = postgres_server.failover_target)
Expand Down
11 changes: 0 additions & 11 deletions spec/lib/hosting/gcp_apis_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -454,16 +454,5 @@
expect { api.allow_access_to_big_query_dataset(service_account_email, dataset_id) }.not_to raise_error
end
end

describe "#swap_ips" do
  # Expects one delete call per VM and the two assign calls with the IPs crossed.
  it "swap server ips" do
    api = described_class.new

    {"vm1" => "zone1", "vm2" => "zone2"}.each do |vm_name, zone|
      expect(api).to receive(:delete_ephermal_ipv4).with(vm_name, zone)
    end
    expect(api).to receive(:assign_static_ipv4).with("vm1", "ip2", "zone1")
    expect(api).to receive(:assign_static_ipv4).with("vm2", "ip1", "zone2")

    swap_args = {vm_name1: "vm1", vm_name2: "vm2", zone1: "zone1", zone2: "zone2", ip1: "ip1", ip2: "ip2"}
    expect { api.swap_ips(**swap_args) }.not_to raise_error
  end
end
end
end
24 changes: 23 additions & 1 deletion spec/model/gcp_vm_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
subject(:gcp_vm) {
described_class.new(
name: "vm1",
location: "us-central1"
location: "us-central1",
address_name: "vm1-addr"
) { _1.id = "c068cac7-ed45-82db-bf38-a003582b36ee" }
}

Expand Down Expand Up @@ -70,4 +71,25 @@
expect(gcp_vm.is_stopped?).to be(true)
end
end

describe "#swap_ip" do
  # Covers the GCP calls (zone built from location, hosts crossed) and the
  # record updates on both VMs and both sshables.
  it "swap server ips" do
    own_sshable = instance_double(Sshable, host: "ip1")
    peer_sshable = instance_double(Sshable, host: "ip2")
    vm2 = instance_double(described_class, name: "vm2", address_name: "vm2-addr", location: "us-central1", sshable: peer_sshable)
    expect(gcp_vm).to receive(:sshable).and_return(own_sshable).at_least(:once)

    api = instance_double(Hosting::GcpApis)
    expect(Hosting::GcpApis).to receive(:new).and_return(api)
    expect(api).to receive(:delete_ephermal_ipv4).with("vm1", "us-central1-a")
    expect(api).to receive(:delete_ephermal_ipv4).with("vm2", "us-central1-a")
    expect(api).to receive(:assign_static_ipv4).with("vm1", "ip2", "us-central1-a")
    expect(api).to receive(:assign_static_ipv4).with("vm2", "ip1", "us-central1-a")

    # Our sshable goes through a placeholder host before taking the peer's.
    expect(own_sshable).to receive(:update).with(host: "temp_vm1")
    expect(own_sshable).to receive(:update).with(host: "ip2")
    expect(peer_sshable).to receive(:update).with(host: "ip1")

    expect(gcp_vm).to receive(:update).with(address_name: "vm2-addr")
    expect(vm2).to receive(:update).with(address_name: "vm1-addr")

    expect(own_sshable).to receive(:invalidate_cache_entry)
    expect(peer_sshable).to receive(:invalidate_cache_entry)

    expect { gcp_vm.swap_ip(vm2) }.not_to raise_error
  end
end
end
6 changes: 3 additions & 3 deletions spec/model/lantern/lantern_doctor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,17 @@

describe "#should_run" do
it "returns true" do
  # Resource reports "running" and the representative server's strand is in "wait".
  server = instance_double(LanternServer, strand: instance_double(Strand, label: "wait"))
  expect(lantern_doctor).to receive(:resource).and_return(instance_double(LanternResource, display_state: "running", representative_server: server)).at_least(:once)
  expect(lantern_doctor.should_run?).to be(true)
end

it "returns false if not running" do
  # A resource whose display_state is not "running" must keep the doctor idle.
  server = instance_double(LanternServer, strand: instance_double(Strand, label: "start"))
  expect(lantern_doctor).to receive(:resource).and_return(instance_double(LanternResource, display_state: "failover", representative_server: server)).at_least(:once)
  expect(lantern_doctor.should_run?).to be(false)
end

it "returns false" do
  # Resource is "running" but the representative server's strand is not in "wait".
  server = instance_double(LanternServer, strand: instance_double(Strand, label: "start"))
  expect(lantern_doctor).to receive(:resource).and_return(instance_double(LanternResource, display_state: "running", representative_server: server)).at_least(:once)
  expect(lantern_doctor.should_run?).to be(false)
end

Expand Down
32 changes: 25 additions & 7 deletions spec/model/lantern/lantern_server_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,24 @@
expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "unknown")).at_least(:once)
expect(lantern_server.display_state).to eq("failed")
end

# Each of these strand labels must be reported as "failover".
%w[take_over wait_swap_ip promote_server].each do |failover_label|
  it "shows failover when label is #{failover_label}" do
    expect(lantern_server.vm).to receive(:display_state).and_return("running").at_least(:once)
    expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: failover_label)).at_least(:once)
    expect(lantern_server.display_state).to eq("failover")
  end
end
end

it "returns name from ubid" do
Expand Down Expand Up @@ -531,7 +549,7 @@

describe "Lsn monitor" do
it "fails to initiate a new health monitor session" do
  # strand may be read more than once, hence at_least(:once) (stated a single time).
  expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "setup domain")).at_least(:once)
  expect { lantern_server.init_health_monitor_session }.to raise_error "server is not ready to initialize session"
end

Expand All @@ -551,8 +569,8 @@
}

expect(lantern_server).to receive(:destroy_set?).and_return(false)
expect(lantern_server).to receive(:display_state).and_return("running")
expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "wait"))
expect(lantern_server).to receive(:display_state).and_return("running").at_least(:once)
expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "wait")).at_least(:once)
expect(lantern_server).not_to receive(:incr_checkup)
lantern_server.check_pulse(session: session, previous_pulse: pulse)
end
Expand All @@ -567,9 +585,9 @@
reading_chg: Time.now - 30
}

expect(lantern_server).to receive(:display_state).and_return("running")
expect(lantern_server).to receive(:display_state).and_return("running").at_least(:once)
expect(lantern_server).to receive(:destroy_set?).and_return(false)
expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "wait"))
expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "wait")).at_least(:once)
expect(lantern_server).to receive(:primary?).and_return(true)
expect(lantern_server).not_to receive(:incr_checkup)
lantern_server.check_pulse(session: session, previous_pulse: pulse)
Expand All @@ -585,9 +603,9 @@
reading_chg: Time.now - 30
}

expect(lantern_server).to receive(:display_state).and_return("running")
expect(lantern_server).to receive(:display_state).and_return("running").at_least(:once)
expect(lantern_server).to receive(:destroy_set?).and_return(false)
expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "wait"))
expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "wait")).at_least(:once)
expect(session[:db_connection]).to receive(:[]).and_raise(Sequel::DatabaseConnectionError)
expect(lantern_server).to receive(:incr_checkup)
lantern_server.check_pulse(session: session, previous_pulse: pulse)
Expand Down
55 changes: 31 additions & 24 deletions spec/prog/lantern/lantern_server_nexus_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -927,36 +927,43 @@
expect { nx.take_over }.to hop("wait")
end

it "promotes to master" do
it "swap ips" do
  expect(lantern_server).to receive(:standby?).and_return(true)

  current_master = instance_double(LanternServer, domain: "db1.lantern.dev", vm: instance_double(GcpVm, sshable: instance_double(Sshable, host: "127.0.0.1"), name: "old-master", location: "us-east1", address_name: "old-addr"))
  expect(lantern_server.resource).to receive(:representative_server).and_return(current_master).at_least(:once)

  # take_over hands the whole address swap to GcpVm#swap_ip; promotion is
  # covered by the promote_server example, so nothing else is expected here.
  expect(lantern_server.vm).to receive(:swap_ip).with(current_master.vm)

  expect { nx.take_over }.to hop("wait_swap_ip")
end

it "waits until vm available" do
  # A query that raises leads to a nap.
  recovery_query = "SELECT pg_is_in_recovery()"
  expect(lantern_server).to receive(:run_query).with(recovery_query).and_raise("test")
  expect { nx.wait_swap_ip }.to nap(5)
end

it "waits until ip swap done" do
  # An "f" answer to the recovery query leads to a nap.
  recovery_query = "SELECT pg_is_in_recovery()"
  expect(lantern_server).to receive(:run_query).with(recovery_query).and_return("f")
  expect { nx.wait_swap_ip }.to nap(5)
end

it "hops to promote" do
  # A "t" answer to the recovery query moves on to promote_server.
  recovery_query = "SELECT pg_is_in_recovery()"
  expect(lantern_server).to receive(:run_query).with(recovery_query).and_return("t")
  expect { nx.wait_swap_ip }.to hop("promote_server")
end

it "promotes server" do
  current_master = instance_double(LanternServer, domain: "db1.lantern.dev", vm: instance_double(GcpVm, sshable: instance_double(Sshable, host: "127.0.0.1"), name: "old-master", location: "us-east1", address_name: "old-addr"))
  expect(lantern_server.resource).to receive(:representative_server).and_return(current_master).at_least(:once)

  # The two servers exchange domains; no address swap is expected in this step.
  expect(current_master).to receive(:update).with(domain: lantern_server.domain).at_least(:once)
  expect(lantern_server).to receive(:update).with(domain: current_master.domain).at_least(:once)

  expect(lantern_server).to receive(:run_query).with("SELECT pg_promote(true, 120);")
  expect(current_master).to receive(:lazy_change_replication_mode).with("slave")
  expect(lantern_server).to receive(:lazy_change_replication_mode).with("master")
  expect { nx.promote_server }.to hop("wait")
end
end
end

0 comments on commit e297d72

Please sign in to comment.