Skip to content

Commit

Permalink
SNOW-1658905 Integration tests setup for Iceberg (#930)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski committed Sep 20, 2024
1 parent f0e5f23 commit 64e6f24
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ public void schemaExists(String schemaName) {
}

@Override
// TODO - move to test-only class
public void dropPipe(final String pipeName) {
checkConnection();
InternalUtils.assertNotEmpty("pipeName", pipeName);
Expand All @@ -678,6 +679,7 @@ public void dropPipe(final String pipeName) {
}

@Override
// TODO - move to test-only class
public boolean dropStageIfEmpty(final String stageName) {
checkConnection();
InternalUtils.assertNotEmpty("stageName", stageName);
Expand Down Expand Up @@ -761,6 +763,7 @@ public void moveToTableStage(
}

@Override
// TODO - move to test-only class
public void moveToTableStage(
final String tableName, final String stageName, final String prefix) {
InternalUtils.assertNotEmpty("tableName", tableName);
Expand Down Expand Up @@ -808,6 +811,7 @@ public List<String> listStage(final String stageName, final String prefix) {
@Override
@Deprecated
// Only using it in test for performance testing
// TODO - move to test-only class
public void put(final String stageName, final String fileName, final String content) {
InternalUtils.assertNotEmpty("stageName", stageName);
SnowflakeConnectionV1 sfconn = (SnowflakeConnectionV1) conn;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.snowflake.kafka.connector.streaming.iceberg;

import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;

/** Performs validations of Iceberg table schema on the connector startup. */
public class IcebergTableSchemaValidator {

  private final SnowflakeConnectionService snowflakeConnectionService;

  public IcebergTableSchemaValidator(SnowflakeConnectionService snowflakeConnectionService) {
    this.snowflakeConnectionService = snowflakeConnectionService;
  }

  /**
   * Ensure that table exists and record_metadata column is of type OBJECT().
   *
   * <p>TODO SNOW-1658914 - write a test for table with record_metadata schema altered by the
   * connector
   *
   * @param tableName name of the Iceberg table to validate
   * @param role role used for the schema evolution permission check
   * @throws RuntimeException when the table is missing or lacks schema evolution permission
   */
  public void validateTable(String tableName, String role) {
    // TODO - plug into connector startup
    if (!snowflakeConnectionService.tableExist(tableName)) {
      // TODO - use a dedicated connector exception type once available
      throw new RuntimeException("Table " + tableName + " does not exist");
    }

    // TODO - why is it so slow?
    if (!snowflakeConnectionService.hasSchemaEvolutionPermission(tableName, role)) {
      // TODO - use a dedicated connector exception type once available
      throw new RuntimeException(
          "Table " + tableName + " does not have schema evolution enabled for role " + role);
    }

    // TODO - call describe table and analyze record_metadata schema
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.snowflake.kafka.connector.internal;

import static com.snowflake.kafka.connector.internal.TestUtils.getConfFromFileName;

import com.snowflake.client.jdbc.SnowflakeDriver;
import com.snowflake.kafka.connector.Utils;
import java.sql.Connection;
import java.util.Properties;

/** Connection to test environment generated from a profile file stored locally. */
public class TestSnowflakeConnection {

  // Utility class - not meant to be instantiated.
  private TestSnowflakeConnection() {}

  /**
   * Given a profile file path name, generate a connection by constructing a snowflake driver.
   *
   * @return a live JDBC connection to the test environment
   * @throws Exception when the profile cannot be read or the connection cannot be established
   */
  public static Connection getConnection() throws Exception {
    // Read and parse the profile file only once, then reuse it for both URL and properties.
    java.util.Map<String, String> profileConf = getConfFromFileName(TestUtils.PROFILE_PATH);

    SnowflakeURL url = new SnowflakeURL(profileConf.get(Utils.SF_URL));

    Properties properties = InternalUtils.createProperties(profileConf, url);

    return new SnowflakeDriver().connect(url.getJdbcUrl(), properties);
  }
}
56 changes: 20 additions & 36 deletions src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.snowflake.client.jdbc.SnowflakeDriver;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.config.SnowflakeSinkConnectorConfigBuilder;
Expand All @@ -46,7 +45,7 @@
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.security.PrivateKey;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
Expand All @@ -55,7 +54,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -128,14 +126,10 @@ public class TestUtils {
Pattern.compile("^[^/]+/[^/]+/(\\d+)/(\\d+)_(key|value)_(\\d+)\\.gz$");

// profile path
private static final String PROFILE_PATH = "profile.json";
public static final String PROFILE_PATH = "profile.json";

private static final ObjectMapper mapper = new ObjectMapper();

private static Connection conn = null;

private static Connection connForStreamingIngestTests = null;

private static Map<String, String> conf = null;

private static Map<String, String> confWithOAuth = null;
Expand Down Expand Up @@ -261,33 +255,6 @@ public static PrivateKey getPrivateKey() {
return InternalUtils.parsePrivateKey(TestUtils.getKeyString());
}

/**
* Create snowflake jdbc connection
*
* @return jdbc connection
* @throws Exception when meeting error
*/
private static Connection getConnection() throws Exception {
if (conn != null) {
return conn;
}

return generateConnectionToSnowflake(PROFILE_PATH);
}

/** Given a profile file path name, generate a connection by constructing a snowflake driver. */
private static Connection generateConnectionToSnowflake(final String profileFileName)
throws Exception {
SnowflakeURL url = new SnowflakeURL(getConfFromFileName(profileFileName).get(Utils.SF_URL));

Properties properties =
InternalUtils.createProperties(getConfFromFileName(profileFileName), url);

Connection connToSnowflake = new SnowflakeDriver().connect(url.getJdbcUrl(), properties);

return connToSnowflake;
}

/**
* read conf file
*
Expand Down Expand Up @@ -412,7 +379,7 @@ public static Map<String, String> getConfWithOAuth() {
*/
static ResultSet executeQuery(String query) {
try {
Statement statement = getConnection().createStatement();
Statement statement = TestSnowflakeConnection.getConnection().createStatement();
return statement.executeQuery(query);
}
// if ANY exceptions occur, an illegal state has been reached
Expand All @@ -421,6 +388,23 @@ static ResultSet executeQuery(String query) {
}
}

/**
 * Execute a SQL query with a single string parameter bound at placeholder index 1.
 *
 * @param query sql query string containing one '?' placeholder
 * @param parameter parameter to be inserted at index 1
 * @throws RuntimeException wrapping any failure, with the offending query in the message
 */
public static void executeQueryWithParameter(String query, String parameter) {
  // try-with-resources guarantees the statement is closed even when binding or execution throws
  try (PreparedStatement stmt =
      TestSnowflakeConnection.getConnection().prepareStatement(query)) {
    stmt.setString(1, parameter);
    stmt.execute();
  } catch (Exception e) {
    throw new RuntimeException("Error executing query: " + query, e);
  }
}

/**
* drop a table
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.snowflake.kafka.connector.streaming.iceberg;

import static com.snowflake.kafka.connector.internal.TestUtils.executeQueryWithParameter;

import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

/** Base class for Iceberg integration tests: shared connection lifecycle and table helpers. */
public class BaseIcebergIT {

  protected static SnowflakeConnectionService conn;

  @BeforeAll
  public static void setup() {
    conn = TestUtils.getConnectionServiceForStreaming();
  }

  @AfterAll
  public static void teardown() {
    conn.close();
  }

  /** Creates (or replaces) an Iceberg table with only a record_metadata OBJECT() column. */
  protected static void createIcebergTable(String tableName) throws Exception {
    // Explicit trailing spaces keep the concatenated fragments from fusing into
    // tokens like "object())external_volume" and relying on SQL lexer quirks.
    String query =
        "create or replace iceberg table identifier(?) (record_metadata object()) "
            + "external_volume = 'test_exvol' "
            + "catalog = 'SNOWFLAKE' "
            + "base_location = 'it'";
    executeQueryWithParameter(query, tableName);
  }

  /** Drops the Iceberg table if it exists; safe to call for tables that were never created. */
  protected static void dropIcebergTable(String tableName) {
    String query = "drop iceberg table if exists identifier(?)";
    executeQueryWithParameter(query, tableName);
  }

  /** Turns on schema evolution for the given Iceberg table. */
  protected static void enableSchemaEvolution(String tableName) throws Exception {
    String query = "alter iceberg table identifier(?) set enable_schema_evolution = true";
    executeQueryWithParameter(query, tableName);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.snowflake.kafka.connector.streaming.iceberg;

import com.snowflake.kafka.connector.internal.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class IcebergTableSchemaValidatorIT extends BaseIcebergIT {

  private static final String TEST_ROLE = "testrole_kafka";

  private static IcebergTableSchemaValidator schemaValidator;

  // Fresh random table name per test; dropped again in tearDown.
  private String tableName;

  @BeforeAll
  // hides the base class @BeforeAll (static lifecycle methods are shadowed, not overridden,
  // so JUnit runs only this one); must therefore repeat the connection setup itself
  public static void setup() {
    conn = TestUtils.getConnectionServiceForStreaming();
    schemaValidator = new IcebergTableSchemaValidator(conn);
  }

  @BeforeEach
  public void setUp() {
    tableName = TestUtils.randomTableName();
  }

  @AfterEach
  public void tearDown() {
    // drop uses "if exists", so this is safe even for tests that never created the table
    dropIcebergTable(tableName);
  }

  @Test
  public void shouldValidateExpectedIcebergTableSchema() throws Exception {
    // given
    createIcebergTable(tableName);
    enableSchemaEvolution(tableName);

    // when, then - no exception expected
    schemaValidator.validateTable(tableName, TEST_ROLE);
  }

  @Test
  public void shouldThrowExceptionWhenTableDoesNotExist() {
    Assertions.assertThrows(
        RuntimeException.class, () -> schemaValidator.validateTable(tableName, TEST_ROLE));
  }

  @Test
  public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {
    // TODO
  }

  @Test
  public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {
    // TODO
  }
}

0 comments on commit 64e6f24

Please sign in to comment.