Skip to content

Commit

Permalink
reset sequences before failover on logical replica
Browse files Browse the repository at this point in the history
  • Loading branch information
var77 committed Jun 22, 2024
1 parent 7906a17 commit 95a6ef1
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 25 deletions.
20 changes: 19 additions & 1 deletion model/lantern/lantern_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,24 @@ def create_publication(name)
representative_server.run_query_all("CREATE PUBLICATION #{name} FOR ALL TABLES")
end

def sync_sequences_with_parent
representative_server.list_all_databases.each do |db|
res = parent.representative_server.run_query("SELECT sequence_schema, sequence_name, last_value
FROM information_schema.sequences
JOIN pg_sequences
ON (information_schema.sequences.sequence_schema = pg_sequences.schemaname
AND information_schema.sequences.sequence_name = pg_sequences.sequencename);", db: db)

statements = res.chomp.strip.split("\n").map do |row|
values = row.split(",")
{schema: values[0], sequence: values[1], last_value: values[2]}
"SELECT setval('#{values[0]}.#{values[1]}', #{values[2]});"
end

representative_server.run_query(statements, db: db)
end
end

def create_and_enable_subscription
representative_server.list_all_databases.each do |db|
commands = <<SQL
Expand All @@ -164,7 +182,7 @@ def create_and_enable_subscription
slot_name = 'slot_#{ubid}'
);
SQL
representative_server.run_query(commands)
representative_server.run_query(commands, db: db)
end
end

Expand Down
1 change: 1 addition & 0 deletions prog/lantern/lantern_resource_nexus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ def before_run
decr_swap_leaders_with_parent
lantern_resource.parent.set_to_readonly
lantern_resource.disable_logical_subscription
lantern_resource.sync_sequences_with_parent
lantern_resource.representative_server.vm.swap_ip(lantern_resource.parent.representative_server.vm)
hop_wait_swap_ip
end
Expand Down
75 changes: 51 additions & 24 deletions spec/model/lantern/lantern_resource_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,11 @@
it "creates new subscription" do
representative_server = instance_double(LanternServer)
expect(lantern_resource).to receive(:representative_server).and_return(representative_server).at_least(:once)
expect(lantern_resource.representative_server).to receive(:list_all_databases).and_return(["db1"])
expect(lantern_resource.representative_server).to receive(:run_query).with(a_string_matching(/CREATE SUBSCRIPTION/))
expect(lantern_resource).to receive(:connection_string).and_return("postgres://localhost:5432")
expect(lantern_resource).to receive(:parent).and_return(lantern_resource)
expect(lantern_resource.representative_server).to receive(:list_all_databases).and_return(["db1", "db2"])
expect(lantern_resource.representative_server).to receive(:run_query).with(a_string_matching(/CREATE SUBSCRIPTION/), db: "db1")
expect(lantern_resource.representative_server).to receive(:run_query).with(a_string_matching(/CREATE SUBSCRIPTION/), db: "db2")
expect(lantern_resource).to receive(:connection_string).and_return("postgres://localhost:5432").at_least(:once)
expect(lantern_resource).to receive(:parent).and_return(lantern_resource).at_least(:once)
expect { lantern_resource.create_and_enable_subscription }.not_to raise_error
end
end
Expand Down Expand Up @@ -233,27 +234,53 @@
))
expect { lantern_resource.create_logical_replica }.not_to raise_error
end

it "create logical replica with specified version" do
representative_server = instance_double(LanternServer,
target_vm_size: "n1-standard-1",
target_storage_size_gib: 120)
expect(lantern_resource).to receive(:representative_server).and_return(representative_server).at_least(:once)
timeline = instance_double(LanternTimeline,
latest_restore_time: Time.new)
expect(lantern_resource).to receive(:timeline).and_return(timeline).at_least(:once)
expect(lantern_resource).to receive(:create_replication_slot)
expect(lantern_resource).to receive(:create_ddl_log)
expect(lantern_resource).to receive(:create_publication)
expect(Prog::Lantern::LanternResourceNexus).to receive(:assemble).with(hash_including(
parent_id: lantern_resource.id,
version_upgrade: true,
logical_replication: true,
lantern_version: "0.3.0",
extras_version: "0.2.6",
minor_version: "1"
))
expect { lantern_resource.create_logical_replica(lantern_version: "0.3.0", extras_version: "0.2.6", minor_version: "1") }.not_to raise_error
end
end

it "create logical replica with specified version" do
representative_server = instance_double(LanternServer,
target_vm_size: "n1-standard-1",
target_storage_size_gib: 120)
expect(lantern_resource).to receive(:representative_server).and_return(representative_server).at_least(:once)
timeline = instance_double(LanternTimeline,
latest_restore_time: Time.new)
expect(lantern_resource).to receive(:timeline).and_return(timeline).at_least(:once)
expect(lantern_resource).to receive(:create_replication_slot)
expect(lantern_resource).to receive(:create_ddl_log)
expect(lantern_resource).to receive(:create_publication)
expect(Prog::Lantern::LanternResourceNexus).to receive(:assemble).with(hash_including(
parent_id: lantern_resource.id,
version_upgrade: true,
logical_replication: true,
lantern_version: "0.3.0",
extras_version: "0.2.6",
minor_version: "1"
))
expect { lantern_resource.create_logical_replica(lantern_version: "0.3.0", extras_version: "0.2.6", minor_version: "1") }.not_to raise_error
describe "#sync_sequences_with_parent" do
it "syncs sequences with parent" do
representative_server = instance_double(LanternServer)
parent_representative_server = instance_double(LanternServer)
parent = instance_double(described_class, representative_server: parent_representative_server)
databases = ["db1", "db2"]
query_result = "public,seq1,100\npublic,seq2,200"

allow(lantern_resource).to receive_messages(representative_server: representative_server, parent: parent)
allow(representative_server).to receive(:list_all_databases).and_return(databases)
allow(parent_representative_server).to receive(:run_query).with(anything, db: "db1").and_return(query_result)
allow(parent_representative_server).to receive(:run_query).with(anything, db: "db2").and_return(query_result)

statements_db1 = [
"SELECT setval('public.seq1', 100);",
"SELECT setval('public.seq2', 200);"
]
statements_db2 = statements_db1 # identical statements for the test

expect(representative_server).to receive(:run_query).with(statements_db1, db: "db1")
expect(representative_server).to receive(:run_query).with(statements_db2, db: "db2")

expect { lantern_resource.sync_sequences_with_parent }.not_to raise_error
end
end
end
1 change: 1 addition & 0 deletions spec/prog/lantern/lantern_resource_nexus_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@
expect(parent).to receive(:representative_server).and_return(representative_server)
expect(lantern_resource).to receive(:representative_server).and_return(representative_server).at_least(:once)
expect(lantern_resource).to receive(:disable_logical_subscription)
expect(lantern_resource).to receive(:sync_sequences_with_parent)
expect(representative_server).to receive(:vm).and_return(vm).at_least(:once)
expect(vm).to receive(:swap_ip)
expect(lantern_resource).to receive(:parent).and_return(parent).at_least(:once)
Expand Down

0 comments on commit 95a6ef1

Please sign in to comment.