Skip to content

Commit

Permalink
add automatic logical replication
Browse files Browse the repository at this point in the history
  • Loading branch information
var77 committed Jun 21, 2024
1 parent 3f80908 commit c079729
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 11 deletions.
3 changes: 3 additions & 0 deletions migrate/20240619_lantern_doctor_queries.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
up do
# percent toward transaction wraparound
sql = <<SQL
WITH max_age AS (
SELECT 2000000000 as max_old_xid,
setting AS autovacuum_freeze_max_age
FROM pg_catalog.pg_settings
Expand All @@ -29,5 +30,7 @@
response_type: "bool",
severity: "warning"
)
# Create semaphores for all lantern doctors to sync system queries
run "INSERT INTO semaphore (id, strand_id, name) SELECT gen_random_uuid(), id, 'sync_system_queries' FROM strand s WHERE s.prog = 'Lantern::LanternDoctorNexus'"
end
end
9 changes: 9 additions & 0 deletions migrate/20240621_lantern_resource_logical_replication.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

Sequel.migration do
up do
alter_table(:lantern_resource) do
add_column :logical_replication, :bool, default: false
end
end
end
116 changes: 113 additions & 3 deletions model/lantern/lantern_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class LanternResource < Sequel::Model
include Authorization::HyperTagMethods
include Authorization::TaggableMethods

semaphore :destroy
semaphore :destroy, :swap_leaders_with_parent

plugin :column_encryption do |enc|
enc.column :superuser_password
Expand Down Expand Up @@ -55,8 +55,8 @@ def display_state
super || representative_server&.display_state || "unavailable"
end

def connection_string
representative_server&.connection_string
def connection_string(port: 6432)
representative_server&.connection_string(port: port)
end

def required_standby_count
Expand Down Expand Up @@ -84,6 +84,116 @@ def allow_timeline_access_to_bucket
api.allow_bucket_usage_by_prefix(service_account_name, Config.lantern_backup_bucket, timeline.ubid)
end

def set_to_readonly(status: "on")
representative_server.run_query("
ALTER SYSTEM SET default_transaction_read_only TO #{status};
SELECT pg_reload_conf();
SHOW default_transaction_read_only;
")
end

def create_replication_slot(name)
representative_server.run_query("SELECT lsn FROM pg_create_logical_replication_slot('#{name}', 'pgoutput');").chomp.strip
end

def create_ddl_log
commands = <<SQL
BEGIN;
CREATE TABLE IF NOT EXISTS ddl_log(
id SERIAL PRIMARY KEY,
object_tag TEXT,
ddl_command TEXT,
timestamp TIMESTAMP
);
CREATE OR REPLACE FUNCTION log_ddl_changes()
RETURNS event_trigger AS $$
BEGIN
INSERT INTO ddl_log (object_tag, ddl_command, timestamp)
VALUES (tg_tag, current_query(), current_timestamp);
END;
$$ LANGUAGE plpgsql;
DROP EVENT TRIGGER IF EXISTS log_ddl_trigger;
CREATE EVENT TRIGGER log_ddl_trigger
ON ddl_command_end
EXECUTE FUNCTION log_ddl_changes();
COMMIT;
SQL
representative_server.run_query_all(commands)
end

def listen_ddl_log
commands = <<SQL
DROP EVENT TRIGGER IF EXISTS log_ddl_trigger;
CREATE OR REPLACE FUNCTION execute_ddl_command()
RETURNS TRIGGER AS $$
BEGIN
SET search_path TO public;
EXECUTE NEW.ddl_command;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER execute_ddl_after_insert
AFTER INSERT ON ddl_log
FOR EACH ROW
EXECUTE FUNCTION execute_ddl_command();
SQL
representative_server.run_query_all(commands)
end

def create_publication(name)
representative_server.run_query_all("CREATE PUBLICATION #{name} FOR ALL TABLES")
end

def create_and_enable_subscription
representative_server.list_all_databases.each do |db|
commands = <<SQL
CREATE SUBSCRIPTION sub_#{ubid}
CONNECTION '#{parent.connection_string(port: 5432)}/#{db}'
PUBLICATION pub_#{ubid}
WITH (
copy_data = false,
create_slot = false,
enabled = true,
synchronous_commit = false,
connect = true,
slot_name = 'slot_#{ubid}'
);
SQL
representative_server.run_query(commands)
end
end

def disable_logical_subscription
representative_server.run_query_all("ALTER SUBSCRIPTION sub_#{ubid} DISABLE")
end

def create_logical_replica(lantern_version: nil, extras_version: nil, minor_version: nil)
ubid = LanternResource.generate_ubid
create_publication("pub_#{ubid}")
create_ddl_log
slot_lsn = create_replication_slot("slot_#{ubid}")
Prog::Lantern::LanternResourceNexus.assemble(
project_id: project_id,
location: location,
name: "#{name}-#{Time.now.to_i}",
label: "#{label}-logical",
ubid: ubid,
target_vm_size: representative_server.target_vm_size,
target_storage_size_gib: representative_server.target_storage_size_gib,
parent_id: id,
restore_target: timeline.latest_restore_time.utc.to_s[..-5],
recovery_target_lsn: slot_lsn,
org_id: org_id,
version_upgrade: true,
logical_replication: true,
lantern_version: lantern_version || representative_server.lantern_version,
extras_version: extras_version || representative_server.extras_version,
minor_version: minor_version || representative_server.minor_version
)
end

def create_logging_table
api = Hosting::GcpApis.new
schema = [
Expand Down
4 changes: 2 additions & 2 deletions model/lantern/lantern_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ def hostname
vm.sshable.host
end

def connection_string
def connection_string(port: 6432)
return nil unless (hn = hostname)
URI::Generic.build2(
scheme: "postgres",
userinfo: "postgres:#{URI.encode_uri_component(resource.superuser_password)}",
host: hn,
port: 6432
port: port
).to_s
end

Expand Down
73 changes: 68 additions & 5 deletions prog/lantern/lantern_resource_nexus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@ class Prog::Lantern::LanternResourceNexus < Prog::Base
extend Forwardable
def_delegators :lantern_resource, :servers, :representative_server

semaphore :destroy
semaphore :destroy, :swap_leaders_with_parent

def self.assemble(project_id:, location:, name:, target_vm_size:, target_storage_size_gib:, ha_type: LanternResource::HaType::NONE, parent_id: nil, restore_target: nil, recovery_target_lsn: nil,
def self.assemble(project_id:, location:, name:, target_vm_size:, target_storage_size_gib:, ubid: LanternResource.generate_ubid, ha_type: LanternResource::HaType::NONE, parent_id: nil, restore_target: nil, recovery_target_lsn: nil,
org_id: nil, db_name: "postgres", db_user: "postgres", db_user_password: nil, superuser_password: nil, repl_password: nil, app_env: Config.rack_env,
lantern_version: Config.lantern_default_version, extras_version: Config.lantern_extras_default_version, minor_version: Config.lantern_minor_default_version, domain: nil, enable_debug: false,
label: "", version_upgrade: false)
label: "", version_upgrade: false, logical_replication: false)
unless (project = Project[project_id])
fail "No existing project"
end

ubid = LanternResource.generate_ubid
name ||= LanternResource.ubid_to_name(ubid)

Validation.validate_location(location, project.provider)
Expand Down Expand Up @@ -84,7 +83,8 @@ def self.assemble(project_id:, location:, name:, target_vm_size:, target_storage
superuser_password: superuser_password, ha_type: ha_type, parent_id: parent_id,
restore_target: restore_target, db_name: db_name, db_user: db_user,
db_user_password: db_user_password, repl_user: repl_user, repl_password: repl_password,
label: label, doctor_id: lantern_doctor.id, recovery_target_lsn: recovery_target_lsn, version_upgrade: version_upgrade
label: label, doctor_id: lantern_doctor.id, recovery_target_lsn: recovery_target_lsn, version_upgrade: version_upgrade,
logical_replication: logical_replication
) { _1.id = ubid.to_uuid }
lantern_resource.associate_with_project(project)

Expand Down Expand Up @@ -155,6 +155,17 @@ def before_run
label def wait_servers
lantern_resource.set_failed_on_deadline
nap 5 if servers.any? { _1.strand.label != "wait" }

if lantern_resource.logical_replication
hop_enable_logical_replication
end

hop_wait
end

label def enable_logical_replication
lantern_resource.listen_ddl_log
lantern_resource.create_and_enable_subscription
hop_wait
end

Expand All @@ -177,9 +188,61 @@ def before_run
lantern_resource.update(display_state: nil)
end

when_swap_leaders_with_parent_set? do
if lantern_resource.parent.nil?
decr_swap_leaders_with_parent
else
lantern_resource.update(display_state: "failover")
lantern_resource.parent.update(display_state: "failover")
register_deadline(:wait, 10 * 60)
hop_swap_leaders_with_parent
end
end

nap 30
end

label def update_hosts
current_master = lantern_resource.parent.representative_server
current_master_domain = current_master.domain
new_master_domain = lantern_resource.representative_server.domain

lantern_resource.representative_server.update(domain: current_master_domain)
current_master.update(domain: new_master_domain)

# update display_states
lantern_resource.update(display_state: nil)
lantern_resource.parent.update(display_state: nil)

# remove fork association so parent can be deleted
lantern_resource.update(parent_id: nil)
lantern_resource.timeline.update(parent_id: nil)
hop_wait
end

label def wait_swap_ip
ready = false
begin
lantern_resource.representative_server.run_query("SELECT 1")
ready = true
rescue
end

if ready
hop_update_hosts
else
nap 5
end
end

label def swap_leaders_with_parent
decr_swap_leaders_with_parent
lantern_resource.parent.set_to_readonly
lantern_resource.disable_logical_subscription
lantern_resource.representative_server.vm.swap_ip(lantern_resource.parent.representative_server.vm)
hop_wait_swap_ip
end

label def destroy
register_deadline(nil, 5 * 60)

Expand Down
Loading

0 comments on commit c079729

Please sign in to comment.