Skip to content

Commit

Permalink
run doctor queries in target vms with daemonizer to not exceed transa…
Browse files Browse the repository at this point in the history
…ction maximum time
  • Loading branch information
var77 committed Jul 10, 2024
1 parent 1f22606 commit 561c2f9
Show file tree
Hide file tree
Showing 18 changed files with 473 additions and 416 deletions.
19 changes: 19 additions & 0 deletions migrate/20240710_lantern_doctor_rhizome.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# frozen_string_literal: true

# Schema changes needed to run doctor queries on the target VMs:
#   * lantern_doctor_page.vm_name      - name of the VM an incident page belongs to
#   * lantern_doctor_query.server_type - which servers a query runs on
#                                        ("primary", "standby" or "*" for all)
Sequel.migration do
  up do
    alter_table(:lantern_doctor_page) do
      add_column :vm_name, :text, null: true
    end

    alter_table(:lantern_doctor_query) do
      add_column :server_type, :text, null: true, default: "primary"
    end

    # this is the query that checks disk size, it should run on all servers
    run "UPDATE lantern_doctor_query SET server_type='*' WHERE id='09b1b1d1-7095-89b7-8ae4-158e15e11871'"

    # queue a sync_system_queries semaphore for every doctor strand so that
    # rhizome gets synced on its servers
    run "INSERT INTO semaphore (id, strand_id, name) SELECT gen_random_uuid(), id, 'sync_system_queries' FROM strand s WHERE s.prog = 'Lantern::LanternDoctorNexus'"
  end

  # Rollback only drops the two columns (which also discards the '*' override
  # set above). The semaphores queued by `up` are left alone: they cannot be
  # told apart from semaphores raised for other reasons.
  down do
    alter_table(:lantern_doctor_query) do
      drop_column :server_type
    end

    alter_table(:lantern_doctor_page) do
      drop_column :vm_name
    end
  end
end
3 changes: 3 additions & 0 deletions model/lantern/lantern_doctor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ def sync_system_queries
doctor_query_list = queries
system_query_list = system_queries

# update rhizome on every server in case a new method has been added
resource.servers.each { _1.incr_update_rhizome }

system_query_list.each {
if !has_system_query?(doctor_query_list, _1)
LanternDoctorQuery.create_with_id(parent_id: _1.id, doctor_id: id, condition: "unknown", type: "user", response_type: _1.response_type)
Expand Down
8 changes: 5 additions & 3 deletions model/lantern/lantern_doctor_page.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ class LanternDoctorPage < Sequel::Model

include ResourceMethods

# Opens a page for a failed healthcheck query and records it as a
# LanternDoctorPage row with status "new".
#
# query   - the failing query; its name, severity, ubid, id and doctor are read
# db_name - database the query failed on
# vm_name - name of the VM the query failed on. Optional, so that callers
#           passing only (query, db_name) keep working; when it is omitted
#           the page title and tag parts carry the database name only.
# err:    - error text, stored under "stderr" in the page logs
# output: - query output, stored under "stdout" in the page logs
#
# Returns whatever LanternDoctorPage.create_with_id returns.
def self.create_incident(query, db_name, vm_name = nil, err: "", output: "")
  resource = query.doctor.resource
  location = vm_name ? "#{db_name} - #{vm_name}" : db_name
  summary = "Healthcheck: #{query.name} failed on #{resource.name} - #{resource.label} (#{location})"

  # the VM name is part of the page tag only when the caller supplied one
  tag_parts = ["LanternDoctorQueryFailed", query.id, db_name]
  tag_parts << vm_name if vm_name

  pg = Prog::PageNexus.assemble_with_logs(summary, [query.ubid, query.doctor.ubid], {"stderr" => err, "stdout" => output}, query.severity, *tag_parts)

  LanternDoctorPage.create_with_id(
    query_id: query.id,
    page_id: pg.id,
    status: "new",
    db: db_name,
    vm_name: vm_name
  )
end

Expand Down
124 changes: 20 additions & 104 deletions model/lantern/lantern_doctor_query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ def name
parent&.name || super
end

# Name used for this query's task in daemonizer commands; built from the
# query's ubid.
def task_name
  format("healthcheck_%s", ubid)
end

# Database name for this query: the parent query's value when there is a
# parent that defines one, otherwise whatever super returns.
def db_name
  inherited = parent&.db_name
  inherited || super
end
Expand All @@ -47,8 +51,17 @@ def response_type
parent&.response_type || super
end

# Server type for this query: the parent query's value when there is a parent
# that defines one, otherwise whatever super returns.
def server_type
  inherited = parent&.server_type
  inherited || super
end

# Servers of the doctor's resource that this query applies to, chosen by
# server_type:
#   "*"       - every server
#   "primary" - servers answering true to primary?
#   "standby" - servers answering true to standby?
# Any other value (including nil) selects no server.
#
# Returns an Array.
def servers
  # server_type may go through the parent query, so look it up once instead of
  # several times per server
  wanted = server_type

  doctor.resource.servers.select do |server|
    case wanted
    when "*" then true
    when "primary" then server.primary?
    when "standby" then server.standby?
    else false
    end
  end
end

# Whether this query should be started now.
#
# True only when both hold:
#   1. the next scheduled time after last_checked has been reached (a query
#      that was never checked is treated as last checked one year ago), and
#   2. the daemonizer on the resource's representative server reports
#      "NotStarted" for this query's task.
#
# The daemonizer is only contacted over SSH when the schedule is due.
#
# NOTE(review): only the representative server is probed here, while
# run_queries starts the task on every server in `servers` - confirm that is
# intended.
def should_run?
  reference_time = last_checked || Time.new - 365 * 24 * 60 * 60
  return false unless CronParser.new(schedule).next(reference_time) <= Time.new

  doctor.resource.representative_server.vm.sshable.cmd("common/bin/daemonizer --check #{task_name}") == "NotStarted"
end

def is_system?
Expand All @@ -68,109 +81,12 @@ def new_and_active_pages
LanternDoctorPage.where(query_id: id, status: ["new", "triggered", "acknowledged"]).all
end

# Runs this query from the control plane against the resource's representative
# server, opens or resolves a page per database, and stores the overall
# condition ("failed" or "healthy") together with last_checked.
#
# Returns nil without doing anything when should_run? is false.
#
# NOTE(review): the hunk header for this file (109 old lines -> 12 new lines)
# suggests this method is among the lines the commit removes - confirm before
# relying on it.
def run
if !should_run?
return nil
end

lantern_server = doctor.resource.representative_server
# "*" means every database reported by the representative server
dbs = (db_name == "*") ? lantern_server.list_all_databases : [db_name]
query_user = user

any_failed = false
dbs.each do |db|
err_msg = ""
output = ""

failed = false
begin
# system queries with a matching method on this class are dispatched to it;
# everything else must carry SQL
if is_system? && fn_label && LanternDoctorQuery.method_defined?(fn_label)
res = send(fn_label, db, query_user)
elsif sql
res = lantern_server.run_query(sql, db: db, user: query_user).strip
else
fail "BUG: non-system query without sql"
end

case response_type
when "bool"
# "f" is the only passing answer
if res != "f"
failed = true
any_failed = true
end
when "rows"
# any returned row counts as a failure; the rows are kept as page output
if res != ""
failed = true
any_failed = true
end
output = res
else
fail "BUG: invalid response type (#{response_type}) on query #{name}"
end
rescue => e
# an exception while running the check is logged and counted as a failure
failed = true
any_failed = true
Clog.emit("LanternDoctorQuery failed") { {error: e, query_name: name, db: db, resource_name: doctor.resource.name} }
err_msg = e.message
end

# first page for this query and database that is not yet resolved
pg = LanternDoctorPage.where(query_id: id, db: db).where(Sequel.lit("status != 'resolved' ")).first

# NOTE(review): this call passes no vm_name, while the create_incident shown
# for model/lantern/lantern_doctor_page.rb also has a form taking
# (query, db_name, vm_name, ...) - confirm which signature applies here.
if failed && !pg
LanternDoctorPage.create_incident(self, db, err: err_msg, output: output)
elsif !failed && pg
pg.resolve
end
end

update(condition: any_failed ? "failed" : "healthy", last_checked: Time.new)
end

# System check for embedding jobs, run through the representative server.
#
# Answers "f" when the _lantern_internal.embedding_generation_jobs table does
# not exist, when it lists no job that finished initialization and was not
# canceled, or when no listed job has more than 2000 source rows still missing
# their destination value. Answers "t" as soon as one job exceeds that backlog.
#
# db / query_user are used for the per-job backlog count only; the two
# catalog/job-list queries are issued without db: or user: arguments.
def check_daemon_embedding_jobs(db, query_user)
  server = doctor.resource.representative_server

  table_check_sql = <<~SQL
    SELECT EXISTS (
    SELECT FROM pg_catalog.pg_class c
    JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
    WHERE n.nspname = '_lantern_internal'
    AND c.relname = 'embedding_generation_jobs'
    AND c.relkind = 'r'
    );
  SQL
  return "f" if server.run_query(table_check_sql).chomp.strip == "f"

  listing = server.run_query("SELECT \"schema\", \"table\", src_column, dst_column FROM _lantern_internal.embedding_generation_jobs WHERE init_finished_at IS NOT NULL AND canceled_at IS NULL;")

  # one comma-separated row per job: schema, table, src_column, dst_column
  jobs = listing.chomp.strip.split("\n").map { |row| row.split(",") }
  return "f" if jobs.empty?

  backlog_found = jobs.any? do |(schema, table, src_column, dst_column)|
    backlog_sql = "SELECT (SELECT COUNT(*) FROM \"#{schema}\".\"#{table}\" WHERE \"#{src_column}\" IS NOT NULL AND \"#{src_column}\" != '' AND \"#{src_column}\" != 'Error: Summary failed (llm)' AND \"#{dst_column}\" IS NULL) > 2000"
    server.run_query(backlog_sql, db: db, user: query_user).strip == "t"
  end

  backlog_found ? "t" : "f"
end

# System check for disk usage of /dev/root on every server of the resource.
#
# Returns one "<primary|standby> server - usage N%" line per server whose
# usage is above 90%, joined with newlines; "" when no server is above it.
# Both arguments are unused.
def check_disk_space_usage(_db, _query_user)
  warnings = []
  doctor.resource.servers.each do |serv|
    usage_percent = serv.vm.sshable.cmd("df | awk '$1 == \"/dev/root\" {print $5}' | sed 's/%//'").strip.to_i
    next unless usage_percent > 90

    role = serv.primary? ? "primary" : "standby"
    warnings << "#{role} server - usage #{usage_percent}%"
  rescue
    # best effort: a server whose disk usage cannot be read is skipped
  end
  warnings.join("\n")
end

# Brings the page for (this query, db, vm_name) in line with a check result.
#
# db       - database name the result is for
# vm_name  - name of the VM the result is for
# success  - whether the check passed
# output   - check output, passed on as the incident's output
# err_msg  - error text, passed on as the incident's err
#
# A failed check with no open (non-resolved) page creates an incident; a
# passed check with an open page resolves it; otherwise nothing changes.
def update_page_status(db, vm_name, success, output, err_msg)
  open_page = LanternDoctorPage.where(query_id: id, db: db, vm_name: vm_name).where(Sequel.lit("status != 'resolved' ")).first

  if success
    open_page&.resolve
  elsif !open_page
    LanternDoctorPage.create_incident(self, db, vm_name, err: err_msg, output: output)
  end
end
end
1 change: 1 addition & 0 deletions model/sshable.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require "stringio"
require "net/ssh"
require_relative "../model"

Expand Down
1 change: 1 addition & 0 deletions prog/gcp_vm/nexus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ def host
end
strand.children.each { _1.destroy }
gcp_vm.projects.map { gcp_vm.dissociate_with_project(_1) }
LanternDoctorPage.where(vm_name: gcp_vm.name).each { _1.resolve }
gcp_vm.destroy
end
pop "gcp vm deleted"
Expand Down
59 changes: 58 additions & 1 deletion prog/lantern/lantern_doctor_nexus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,69 @@ def before_run
end

if lantern_doctor.should_run?
lantern_doctor.queries.each { _1.run }
hop_run_queries
end

nap 60
end

# Starts every due doctor query on its target servers and then hops to
# wait_queries.
#
# For each query whose should_run? is true, the run_query script is launched
# through the daemonizer on every server in query.servers, under the query's
# task name. The job description (query fields, the server's role and the
# database list) is sent as JSON on stdin. A db_name of "*" expands to the
# databases listed by the representative server.
label def run_queries
  lantern_doctor.queries.each do |query|
    next unless query.should_run?

    dbs = if query.db_name == "*"
      lantern_doctor.resource.representative_server.list_all_databases
    else
      [query.db_name]
    end

    query.servers.each do |server|
      command = "common/bin/daemonizer 'lantern/bin/doctor/run_query' #{query.task_name}"
      payload = {
        query: {
          is_system: query.is_system?,
          response_type: query.response_type,
          name: query.name,
          # the SQL is flattened onto a single line before it is sent
          sql: query.sql&.tr("\n", " "),
          fn_label: query.fn_label,
          query_user: query.user
        },
        server_type: server.primary? ? "primary" : "standby",
        dbs: dbs
      }

      server.vm.sshable.cmd(command, stdin: JSON.generate(payload))
    end
  end

  hop_wait_queries
end

# Collects results of doctor query tasks that have finished and then hops to
# wait.
#
# For every query and every server it targets, the daemonizer is asked for the
# task status. Only "Failed" and "Succeeded" are acted on: the task logs are
# fetched, pages are opened or resolved per reported database, the query's
# condition and last_checked are stored, and the task is cleaned up. Any other
# status (or an error while asking) leaves the task alone for this pass.
#
# NOTE(review): every entry of a task's output is reported with the task-wide
# status, not a per-database one, and the query condition is written once per
# server, so the last finished server wins - confirm both are intended.
label def wait_queries
  lantern_doctor.queries.each do |query|
    query.servers.each do |server|
      vm = server.vm

      status = begin
        vm.sshable.cmd("common/bin/daemonizer --check #{query.task_name}")
      rescue
        "Unknown"
      end
      next unless ["Failed", "Succeeded"].include?(status)

      logs = JSON.parse(vm.sshable.cmd("common/bin/daemonizer --logs #{query.task_name}"))
      stdout = logs["stdout"]

      if stdout.empty?
        # this is the case when the command errored for some reason
        results = [{"db" => "*", "result" => "", "err" => logs["stderr"]}]
      else
        # stdout is expected to be [{ "db": string, "result": string }];
        # anything unparsable is reported as a single "*" entry
        results = begin
          JSON.parse(stdout)
        rescue
          [{"db" => "*", "result" => stdout, "err" => logs["stderr"]}]
        end

        # resolve errored page if exists
        query.update_page_status("*", vm.name, true, nil, nil)
      end

      succeeded = status == "Succeeded"
      results.each do |entry|
        query.update_page_status(entry["db"], vm.name, succeeded, entry["result"], entry["err"])
      end

      query.update(condition: succeeded ? "healthy" : "failed", last_checked: Time.new)
      vm.sshable.cmd("common/bin/daemonizer --clean #{query.task_name}")
    end
  end

  hop_wait
end

label def sync_system_queries
decr_sync_system_queries
lantern_doctor.sync_system_queries
Expand Down
117 changes: 117 additions & 0 deletions rhizome/lantern/bin/doctor/run_query
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#!/bin/env ruby
# frozen_string_literal: true

require "json"
require "shellwords"
require "yaml"
require_relative "../../../common/lib/util"
require_relative "../../lib/common"

# Job description, read as JSON from stdin. As sent by run_queries in
# prog/lantern/lantern_doctor_nexus.rb it contains:
#   "query"       => {"is_system", "response_type", "name", "sql",
#                     "fn_label", "query_user"}
#   "server_type" => "primary" or "standby"
#   "dbs"         => list of database names to run the query against
$configure_hash = JSON.parse($stdin.read)
dbs = $configure_hash["dbs"]

# Runs SQL through psql inside the postgresql compose service and returns the
# output with surrounding whitespace removed.
#
# sql   - statement(s), fed to psql on stdin
# user: - database user to connect as (default "postgres")
# db:   - database to connect to (default "postgres")
#
# user and db are shell-escaped, because database names come from the job
# description and must not be interpreted by the shell.
def exec_sql(sql, user: "postgres", db: "postgres")
  r("docker compose -f #{$compose_file} exec -T postgresql psql -q -U #{user.to_s.shellescape} -t --csv #{db.to_s.shellescape}", stdin: sql).strip
end

# Runs the configured query against one database and classifies the answer.
#
# The query comes from $configure_hash["query"]. A system query whose
# fn_label names a method defined on SystemQueries itself is dispatched to
# that method; otherwise the query's SQL is executed as query_user.
# Restricting dispatch to SystemQueries' own methods keeps fn_label from
# reaching methods every class inherits (instance_eval, send, ...).
#
# Classification by response_type:
#   "bool" - passes only when the answer is "f"
#   "rows" - passes only when the answer is empty; the answer is returned
#            as the result
#
# Returns [success, {db:, result:, err:}]. Any error raised along the way
# (including an unknown response_type or a query without SQL) is reported as
# a failure with its message in err and an empty result.
def run_for_db(db)
  query = $configure_hash["query"]
  fn_label = query["fn_label"]
  sql = query["sql"]
  query_user = query["query_user"]

  res =
    if query["is_system"] && fn_label && SystemQueries.singleton_methods(false).map(&:to_s).include?(fn_label)
      SystemQueries.public_send(fn_label, db, query_user)
    elsif sql
      exec_sql(sql, db: db, user: query_user)
    else
      fail "BUG: non-system query without sql"
    end

  case query["response_type"]
  when "bool"
    [res == "f", {db: db, result: "", err: ""}]
  when "rows"
    [res == "", {db: db, result: res, err: ""}]
  else
    fail "BUG: invalid response type (#{query["response_type"]}) on query #{query["name"]}"
  end
rescue => e
  [false, {db: db, result: "", err: e.message}]
end

# Checks that are implemented in code rather than as a single SQL statement.
# run_for_db calls them by name (fn_label) with (db, query_user).
class SystemQueries
  # Embedding job backlog check.
  #
  # Answers "f" when the _lantern_internal.embedding_generation_jobs table
  # does not exist, when it lists no job that finished initialization and was
  # not canceled, or when no listed job has more than 2000 source rows still
  # missing their destination value; "t" as soon as one job exceeds that.
  #
  # db / query_user are used for the per-job count only; the table check and
  # the job listing go through exec_sql's defaults.
  def self.check_daemon_embedding_jobs(db, query_user)
    table_check_sql = <<~SQL
      SELECT EXISTS (
      SELECT FROM pg_catalog.pg_class c
      JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
      WHERE n.nspname = '_lantern_internal'
      AND c.relname = 'embedding_generation_jobs'
      AND c.relkind = 'r'
      );
    SQL
    return "f" if exec_sql(table_check_sql) == "f"

    listing = exec_sql("SELECT \"schema\", \"table\", src_column, dst_column FROM _lantern_internal.embedding_generation_jobs WHERE init_finished_at IS NOT NULL AND canceled_at IS NULL;")

    # one comma-separated row per job: schema, table, src_column, dst_column
    jobs = listing.split("\n").map { |row| row.split(",") }
    return "f" if jobs.empty?

    backlog_found = jobs.any? do |(schema, table, src_column, dst_column)|
      exec_sql("SELECT (SELECT COUNT(*) FROM \"#{schema}\".\"#{table}\" WHERE \"#{src_column}\" IS NOT NULL AND \"#{src_column}\" != '' AND \"#{src_column}\" != 'Error: Summary failed (llm)' AND \"#{dst_column}\" IS NULL) > 2000", db: db, user: query_user) == "t"
    end

    backlog_found ? "t" : "f"
  end

  # Disk usage check for /dev/root on the machine this script runs on.
  #
  # Returns "<server_type> server - usage N%" when usage is above 90%, and ""
  # otherwise. server_type is taken from the job description; both arguments
  # are unused.
  def self.check_disk_space_usage(_db, _query_user)
    server_type = $configure_hash["server_type"]
    usage_percent = r("df | awk '$1 == \"/dev/root\" {print $5}' | sed 's/%//'").chomp.strip.to_i

    report = []
    report << "#{server_type} server - usage #{usage_percent}%" if usage_percent > 90
    report.join("\n")
  end
end

# Run the query against every requested database, print the per-database
# results as one JSON array on stdout, and exit with 1 if any database
# failed (0 otherwise).
outcomes = dbs.map { |db| run_for_db(db) }

puts JSON.generate(outcomes.map { |(_success, res)| res })

exit(outcomes.all? { |(success, _res)| success } ? 0 : 1)
Loading

0 comments on commit 561c2f9

Please sign in to comment.