Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1658905 Integration tests setup for Iceberg #930

Merged
merged 6 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ public void schemaExists(String schemaName) {
}

@Override
// TODO - move to test-only class
public void dropPipe(final String pipeName) {
checkConnection();
InternalUtils.assertNotEmpty("pipeName", pipeName);
Expand All @@ -678,6 +679,7 @@ public void dropPipe(final String pipeName) {
}

@Override
// TODO - move to test-only class
public boolean dropStageIfEmpty(final String stageName) {
checkConnection();
InternalUtils.assertNotEmpty("stageName", stageName);
Expand Down Expand Up @@ -761,6 +763,7 @@ public void moveToTableStage(
}

@Override
// TODO - move to test-only class
public void moveToTableStage(
final String tableName, final String stageName, final String prefix) {
InternalUtils.assertNotEmpty("tableName", tableName);
Expand Down Expand Up @@ -808,6 +811,7 @@ public List<String> listStage(final String stageName, final String prefix) {
@Override
@Deprecated
// Only using it in test for performance testing
// TODO - move to test-only class
public void put(final String stageName, final String fileName, final String content) {
InternalUtils.assertNotEmpty("stageName", stageName);
SnowflakeConnectionV1 sfconn = (SnowflakeConnectionV1) conn;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.snowflake.kafka.connector.streaming.iceberg;

import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;

/** Performs validations of Iceberg table schema on the connector startup. */
public class IcebergTableSchemaValidator {

  private final SnowflakeConnectionService snowflakeConnectionService;

  public IcebergTableSchemaValidator(SnowflakeConnectionService snowflakeConnectionService) {
    this.snowflakeConnectionService = snowflakeConnectionService;
  }

  /**
   * Ensure that table exists and record_metadata column is of type OBJECT().
   *
   * <p>TODO SNOW-1658914 - write a test for table with record_metadata schema altered by the
   * connector
   *
   * @param tableName name of the Iceberg table to validate
   * @param role role used when checking schema evolution permission
   * @throws RuntimeException if the table does not exist or schema evolution cannot be used
   */
  public void validateTable(String tableName, String role) {
    // TODO - plug into connector startup
    if (!snowflakeConnectionService.tableExist(tableName)) {
      // TODO(review) - consider mapping to the connector's SnowflakeErrors error codes
      throw new RuntimeException(
          "Iceberg table validation failed: table '" + tableName + "' does not exist");
    }

    // TODO - why is it so slow?
    if (!snowflakeConnectionService.hasSchemaEvolutionPermission(tableName, role)) {
      throw new RuntimeException(
          "Iceberg table validation failed: schema evolution is not enabled for table '"
              + tableName
              + "' or role '"
              + role
              + "' lacks permission to use it");
    }

    // TODO - call describe table and analyze record_metadata schema
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.snowflake.kafka.connector.internal;

import static com.snowflake.kafka.connector.internal.TestUtils.getConfFromFileName;

import com.snowflake.client.jdbc.SnowflakeDriver;
import com.snowflake.kafka.connector.Utils;
import java.sql.Connection;
import java.util.Map;
import java.util.Properties;

/** Connection to test environment generated from a profile file stored locally. */
public class TestSnowflakeConnection {

  /**
   * Given a profile file path name, generate a connection by constructing a snowflake driver.
   *
   * <p>Note: a fresh connection is created on every call; callers are responsible for closing it.
   *
   * @return a new JDBC connection to the test environment
   * @throws Exception if the profile cannot be read or the connection cannot be established
   */
  public static Connection getConnection() throws Exception {
    // Parse the profile file once instead of reading it twice (once for the URL, once for
    // the connection properties, as the previous version did).
    Map<String, String> conf = getConfFromFileName(TestUtils.PROFILE_PATH);

    SnowflakeURL url = new SnowflakeURL(conf.get(Utils.SF_URL));
    Properties properties = InternalUtils.createProperties(conf, url);

    return new SnowflakeDriver().connect(url.getJdbcUrl(), properties);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.snowflake.client.jdbc.SnowflakeDriver;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.config.SnowflakeSinkConnectorConfigBuilder;
Expand All @@ -46,7 +45,7 @@
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.security.PrivateKey;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
Expand All @@ -55,7 +54,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -128,14 +126,10 @@ public class TestUtils {
Pattern.compile("^[^/]+/[^/]+/(\\d+)/(\\d+)_(key|value)_(\\d+)\\.gz$");

// profile path
private static final String PROFILE_PATH = "profile.json";
public static final String PROFILE_PATH = "profile.json";

private static final ObjectMapper mapper = new ObjectMapper();

private static Connection conn = null;

private static Connection connForStreamingIngestTests = null;

private static Map<String, String> conf = null;

private static Map<String, String> confWithOAuth = null;
Expand Down Expand Up @@ -261,33 +255,6 @@ public static PrivateKey getPrivateKey() {
return InternalUtils.parsePrivateKey(TestUtils.getKeyString());
}

/**
* Create snowflake jdbc connection
*
* @return jdbc connection
* @throws Exception when meeting error
*/
private static Connection getConnection() throws Exception {
if (conn != null) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this was the only usage of the conn, so it was never initialized...

return conn;
}

return generateConnectionToSnowflake(PROFILE_PATH);
}

/** Given a profile file path name, generate a connection by constructing a snowflake driver. */
private static Connection generateConnectionToSnowflake(final String profileFileName)
throws Exception {
SnowflakeURL url = new SnowflakeURL(getConfFromFileName(profileFileName).get(Utils.SF_URL));

Properties properties =
InternalUtils.createProperties(getConfFromFileName(profileFileName), url);

Connection connToSnowflake = new SnowflakeDriver().connect(url.getJdbcUrl(), properties);

return connToSnowflake;
}

/**
* read conf file
*
Expand Down Expand Up @@ -412,7 +379,7 @@ public static Map<String, String> getConfWithOAuth() {
*/
static ResultSet executeQuery(String query) {
try {
Statement statement = getConnection().createStatement();
Statement statement = TestSnowflakeConnection.getConnection().createStatement();
return statement.executeQuery(query);
}
// if ANY exceptions occur, an illegal state has been reached
Expand All @@ -421,6 +388,23 @@ static ResultSet executeQuery(String query) {
}
}

/**
 * Execute a SQL query with a single bind parameter.
 *
 * @param query sql query string with a single {@code ?} placeholder
 * @param parameter parameter to be inserted at index 1
 * @throws RuntimeException wrapping any failure, with the query text in the message
 */
public static void executeQueryWithParameter(String query, String parameter) {
  // try-with-resources guarantees the statement is closed even when setString()/execute()
  // throws; the previous version closed it only on the success path, leaking it on error.
  // NOTE(review): TestSnowflakeConnection.getConnection() appears to open a fresh connection
  // per call that is never closed here — confirm whether the connection should be closed too.
  try (PreparedStatement stmt =
      TestSnowflakeConnection.getConnection().prepareStatement(query)) {
    stmt.setString(1, parameter);
    stmt.execute();
  } catch (Exception e) {
    throw new RuntimeException("Error executing query: " + query, e);
  }
}

/**
* drop a table
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.snowflake.kafka.connector.streaming.iceberg;

import static com.snowflake.kafka.connector.internal.TestUtils.executeQueryWithParameter;

import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

public class BaseIcebergIT {

protected static SnowflakeConnectionService conn;

@BeforeAll
public static void setup() {
conn = TestUtils.getConnectionServiceForStreaming();
}

@AfterAll
public static void teardown() {
conn.close();
}

protected static void createIcebergTable(String tableName) throws Exception {
String query =
"create or replace iceberg table identifier(?) (record_metadata object())"
+ "external_volume = 'test_exvol'"
+ "catalog = 'SNOWFLAKE'"
+ "base_location = 'it'";
executeQueryWithParameter(query, tableName);
}

protected static void dropIcebergTable(String tableName) {
String query = "drop iceberg table if exists identifier(?)";
executeQueryWithParameter(query, tableName);
}

protected static void enableSchemaEvolution(String tableName) throws Exception {
String query = "alter iceberg table identifier(?) set enable_schema_evolution = true";
executeQueryWithParameter(query, tableName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.snowflake.kafka.connector.streaming.iceberg;

import com.snowflake.kafka.connector.internal.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class IcebergTableSchemaValidatorIT extends BaseIcebergIT {

  private static final String TEST_ROLE = "testrole_kafka";

  // Created per test rather than in a static initializer: a static field built from `conn`
  // would rely on subtle class-initialization ordering relative to the inherited @BeforeAll
  // that assigns `conn`, and could silently wrap a null connection.
  private IcebergTableSchemaValidator schemaValidator;

  private String tableName;

  @BeforeEach
  public void setUp() {
    schemaValidator = new IcebergTableSchemaValidator(conn);
    tableName = TestUtils.randomTableName();
  }

  @AfterEach
  public void tearDown() {
    dropIcebergTable(tableName);
  }

  @Test
  public void shouldValidateExpectedIcebergTableSchema() throws Exception {
    // given
    createIcebergTable(tableName);
    enableSchemaEvolution(tableName);

    // when, then
    schemaValidator.validateTable(tableName, TEST_ROLE);
  }

  @Test
  public void shouldThrowExceptionWhenTableDoesNotExist() {
    Assertions.assertThrows(
        RuntimeException.class, () -> schemaValidator.validateTable(tableName, TEST_ROLE));
  }

  @Test
  public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {
    // TODO
  }

  @Test
  public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {
    // TODO
  }
}
Loading