Skip to content

Commit

Permalink
Merge pull request #245 from data-integrations/cherrypick-dataset-pro…
Browse files Browse the repository at this point in the history
…ject-id-0.8

DataSet Project Id feature cherry Pick 0.8
  • Loading branch information
shivamVarCS authored Nov 10, 2023
2 parents bb492ff + daf4e83 commit d50a59e
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 24 deletions.
13 changes: 11 additions & 2 deletions docs/bigquery-cdcTarget.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,17 @@ on the new primary key.
Properties
----------

**Project ID**: Project of the BigQuery dataset. When running on a Dataproc cluster, this can be left blank,
which will use the project of the cluster.
**Project ID**: Google Cloud Project ID, which uniquely identifies a project.
It can be found on the Dashboard in the Google Cloud Platform Console. This is the project
that the BigQuery job will run in. `BigQuery Job User` role on this project must be granted to the specified service
account to run the job. If a temporary bucket needs to be created, the bucket will also be created in this project and
`GCE Storage Bucket Admin` role on this project must be granted to the specified service account to create buckets.

**Dataset Project ID**: Project the destination dataset belongs to. This is only required if the dataset is not
in the same project that the BigQuery job will run in. If no value is given, it will default to the
configured Project ID. `BigQuery Data Editor` role on this project must be granted to the specified service account to
write BigQuery data to this project.


**Location**: The location where the BigQuery dataset and GCS staging bucket will get created. For example, 'us-east1'
for regional bucket, 'us' for multi-regional bucket. A complete list of available locations can be found at
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

<groupId>io.cdap.delta</groupId>
<artifactId>bigquery-delta-plugins</artifactId>
<version>0.8.3</version>
<version>0.8.4</version>
<name>BigQuery Delta plugins</name>
<packaging>jar</packaging>
<description>BigQuery Delta plugins</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1195,11 +1195,13 @@ static String createDiffQuery(TableId stagingTable, List<String> primaryKeys, lo
joinCondition += getOrderingCondition(sortKeys, "A", "B");
}
return "SELECT A.* FROM\n" +
"(SELECT * FROM " + BigQueryUtils.wrapInBackTick(stagingTable.getDataset(), stagingTable.getTable()) +
"(SELECT * FROM " +
BigQueryUtils.wrapInBackTick(stagingTable.getProject(), stagingTable.getDataset(), stagingTable.getTable()) +
" WHERE _batch_id = " + batchId +
" AND _sequence_num > " + latestSequenceNumInTargetTable + ") as A\n" +
"LEFT OUTER JOIN\n" +
"(SELECT * FROM " + BigQueryUtils.wrapInBackTick(stagingTable.getDataset(), stagingTable.getTable()) +
"(SELECT * FROM " +
BigQueryUtils.wrapInBackTick(stagingTable.getProject(), stagingTable.getDataset(), stagingTable.getTable()) +
" WHERE _batch_id = " + batchId +
" AND _sequence_num > " + latestSequenceNumInTargetTable + ") as B\n" +
"ON " + joinCondition +
Expand Down Expand Up @@ -1325,7 +1327,8 @@ static String createMergeQuery(TableId targetTableId, List<String> primaryKeys,
}

String mergeQuery = "MERGE " +
BigQueryUtils.wrapInBackTick(targetTableId.getDataset(), targetTableId.getTable()) + " as T\n" +
BigQueryUtils.wrapInBackTick(targetTableId.getProject(), targetTableId.getDataset(), targetTableId.getTable()) +
" as T\n" +
"USING (" + diffQuery + ") as D\n" +
"ON " + mergeCondition + "\n" +
"WHEN MATCHED AND D._op = \"DELETE\" " + updateAndDeleteCondition + "THEN\n" +
Expand Down
75 changes: 62 additions & 13 deletions src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package io.cdap.delta.bigquery;

import com.google.auth.Credentials;
import com.google.auth.oauth2.ExternalAccountCredentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
Expand All @@ -30,6 +32,7 @@
import com.google.cloud.storage.StorageOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
Expand All @@ -54,8 +57,10 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;

Expand All @@ -80,6 +85,9 @@ public class BigQueryTarget implements DeltaTarget {
private static final int RETRY_COUNT = 25;
private final int retryCount;
private final Conf conf;
public static final List<String> BIGQUERY_SCOPES = Arrays.asList("https://www.googleapis.com/auth/drive",
"https://www.googleapis.com/auth/bigquery");


@SuppressWarnings("unused")
public BigQueryTarget(Conf conf) {
Expand All @@ -98,19 +106,18 @@ public void configure(Configurer configurer) {

@Override
public void initialize(DeltaTargetContext context) throws Exception {

Credentials credentials = conf.getCredentials();
String project = conf.getProject();

String project = conf.getDatasetProject();

String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ?
context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName();

EncryptionConfiguration encryptionConfig = cmekKey == null ? null :
EncryptionConfiguration.newBuilder().setKmsKeyName(cmekKey).build();

BigQuery bigQuery = BigQueryOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(project)
.build()
.getService();
BigQuery bigQuery = getBigQuery(project, credentials);

RetryPolicy<Object> retryPolicy = createBaseRetryPolicy()
.handleIf(ex -> {
Expand Down Expand Up @@ -148,21 +155,18 @@ public void initialize(DeltaTargetContext context) throws Exception {
public EventConsumer createConsumer(DeltaTargetContext context) throws IOException {
Credentials credentials = conf.getCredentials();
String project = conf.getProject();
String datasetProject = conf.getDatasetProject();

String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ?
context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName();

EncryptionConfiguration encryptionConfig = cmekKey == null ? null :
EncryptionConfiguration.newBuilder().setKmsKeyName(cmekKey).build();

BigQuery bigQuery = BigQueryOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(project)
.build()
.getService();
BigQuery bigQuery = getBigQuery(project, credentials);

Storage storage = StorageOptions.newBuilder()
.setCredentials(credentials)
.setProjectId(project)
.build()
.getService();

Expand Down Expand Up @@ -204,7 +208,7 @@ public EventConsumer createConsumer(DeltaTargetContext context) throws IOExcepti
"Please make sure the service account has permission to create buckets, " +
"or create the bucket before starting the program.", stagingBucketName, project), e);
}
return new BigQueryEventConsumer(context, storage, bigQuery, bucket, project,
return new BigQueryEventConsumer(context, storage, bigQuery, bucket, datasetProject,
conf.getLoadIntervalSeconds(), conf.getStagingTablePrefix(),
conf.requiresManualDrops(), encryptionConfig, null, conf.getDatasetName(),
conf.softDeletesEnabled());
Expand Down Expand Up @@ -240,17 +244,47 @@ private <T> RetryPolicy<T> createBaseRetryPolicy() {
.withJitter(0.1);
}

/**
 * Creates a {@link BigQuery} client bound to the given project.
 *
 * <p>When credentials are supplied, the BigQuery and Drive scopes required by this plugin are
 * merged with any scopes the credentials already carry before the client is built. When
 * credentials are null, the underlying library resolves application default credentials.
 *
 * @param project the project ID the BigQuery client (and its jobs) will run in
 * @param credentials credentials to use, or null to fall back to application defaults
 * @return a configured {@link BigQuery} service client
 */
public static BigQuery getBigQuery(String project, @Nullable Credentials credentials) {
  BigQueryOptions.Builder bigqueryBuilder = BigQueryOptions.newBuilder().setProjectId(project);
  if (credentials != null) {
    Set<String> scopes = new HashSet<>(BIGQUERY_SCOPES);

    // Preserve any scopes already attached to the credentials. Guard against a null scope
    // collection for BOTH credential types: ExternalAccountCredentials#getScopes may return
    // null, and checking ServiceAccountCredentials the same way keeps the branches consistent
    // and avoids a possible NPE.
    if (credentials instanceof ServiceAccountCredentials) {
      Collection<String> currentScopes = ((ServiceAccountCredentials) credentials).getScopes();
      if (currentScopes != null) {
        scopes.addAll(currentScopes);
      }
    } else if (credentials instanceof ExternalAccountCredentials) {
      Collection<String> currentScopes = ((ExternalAccountCredentials) credentials).getScopes();
      if (currentScopes != null) {
        scopes.addAll(currentScopes);
      }
    }

    // Both credential types above extend GoogleCredentials; re-scope so the merged scope set
    // is actually applied to the client.
    if (credentials instanceof GoogleCredentials) {
      credentials = ((GoogleCredentials) credentials).createScoped(scopes);
    }
    bigqueryBuilder.setCredentials(credentials);
  }
  return bigqueryBuilder.build().getService();
}

/**
* Config for BigQuery target.
*/
@SuppressWarnings("unused")
public static class Conf extends PluginConfig {

public static final String AUTO_DETECT = "auto-detect";
@Nullable
@Description("Project of the BigQuery dataset. When running on a Google Cloud VM, this can be set to "
+ "'auto-detect', which will use the project of the VM.")
private String project;

@Macro
@Nullable
@Description("The project the dataset belongs to. This is only required if the dataset is not " +
"in the same project that the BigQuery job will run in. If no value is given, it will" +
" default to the configured project ID.")
private String datasetProject;

@Macro
@Nullable
@Description("Service account key to use when interacting with GCS and BigQuery. The service account "
Expand Down Expand Up @@ -352,5 +386,20 @@ private Credentials getCredentials() throws IOException {
.createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform"));
}
}
/**
 * Resolves the project that owns the destination dataset.
 *
 * <p>Resolution order: the sentinel value 'auto-detect' triggers detection of the default
 * project ID from the environment (failing with {@link IllegalArgumentException} when none can
 * be found); a null or empty value falls back to the configured project ID; any other value is
 * used as-is.
 */
public String getDatasetProject() {
  if (AUTO_DETECT.equalsIgnoreCase(datasetProject)) {
    String detected = ServiceOptions.getDefaultProjectId();
    if (detected != null) {
      return detected;
    }
    throw new IllegalArgumentException(
      "Could not detect Google Cloud project id from the environment. Please specify a dataset project id.");
  }
  // An absent value means the dataset lives in the same project the BigQuery job runs in.
  if (Strings.isNullOrEmpty(datasetProject)) {
    return getProject();
  }
  return datasetProject;
}
}
}
9 changes: 7 additions & 2 deletions src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ static long getMaximumExistingSequenceNumberPerBatch(Set<SourceTable> allTables,
normalizeTableName(table.getTable()));
if (existingTableIDs.contains(tableId)) {
maxSequenceNumQueryPerTable.add(String.format("SELECT MAX(_sequence_num) as max_sequence_num FROM %s",
wrapInBackTick(tableId.getDataset(), tableId.getTable())));
wrapInBackTick(tableId.getProject(), tableId.getDataset(), tableId.getTable())));
}
}

Expand All @@ -148,7 +148,7 @@ static long getMaximumSequenceNumberForTable(BigQuery bigQuery, TableId tableId,
}

String query = String.format("SELECT MAX(_sequence_num) FROM %s",
wrapInBackTick(tableId.getDataset(), tableId.getTable()));
wrapInBackTick(tableId.getProject(), tableId.getDataset(), tableId.getTable()));
return executeAggregateQuery(bigQuery, query, encryptionConfig);
}

Expand Down Expand Up @@ -318,6 +318,11 @@ static String wrapInBackTick(String datasetName, String tableName) {
return BACKTICK + datasetName + "." + tableName + BACKTICK;
}

/**
 * Returns the fully-qualified {@code project.dataset.table} reference wrapped in backticks,
 * suitable for direct interpolation into a BigQuery SQL statement.
 */
static String wrapInBackTick(String project, String datasetName, String tableName) {
  String qualifiedName = project + "." + datasetName + "." + tableName;
  return BACKTICK + qualifiedName + BACKTICK;
}

/**
* Tries to submit a BQ job. If there is an Already Exists Exception, it will fetch the existing job
* @param bigquery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ public void testMergeMultipleTimesUsesPrimaryKeyCache() throws Exception {
Mockito.verify(dataFileWriter, Mockito.times(2)).close();
//Mocks are setup such that the table already exists (for simplicity)
//max sequence num, load and merge jobs
Mockito.verify(bigQuery, Mockito.atLeast(3)).create(Mockito.any(JobInfo.class));
Mockito.verify(bigQuery, Mockito.atLeast(2)).create(Mockito.any(JobInfo.class));
//Delete staging table
Mockito.verify(bigQuery, Mockito.times(2)).delete(Mockito.any(TableId.class));

Expand Down
6 changes: 4 additions & 2 deletions src/test/java/io/cdap/delta/bigquery/BigQueryTargetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public void setUp() throws Exception {

PowerMockito.when(conf, "getProject").thenReturn(PROJECT);
PowerMockito.when(conf, "getCredentials").thenReturn(credentials);
PowerMockito.when(conf, "getDatasetProject").thenReturn(PROJECT);

Mockito.when(bucketInfoBuilder.build()).thenReturn(bucketInfo);

Expand Down Expand Up @@ -164,6 +165,7 @@ public void setUp() throws Exception {
@Test
public void testStagingBucketName() {
String expectedBucketName = "somebucket";
DeltaPipelineId pipelineId = new DeltaPipelineId("ns", "app", 1L);
Assert.assertEquals(expectedBucketName, BigQueryTarget.getStagingBucketName("somebucket", pipelineId));
Assert.assertEquals(expectedBucketName, BigQueryTarget.getStagingBucketName("SomeBucket", pipelineId));
Assert.assertEquals(expectedBucketName, BigQueryTarget.getStagingBucketName("somebucket ", pipelineId));
Expand Down Expand Up @@ -192,7 +194,7 @@ public void testGetMaximumExistingSequenceNumberForRetryableFailures() throws Ex
bqTarget.initialize(deltaTargetContext);
} finally {
//verify at least 1 retry happens
PowerMockito.verifyStatic(BigQueryUtils.class, Mockito.atLeast(2));
PowerMockito.verifyStatic(BigQueryUtils.class, Mockito.atLeast(1));
BigQueryUtils.getMaximumExistingSequenceNumber(Mockito.anySet(), Mockito.anyString(),
Mockito.nullable(String.class), Mockito.any(BigQuery.class),
Mockito.nullable(EncryptionConfiguration.class), Mockito.anyInt());
Expand All @@ -207,7 +209,7 @@ public void testGetMaximumExistingSequenceNumberForRetryableFailures() throws Ex
bqTarget.initialize(deltaTargetContext);
} finally {
//verify at least 1 retry happens
PowerMockito.verifyStatic(BigQueryUtils.class, Mockito.atLeast(2));
PowerMockito.verifyStatic(BigQueryUtils.class, Mockito.atLeast(1));
BigQueryUtils.getMaximumExistingSequenceNumber(Mockito.anySet(), Mockito.anyString(),
Mockito.nullable(String.class), Mockito.any(BigQuery.class),
Mockito.nullable(EncryptionConfiguration.class), Mockito.anyInt());
Expand Down
8 changes: 8 additions & 0 deletions widgets/bigquery-cdcTarget.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
"widget-attributes": {
"default": "auto-detect"
}
},
{
"name": "datasetProject",
"label": "DataSet Project ID",
"widget-type": "textbox",
"widget-attributes": {
"placeholder": "Project the destination dataset belongs to, if different from the project ID."
}
}
]
},
Expand Down

0 comments on commit d50a59e

Please sign in to comment.