Skip to content

Commit

Permalink
Dataset project Id Feature implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
shivamVarCS committed Oct 6, 2023
1 parent 5e98483 commit 0fb8884
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 15 deletions.
13 changes: 11 additions & 2 deletions docs/bigquery-cdcTarget.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,17 @@ on the new primary key.
Properties
----------

**Project ID**: Project of the BigQuery dataset. When running on a Dataproc cluster, this can be left blank,
which will use the project of the cluster.
**Project ID**: Google Cloud Project ID, which uniquely identifies a project.
It can be found on the Dashboard in the Google Cloud Platform Console. This is the project
that the BigQuery job will run in. The `BigQuery Job User` role on this project must be granted to the specified service
account to run the job. If a temporary bucket needs to be created, the bucket will also be created in this project, and
the `GCE Storage Bucket Admin` role on this project must be granted to the specified service account to create buckets.

**Dataset Project ID**: Project the dataset belongs to. This is only required if the dataset is not
in the same project that the BigQuery job will run in. If no value is given, it will default to the
configured Project ID. The `BigQuery Data Editor` role on this project must be granted to the specified service account to
write BigQuery data to this project.


**Location**: The location where the BigQuery dataset and GCS staging bucket will get created. For example, 'us-east1'
for regional bucket, 'us' for multi-regional bucket. A complete list of available locations can be found at
Expand Down
74 changes: 61 additions & 13 deletions src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package io.cdap.delta.bigquery;

import com.google.auth.Credentials;
import com.google.auth.oauth2.ExternalAccountCredentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
Expand All @@ -30,6 +32,7 @@
import com.google.cloud.storage.StorageOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
Expand All @@ -54,8 +57,10 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;

Expand All @@ -80,6 +85,9 @@ public class BigQueryTarget implements DeltaTarget {
private static final int RETRY_COUNT = 25;
private final int retryCount;
private final Conf conf;
public static final List<String> BIGQUERY_SCOPES = Arrays.asList("https://www.googleapis.com/auth/drive",
"https://www.googleapis.com/auth/bigquery");


@SuppressWarnings("unused")
public BigQueryTarget(Conf conf) {
Expand All @@ -98,19 +106,18 @@ public void configure(Configurer configurer) {

@Override
public void initialize(DeltaTargetContext context) throws Exception {

Credentials credentials = conf.getCredentials();
String project = conf.getProject();

String project = conf.getDatasetProject();

String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ?
context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName();

EncryptionConfiguration encryptionConfig = cmekKey == null ? null :
EncryptionConfiguration.newBuilder().setKmsKeyName(cmekKey).build();

BigQuery bigQuery = BigQueryOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(project)
.build()
.getService();
BigQuery bigQuery = getBigQuery(project, credentials);

RetryPolicy<Object> retryPolicy = createBaseRetryPolicy()
.handleIf(ex -> {
Expand Down Expand Up @@ -147,22 +154,18 @@ public void initialize(DeltaTargetContext context) throws Exception {
@Override
public EventConsumer createConsumer(DeltaTargetContext context) throws IOException {
Credentials credentials = conf.getCredentials();
String project = conf.getProject();
String project = conf.getDatasetProject();

String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ?
context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName();

EncryptionConfiguration encryptionConfig = cmekKey == null ? null :
EncryptionConfiguration.newBuilder().setKmsKeyName(cmekKey).build();

BigQuery bigQuery = BigQueryOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(project)
.build()
.getService();
BigQuery bigQuery = getBigQuery(project, credentials);

Storage storage = StorageOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(project)
.build()
.getService();

Expand Down Expand Up @@ -240,17 +243,47 @@ private <T> RetryPolicy<T> createBaseRetryPolicy() {
.withJitter(0.1);
}

/**
 * Creates a {@link BigQuery} client bound to the given project.
 *
 * <p>When credentials are supplied, the scopes in {@link #BIGQUERY_SCOPES} are merged with any scopes
 * already carried by service-account or external-account credentials, and Google credentials are
 * re-scoped to that combined set before being attached to the client. Credentials that are not
 * {@link GoogleCredentials} are attached unchanged. When no credentials are supplied, none are set on
 * the builder.
 *
 * @param project the project id to set on the BigQuery client options
 * @param credentials the credentials to use, or {@code null} to leave credentials unset on the builder
 * @return a BigQuery service built from the resulting options
 */
public static BigQuery getBigQuery(String project, @Nullable Credentials credentials) {
  BigQueryOptions.Builder optionsBuilder = BigQueryOptions.newBuilder().setProjectId(project);
  if (credentials == null) {
    // Nothing to scope or attach; build with the project id only.
    return optionsBuilder.build().getService();
  }

  // Start from the scopes this target always requests, then keep whatever the caller already had.
  Set<String> mergedScopes = new HashSet<>(BIGQUERY_SCOPES);
  if (credentials instanceof ServiceAccountCredentials) {
    ServiceAccountCredentials serviceAccount = (ServiceAccountCredentials) credentials;
    mergedScopes.addAll(serviceAccount.getScopes());
  } else if (credentials instanceof ExternalAccountCredentials) {
    ExternalAccountCredentials externalAccount = (ExternalAccountCredentials) credentials;
    Collection<String> existingScopes = externalAccount.getScopes();
    // The external-account scope collection is null-checked before merging.
    if (existingScopes != null) {
      mergedScopes.addAll(existingScopes);
    }
  }

  // Only GoogleCredentials can be re-scoped; anything else is passed through untouched.
  Credentials effectiveCredentials = credentials instanceof GoogleCredentials
    ? ((GoogleCredentials) credentials).createScoped(mergedScopes)
    : credentials;
  optionsBuilder.setCredentials(effectiveCredentials);
  return optionsBuilder.build().getService();
}

/**
* Config for BigQuery target.
*/
@SuppressWarnings("unused")
public static class Conf extends PluginConfig {

public static final String AUTO_DETECT = "auto-detect";
@Nullable
@Description("Project of the BigQuery dataset. When running on a Google Cloud VM, this can be set to "
+ "'auto-detect', which will use the project of the VM.")
private String project;

@Macro
@Nullable
@Description("The project the dataset belongs to. This is only required if the dataset is not " +
"in the same project that the BigQuery job will run in. If no value is given, it will" +
" default to the configured project ID.")
private String datasetProject;

@Macro
@Nullable
@Description("Service account key to use when interacting with GCS and BigQuery. The service account "
Expand Down Expand Up @@ -352,5 +385,20 @@ private Credentials getCredentials() throws IOException {
.createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform"));
}
}
/**
 * Resolves the project that owns the BigQuery dataset.
 *
 * <ul>
 *   <li>If no dataset project is configured (null or empty), the value of {@code getProject()} is used.</li>
 *   <li>If it is set to {@link #AUTO_DETECT} (case-insensitive), the default project id reported by
 *       {@link ServiceOptions#getDefaultProjectId()} is used.</li>
 *   <li>Otherwise the configured value is returned as-is.</li>
 * </ul>
 *
 * @return the dataset project id
 * @throws IllegalArgumentException if auto-detection is requested but no default project id is available
 */
public String getDatasetProject() {
  // An unset value means the dataset lives in the same project as the one configured for the job.
  if (Strings.isNullOrEmpty(datasetProject)) {
    return getProject();
  }

  // Any explicit value other than the auto-detect keyword is used verbatim.
  if (!AUTO_DETECT.equalsIgnoreCase(datasetProject)) {
    return datasetProject;
  }

  String detectedProject = ServiceOptions.getDefaultProjectId();
  if (detectedProject != null) {
    return detectedProject;
  }
  throw new IllegalArgumentException(
    "Could not detect Google Cloud project id from the environment. Please specify a dataset project id.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public void setUp() throws Exception {

PowerMockito.when(conf, "getProject").thenReturn(PROJECT);
PowerMockito.when(conf, "getCredentials").thenReturn(credentials);
PowerMockito.when(conf, "getDatasetProject").thenReturn(PROJECT);

Mockito.when(bucketInfoBuilder.build()).thenReturn(bucketInfo);

Expand Down
8 changes: 8 additions & 0 deletions widgets/bigquery-cdcTarget.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
"widget-attributes": {
"default": "auto-detect"
}
},
{
"name": "datasetProject",
"label": "Dataset Project ID",
"widget-type": "textbox",
"widget-attributes": {
"placeholder": "Project the dataset belongs to, if different from the project ID."
}
}
]
},
Expand Down

0 comments on commit 0fb8884

Please sign in to comment.