
Merge pull request #26 from GoogleCloudPlatform/main
tests: Adding Forward Migration Tests (GoogleCloudPlatform#2001)
taherkl authored Dec 18, 2024
2 parents 7512825 + 1bae9ad commit 5aa21dc
Showing 16 changed files with 886 additions and 1 deletion.
@@ -60,6 +60,7 @@ public class DataStreamToSpannerDDLIT extends DataStreamToSpannerITBase {
private static final String TABLE5 = "Users";
private static final String TABLE6 = "Books";
private static final String TABLE7 = "Authors";
private static final String TABLE8 = "Singers";

private static final String TRANSFORMATION_TABLE = "AllDatatypeTransformation";

@@ -411,6 +412,31 @@ public void migrationTestWithCharsetConversion() {
assertAuthorsBackfillContents();
}

@Test
public void migrationTestWithSequenceColumns() {
// Construct a ChainedConditionCheck with 2 stages.
// 1. Send initial wave of events
// 2. Wait on Spanner to have events
ChainedConditionCheck conditionCheck =
ChainedConditionCheck.builder(
List.of(
uploadDataStreamFile(
jobInfo, TABLE8, "sequence.avro", "DataStreamToSpannerDDLIT/Singers.avro"),
SpannerRowsCheck.builder(spannerResourceManager, TABLE8)
.setMinRows(2)
.setMaxRows(2)
.build()))
.build();

// Wait for conditions
PipelineOperator.Result result =
pipelineOperator()
.waitForCondition(createConfig(jobInfo, Duration.ofMinutes(8)), conditionCheck);

// Assert Conditions
assertThatResult(result).meetsConditions();
}
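
  // Reviewer sketch (not part of this commit): the condition check above pins
  // only the row count. A follow-up assertion could also verify that the
  // sequence-backed keys avoid the configured skip range [0, 3]. Assumes
  // SpannerResourceManager.runQuery returns Spanner Structs, plus the imports
  // com.google.cloud.spanner.Struct, com.google.common.collect.ImmutableList,
  // and static com.google.common.truth.Truth.assertThat.
  private void assertSingersKeysOutsideSkipRange() {
    ImmutableList<Struct> singers =
        spannerResourceManager.runQuery("SELECT singer_id FROM Singers");
    for (Struct singer : singers) {
      // bit_reversed_positive with skip_range_min = 0, skip_range_max = 3 means
      // every generated singer_id must be strictly greater than 3.
      assertThat(singer.getLong("singer_id")).isGreaterThan(3L);
    }
  }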

private void assertAllDatatypeColumnsTableBackfillContents() {
List<Map<String, Object>> events = new ArrayList<>();

@@ -0,0 +1,220 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;

import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;

import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.conditions.ChainedConditionCheck;
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.spanner.conditions.SpannerRowsCheck;
import org.apache.beam.it.gcp.spanner.matchers.SpannerAsserts;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * An integration test for the {@link DataStreamToSpanner} Flex template covering multiple
 * use-cases: 1. Foreign Keys. 2. Table Dropped. 3. Column Rename. 4. DLQ Retry. 5. Missing PK.
 * 6. Indexes.
 */
@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class})
@TemplateIntegrationTest(DataStreamToSpanner.class)
@RunWith(JUnit4.class)
public class DataStreamToSpannerMixedIT extends DataStreamToSpannerITBase {

private static final String TABLE1 = "Authors";
private static final String TABLE2 = "Books";
private static final String TABLE3 = "Genre";
private static PipelineLauncher.LaunchInfo jobInfo;
private static HashSet<DataStreamToSpannerMixedIT> testInstances = new HashSet<>();
public static PubsubResourceManager pubsubResourceManager;
public static SpannerResourceManager spannerResourceManager;
private static final String SPANNER_DDL_RESOURCE =
"DataStreamToSpannerMixedIT/spanner-schema.sql";
private static final String SESSION_FILE_RESOURCE =
"DataStreamToSpannerMixedIT/mysql-session.json";

/**
 * Set up the resource managers and launch the Dataflow job once for this test class.
 */
@Before
public void setUp() throws IOException {
// Prevent cleaning up of dataflow job after a test method is executed.
skipBaseCleanup = true;
synchronized (DataStreamToSpannerMixedIT.class) {
testInstances.add(this);
if (jobInfo == null) {
spannerResourceManager = setUpSpannerResourceManager();
pubsubResourceManager = setUpPubSubResourceManager();
createSpannerDDL(spannerResourceManager, SPANNER_DDL_RESOURCE);
jobInfo =
launchDataflowJob(
getClass().getSimpleName(),
SESSION_FILE_RESOURCE,
null,
"MixedIT",
spannerResourceManager,
pubsubResourceManager,
new HashMap<>() {
{
put("inputFileFormat", "avro");
}
},
null,
null);
}
}
}

/** Clean up the Dataflow job, resources, and resource managers. */
@AfterClass
public static void cleanUp() throws IOException {
for (DataStreamToSpannerMixedIT instance : testInstances) {
instance.tearDownBase();
}
ResourceManagerUtils.cleanResources(spannerResourceManager, pubsubResourceManager);
}

@Test
public void mixedMigrationTest() throws InterruptedException {
// Construct a ChainedConditionCheck with 2 stages.
// 1. Send initial wave of events
// 2. Wait on Spanner to have events
ChainedConditionCheck conditionCheck =
ChainedConditionCheck.builder(
List.of(
uploadDataStreamFile(
jobInfo,
TABLE1,
"authors_1.avro",
"DataStreamToSpannerMixedIT/Authors_1.avro"),
uploadDataStreamFile(
jobInfo, TABLE2, "books.avro", "DataStreamToSpannerMixedIT/Books.avro"),
uploadDataStreamFile(
jobInfo, TABLE3, "genre.avro", "DataStreamToSpannerMixedIT/Genre.avro"),
SpannerRowsCheck.builder(spannerResourceManager, TABLE1)
.setMinRows(1)
.setMaxRows(1)
.build()))
.build();

// Wait for conditions
PipelineOperator.Result result =
pipelineOperator()
.waitForCondition(createConfig(jobInfo, Duration.ofMinutes(8)), conditionCheck);

TimeUnit.MINUTES.sleep(1);
// Assert Conditions
assertThatResult(result).meetsConditions();

conditionCheck =
ChainedConditionCheck.builder(
List.of(
uploadDataStreamFile(
jobInfo,
TABLE1,
"authors_2.avro",
"DataStreamToSpannerMixedIT/Authors_2.avro"),
SpannerRowsCheck.builder(spannerResourceManager, TABLE1)
.setMinRows(4)
.setMaxRows(4)
.build(),
SpannerRowsCheck.builder(spannerResourceManager, TABLE2)
.setMinRows(3)
.setMaxRows(3)
.build()))
.build();

result =
pipelineOperator()
.waitForCondition(createConfig(jobInfo, Duration.ofMinutes(8)), conditionCheck);

// Assert Conditions
assertThatResult(result).meetsConditions();

assertAuthorsTableContents();

assertBooksTableContents();
}

private void assertAuthorsTableContents() {
List<Map<String, Object>> authorEvents = new ArrayList<>();

Map<String, Object> authorRow1 = new HashMap<>();
authorRow1.put("author_id", 4);
authorRow1.put("full_name", "Stephen King");

Map<String, Object> authorRow2 = new HashMap<>();
authorRow2.put("author_id", 1);
authorRow2.put("full_name", "Jane Austen");

Map<String, Object> authorRow3 = new HashMap<>();
authorRow3.put("author_id", 2);
authorRow3.put("full_name", "Charles Dickens");

Map<String, Object> authorRow4 = new HashMap<>();
authorRow4.put("author_id", 3);
authorRow4.put("full_name", "Leo Tolstoy");

authorEvents.add(authorRow1);
authorEvents.add(authorRow2);
authorEvents.add(authorRow3);
authorEvents.add(authorRow4);

SpannerAsserts.assertThatStructs(spannerResourceManager.runQuery("select * from Authors"))
.hasRecordsUnorderedCaseInsensitiveColumns(authorEvents);
}

private void assertBooksTableContents() {
List<Map<String, Object>> bookEvents = new ArrayList<>();

Map<String, Object> bookRow1 = new HashMap<>();
bookRow1.put("id", 1);
bookRow1.put("title", "Pride and Prejudice");

Map<String, Object> bookRow2 = new HashMap<>();
bookRow2.put("id", 2);
bookRow2.put("title", "Oliver Twist");

Map<String, Object> bookRow3 = new HashMap<>();
bookRow3.put("id", 3);
bookRow3.put("title", "War and Peace");

bookEvents.add(bookRow1);
bookEvents.add(bookRow2);
bookEvents.add(bookRow3);

SpannerAsserts.assertThatStructs(spannerResourceManager.runQuery("select id, title from Books"))
.hasRecordsUnorderedCaseInsensitiveColumns(bookEvents);
}
}
@@ -111,8 +111,9 @@ public static void cleanUp() throws IOException {
ResourceManagerUtils.cleanResources(spannerResourceManager, pubsubResourceManager);
}

/** Tests the following use-cases: 1. Drop Column. 2. Rename Column. 3. Drop Table. */
@Test
-  public void migrationTestWithRenameAndDropColumn() {
+  public void migrationTestWithRenameAndDrops() {
// Construct a ChainedConditionCheck with 4 stages.
// 1. Send initial wave of events
// 2. Wait on Spanner to have events
Binary file not shown.
@@ -122,4 +122,11 @@ CREATE TABLE `AllDatatypeTransformation` (
`binary_column` binary(20), -- To: binary_column BYTES(MAX)
`bit_column` bit(7), -- To: bit_column BYTES(MAX)
PRIMARY KEY (`varchar_column`)
);

CREATE TABLE `Singers` (
`singer_id` int NOT NULL AUTO_INCREMENT,
`first_name` varchar(1024),
`last_name` varchar(1024),
PRIMARY KEY (`singer_id`)
);
@@ -116,3 +116,11 @@ CREATE TABLE AllDatatypeTransformation (
binary_column BYTES(MAX),
bit_column BYTES(MAX),
) PRIMARY KEY (varchar_column);

CREATE SEQUENCE sequence_id OPTIONS (sequence_kind='bit_reversed_positive', skip_range_min = 0, skip_range_max = 3);

CREATE TABLE Singers (
  singer_id INT64 NOT NULL DEFAULT (GET_NEXT_SEQUENCE_VALUE(SEQUENCE sequence_id)),
first_name STRING(1024),
last_name STRING(1024),
) PRIMARY KEY (singer_id);
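
A note on the mapping: the MySQL AUTO_INCREMENT key becomes a Spanner column whose DEFAULT
draws from the bit-reversed sequence above, so writes that omit singer_id still receive a
generated key, and the skip range keeps those values out of [0, 3]. A minimal sketch of that
behavior with the Spanner Java client (spanner and databaseId are placeholder handles, not
part of this commit):

  // Assumes com.google.cloud.spanner.{DatabaseClient, ResultSet, Statement}.
  // Omitting singer_id lets DEFAULT (GET_NEXT_SEQUENCE_VALUE(SEQUENCE sequence_id))
  // assign the key; THEN RETURN reads the generated value back.
  DatabaseClient client = spanner.getDatabaseClient(databaseId);
  client
      .readWriteTransaction()
      .run(
          txn -> {
            try (ResultSet rs =
                txn.executeQuery(
                    Statement.of(
                        "INSERT INTO Singers (first_name, last_name) "
                            + "VALUES ('Marc', 'Richards') THEN RETURN singer_id"))) {
              while (rs.next()) {
                // Bit-reversed keys look scattered but always land outside [0, 3].
                System.out.println("generated singer_id: " + rs.getLong("singer_id"));
              }
            }
            return null;
          });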
@@ -425,6 +425,10 @@
"t2": {
"ColumnLevelIssues": {},
"TableLevelIssues": null
},
"t3": {
"ColumnLevelIssues": {},
"TableLevelIssues": null
}
},
"Location": {},
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,19 @@
CREATE TABLE `Authors` (
  `author_id` int NOT NULL, -- To: author_id INT64
`name` varchar(25), -- To: full_name STRING(25) Column name renamed
`last_update` timestamp, -- To: Column dropped in spanner
PRIMARY KEY (`author_id`)
);

CREATE TABLE `Books` (
`id` int NOT NULL,
`title` varchar(200),
`author_id` int,
  FOREIGN KEY (author_id) REFERENCES Authors(author_id)
);

CREATE TABLE `Genre` (
  `genre_id` int NOT NULL,
`name` varchar(200),
PRIMARY KEY (`genre_id`)
);