From 2ccf52b75f1846d033f536ebe7ba624225d83087 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Bobowski?= Date: Tue, 17 Sep 2024 17:31:37 +0200 Subject: [PATCH 1/6] SNOW-1658905 Create snowflake.streaming.iceberg.enabled parameter --- .../kafka/connector/config/ConnectorConfigDefinition.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java b/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java index a4314cd49..8ef07cef0 100644 --- a/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java +++ b/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java @@ -466,7 +466,7 @@ public static ConfigDef getConfig() { ICEBERG_ENABLED, ConfigDef.Type.BOOLEAN, ICEBERG_ENABLED_DEFAULT_VALUE, - ConfigDef.Importance.HIGH, + ConfigDef.Importance.MEDIUM, "When set to true the connector will ingest data into the Iceberg table. Check the" + " official Snowflake documentation for the prerequisites."); } From 270bdc9507c1289a6ba130d0557ecd13e7a4f4a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Bobowski?= Date: Thu, 19 Sep 2024 13:49:27 +0200 Subject: [PATCH 2/6] PR fixes --- .../kafka/connector/config/ConnectorConfigDefinition.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java b/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java index 8ef07cef0..a4314cd49 100644 --- a/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java +++ b/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java @@ -466,7 +466,7 @@ public static ConfigDef getConfig() { ICEBERG_ENABLED, ConfigDef.Type.BOOLEAN, ICEBERG_ENABLED_DEFAULT_VALUE, - ConfigDef.Importance.MEDIUM, + ConfigDef.Importance.HIGH, "When set to true the connector will ingest data into the Iceberg table. 
Check the" + " official Snowflake documentation for the prerequisites."); } From c94f6c276d2a9995dfe2b70581cb3ab8e481ba6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Bobowski?= Date: Thu, 19 Sep 2024 14:08:28 +0200 Subject: [PATCH 3/6] SNOW-1658905 Validate Iceberg table structure on startup --- .../SnowflakeConnectionServiceV1.java | 4 ++ .../iceberg/IcebergTableSchemaValidator.java | 31 ++++++++++++ .../kafka/connector/internal/TestUtils.java | 47 ++++++++++++++++++ .../IcebergTableSchemaValidatorIT.java | 49 +++++++++++++++++++ 4 files changed, 131 insertions(+) create mode 100644 src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java create mode 100644 src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index d2d9e1ef2..dec066511 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -660,6 +660,7 @@ public void schemaExists(String schemaName) { } @Override + // TODO - move to test-only class public void dropPipe(final String pipeName) { checkConnection(); InternalUtils.assertNotEmpty("pipeName", pipeName); @@ -678,6 +679,7 @@ public void dropPipe(final String pipeName) { } @Override + // TODO - move to test-only class public boolean dropStageIfEmpty(final String stageName) { checkConnection(); InternalUtils.assertNotEmpty("stageName", stageName); @@ -761,6 +763,7 @@ public void moveToTableStage( } @Override + // TODO - move to test-only class public void moveToTableStage( final String tableName, final String stageName, final String prefix) { InternalUtils.assertNotEmpty("tableName", tableName); @@ -808,6 +811,7 @@ public List listStage(final String stageName, final String prefix) { @Override @Deprecated // Only using it in test for performance testing + // TODO - move to test-only class public void put(final String stageName, final String fileName, final String content) { InternalUtils.assertNotEmpty("stageName", stageName); SnowflakeConnectionV1 sfconn = (SnowflakeConnectionV1) conn; diff --git a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java new file mode 100644 index 000000000..3a25863d9 --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java @@ -0,0 +1,31 @@ +package com.snowflake.kafka.connector.streaming.iceberg; + +import com.snowflake.kafka.connector.internal.SnowflakeConnectionService; + +/** + * Performs validations of Iceberg table schema on the connector startup. + **/ +public class IcebergTableSchemaValidator { + + private final SnowflakeConnectionService snowflakeConnectionService; + + public IcebergTableSchemaValidator(SnowflakeConnectionService snowflakeConnectionService) { + this.snowflakeConnectionService = snowflakeConnectionService; + } + + /** + * Ensure that table exists and record_metadata column is of type OBJECT(). 
+ * TODO SNOW-1658914 - write a test for table with record_metadata schema altered by the connector + */ + public void validateTable(String tableName, String role) { + if (!snowflakeConnectionService.tableExist(tableName)) { + throw new RuntimeException("TODO"); + } + + if (!snowflakeConnectionService.hasSchemaEvolutionPermission(tableName, role)) { + throw new RuntimeException("TODO"); + } + + // TODO - call describe table and analyze record_metadata schema + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java index cab7a389b..f781fdd8a 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java @@ -47,6 +47,7 @@ import java.nio.charset.StandardCharsets; import java.security.PrivateKey; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -421,6 +422,23 @@ static ResultSet executeQuery(String query) { } } + /** + * execute sql query + * + * @param query sql query string + * @param parameter parameter to be inserted at index 1 + */ + static void executeQueryWithParameter(String query, String parameter) { + try { + PreparedStatement stmt = getConnection().prepareStatement(query); + stmt.setString(1, parameter); + stmt.execute(); + stmt.close(); + } catch (Exception e) { + throw new RuntimeException("Error executing query: " + query, e); + } + } + /** * drop a table * @@ -432,6 +450,35 @@ public static void dropTable(String tableName) { executeQuery(query); } + /** + * drop an iceberg table + * + * @param tableName table name + */ + public static void dropIcebergTable(String tableName) { + String query = "drop iceberg table if exists identifier(?)"; + executeQueryWithParameter(query, tableName); + } + + /** + * create an iceberg table + * + * @param tableName table name + */ + public static void createIcebergTable(String tableName) throws Exception { + String query = + "create or replace iceberg table identifier(?) (record_metadata object())" + + "external_volume = 'test_exvol'" + + "catalog = 'SNOWFLAKE'" + + "base_location = 'it'"; + executeQueryWithParameter(query, tableName); + } + + public static void enableSchemaEvolution(String tableName) throws Exception { + String query = "alter iceberg table identifier(?) 
set enable_schema_evolution = true"; + executeQueryWithParameter(query, tableName); + } + /** Select * from table */ public static ResultSet showTable(String tableName) { String query = "select * from " + tableName; diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java new file mode 100644 index 000000000..c885af712 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java @@ -0,0 +1,49 @@ +package com.snowflake.kafka.connector.streaming.iceberg; + +import com.snowflake.kafka.connector.internal.SnowflakeConnectionService; +import com.snowflake.kafka.connector.internal.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class IcebergTableSchemaValidatorIT { + + private static final String TEST_ROLE = "testrole_kafka" + + private static final SnowflakeConnectionService conn = TestUtils.getConnectionServiceForStreaming(); + private static final IcebergTableSchemaValidator schemaValidator = new IcebergTableSchemaValidator(conn); + + private String tableName; + + @BeforeEach + public void setUp() { + tableName = TestUtils.randomTableName(); + } + + @AfterEach + public void tearDown() { + TestUtils.dropIcebergTable(tableName); + } + + @Test + public void shouldValidateExpectedIcebergTableSchema() throws Exception { + // given + TestUtils.createIcebergTable(tableName); + TestUtils.enableSchemaEvolution(tableName); + + // when, then + schemaValidator.validateTable(tableName, TEST_ROLE); + } + + @Test + public void shouldThrowExceptionWhenTableDoesNotExist() { + // Assertions.assertThrows(RuntimeException.class, () -> schemaValidator.validateTable(tableName, TEST_ROLE)); + } + + @Test + public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {} + + @Test + public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {} +} From 7007096a5f9d54d098b090c141b2d3fb80a3655e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Bobowski?= Date: Fri, 20 Sep 2024 12:04:22 +0200 Subject: [PATCH 4/6] SNOW-1658905 Missing ; and format --- .../iceberg/IcebergTableSchemaValidator.java | 38 ++++++------ .../kafka/connector/internal/TestUtils.java | 8 +-- .../IcebergTableSchemaValidatorIT.java | 62 ++++++++++--------- 3 files changed, 54 insertions(+), 54 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java index 3a25863d9..12ac859fe 100644 --- a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java +++ b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java @@ -2,30 +2,28 @@ import com.snowflake.kafka.connector.internal.SnowflakeConnectionService; -/** - * Performs validations of Iceberg table schema on the connector startup. - **/ +/** Performs validations of Iceberg table schema on the connector startup. 
*/ public class IcebergTableSchemaValidator { - private final SnowflakeConnectionService snowflakeConnectionService; + private final SnowflakeConnectionService snowflakeConnectionService; - public IcebergTableSchemaValidator(SnowflakeConnectionService snowflakeConnectionService) { - this.snowflakeConnectionService = snowflakeConnectionService; - } - - /** - * Ensure that table exists and record_metadata column is of type OBJECT(). - * TODO SNOW-1658914 - write a test for table with record_metadata schema altered by the connector - */ - public void validateTable(String tableName, String role) { - if (!snowflakeConnectionService.tableExist(tableName)) { - throw new RuntimeException("TODO"); - } + public IcebergTableSchemaValidator(SnowflakeConnectionService snowflakeConnectionService) { + this.snowflakeConnectionService = snowflakeConnectionService; + } - if (!snowflakeConnectionService.hasSchemaEvolutionPermission(tableName, role)) { - throw new RuntimeException("TODO"); - } + /** + * Ensure that table exists and record_metadata column is of type OBJECT(). TODO SNOW-1658914 - + * write a test for table with record_metadata schema altered by the connector + */ + public void validateTable(String tableName, String role) { + if (!snowflakeConnectionService.tableExist(tableName)) { + throw new RuntimeException("TODO"); + } - // TODO - call describe table and analyze record_metadata schema + if (!snowflakeConnectionService.hasSchemaEvolutionPermission(tableName, role)) { + throw new RuntimeException("TODO"); } + + // TODO - call describe table and analyze record_metadata schema + } } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java index f781fdd8a..f4d5ca4aa 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java @@ -467,10 +467,10 @@ public static void dropIcebergTable(String tableName) { */ public static void createIcebergTable(String tableName) throws Exception { String query = - "create or replace iceberg table identifier(?) (record_metadata object())" + - "external_volume = 'test_exvol'" + - "catalog = 'SNOWFLAKE'" + - "base_location = 'it'"; + "create or replace iceberg table identifier(?) 
(record_metadata object())" + + "external_volume = 'test_exvol'" + + "catalog = 'SNOWFLAKE'" + + "base_location = 'it'"; executeQueryWithParameter(query, tableName); } diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java index c885af712..d9aca7905 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java @@ -3,47 +3,49 @@ import com.snowflake.kafka.connector.internal.SnowflakeConnectionService; import com.snowflake.kafka.connector.internal.TestUtils; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class IcebergTableSchemaValidatorIT { - private static final String TEST_ROLE = "testrole_kafka" + private static final String TEST_ROLE = "testrole_kafka"; - private static final SnowflakeConnectionService conn = TestUtils.getConnectionServiceForStreaming(); - private static final IcebergTableSchemaValidator schemaValidator = new IcebergTableSchemaValidator(conn); + private static final SnowflakeConnectionService conn = + TestUtils.getConnectionServiceForStreaming(); + private static final IcebergTableSchemaValidator schemaValidator = + new IcebergTableSchemaValidator(conn); - private String tableName; + private String tableName; - @BeforeEach - public void setUp() { - tableName = TestUtils.randomTableName(); - } + @BeforeEach + public void setUp() { + tableName = TestUtils.randomTableName(); + } - @AfterEach - public void tearDown() { - TestUtils.dropIcebergTable(tableName); - } - - @Test - public void shouldValidateExpectedIcebergTableSchema() throws Exception { - // given - TestUtils.createIcebergTable(tableName); - TestUtils.enableSchemaEvolution(tableName); + @AfterEach + public void tearDown() { + TestUtils.dropIcebergTable(tableName); + } - // when, then - schemaValidator.validateTable(tableName, TEST_ROLE); - } + @Test + public void shouldValidateExpectedIcebergTableSchema() throws Exception { + // given + TestUtils.createIcebergTable(tableName); + TestUtils.enableSchemaEvolution(tableName); - @Test - public void shouldThrowExceptionWhenTableDoesNotExist() { - // Assertions.assertThrows(RuntimeException.class, () -> schemaValidator.validateTable(tableName, TEST_ROLE)); - } + // when, then + schemaValidator.validateTable(tableName, TEST_ROLE); + } - @Test - public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {} + @Test + public void shouldThrowExceptionWhenTableDoesNotExist() { + // Assertions.assertThrows(RuntimeException.class, () -> + // schemaValidator.validateTable(tableName, TEST_ROLE)); + } - @Test - public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {} + @Test + public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {} + + @Test + public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {} } From 76b7e8c009c17a44d57c1aee8d8611043aae810e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Bobowski?= Date: Fri, 20 Sep 2024 12:40:06 +0200 Subject: [PATCH 5/6] SNOW-1658905 Test improvements --- .../iceberg/IcebergTableSchemaValidator.java | 10 ++- .../internal/TestSnowflakeConnection.java | 23 ++++++ .../kafka/connector/internal/TestUtils.java | 71 ++----------------- .../streaming/iceberg/BaseIcebergIT.java | 42 
+++++++++++ .../IcebergTableSchemaValidatorIT.java | 24 ++++--- 5 files changed, 90 insertions(+), 80 deletions(-) create mode 100644 src/test/java/com/snowflake/kafka/connector/internal/TestSnowflakeConnection.java create mode 100644 src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java diff --git a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java index 12ac859fe..63e7ce389 100644 --- a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java +++ b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java @@ -12,15 +12,21 @@ public IcebergTableSchemaValidator(SnowflakeConnectionService snowflakeConnectio } /** - * Ensure that table exists and record_metadata column is of type OBJECT(). TODO SNOW-1658914 - - * write a test for table with record_metadata schema altered by the connector + * Ensure that table exists and record_metadata column is of type OBJECT(). + * + *
TODO SNOW-1658914 - write a test for table with record_metadata schema altered by the + * connector */ public void validateTable(String tableName, String role) { + // TODO - plug into connector startup if (!snowflakeConnectionService.tableExist(tableName)) { + // TODO - better errors throw new RuntimeException("TODO"); } + // TODO - why is it so slow? if (!snowflakeConnectionService.hasSchemaEvolutionPermission(tableName, role)) { + // TODO - better errors throw new RuntimeException("TODO"); } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/TestSnowflakeConnection.java b/src/test/java/com/snowflake/kafka/connector/internal/TestSnowflakeConnection.java new file mode 100644 index 000000000..cf05dc6f0 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/internal/TestSnowflakeConnection.java @@ -0,0 +1,23 @@ +package com.snowflake.kafka.connector.internal; + +import static com.snowflake.kafka.connector.internal.TestUtils.getConfFromFileName; + +import com.snowflake.client.jdbc.SnowflakeDriver; +import com.snowflake.kafka.connector.Utils; +import java.sql.Connection; +import java.util.Properties; + +/** Connection to test environment generated from a profile file stored locally. */ +public class TestSnowflakeConnection { + + /** Given a profile file path name, generate a connection by constructing a snowflake driver. */ + public static Connection getConnection() throws Exception { + SnowflakeURL url = + new SnowflakeURL(getConfFromFileName(TestUtils.PROFILE_PATH).get(Utils.SF_URL)); + + Properties properties = + InternalUtils.createProperties(getConfFromFileName(TestUtils.PROFILE_PATH), url); + + return new SnowflakeDriver().connect(url.getJdbcUrl(), properties); + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java index f4d5ca4aa..9ccc7064e 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java @@ -31,7 +31,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; -import com.snowflake.client.jdbc.SnowflakeDriver; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.config.SnowflakeSinkConnectorConfigBuilder; @@ -46,7 +45,6 @@ import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.security.PrivateKey; -import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -56,7 +54,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Random; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -129,14 +126,10 @@ public class TestUtils { Pattern.compile("^[^/]+/[^/]+/(\\d+)/(\\d+)_(key|value)_(\\d+)\\.gz$"); // profile path - private static final String PROFILE_PATH = "profile.json"; + public static final String PROFILE_PATH = "profile.json"; private static final ObjectMapper mapper = new ObjectMapper(); - private static Connection conn = null; - - private static Connection connForStreamingIngestTests = null; - private static Map conf = null; private static Map confWithOAuth = null; @@ -262,33 +255,6 @@ public static PrivateKey getPrivateKey() { return InternalUtils.parsePrivateKey(TestUtils.getKeyString()); } - /** - * Create snowflake jdbc connection - * - * 
@return jdbc connection - * @throws Exception when meeting error - */ - private static Connection getConnection() throws Exception { - if (conn != null) { - return conn; - } - - return generateConnectionToSnowflake(PROFILE_PATH); - } - - /** Given a profile file path name, generate a connection by constructing a snowflake driver. */ - private static Connection generateConnectionToSnowflake(final String profileFileName) - throws Exception { - SnowflakeURL url = new SnowflakeURL(getConfFromFileName(profileFileName).get(Utils.SF_URL)); - - Properties properties = - InternalUtils.createProperties(getConfFromFileName(profileFileName), url); - - Connection connToSnowflake = new SnowflakeDriver().connect(url.getJdbcUrl(), properties); - - return connToSnowflake; - } - /** * read conf file * @@ -413,7 +379,7 @@ public static Map getConfWithOAuth() { */ static ResultSet executeQuery(String query) { try { - Statement statement = getConnection().createStatement(); + Statement statement = TestSnowflakeConnection.getConnection().createStatement(); return statement.executeQuery(query); } // if ANY exceptions occur, an illegal state has been reached @@ -428,9 +394,9 @@ static ResultSet executeQuery(String query) { * @param query sql query string * @param parameter parameter to be inserted at index 1 */ - static void executeQueryWithParameter(String query, String parameter) { + public static void executeQueryWithParameter(String query, String parameter) { try { - PreparedStatement stmt = getConnection().prepareStatement(query); + PreparedStatement stmt = TestSnowflakeConnection.getConnection().prepareStatement(query); stmt.setString(1, parameter); stmt.execute(); stmt.close(); @@ -450,35 +416,6 @@ public static void dropTable(String tableName) { executeQuery(query); } - /** - * drop an iceberg table - * - * @param tableName table name - */ - public static void dropIcebergTable(String tableName) { - String query = "drop iceberg table if exists identifier(?)"; - executeQueryWithParameter(query, tableName); - } - - /** - * create an iceberg table - * - * @param tableName table name - */ - public static void createIcebergTable(String tableName) throws Exception { - String query = - "create or replace iceberg table identifier(?) (record_metadata object())" - + "external_volume = 'test_exvol'" - + "catalog = 'SNOWFLAKE'" - + "base_location = 'it'"; - executeQueryWithParameter(query, tableName); - } - - public static void enableSchemaEvolution(String tableName) throws Exception { - String query = "alter iceberg table identifier(?) 
set enable_schema_evolution = true"; - executeQueryWithParameter(query, tableName); - } - /** Select * from table */ public static ResultSet showTable(String tableName) { String query = "select * from " + tableName; diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java new file mode 100644 index 000000000..46603a837 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java @@ -0,0 +1,42 @@ +package com.snowflake.kafka.connector.streaming.iceberg; + +import static com.snowflake.kafka.connector.internal.TestUtils.executeQueryWithParameter; + +import com.snowflake.kafka.connector.internal.SnowflakeConnectionService; +import com.snowflake.kafka.connector.internal.TestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +public class BaseIcebergIT { + + protected static SnowflakeConnectionService conn; + + @BeforeAll + public static void setup() { + conn = TestUtils.getConnectionServiceForStreaming(); + } + + @AfterAll + public static void teardown() { + conn.close(); + } + + protected static void createIcebergTable(String tableName) throws Exception { + String query = + "create or replace iceberg table identifier(?) (record_metadata object())" + + "external_volume = 'test_exvol'" + + "catalog = 'SNOWFLAKE'" + + "base_location = 'it'"; + executeQueryWithParameter(query, tableName); + } + + protected static void dropIcebergTable(String tableName) { + String query = "drop iceberg table if exists identifier(?)"; + executeQueryWithParameter(query, tableName); + } + + protected static void enableSchemaEvolution(String tableName) throws Exception { + String query = "alter iceberg table identifier(?) 
set enable_schema_evolution = true"; + executeQueryWithParameter(query, tableName); + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java index d9aca7905..249bde3c6 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java @@ -1,17 +1,15 @@ package com.snowflake.kafka.connector.streaming.iceberg; -import com.snowflake.kafka.connector.internal.SnowflakeConnectionService; import com.snowflake.kafka.connector.internal.TestUtils; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -public class IcebergTableSchemaValidatorIT { +public class IcebergTableSchemaValidatorIT extends BaseIcebergIT { private static final String TEST_ROLE = "testrole_kafka"; - private static final SnowflakeConnectionService conn = - TestUtils.getConnectionServiceForStreaming(); private static final IcebergTableSchemaValidator schemaValidator = new IcebergTableSchemaValidator(conn); @@ -24,14 +22,14 @@ public void setUp() { @AfterEach public void tearDown() { - TestUtils.dropIcebergTable(tableName); + dropIcebergTable(tableName); } @Test public void shouldValidateExpectedIcebergTableSchema() throws Exception { // given - TestUtils.createIcebergTable(tableName); - TestUtils.enableSchemaEvolution(tableName); + createIcebergTable(tableName); + enableSchemaEvolution(tableName); // when, then schemaValidator.validateTable(tableName, TEST_ROLE); @@ -39,13 +37,17 @@ public void shouldValidateExpectedIcebergTableSchema() throws Exception { @Test public void shouldThrowExceptionWhenTableDoesNotExist() { - // Assertions.assertThrows(RuntimeException.class, () -> - // schemaValidator.validateTable(tableName, TEST_ROLE)); + Assertions.assertThrows( + RuntimeException.class, () -> schemaValidator.validateTable(tableName, TEST_ROLE)); } @Test - public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {} + public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() { + // TODO + } @Test - public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {} + public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() { + // TODO + } } From 33e13eeb2e409b1001f8b720eaf070a0143e1552 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Bobowski?= Date: Fri, 20 Sep 2024 14:07:01 +0200 Subject: [PATCH 6/6] SNOW-1658905 Fix null pointer --- .../iceberg/IcebergTableSchemaValidatorIT.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java index 249bde3c6..abc8bca8e 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java @@ -3,6 +3,7 @@ import com.snowflake.kafka.connector.internal.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -10,11 +11,17 @@ public class 
IcebergTableSchemaValidatorIT extends BaseIcebergIT {
 
   private static final String TEST_ROLE = "testrole_kafka";
 
-  private static final IcebergTableSchemaValidator schemaValidator =
-      new IcebergTableSchemaValidator(conn);
+  private static IcebergTableSchemaValidator schemaValidator;
 
   private String tableName;
 
+  @BeforeAll
+  // replaces the base class @BeforeAll so conn is assigned before the validator is created (fixes the NPE)
+  public static void setup() {
+    conn = TestUtils.getConnectionServiceForStreaming();
+    schemaValidator = new IcebergTableSchemaValidator(conn);
+  }
+
   @BeforeEach
   public void setUp() {
     tableName = TestUtils.randomTableName();
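
The validator introduced in patch 3 still carries a "TODO - plug into connector startup", so the
series does not show its eventual call site. Below is a minimal sketch of how it could be invoked
once snowflake.streaming.iceberg.enabled is set to true, assuming a SnowflakeConnectionService,
the target table name, and the connector role are already available at that point. The class name
IcebergStartupValidationSketch and the validateOnStartup helper are illustrative and not part of
this change set.

package com.snowflake.kafka.connector.streaming.iceberg;

import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;

/**
 * Illustrative only: the validator is not yet wired into connector startup in this PR, so this
 * class and method are assumptions rather than part of the change set.
 */
public class IcebergStartupValidationSketch {

  /** Runs the schema validation the way the integration test does, but outside of tests. */
  public static void validateOnStartup(
      SnowflakeConnectionService conn, String tableName, String role) {
    // The validator only needs a connection service (see the IcebergTableSchemaValidatorIT setup).
    IcebergTableSchemaValidator validator = new IcebergTableSchemaValidator(conn);

    // Throws a RuntimeException when the table does not exist or the role lacks
    // schema-evolution permission on it; clearer error messages are still marked TODO in this PR.
    validator.validateTable(tableName, role);
  }
}

Both failure paths currently surface as a bare RuntimeException("TODO"); the in-line TODOs indicate
that better error messages and the describe-table check on record_metadata are planned follow-ups.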