diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index d2d9e1ef2..dec066511 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -660,6 +660,7 @@ public void schemaExists(String schemaName) { } @Override + // TODO - move to test-only class public void dropPipe(final String pipeName) { checkConnection(); InternalUtils.assertNotEmpty("pipeName", pipeName); @@ -678,6 +679,7 @@ public void dropPipe(final String pipeName) { } @Override + // TODO - move to test-only class public boolean dropStageIfEmpty(final String stageName) { checkConnection(); InternalUtils.assertNotEmpty("stageName", stageName); @@ -761,6 +763,7 @@ public void moveToTableStage( } @Override + // TODO - move to test-only class public void moveToTableStage( final String tableName, final String stageName, final String prefix) { InternalUtils.assertNotEmpty("tableName", tableName); @@ -808,6 +811,7 @@ public List listStage(final String stageName, final String prefix) { @Override @Deprecated // Only using it in test for performance testing + // TODO - move to test-only class public void put(final String stageName, final String fileName, final String content) { InternalUtils.assertNotEmpty("stageName", stageName); SnowflakeConnectionV1 sfconn = (SnowflakeConnectionV1) conn; diff --git a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java new file mode 100644 index 000000000..63e7ce389 --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java @@ -0,0 +1,35 @@ +package com.snowflake.kafka.connector.streaming.iceberg; + +import 
com.snowflake.kafka.connector.internal.SnowflakeConnectionService; + +/** Performs validations of Iceberg table schema on the connector startup. */ +public class IcebergTableSchemaValidator { + + private final SnowflakeConnectionService snowflakeConnectionService; + + public IcebergTableSchemaValidator(SnowflakeConnectionService snowflakeConnectionService) { + this.snowflakeConnectionService = snowflakeConnectionService; + } + + /** + * Ensure that table exists and record_metadata column is of type OBJECT(). + * + *

TODO SNOW-1658914 - write a test for table with record_metadata schema altered by the + * connector + */ + public void validateTable(String tableName, String role) { + // TODO - plug into connector startup + if (!snowflakeConnectionService.tableExist(tableName)) { + // TODO - better errors + throw new RuntimeException("TODO"); + } + + // TODO - why is it so slow? + if (!snowflakeConnectionService.hasSchemaEvolutionPermission(tableName, role)) { + // TODO - better errors + throw new RuntimeException("TODO"); + } + + // TODO - call describe table and analyze record_metadata schema + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/internal/TestSnowflakeConnection.java b/src/test/java/com/snowflake/kafka/connector/internal/TestSnowflakeConnection.java new file mode 100644 index 000000000..cf05dc6f0 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/internal/TestSnowflakeConnection.java @@ -0,0 +1,23 @@ +package com.snowflake.kafka.connector.internal; + +import static com.snowflake.kafka.connector.internal.TestUtils.getConfFromFileName; + +import com.snowflake.client.jdbc.SnowflakeDriver; +import com.snowflake.kafka.connector.Utils; +import java.sql.Connection; +import java.util.Properties; + +/** Connection to test environment generated from a profile file stored locally. */ +public class TestSnowflakeConnection { + + /** Given a profile file path name, generate a connection by constructing a snowflake driver. 
*/ + public static Connection getConnection() throws Exception { + SnowflakeURL url = + new SnowflakeURL(getConfFromFileName(TestUtils.PROFILE_PATH).get(Utils.SF_URL)); + + Properties properties = + InternalUtils.createProperties(getConfFromFileName(TestUtils.PROFILE_PATH), url); + + return new SnowflakeDriver().connect(url.getJdbcUrl(), properties); + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java index cab7a389b..9ccc7064e 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java @@ -31,7 +31,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; -import com.snowflake.client.jdbc.SnowflakeDriver; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.config.SnowflakeSinkConnectorConfigBuilder; @@ -46,7 +45,7 @@ import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.security.PrivateKey; -import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -55,7 +54,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Random; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -128,14 +126,10 @@ public class TestUtils { Pattern.compile("^[^/]+/[^/]+/(\\d+)/(\\d+)_(key|value)_(\\d+)\\.gz$"); // profile path - private static final String PROFILE_PATH = "profile.json"; + public static final String PROFILE_PATH = "profile.json"; private static final ObjectMapper mapper = new ObjectMapper(); - private static Connection conn = null; - - private static Connection connForStreamingIngestTests = null; - private static Map conf = null; 
private static Map confWithOAuth = null; @@ -261,33 +255,6 @@ public static PrivateKey getPrivateKey() { return InternalUtils.parsePrivateKey(TestUtils.getKeyString()); } - /** - * Create snowflake jdbc connection - * - * @return jdbc connection - * @throws Exception when meeting error - */ - private static Connection getConnection() throws Exception { - if (conn != null) { - return conn; - } - - return generateConnectionToSnowflake(PROFILE_PATH); - } - - /** Given a profile file path name, generate a connection by constructing a snowflake driver. */ - private static Connection generateConnectionToSnowflake(final String profileFileName) - throws Exception { - SnowflakeURL url = new SnowflakeURL(getConfFromFileName(profileFileName).get(Utils.SF_URL)); - - Properties properties = - InternalUtils.createProperties(getConfFromFileName(profileFileName), url); - - Connection connToSnowflake = new SnowflakeDriver().connect(url.getJdbcUrl(), properties); - - return connToSnowflake; - } - /** * read conf file * @@ -412,7 +379,7 @@ public static Map getConfWithOAuth() { */ static ResultSet executeQuery(String query) { try { - Statement statement = getConnection().createStatement(); + Statement statement = TestSnowflakeConnection.getConnection().createStatement(); return statement.executeQuery(query); } // if ANY exceptions occur, an illegal state has been reached @@ -421,6 +388,23 @@ static ResultSet executeQuery(String query) { } } + /** + * execute sql query + * + * @param query sql query string + * @param parameter parameter to be inserted at index 1 + */ + public static void executeQueryWithParameter(String query, String parameter) { + try { + PreparedStatement stmt = TestSnowflakeConnection.getConnection().prepareStatement(query); + stmt.setString(1, parameter); + stmt.execute(); + stmt.close(); + } catch (Exception e) { + throw new RuntimeException("Error executing query: " + query, e); + } + } + /** * drop a table * diff --git 
a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java new file mode 100644 index 000000000..46603a837 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java @@ -0,0 +1,42 @@ +package com.snowflake.kafka.connector.streaming.iceberg; + +import static com.snowflake.kafka.connector.internal.TestUtils.executeQueryWithParameter; + +import com.snowflake.kafka.connector.internal.SnowflakeConnectionService; +import com.snowflake.kafka.connector.internal.TestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +public class BaseIcebergIT { + + protected static SnowflakeConnectionService conn; + + @BeforeAll + public static void setup() { + conn = TestUtils.getConnectionServiceForStreaming(); + } + + @AfterAll + public static void teardown() { + conn.close(); + } + + protected static void createIcebergTable(String tableName) throws Exception { + // NOTE(review): single spaces added inside the literals — without them the + // concatenated statement reads "...object())external_volume..." and is invalid SQL. + String query = + "create or replace iceberg table identifier(?) (record_metadata object()) " + + "external_volume = 'test_exvol' " + + "catalog = 'SNOWFLAKE' " + + "base_location = 'it'"; + executeQueryWithParameter(query, tableName); + } + + protected static void dropIcebergTable(String tableName) { + String query = "drop iceberg table if exists identifier(?)"; + executeQueryWithParameter(query, tableName); + } + + protected static void enableSchemaEvolution(String tableName) throws Exception { + String query = "alter iceberg table identifier(?) 
set enable_schema_evolution = true"; + executeQueryWithParameter(query, tableName); + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java new file mode 100644 index 000000000..abc8bca8e --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java @@ -0,0 +1,60 @@ +package com.snowflake.kafka.connector.streaming.iceberg; + +import com.snowflake.kafka.connector.internal.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class IcebergTableSchemaValidatorIT extends BaseIcebergIT { + + private static final String TEST_ROLE = "testrole_kafka"; + + private static IcebergTableSchemaValidator schemaValidator; + + private String tableName; + + @BeforeAll + // overrides the base class @BeforeAll + public static void setup() { + conn = TestUtils.getConnectionServiceForStreaming(); + schemaValidator = new IcebergTableSchemaValidator(conn); + } + + @BeforeEach + public void setUp() { + tableName = TestUtils.randomTableName(); + } + + @AfterEach + public void tearDown() { + dropIcebergTable(tableName); + } + + @Test + public void shouldValidateExpectedIcebergTableSchema() throws Exception { + // given + createIcebergTable(tableName); + enableSchemaEvolution(tableName); + + // when, then + schemaValidator.validateTable(tableName, TEST_ROLE); + } + + @Test + public void shouldThrowExceptionWhenTableDoesNotExist() { + Assertions.assertThrows( + RuntimeException.class, () -> schemaValidator.validateTable(tableName, TEST_ROLE)); + } + + @Test + public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() { + // TODO + } + + @Test + public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() 
{ + // TODO + } +}