Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lantern staging #14

Merged
merged 6 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions lib/hosting/gcp_apis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,10 @@ def resize_vm_disk(zone, disk_source, storage_size_gib)
wait_for_operation(zone, data["id"])
end

def list_objects(bucket, prefix)
def list_objects(bucket, pattern)
connection = Excon.new("https://storage.googleapis.com", headers: @host[:headers])
query = {prefix: prefix}
query = {matchGlob: pattern, delimiter: "/"}

response = connection.get(path: "/storage/v1/b/#{bucket}/o", query: query, expects: [200, 400])
Hosting::GcpApis.check_errors(response)
data = JSON.parse(response.body)
Expand All @@ -247,7 +248,8 @@ def list_objects(bucket, prefix)
return []
end

data["items"].map { |hsh| {key: hsh["name"], last_modified: Time.new(hsh["updated"])} }
# customTime will be only present when we migrate a bucket to another bucket
data["items"].map { |hsh| {key: hsh["name"], last_modified: Time.new(hsh["customTime"] || hsh["updated"])} }
end

def get_json_object(bucket, object)
Expand Down
6 changes: 3 additions & 3 deletions lib/option.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ def self.lantern_locations_for_provider(provider)
VmSize = Struct.new(:name, :family, :vcpu, :memory, :storage_size_gib) do
alias_method :display_name, :name
end
VmSizes = [2, 4, 8, 16, 32, 64].map {
VmSizes = [1, 2, 4, 8, 16, 32, 64].map {
VmSize.new("n1-standard-#{_1}", "n1-standard", _1, _1 * 4, (_1 / 2) * 25)
}.freeze

LanternSize = Struct.new(:name, :vm_size, :family, :vcpu, :memory, :storage_size_gib) do
alias_method :display_name, :name
end

LanternSizes = [2, 4, 8, 16, 32, 64].map {
LanternSize.new("n1-standard-#{_1}", "n1-standard-#{_1}", "n1-standard", _1, _1 * 4, (_1 / 2) * 128)
LanternSizes = [1, 2, 4, 8, 16, 32, 64].map {
LanternSize.new("n1-standard-#{_1}", "n1-standard-#{_1}", "n1-standard", _1, _1 * 4, _1 * 64)
}.freeze

LanternHaOption = Struct.new(:name, :standby_count, :title, :explanation)
Expand Down
9 changes: 9 additions & 0 deletions migrate/20240502_lantern_resource_display_state.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

Sequel.migration do
change do
alter_table(:lantern_resource) do
add_column :display_state, :text, null: true
end
end
end
9 changes: 8 additions & 1 deletion misc/misc_queries.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@ def self.update_collation_on_all_databases
# We didn't need to update rebuild indexes this time as we didn't have any indexes with collation coming from libc
resources = LanternResource.all
resources.each do |resource|
resource.representative_server.run_query("ALTER DATABASE template1 REFRESH COLLATION VERSION")
update_collation resource
end
end

def self.update_collation(resource)
all_dbs = resource.representative_server.run_query("SELECT datname from pg_database WHERE datname != 'template0'").split("\n")
all_dbs.each do |db|
resource.representative_server.run_query("ALTER DATABASE #{db} REFRESH COLLATION VERSION")
end
end
end
5 changes: 0 additions & 5 deletions model/gcp_vm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@ def host
sshable&.host
end

def display_state
return "deleting" if destroy_set?
super
end

def mem_gib_ratio
return 3.2 if arch == "arm64"
8
Expand Down
10 changes: 9 additions & 1 deletion model/lantern/lantern_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class LanternResource < Sequel::Model
one_to_one :strand, key: :id
many_to_one :project
many_to_one :parent, key: :parent_id, class: self
one_to_many :forks, key: :parent_id, class: self
one_to_many :servers, class: LanternServer, key: :resource_id
one_to_one :representative_server, class: LanternServer, key: :resource_id, conditions: Sequel.~(representative_at: nil)
one_through_one :timeline, class: LanternTimeline, join_table: :lantern_server, left_key: :resource_id, right_key: :timeline_id
Expand Down Expand Up @@ -39,7 +40,7 @@ def path
end

def display_state
representative_server&.display_state || "unavailable"
super || representative_server&.display_state || "unavailable"
end

def connection_string
Expand All @@ -51,6 +52,13 @@ def required_standby_count
required_standby_count_map[ha_type]
end

def dissociate_forks
forks.each {
_1.update(parent_id: nil)
_1.timeline.update(parent_id: nil)
}
end

module HaType
NONE = "none"
ASYNC = "async"
Expand Down
14 changes: 14 additions & 0 deletions model/lantern/lantern_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ def display_state
return "domain setup" if strand.label.include?("domain")
return "ssl setup" if strand.label.include?("setup_ssl")
return "updating" if strand.label.include?("update")
return "updating" if strand.label.include?("init_sql")
return "stopped" if vm.display_state.include?("stopped")
return "stopping" if vm.display_state.include?("stopping")
return "starting" if vm.display_state.include?("starting")
return "failed" if vm.display_state.include?("failed")
return "unavailable" if strand.label.include?("wait_db_available")
return "running" if ["wait"].include?(strand.label)
return "deleting" if destroy_set? || strand.label == "destroy"
Expand Down Expand Up @@ -137,12 +142,21 @@ def container_image
end

def init_health_monitor_session
if strand.label != "wait"
fail "server is not ready to initialize session"
end

{
db_connection: nil
}
end

def check_pulse(session:, previous_pulse:)
if display_state != "running"
# if there's an operation ongoing, do not check the pulse
return previous_pulse
end

reading = begin
session[:db_connection] ||= Sequel.connect(connection_string)
lsn_function = primary? ? "pg_current_wal_lsn()" : "pg_last_wal_receive_lsn()"
Expand Down
21 changes: 14 additions & 7 deletions model/lantern/lantern_timeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,24 @@ def take_manual_backup

def backups
blob_storage_client
.list_objects(Config.lantern_backup_bucket, "#{ubid}/basebackups_005/")
.select { _1[:key].end_with?("backup_stop_sentinel.json") }
.list_objects(Config.lantern_backup_bucket, "#{ubid}/basebackups_005/*_backup_stop_sentinel.json")
end

def backups_with_metadata
storage_client = blob_storage_client
backups
.map {
metadata = storage_client.get_json_object(Config.lantern_backup_bucket, _1[:key])
{**_1, compressed_size: metadata["CompressedSize"], uncompressed_size: metadata["UncompressedSize"]}
}
mutex = Mutex.new
thread_count = 8
backup_list = backups
results = []
Array.new(thread_count) {
Thread.new(backup_list, results) do |backup_list, results|
while (backup = mutex.synchronize { backup_list.pop })
metadata = storage_client.get_json_object(Config.lantern_backup_bucket, backup[:key])
mutex.synchronize { results << {**backup, compressed_size: metadata["CompressedSize"], uncompressed_size: metadata["UncompressedSize"]} }
end
end
}.each(&:join)
results
end

def get_backup_label(key)
Expand Down
44 changes: 27 additions & 17 deletions prog/gcp_vm/nexus.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require "forwardable"
require "netaddr"
require "json"
require "shellwords"
Expand All @@ -10,6 +11,10 @@

class Prog::GcpVm::Nexus < Prog::Base
subject_is :gcp_vm

extend Forwardable
def_delegators :gcp_vm

semaphore :destroy, :start_vm, :stop_vm, :update_storage, :update_size

def self.assemble(public_key, project_id, name: nil, size: "n1-standard-2",
Expand Down Expand Up @@ -85,10 +90,14 @@ def host
end

label def start
register_deadline(:failed_provisioning, 10 * 60)
hop_create_vm
end

label def create_vm
gcp_client = Hosting::GcpApis.new
labels = frame["labels"]
gcp_client.create_vm(gcp_vm.name, "#{gcp_vm.location}-a", gcp_vm.boot_image, gcp_vm.public_key, gcp_vm.unix_user, "#{gcp_vm.family}-#{gcp_vm.cores}", gcp_vm.storage_size_gib, labels: labels)
register_deadline(:wait, 10 * 60)

# remove labels from stack
current_frame = strand.stack.first
Expand All @@ -99,6 +108,11 @@ def host
hop_wait_create_vm
end

label def failed_provisioning
gcp_vm.update(display_state: "failed")
hop_wait
end

label def wait_sshable
addr = gcp_vm.sshable.host

Expand All @@ -123,23 +137,30 @@ def host
label def wait
when_stop_vm_set? do
register_deadline(:wait, 5 * 60)
gcp_vm.update(display_state: "stopping")
hop_stop_vm
end

when_start_vm_set? do
register_deadline(:wait, 5 * 60)
gcp_vm.update(display_state: "starting")
hop_start_vm
end

when_destroy_set? do
gcp_vm.update(display_state: "deleting")
hop_destroy
end

when_update_size_set? do
register_deadline(:wait, 5 * 60)
gcp_vm.update(display_state: "updating")
hop_update_size
end

when_update_storage_set? do
register_deadline(:wait, 5 * 60)
gcp_vm.update(display_state: "updating")
hop_update_storage
end

Expand All @@ -157,8 +178,6 @@ def host
end

label def stop_vm
gcp_vm.update(display_state: "stopping")

gcp_client = Hosting::GcpApis.new
gcp_client.stop_vm(gcp_vm.name, "#{gcp_vm.location}-a")

Expand All @@ -168,13 +187,9 @@ def host
end

label def start_vm
gcp_vm.update(display_state: "starting")

gcp_client = Hosting::GcpApis.new
gcp_client.start_vm(gcp_vm.name, "#{gcp_vm.location}-a")

gcp_vm.update(display_state: "running")

decr_start_vm

hop_wait_sshable
Expand All @@ -185,7 +200,6 @@ def host
hop_stop_vm
end
decr_update_storage
register_deadline(:wait, 5 * 60)
gcp_vm.update(display_state: "updating")
gcp_client = Hosting::GcpApis.new
zone = "#{gcp_vm.location}-a"
Expand All @@ -205,12 +219,12 @@ def host
hop_stop_vm
end
decr_update_size
register_deadline(:wait, 5 * 60)
gcp_vm.update(display_state: "updating")
gcp_client = Hosting::GcpApis.new
gcp_client.update_vm_type(gcp_vm.name, "#{gcp_vm.location}-a", gcp_vm.display_size)

when_update_storage_set? do
register_deadline(:wait, 5 * 60)
hop_update_storage
end

Expand All @@ -219,14 +233,10 @@ def host

label def destroy
DB.transaction do
gcp_vm.update(display_state: "deleting")
# TODO:: disable vm deletion until stable release
if !Config.production? || Config.e2e_test?
gcp_client = Hosting::GcpApis.new
gcp_client.delete_vm(gcp_vm.name, "#{gcp_vm.location}-a")
if gcp_vm.has_static_ipv4
gcp_client.release_ipv4(gcp_vm.name, gcp_vm.location)
end
gcp_client = Hosting::GcpApis.new
gcp_client.delete_vm(gcp_vm.name, "#{gcp_vm.location}-a")
if gcp_vm.has_static_ipv4
gcp_client.release_ipv4(gcp_vm.name, gcp_vm.location)
end
strand.children.each { _1.destroy }
gcp_vm.projects.map { gcp_vm.dissociate_with_project(_1) }
Expand Down
11 changes: 10 additions & 1 deletion prog/lantern/lantern_resource_nexus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,18 @@ def before_run

label def start
nap 5 unless representative_server.vm.strand.label == "wait"
register_deadline(:wait, 10 * 60)
register_deadline(:failed_provisioning, 10 * 60)
# bud self.class, frame, :trigger_pg_current_xact_id_on_parent if lantern_resource.parent

# hop_wait_trigger_pg_current_xact_id_on_parent
hop_wait_servers
end

label def failed_provisioning
lantern_resource.update(display_state: "failed")
hop_wait
end

# TODO:: check why is this needed
# label def trigger_pg_current_xact_id_on_parent
# lantern_resource.parent.representative_server.run_query("SELECT pg_current_xact_id()")
Expand All @@ -142,6 +147,10 @@ def before_run
Prog::Lantern::LanternServerNexus.assemble(resource_id: lantern_resource.id, timeline_id: lantern_resource.timeline.id, timeline_access: "fetch")
end

if lantern_resource.display_state == "failed" && servers.any? { _1.strand.label == "wait" }
lantern_resource.update(display_state: nil)
end

nap 30
end

Expand Down
Loading