forked from ubicloud/ubicloud
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
22 changed files
with
1,073 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
# frozen_string_literal: true | ||
|
||
Sequel.migration do | ||
change do | ||
# doctor | ||
create_table(:lantern_doctor) do | ||
column :id, :uuid, primary_key: true, default: nil | ||
column :created_at, :timestamptz, null: false, default: Sequel.lit("now()") | ||
column :updated_at, :timestamptz, null: false, default: Sequel.lit("now()") | ||
end | ||
alter_table(:lantern_resource) do | ||
add_foreign_key :doctor_id, :lantern_doctor, type: :uuid, null: true | ||
end | ||
# queries | ||
create_enum(:query_condition, %w[unknown healthy failed]) | ||
create_enum(:query_type, %w[system user]) | ||
create_table(:lantern_doctor_query) do | ||
column :id, :uuid, primary_key: true, default: nil | ||
foreign_key :parent_id, :lantern_doctor_query, type: :uuid | ||
foreign_key :doctor_id, :lantern_doctor, type: :uuid | ||
column :name, :text, null: true | ||
column :db_name, :text, null: true | ||
column :schedule, :text, null: true | ||
column :condition, :query_condition, null: false, default: "unknown" | ||
column :fn_label, :text, null: true | ||
column :sql, :text, null: true | ||
column :type, :query_type, null: false | ||
column :severity, :severity, default: "error", null: true | ||
column :last_checked, :timestamptz, null: true | ||
column :created_at, :timestamptz, null: false, default: Sequel.lit("now()") | ||
column :updated_at, :timestamptz, null: false, default: Sequel.lit("now()") | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
# frozen_string_literal: true | ||
|
||
Sequel.migration do | ||
change do | ||
run "INSERT INTO lantern_doctor_query (id, name, db_name, schedule, condition, fn_label, type, severity) | ||
VALUES ('4f916f44-3c7a-89b7-9795-1ccd417b45ba', 'Lantern Embedding Generation Job', '*', '*/30 * * * *', 'unknown', 'check_daemon_embedding_jobs', 'system', 'error')" | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
# frozen_string_literal: true | ||
|
||
require_relative "../../model" | ||
|
||
class LanternDoctor < Sequel::Model | ||
one_to_one :strand, key: :id | ||
one_to_one :resource, class: LanternResource, key: :doctor_id | ||
one_to_many :queries, key: :doctor_id, class: LanternDoctorQuery | ||
one_to_many :failed_queries, key: :doctor_id, class: LanternDoctorQuery, conditions: {condition: "failed"} | ||
|
||
plugin :association_dependencies, queries: :destroy | ||
|
||
include ResourceMethods | ||
include SemaphoreMethods | ||
|
||
semaphore :destroy, :sync_system_queries | ||
|
||
def system_queries | ||
@system_queries ||= LanternDoctorQuery.where(type: "system").all | ||
end | ||
|
||
def has_system_query?(queries, query) | ||
queries.any? { _1.parent_id == query.id } | ||
end | ||
|
||
def sync_system_queries | ||
doctor_query_list = queries | ||
system_query_list = system_queries | ||
|
||
system_query_list.each { | ||
if !has_system_query?(doctor_query_list, _1) | ||
LanternDoctorQuery.create_with_id(parent_id: _1.id, doctor_id: id, condition: "unknown", type: "user") | ||
end | ||
} | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
# frozen_string_literal: true | ||
|
||
require "parse-cron" | ||
require_relative "../../model" | ||
require_relative "../../db" | ||
|
||
class LanternDoctorQuery < Sequel::Model | ||
one_to_one :strand, key: :id | ||
many_to_one :doctor, class: LanternDoctor, key: :doctor_id, primary_key: :id | ||
many_to_one :parent, class: self, key: :parent_id | ||
one_to_many :children, key: :parent_id, class: self | ||
|
||
plugin :association_dependencies, children: :destroy | ||
dataset_module Pagination | ||
|
||
include ResourceMethods | ||
include SemaphoreMethods | ||
|
||
semaphore :destroy | ||
|
||
def sql | ||
parent&.sql || super | ||
end | ||
|
||
def name | ||
parent&.name || super | ||
end | ||
|
||
def db_name | ||
parent&.db_name || super | ||
end | ||
|
||
def severity | ||
parent&.severity || super | ||
end | ||
|
||
def schedule | ||
parent&.schedule || super | ||
end | ||
|
||
def fn_label | ||
parent&.fn_label || super | ||
end | ||
|
||
def should_run? | ||
CronParser.new(schedule).next(last_checked || Time.new - 61) <= Time.new | ||
end | ||
|
||
def is_system? | ||
!parent.nil? | ||
end | ||
|
||
def user | ||
return "postgres" if is_system? | ||
doctor.resource.db_user | ||
end | ||
|
||
def active_pages | ||
tag = Page.generate_tag("LanternDoctorQueryFailed", id) | ||
Page.active.where(Sequel.like(:tag, "%#{tag}-%")).all | ||
end | ||
|
||
def all_pages | ||
tag = Page.generate_tag("LanternDoctorQueryFailed", id) | ||
Page.where(Sequel.like(:tag, "%#{tag}-%")).all | ||
end | ||
|
||
def run | ||
if !should_run? | ||
return nil | ||
end | ||
|
||
lantern_server = doctor.resource.representative_server | ||
dbs = (db_name == "*") ? lantern_server.list_all_databases : [db_name] | ||
query_user = user | ||
|
||
any_failed = false | ||
dbs.each do |db| | ||
err_msg = "" | ||
|
||
failed = false | ||
begin | ||
if is_system? && fn_label && LanternDoctorQuery.method_defined?(fn_label) | ||
res = send(fn_label, db, query_user) | ||
elsif sql | ||
res = lantern_server.run_query(sql, db: db, user: query_user).strip | ||
else | ||
fail "BUG: non-system query without sql" | ||
end | ||
|
||
if res != "f" | ||
failed = true | ||
any_failed = true | ||
end | ||
rescue => e | ||
failed = true | ||
any_failed = true | ||
Clog.emit("LanternDoctorQuery failed") { {error: e, query_name: name, db: db, resource_name: doctor.resource.name} } | ||
err_msg = e.message | ||
end | ||
|
||
pg = Page.from_tag_parts("LanternDoctorQueryFailed", id, db) | ||
|
||
if failed && !pg | ||
Prog::PageNexus.assemble_with_logs("Healthcheck: #{name} failed on #{doctor.resource.name} (#{db})", [ubid, doctor.ubid, lantern_server.ubid], {"stderr" => err_msg}, severity, "LanternDoctorQueryFailed", id, db) | ||
elsif !failed && pg | ||
pg.incr_resolve | ||
end | ||
end | ||
|
||
update(condition: any_failed ? "failed" : "healthy", last_checked: Time.new) | ||
end | ||
|
||
def check_daemon_embedding_jobs(db, query_user) | ||
if !LanternBackend.db | ||
fail "No connection to lantern backend database specified" | ||
end | ||
|
||
jobs = LanternBackend.db | ||
.select(:schema, :table, :src_column, :dst_column) | ||
.from(:embedding_generation_jobs) | ||
.where(database_id: doctor.resource.name) | ||
.where(Sequel.like(:db_connection, "%/#{db}")) | ||
.where(Sequel.lit("init_finished_at IS NOT NULL")) | ||
.all | ||
|
||
if jobs.empty? | ||
return "f" | ||
end | ||
|
||
lantern_server = doctor.resource.representative_server | ||
failed = jobs.any? do |job| | ||
res = lantern_server.run_query("SELECT (SELECT COUNT(*) FROM \"#{job[:schema]}\".\"#{job[:table]}\" WHERE \"#{job[:src_column]}\" IS NOT NULL AND \"#{job[:src_column]}\" != '' AND \"#{job[:dst_column]}\" IS NULL) > 1000", db: db, user: query_user).strip | ||
res == "t" | ||
end | ||
|
||
failed ? "t" : "f" | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
# frozen_string_literal: true | ||
|
||
require "forwardable" | ||
|
||
class Prog::Lantern::LanternDoctorNexus < Prog::Base | ||
subject_is :lantern_doctor | ||
|
||
extend Forwardable | ||
def_delegators :lantern_doctor | ||
|
||
semaphore :destroy, :sync_system_queries | ||
|
||
def self.assemble | ||
DB.transaction do | ||
lantern_doctor = LanternDoctor.create_with_id | ||
Strand.create(prog: "Lantern::LanternDoctorNexus", label: "start") { _1.id = lantern_doctor.id } | ||
end | ||
end | ||
|
||
def before_run | ||
when_destroy_set? do | ||
if strand.label != "destroy" | ||
hop_destroy | ||
end | ||
end | ||
end | ||
|
||
label def start | ||
lantern_doctor.sync_system_queries | ||
hop_wait_resource | ||
end | ||
|
||
label def wait_resource | ||
nap 5 if lantern_doctor.resource&.strand&.label != "wait" | ||
hop_wait | ||
end | ||
|
||
label def wait | ||
if lantern_doctor.resource.nil? | ||
hop_destroy | ||
end | ||
|
||
when_sync_system_queries_set? do | ||
hop_sync_system_queries | ||
end | ||
|
||
lantern_doctor.queries.each { _1.run } | ||
nap 60 | ||
end | ||
|
||
label def sync_system_queries | ||
decr_sync_system_queries | ||
lantern_doctor.sync_system_queries | ||
hop_wait | ||
end | ||
|
||
label def destroy | ||
decr_destroy | ||
|
||
lantern_doctor.failed_queries.each { | ||
_1.active_pages.each { |pg| pg.incr_resolve } | ||
} | ||
|
||
lantern_doctor.destroy | ||
pop "lantern doctor is deleted" | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.