From 76b7e8c009c17a44d57c1aee8d8611043aae810e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Bobowski?= Date: Fri, 20 Sep 2024 12:40:06 +0200 Subject: [PATCH] SNOW-1658905 Test improvements --- .../iceberg/IcebergTableSchemaValidator.java | 10 ++- .../internal/TestSnowflakeConnection.java | 23 ++++++ .../kafka/connector/internal/TestUtils.java | 71 ++----------------- .../streaming/iceberg/BaseIcebergIT.java | 42 +++++++++++ .../IcebergTableSchemaValidatorIT.java | 24 ++++--- 5 files changed, 90 insertions(+), 80 deletions(-) create mode 100644 src/test/java/com/snowflake/kafka/connector/internal/TestSnowflakeConnection.java create mode 100644 src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java diff --git a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java index 12ac859fe..63e7ce389 100644 --- a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java +++ b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java @@ -12,15 +12,21 @@ public IcebergTableSchemaValidator(SnowflakeConnectionService snowflakeConnectio } /** - * Ensure that table exists and record_metadata column is of type OBJECT(). TODO SNOW-1658914 - - * write a test for table with record_metadata schema altered by the connector + * Ensure that table exists and record_metadata column is of type OBJECT(). + * + *

TODO SNOW-1658914 - write a test for table with record_metadata schema altered by the + * connector */ public void validateTable(String tableName, String role) { + // TODO - plug into connector startup if (!snowflakeConnectionService.tableExist(tableName)) { + // TODO - better errors throw new RuntimeException("TODO"); } + // TODO - why is it so slow? if (!snowflakeConnectionService.hasSchemaEvolutionPermission(tableName, role)) { + // TODO - better errors throw new RuntimeException("TODO"); } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/TestSnowflakeConnection.java b/src/test/java/com/snowflake/kafka/connector/internal/TestSnowflakeConnection.java new file mode 100644 index 000000000..cf05dc6f0 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/internal/TestSnowflakeConnection.java @@ -0,0 +1,23 @@ +package com.snowflake.kafka.connector.internal; + +import static com.snowflake.kafka.connector.internal.TestUtils.getConfFromFileName; + +import com.snowflake.client.jdbc.SnowflakeDriver; +import com.snowflake.kafka.connector.Utils; +import java.sql.Connection; +import java.util.Properties; + +/** Connection to test environment generated from a profile file stored locally. */ +public class TestSnowflakeConnection { + + /** Given a profile file path name, generate a connection by constructing a snowflake driver. */ + public static Connection getConnection() throws Exception { + SnowflakeURL url = + new SnowflakeURL(getConfFromFileName(TestUtils.PROFILE_PATH).get(Utils.SF_URL)); + + Properties properties = + InternalUtils.createProperties(getConfFromFileName(TestUtils.PROFILE_PATH), url); + + return new SnowflakeDriver().connect(url.getJdbcUrl(), properties); + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java index f4d5ca4aa..9ccc7064e 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java @@ -31,7 +31,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; -import com.snowflake.client.jdbc.SnowflakeDriver; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.config.SnowflakeSinkConnectorConfigBuilder; @@ -46,7 +45,6 @@ import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.security.PrivateKey; -import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -56,7 +54,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Random; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -129,14 +126,10 @@ public class TestUtils { Pattern.compile("^[^/]+/[^/]+/(\\d+)/(\\d+)_(key|value)_(\\d+)\\.gz$"); // profile path - private static final String PROFILE_PATH = "profile.json"; + public static final String PROFILE_PATH = "profile.json"; private static final ObjectMapper mapper = new ObjectMapper(); - private static Connection conn = null; - - private static Connection connForStreamingIngestTests = null; - private static Map conf = null; private static Map confWithOAuth = null; @@ -262,33 +255,6 @@ public static PrivateKey getPrivateKey() { return InternalUtils.parsePrivateKey(TestUtils.getKeyString()); } - /** - * Create snowflake jdbc connection - * - * @return jdbc connection - * @throws Exception when meeting error - */ - private static Connection getConnection() throws Exception { - if (conn != null) { - return conn; - } - - return generateConnectionToSnowflake(PROFILE_PATH); - } - - /** Given a profile file path name, generate a connection by constructing a snowflake driver. */ - private static Connection generateConnectionToSnowflake(final String profileFileName) - throws Exception { - SnowflakeURL url = new SnowflakeURL(getConfFromFileName(profileFileName).get(Utils.SF_URL)); - - Properties properties = - InternalUtils.createProperties(getConfFromFileName(profileFileName), url); - - Connection connToSnowflake = new SnowflakeDriver().connect(url.getJdbcUrl(), properties); - - return connToSnowflake; - } - /** * read conf file * @@ -413,7 +379,7 @@ public static Map getConfWithOAuth() { */ static ResultSet executeQuery(String query) { try { - Statement statement = getConnection().createStatement(); + Statement statement = TestSnowflakeConnection.getConnection().createStatement(); return statement.executeQuery(query); } // if ANY exceptions occur, an illegal state has been reached @@ -428,9 +394,9 @@ static ResultSet executeQuery(String query) { * @param query sql query string * @param parameter parameter to be inserted at index 1 */ - static void executeQueryWithParameter(String query, String parameter) { + public static void executeQueryWithParameter(String query, String parameter) { try { - PreparedStatement stmt = getConnection().prepareStatement(query); + PreparedStatement stmt = TestSnowflakeConnection.getConnection().prepareStatement(query); stmt.setString(1, parameter); stmt.execute(); stmt.close(); @@ -450,35 +416,6 @@ public static void dropTable(String tableName) { executeQuery(query); } - /** - * drop an iceberg table - * - * @param tableName table name - */ - public static void dropIcebergTable(String tableName) { - String query = "drop iceberg table if exists identifier(?)"; - executeQueryWithParameter(query, tableName); - } - - /** - * create an iceberg table - * - * @param tableName table name - */ - public static void createIcebergTable(String tableName) throws Exception { - String query = - "create or replace iceberg table identifier(?) (record_metadata object())" - + "external_volume = 'test_exvol'" - + "catalog = 'SNOWFLAKE'" - + "base_location = 'it'"; - executeQueryWithParameter(query, tableName); - } - - public static void enableSchemaEvolution(String tableName) throws Exception { - String query = "alter iceberg table identifier(?) set enable_schema_evolution = true"; - executeQueryWithParameter(query, tableName); - } - /** Select * from table */ public static ResultSet showTable(String tableName) { String query = "select * from " + tableName; diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java new file mode 100644 index 000000000..46603a837 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java @@ -0,0 +1,42 @@ +package com.snowflake.kafka.connector.streaming.iceberg; + +import static com.snowflake.kafka.connector.internal.TestUtils.executeQueryWithParameter; + +import com.snowflake.kafka.connector.internal.SnowflakeConnectionService; +import com.snowflake.kafka.connector.internal.TestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +public class BaseIcebergIT { + + protected static SnowflakeConnectionService conn; + + @BeforeAll + public static void setup() { + conn = TestUtils.getConnectionServiceForStreaming(); + } + + @AfterAll + public static void teardown() { + conn.close(); + } + + protected static void createIcebergTable(String tableName) throws Exception { + String query = + "create or replace iceberg table identifier(?) (record_metadata object())" + + "external_volume = 'test_exvol'" + + "catalog = 'SNOWFLAKE'" + + "base_location = 'it'"; + executeQueryWithParameter(query, tableName); + } + + protected static void dropIcebergTable(String tableName) { + String query = "drop iceberg table if exists identifier(?)"; + executeQueryWithParameter(query, tableName); + } + + protected static void enableSchemaEvolution(String tableName) throws Exception { + String query = "alter iceberg table identifier(?) set enable_schema_evolution = true"; + executeQueryWithParameter(query, tableName); + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java index d9aca7905..249bde3c6 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java @@ -1,17 +1,15 @@ package com.snowflake.kafka.connector.streaming.iceberg; -import com.snowflake.kafka.connector.internal.SnowflakeConnectionService; import com.snowflake.kafka.connector.internal.TestUtils; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -public class IcebergTableSchemaValidatorIT { +public class IcebergTableSchemaValidatorIT extends BaseIcebergIT { private static final String TEST_ROLE = "testrole_kafka"; - private static final SnowflakeConnectionService conn = - TestUtils.getConnectionServiceForStreaming(); private static final IcebergTableSchemaValidator schemaValidator = new IcebergTableSchemaValidator(conn); @@ -24,14 +22,14 @@ public void setUp() { @AfterEach public void tearDown() { - TestUtils.dropIcebergTable(tableName); + dropIcebergTable(tableName); } @Test public void shouldValidateExpectedIcebergTableSchema() throws Exception { // given - TestUtils.createIcebergTable(tableName); - TestUtils.enableSchemaEvolution(tableName); + createIcebergTable(tableName); + enableSchemaEvolution(tableName); // when, then schemaValidator.validateTable(tableName, TEST_ROLE); @@ -39,13 +37,17 @@ public void shouldValidateExpectedIcebergTableSchema() throws Exception { @Test public void shouldThrowExceptionWhenTableDoesNotExist() { - // Assertions.assertThrows(RuntimeException.class, () -> - // schemaValidator.validateTable(tableName, TEST_ROLE)); + Assertions.assertThrows( + RuntimeException.class, () -> schemaValidator.validateTable(tableName, TEST_ROLE)); } @Test - public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {} + public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() { + // TODO + } @Test - public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {} + public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() { + // TODO + } }