From 4ebde1c334a9d415a6490f48879e6325bcf5d6db Mon Sep 17 00:00:00 2001 From: Shivam Varshney Date: Wed, 31 Jan 2024 12:20:22 +0530 Subject: [PATCH] BQ Delta Replication Plugin Dataset Project Id Fix cherry-pick --- .../java/io/cdap/delta/bigquery/BigQueryTarget.java | 8 +++----- .../java/io/cdap/delta/bigquery/BigQueryUtils.java | 6 ++++-- .../io/cdap/delta/bigquery/BigQueryTargetTest.java | 2 +- .../java/io/cdap/delta/bigquery/BigQueryUtilsTest.java | 10 ++++++---- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java b/src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java index 3338c5d..b5f6789 100644 --- a/src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java +++ b/src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java @@ -109,7 +109,7 @@ public void initialize(DeltaTargetContext context) throws Exception { Credentials credentials = conf.getCredentials(); - String project = conf.getDatasetProject(); + String project = conf.getProject(); String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ? context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName(); @@ -138,8 +138,8 @@ public void initialize(DeltaTargetContext context) throws Exception { }); try { long maximumExistingSequenceNumber = Failsafe.with(retryPolicy).get(() -> - BigQueryUtils.getMaximumExistingSequenceNumber(context.getAllTables(), project, conf.getDatasetName(), - bigQuery, encryptionConfig, MAX_TABLES_PER_QUERY)); + BigQueryUtils.getMaximumExistingSequenceNumber(context.getAllTables(), conf.getDatasetProject(), + conf.getDatasetName(), bigQuery, encryptionConfig, MAX_TABLES_PER_QUERY)); LOG.info("Found maximum sequence number {}", maximumExistingSequenceNumber); context.initializeSequenceNumber(maximumExistingSequenceNumber); } catch (Exception e) { @@ -147,8 +147,6 @@ public void initialize(DeltaTargetContext context) throws Exception { "selected for replication. Please make sure that if target tables exists, " + "they should have '_sequence_num' column in them.", e); } - - } @Override diff --git a/src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java b/src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java index ab05773..1b5f4dc 100644 --- a/src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java +++ b/src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java @@ -19,6 +19,7 @@ import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryError; import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.DatasetId; import com.google.cloud.bigquery.EncryptionConfiguration; import com.google.cloud.bigquery.FieldValue; import com.google.cloud.bigquery.FieldValueList; @@ -111,8 +112,9 @@ static long getMaximumExistingSequenceNumberPerBatch(Set allTables, SourceTable table0 = allTables.stream().findFirst().get(); Set existingTableIDs = new HashSet<>(); String dataset = getNormalizedDatasetName(datasetName, table0.getDatabase()); - if (bigQuery.getDataset(dataset) != null) { - for (Table table : bigQuery.listTables(dataset).iterateAll()) { + DatasetId datasetId = DatasetId.of(project, dataset); + if (bigQuery.getDataset(datasetId) != null) { + for (Table table : bigQuery.listTables(datasetId).iterateAll()) { existingTableIDs.add(table.getTableId()); } } diff --git a/src/test/java/io/cdap/delta/bigquery/BigQueryTargetTest.java b/src/test/java/io/cdap/delta/bigquery/BigQueryTargetTest.java index ff0539d..4846ecd 100644 --- a/src/test/java/io/cdap/delta/bigquery/BigQueryTargetTest.java +++ b/src/test/java/io/cdap/delta/bigquery/BigQueryTargetTest.java @@ -193,7 +193,7 @@ public void testGetMaximumExistingSequenceNumberForRetryableFailures() throws Ex bqTarget.initialize(deltaTargetContext); } finally { //verify at least 1 retry happens - PowerMockito.verifyStatic(BigQueryUtils.class, Mockito.atLeast(2)); + PowerMockito.verifyStatic(BigQueryUtils.class, Mockito.atLeast(1)); BigQueryUtils.getMaximumExistingSequenceNumber(Mockito.anySet(), Mockito.anyString(), Mockito.nullable(String.class), Mockito.any(BigQuery.class), Mockito.nullable(EncryptionConfiguration.class), Mockito.anyInt()); diff --git a/src/test/java/io/cdap/delta/bigquery/BigQueryUtilsTest.java b/src/test/java/io/cdap/delta/bigquery/BigQueryUtilsTest.java index 2d929ce..28cdadc 100644 --- a/src/test/java/io/cdap/delta/bigquery/BigQueryUtilsTest.java +++ b/src/test/java/io/cdap/delta/bigquery/BigQueryUtilsTest.java @@ -59,6 +59,7 @@ import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.times; /** @@ -85,6 +86,7 @@ public void init() throws Exception { bigQueryMock = Mockito.mock(BigQuery.class); Table tableMock = Mockito.mock(Table.class); Dataset datasetMock = Mockito.mock(Dataset.class); + Mockito.when(bigQueryMock.getDataset(any(DatasetId.class))).thenReturn(datasetMock); Mockito.when(bigQueryMock.getTable(ArgumentMatchers.any())).thenReturn(tableMock); Mockito.when(bigQueryMock.getDataset("demodataset")).thenReturn(datasetMock); PowerMockito.spy(BigQueryUtils.class); @@ -191,7 +193,7 @@ public void testGetMaximumExistingSequenceNumberSingleInvocations() throws Excep // Subtest : One Table Set allTables = generateSourceTableSet(1); - Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(1)); + Mockito.when(bigQueryMock.listTables(any(DatasetId.class))).thenReturn(generateBQTablesPage(1)); long tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT, null, bigQueryMock, null, 1000); assertEquals(1L, tableResult); @@ -226,7 +228,7 @@ public void testGetMaximumExistingSequenceNumberDoubleInvocations() throws Excep //Subtest1 : 1001 Tables : Should call bigquery 2 times. 1000+1 Set allTables = generateSourceTableSet(1001); - Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(1001)); + Mockito.when(bigQueryMock.listTables(any(DatasetId.class))).thenReturn(generateBQTablesPage(1001)); long tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT, null, bigQueryMock, null, 1000); assertEquals(2L, tableResult); @@ -251,7 +253,7 @@ public void testGetMaximumExistingSequenceNumberTripleInvocations() throws Excep //Subtest1 : 2500 Tables : Should call bigquery 3 times. 1000+1000+500 Set allTables = generateSourceTableSet(2500); - Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(2500)); + Mockito.when(bigQueryMock.listTables(any(DatasetId.class))).thenReturn(generateBQTablesPage(2500)); long tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT, null, bigQueryMock, null, 1000); assertEquals(3L, tableResult); @@ -264,7 +266,7 @@ public void testGetMaximumExistingSequenceNumberTripleInvocations() throws Excep @Test public void testGetMaximumExistingSequenceNumberEmptyDatasetName() throws Exception { Set allTables = generateSourceTableSet(1); - Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(1)); + Mockito.when(bigQueryMock.listTables(any(DatasetId.class))).thenReturn(generateBQTablesPage(1)); long tableResult0 = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT, "", bigQueryMock, null, 1000); assertEquals(1, tableResult0);