Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

run doctor queries in target vms with daemonizer #68

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions migrate/20240710_lantern_doctor_rhizome.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# frozen_string_literal: true

Sequel.migration do
up do
alter_table(:lantern_doctor_page) do
add_column :vm_name, :text, null: true
end

alter_table(:lantern_doctor_query) do
add_column :server_type, :text, null: true, default: "primary"
end

# this is the query to check disk size, it should run on all servers
run "UPDATE lantern_doctor_query SET server_type='*' WHERE id='09b1b1d1-7095-89b7-8ae4-158e15e11871'"

# update queries to sync rhizome
run "INSERT INTO semaphore (id, strand_id, name) SELECT gen_random_uuid(), id, 'sync_system_queries' FROM strand s WHERE s.prog = 'Lantern::LanternDoctorNexus'"
end
end
3 changes: 3 additions & 0 deletions model/lantern/lantern_doctor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ def sync_system_queries
doctor_query_list = queries
system_query_list = system_queries

# update rhizome in case new method will be added
resource.servers.each { _1.incr_update_rhizome }

system_query_list.each {
if !has_system_query?(doctor_query_list, _1)
LanternDoctorQuery.create_with_id(parent_id: _1.id, doctor_id: id, condition: "unknown", type: "user", response_type: _1.response_type)
Expand Down
8 changes: 5 additions & 3 deletions model/lantern/lantern_doctor_page.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ class LanternDoctorPage < Sequel::Model

include ResourceMethods

def self.create_incident(query, db_name, err: "", output: "")
pg = Prog::PageNexus.assemble_with_logs("Healthcheck: #{query.name} failed on #{query.doctor.resource.name} - #{query.doctor.resource.label} (#{db_name})", [query.ubid, query.doctor.ubid], {"stderr" => err, "stdout" => output}, query.severity, "LanternDoctorQueryFailed", query.id, db_name)
def self.create_incident(query, db_name, vm_name, err: "", output: "")
pg = Prog::PageNexus.assemble_with_logs("Healthcheck: #{query.name} failed on #{query.doctor.resource.name} - #{query.doctor.resource.label} (#{db_name} - #{vm_name})", [query.ubid, query.doctor.ubid], {"stderr" => err, "stdout" => output}, query.severity, "LanternDoctorQueryFailed", query.id, db_name, vm_name)
LanternDoctorPage.create_with_id(
query_id: query.id,
page_id: pg.id,
status: "new"
status: "new",
db: db_name,
vm_name: vm_name
)
end

Expand Down
124 changes: 20 additions & 104 deletions model/lantern/lantern_doctor_query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ def name
parent&.name || super
end

def task_name
"healthcheck_#{ubid}"
end

def db_name
parent&.db_name || super
end
Expand All @@ -47,8 +51,17 @@ def response_type
parent&.response_type || super
end

def server_type
parent&.server_type || super
end

def servers
doctor.resource.servers.select { (server_type == "*") || (server_type == "primary" && _1.primary?) || (server_type == "standby" && _1.standby?) }
end

def should_run?
CronParser.new(schedule).next(last_checked || Time.new - 365 * 24 * 60 * 60) <= Time.new
is_scheduled_time = CronParser.new(schedule).next(last_checked || Time.new - 365 * 24 * 60 * 60) <= Time.new
is_scheduled_time && doctor.resource.representative_server.vm.sshable.cmd("common/bin/daemonizer --check #{task_name}") == "NotStarted"
end

def is_system?
Expand All @@ -68,109 +81,12 @@ def new_and_active_pages
LanternDoctorPage.where(query_id: id, status: ["new", "triggered", "acknowledged"]).all
end

def run
if !should_run?
return nil
end

lantern_server = doctor.resource.representative_server
dbs = (db_name == "*") ? lantern_server.list_all_databases : [db_name]
query_user = user

any_failed = false
dbs.each do |db|
err_msg = ""
output = ""

failed = false
begin
if is_system? && fn_label && LanternDoctorQuery.method_defined?(fn_label)
res = send(fn_label, db, query_user)
elsif sql
res = lantern_server.run_query(sql, db: db, user: query_user).strip
else
fail "BUG: non-system query without sql"
end

case response_type
when "bool"
if res != "f"
failed = true
any_failed = true
end
when "rows"
if res != ""
failed = true
any_failed = true
end
output = res
else
fail "BUG: invalid response type (#{response_type}) on query #{name}"
end
rescue => e
failed = true
any_failed = true
Clog.emit("LanternDoctorQuery failed") { {error: e, query_name: name, db: db, resource_name: doctor.resource.name} }
err_msg = e.message
end

pg = LanternDoctorPage.where(query_id: id, db: db).where(Sequel.lit("status != 'resolved' ")).first

if failed && !pg
LanternDoctorPage.create_incident(self, db, err: err_msg, output: output)
elsif !failed && pg
pg.resolve
end
end

update(condition: any_failed ? "failed" : "healthy", last_checked: Time.new)
end

def check_daemon_embedding_jobs(db, query_user)
lantern_server = doctor.resource.representative_server
jobs_table_exists = lantern_server.run_query(<<SQL).chomp.strip
SELECT EXISTS (
SELECT FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = '_lantern_internal'
AND c.relname = 'embedding_generation_jobs'
AND c.relkind = 'r'
);
SQL

if jobs_table_exists == "f"
return "f"
end

jobs = lantern_server.run_query("SELECT \"schema\", \"table\", src_column, dst_column FROM _lantern_internal.embedding_generation_jobs WHERE init_finished_at IS NOT NULL AND canceled_at IS NULL;")

jobs = jobs.chomp.strip.split("\n").map do |row|
values = row.split(",")
{schema: values[0], table: values[1], src_column: values[2], dst_column: values[3]}
end

if jobs.empty?
return "f"
end

failed = jobs.any? do |job|
res = lantern_server.run_query("SELECT (SELECT COUNT(*) FROM \"#{job[:schema]}\".\"#{job[:table]}\" WHERE \"#{job[:src_column]}\" IS NOT NULL AND \"#{job[:src_column]}\" != '' AND \"#{job[:src_column]}\" != 'Error: Summary failed (llm)' AND \"#{job[:dst_column]}\" IS NULL) > 2000", db: db, user: query_user).strip
res == "t"
end

failed ? "t" : "f"
end

def check_disk_space_usage(_db, _query_user)
output = ""
doctor.resource.servers.each do |serv|
usage_percent = serv.vm.sshable.cmd("df | awk '$1 == \"/dev/root\" {print $5}' | sed 's/%//'").strip.to_i
if usage_percent > 90
server_type = serv.primary? ? "primary" : "standby"
output += "#{server_type} server - usage #{usage_percent}%\n"
end
rescue
def update_page_status(db, vm_name, success, output, err_msg)
pg = LanternDoctorPage.where(query_id: id, db: db, vm_name: vm_name).where(Sequel.lit("status != 'resolved' ")).first
if !success && !pg
LanternDoctorPage.create_incident(self, db, vm_name, err: err_msg, output: output)
elsif success && pg
pg.resolve
end
output.chomp
end
end
1 change: 1 addition & 0 deletions model/sshable.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require "stringio"
require "net/ssh"
require_relative "../model"

Expand Down
1 change: 1 addition & 0 deletions prog/gcp_vm/nexus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ def host
end
strand.children.each { _1.destroy }
gcp_vm.projects.map { gcp_vm.dissociate_with_project(_1) }
LanternDoctorPage.where(vm_name: gcp_vm.name).each { _1.resolve }
gcp_vm.destroy
end
pop "gcp vm deleted"
Expand Down
59 changes: 58 additions & 1 deletion prog/lantern/lantern_doctor_nexus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,69 @@ def before_run
end

if lantern_doctor.should_run?
lantern_doctor.queries.each { _1.run }
hop_run_queries
end

nap 60
end

label def run_queries
lantern_doctor.queries.each do |query|
next if !query.should_run?

dbs = (query.db_name == "*") ? lantern_doctor.resource.representative_server.list_all_databases : [query.db_name]

query.servers.each do |server|
server.vm.sshable.cmd("common/bin/daemonizer 'lantern/bin/doctor/run_query' #{query.task_name}", stdin: JSON.generate({query: {is_system: query.is_system?, response_type: query.response_type, name: query.name, sql: query.sql&.tr("\n", " "), fn_label: query.fn_label, query_user: query.user}, server_type: server.primary? ? "primary" : "standby", dbs: dbs}))
end
end

hop_wait_queries
end

label def wait_queries
lantern_doctor.queries.each do |query|
query.servers.each do |server|
vm = server.vm
status = "Unknown"
begin
status = vm.sshable.cmd("common/bin/daemonizer --check #{query.task_name}")
rescue
end

case status
when "Failed", "Succeeded"
logs = JSON.parse(vm.sshable.cmd("common/bin/daemonizer --logs #{query.task_name}"))
all_output = []

if !logs["stdout"].empty?
# stdout will be [{ "db": string, "result": string }]
begin
all_output = JSON.parse(logs["stdout"])
rescue
all_output = [{"db" => "*", "result" => logs["stdout"], "err" => logs["stderr"]}]
end

# resolve errored page if exists
query.update_page_status("*", vm.name, true, nil, nil)
else
# this is the case when command errored for some reason
all_output = [{"db" => "*", "result" => "", "err" => logs["stderr"]}]
end

all_output.each do |output|
query.update_page_status(output["db"], vm.name, status == "Succeeded", output["result"], output["err"])
end

query.update(condition: (status == "Failed") ? "failed" : "healthy", last_checked: Time.new)
vm.sshable.cmd("common/bin/daemonizer --clean #{query.task_name}")
end
end
end

hop_wait
end

label def sync_system_queries
decr_sync_system_queries
lantern_doctor.sync_system_queries
Expand Down
117 changes: 117 additions & 0 deletions rhizome/lantern/bin/doctor/run_query
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#!/bin/env ruby
# frozen_string_literal: true

require "json"
require "yaml"
require_relative "../../../common/lib/util"
require_relative "../../lib/common"

$configure_hash = JSON.parse($stdin.read)
dbs = $configure_hash["dbs"]

def exec_sql(sql, user: "postgres", db: "postgres")
r("docker compose -f #{$compose_file} exec -T postgresql psql -q -U #{user} -t --csv #{db}", stdin: sql).chomp.strip
end

def run_for_db(db)
query = $configure_hash["query"]
err = ""
output = ""
response_type = query["response_type"]
name = query["name"]
is_system = query["is_system"]
fn_label = query["fn_label"]
query_user = query["query_user"]
sql = query["sql"]

success = true
begin
if is_system && fn_label && SystemQueries.respond_to?(fn_label)
res = SystemQueries.send(fn_label, db, query_user)
elsif sql
res = exec_sql(sql, db: db, user: query_user)
else
fail "BUG: non-system query without sql"
end

case response_type
when "bool"
if res != "f"
success = false
end
when "rows"
if res != ""
success = false
end
output = res
else
fail "BUG: invalid response type (#{response_type}) on query #{name}"
end
rescue => e
success = false
err = e.message
end

[success, {db: db, result: output, err: err}]
end

class SystemQueries
def self.check_daemon_embedding_jobs(db, query_user)
jobs_table_exists = exec_sql(<<SQL)
SELECT EXISTS (
SELECT FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = '_lantern_internal'
AND c.relname = 'embedding_generation_jobs'
AND c.relkind = 'r'
);
SQL

if jobs_table_exists == "f"
return "f"
end

jobs = exec_sql("SELECT \"schema\", \"table\", src_column, dst_column FROM _lantern_internal.embedding_generation_jobs WHERE init_finished_at IS NOT NULL AND canceled_at IS NULL;")

jobs = jobs.split("\n").map do |row|
values = row.split(",")
{schema: values[0], table: values[1], src_column: values[2], dst_column: values[3]}
end

if jobs.empty?
return "f"
end

failed = jobs.any? do |job|
res = exec_sql("SELECT (SELECT COUNT(*) FROM \"#{job[:schema]}\".\"#{job[:table]}\" WHERE \"#{job[:src_column]}\" IS NOT NULL AND \"#{job[:src_column]}\" != '' AND \"#{job[:src_column]}\" != 'Error: Summary failed (llm)' AND \"#{job[:dst_column]}\" IS NULL) > 2000", db: db, user: query_user)
res == "t"
end

failed ? "t" : "f"
end

def self.check_disk_space_usage(_db, _query_user)
server_type = $configure_hash["server_type"]
output = ""
usage_percent = r("df | awk '$1 == \"/dev/root\" {print $5}' | sed 's/%//'").chomp.strip.to_i
if usage_percent > 90
output += "#{server_type} server - usage #{usage_percent}%\n"
end
output.chomp
end
end

exit_code = 0

response = []
dbs.each do |db|
success, res = run_for_db(db)
if !success
exit_code = 1
end
response.push(res)
end

puts JSON.generate(response)

exit(exit_code)
Loading
Loading