Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ha and failover #55

Merged
merged 3 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions lib/hosting/gcp_apis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,8 @@ def get_vm(vm_name, zone)
JSON.parse(response.body)
end

def create_static_ipv4(vm_name, region)
def create_static_ipv4(address_name, region)
connection = Excon.new(@host[:connection_string], headers: @host[:headers])
address_name = "#{vm_name}-addr"
body = {
name: address_name,
networkTier: "PREMIUM",
Expand All @@ -161,9 +160,8 @@ def create_static_ipv4(vm_name, region)
Hosting::GcpApis.check_errors(response)
end

def get_static_ipv4(vm_name, region)
def get_static_ipv4(address_name, region)
connection = Excon.new(@host[:connection_string], headers: @host[:headers])
address_name = "#{vm_name}-addr"
response = connection.get(path: "/compute/v1/projects/#{@project}/regions/#{region}/addresses/#{address_name}", expects: 200)
JSON.parse(response.body)
end
Expand Down Expand Up @@ -194,9 +192,8 @@ def assign_static_ipv4(vm_name, addr, zone)
wait_for_operation(zone, data["id"])
end

def release_ipv4(vm_name, region)
def release_ipv4(address_name, region)
connection = Excon.new(@host[:connection_string], headers: @host[:headers])
address_name = "#{vm_name}-addr"
connection.delete(path: "/compute/v1/projects/#{@project}/regions/#{region}/addresses/#{address_name}", expects: [200, 404])
end

Expand Down
10 changes: 10 additions & 0 deletions migrate/20240618_gcp_vm_address_name.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# frozen_string_literal: true

Sequel.migration do
change do
alter_table(:gcp_vm) do
add_column :address_name, :text, collate: '"C"'
end
run "UPDATE gcp_vm SET address_name=name || '-addr'"
end
end
26 changes: 26 additions & 0 deletions model/gcp_vm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,30 @@ def is_stopped?
def self.redacted_columns
super + [:public_key]
end

def swap_ip(vm)
# swap ips in gcp
gcp_client = Hosting::GcpApis.new
zone1 = "#{location}-a"
zone2 = "#{vm.location}-a"
gcp_client.delete_ephermal_ipv4(name, zone1)
gcp_client.delete_ephermal_ipv4(vm.name, zone2)
gcp_client.assign_static_ipv4(name, vm.sshable.host, zone1)
gcp_client.assign_static_ipv4(vm.name, sshable.host, zone2)

# update sshable hosts
current_host = sshable.host
new_host = vm.sshable.host
sshable.update(host: "temp_#{name}")
vm.sshable.update(host: current_host)
sshable.update(host: new_host)
current_address_name = address_name

# update address names
update(address_name: vm.address_name)
vm.update(address_name: current_address_name)

sshable.invalidate_cache_entry
vm.sshable.invalidate_cache_entry
end
end
2 changes: 1 addition & 1 deletion model/lantern/lantern_doctor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def has_system_query?(queries, query)

def should_run?
return false unless resource
resource.representative_server.display_state == "running" && resource.representative_server.strand.label == "wait"
resource.display_state == "running" && resource.representative_server.strand.label == "wait"
end

def sync_system_queries
Expand Down
1 change: 1 addition & 0 deletions model/lantern/lantern_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def label
end

def display_state
return "failover" if servers.find { _1.display_state == "failover" }
super || representative_server&.display_state || "unavailable"
end

Expand Down
14 changes: 12 additions & 2 deletions model/lantern/lantern_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def display_state
return "updating" if vm.display_state == "updating" || strand.label.include?("update") || strand.label == "init_sql"
return "unavailable" if strand.label == "wait_db_available"
return "running" if strand.label == "wait"
return "failover" if ["take_over", "wait_swap_ip", "promote_server"].include?(strand.label)
"creating"
end

Expand Down Expand Up @@ -114,8 +115,8 @@ def configure_hash
db_user: resource.db_user || "",
db_user_password: resource.db_user_password || "",
postgres_password: resource.superuser_password || "",
# master_host: lantern_server.master_host,
# master_port: lantern_server.master_port,
master_host: resource.representative_server.hostname,
master_port: 5432,
prom_password: Config.prom_password,
gcp_creds_gcr_b64: Config.gcp_creds_gcr_b64,
gcp_creds_coredumps_b64: Config.gcp_creds_coredumps_b64,
Expand All @@ -131,6 +132,15 @@ def configure_hash
})
end

def lazy_change_replication_mode(replication_mode)
update(timeline_access: (replication_mode == "master") ? "push" : "fetch", representative_at: (replication_mode == "master") ? Time.new : nil)
vm.sshable.cmd("sudo lantern/bin/lazy_update_env", stdin: JSON.generate([
["POSTGRESQL_REPLICATION_MODE", replication_mode],
["INSTANCE_TYPE", (replication_mode == "master") ? "writer" : "reader"],
["POSTGRESQL_RECOVER_FROM_BACKUP", ""]
]))
end

def update_walg_creds
walg_config = timeline.generate_walg_config
vm.sshable.cmd("sudo lantern/bin/update_env", stdin: JSON.generate([
Expand Down
8 changes: 5 additions & 3 deletions prog/gcp_vm/nexus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def host

label def wait_ipv4
gcp_client = Hosting::GcpApis.new
addr_info = gcp_client.get_static_ipv4(gcp_vm.name, gcp_vm.location)
addr_info = gcp_client.get_static_ipv4(gcp_vm.address_name, gcp_vm.location)
if addr_info["status"] == "RESERVED"
gcp_client.delete_ephermal_ipv4(gcp_vm.name, "#{gcp_vm.location}-a")
gcp_client.assign_static_ipv4(gcp_vm.name, addr_info["address"], "#{gcp_vm.location}-a")
Expand All @@ -82,7 +82,9 @@ def host
gcp_client = Hosting::GcpApis.new
vm = gcp_client.get_vm(gcp_vm.name, "#{gcp_vm.location}-a")
if vm["status"] == "RUNNING"
gcp_client.create_static_ipv4(gcp_vm.name, gcp_vm.location)
address_name = "#{gcp_vm.name}-addr"
gcp_client.create_static_ipv4(address_name, gcp_vm.location)
gcp_vm.update(address_name: address_name)
register_deadline(:wait, 5 * 60)
hop_wait_ipv4
else
Expand Down Expand Up @@ -229,7 +231,7 @@ def host
gcp_client = Hosting::GcpApis.new
gcp_client.delete_vm(gcp_vm.name, "#{gcp_vm.location}-a")
if gcp_vm.has_static_ipv4
gcp_client.release_ipv4(gcp_vm.name, gcp_vm.location)
gcp_client.release_ipv4(gcp_vm.address_name, gcp_vm.location)
end
strand.children.each { _1.destroy }
gcp_vm.projects.map { gcp_vm.dissociate_with_project(_1) }
Expand Down
22 changes: 20 additions & 2 deletions prog/lantern/lantern_resource_nexus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,16 @@ def self.assemble(project_id:, location:, name:, target_vm_size:, target_storage
)

lantern_resource.required_standby_count.times do
Prog::Lantern::LanternServerNexus.assemble(resource_id: lantern_resource.id, timeline_id: timeline_id, timeline_access: "fetch")
Prog::Lantern::LanternServerNexus.assemble(
resource_id: lantern_resource.id,
timeline_id: timeline_id,
timeline_access: "fetch",
lantern_version: lantern_version,
extras_version: extras_version,
minor_version: minor_version,
target_vm_size: target_vm_size,
target_storage_size_gib: target_storage_size_gib
)
end

Strand.create(prog: "Lantern::LanternResourceNexus", label: "start") { _1.id = lantern_resource.id }
Expand Down Expand Up @@ -152,7 +161,16 @@ def before_run
label def wait
# Create missing standbys
(lantern_resource.required_standby_count + 1 - lantern_resource.servers.count).times do
Prog::Lantern::LanternServerNexus.assemble(resource_id: lantern_resource.id, timeline_id: lantern_resource.timeline.id, timeline_access: "fetch")
Prog::Lantern::LanternServerNexus.assemble(
resource_id: lantern_resource.id,
lantern_version: lantern_resource.representative_server.lantern_version,
extras_version: lantern_resource.representative_server.extras_version,
minor_version: lantern_resource.representative_server.minor_version,
target_vm_size: lantern_resource.representative_server.target_vm_size,
target_storage_size_gib: lantern_resource.representative_server.target_storage_size_gib,
timeline_id: lantern_resource.timeline.id,
timeline_access: "fetch"
)
end

if lantern_resource.display_state == "failed"
Expand Down
47 changes: 45 additions & 2 deletions prog/lantern/lantern_server_nexus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def before_run
end

label def wait_catch_up
query = "SELECT pg_current_wal_lsn() - replay_lsn FROM pg_stat_replication WHERE application_name = '#{lantern_server.ubid}'"
query = "SELECT pg_current_wal_lsn() - replay_lsn FROM pg_stat_replication WHERE application_name = 'walreceiver'"
lag = lantern_server.resource.representative_server.run_query(query).chomp

nap 30 if lag.empty? || lag.to_i > 80 * 1024 * 1024 # 80 MB or ~5 WAL files
Expand All @@ -175,7 +175,7 @@ def before_run
end

label def wait_synchronization
query = "SELECT sync_state FROM pg_stat_replication WHERE application_name = '#{lantern_server.ubid}'"
query = "SELECT sync_state FROM pg_stat_replication WHERE application_name = 'walreceiver'"
sync_state = lantern_server.resource.representative_server.run_query(query).chomp
hop_wait if ["quorum", "sync"].include?(sync_state)

Expand Down Expand Up @@ -442,9 +442,52 @@ def destroy_domain
hop_update_vm_size
end

when_take_over_set? do
hop_take_over
end

nap 30
end

label def promote_server
current_master = lantern_server.resource.representative_server
current_master_domain = current_master.domain
new_master_domain = lantern_server.domain

lantern_server.update(domain: current_master_domain)
current_master.update(domain: new_master_domain)

lantern_server.run_query("SELECT pg_promote(true, 120);")
current_master.lazy_change_replication_mode("slave")
lantern_server.lazy_change_replication_mode("master")

hop_wait
end

label def wait_swap_ip
# wait until ip change will propogate
begin
is_in_recovery = lantern_server.run_query("SELECT pg_is_in_recovery()").chomp == "t"
nap 5 if !is_in_recovery
rescue
nap 5
end

hop_promote_server
end

label def take_over
decr_take_over
if !lantern_server.standby?
hop_wait
end

lantern_server.vm.swap_ip(lantern_server.resource.representative_server.vm)

register_deadline(:promote_server, 5 * 60)
hop_wait_swap_ip
end

label def unavailable
# TODO
# if postgres_server.primary? && (standby = postgres_server.failover_target)
Expand Down
11 changes: 11 additions & 0 deletions rhizome/lantern/bin/lazy_update_env
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/env ruby
# frozen_string_literal: true

require "json"
require "yaml"
require_relative "../../common/lib/util"
require_relative "../lib/common"

env_arr = JSON.parse($stdin.read)

append_env(env_arr)
4 changes: 2 additions & 2 deletions spec/lib/hosting/gcp_apis_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
stub_request(:post, "https://compute.googleapis.com/compute/v1/projects/test-project/regions/us-central1/addresses").to_return(status: 200, body: JSON.dump({}), headers: {"Content-Type" => "application/json"})
stub_request(:get, "https://compute.googleapis.com/compute/v1/projects/test-project/regions/us-central1/addresses/dummy-vm-addr").to_return(status: 200, body: JSON.dump({status: "CREATING", address: "1.1.1.1"}), headers: {"Content-Type" => "application/json"})
api = described_class.new
expect(api.get_static_ipv4("dummy-vm", "us-central1")).to eq({"status" => "CREATING", "address" => "1.1.1.1"})
expect(api.get_static_ipv4("dummy-vm-addr", "us-central1")).to eq({"status" => "CREATING", "address" => "1.1.1.1"})
end
end

Expand Down Expand Up @@ -155,7 +155,7 @@
api = described_class.new
stub_request(:post, "https://oauth2.googleapis.com/token").to_return(status: 200, body: JSON.dump({}), headers: {"Content-Type" => "application/json"})
stub_request(:delete, "https://compute.googleapis.com/compute/v1/projects/test-project/regions/us-central1/addresses/dummy-vm-addr")
expect { api.release_ipv4("dummy-vm", "us-central1") }.not_to raise_error
expect { api.release_ipv4("dummy-vm-addr", "us-central1") }.not_to raise_error
end
end

Expand Down
24 changes: 23 additions & 1 deletion spec/model/gcp_vm_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
subject(:gcp_vm) {
described_class.new(
name: "vm1",
location: "us-central1"
location: "us-central1",
address_name: "vm1-addr"
) { _1.id = "c068cac7-ed45-82db-bf38-a003582b36ee" }
}

Expand Down Expand Up @@ -70,4 +71,25 @@
expect(gcp_vm.is_stopped?).to be(true)
end
end

describe "#swap_ip" do
it "swap server ips" do
api = instance_double(Hosting::GcpApis)
expect(api).to receive(:delete_ephermal_ipv4).with("vm1", "us-central1-a")
expect(api).to receive(:delete_ephermal_ipv4).with("vm2", "us-central1-a")
expect(api).to receive(:assign_static_ipv4).with("vm1", "ip2", "us-central1-a")
expect(api).to receive(:assign_static_ipv4).with("vm2", "ip1", "us-central1-a")
expect(Hosting::GcpApis).to receive(:new).and_return(api)
vm2 = instance_double(described_class, name: "vm2", address_name: "vm2-addr", location: "us-central1", sshable: instance_double(Sshable, host: "ip2"))
expect(gcp_vm).to receive(:sshable).and_return(instance_double(Sshable, host: "ip1")).at_least(:once)
expect(gcp_vm.sshable).to receive(:invalidate_cache_entry)
expect(vm2.sshable).to receive(:invalidate_cache_entry)
expect(gcp_vm.sshable).to receive(:update).with(host: "temp_vm1")
expect(gcp_vm.sshable).to receive(:update).with(host: "ip2")
expect(vm2.sshable).to receive(:update).with(host: "ip1")
expect(gcp_vm).to receive(:update).with(address_name: "vm2-addr")
expect(vm2).to receive(:update).with(address_name: "vm1-addr")
expect { gcp_vm.swap_ip(vm2) }.not_to raise_error
end
end
end
6 changes: 3 additions & 3 deletions spec/model/lantern/lantern_doctor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,17 @@

describe "#should_run" do
it "returns true" do
expect(lantern_doctor).to receive(:resource).and_return(instance_double(LanternResource, representative_server: instance_double(LanternServer, display_state: "running", strand: instance_double(Strand, label: "wait")))).at_least(:once)
expect(lantern_doctor).to receive(:resource).and_return(instance_double(LanternResource, display_state: "running", representative_server: instance_double(LanternServer, strand: instance_double(Strand, label: "wait")))).at_least(:once)
expect(lantern_doctor.should_run?).to be(true)
end

it "returns false if not running" do
expect(lantern_doctor).to receive(:resource).and_return(instance_double(LanternResource, representative_server: instance_double(LanternServer, display_state: "stopped", strand: instance_double(Strand, label: "start")))).at_least(:once)
expect(lantern_doctor).to receive(:resource).and_return(instance_double(LanternResource, display_state: "failover", representative_server: instance_double(LanternServer, strand: instance_double(Strand, label: "start")))).at_least(:once)
expect(lantern_doctor.should_run?).to be(false)
end

it "returns false" do
expect(lantern_doctor).to receive(:resource).and_return(instance_double(LanternResource, representative_server: instance_double(LanternServer, display_state: "running", strand: instance_double(Strand, label: "start")))).at_least(:once)
expect(lantern_doctor).to receive(:resource).and_return(instance_double(LanternResource, display_state: "running", representative_server: instance_double(LanternServer, display_state: "running", strand: instance_double(Strand, label: "start")))).at_least(:once)
expect(lantern_doctor.should_run?).to be(false)
end

Expand Down
10 changes: 10 additions & 0 deletions spec/model/lantern/lantern_resource_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@
expect(lantern_resource.display_state).to eq("running")
end

it "returns failed as display state" do
expect(lantern_resource).to receive(:display_state).and_return("failed")
expect(lantern_resource.display_state).to eq("failed")
end

it "returns failover" do
expect(lantern_resource).to receive(:servers).and_return([instance_double(LanternServer, display_state: "running", strand: instance_double(Strand, label: "wait")), instance_double(LanternServer, display_state: "failover", strand: instance_double(Strand, label: "take_over"))])
expect(lantern_resource.display_state).to eq("failover")
end

it "returns unavailable as display state if no representative_server" do
expect(lantern_resource).to receive(:representative_server).and_return(nil)
expect(lantern_resource.display_state).to eq("unavailable")
Expand Down
Loading