Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

physical replication #64

Merged
merged 2 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def self.e2e_test?
override :e2e_test, "0"
override :backup_retention_days, 7, int
override :lantern_log_dataset, "lantern_logs", string
override :compose_file, "/var/lib/lantern/docker-compose.yaml", string

# Cloudflare
optional :cf_token, string
Expand Down
4 changes: 4 additions & 0 deletions model/lantern/lantern_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ def disable_logical_subscription
end

def create_logical_replica(lantern_version: nil, extras_version: nil, minor_version: nil)
# TODO::
# 1. If a new database is created during logical replication, it won't be added automatically
# 2. New timeline will be generated for lantern resource
# 3. We need a rollback mechanism (basically an IP swap back again)
ubid = LanternResource.generate_ubid
create_ddl_log
create_publication("pub_#{ubid}")
Expand Down
22 changes: 12 additions & 10 deletions model/lantern/lantern_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class LanternServer < Sequel::Model
include SemaphoreMethods

semaphore :initial_provisioning, :update_user_password, :update_lantern_extension, :update_extras_extension, :update_image, :setup_ssl, :add_domain, :update_rhizome, :checkup
semaphore :start_server, :stop_server, :restart_server, :take_over, :destroy, :update_storage_size, :update_vm_size, :update_memory_limits, :init_sql, :restart
semaphore :start_server, :stop_server, :restart_server, :take_over, :destroy, :update_storage_size, :update_vm_size, :update_memory_limits, :init_sql, :restart, :container_stopped

def self.ubid_to_name(id)
id.to_s[0..7]
Expand Down Expand Up @@ -48,7 +48,7 @@ def connection_string(port: 6432)
end

def run_query(query, db: "postgres", user: "postgres")
vm.sshable.cmd("sudo docker compose -f /var/lib/lantern/docker-compose.yaml exec -T postgresql psql -q -U #{user} -t --csv #{db}", stdin: query).chomp
vm.sshable.cmd("sudo docker compose -f #{Config.compose_file} exec -T postgresql psql -q -U #{user} -t --csv #{db}", stdin: query).chomp
end

def run_query_all(query)
Expand All @@ -57,7 +57,7 @@ def run_query_all(query)

def display_state
return "deleting" if destroy_set? || strand.label == "destroy"
return "stopped" if vm.display_state == "stopped"
return "stopped" if vm.display_state == "stopped" || strand.label == "container_stopped"
return "stopping" if vm.display_state == "stopping"
return "starting" if vm.display_state == "starting"
return "failed" if vm.display_state == "failed"
Expand Down Expand Up @@ -132,13 +132,15 @@ def configure_hash
})
end

def lazy_change_replication_mode(replication_mode)
def change_replication_mode(replication_mode, update_env: true)
update(timeline_access: (replication_mode == "master") ? "push" : "fetch", representative_at: (replication_mode == "master") ? Time.new : nil)
vm.sshable.cmd("sudo lantern/bin/lazy_update_env", stdin: JSON.generate([
["POSTGRESQL_REPLICATION_MODE", replication_mode],
["INSTANCE_TYPE", (replication_mode == "master") ? "writer" : "reader"],
["POSTGRESQL_RECOVER_FROM_BACKUP", ""]
]))
if update_env
vm.sshable.cmd("sudo lantern/bin/update_env", stdin: JSON.generate([
["POSTGRESQL_REPLICATION_MODE", replication_mode],
["INSTANCE_TYPE", (replication_mode == "master") ? "writer" : "reader"],
["POSTGRESQL_RECOVER_FROM_BACKUP", ""]
]))
end
end

def update_walg_creds
Expand Down Expand Up @@ -218,7 +220,7 @@ def prewarm_indexes_query
end

def list_all_databases
vm.sshable.cmd("sudo docker compose -f /var/lib/lantern/docker-compose.yaml exec postgresql psql -U postgres -P \"footer=off\" -c 'SELECT datname from pg_database' | tail -n +3 | grep -v 'template0' | grep -v 'template1'")
vm.sshable.cmd("sudo docker compose -f #{Config.compose_file} exec postgresql psql -U postgres -P \"footer=off\" -c 'SELECT datname from pg_database' | tail -n +3 | grep -v 'template0' | grep -v 'template1'")
.chomp
.strip
.split("\n")
Expand Down
35 changes: 29 additions & 6 deletions prog/lantern/lantern_server_nexus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Prog::Lantern::LanternServerNexus < Prog::Base
def_delegators :lantern_server, :vm

semaphore :initial_provisioning, :update_user_password, :update_lantern_extension, :update_extras_extension, :update_image, :add_domain, :update_rhizome, :checkup
semaphore :start_server, :stop_server, :restart_server, :take_over, :destroy, :update_storage_size, :update_vm_size, :update_memory_limits, :init_sql, :restart
semaphore :start_server, :stop_server, :restart_server, :take_over, :destroy, :update_storage_size, :update_vm_size, :update_memory_limits, :init_sql, :restart, :container_stopped

def self.assemble(
resource_id: nil, lantern_version: "0.2.2", extras_version: "0.1.4", minor_version: "1", domain: nil,
Expand Down Expand Up @@ -446,9 +446,23 @@ def destroy_domain
hop_take_over
end

when_container_stopped_set? do
hop_container_stopped
end

nap 30
end

label def container_stopped
decr_container_stopped
when_take_over_set? do
vm.sshable.cmd("sudo docker compose -f #{Config.compose_file} up -d")
hop_take_over
end

nap 15
end

label def promote_server
current_master = lantern_server.resource.representative_server
current_master_domain = current_master.domain
Expand All @@ -458,17 +472,19 @@ def destroy_domain
current_master.update(domain: new_master_domain)

lantern_server.run_query("SELECT pg_promote(true, 120);")
current_master.lazy_change_replication_mode("slave")
lantern_server.lazy_change_replication_mode("master")
# we will mark the old server as slave,
# but don't change the docker env, so in case of emergency
# we could rollback to that instance
current_master.change_replication_mode("slave", update_env: false)
lantern_server.change_replication_mode("master")

hop_wait
end

label def wait_swap_ip
  # wait until the IP change propagates
begin
is_in_recovery = lantern_server.run_query("SELECT pg_is_in_recovery()").chomp == "t"
nap 5 if !is_in_recovery
lantern_server.run_query("SELECT 1")
rescue
nap 5
end
Expand All @@ -482,8 +498,15 @@ def destroy_domain
hop_wait
end

lantern_server.vm.swap_ip(lantern_server.resource.representative_server.vm)
lantern_server.resource.representative_server.vm.sshable.cmd("sudo docker compose -f #{Config.compose_file} down -t 60")
# put the old server in container_stopped mode, so no healthcheck will be done
lantern_server.resource.representative_server.incr_container_stopped

hop_swap_ip
end

label def swap_ip
lantern_server.vm.swap_ip(lantern_server.resource.representative_server.vm)
register_deadline(:promote_server, 5 * 60)
hop_wait_swap_ip
end
Expand Down
45 changes: 45 additions & 0 deletions routes/web/project/location/lantern.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,51 @@ class CloverWeb
# pg.gcp_vm.incr_restart
# r.redirect "#{@project.path}#{pg.path}"
# end

r.on "replica" do
r.post true do
Authorization.authorize(@current_user.id, "Postgres:create", @project.id)
Authorization.authorize(@current_user.id, "Postgres:view", pg.id)

Prog::Lantern::LanternServerNexus.assemble(
resource_id: pg.id,
lantern_version: r.params["replica_lantern_version"],
extras_version: r.params["replica_extras_version"],
minor_version: r.params["replica_minor_version"],
target_vm_size: r.params["replica_vm_size"],
target_storage_size_gib: pg.representative_server.target_storage_size_gib,
timeline_id: pg.timeline.id,
timeline_access: "fetch"
)

flash["notice"] = "A new replica server is being added"
r.redirect "#{@project.path}#{pg.path}"
end

r.on String do |server_id|
server = pg.servers.find { |s| s.id == server_id }

r.post "promote" do
Authorization.authorize(@current_user.id, "Postgres:edit", @project.id)
Authorization.authorize(@current_user.id, "Postgres:view", pg.id)

server.incr_take_over
r.redirect "#{@project.path}#{pg.path}"
end

r.delete true do
Authorization.authorize(@current_user.id, "Postgres:delete", @project.id)
Authorization.authorize(@current_user.id, "Postgres:view", pg.id)

if server.primary?
return {message: "Cannot delete primary server"}.to_json
else
server.incr_destroy
return {message: "Deleting replica"}.to_json
end
end
end
end
end
end
end
23 changes: 18 additions & 5 deletions spec/model/lantern/lantern_server_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@
expect(lantern_server.display_state).to eq("stopped")
end

it "shows stopped (container)" do
expect(lantern_server.vm).to receive(:display_state).and_return("running").at_least(:once)
expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "container_stopped")).at_least(:once)
expect(lantern_server.display_state).to eq("stopped")
end

it "shows failed" do
expect(lantern_server.vm).to receive(:display_state).and_return("failed").at_least(:once)
expect(lantern_server).to receive(:strand).and_return(instance_double(Strand, label: "unknown")).at_least(:once)
Expand Down Expand Up @@ -708,27 +714,34 @@
end
end

describe "#lazy_change_replication_mode" do
describe "#change_replication_mode" do
it "changes to master without env" do
time = Time.new
expect(Time).to receive(:new).and_return(time)
expect(lantern_server).to receive(:update).with(timeline_access: "push", representative_at: time)
lantern_server.change_replication_mode("master", update_env: false)
end

it "changes to master" do
time = Time.new
expect(lantern_server.vm.sshable).to receive(:cmd).with("sudo lantern/bin/lazy_update_env", stdin: JSON.generate([
expect(lantern_server.vm.sshable).to receive(:cmd).with("sudo lantern/bin/update_env", stdin: JSON.generate([
["POSTGRESQL_REPLICATION_MODE", "master"],
["INSTANCE_TYPE", "writer"],
["POSTGRESQL_RECOVER_FROM_BACKUP", ""]
]))
expect(Time).to receive(:new).and_return(time)
expect(lantern_server).to receive(:update).with(timeline_access: "push", representative_at: time)
lantern_server.lazy_change_replication_mode("master")
lantern_server.change_replication_mode("master", update_env: true)
end

it "changes to slave" do
expect(lantern_server.vm.sshable).to receive(:cmd).with("sudo lantern/bin/lazy_update_env", stdin: JSON.generate([
expect(lantern_server.vm.sshable).to receive(:cmd).with("sudo lantern/bin/update_env", stdin: JSON.generate([
["POSTGRESQL_REPLICATION_MODE", "slave"],
["INSTANCE_TYPE", "reader"],
["POSTGRESQL_RECOVER_FROM_BACKUP", ""]
]))
expect(lantern_server).to receive(:update).with(timeline_access: "fetch", representative_at: nil)
lantern_server.lazy_change_replication_mode("slave")
lantern_server.change_replication_mode("slave")
end
end
end
45 changes: 33 additions & 12 deletions spec/prog/lantern/lantern_server_nexus_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,11 @@
expect { nx.wait }.to hop("take_over")
end

it "hops to container_stopped" do
nx.incr_container_stopped
expect { nx.wait }.to hop("container_stopped")
end

it "decrements checkup" do
nx.incr_checkup
expect(nx).to receive(:available?).and_return(true)
Expand Down Expand Up @@ -923,33 +928,37 @@
describe "#take_over" do
it "returns if primary" do
expect(lantern_server).to receive(:standby?).and_return(false)
expect(lantern_server).not_to receive(:run_query)
expect { nx.take_over }.to hop("wait")
end

it "swap ips" do
it "stop old master" do
expect(lantern_server).to receive(:standby?).and_return(true)

current_master = instance_double(LanternServer, domain: "db1.lantern.dev", vm: instance_double(GcpVm, sshable: instance_double(Sshable, host: "127.0.0.1"), name: "old-master", location: "us-east1", address_name: "old-addr"))
expect(lantern_server.resource).to receive(:representative_server).and_return(current_master).at_least(:once)

expect(lantern_server.vm).to receive(:swap_ip).with(current_master.vm)
expect(current_master.vm.sshable).to receive(:cmd)
expect(current_master).to receive(:incr_container_stopped)

expect { nx.take_over }.to hop("wait_swap_ip")
expect { nx.take_over }.to hop("swap_ip")
end

it "waits until vm available" do
expect(lantern_server).to receive(:run_query).with("SELECT pg_is_in_recovery()").and_raise "test"
expect { nx.wait_swap_ip }.to nap 5
it "swap ips" do
current_master = instance_double(LanternServer, domain: "db1.lantern.dev", vm: instance_double(GcpVm, sshable: instance_double(Sshable, host: "127.0.0.1"), name: "old-master", location: "us-east1", address_name: "old-addr"))
expect(lantern_server.resource).to receive(:representative_server).and_return(current_master).at_least(:once)

expect(lantern_server.vm).to receive(:swap_ip).with(current_master.vm)

expect { nx.swap_ip }.to hop("wait_swap_ip")
end

it "waits until ip swap done" do
expect(lantern_server).to receive(:run_query).with("SELECT pg_is_in_recovery()").and_return("f")
it "waits until vm available" do
expect(lantern_server).to receive(:run_query).with("SELECT 1").and_raise "test"
expect { nx.wait_swap_ip }.to nap 5
end

it "hops to promote" do
expect(lantern_server).to receive(:run_query).with("SELECT pg_is_in_recovery()").and_return("t")
expect(lantern_server).to receive(:run_query).with("SELECT 1")
expect { nx.wait_swap_ip }.to hop("promote_server")
end

Expand All @@ -961,9 +970,21 @@
expect(lantern_server).to receive(:update).with(domain: current_master.domain).at_least(:once)

expect(lantern_server).to receive(:run_query).with("SELECT pg_promote(true, 120);")
expect(current_master).to receive(:lazy_change_replication_mode).with("slave")
expect(lantern_server).to receive(:lazy_change_replication_mode).with("master")
expect(current_master).to receive(:change_replication_mode).with("slave", update_env: false)
expect(lantern_server).to receive(:change_replication_mode).with("master")
expect { nx.promote_server }.to hop("wait")
end
end

describe "#container_stopped" do
it "hops to take_over" do
nx.incr_take_over
expect(lantern_server.vm.sshable).to receive(:cmd)
expect { nx.container_stopped }.to hop("take_over")
end

it "naps 15" do
expect { nx.container_stopped }.to nap(15)
end
end
end
Loading