Dataset Project ID feature implemented #242

Merged

Changes from 1 commit
13 changes: 11 additions & 2 deletions docs/bigquery-cdcTarget.md
@@ -37,8 +37,17 @@ on the new primary key.
Properties
----------

**Project ID**: Project of the BigQuery dataset. When running on a Dataproc cluster, this can be left blank,
which will use the project of the cluster.
**Project ID**: Google Cloud Project ID, which uniquely identifies a project. It can be found on the
Dashboard in the Google Cloud Platform Console. This is the project that the BigQuery job will run in.
The `BigQuery Job User` role on this project must be granted to the specified service account in order to
run the job. If a temporary bucket needs to be created, it will also be created in this project, and the
`GCE Storage Bucket Admin` role on this project must be granted to the specified service account in order to create buckets.

**Dataset Project ID**: Project the dataset belongs to. This is only required if the dataset is not
in the same project that the BigQuery job will run in. If no value is given, it will default to the
configured Project ID. The `BigQuery Data Editor` role on this project must be granted to the specified
service account in order to write BigQuery data to this project.

Contributor: Please add "destination dataset" to be more clear.
Contributor Author: Updated.


**Location**: The location where the BigQuery dataset and GCS staging bucket will get created. For example, 'us-east1'
for regional bucket, 'us' for multi-regional bucket. A complete list of available locations can be found at
74 changes: 61 additions & 13 deletions src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java
@@ -17,7 +17,9 @@
package io.cdap.delta.bigquery;

import com.google.auth.Credentials;
import com.google.auth.oauth2.ExternalAccountCredentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
@@ -30,6 +32,7 @@
import com.google.cloud.storage.StorageOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
@@ -54,8 +57,10 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;

@@ -80,6 +85,9 @@ public class BigQueryTarget implements DeltaTarget {
private static final int RETRY_COUNT = 25;
private final int retryCount;
private final Conf conf;
public static final List<String> BIGQUERY_SCOPES = Arrays.asList("https://www.googleapis.com/auth/drive",
"https://www.googleapis.com/auth/bigquery");


@SuppressWarnings("unused")
public BigQueryTarget(Conf conf) {
@@ -98,19 +106,18 @@ public void configure(Configurer configurer) {

@Override
public void initialize(DeltaTargetContext context) throws Exception {

Credentials credentials = conf.getCredentials();
String project = conf.getProject();

String project = conf.getDatasetProject();

String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ?
context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName();

EncryptionConfiguration encryptionConfig = cmekKey == null ? null :
EncryptionConfiguration.newBuilder().setKmsKeyName(cmekKey).build();

BigQuery bigQuery = BigQueryOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(project)
.build()
.getService();
BigQuery bigQuery = getBigQuery(project, credentials);

RetryPolicy<Object> retryPolicy = createBaseRetryPolicy()
.handleIf(ex -> {
@@ -147,22 +154,18 @@ public void initialize(DeltaTargetContext context) throws Exception {
@Override
public EventConsumer createConsumer(DeltaTargetContext context) throws IOException {
Credentials credentials = conf.getCredentials();
String project = conf.getProject();
String project = conf.getDatasetProject();

String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ?
context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName();

EncryptionConfiguration encryptionConfig = cmekKey == null ? null :
EncryptionConfiguration.newBuilder().setKmsKeyName(cmekKey).build();

BigQuery bigQuery = BigQueryOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(project)
.build()
.getService();
BigQuery bigQuery = getBigQuery(project, credentials);

Storage storage = StorageOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(project)
.build()
.getService();

@@ -240,17 +243,47 @@ private <T> RetryPolicy<T> createBaseRetryPolicy() {
.withJitter(0.1);
}

public static BigQuery getBigQuery(String project, @Nullable Credentials credentials) {
BigQueryOptions.Builder bigqueryBuilder = BigQueryOptions.newBuilder().setProjectId(project);
if (credentials != null) {
Set<String> scopes = new HashSet<>(BIGQUERY_SCOPES);

if (credentials instanceof ServiceAccountCredentials) {
scopes.addAll(((ServiceAccountCredentials) credentials).getScopes());
} else if (credentials instanceof ExternalAccountCredentials) {
Collection<String> currentScopes = ((ExternalAccountCredentials) credentials).getScopes();
if (currentScopes != null) {
scopes.addAll(currentScopes);
}
}

if (credentials instanceof GoogleCredentials) {
credentials = ((GoogleCredentials) credentials).createScoped(scopes);
}
bigqueryBuilder.setCredentials(credentials);
}
return bigqueryBuilder.build().getService();
}
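
(For context, a minimal usage sketch of the new helper; this is not part of the diff, and the key file
path and project/dataset names below are hypothetical.)

```java
import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.DatasetId;
import io.cdap.delta.bigquery.BigQueryTarget;

import java.io.FileInputStream;
import java.io.IOException;

public class BigQueryClientSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical service account key; getBigQuery also accepts null to fall back to default credentials.
    Credentials credentials;
    try (FileInputStream in = new FileInputStream("/path/to/key.json")) {
      credentials = ServiceAccountCredentials.fromStream(in);
    }

    // Build the client against the project that owns the destination dataset.
    // getBigQuery merges the BigQuery and Drive scopes into the supplied credentials.
    BigQuery bigQuery = BigQueryTarget.getBigQuery("my-dataset-project", credentials);

    // For example, verify that the destination dataset exists in that project.
    boolean exists = bigQuery.getDataset(DatasetId.of("my-dataset-project", "my_dataset")) != null;
    System.out.println("dataset exists: " + exists);
  }
}
```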

/**
* Config for BigQuery target.
*/
@SuppressWarnings("unused")
public static class Conf extends PluginConfig {

public static final String AUTO_DETECT = "auto-detect";
@Nullable
@Description("Project of the BigQuery dataset. When running on a Google Cloud VM, this can be set to "
+ "'auto-detect', which will use the project of the VM.")
private String project;

@Macro
@Nullable
@Description("The project the dataset belongs to. This is only required if the dataset is not " +
"in the same project that the BigQuery job will run in. If no value is given, it will" +
" default to the configured project ID.")
private String datasetProject;

@Macro
@Nullable
@Description("Service account key to use when interacting with GCS and BigQuery. The service account "
@@ -352,5 +385,20 @@ private Credentials getCredentials() throws IOException {
.createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform"));
}
}
public String getDatasetProject() {
// If set to 'auto-detect', the default project ID is detected from the environment;
// if no default project can be detected, an IllegalArgumentException is thrown.
// If the user provides an explicit ID, that value is used as-is.
if (AUTO_DETECT.equalsIgnoreCase(datasetProject)) {
String defaultProject = ServiceOptions.getDefaultProjectId();
if (defaultProject == null) {
throw new IllegalArgumentException(
"Could not detect Google Cloud project id from the environment. Please specify a dataset project id.");
}
return defaultProject;
}
// If null or empty, the dataset project is the same as the configured project ID.
return Strings.isNullOrEmpty(datasetProject) ? getProject() : datasetProject;
}
}
}
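
(A second illustrative sketch, also not part of the diff, showing the dataset-project fallback that
getDatasetProject() introduces; the project names are hypothetical and the 'auto-detect' case is omitted.)

```java
import com.google.common.base.Strings;

public class DatasetProjectFallbackSketch {
  // Mirrors the fallback in Conf.getDatasetProject(): a blank Dataset Project ID resolves to the
  // configured Project ID, while an explicit value is used as-is.
  static String resolve(String datasetProject, String projectId) {
    return Strings.isNullOrEmpty(datasetProject) ? projectId : datasetProject;
  }

  public static void main(String[] args) {
    System.out.println(resolve(null, "job-project"));           // job-project
    System.out.println(resolve("", "job-project"));             // job-project
    System.out.println(resolve("data-project", "job-project")); // data-project
  }
}
```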
@@ -125,6 +125,7 @@ public void setUp() throws Exception {

PowerMockito.when(conf, "getProject").thenReturn(PROJECT);
PowerMockito.when(conf, "getCredentials").thenReturn(credentials);
PowerMockito.when(conf, "getDatasetProject").thenReturn(PROJECT);

Mockito.when(bucketInfoBuilder.build()).thenReturn(bucketInfo);

8 changes: 8 additions & 0 deletions widgets/bigquery-cdcTarget.json
@@ -14,6 +14,14 @@
"widget-attributes": {
"default": "auto-detect"
}
},
{
"name": "datasetProject",
"label": "DataSet Project ID",
"widget-type": "textbox",
"widget-attributes": {
"placeholder": "Project the dataset belongs to, if different from the project ID."
}
}
]
},

Contributor: Similar to the above: please add "destination dataset" to be more clear.
Contributor Author: Updated.