diff --git a/src/main/java/com/snowflake/kafka/connector/DefaultConnectorConfigValidator.java b/src/main/java/com/snowflake/kafka/connector/DefaultConnectorConfigValidator.java index c40fb4277..90c07e13b 100644 --- a/src/main/java/com/snowflake/kafka/connector/DefaultConnectorConfigValidator.java +++ b/src/main/java/com/snowflake/kafka/connector/DefaultConnectorConfigValidator.java @@ -21,9 +21,13 @@ public class DefaultConnectorConfigValidator implements ConnectorConfigValidator new KCLogger(DefaultConnectorConfigValidator.class.getName()); private final StreamingConfigValidator streamingConfigValidator; + private final StreamingConfigValidator icebergConfigValidator; - public DefaultConnectorConfigValidator(StreamingConfigValidator streamingConfigValidator) { + public DefaultConnectorConfigValidator( + StreamingConfigValidator streamingConfigValidator, + StreamingConfigValidator icebergConfigValidator) { this.streamingConfigValidator = streamingConfigValidator; + this.icebergConfigValidator = icebergConfigValidator; } /** @@ -253,6 +257,7 @@ && parseTopicToTableMap(config.get(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MA // Check all config values for ingestion method == IngestionMethodConfig.SNOWPIPE_STREAMING invalidConfigParams.putAll(streamingConfigValidator.validate(config)); + invalidConfigParams.putAll(icebergConfigValidator.validate(config)); // logs and throws exception if there are invalid params handleInvalidParameters(ImmutableMap.copyOf(invalidConfigParams)); diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java index 2744d7834..cd8bac55e 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java @@ -17,6 +17,7 @@ package com.snowflake.kafka.connector; import com.snowflake.kafka.connector.config.ConnectorConfigDefinition; +import 
com.snowflake.kafka.connector.config.IcebergConfigValidator; import com.snowflake.kafka.connector.internal.KCLogger; import com.snowflake.kafka.connector.internal.SnowflakeConnectionService; import com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory; @@ -66,7 +67,8 @@ public class SnowflakeSinkConnector extends SinkConnector { private boolean setupComplete; private final ConnectorConfigValidator connectorConfigValidator = - new DefaultConnectorConfigValidator(new DefaultStreamingConfigValidator()); + new DefaultConnectorConfigValidator( + new DefaultStreamingConfigValidator(), new IcebergConfigValidator()); /** No-Arg constructor. Required by Kafka Connect framework */ public SnowflakeSinkConnector() { diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java index e7c0907dd..8ec9482f6 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java @@ -139,6 +139,10 @@ public class SnowflakeSinkConnectorConfig { public static final String SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP = "snowflake.streaming.client.provider.override.map"; + // Iceberg + public static final String ICEBERG_ENABLED = "snowflake.streaming.iceberg.enabled"; + public static final boolean ICEBERG_ENABLED_DEFAULT_VALUE = false; + // TESTING public static final String REBALANCING = "snowflake.test.rebalancing"; public static final boolean REBALANCING_DEFAULT = false; diff --git a/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java b/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java index ab18394f9..a4314cd49 100644 --- a/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java +++ b/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java 
@@ -461,6 +461,13 @@ public static ConfigDef getConfig() { ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT, ConfigDef.Importance.LOW, "If set to true the Connector will fail its tasks when authorization error from" - + " Snowflake occurred"); + + " Snowflake occurred") + .define( + ICEBERG_ENABLED, + ConfigDef.Type.BOOLEAN, + ICEBERG_ENABLED_DEFAULT_VALUE, + ConfigDef.Importance.HIGH, + "When set to true the connector will ingest data into the Iceberg table. Check the" + + " official Snowflake documentation for the prerequisites."); } } diff --git a/src/main/java/com/snowflake/kafka/connector/config/IcebergConfigValidator.java b/src/main/java/com/snowflake/kafka/connector/config/IcebergConfigValidator.java new file mode 100644 index 000000000..ecc8c8ff8 --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/config/IcebergConfigValidator.java @@ -0,0 +1,46 @@ +package com.snowflake.kafka.connector.config; + +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*; +import static com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig.SNOWPIPE_STREAMING; + +import com.google.common.collect.ImmutableMap; +import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; +import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; +import com.snowflake.kafka.connector.internal.streaming.StreamingConfigValidator; +import java.util.HashMap; +import java.util.Map; + +/** Validates dependencies between parameters in Iceberg mode. 
*/ +public class IcebergConfigValidator implements StreamingConfigValidator { + + private static final String NO_SCHEMATIZATION_ERROR_MESSAGE = + "Ingestion to Iceberg table requires " + ENABLE_SCHEMATIZATION_CONFIG + " set to true"; + private static final String INCOMPATIBLE_INGESTION_METHOD = + "Ingestion to Iceberg table is supported only for Snowpipe Streaming"; + + @Override + public ImmutableMap<String, String> validate(Map<String, String> inputConfig) { + boolean isIcebergEnabled = Boolean.parseBoolean(inputConfig.get(ICEBERG_ENABLED)); + + if (!isIcebergEnabled) { + return ImmutableMap.of(); + } + + Map<String, String> validationErrors = new HashMap<>(); + + boolean isSchematizationEnabled = + Boolean.parseBoolean( + inputConfig.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG)); + IngestionMethodConfig ingestionMethod = + IngestionMethodConfig.valueOf(inputConfig.get(INGESTION_METHOD_OPT).toUpperCase()); + + if (!isSchematizationEnabled) { + validationErrors.put(ENABLE_SCHEMATIZATION_CONFIG, NO_SCHEMATIZATION_ERROR_MESSAGE); + } + if (ingestionMethod != SNOWPIPE_STREAMING) { + validationErrors.put(INGESTION_METHOD_OPT, INCOMPATIBLE_INGESTION_METHOD); + } + + return ImmutableMap.copyOf(validationErrors); + } +} diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingConfigValidator.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingConfigValidator.java index 1649cf550..e04150d8c 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingConfigValidator.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingConfigValidator.java @@ -4,6 +4,7 @@ import java.util.Map; /** Validates connector config for Snowpipe Streaming */ +// TODO (separate PR) - rename to ConfigValidator and return an ordinary Map public interface StreamingConfigValidator { /** diff --git a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorLogsTest.java
b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorLogsTest.java index 5669e927a..623cfc1ae 100644 --- a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorLogsTest.java +++ b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorLogsTest.java @@ -3,6 +3,7 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*; import static com.snowflake.kafka.connector.internal.TestUtils.getConfig; +import com.snowflake.kafka.connector.config.IcebergConfigValidator; import com.snowflake.kafka.connector.internal.EncryptionUtils; import com.snowflake.kafka.connector.internal.FIPSTest; import com.snowflake.kafka.connector.internal.TestUtils; @@ -22,7 +23,8 @@ public class ConnectorConfigValidatorLogsTest { private final ConnectorConfigValidator connectorConfigValidator = - new DefaultConnectorConfigValidator(new DefaultStreamingConfigValidator()); + new DefaultConnectorConfigValidator( + new DefaultStreamingConfigValidator(), new IcebergConfigValidator()); @Test public void testRSAPasswordOutput() throws IOException, OperatorCreationException { diff --git a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java index c27277c09..ea84dd6fc 100644 --- a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java +++ b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java @@ -19,6 +19,8 @@ import static org.assertj.core.api.Assertions.*; import static org.junit.Assert.assertEquals; +import com.snowflake.kafka.connector.config.IcebergConfigValidator; +import com.snowflake.kafka.connector.config.SnowflakeSinkConnectorConfigBuilder; import com.snowflake.kafka.connector.internal.SnowflakeErrors; import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException; import com.snowflake.kafka.connector.internal.streaming.DefaultStreamingConfigValidator; @@ -41,7 +43,8 
@@ public class ConnectorConfigValidatorTest { private final ConnectorConfigValidator connectorConfigValidator = - new DefaultConnectorConfigValidator(new DefaultStreamingConfigValidator()); + new DefaultConnectorConfigValidator( + new DefaultStreamingConfigValidator(), new IcebergConfigValidator()); // subset of valid community converters public static final List COMMUNITY_CONVERTER_SUBSET = @@ -63,14 +66,16 @@ private static Stream customSnowflakeConverters() { @Test public void testConfig() { - Map<String, String> config = getConfig(); + Map<String, String> config = SnowflakeSinkConnectorConfigBuilder.snowpipeConfig().build(); connectorConfigValidator.validateConfig(config); } @Test public void testConfig_ConvertedInvalidAppName() { - Map<String, String> config = getConfig(); - config.put(NAME, "testConfig.snowflake-connector"); + Map<String, String> config = + SnowflakeSinkConnectorConfigBuilder.snowpipeConfig() + .withName("testConfig.snowflake-connector") + .build(); Utils.convertAppName(config); diff --git a/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java b/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java new file mode 100644 index 000000000..421555428 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java @@ -0,0 +1,59 @@ +package com.snowflake.kafka.connector.config; + +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT; + +import com.google.common.collect.ImmutableMap; +import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; +import com.snowflake.kafka.connector.internal.streaming.StreamingConfigValidator; +import java.util.Map; +import java.util.stream.Stream; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import 
org.junit.jupiter.params.provider.MethodSource; + +public class IcebergConfigValidationTest { + + private static final StreamingConfigValidator validator = new IcebergConfigValidator(); + + @ParameterizedTest + @MethodSource("validConfigs") + public void shouldValidateCorrectConfig(Map<String, String> config) { + // when + ImmutableMap<String, String> invalidParameters = validator.validate(config); + + // then + Assertions.assertTrue(invalidParameters.isEmpty()); + } + + @ParameterizedTest + @MethodSource("invalidConfigs") + public void shouldReturnErrorOnInvalidConfig(Map<String, String> config, String errorKey) { + // when + ImmutableMap<String, String> invalidParameters = validator.validate(config); + + // then + Assertions.assertTrue(invalidParameters.containsKey(errorKey)); + } + + public static Stream<Arguments> validConfigs() { + return Stream.of( + Arguments.of(SnowflakeSinkConnectorConfigBuilder.snowpipeConfig().build()), + Arguments.of(SnowflakeSinkConnectorConfigBuilder.icebergConfig().build())); + } + + public static Stream<Arguments> invalidConfigs() { + return Stream.of( + Arguments.of( + SnowflakeSinkConnectorConfigBuilder.icebergConfig() + .withIngestionMethod(IngestionMethodConfig.SNOWPIPE) + .build(), + INGESTION_METHOD_OPT), + Arguments.of( + SnowflakeSinkConnectorConfigBuilder.icebergConfig() + .withSchematizationEnabled(false) + .build(), + ENABLE_SCHEMATIZATION_CONFIG)); + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/config/SnowflakeSinkConnectorConfigBuilder.java b/src/test/java/com/snowflake/kafka/connector/config/SnowflakeSinkConnectorConfigBuilder.java new file mode 100644 index 000000000..21c98e75c --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/config/SnowflakeSinkConnectorConfigBuilder.java @@ -0,0 +1,119 @@ +package com.snowflake.kafka.connector.config; + +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG; +import static com.snowflake.kafka.connector.Utils.*; +import static com.snowflake.kafka.connector.Utils.SF_DATABASE; + +import 
com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; +import com.snowflake.kafka.connector.Utils; +import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; +import java.util.HashMap; +import java.util.Map; + +/** + * This is a builder class for the connector config. For now it returns map. Let's change it to a + * more convenient abstraction when we have it. + */ +public class SnowflakeSinkConnectorConfigBuilder { + + private final Map<String, String> config = new HashMap<String, String>(); + + private SnowflakeSinkConnectorConfigBuilder() {} + + public static SnowflakeSinkConnectorConfigBuilder snowpipeConfig() { + return commonRequiredFields().withIngestionMethod(IngestionMethodConfig.SNOWPIPE); + } + + public static SnowflakeSinkConnectorConfigBuilder icebergConfig() { + return commonRequiredFields() + .withIcebergEnabled() + .withIngestionMethod(IngestionMethodConfig.SNOWPIPE_STREAMING) + .withSchematizationEnabled(true) + .withRole("role"); + } + + private static SnowflakeSinkConnectorConfigBuilder commonRequiredFields() { + return new SnowflakeSinkConnectorConfigBuilder() + .withName("test") + .withTopics("topic1,topic2") + .withUrl("https://testaccount.snowflake.com:443") + .withSchema("testSchema") + .withDatabase("testDatabase") + .withUser("userName") + .withPrivateKey("fdsfsdfsdfdsfdsrqwrwewrwrew42314424") + .withDefaultBufferConfig(); + } + + public SnowflakeSinkConnectorConfigBuilder withName(String name) { + config.put(Utils.NAME, name); + return this; + } + + public SnowflakeSinkConnectorConfigBuilder withTopics(String topics) { + config.put(SnowflakeSinkConnectorConfig.TOPICS, topics); + return this; + } + + public SnowflakeSinkConnectorConfigBuilder withUrl(String url) { + config.put(SF_URL, url); + return this; + } + + public SnowflakeSinkConnectorConfigBuilder withDatabase(String database) { + config.put(SF_DATABASE, database); + return this; + } + + public SnowflakeSinkConnectorConfigBuilder withSchema(String schema) { + config.put(SF_SCHEMA, 
schema); + return this; + } + + public SnowflakeSinkConnectorConfigBuilder withUser(String user) { + config.put(SF_USER, user); + return this; + } + + public SnowflakeSinkConnectorConfigBuilder withPrivateKey(String privateKey) { + config.put(Utils.SF_PRIVATE_KEY, privateKey); + return this; + } + + public SnowflakeSinkConnectorConfigBuilder withDefaultBufferConfig() { + config.put( + SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS, + SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS_DEFAULT + ""); + config.put( + SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES, + SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT + ""); + config.put( + SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, + SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_DEFAULT + ""); + return this; + } + + public SnowflakeSinkConnectorConfigBuilder withIngestionMethod( + IngestionMethodConfig ingestionMethod) { + config.put(SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT, ingestionMethod.toString()); + return this; + } + + public SnowflakeSinkConnectorConfigBuilder withIcebergEnabled() { + config.put(SnowflakeSinkConnectorConfig.ICEBERG_ENABLED, "true"); + return this; + } + + public SnowflakeSinkConnectorConfigBuilder withRole(String role) { + config.put(SF_ROLE, role); + return this; + } + + public SnowflakeSinkConnectorConfigBuilder withSchematizationEnabled(boolean enabled) { + config.put(ENABLE_SCHEMATIZATION_CONFIG, Boolean.toString(enabled)); + return this; + } + + public Map<String, String> build() { + return config; + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java index 610ede649..cab7a389b 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java @@ -26,10 +26,6 @@ import static com.snowflake.kafka.connector.Utils.HTTP_PROXY_USER; import static 
com.snowflake.kafka.connector.Utils.HTTP_USE_PROXY; import static com.snowflake.kafka.connector.Utils.JDK_HTTP_AUTH_TUNNELING; -import static com.snowflake.kafka.connector.Utils.SF_DATABASE; -import static com.snowflake.kafka.connector.Utils.SF_SCHEMA; -import static com.snowflake.kafka.connector.Utils.SF_URL; -import static com.snowflake.kafka.connector.Utils.SF_USER; import static com.snowflake.kafka.connector.Utils.buildOAuthHttpPostRequest; import static com.snowflake.kafka.connector.Utils.getSnowflakeOAuthToken; @@ -38,6 +34,7 @@ import com.snowflake.client.jdbc.SnowflakeDriver; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; import com.snowflake.kafka.connector.Utils; +import com.snowflake.kafka.connector.config.SnowflakeSinkConnectorConfigBuilder; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import com.snowflake.kafka.connector.records.SnowflakeJsonSchema; import com.snowflake.kafka.connector.records.SnowflakeRecordContent; @@ -762,25 +759,10 @@ public static List createBigAvroRecords( return records; } + /** @deprecated use SnowflakeSinkConnectorConfigBuilder instead */ + @Deprecated public static Map getConfig() { - Map config = new HashMap<>(); - config.put(Utils.NAME, "test"); - config.put(SnowflakeSinkConnectorConfig.TOPICS, "topic1,topic2"); - config.put(SF_URL, "https://testaccount.snowflake.com:443"); - config.put(SF_USER, "userName"); - config.put(Utils.SF_PRIVATE_KEY, "fdsfsdfsdfdsfdsrqwrwewrwrew42314424"); - config.put(SF_SCHEMA, "testSchema"); - config.put(SF_DATABASE, "testDatabase"); - config.put( - SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS, - SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS_DEFAULT + ""); - config.put( - SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES, - SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT + ""); - config.put( - SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, - SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_DEFAULT + ""); - return config; + 
return SnowflakeSinkConnectorConfigBuilder.snowpipeConfig().build(); } /**