Skip to content

Commit

Permalink
Add necessary gcp api calls for setting up bigquery table
Browse files Browse the repository at this point in the history
  • Loading branch information
var77 committed May 21, 2024
1 parent 5e57992 commit 9154a97
Show file tree
Hide file tree
Showing 18 changed files with 269 additions and 85 deletions.
81 changes: 79 additions & 2 deletions lib/hosting/gcp_apis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,89 @@ def allow_bucket_usage_by_prefix(service_account_email, bucket_name, prefix)
Hosting::GcpApis.check_errors(response)
end

# Creates a table in a BigQuery dataset of the configured project.
#
# @param dataset [String] id of the dataset that will contain the table
# @param table [String] id of the table to create
# @param schema [Array<Hash>] column definitions, sent as `schema.fields`
# @return whatever Hosting::GcpApis.check_errors returns for the API response
def create_big_query_table(dataset, table, schema)
  table_reference = {projectId: @project, datasetId: dataset, tableId: table}
  payload = JSON.dump({tableReference: table_reference, schema: {fields: schema}})

  bigquery = Excon.new("https://bigquery.googleapis.com", headers: @host[:headers])
  api_response = bigquery.post(
    path: "/bigquery/v2/projects/#{@project}/datasets/#{dataset}/tables",
    body: payload,
    headers: {"Content-Type" => "application/json"},
    # 400/403 are accepted here so that check_errors can inspect them.
    expects: [200, 400, 403]
  )

  Hosting::GcpApis.check_errors(api_response)
end

# Grants a service account the BigQuery metadata-viewer role on a dataset.
#
# Reads the dataset's current access list, adds an entry for the service
# account when an identical one is not already present, and writes the list
# back with a PATCH. Calling it repeatedly therefore does not accumulate
# duplicate entries.
#
# @param service_account_email [String] e-mail of the service account to grant
# @param dataset [String] id of the dataset inside the configured project
# @return whatever Hosting::GcpApis.check_errors returns for the PATCH response
def allow_access_to_big_query_dataset(service_account_email, dataset)
  role = "roles/bigquery.metadataViewer"
  dataset_path = "/bigquery/v2/projects/#{@project}/datasets/#{dataset}"
  connection = Excon.new("https://bigquery.googleapis.com", headers: @host[:headers])

  response = connection.get(path: dataset_path, expects: [200, 400, 403])
  Hosting::GcpApis.check_errors(response)

  # A response without an "access" key is treated as an empty list instead of
  # failing with NoMethodError on nil.
  access = JSON.parse(response.body)["access"] || []
  already_granted = access.any? do |entry|
    entry["role"] == role && entry["userByEmail"] == service_account_email
  end
  access << {"role" => role, "userByEmail" => service_account_email} unless already_granted

  response = connection.patch(
    path: dataset_path,
    body: JSON.dump({access: access}),
    # Same explicit content type that create_big_query_table sends with its JSON body.
    headers: {"Content-Type" => "application/json"},
    expects: [200, 400, 403]
  )
  Hosting::GcpApis.check_errors(response)
end

# Grants a service account the BigQuery data-editor role on a single table.
#
# Fetches the table's IAM policy, makes sure the service account is a member
# of an unconditional binding for the role (reusing an existing binding when
# there is one, so repeated calls do not pile up duplicate bindings), and
# writes the whole policy back, including any other keys it was returned with.
#
# @param service_account_email [String] e-mail of the service account to grant
# @param dataset [String] id of the dataset that contains the table
# @param table [String] id of the table
# @return whatever Hosting::GcpApis.check_errors returns for the setIamPolicy response
def allow_access_to_big_query_table(service_account_email, dataset, table)
  role = "roles/bigquery.dataEditor"
  member = "serviceAccount:#{service_account_email}"
  table_path = "/bigquery/v2/projects/#{@project}/datasets/#{dataset}/tables/#{table}"
  # Same explicit content type that create_big_query_table sends with its JSON body.
  json_headers = {"Content-Type" => "application/json"}
  connection = Excon.new("https://bigquery.googleapis.com", headers: @host[:headers])

  response = connection.post(
    path: "#{table_path}:getIamPolicy",
    body: JSON.dump({}),
    headers: json_headers,
    expects: [200, 400, 403]
  )
  Hosting::GcpApis.check_errors(response)

  policy = JSON.parse(response.body)
  bindings = (policy["bindings"] ||= [])

  # Conditional bindings are left alone: adding a member to one would grant
  # the role only under that binding's condition.
  existing = bindings.find { |entry| entry["role"] == role && entry["condition"].nil? }
  if existing
    existing["members"] = Array(existing["members"]) | [member]
  else
    bindings << {"role" => role, "members" => [member]}
  end

  response = connection.post(
    path: "#{table_path}:setIamPolicy",
    body: JSON.dump({policy: policy}),
    headers: json_headers,
    expects: [200, 400, 403]
  )
  Hosting::GcpApis.check_errors(response)
end

# Deletes a table from a BigQuery dataset of the configured project.
#
# Both 204 and 404 are listed as expected statuses for the DELETE request.
#
# @param dataset [String] id of the dataset that contains the table
# @param table [String] id of the table to delete
# @return the response object returned by the DELETE request
def remove_big_query_table(dataset, table)
  table_path = "/bigquery/v2/projects/#{@project}/datasets/#{dataset}/tables/#{table}"

  Excon
    .new("https://bigquery.googleapis.com", headers: @host[:headers])
    .delete(path: table_path, expects: [204, 404])
end

def create_image(name:, vm_name:, zone:, description: "", family: "lantern-ubuntu")
Expand Down
32 changes: 31 additions & 1 deletion model/lantern/lantern_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,38 @@ def allow_timeline_access_to_bucket
api.allow_bucket_usage_by_prefix(service_account_name, Config.lantern_backup_bucket, timeline.ubid)
end

def allow_access_to_big_query
# Creates this resource's BigQuery logging table and grants the resource's
# service account access to it.
#
# The table is created in Config.lantern_log_dataset under the name returned
# by big_query_table; every column is declared NULLABLE.
def create_logging_table
  # Column name / BigQuery type pairs, in table order.
  schema = %w[
    log_time TIMESTAMP
    user_name STRING
    database_name STRING
    process_id INTEGER
    connection_from STRING
    session_id STRING
    session_line_num INTEGER
    command_tag STRING
    session_start_time TIMESTAMP
    virtual_transaction_id STRING
    transaction_id INTEGER
    error_severity STRING
    sql_state_code STRING
    duration FLOAT
    message STRING
    detail STRING
    hint STRING
    internal_query STRING
    internal_query_pos INTEGER
    context STRING
    query STRING
    query_pos INTEGER
    location STRING
    application_name STRING
  ].each_slice(2).map { |column, type| {name: column, type: type, mode: "NULLABLE"} }

  gcp = Hosting::GcpApis.new
  gcp.create_big_query_table(Config.lantern_log_dataset, big_query_table, schema)
  # Dataset-level grant (metadata viewer) ...
  gcp.allow_access_to_big_query_dataset(service_account_name, Config.lantern_log_dataset)
  # ... and table-level grant restricted to this resource's own table.
  gcp.allow_access_to_big_query_table(service_account_name, Config.lantern_log_dataset, big_query_table)
end

Expand Down
18 changes: 7 additions & 11 deletions prog/lantern/lantern_resource_nexus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ def self.assemble(project_id:, location:, name:, target_vm_size:, target_storage
) { _1.id = ubid.to_uuid }
lantern_resource.associate_with_project(project)

lantern_resource.setup_service_account
lantern_resource.allow_access_to_big_query

Prog::Lantern::LanternServerNexus.assemble(
resource_id: lantern_resource.id,
lantern_version: lantern_version,
Expand All @@ -101,10 +98,6 @@ def self.assemble(project_id:, location:, name:, target_vm_size:, target_storage
representative_at: Time.now
)

if parent_id.nil?
lantern_resource.allow_timeline_access_to_bucket
end

lantern_resource.required_standby_count.times do
Prog::Lantern::LanternServerNexus.assemble(resource_id: lantern_resource.id, timeline_id: timeline_id, timeline_access: "fetch")
end
Expand All @@ -122,11 +115,14 @@ def before_run
end

label def start
nap 5 unless representative_server.vm.strand.label == "wait"
register_deadline(:wait, 10 * 60)
# bud self.class, frame, :trigger_pg_current_xact_id_on_parent if lantern_resource.parent
lantern_resource.setup_service_account
lantern_resource.create_logging_table

if lantern_resource.parent_id.nil?
lantern_resource.allow_timeline_access_to_bucket
end

# hop_wait_trigger_pg_current_xact_id_on_parent
register_deadline(:wait, 10 * 60)
hop_wait_servers
end

Expand Down
2 changes: 1 addition & 1 deletion prog/lantern/lantern_server_nexus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def before_run
end

label def start
nap 5 unless vm.strand.label == "wait" && lantern_server.timeline.strand.label != "start"
nap 5 unless vm.strand.label == "wait" && lantern_server.resource.strand.label != "start"

lantern_server.incr_initial_provisioning

Expand Down
88 changes: 88 additions & 0 deletions spec/lib/hosting/gcp_apis_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -366,5 +366,93 @@
expect(api.get_image("test-image")).to be_nil
end
end

# Covers the table-level grant: the policy returned by the stubbed getIamPolicy
# call must be sent to setIamPolicy with a dataEditor binding for the service
# account added and the etag carried over.
describe "#allow_access_to_big_query_table" do
it "allows access" do
# Token endpoint is stubbed as well — presumably hit while building the API client; confirm against GcpApis#initialize.
stub_request(:post, "https://oauth2.googleapis.com/token").to_return(status: 200, body: JSON.dump({}), headers: {"Content-Type" => "application/json"})
stub_request(:post, "https://bigquery.googleapis.com/bigquery/v2/projects/test-project/datasets/test-dataset/tables/test-table:getIamPolicy")
.to_return(status: 200, body: JSON.dump({"bindings" => [], "etag" => "etag-value"}), headers: {})

# The setIamPolicy stub is registered only for this exact request body.
stub_request(:post, "https://bigquery.googleapis.com/bigquery/v2/projects/test-project/datasets/test-dataset/tables/test-table:setIamPolicy")
.with(body: JSON.dump({
policy: {
bindings: [
{
role: "roles/bigquery.dataEditor",
members: ["serviceAccount:test-sa-email"]
}
],
etag: "etag-value"
}
}))
.to_return(status: 200, body: JSON.dump({}), headers: {})

api = described_class.new
expect { api.allow_access_to_big_query_table("test-sa-email", "test-dataset", "test-table") }.not_to raise_error
end
end

# Covers table deletion: a DELETE on the table URL answered with 204 must not raise.
describe "#remove_big_query_table" do
it "removes the table" do
stub_request(:post, "https://oauth2.googleapis.com/token").to_return(status: 200, body: JSON.dump({}), headers: {"Content-Type" => "application/json"})
stub_request(:delete, "https://bigquery.googleapis.com/bigquery/v2/projects/test-project/datasets/test-dataset/tables/test-table")
.to_return(status: 204, body: "", headers: {})

api = described_class.new
expect { api.remove_big_query_table("test-dataset", "test-table") }.not_to raise_error
end
end

# Covers table creation: the POST to the dataset's tables collection must carry
# the table reference and the given schema under schema.fields.
describe "#create_big_query_table" do
it "creates a table successfully" do
stub_request(:post, "https://oauth2.googleapis.com/token").to_return(status: 200, body: JSON.dump({}), headers: {"Content-Type" => "application/json"})
schema = [{name: "log_time", type: "TIMESTAMP", mode: "NULLABLE"}]
# The stub is registered only for this exact request body.
stub_request(:post, "https://bigquery.googleapis.com/bigquery/v2/projects/test-project/datasets/test-dataset/tables")
.with(
body: JSON.dump({
tableReference: {
projectId: "test-project",
datasetId: "test-dataset",
tableId: "test-table"
},
schema: {
fields: schema
}
})
)
.to_return(status: 200, body: JSON.dump({}), headers: {})

api = described_class.new
expect { api.create_big_query_table("test-dataset", "test-table", schema) }.not_to raise_error
end
end

# Covers the dataset-level grant: the access list returned by the stubbed GET
# must be PATCHed back with a metadataViewer entry for the service account
# appended after the existing entries.
describe "#allow_access_to_big_query_dataset" do
  it "adds the metadata viewer role to the dataset access list" do
    project_id = "test-project"
    dataset_id = "test-dataset"
    service_account_email = "test-sa-email"
    stub_request(:post, "https://oauth2.googleapis.com/token").to_return(status: 200, body: JSON.dump({}), headers: {"Content-Type" => "application/json"})
    stub_request(:get, "https://bigquery.googleapis.com/bigquery/v2/projects/#{project_id}/datasets/#{dataset_id}")
      .to_return(status: 200, body: JSON.dump({"access" => [{role: "test"}], "etag" => "etag-value"}), headers: {"Content-Type" => "application/json"})

    # The PATCH stub is registered only for this exact request body.
    stub_request(:patch, "https://bigquery.googleapis.com/bigquery/v2/projects/#{project_id}/datasets/#{dataset_id}")
      .with(
        body: JSON.dump({
          access: [
            {role: "test"},
            {
              role: "roles/bigquery.metadataViewer",
              userByEmail: service_account_email
            }
          ]
        })
      )
      .to_return(status: 200, body: JSON.dump({}), headers: {"Content-Type" => "application/json"})

    api = described_class.new
    expect { api.allow_access_to_big_query_dataset(service_account_email, dataset_id) }.not_to raise_error
  end
end
end
end
5 changes: 0 additions & 5 deletions spec/lib/pagination_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@

RSpec.describe Pagination do
before do
api = instance_double(Hosting::GcpApis)
allow(Hosting::GcpApis).to receive(:new).and_return(api)
allow(api).to receive_messages(create_service_account: {"email" => "test-sa"}, export_service_account_key: "test-key")
allow(api).to receive(:allow_bucket_usage_by_prefix)
allow(api).to receive(:allow_access_to_big_query_table)
allow(LanternServer).to receive(:get_vm_image).and_return(Config.gcp_default_image)
end

Expand Down
47 changes: 47 additions & 0 deletions spec/model/lantern/lantern_resource_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,51 @@
expect { lantern_resource.dissociate_forks }.not_to raise_error
end
end

# Covers service-account setup: the e-mail and exported key returned by the
# stubbed GCP API must be written to the resource through #update.
describe "#setup_service_account" do
it "sets up service account and updates resource" do
api = instance_double(Hosting::GcpApis)
allow(Hosting::GcpApis).to receive(:new).and_return(api)
allow(api).to receive_messages(create_service_account: {"email" => "test-sa"}, export_service_account_key: "test-key")
expect(lantern_resource).to receive(:update).with(gcp_creds_b64: "test-key", service_account_name: "test-sa")
expect { lantern_resource.setup_service_account }.not_to raise_error
end
end

# Covers logging-table setup: the table must be created in the log dataset and
# both the dataset-level and the table-level grants must be issued for the
# resource's service account.
describe "#create_logging_table" do
  it "creates bigquery table and gives access" do
    api = instance_double(Hosting::GcpApis)
    allow(Hosting::GcpApis).to receive(:new).and_return(api)
    expect(lantern_resource).to receive(:big_query_table).and_return("test-table-name").at_least(:once)
    expect(lantern_resource).to receive(:service_account_name).and_return("test-sa").at_least(:once)

    # expect (not allow) so the example fails when any of the calls is dropped.
    expect(api).to receive(:create_big_query_table).with(Config.lantern_log_dataset, "test-table-name", an_instance_of(Array))
    expect(api).to receive(:allow_access_to_big_query_dataset).with("test-sa", Config.lantern_log_dataset)
    expect(api).to receive(:allow_access_to_big_query_table).with("test-sa", Config.lantern_log_dataset, "test-table-name")

    expect { lantern_resource.create_logging_table }.not_to raise_error
  end
end

# Covers bucket access: the resource's credentials must be copied onto its
# timeline, and the bucket grant is stubbed for the timeline's ubid as prefix.
describe "#allow_timeline_access_to_bucket" do
it "allows access to bucket by prefix" do
timeline = instance_double(LanternTimeline, ubid: "test")
expect(lantern_resource).to receive(:gcp_creds_b64).and_return("test-creds")
expect(lantern_resource).to receive(:service_account_name).and_return("test-sa")
expect(lantern_resource).to receive(:timeline).and_return(timeline).at_least(:once)
expect(timeline).to receive(:update).with(gcp_creds_b64: "test-creds")

api = instance_double(Hosting::GcpApis)
allow(Hosting::GcpApis).to receive(:new).and_return(api)
# NOTE(review): this is an allow, so the example does not fail if the grant call is never made.
allow(api).to receive(:allow_bucket_usage_by_prefix).with("test-sa", Config.lantern_backup_bucket, timeline.ubid)
expect { lantern_resource.allow_timeline_access_to_bucket }.not_to raise_error
end
end

# Covers table naming: the logging table name is the resource name suffixed with "_logs".
describe "#big_query_table" do
it "returns table name" do
expect(lantern_resource.big_query_table).to eq("#{lantern_resource.name}_logs")
end
end
end
27 changes: 13 additions & 14 deletions spec/prog/lantern/lantern_resource_nexus_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,6 @@
end

describe ".assemble" do
before do
api = instance_double(Hosting::GcpApis)
allow(Hosting::GcpApis).to receive(:new).and_return(api)
allow(api).to receive_messages(create_service_account: {"email" => "test-sa"}, export_service_account_key: "test-key")
allow(api).to receive(:allow_bucket_usage_by_prefix)
allow(api).to receive(:allow_access_to_big_query_table)
end

let(:lantern_project) { Project.create_with_id(name: "default", provider: "gcp").tap { _1.associate_with_project(_1) } }

it "validates input" do
Expand Down Expand Up @@ -116,14 +108,21 @@
end

describe "#start" do
it "naps if vm not ready" do
expect(lantern_resource.representative_server.vm).to receive(:strand).and_return(instance_double(Strand, label: "prep"))
expect { nx.start }.to nap(5)
it "sets up gcp service account and allows bucket usage" do
expect(lantern_resource).to receive(:setup_service_account)
expect(lantern_resource).to receive(:create_logging_table)
expect(lantern_resource).to receive(:parent_id).and_return("test-parent")
expect(lantern_resource).not_to receive(:allow_timeline_access_to_bucket)
expect(nx).to receive(:register_deadline)
expect { nx.start }.to hop("wait_servers")
end

it "registers deadline and hops" do
expect(lantern_resource.representative_server.vm).to receive(:strand).and_return(instance_double(Strand, label: "wait"))
# expect(nx).to receive(:register_deadline)
it "sets up gcp service account" do
expect(lantern_resource).to receive(:setup_service_account)
expect(lantern_resource).to receive(:create_logging_table)
expect(lantern_resource).to receive(:parent_id).and_return(nil)
expect(lantern_resource).to receive(:allow_timeline_access_to_bucket)
expect(nx).to receive(:register_deadline)
expect { nx.start }.to hop("wait_servers")
end

Expand Down
Loading

0 comments on commit 9154a97

Please sign in to comment.