Skip to content

Commit

Permalink
SNOW-1658905 Test improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski committed Sep 20, 2024
1 parent 7007096 commit 76b7e8c
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,21 @@ public IcebergTableSchemaValidator(SnowflakeConnectionService snowflakeConnectio
}

/**
 * Ensure that table exists and record_metadata column is of type OBJECT().
 *
 * <p>TODO SNOW-1658914 - write a test for table with record_metadata schema altered by the
 * connector
 */
public void validateTable(String tableName, String role) {
// TODO - plug into connector startup
if (!snowflakeConnectionService.tableExist(tableName)) {
// TODO - better errors
throw new RuntimeException("TODO");
}

// TODO - why is it so slow?
if (!snowflakeConnectionService.hasSchemaEvolutionPermission(tableName, role)) {
// TODO - better errors
throw new RuntimeException("TODO");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.snowflake.kafka.connector.internal;

import static com.snowflake.kafka.connector.internal.TestUtils.getConfFromFileName;

import com.snowflake.client.jdbc.SnowflakeDriver;
import com.snowflake.kafka.connector.Utils;
import java.sql.Connection;
import java.util.Map;
import java.util.Properties;

/** Connection to test environment generated from a profile file stored locally. */
public class TestSnowflakeConnection {

  /**
   * Given a profile file path name, generate a connection by constructing a snowflake driver.
   *
   * @return an open JDBC {@link Connection} to the Snowflake account from the test profile
   * @throws Exception when the profile cannot be read or the connection cannot be established
   */
  public static Connection getConnection() throws Exception {
    // Read the profile file once and reuse it for both the URL and the connection properties,
    // instead of parsing it twice as before.
    Map<String, String> conf = getConfFromFileName(TestUtils.PROFILE_PATH);

    SnowflakeURL url = new SnowflakeURL(conf.get(Utils.SF_URL));
    Properties properties = InternalUtils.createProperties(conf, url);

    return new SnowflakeDriver().connect(url.getJdbcUrl(), properties);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.snowflake.client.jdbc.SnowflakeDriver;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.config.SnowflakeSinkConnectorConfigBuilder;
Expand All @@ -46,7 +45,6 @@
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.security.PrivateKey;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -56,7 +54,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -129,14 +126,10 @@ public class TestUtils {
Pattern.compile("^[^/]+/[^/]+/(\\d+)/(\\d+)_(key|value)_(\\d+)\\.gz$");

// profile path
private static final String PROFILE_PATH = "profile.json";
public static final String PROFILE_PATH = "profile.json";

private static final ObjectMapper mapper = new ObjectMapper();

private static Connection conn = null;

private static Connection connForStreamingIngestTests = null;

private static Map<String, String> conf = null;

private static Map<String, String> confWithOAuth = null;
Expand Down Expand Up @@ -262,33 +255,6 @@ public static PrivateKey getPrivateKey() {
return InternalUtils.parsePrivateKey(TestUtils.getKeyString());
}

/**
 * Create snowflake jdbc connection, reusing a cached one when available.
 *
 * @return jdbc connection
 * @throws Exception when meeting error
 */
private static Connection getConnection() throws Exception {
  if (conn != null) {
    return conn;
  }

  // Bug fix: the original never assigned `conn`, so the cache check above could never hit
  // and every call opened a brand-new connection. Store it before returning.
  conn = generateConnectionToSnowflake(PROFILE_PATH);
  return conn;
}

/**
 * Builds a fresh JDBC connection to Snowflake from the named profile file by constructing a
 * snowflake driver.
 *
 * @param profileFileName path of the profile file holding the account configuration
 * @return a newly opened connection
 * @throws Exception when the profile cannot be read or the driver fails to connect
 */
private static Connection generateConnectionToSnowflake(final String profileFileName)
    throws Exception {
  final SnowflakeURL url =
      new SnowflakeURL(getConfFromFileName(profileFileName).get(Utils.SF_URL));
  final Properties connectionProperties =
      InternalUtils.createProperties(getConfFromFileName(profileFileName), url);

  // Return the freshly opened connection directly rather than via a temporary variable.
  return new SnowflakeDriver().connect(url.getJdbcUrl(), connectionProperties);
}

/**
* read conf file
*
Expand Down Expand Up @@ -413,7 +379,7 @@ public static Map<String, String> getConfWithOAuth() {
*/
static ResultSet executeQuery(String query) {
try {
Statement statement = getConnection().createStatement();
Statement statement = TestSnowflakeConnection.getConnection().createStatement();
return statement.executeQuery(query);
}
// if ANY exceptions occur, an illegal state has been reached
Expand All @@ -428,9 +394,9 @@ static ResultSet executeQuery(String query) {
* @param query sql query string
* @param parameter parameter to be inserted at index 1
*/
static void executeQueryWithParameter(String query, String parameter) {
public static void executeQueryWithParameter(String query, String parameter) {
try {
PreparedStatement stmt = getConnection().prepareStatement(query);
PreparedStatement stmt = TestSnowflakeConnection.getConnection().prepareStatement(query);
stmt.setString(1, parameter);
stmt.execute();
stmt.close();
Expand All @@ -450,35 +416,6 @@ public static void dropTable(String tableName) {
executeQuery(query);
}

/**
 * Drops the given iceberg table if it exists.
 *
 * @param tableName name of the iceberg table to drop
 */
public static void dropIcebergTable(String tableName) {
  // Table name is bound as parameter 1 of identifier(?) to avoid string-building the SQL.
  executeQueryWithParameter("drop iceberg table if exists identifier(?)", tableName);
}

/**
 * create an iceberg table with the record_metadata column expected by the connector
 *
 * @param tableName table name
 * @throws Exception when the query execution fails
 */
public static void createIcebergTable(String tableName) throws Exception {
  // Bug fix: the continuation fragments need leading spaces — without them the concatenated
  // SQL ran clauses together (e.g. "object())external_volume") and could not parse.
  String query =
      "create or replace iceberg table identifier(?) (record_metadata object())"
          + " external_volume = 'test_exvol'"
          + " catalog = 'SNOWFLAKE'"
          + " base_location = 'it'";
  executeQueryWithParameter(query, tableName);
}

/**
 * Enables schema evolution on an existing iceberg table.
 *
 * @param tableName table name
 * @throws Exception when the underlying query execution fails
 */
public static void enableSchemaEvolution(String tableName) throws Exception {
  String query = "alter iceberg table identifier(?) set enable_schema_evolution = true";
  executeQueryWithParameter(query, tableName);
}

/** Select * from table */
public static ResultSet showTable(String tableName) {
String query = "select * from " + tableName;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.snowflake.kafka.connector.streaming.iceberg;

import static com.snowflake.kafka.connector.internal.TestUtils.executeQueryWithParameter;

import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

/** Base class for iceberg integration tests: shared connection lifecycle and table helpers. */
public class BaseIcebergIT {

  /** Shared connection service, opened once per test class via {@link BeforeAll}. */
  protected static SnowflakeConnectionService conn;

  @BeforeAll
  public static void setup() {
    conn = TestUtils.getConnectionServiceForStreaming();
  }

  @AfterAll
  public static void teardown() {
    conn.close();
  }

  /**
   * Creates (or replaces) an iceberg table with the record_metadata column expected by the
   * connector.
   *
   * @param tableName table to create
   * @throws Exception when the query execution fails
   */
  protected static void createIcebergTable(String tableName) throws Exception {
    // Bug fix: the continuation fragments need leading spaces — without them the concatenated
    // SQL ran clauses together (e.g. "object())external_volume") and could not parse.
    String query =
        "create or replace iceberg table identifier(?) (record_metadata object())"
            + " external_volume = 'test_exvol'"
            + " catalog = 'SNOWFLAKE'"
            + " base_location = 'it'";
    executeQueryWithParameter(query, tableName);
  }

  /**
   * Drops the given iceberg table if it exists.
   *
   * @param tableName table to drop
   */
  protected static void dropIcebergTable(String tableName) {
    String query = "drop iceberg table if exists identifier(?)";
    executeQueryWithParameter(query, tableName);
  }

  /**
   * Enables schema evolution on an existing iceberg table.
   *
   * @param tableName table to alter
   * @throws Exception when the query execution fails
   */
  protected static void enableSchemaEvolution(String tableName) throws Exception {
    String query = "alter iceberg table identifier(?) set enable_schema_evolution = true";
    executeQueryWithParameter(query, tableName);
  }
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package com.snowflake.kafka.connector.streaming.iceberg;

import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class IcebergTableSchemaValidatorIT {
public class IcebergTableSchemaValidatorIT extends BaseIcebergIT {

private static final String TEST_ROLE = "testrole_kafka";

private static final SnowflakeConnectionService conn =
TestUtils.getConnectionServiceForStreaming();
private static final IcebergTableSchemaValidator schemaValidator =
new IcebergTableSchemaValidator(conn);

Expand All @@ -24,28 +22,32 @@ public void setUp() {

@AfterEach
public void tearDown() {
  // Diff residue fix: the pre-image call (TestUtils.dropIcebergTable) and post-image call were
  // both present; keep only the inherited helper so the table is dropped exactly once.
  dropIcebergTable(tableName);
}

@Test
public void shouldValidateExpectedIcebergTableSchema() throws Exception {
  // given — diff residue fix: the old TestUtils.* calls were left alongside the inherited
  // helpers, which would have created/altered the table twice; keep the post-image calls only.
  createIcebergTable(tableName);
  enableSchemaEvolution(tableName);

  // when, then
  schemaValidator.validateTable(tableName, TEST_ROLE);
}

@Test
public void shouldThrowExceptionWhenTableDoesNotExist() {
  // No table is created in this test, so validation must throw.
  // Removed the commented-out copy of this assertion that was left above it.
  Assertions.assertThrows(
      RuntimeException.class, () -> schemaValidator.validateTable(tableName, TEST_ROLE));
}

@Test
public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {
  // TODO - create a table without record_metadata and assert that validation throws.
}

@Test
public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {
  // TODO - create a table whose record_metadata is not OBJECT() and assert that validation
  // throws.
}
}

0 comments on commit 76b7e8c

Please sign in to comment.