Skip to content

Commit

Permalink
watch logical replication after enabling, to refresh subscription eac…
Browse files Browse the repository at this point in the history
…h 20s
  • Loading branch information
var77 committed Nov 20, 2024
1 parent 4d7e276 commit ca695a5
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 1 deletion.
1 change: 0 additions & 1 deletion model/lantern/lantern_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ def listen_ddl_log
EXECUTE format('SET ROLE %I', NEW.session_user);
EXECUTE NEW.ddl_command;
RESET ROLE;
ALTER SUBSCRIPTION sub_#{ubid} REFRESH PUBLICATION;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
Expand Down
20 changes: 20 additions & 0 deletions prog/lantern/lantern_resource_nexus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,27 @@ def before_run
label def enable_logical_replication
lantern_resource.listen_ddl_log
lantern_resource.create_and_enable_subscription
bud Prog::Lantern::LanternResourceNexus, {"subscription" => "sub_#{lantern_resource.ubid}"}, "watch_logical_replication"
hop_wait
end

label def watch_logical_replication
if !lantern_resource.logical_replication
pop "logical replication disabled"
end

sub = frame["subscription"]
subscription_exists = !lantern_resource.representative_server.run_query("SELECT subname FROM pg_subscription WHERE subname='#{sub}'").empty?

if !subscription_exists
pop "subscription deleted"
end

lantern_resource.representative_server.run_query("ALTER SUBSCRIPTION #{sub} REFRESH PUBLICATION")

nap 20
end

label def wait
# Create missing standbys
(lantern_resource.required_standby_count + 1 - lantern_resource.servers.count).times do
Expand Down Expand Up @@ -232,6 +250,8 @@ def before_run
end
end

reap

nap 30
end

Expand Down
26 changes: 26 additions & 0 deletions spec/prog/lantern/lantern_resource_nexus_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,36 @@
it "enables logical replication" do
expect(lantern_resource).to receive(:listen_ddl_log)
expect(lantern_resource).to receive(:create_and_enable_subscription)
expect(nx).to receive(:bud).with(described_class, {"subscription" => "sub_#{lantern_resource.ubid}"}, "watch_logical_replication")
expect { nx.enable_logical_replication }.to hop("wait")
end
end

describe "#watch_logical_replication" do
it "pops if not in logical replication" do
expect(lantern_resource).to receive(:logical_replication).and_return(false)
expect { nx.watch_logical_replication }.to exit({"msg" => "logical replication disabled"})
end

it "pops if subscription is deleted" do
expect(lantern_resource).to receive(:logical_replication).and_return(true)
representative_server = instance_double(LanternServer)
expect(lantern_resource).to receive(:representative_server).and_return(representative_server).at_least(:once)
expect(representative_server).to receive(:run_query).and_return("")
expect { nx.watch_logical_replication }.to exit({"msg" => "subscription deleted"})
end

it "refreshes subscription" do
expect(lantern_resource).to receive(:logical_replication).and_return(true)
representative_server = instance_double(LanternServer)
expect(lantern_resource).to receive(:representative_server).and_return(representative_server).at_least(:once)
expect(representative_server).to receive(:run_query).and_return("sub")
expect(nx).to receive("frame").and_return({"subscription" => "test_sub"})
expect(representative_server).to receive(:run_query).with("ALTER SUBSCRIPTION test_sub REFRESH PUBLICATION")
expect { nx.watch_logical_replication }.to nap 20
end
end

describe "#swap_leaders_with_parent" do
it "swaps ips with parent leader" do
parent = instance_double(LanternResource)
Expand Down

0 comments on commit ca695a5

Please sign in to comment.