Skip to content

Commit

Permalink
check embedding jobs in internal schema if table exists
Browse files Browse the repository at this point in the history
  • Loading branch information
var77 committed May 31, 2024
1 parent c40e047 commit 9f3b7e3
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 6 deletions.
25 changes: 24 additions & 1 deletion model/lantern/lantern_doctor_query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,42 @@ def check_daemon_embedding_jobs(db, query_user)
fail "No connection to lantern backend database specified"
end

# TODO:: the backend db connection will be removed after daemon version update is done
jobs = LanternBackend.db
.select(:schema, :table, :src_column, :dst_column)
.from(:embedding_generation_jobs)
.where(database_id: doctor.resource.name)
.where(Sequel.like(:db_connection, "%/#{db}"))
.where(Sequel.lit("init_finished_at IS NOT NULL"))
.where(Sequel.lit("canceled_at IS NULL"))
.all

lantern_server = doctor.resource.representative_server
new_jobs_exists = lantern_server.run_query(<<SQL).chomp.strip
SELECT EXISTS (
SELECT FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = '_lantern_internal'
AND c.relname = 'embedding_generation_jobs'
AND c.relkind = 'r'
);
SQL

if new_jobs_exists == "t"
new_jobs = lantern_server.run_query("SELECT \"schema\", \"table\", src_column, dst_column FROM _lantern_internal.embedding_generation_jobs WHERE init_finished_at IS NOT NULL AND canceled_at IS NULL;")

new_jobs = new_jobs.split("\n").map do |row|
values = row.split(",")
{schema: values[0], table: values[1], src_column: values[2], dst_column: values[3]}
end

jobs.concat new_jobs
end

if jobs.empty?
return "f"
end

lantern_server = doctor.resource.representative_server
failed = jobs.any? do |job|
res = lantern_server.run_query("SELECT (SELECT COUNT(*) FROM \"#{job[:schema]}\".\"#{job[:table]}\" WHERE \"#{job[:src_column]}\" IS NOT NULL AND \"#{job[:src_column]}\" != '' AND \"#{job[:src_column]}\" != 'Error: Summary failed (llm)' AND \"#{job[:dst_column]}\" IS NULL) > 2000", db: db, user: query_user).strip
res == "t"
Expand Down
19 changes: 14 additions & 5 deletions spec/model/lantern/lantern_doctor_query_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,13 @@
where: instance_double(Sequel::Dataset,
where: instance_double(Sequel::Dataset,
where: instance_double(Sequel::Dataset,
all: [{schema: "public", table: "test", src_column: "test-src", dst_column: "test-dst"}]))))))
expect(serv).to receive(:run_query).with("SELECT (SELECT COUNT(*) FROM \"public\".\"test\" WHERE \"test-src\" IS NOT NULL AND \"test-src\" != '' AND \"test-src\" != 'Error: Summary failed (llm)' AND \"test-dst\" IS NULL) > 2000", db: "postgres", user: "postgres").and_return("t")
where: instance_double(Sequel::Dataset,
all: [{schema: "public", table: "test", src_column: "test-src", dst_column: "test-dst"}])))))))
expect(serv).to receive(:run_query).and_return("t")
expect(serv).to receive(:run_query).and_return("public,test2,test-src,test-dst\npublic,test3,test-src,test-dst")
expect(serv).to receive(:run_query).and_return("f")
expect(serv).to receive(:run_query).and_return("f")
expect(serv).to receive(:run_query).and_return("t")
expect(lantern_doctor_query.check_daemon_embedding_jobs("postgres", "postgres")).to eq("t")
end

Expand All @@ -408,7 +413,9 @@
where: instance_double(Sequel::Dataset,
where: instance_double(Sequel::Dataset,
where: instance_double(Sequel::Dataset,
all: []))))))
where: instance_double(Sequel::Dataset,
all: [])))))))
expect(serv).to receive(:run_query).and_return("f")
expect(lantern_doctor_query.check_daemon_embedding_jobs("postgres", "postgres")).to eq("f")
end
end
Expand All @@ -425,8 +432,10 @@
where: instance_double(Sequel::Dataset,
where: instance_double(Sequel::Dataset,
where: instance_double(Sequel::Dataset,
all: [{schema: "public", table: "test", src_column: "test-src", dst_column: "test-dst"}]))))))
expect(serv).to receive(:run_query).with("SELECT (SELECT COUNT(*) FROM \"public\".\"test\" WHERE \"test-src\" IS NOT NULL AND \"test-src\" != '' AND \"test-src\" != 'Error: Summary failed (llm)' AND \"test-dst\" IS NULL) > 2000", db: "postgres", user: "postgres").and_return("f")
where: instance_double(Sequel::Dataset,
all: [{schema: "public", table: "test", src_column: "test-src", dst_column: "test-dst"}])))))))
expect(serv).to receive(:run_query).and_return("f")
expect(serv).to receive(:run_query).and_return("f")
expect(lantern_doctor_query.check_daemon_embedding_jobs("postgres", "postgres")).to eq("f")
end

Expand Down

0 comments on commit 9f3b7e3

Please sign in to comment.