Skip to content

Commit

Permalink
Dataset project Id Feature implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
shivamVarCS committed Sep 29, 2023
1 parent 5e98483 commit 15dd6f6
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

<groupId>io.cdap.delta</groupId>
<artifactId>bigquery-delta-plugins</artifactId>
<version>0.9.0-SNAPSHOT</version>
<version>0.9.1-SNAPSHOT</version>
<name>BigQuery Delta plugins</name>
<packaging>jar</packaging>
<description>BigQuery Delta plugins</description>
Expand Down
86 changes: 73 additions & 13 deletions src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package io.cdap.delta.bigquery;

import com.google.auth.Credentials;
import com.google.auth.oauth2.ExternalAccountCredentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
Expand All @@ -30,6 +32,7 @@
import com.google.cloud.storage.StorageOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
Expand All @@ -54,8 +57,10 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;

Expand All @@ -80,6 +85,9 @@ public class BigQueryTarget implements DeltaTarget {
private static final int RETRY_COUNT = 25;
private final int retryCount;
private final Conf conf;
public static final List<String> BIGQUERY_SCOPES = Arrays.asList("https://www.googleapis.com/auth/drive",
"https://www.googleapis.com/auth/bigquery");


@SuppressWarnings("unused")
public BigQueryTarget(Conf conf) {
Expand All @@ -98,19 +106,23 @@ public void configure(Configurer configurer) {

@Override
public void initialize(DeltaTargetContext context) throws Exception {

Credentials credentials = conf.getCredentials();
String project = conf.getProject();

if (conf.getProject() == null) {
return;
}
String project = conf.getDatasetProject();
if (project == null) {
return;
}
String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ?
context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName();

EncryptionConfiguration encryptionConfig = cmekKey == null ? null :
EncryptionConfiguration.newBuilder().setKmsKeyName(cmekKey).build();

BigQuery bigQuery = BigQueryOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(project)
.build()
.getService();
BigQuery bigQuery = getBigQuery(project, credentials);

RetryPolicy<Object> retryPolicy = createBaseRetryPolicy()
.handleIf(ex -> {
Expand Down Expand Up @@ -147,22 +159,23 @@ public void initialize(DeltaTargetContext context) throws Exception {
@Override
public EventConsumer createConsumer(DeltaTargetContext context) throws IOException {
Credentials credentials = conf.getCredentials();
String project = conf.getProject();
if (conf.getProject() == null) {
throw new RuntimeException("Project id is not Present");
}
String project = conf.getDatasetProject();
if (project == null) {
throw new RuntimeException("Project Dataset id is not Present");
}
String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ?
context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName();

EncryptionConfiguration encryptionConfig = cmekKey == null ? null :
EncryptionConfiguration.newBuilder().setKmsKeyName(cmekKey).build();

BigQuery bigQuery = BigQueryOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(project)
.build()
.getService();
BigQuery bigQuery = getBigQuery(project, credentials);

Storage storage = StorageOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(project)
.build()
.getService();

Expand Down Expand Up @@ -240,17 +253,47 @@ private <T> RetryPolicy<T> createBaseRetryPolicy() {
.withJitter(0.1);
}

public static BigQuery getBigQuery(String project, @Nullable Credentials credentials) {
BigQueryOptions.Builder bigqueryBuilder = BigQueryOptions.newBuilder().setProjectId(project);
if (credentials != null) {
Set<String> scopes = new HashSet<>(BIGQUERY_SCOPES);

if (credentials instanceof ServiceAccountCredentials) {
scopes.addAll(((ServiceAccountCredentials) credentials).getScopes());
} else if (credentials instanceof ExternalAccountCredentials) {
Collection<String> currentScopes = ((ExternalAccountCredentials) credentials).getScopes();
if (currentScopes != null) {
scopes.addAll(currentScopes);
}
}

if (credentials instanceof GoogleCredentials) {
credentials = ((GoogleCredentials) credentials).createScoped(scopes);
}
bigqueryBuilder.setCredentials(credentials);
}
return bigqueryBuilder.build().getService();
}

/**
* Config for BigQuery target.
*/
@SuppressWarnings("unused")
public static class Conf extends PluginConfig {

public static final String AUTO_DETECT = "auto-detect";
@Nullable
@Description("Project of the BigQuery dataset. When running on a Google Cloud VM, this can be set to "
+ "'auto-detect', which will use the project of the VM.")
private String project;

@Macro
@Nullable
@Description("The project the dataset belongs to. This is only required if the dataset is not " +
"in the same project that the BigQuery job will run in. If no value is given, it will" +
" default to the configured project ID.")
private String datasetProject;

@Macro
@Nullable
@Description("Service account key to use when interacting with GCS and BigQuery. The service account "
Expand Down Expand Up @@ -352,5 +395,22 @@ private Credentials getCredentials() throws IOException {
.createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform"));
}
}
public String getDatasetProject() {
// if it's "auto-detect" that means we need to detect the default project settings
// for sandbox you can use `gcloud config set project my-project-id" to set it
// or you start the sandbox by specify the java argument `-DGOOGLE_CLOUD_PROJECT=my-project-id`
// or you start the sandbox by specify the java argument `-DGCLOUD_PROJECT=my-project-id`
// otherwise we will throw IllegalArgument exception here same as project
if (AUTO_DETECT.equalsIgnoreCase(datasetProject)) {
String defaultProject = ServiceOptions.getDefaultProjectId();
if (defaultProject == null) {
throw new IllegalArgumentException(
"Could not detect Google Cloud project id from the environment. Please specify a dataset project id.");
}
return defaultProject;
}
// if it's null or empty that means it should be same as project
return Strings.isNullOrEmpty(datasetProject) ? getProject() : datasetProject;
}
}
}
3 changes: 2 additions & 1 deletion src/test/java/io/cdap/delta/bigquery/BigQueryTargetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public void setUp() throws Exception {

PowerMockito.when(conf, "getProject").thenReturn(PROJECT);
PowerMockito.when(conf, "getCredentials").thenReturn(credentials);
PowerMockito.when(conf, "getDatasetProject").thenReturn(PROJECT);

Mockito.when(bucketInfoBuilder.build()).thenReturn(bucketInfo);

Expand Down Expand Up @@ -267,7 +268,7 @@ public void testCreateConsumerRetryableFailureForGcsGet() throws Exception {
}

@Test
public void testCreateConsumerNonRetryableFailure() throws Exception {
public void testCreateConsumerNonRetryableFailure() throws Exception {
Throwable exception = new StorageException(501, null);
Mockito.when(storage.get(Mockito.anyString())).thenThrow(exception);
try {
Expand Down
8 changes: 8 additions & 0 deletions widgets/bigquery-cdcTarget.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
"widget-attributes": {
"default": "auto-detect"
}
},
{
"name": "datasetProject",
"label": "DataSet Project ID",
"widget-type": "textbox",
"widget-attributes": {
"placeholder": "Project the dataset belongs to, if different from the project ID."
}
}
]
},
Expand Down

0 comments on commit 15dd6f6

Please sign in to comment.