From 15dd6f60608b9cc791b387f92a75b3f601220204 Mon Sep 17 00:00:00 2001 From: Shivam Varshney Date: Fri, 29 Sep 2023 19:47:07 +0530 Subject: [PATCH] Dataset project Id Feature implemented --- pom.xml | 2 +- .../cdap/delta/bigquery/BigQueryTarget.java | 86 ++++++++++++++++--- .../delta/bigquery/BigQueryTargetTest.java | 3 +- widgets/bigquery-cdcTarget.json | 8 ++ 4 files changed, 84 insertions(+), 15 deletions(-) diff --git a/pom.xml b/pom.xml index d702e44..0fb0f26 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ io.cdap.delta bigquery-delta-plugins - 0.9.0-SNAPSHOT + 0.9.1-SNAPSHOT BigQuery Delta plugins jar BigQuery Delta plugins diff --git a/src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java b/src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java index 7723884..5be74e2 100644 --- a/src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java +++ b/src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java @@ -17,7 +17,9 @@ package io.cdap.delta.bigquery; import com.google.auth.Credentials; +import com.google.auth.oauth2.ExternalAccountCredentials; import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.ServiceOptions; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryException; @@ -30,6 +32,7 @@ import com.google.cloud.storage.StorageOptions; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; +import com.google.common.base.Strings; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; @@ -54,8 +57,10 @@ import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import javax.annotation.Nullable; @@ -80,6 +85,9 @@ public class BigQueryTarget implements DeltaTarget { private static final int RETRY_COUNT = 25; private final int retryCount; private final Conf conf; + public static final List BIGQUERY_SCOPES = Arrays.asList("https://www.googleapis.com/auth/drive", + "https://www.googleapis.com/auth/bigquery"); + @SuppressWarnings("unused") public BigQueryTarget(Conf conf) { @@ -98,19 +106,23 @@ public void configure(Configurer configurer) { @Override public void initialize(DeltaTargetContext context) throws Exception { + Credentials credentials = conf.getCredentials(); - String project = conf.getProject(); + + if (conf.getProject() == null) { + return; + } + String project = conf.getDatasetProject(); + if (project == null) { + return; + } String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ? context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName(); EncryptionConfiguration encryptionConfig = cmekKey == null ? null : EncryptionConfiguration.newBuilder().setKmsKeyName(cmekKey).build(); - BigQuery bigQuery = BigQueryOptions.newBuilder() - .setCredentials(credentials) - .setProjectId(project) - .build() - .getService(); + BigQuery bigQuery = getBigQuery(project, credentials); RetryPolicy retryPolicy = createBaseRetryPolicy() .handleIf(ex -> { @@ -147,22 +159,23 @@ public void initialize(DeltaTargetContext context) throws Exception { @Override public EventConsumer createConsumer(DeltaTargetContext context) throws IOException { Credentials credentials = conf.getCredentials(); - String project = conf.getProject(); + if (conf.getProject() == null) { + throw new RuntimeException("Project id is not Present"); + } + String project = conf.getDatasetProject(); + if (project == null) { + throw new RuntimeException("Project Dataset id is not Present"); + } String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ? context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName(); EncryptionConfiguration encryptionConfig = cmekKey == null ? null : EncryptionConfiguration.newBuilder().setKmsKeyName(cmekKey).build(); - BigQuery bigQuery = BigQueryOptions.newBuilder() - .setCredentials(credentials) - .setProjectId(project) - .build() - .getService(); + BigQuery bigQuery = getBigQuery(project, credentials); Storage storage = StorageOptions.newBuilder() .setCredentials(credentials) - .setProjectId(project) .build() .getService(); @@ -240,17 +253,47 @@ private RetryPolicy createBaseRetryPolicy() { .withJitter(0.1); } + public static BigQuery getBigQuery(String project, @Nullable Credentials credentials) { + BigQueryOptions.Builder bigqueryBuilder = BigQueryOptions.newBuilder().setProjectId(project); + if (credentials != null) { + Set scopes = new HashSet<>(BIGQUERY_SCOPES); + + if (credentials instanceof ServiceAccountCredentials) { + scopes.addAll(((ServiceAccountCredentials) credentials).getScopes()); + } else if (credentials instanceof ExternalAccountCredentials) { + Collection currentScopes = ((ExternalAccountCredentials) credentials).getScopes(); + if (currentScopes != null) { + scopes.addAll(currentScopes); + } + } + + if (credentials instanceof GoogleCredentials) { + credentials = ((GoogleCredentials) credentials).createScoped(scopes); + } + bigqueryBuilder.setCredentials(credentials); + } + return bigqueryBuilder.build().getService(); + } + /** * Config for BigQuery target. */ @SuppressWarnings("unused") public static class Conf extends PluginConfig { + public static final String AUTO_DETECT = "auto-detect"; @Nullable @Description("Project of the BigQuery dataset. When running on a Google Cloud VM, this can be set to " + "'auto-detect', which will use the project of the VM.") private String project; + @Macro + @Nullable + @Description("The project the dataset belongs to. This is only required if the dataset is not " + + "in the same project that the BigQuery job will run in. If no value is given, it will" + + " default to the configured project ID.") + private String datasetProject; + @Macro @Nullable @Description("Service account key to use when interacting with GCS and BigQuery. The service account " @@ -352,5 +395,22 @@ private Credentials getCredentials() throws IOException { .createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform")); } } + public String getDatasetProject() { + // if it's "auto-detect" that means we need to detect the default project settings + // for sandbox you can use `gcloud config set project my-project-id" to set it + // or you start the sandbox by specify the java argument `-DGOOGLE_CLOUD_PROJECT=my-project-id` + // or you start the sandbox by specify the java argument `-DGCLOUD_PROJECT=my-project-id` + // otherwise we will throw IllegalArgument exception here same as project + if (AUTO_DETECT.equalsIgnoreCase(datasetProject)) { + String defaultProject = ServiceOptions.getDefaultProjectId(); + if (defaultProject == null) { + throw new IllegalArgumentException( + "Could not detect Google Cloud project id from the environment. Please specify a dataset project id."); + } + return defaultProject; + } + // if it's null or empty that means it should be same as project + return Strings.isNullOrEmpty(datasetProject) ? getProject() : datasetProject; + } } } diff --git a/src/test/java/io/cdap/delta/bigquery/BigQueryTargetTest.java b/src/test/java/io/cdap/delta/bigquery/BigQueryTargetTest.java index df8a3e9..2b51f79 100644 --- a/src/test/java/io/cdap/delta/bigquery/BigQueryTargetTest.java +++ b/src/test/java/io/cdap/delta/bigquery/BigQueryTargetTest.java @@ -125,6 +125,7 @@ public void setUp() throws Exception { PowerMockito.when(conf, "getProject").thenReturn(PROJECT); PowerMockito.when(conf, "getCredentials").thenReturn(credentials); + PowerMockito.when(conf, "getDatasetProject").thenReturn(PROJECT); Mockito.when(bucketInfoBuilder.build()).thenReturn(bucketInfo); @@ -267,7 +268,7 @@ public void testCreateConsumerRetryableFailureForGcsGet() throws Exception { } @Test - public void testCreateConsumerNonRetryableFailure() throws Exception { + public void testCreateConsumerNonRetryableFailure() throws Exception { Throwable exception = new StorageException(501, null); Mockito.when(storage.get(Mockito.anyString())).thenThrow(exception); try { diff --git a/widgets/bigquery-cdcTarget.json b/widgets/bigquery-cdcTarget.json index 4e18141..3b81187 100644 --- a/widgets/bigquery-cdcTarget.json +++ b/widgets/bigquery-cdcTarget.json @@ -14,6 +14,14 @@ "widget-attributes": { "default": "auto-detect" } + }, + { + "name": "datasetProject", + "label": "DataSet Project ID", + "widget-type": "textbox", + "widget-attributes": { + "placeholder": "Project the dataset belongs to, if different from the project ID." + } } ] },