Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

logical replication main #62

Merged
merged 4 commits into from
Jun 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions migrate/20240619_lantern_doctor_queries.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
up do
# percent toward transaction wraparound
sql = <<SQL
WITH max_age AS (
SELECT 2000000000 as max_old_xid,
setting AS autovacuum_freeze_max_age
FROM pg_catalog.pg_settings
Expand All @@ -29,5 +30,7 @@
response_type: "bool",
severity: "warning"
)
# Create semaphores for all lantern doctors to sync system queries
run "INSERT INTO semaphore (id, strand_id, name) SELECT gen_random_uuid(), id, 'sync_system_queries' FROM strand s WHERE s.prog = 'Lantern::LanternDoctorNexus'"
end
end
9 changes: 9 additions & 0 deletions migrate/20240621_lantern_resource_logical_replication.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

# Adds the boolean +logical_replication+ column (default: false) to the
# lantern_resource table. Only an +up+ step is defined.
Sequel.migration do
  up do
    alter_table(:lantern_resource) { add_column(:logical_replication, :bool, default: false) }
  end
end
138 changes: 135 additions & 3 deletions model/lantern/lantern_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class LanternResource < Sequel::Model
include Authorization::HyperTagMethods
include Authorization::TaggableMethods

semaphore :destroy
semaphore :destroy, :swap_leaders_with_parent

plugin :column_encryption do |enc|
enc.column :superuser_password
Expand Down Expand Up @@ -55,8 +55,8 @@ def display_state
super || representative_server&.display_state || "unavailable"
end

# Connection string of the representative server, or nil when the resource
# has no representative server.
#
# port: port to embed in the connection string; forwarded unchanged to the
#       server's connection_string (default 6432).
def connection_string(port: 6432)
  representative_server&.connection_string(port: port)
end

def required_standby_count
Expand Down Expand Up @@ -84,6 +84,138 @@ def allow_timeline_access_to_bucket
api.allow_bucket_usage_by_prefix(service_account_name, Config.lantern_backup_bucket, timeline.ubid)
end

# Sets default_transaction_read_only on the representative server through
# ALTER SYSTEM, reloads the configuration and then issues
# SHOW default_transaction_read_only in the same batch.
#
# status: value written into the setting ("on" by default); it is
#         interpolated into the SQL verbatim.
# Returns whatever representative_server.run_query returns for the batch.
def set_to_readonly(status: "on")
  read_only_sql = [
    "",
    "ALTER SYSTEM SET default_transaction_read_only TO #{status};",
    "SELECT pg_reload_conf();",
    "SHOW default_transaction_read_only;",
    ""
  ].join("\n")

  representative_server.run_query(read_only_sql)
end

# Creates a logical replication slot that uses the pgoutput plugin on the
# representative server.
#
# name: slot name, interpolated into the SQL verbatim.
# Returns the query output (the selected lsn column) with surrounding
# whitespace removed.
def create_replication_slot(name)
  slot_sql = "SELECT lsn FROM pg_create_logical_replication_slot('#{name}', 'pgoutput');"
  raw_lsn = representative_server.run_query(slot_sql)
  raw_lsn.chomp.strip
end

# Installs DDL capture through representative_server.run_query_all.
#
# In one transaction the script creates the ddl_log table if it is missing,
# (re)defines the log_ddl_changes() event-trigger function, which inserts
# tg_tag, current_query() and current_timestamp into ddl_log, and recreates
# the log_ddl_trigger event trigger on ddl_command_end.
def create_ddl_log
  ddl_capture_sql = <<~SQL
    BEGIN;
    CREATE TABLE IF NOT EXISTS ddl_log(
    id SERIAL PRIMARY KEY,
    object_tag TEXT,
    ddl_command TEXT,
    timestamp TIMESTAMP
    );
    CREATE OR REPLACE FUNCTION log_ddl_changes()
    RETURNS event_trigger AS $$
    BEGIN
    INSERT INTO ddl_log (object_tag, ddl_command, timestamp)
    VALUES (tg_tag, current_query(), current_timestamp);
    END;
    $$ LANGUAGE plpgsql;

    DROP EVENT TRIGGER IF EXISTS log_ddl_trigger;
    CREATE EVENT TRIGGER log_ddl_trigger
    ON ddl_command_end
    EXECUTE FUNCTION log_ddl_changes();
    COMMIT;
  SQL

  representative_server.run_query_all(ddl_capture_sql)
end

# Switches ddl_log from "capture" to "replay" mode through
# representative_server.run_query_all.
#
# The script drops the log_ddl_trigger event trigger, truncates ddl_log
# (restarting its identity), (re)defines execute_ddl_command(), which sets
# search_path to public and EXECUTEs NEW.ddl_command, and attaches it as an
# AFTER INSERT row trigger on ddl_log. The trigger is then enabled as a
# REPLICA trigger.
#
# The row trigger is dropped first if it already exists so the script can be
# run again (e.g. a retried step, or a ddl_log table that already carries
# the trigger) without CREATE TRIGGER failing on a duplicate name. This
# matches the IF EXISTS / OR REPLACE handling of the other objects.
def listen_ddl_log
  commands = <<~SQL
    DROP EVENT TRIGGER IF EXISTS log_ddl_trigger;
    TRUNCATE TABLE ddl_log RESTART IDENTITY;
    CREATE OR REPLACE FUNCTION execute_ddl_command()
    RETURNS TRIGGER AS $$
    BEGIN
    SET search_path TO public;
    EXECUTE NEW.ddl_command;
    RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;

    DROP TRIGGER IF EXISTS execute_ddl_after_insert ON ddl_log;
    CREATE TRIGGER execute_ddl_after_insert
    AFTER INSERT ON ddl_log
    FOR EACH ROW
    EXECUTE FUNCTION execute_ddl_command();
    ALTER TABLE ddl_log ENABLE REPLICA TRIGGER execute_ddl_after_insert;
  SQL

  representative_server.run_query_all(commands)
end

# Creates a publication covering all tables through
# representative_server.run_query_all.
#
# name: publication name, interpolated into the SQL verbatim.
def create_publication(name)
  publication_sql = "CREATE PUBLICATION #{name} FOR ALL TABLES"
  representative_server.run_query_all(publication_sql)
end

# Copies sequence positions from the parent resource to this resource.
#
# For every database listed by this resource's representative server, the
# parent's representative server is asked for each sequence whose
# last_value is greater than 0, and a matching setval() call is executed
# locally in that database.
#
# The setval statements are sent as ONE newline-joined String: every other
# caller hands run_query a String, whereas an Array would reach it in an
# unexpected shape. Databases with nothing to sync are skipped rather than
# sending an empty batch.
#
# NOTE(review): rows are split on "," — this assumes run_query returns one
# comma-separated row per line and that schema/sequence names contain no
# commas or quoting; confirm against run_query's output format.
def sync_sequences_with_parent
  sequence_query = <<~SQL
    SELECT sequence_schema, sequence_name, last_value
    FROM information_schema.sequences
    JOIN pg_sequences
    ON (information_schema.sequences.sequence_schema = pg_sequences.schemaname
    AND information_schema.sequences.sequence_name = pg_sequences.sequencename)
    WHERE last_value > 0;
  SQL

  representative_server.list_all_databases.each do |db|
    rows = parent.representative_server.run_query(sequence_query, db: db)

    statements = rows.strip.split("\n").map do |row|
      schema, sequence, last_value = row.split(",")
      "SELECT setval('#{schema}.#{sequence}', #{last_value});"
    end
    next if statements.empty?

    representative_server.run_query(statements.join("\n"), db: db)
  end
end

# Creates an enabled subscription named sub_<ubid> in every database listed
# by the representative server.
#
# Each subscription connects to the parent's connection string on port 5432
# with "/<database>" appended, subscribes to publication pub_<ubid> and uses
# the existing slot slot_<ubid> (create_slot = false, copy_data = false).
def create_and_enable_subscription
  representative_server.list_all_databases.each do |database_name|
    subscription_sql = <<~SQL
      CREATE SUBSCRIPTION sub_#{ubid}
      CONNECTION '#{parent.connection_string(port: 5432)}/#{database_name}'
      PUBLICATION pub_#{ubid}
      WITH (
      copy_data = false,
      create_slot = false,
      binary = true,
      enabled = true,
      synchronous_commit = false,
      connect = true,
      slot_name = 'slot_#{ubid}'
      );
    SQL

    representative_server.run_query(subscription_sql, db: database_name)
  end
end

# Disables the sub_<ubid> subscription through
# representative_server.run_query_all.
def disable_logical_subscription
  disable_sql = "ALTER SUBSCRIPTION sub_#{ubid} DISABLE"
  representative_server.run_query_all(disable_sql)
end

# Prepares this resource as a logical-replication source and assembles a
# new resource that will subscribe to it.
#
# Steps, in order: generate the new resource's ubid, install DDL logging
# (create_ddl_log), create publication pub_<ubid>, create replication slot
# slot_<ubid> and keep the LSN it reports, then call
# Prog::Lantern::LanternResourceNexus.assemble with this resource as parent,
# the slot LSN as recovery_target_lsn, and version_upgrade and
# logical_replication both set to true.
#
# lantern_version, extras_version, minor_version: versions for the new
#   resource; each falls back to the representative server's current value
#   when nil.
# Returns whatever assemble returns.
def create_logical_replica(lantern_version: nil, extras_version: nil, minor_version: nil)
  replica_ubid = LanternResource.generate_ubid

  create_ddl_log
  create_publication("pub_#{replica_ubid}")
  slot_lsn = create_replication_slot("slot_#{replica_ubid}")

  assemble_options = {
    project_id: project_id,
    location: location,
    name: "#{name}-#{Time.now.to_i}",
    label: "#{label}-logical",
    ubid: replica_ubid,
    target_vm_size: representative_server.target_vm_size,
    target_storage_size_gib: representative_server.target_storage_size_gib,
    parent_id: id,
    # Time#to_s in UTC ends with " UTC"; [..-5] drops that suffix.
    restore_target: timeline.latest_restore_time.utc.to_s[..-5],
    recovery_target_lsn: slot_lsn,
    org_id: org_id,
    version_upgrade: true,
    logical_replication: true,
    lantern_version: lantern_version || representative_server.lantern_version,
    extras_version: extras_version || representative_server.extras_version,
    minor_version: minor_version || representative_server.minor_version
  }

  Prog::Lantern::LanternResourceNexus.assemble(**assemble_options)
end

def create_logging_table
api = Hosting::GcpApis.new
schema = [
Expand Down
4 changes: 2 additions & 2 deletions model/lantern/lantern_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ def hostname
vm.sshable.host
end

# Builds a postgres:// URI for this server as the "postgres" user.
#
# The password is the resource's superuser_password, percent-encoded with
# URI.encode_uri_component so it is safe inside the userinfo part.
#
# port: port placed in the URI (default 6432).
# Returns the URI as a String, or nil when the server has no hostname.
def connection_string(port: 6432)
  return nil unless (hn = hostname)
  URI::Generic.build2(
    scheme: "postgres",
    userinfo: "postgres:#{URI.encode_uri_component(resource.superuser_password)}",
    host: hn,
    port: port
  ).to_s
end

Expand Down
74 changes: 69 additions & 5 deletions prog/lantern/lantern_resource_nexus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@ class Prog::Lantern::LanternResourceNexus < Prog::Base
extend Forwardable
def_delegators :lantern_resource, :servers, :representative_server

semaphore :destroy
semaphore :destroy, :swap_leaders_with_parent

def self.assemble(project_id:, location:, name:, target_vm_size:, target_storage_size_gib:, ha_type: LanternResource::HaType::NONE, parent_id: nil, restore_target: nil, recovery_target_lsn: nil,
def self.assemble(project_id:, location:, name:, target_vm_size:, target_storage_size_gib:, ubid: LanternResource.generate_ubid, ha_type: LanternResource::HaType::NONE, parent_id: nil, restore_target: nil, recovery_target_lsn: nil,
org_id: nil, db_name: "postgres", db_user: "postgres", db_user_password: nil, superuser_password: nil, repl_password: nil, app_env: Config.rack_env,
lantern_version: Config.lantern_default_version, extras_version: Config.lantern_extras_default_version, minor_version: Config.lantern_minor_default_version, domain: nil, enable_debug: false,
label: "", version_upgrade: false)
label: "", version_upgrade: false, logical_replication: false)
unless (project = Project[project_id])
fail "No existing project"
end

ubid = LanternResource.generate_ubid
name ||= LanternResource.ubid_to_name(ubid)

Validation.validate_location(location, project.provider)
Expand Down Expand Up @@ -84,7 +83,8 @@ def self.assemble(project_id:, location:, name:, target_vm_size:, target_storage
superuser_password: superuser_password, ha_type: ha_type, parent_id: parent_id,
restore_target: restore_target, db_name: db_name, db_user: db_user,
db_user_password: db_user_password, repl_user: repl_user, repl_password: repl_password,
label: label, doctor_id: lantern_doctor.id, recovery_target_lsn: recovery_target_lsn, version_upgrade: version_upgrade
label: label, doctor_id: lantern_doctor.id, recovery_target_lsn: recovery_target_lsn, version_upgrade: version_upgrade,
logical_replication: logical_replication
) { _1.id = ubid.to_uuid }
lantern_resource.associate_with_project(project)

Expand Down Expand Up @@ -155,6 +155,17 @@ def before_run
# Waits until every server strand has reached the "wait" label, napping 5
# between checks, and marks the resource failed if its deadline has passed
# (set_failed_on_deadline). Once all servers are waiting, hops to
# enable_logical_replication for resources flagged logical_replication and
# to wait otherwise.
label def wait_servers
  lantern_resource.set_failed_on_deadline

  all_servers_waiting = servers.all? { |server| server.strand.label == "wait" }
  nap 5 unless all_servers_waiting

  hop_enable_logical_replication if lantern_resource.logical_replication
  hop_wait
end

# Turns the freshly provisioned resource into a logical subscriber: first
# switches ddl_log into replay mode (listen_ddl_log), then creates and
# enables the subscription (create_and_enable_subscription), and finally
# hops to the wait label. The order matters: the DDL replay trigger is in
# place before any replicated rows can arrive.
label def enable_logical_replication
lantern_resource.listen_ddl_log
lantern_resource.create_and_enable_subscription
hop_wait
end

Expand All @@ -177,9 +188,62 @@ def before_run
lantern_resource.update(display_state: nil)
end

when_swap_leaders_with_parent_set? do
if lantern_resource.parent.nil?
decr_swap_leaders_with_parent
else
lantern_resource.update(display_state: "failover")
lantern_resource.parent.update(display_state: "failover")
register_deadline(:wait, 10 * 60)
hop_swap_leaders_with_parent
end
end

nap 30
end

# Completes a leader swap: exchanges the domain values of the parent's
# representative server and this resource's representative server, clears
# display_state on both resources, then detaches this resource and its
# timeline from the parent (parent_id: nil) so the parent can be deleted,
# and hops to wait.
label def update_hosts
  parent_server = lantern_resource.parent.representative_server
  own_server = lantern_resource.representative_server

  # Capture both domains before either record is updated.
  parent_domain = parent_server.domain
  own_domain = own_server.domain

  own_server.update(domain: parent_domain)
  parent_server.update(domain: own_domain)

  # update display_states
  lantern_resource.update(display_state: nil)
  lantern_resource.parent.update(display_state: nil)

  # remove fork association so parent can be deleted
  lantern_resource.update(parent_id: nil)
  lantern_resource.timeline.update(parent_id: nil)

  hop_wait
end

# Polls the representative server with "SELECT 1" after the IP swap. Any
# error raised by the probe is treated as "not ready yet" and the step naps
# 5; once the probe succeeds the step hops to update_hosts.
label def wait_swap_ip
  reachable =
    begin
      lantern_resource.representative_server.run_query("SELECT 1")
      true
    rescue
      # Best-effort probe: a failure only means the server is not reachable yet.
      false
    end

  # The hop stays outside the begin/rescue so the rescue only covers the probe.
  nap 5 unless reachable
  hop_update_hosts
end

# Starts the leader swap between this resource and its parent. The
# statement order is the procedure and must be preserved.
label def swap_leaders_with_parent
# Clear the triggering semaphore first (presumably so the swap is not
# re-entered — confirm against the semaphore helpers).
decr_swap_leaders_with_parent
# Make the parent read-only (default_transaction_read_only = on).
lantern_resource.parent.set_to_readonly
# Disable this resource's sub_<ubid> subscription.
lantern_resource.disable_logical_subscription
# Sequence positions are copied explicitly from the parent here.
lantern_resource.sync_sequences_with_parent
# Exchange IPs between the two representative servers' VMs.
# NOTE(review): swap_ip is defined outside this view — confirm its semantics.
lantern_resource.representative_server.vm.swap_ip(lantern_resource.parent.representative_server.vm)
# Continue in wait_swap_ip, which polls until this server answers queries.
hop_wait_swap_ip
end

label def destroy
register_deadline(nil, 5 * 60)

Expand Down
Loading