Skip to content

Commit

Permalink
create replication slot before starting streaming replica
Browse files Browse the repository at this point in the history
  • Loading branch information
var77 committed Jun 28, 2024
1 parent bb40031 commit 067fa33
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 9 deletions.
12 changes: 10 additions & 2 deletions model/lantern/lantern_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,18 @@ def set_to_readonly(status: "on")
")
end

def create_replication_slot(name)
def create_logical_replication_slot(name)
representative_server.run_query("SELECT lsn FROM pg_create_logical_replication_slot('#{name}', 'pgoutput');").chomp.strip
end

def create_physical_replication_slot(name)
representative_server.run_query("SELECT lsn FROM pg_create_physical_replication_slot('#{name}');").chomp.strip
end

def delete_replication_slot(name)
representative_server.run_query("SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name='#{name}';")
end

def create_ddl_log
commands = <<SQL
BEGIN;
Expand Down Expand Up @@ -199,7 +207,7 @@ def create_logical_replica(lantern_version: nil, extras_version: nil, minor_vers
ubid = LanternResource.generate_ubid
create_ddl_log
create_publication("pub_#{ubid}")
slot_lsn = create_replication_slot("slot_#{ubid}")
slot_lsn = create_logical_replication_slot("slot_#{ubid}")
Prog::Lantern::LanternResourceNexus.assemble(
project_id: project_id,
location: location,
Expand Down
12 changes: 10 additions & 2 deletions model/lantern/lantern_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ def configure_hash
end
end

postgresql_recovery_target_time = resource.recovery_target_lsn ? "" : resource.restore_target || ""
postgresql_recovery_target_lsn = resource.recovery_target_lsn || ""

if standby?
postgresql_recovery_target_time = ""
postgresql_recovery_target_lsn = resource.create_physical_replication_slot(ubid)
end

JSON.generate({
enable_coredumps: true,
skip_deps: vm.boot_image != Config.gcp_default_image,
Expand All @@ -123,8 +131,8 @@ def configure_hash
gcp_creds_logging_b64: Config.gcp_creds_logging_b64,
container_image: "#{Config.gcr_image}:lantern-#{lantern_version}-extras-#{extras_version}-minor-#{minor_version}",
postgresql_recover_from_backup: backup_label,
postgresql_recovery_target_time: resource.recovery_target_lsn ? "" : resource.restore_target || "",
postgresql_recovery_target_lsn: resource.recovery_target_lsn || "",
postgresql_recovery_target_time: postgresql_recovery_target_time,
postgresql_recovery_target_lsn: postgresql_recovery_target_lsn,
gcp_creds_walg_b64: walg_config[:gcp_creds_b64],
walg_gs_prefix: walg_config[:walg_gs_prefix],
gcp_creds_big_query_b64: resource.gcp_creds_b64,
Expand Down
3 changes: 3 additions & 0 deletions prog/lantern/lantern_server_nexus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def before_run
nap 30 if lag.empty? || lag.to_i > 80 * 1024 * 1024 # 80 MB or ~5 WAL files

lantern_server.update(synchronization_status: "ready")
lantern_server.resource.delete_replication_slot(lantern_server.ubid)
hop_wait_synchronization if lantern_server.resource.ha_type == LanternResource::HaType::SYNC
hop_wait
end
Expand Down Expand Up @@ -559,6 +560,8 @@ def destroy_domain

if lantern_server.primary?
lantern_server.timeline.incr_destroy
else
lantern_server.resource.delete_replication_slot(lantern_server.ubid)
end
lantern_server.destroy

Expand Down
24 changes: 20 additions & 4 deletions spec/model/lantern/lantern_resource_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,27 @@
end

describe "#create_replication_slot" do
it "creates new replication slot" do
it "creates new logical replication slot" do
representative_server = instance_double(LanternServer)
expect(lantern_resource).to receive(:representative_server).and_return(representative_server).at_least(:once)
expect(lantern_resource.representative_server).to receive(:run_query).with("SELECT lsn FROM pg_create_logical_replication_slot('test', 'pgoutput');").and_return("0/6002748 \n")
expect(lantern_resource.create_replication_slot("test")).to eq("0/6002748")
expect(lantern_resource.create_logical_replication_slot("test")).to eq("0/6002748")
end

it "creates new physical replication slot" do
representative_server = instance_double(LanternServer)
expect(lantern_resource).to receive(:representative_server).and_return(representative_server).at_least(:once)
expect(lantern_resource.representative_server).to receive(:run_query).with("SELECT lsn FROM pg_create_physical_replication_slot('test');").and_return("0/6002748 \n")
expect(lantern_resource.create_physical_replication_slot("test")).to eq("0/6002748")
end
end

describe "#drop_replication_slot" do
it "drops replication slot" do
representative_server = instance_double(LanternServer)
expect(lantern_resource).to receive(:representative_server).and_return(representative_server).at_least(:once)
expect(lantern_resource.representative_server).to receive(:run_query).with("SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name='test';")
expect { lantern_resource.delete_replication_slot("test") }.not_to raise_error
end
end

Expand Down Expand Up @@ -221,7 +237,7 @@
timeline = instance_double(LanternTimeline,
latest_restore_time: Time.new)
expect(lantern_resource).to receive(:timeline).and_return(timeline).at_least(:once)
expect(lantern_resource).to receive(:create_replication_slot)
expect(lantern_resource).to receive(:create_logical_replication_slot)
expect(lantern_resource).to receive(:create_ddl_log)
expect(lantern_resource).to receive(:create_publication)
expect(Prog::Lantern::LanternResourceNexus).to receive(:assemble).with(hash_including(
Expand All @@ -243,7 +259,7 @@
timeline = instance_double(LanternTimeline,
latest_restore_time: Time.new)
expect(lantern_resource).to receive(:timeline).and_return(timeline).at_least(:once)
expect(lantern_resource).to receive(:create_replication_slot)
expect(lantern_resource).to receive(:create_logical_replication_slot)
expect(lantern_resource).to receive(:create_ddl_log)
expect(lantern_resource).to receive(:create_publication)
expect(Prog::Lantern::LanternResourceNexus).to receive(:assemble).with(hash_including(
Expand Down
3 changes: 2 additions & 1 deletion spec/model/lantern/lantern_server_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@
expect(lantern_server).to receive(:lantern_version).and_return("0.2.2").at_least(:once)
expect(lantern_server).to receive(:extras_version).and_return("0.1.4").at_least(:once)
expect(lantern_server).to receive(:minor_version).and_return("1").at_least(:once)
expect(lantern_server.resource).to receive(:create_physical_replication_slot).and_return("0/6002748")
expect(vm).to receive(:boot_image).and_return("custom-image").at_least(:once)

walg_conf = timeline.generate_walg_config
Expand Down Expand Up @@ -517,7 +518,7 @@
container_image: "#{Config.gcr_image}:lantern-#{lantern_server.lantern_version}-extras-#{lantern_server.extras_version}-minor-#{lantern_server.minor_version}",
postgresql_recover_from_backup: "LATEST",
postgresql_recovery_target_time: "",
postgresql_recovery_target_lsn: resource.recovery_target_lsn,
postgresql_recovery_target_lsn: "0/6002748",
gcp_creds_walg_b64: walg_conf[:gcp_creds_b64],
walg_gs_prefix: walg_conf[:walg_gs_prefix],
gcp_creds_big_query_b64: resource.gcp_creds_b64,
Expand Down
3 changes: 3 additions & 0 deletions spec/prog/lantern/lantern_server_nexus_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@
expect(lantern_server).to receive(:update).with({synchronization_status: "ready"})
expect(lantern_server.resource).to receive(:representative_server).and_return(leader)
expect(lantern_server.resource).to receive(:ha_type).and_return(LanternResource::HaType::SYNC)
expect(lantern_server.resource).to receive(:delete_replication_slot).with(lantern_server.ubid)
expect(leader).to receive(:run_query).and_return((1 * 1024 * 1024).to_s)
expect { nx.wait_catch_up }.to hop("wait_synchronization")
end
Expand All @@ -318,6 +319,7 @@
expect(lantern_server).to receive(:update).with({synchronization_status: "ready"})
expect(lantern_server.resource).to receive(:representative_server).and_return(leader)
expect(lantern_server.resource).to receive(:ha_type).and_return(LanternResource::HaType::ASYNC)
expect(lantern_server.resource).to receive(:delete_replication_slot).with(lantern_server.ubid)
expect(leader).to receive(:run_query).and_return((1 * 1024 * 1024).to_s)
expect { nx.wait_catch_up }.to hop("wait")
end
Expand Down Expand Up @@ -731,6 +733,7 @@
expect(lantern_server).to receive(:primary?).and_return(false)
expect(lantern_server).to receive(:domain).and_return(nil)
expect(lantern_server).to receive(:destroy)
expect(lantern_server.resource).to receive(:delete_replication_slot).with(lantern_server.ubid)
expect { nx.destroy }.to exit({"msg" => "lantern server was deleted"})
end

Expand Down

0 comments on commit 067fa33

Please sign in to comment.