Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lantern Doctor #25

Merged
merged 14 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ gem "octokit"
gem "argon2-kdf"
gem "googleauth"
gem "simplecov"
gem "parse-cron"

group :development do
gem "brakeman"
Expand Down
2 changes: 2 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ GEM
parallel (1.24.0)
parallel_tests (4.5.2)
parallel
parse-cron (0.1.4)
parser (3.3.0.5)
ast (~> 2.4.1)
racc
Expand Down Expand Up @@ -346,6 +347,7 @@ DEPENDENCIES
nokogiri
octokit
pagerduty (>= 4.0)
parse-cron
pry
pry-byebug
puma (>= 6.2.2)
Expand Down
1 change: 1 addition & 0 deletions config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def self.e2e_test?
override :lantern_backup_bucket, "walg-dev-backups"
override :e2e_test, "0"
override :backup_retention_days, 7, int
optional :lantern_backend_database_url, string

# GCP
override :gcp_project_id, "lantern-development", string
Expand Down
8 changes: 8 additions & 0 deletions db.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,11 @@
# DB.extension :date_arithmetic
DB.extension :pg_json, :pg_auto_parameterize, :pg_timestamptz, :pg_range, :pg_array
Sequel.extension :pg_range_ops

# Optional connection to the Lantern backend database.
#
# The connection is opened once, while this module body is evaluated, and only
# when Config.lantern_backend_database_url is set.
module LanternBackend
  # Held in a module-level instance variable that is always assigned (nil when
  # no URL is configured). The previous conditional assignment to a class
  # variable left it undefined in that case, so reading it raised NameError
  # instead of yielding nil, and callers could never reach their own
  # "no connection configured" handling.
  @db = if Config.lantern_backend_database_url
    Sequel.connect(Config.lantern_backend_database_url, max_connections: Config.db_pool, pool_timeout: Config.database_timeout)
  end

  # @return [Sequel::Database, nil] the backend connection, or nil when
  #   Config.lantern_backend_database_url is not set
  def self.db
    @db
  end
end
34 changes: 34 additions & 0 deletions migrate/20240505_lantern_doctor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# frozen_string_literal: true

# Schema for the Lantern Doctor feature: a doctor table, a nullable link from
# lantern_resource to its doctor, two enums, and the table of doctor queries.
Sequel.migration do
change do
# Doctor table. The uuid primary key has no database default (default: nil),
# so the inserting code must supply the id; both timestamps default to now().
create_table(:lantern_doctor) do
column :id, :uuid, primary_key: true, default: nil
column :created_at, :timestamptz, null: false, default: Sequel.lit("now()")
column :updated_at, :timestamptz, null: false, default: Sequel.lit("now()")
end
# Nullable foreign key, so existing lantern_resource rows stay valid without a doctor.
alter_table(:lantern_resource) do
add_foreign_key :doctor_id, :lantern_doctor, type: :uuid, null: true
end
# Enums used by lantern_doctor_query below: the last known outcome of a query
# (condition) and whether the row is a system or a user query (type).
create_enum(:query_condition, %w[unknown healthy failed])
create_enum(:query_type, %w[system user])
# Query table. Every descriptive column (name, db_name, schedule, fn_label, sql)
# is nullable; only condition, type and the timestamps are NOT NULL.
create_table(:lantern_doctor_query) do
column :id, :uuid, primary_key: true, default: nil
# Self-referential link to another lantern_doctor_query row.
foreign_key :parent_id, :lantern_doctor_query, type: :uuid
foreign_key :doctor_id, :lantern_doctor, type: :uuid
column :name, :text, null: true
column :db_name, :text, null: true
column :schedule, :text, null: true
column :condition, :query_condition, null: false, default: "unknown"
column :fn_label, :text, null: true
column :sql, :text, null: true
column :type, :query_type, null: false
# NOTE(review): the :severity type is not created in this migration —
# presumably an enum defined by an earlier migration; confirm.
column :severity, :severity, default: "error", null: true
column :last_checked, :timestamptz, null: true
column :created_at, :timestamptz, null: false, default: Sequel.lit("now()")
column :updated_at, :timestamptz, null: false, default: Sequel.lit("now()")
end
end
end
8 changes: 8 additions & 0 deletions migrate/20240506_lantern_doctor_queries.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

# Seeds the system doctor query that checks Lantern embedding generation jobs.
#
# Written with explicit up/down blocks: a raw `run` statement cannot be
# reversed automatically, so placing it in a `change` block made the migration
# fail on rollback.
Sequel.migration do
  up do
    run "INSERT INTO lantern_doctor_query (id, name, db_name, schedule, condition, fn_label, type, severity)
VALUES ('4f916f44-3c7a-89b7-9795-1ccd417b45ba', 'Lantern Embedding Generation Job', '*', '*/30 * * * *', 'unknown', 'check_daemon_embedding_jobs', 'system', 'error')"
  end

  down do
    # Rows that reference the seeded query through parent_id are removed in the
    # same statement; otherwise the self-referential foreign key would block
    # deleting the seeded row.
    run "DELETE FROM lantern_doctor_query WHERE id = '4f916f44-3c7a-89b7-9795-1ccd417b45ba' OR parent_id = '4f916f44-3c7a-89b7-9795-1ccd417b45ba'"
  end
end
9 changes: 8 additions & 1 deletion misc/misc_operations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def self.reindex_all_concurrently(resource_name, disable_indexes: false)
schema, idx = _1.split(".")
queries.push("REINDEX INDEX CONCURRENTLY \\\"#{schema}\\\".\\\"#{idx}\\\";")
if disable_indexes
queries.push("UPDATE pg_index SET indisvalid = false, indisready = false WHERE indexrelid = quote_ident('#{_1}')::regclass::oid;")
queries.push("UPDATE pg_index SET indisvalid = false, indisready = false WHERE indexrelid = quote_ident('#{idx}')::regclass::oid;")
end
}

Expand Down Expand Up @@ -145,4 +145,11 @@ def self.get_all_lantern_indexes(resource_name)
end
all_indexes
end

# Assembles a fresh Lantern doctor for every LanternResource and stores the new
# doctor's id in that resource's doctor_id.
#
# Returns whatever `LanternResource.all.each` returns (the enumerated collection).
def self.add_lantern_doctor_to_all
  LanternResource.all.each do |resource|
    doctor = Prog::Lantern::LanternDoctorNexus.assemble
    resource.update(doctor_id: doctor.id)
  end
end
end
41 changes: 41 additions & 0 deletions model/lantern/lantern_doctor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# frozen_string_literal: true

require_relative "../../model"

# Health-check coordinator attached to a LanternResource through the
# resource's doctor_id column. It owns a set of LanternDoctorQuery rows, which
# are destroyed together with the doctor.
class LanternDoctor < Sequel::Model
  one_to_one :strand, key: :id
  one_to_one :resource, class: LanternResource, key: :doctor_id
  one_to_many :queries, key: :doctor_id, class: LanternDoctorQuery
  one_to_many :failed_queries, key: :doctor_id, class: LanternDoctorQuery, conditions: {condition: "failed"}

  plugin :association_dependencies, queries: :destroy

  include ResourceMethods
  include SemaphoreMethods

  semaphore :destroy, :sync_system_queries

  # All queries whose type is "system", regardless of doctor. Memoized for the
  # lifetime of this model instance.
  #
  # @return [Array<LanternDoctorQuery>]
  def system_queries
    @system_queries ||= LanternDoctorQuery.where(type: "system").all
  end

  # @param queries [Enumerable<LanternDoctorQuery>] queries to search
  # @param query [LanternDoctorQuery] the candidate parent query
  # @return [Boolean] true when any element of +queries+ has +query+ as its parent
  def has_system_query?(queries, query)
    queries.any? { _1.parent_id == query.id }
  end

  # Whether health checks should run right now: the resource's representative
  # server must display as "running" and its strand must be at the "wait" label.
  #
  # Returns false (instead of raising NoMethodError) when there is no resource,
  # no representative server, or the server has no strand.
  #
  # @return [Boolean]
  def should_run?
    server = resource&.representative_server
    return false unless server

    server.display_state == "running" && server.strand&.label == "wait"
  end

  # Ensures this doctor has one child query for every system query: for each
  # system query not yet referenced by one of this doctor's queries, a new
  # "user"-typed row pointing at it through parent_id is created.
  def sync_system_queries
    doctor_query_list = queries
    system_query_list = system_queries

    system_query_list.each do |system_query|
      next if has_system_query?(doctor_query_list, system_query)

      LanternDoctorQuery.create_with_id(parent_id: system_query.id, doctor_id: id, condition: "unknown", type: "user")
    end
  end
end
139 changes: 139 additions & 0 deletions model/lantern/lantern_doctor_query.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# frozen_string_literal: true

require "parse-cron"
require_relative "../../model"
require_relative "../../db"

# A single health-check query owned by a LanternDoctor.
#
# A row may point at another row through parent_id. When a parent exists, the
# parent's sql, name, db_name, severity, schedule and fn_label are used in
# preference to this row's own column values (the own value is the fallback
# when the parent's is nil).
class LanternDoctorQuery < Sequel::Model
  one_to_one :strand, key: :id
  many_to_one :doctor, class: LanternDoctor, key: :doctor_id, primary_key: :id
  many_to_one :parent, class: self, key: :parent_id
  one_to_many :children, key: :parent_id, class: self

  plugin :association_dependencies, children: :destroy
  dataset_module Pagination

  include ResourceMethods
  include SemaphoreMethods

  semaphore :destroy

  # The attribute readers below prefer the parent's value and fall back to this
  # row's own column.

  def sql
    parent&.sql || super
  end

  def name
    parent&.name || super
  end

  def db_name
    parent&.db_name || super
  end

  def severity
    parent&.severity || super
  end

  def schedule
    parent&.schedule || super
  end

  def fn_label
    parent&.fn_label || super
  end

  # Whether the cron schedule has a run due: the next scheduled time after
  # last_checked (or after 61 seconds ago when the query has never been
  # checked) must not be in the future.
  #
  # @return [Boolean]
  def should_run?
    CronParser.new(schedule).next(last_checked || Time.new - 61) <= Time.new
  end

  # @return [Boolean] true when this row has a parent query
  def is_system?
    !parent.nil?
  end

  # Database role used to execute the query: "postgres" for rows with a parent,
  # otherwise the owning resource's db_user.
  #
  # @return [String]
  def user
    return "postgres" if is_system?
    doctor.resource.db_user
  end

  # @return [Array<Page>] active pages whose tag contains this query's
  #   "LanternDoctorQueryFailed" tag prefix
  def active_pages
    tag = Page.generate_tag("LanternDoctorQueryFailed", id)
    Page.active.where(Sequel.like(:tag, "%#{tag}-%")).all
  end

  # @return [Array<Page>] all pages (active or not) whose tag contains this
  #   query's "LanternDoctorQueryFailed" tag prefix
  def all_pages
    tag = Page.generate_tag("LanternDoctorQueryFailed", id)
    Page.where(Sequel.like(:tag, "%#{tag}-%")).all
  end

  # Executes the check against every target database and records the outcome.
  #
  # Does nothing and returns nil when the schedule is not due. Otherwise, for
  # each database (all databases of the representative server when db_name is
  # "*", else just db_name) it runs either the method named by fn_label or the
  # raw sql. Any result other than the string "f", or any raised error, marks
  # that database as failed. A failure with no existing page opens one; a
  # success with an existing page resolves it. Finally the row's condition and
  # last_checked are updated.
  #
  # @raise nothing from the checks themselves; errors are logged and paged
  def run
    return nil unless should_run?

    lantern_server = doctor.resource.representative_server
    dbs = (db_name == "*") ? lantern_server.list_all_databases : [db_name]
    query_user = user

    any_failed = false
    dbs.each do |db|
      err_msg = ""

      failed = false
      begin
        if is_system? && fn_label && LanternDoctorQuery.method_defined?(fn_label)
          res = send(fn_label, db, query_user)
        elsif sql
          res = lantern_server.run_query(sql, db: db, user: query_user).strip
        else
          fail "BUG: non-system query without sql"
        end

        # Checks report "f" when healthy; anything else counts as a failure.
        if res != "f"
          failed = true
          any_failed = true
        end
      rescue => e
        failed = true
        any_failed = true
        Clog.emit("LanternDoctorQuery failed") { {error: e, query_name: name, db: db, resource_name: doctor.resource.name} }
        err_msg = e.message
      end

      pg = Page.from_tag_parts("LanternDoctorQueryFailed", id, db)

      if failed && !pg
        Prog::PageNexus.assemble_with_logs("Healthcheck: #{name} failed on #{doctor.resource.name} (#{db})", [ubid, doctor.ubid, lantern_server.ubid], {"stderr" => err_msg}, severity, "LanternDoctorQueryFailed", id, db)
      elsif !failed && pg
        pg.incr_resolve
      end
    end

    update(condition: any_failed ? "failed" : "healthy", last_checked: Time.new)
  end

  # System check: looks up the finished-initialisation embedding generation
  # jobs registered in the backend database for this resource and database,
  # and reports failure when any job's table has more than 1000 rows with a
  # non-empty source column and a NULL destination column.
  #
  # @param db [String] database to inspect on the representative server
  # @param query_user [String] role used to run the counting query
  # @return [String] "t" when a backlog was found, "f" otherwise (including
  #   when there are no jobs)
  # @raise [RuntimeError] when no backend database connection is configured
  def check_daemon_embedding_jobs(db, query_user)
    backend = LanternBackend.db
    fail "No connection to lantern backend database specified" unless backend

    # The database name is matched literally: LIKE metacharacters (% and _) in
    # it are escaped so that e.g. "my_db" does not also match "myXdb".
    db_suffix = backend.dataset.escape_like(db)

    jobs = backend
      .select(:schema, :table, :src_column, :dst_column)
      .from(:embedding_generation_jobs)
      .where(database_id: doctor.resource.name)
      .where(Sequel.like(:db_connection, "%/#{db_suffix}"))
      .where(Sequel.lit("init_finished_at IS NOT NULL"))
      .all

    return "f" if jobs.empty?

    lantern_server = doctor.resource.representative_server
    failed = jobs.any? do |job|
      # Identifiers come from job rows and end up in SQL text, so they are
      # quoted with embedded double quotes escaped.
      table = "#{quote_pg_identifier(job[:schema])}.#{quote_pg_identifier(job[:table])}"
      src = quote_pg_identifier(job[:src_column])
      dst = quote_pg_identifier(job[:dst_column])

      res = lantern_server.run_query("SELECT (SELECT COUNT(*) FROM #{table} WHERE #{src} IS NOT NULL AND #{src} != '' AND #{dst} IS NULL) > 1000", db: db, user: query_user).strip
      res == "t"
    end

    failed ? "t" : "f"
  end

  private

  # Wraps an identifier in double quotes for use in PostgreSQL SQL text,
  # doubling any embedded double quote so the value cannot terminate the
  # quoted identifier early.
  def quote_pg_identifier(identifier)
    "\"#{identifier.to_s.gsub("\"", "\"\"")}\""
  end
end
1 change: 1 addition & 0 deletions model/lantern/lantern_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class LanternResource < Sequel::Model
one_to_many :servers, class: LanternServer, key: :resource_id
one_to_one :representative_server, class: LanternServer, key: :resource_id, conditions: Sequel.~(representative_at: nil)
one_through_one :timeline, class: LanternTimeline, join_table: :lantern_server, left_key: :resource_id, right_key: :timeline_id
one_to_one :doctor, class: LanternDoctor, key: :id, primary_key: :doctor_id

dataset_module Authorization::Dataset
dataset_module Pagination
Expand Down
14 changes: 11 additions & 3 deletions model/lantern/lantern_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ def connection_string
).to_s
end

def run_query(query)
vm.sshable.cmd("sudo lantern/bin/exec", stdin: query).chomp
# Runs +query+ through psql inside the postgresql docker compose service on
# this server's VM, feeding the query on stdin.
#
# @param query [String] SQL text passed to psql via stdin
# @param db [String] database to connect to (defaults to "postgres")
# @param user [String] role passed to psql -U (defaults to "postgres")
# @return [String] command output with one trailing newline removed
def run_query(query, db: "postgres", user: "postgres")
  psql_command = "sudo docker compose -f /var/lib/lantern/docker-compose.yaml exec -T postgresql psql -U #{user} -t --csv #{db}"
  output = vm.sshable.cmd(psql_command, stdin: query)
  output.chomp
end

def run_query_all(query)
vm.sshable.cmd("sudo lantern/bin/exec_all", stdin: query).chomp
list_all_databases.map { [_1, run_query(query, db: _1)] }
end

def display_state
Expand Down Expand Up @@ -196,6 +196,14 @@ def prewarm_indexes_query
SQL
end

# Lists database names on this server by running psql inside the postgresql
# docker compose service. The shell pipeline drops the first two output lines
# and any line containing "template0" or "template1".
#
# @return [Array<String>] one whitespace-stripped entry per remaining output line
def list_all_databases
  listing_command = "sudo docker compose -f /var/lib/lantern/docker-compose.yaml exec postgresql psql -U postgres -P \"footer=off\" -c 'SELECT datname from pg_database' | tail -n +3 | grep -v 'template0' | grep -v 'template1'"
  raw_listing = vm.sshable.cmd(listing_command)
  raw_listing.chomp.strip.split("\n").map(&:strip)
end

# def failover_target
# nil
# end
Expand Down
Loading