diff --git a/src/main/java/com/snowflake/kafka/connector/ConnectorConfigValidator.java b/src/main/java/com/snowflake/kafka/connector/ConnectorConfigValidator.java
new file mode 100644
index 000000000..9b882ddde
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/ConnectorConfigValidator.java
@@ -0,0 +1,14 @@
+package com.snowflake.kafka.connector;
+
+import java.util.Map;
+
+public interface ConnectorConfigValidator {
+
+  /**
+   * Validate input configuration
+   *
+   * @param config configuration Map
+   * @return connector name
+   */
+  String validateConfig(Map<String, String> config);
+}
diff --git a/src/main/java/com/snowflake/kafka/connector/DefaultConnectorConfigValidator.java b/src/main/java/com/snowflake/kafka/connector/DefaultConnectorConfigValidator.java
new file mode 100644
index 000000000..c40fb4277
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/DefaultConnectorConfigValidator.java
@@ -0,0 +1,280 @@
+package com.snowflake.kafka.connector;
+
+import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG;
+import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BehaviorOnNullValues.VALIDATOR;
+import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.JMX_OPT;
+import static com.snowflake.kafka.connector.Utils.*;
+
+import com.google.common.collect.ImmutableMap;
+import com.snowflake.kafka.connector.internal.BufferThreshold;
+import com.snowflake.kafka.connector.internal.KCLogger;
+import com.snowflake.kafka.connector.internal.SnowflakeErrors;
+import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
+import com.snowflake.kafka.connector.internal.streaming.StreamingConfigValidator;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigException;
+
+public class DefaultConnectorConfigValidator implements ConnectorConfigValidator {
+
+  private static final KCLogger LOGGER =
+      new KCLogger(DefaultConnectorConfigValidator.class.getName());
+
+  private final StreamingConfigValidator streamingConfigValidator;
+
+  public DefaultConnectorConfigValidator(StreamingConfigValidator streamingConfigValidator) {
+    this.streamingConfigValidator = streamingConfigValidator;
+  }
+
+  /**
+   * Validate input configuration
+   *
+   * @param config configuration Map
+   * @return connector name
+   */
+  public String validateConfig(Map<String, String> config) {
+    Map<String, String> invalidConfigParams = new HashMap<>();
+
+    // define the input parameters / keys in one place as static constants,
+    // instead of using them directly
+    // define the thresholds statically in one place as static constants,
+    // instead of using the values directly
+
+    // unique name of this connector instance
+    String connectorName = config.getOrDefault(SnowflakeSinkConnectorConfig.NAME, "");
+    if (connectorName.isEmpty() || !isValidSnowflakeApplicationName(connectorName)) {
+      invalidConfigParams.put(
+          SnowflakeSinkConnectorConfig.NAME,
+          Utils.formatString(
+              "{} is empty or invalid. It should match Snowflake object identifier syntax. Please"
+                  + " see the documentation.",
+              SnowflakeSinkConnectorConfig.NAME));
+    }
+
+    // If the config does not define an ingestion method, the default is snowpipe; the same
+    // applies when snowpipe is passed in explicitly as the ingestion method.
+    // The checks below apply only to snowpipe.
+ if (isSnowpipeIngestion(config)) { + invalidConfigParams.putAll( + BufferThreshold.validateBufferThreshold(config, IngestionMethodConfig.SNOWPIPE)); + + if (config.containsKey(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG) + && Boolean.parseBoolean( + config.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG))) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, + Utils.formatString( + "Schematization is only available with {}.", + IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); + } + if (config.containsKey(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG, + Utils.formatString( + "{} is only available with ingestion type: {}.", + SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG, + IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); + } + if (config.containsKey( + SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES, + Utils.formatString( + "{} is only available with ingestion type: {}.", + SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES, + IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); + } + if (config.containsKey( + SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, + Utils.formatString( + "{} is only available with ingestion type: {}.", + SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, + IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); + } + if (config.containsKey( + SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP, + Utils.formatString( + "{} is only available with ingestion type: {}.", + SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP, + IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); + } + if (config.containsKey( + SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG) + && Boolean.parseBoolean( + config.get( + SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG))) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG, + Utils.formatString( + "Streaming client optimization is only available with {}.", + IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); + } + if (config.containsKey( + SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, + Utils.formatString( + "Streaming client Channel migration is only available with {}.", + IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); + } + } + + if (config.containsKey(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP) + && parseTopicToTableMap(config.get(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP)) + == null) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, + Utils.formatString( + "Invalid {} config format: {}", + SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, + config.get(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP))); + } + + // sanity check + if 
(!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_DATABASE)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.SNOWFLAKE_DATABASE, + Utils.formatString( + "{} cannot be empty.", SnowflakeSinkConnectorConfig.SNOWFLAKE_DATABASE)); + } + + // sanity check + if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA, + Utils.formatString("{} cannot be empty.", SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA)); + } + + switch (config + .getOrDefault(SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE, Utils.SNOWFLAKE_JWT) + .toLowerCase()) { + // TODO: SNOW-889748 change to enum + case Utils.SNOWFLAKE_JWT: + if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY, + Utils.formatString( + "{} cannot be empty when using {} authenticator.", + SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY, + Utils.SNOWFLAKE_JWT)); + } + break; + case Utils.OAUTH: + if (!config.containsKey(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID, + Utils.formatString( + "{} cannot be empty when using {} authenticator.", + SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID, + Utils.OAUTH)); + } + if (!config.containsKey(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET, + Utils.formatString( + "{} cannot be empty when using {} authenticator.", + SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET, + Utils.OAUTH)); + } + if (!config.containsKey(SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN, + Utils.formatString( + "{} cannot be empty when using {} authenticator.", + SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN, + Utils.OAUTH)); + } + break; + default: + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE, + Utils.formatString( + "{} should be one of {} or {}.", + SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE, + Utils.SNOWFLAKE_JWT, + Utils.OAUTH)); + } + + if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_USER)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.SNOWFLAKE_USER, + Utils.formatString("{} cannot be empty.", SnowflakeSinkConnectorConfig.SNOWFLAKE_USER)); + } + + if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_URL)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.SNOWFLAKE_URL, + Utils.formatString("{} cannot be empty.", SnowflakeSinkConnectorConfig.SNOWFLAKE_URL)); + } + // jvm proxy settings + invalidConfigParams.putAll(validateProxySettings(config)); + + // set jdbc logging directory + Utils.setJDBCLoggingDirectory(); + + // validate whether kafka provider config is a valid value + if (config.containsKey(SnowflakeSinkConnectorConfig.PROVIDER_CONFIG)) { + try { + SnowflakeSinkConnectorConfig.KafkaProvider.of( + config.get(SnowflakeSinkConnectorConfig.PROVIDER_CONFIG)); + } catch (IllegalArgumentException exception) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.PROVIDER_CONFIG, + Utils.formatString("Kafka provider config error:{}", exception.getMessage())); + } + } + + if (config.containsKey(BEHAVIOR_ON_NULL_VALUES_CONFIG)) { + try { + // This throws an exception if config value is invalid. 
+ VALIDATOR.ensureValid( + BEHAVIOR_ON_NULL_VALUES_CONFIG, config.get(BEHAVIOR_ON_NULL_VALUES_CONFIG)); + } catch (ConfigException exception) { + invalidConfigParams.put( + BEHAVIOR_ON_NULL_VALUES_CONFIG, + Utils.formatString( + "Kafka config:{} error:{}", + BEHAVIOR_ON_NULL_VALUES_CONFIG, + exception.getMessage())); + } + } + + if (config.containsKey(JMX_OPT)) { + if (!(config.get(JMX_OPT).equalsIgnoreCase("true") + || config.get(JMX_OPT).equalsIgnoreCase("false"))) { + invalidConfigParams.put( + JMX_OPT, Utils.formatString("Kafka config:{} should either be true or false", JMX_OPT)); + } + } + + // Check all config values for ingestion method == IngestionMethodConfig.SNOWPIPE_STREAMING + invalidConfigParams.putAll(streamingConfigValidator.validate(config)); + + // logs and throws exception if there are invalid params + handleInvalidParameters(ImmutableMap.copyOf(invalidConfigParams)); + + return connectorName; + } + + private void handleInvalidParameters(ImmutableMap invalidConfigParams) { + // log all invalid params and throw exception + if (!invalidConfigParams.isEmpty()) { + String invalidParamsMessage = ""; + + for (String invalidKey : invalidConfigParams.keySet()) { + String invalidValue = invalidConfigParams.get(invalidKey); + String errorMessage = + Utils.formatString( + "Config value '{}' is invalid. Error message: '{}'", invalidKey, invalidValue); + invalidParamsMessage += errorMessage + "\n"; + } + + LOGGER.error("Invalid config: " + invalidParamsMessage); + throw SnowflakeErrors.ERROR_0001.getException(invalidParamsMessage); + } + } +} diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java index cf7e8b81f..a619a677a 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java @@ -21,6 +21,7 @@ import com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory; import com.snowflake.kafka.connector.internal.SnowflakeErrors; import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException; +import com.snowflake.kafka.connector.internal.streaming.DefaultStreamingConfigValidator; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; import java.util.ArrayList; import java.util.HashMap; @@ -63,9 +64,8 @@ public class SnowflakeSinkConnector extends SinkConnector { // Using setupComplete to synchronize private boolean setupComplete; - private static final int VALIDATION_NETWORK_TIMEOUT_IN_MS = 45000; - - private static final int VALIDATION_LOGIN_TIMEOUT_IN_SEC = 20; + private final ConnectorConfigValidator connectorConfigValidator = + new DefaultConnectorConfigValidator(new DefaultStreamingConfigValidator()); /** No-Arg constructor. 
Required by Kafka Connect framework */ public SnowflakeSinkConnector() { @@ -95,7 +95,7 @@ public void start(final Map parsedConfig) { // modify invalid connector name Utils.convertAppName(config); - Utils.validateConfig(config); + connectorConfigValidator.validateConfig(config); // enable mdc logging if needed KCLogger.toggleGlobalMdcLoggingContext( @@ -226,12 +226,7 @@ public Config validate(Map connectorConfigs) { SnowflakeConnectionService testConnection; try { testConnection = - SnowflakeConnectionServiceFactory.builder() - .setNetworkTimeout(VALIDATION_NETWORK_TIMEOUT_IN_MS) - .setLoginTimeOut(VALIDATION_LOGIN_TIMEOUT_IN_SEC) - .setProperties(connectorConfigs) - .build(); - + SnowflakeConnectionServiceFactory.builder().setProperties(connectorConfigs).build(); } catch (SnowflakeKafkaConnectorException e) { LOGGER.error( "Validate: Error connecting to snowflake:{}, errorCode:{}", e.getMessage(), e.getCode()); diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java index 7cf9db56d..c793c50dc 100644 --- a/src/main/java/com/snowflake/kafka/connector/Utils.java +++ b/src/main/java/com/snowflake/kafka/connector/Utils.java @@ -16,13 +16,9 @@ */ package com.snowflake.kafka.connector; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BehaviorOnNullValues.VALIDATOR; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.JMX_OPT; import com.google.common.collect.ImmutableMap; -import com.snowflake.kafka.connector.internal.BufferThreshold; import com.snowflake.kafka.connector.internal.InternalUtils; import com.snowflake.kafka.connector.internal.KCLogger; import com.snowflake.kafka.connector.internal.OAuthConstants; @@ -30,7 +26,6 @@ import com.snowflake.kafka.connector.internal.SnowflakeInternalOperations; import com.snowflake.kafka.connector.internal.SnowflakeURL; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; -import com.snowflake.kafka.connector.internal.streaming.StreamingUtils; import java.io.BufferedReader; import java.io.File; import java.io.InputStream; @@ -64,7 +59,6 @@ import net.snowflake.client.jdbc.internal.google.gson.JsonObject; import net.snowflake.client.jdbc.internal.google.gson.JsonParser; import org.apache.kafka.common.config.Config; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigValue; /** Various arbitrary helper functions */ @@ -391,241 +385,6 @@ static boolean isValidSnowflakeTableName(String tableName) { return tableName.matches("^([_a-zA-Z]{1}[_$a-zA-Z0-9]+\\.){0,2}[_a-zA-Z]{1}[_$a-zA-Z0-9]+$"); } - /** - * Validate input configuration - * - * @param config configuration Map - * @return connector name - */ - static String validateConfig(Map config) { - Map invalidConfigParams = new HashMap(); // verify all config - - // define the input parameters / keys in one place as static constants, - // instead of using them directly - // define the thresholds statically in one place as static constants, - // instead of using the values directly - - // unique name of this connector instance - String connectorName = config.getOrDefault(SnowflakeSinkConnectorConfig.NAME, ""); - if (connectorName.isEmpty() || !isValidSnowflakeApplicationName(connectorName)) { - invalidConfigParams.put( - 
SnowflakeSinkConnectorConfig.NAME, - Utils.formatString( - "{} is empty or invalid. It should match Snowflake object identifier syntax. Please" - + " see the documentation.", - SnowflakeSinkConnectorConfig.NAME)); - } - - // If config doesnt have ingestion method defined, default is snowpipe or if snowpipe is - // explicitly passed in as ingestion method - // Below checks are just for snowpipe. - if (isSnowpipeIngestion(config)) { - invalidConfigParams.putAll( - BufferThreshold.validateBufferThreshold(config, IngestionMethodConfig.SNOWPIPE)); - - if (config.containsKey(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG) - && Boolean.parseBoolean( - config.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG))) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, - Utils.formatString( - "Schematization is only available with {}.", - IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); - } - if (config.containsKey(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG)) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG, - Utils.formatString( - "{} is only available with ingestion type: {}.", - SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG, - IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); - } - if (config.containsKey( - SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES)) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES, - Utils.formatString( - "{} is only available with ingestion type: {}.", - SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES, - IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); - } - if (config.containsKey( - SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER)) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, - Utils.formatString( - "{} is only available with ingestion type: {}.", - SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, - IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); - } - if (config.containsKey( - SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP)) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP, - Utils.formatString( - "{} is only available with ingestion type: {}.", - SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP, - IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); - } - if (config.containsKey( - SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG) - && Boolean.parseBoolean( - config.get( - SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG))) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG, - Utils.formatString( - "Streaming client optimization is only available with {}.", - IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); - } - if (config.containsKey( - SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG)) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, - Utils.formatString( - "Streaming client Channel migration is only available with {}.", - IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); - } - } - - if (config.containsKey(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP) - && 
!config.get(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP).isEmpty() - && parseTopicToTableMap(config.get(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP)) - == null) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, - Utils.formatString( - "Invalid {} config format: {}", - SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, - config.get(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP))); - } - - // sanity check - if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_DATABASE)) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.SNOWFLAKE_DATABASE, - Utils.formatString( - "{} cannot be empty.", SnowflakeSinkConnectorConfig.SNOWFLAKE_DATABASE)); - } - - // sanity check - if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA)) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA, - Utils.formatString("{} cannot be empty.", SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA)); - } - - switch (config - .getOrDefault(SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE, Utils.SNOWFLAKE_JWT) - .toLowerCase()) { - // TODO: SNOW-889748 change to enum - case Utils.SNOWFLAKE_JWT: - if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY)) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY, - Utils.formatString( - "{} cannot be empty when using {} authenticator.", - SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY, - Utils.SNOWFLAKE_JWT)); - } - break; - case Utils.OAUTH: - if (!config.containsKey(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID)) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID, - Utils.formatString( - "{} cannot be empty when using {} authenticator.", - SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID, - Utils.OAUTH)); - } - if (!config.containsKey(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET)) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET, - Utils.formatString( - "{} cannot be empty when using {} authenticator.", - SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET, - Utils.OAUTH)); - } - if (!config.containsKey(SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN)) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN, - Utils.formatString( - "{} cannot be empty when using {} authenticator.", - SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN, - Utils.OAUTH)); - } - break; - default: - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE, - Utils.formatString( - "{} should be one of {} or {}.", - SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE, - Utils.SNOWFLAKE_JWT, - Utils.OAUTH)); - } - - if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_USER)) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.SNOWFLAKE_USER, - Utils.formatString("{} cannot be empty.", SnowflakeSinkConnectorConfig.SNOWFLAKE_USER)); - } - - if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_URL)) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.SNOWFLAKE_URL, - Utils.formatString("{} cannot be empty.", SnowflakeSinkConnectorConfig.SNOWFLAKE_URL)); - } - // jvm proxy settings - invalidConfigParams.putAll(validateProxySettings(config)); - - // set jdbc logging directory - Utils.setJDBCLoggingDirectory(); - - // validate whether kafka provider config is a valid value - if (config.containsKey(SnowflakeSinkConnectorConfig.PROVIDER_CONFIG)) { - try { - SnowflakeSinkConnectorConfig.KafkaProvider.of( - 
config.get(SnowflakeSinkConnectorConfig.PROVIDER_CONFIG)); - } catch (IllegalArgumentException exception) { - invalidConfigParams.put( - SnowflakeSinkConnectorConfig.PROVIDER_CONFIG, - Utils.formatString("Kafka provider config error:{}", exception.getMessage())); - } - } - - if (config.containsKey(BEHAVIOR_ON_NULL_VALUES_CONFIG)) { - try { - // This throws an exception if config value is invalid. - VALIDATOR.ensureValid( - BEHAVIOR_ON_NULL_VALUES_CONFIG, config.get(BEHAVIOR_ON_NULL_VALUES_CONFIG)); - } catch (ConfigException exception) { - invalidConfigParams.put( - BEHAVIOR_ON_NULL_VALUES_CONFIG, - Utils.formatString( - "Kafka config:{} error:{}", - BEHAVIOR_ON_NULL_VALUES_CONFIG, - exception.getMessage())); - } - } - - if (config.containsKey(JMX_OPT)) { - if (!(config.get(JMX_OPT).equalsIgnoreCase("true") - || config.get(JMX_OPT).equalsIgnoreCase("false"))) { - invalidConfigParams.put( - JMX_OPT, Utils.formatString("Kafka config:{} should either be true or false", JMX_OPT)); - } - } - - // Check all config values for ingestion method == IngestionMethodConfig.SNOWPIPE_STREAMING - invalidConfigParams.putAll(StreamingUtils.validateStreamingSnowpipeConfig(config)); - - // logs and throws exception if there are invalid params - handleInvalidParameters(ImmutableMap.copyOf(invalidConfigParams)); - - return connectorName; - } - /** * Returns whether INGESTION_METHOD_OPT is set to SNOWPIPE. If INGESTION_METHOD_OPT not specified, * returns true as default. @@ -1014,22 +773,4 @@ public static String getExceptionMessage(String customMessage, Exception ex) { return formatString(GET_EXCEPTION_FORMAT, customMessage, message, cause); } - - private static void handleInvalidParameters(ImmutableMap invalidConfigParams) { - // log all invalid params and throw exception - if (!invalidConfigParams.isEmpty()) { - String invalidParamsMessage = ""; - - for (String invalidKey : invalidConfigParams.keySet()) { - String invalidValue = invalidConfigParams.get(invalidKey); - String errorMessage = - Utils.formatString( - "Config value '{}' is invalid. 
Error message: '{}'", invalidKey, invalidValue); - invalidParamsMessage += errorMessage + "\n"; - } - - LOGGER.error("Invalid config: " + invalidParamsMessage); - throw SnowflakeErrors.ERROR_0001.getException(invalidParamsMessage); - } - } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DefaultStreamingConfigValidator.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DefaultStreamingConfigValidator.java new file mode 100644 index 000000000..027a3d775 --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DefaultStreamingConfigValidator.java @@ -0,0 +1,220 @@ +package com.snowflake.kafka.connector.internal.streaming; + +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; +import com.snowflake.kafka.connector.Utils; +import com.snowflake.kafka.connector.internal.BufferThreshold; +import com.snowflake.kafka.connector.internal.KCLogger; +import com.snowflake.kafka.connector.internal.parameters.InternalBufferParameters; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.common.config.ConfigException; + +public class DefaultStreamingConfigValidator implements StreamingConfigValidator { + + private final SingleBufferConfigValidator singleBufferConfigValidator = + new SingleBufferConfigValidator(); + private final DoubleBufferConfigValidator doubleBufferConfigValidator = + new DoubleBufferConfigValidator(); + + private static final Set DISALLOWED_CONVERTERS_STREAMING = CUSTOM_SNOWFLAKE_CONVERTERS; + private static final String STRING_CONVERTER_KEYWORD = "StringConverter"; + private static final String BYTE_ARRAY_CONVERTER_KEYWORD = "ByteArrayConverter"; + + @Override + public ImmutableMap validate(Map inputConfig) { + Map invalidParams = new HashMap<>(); + + // For snowpipe_streaming, role should be non empty + if (inputConfig.containsKey(INGESTION_METHOD_OPT)) { + if (InternalBufferParameters.isSingleBufferEnabled(inputConfig)) { + singleBufferConfigValidator.logDoubleBufferingParametersWarning(inputConfig); + } else { + invalidParams.putAll(doubleBufferConfigValidator.validate(inputConfig)); + } + + try { + // This throws an exception if config value is invalid. 
+ IngestionMethodConfig.VALIDATOR.ensureValid( + INGESTION_METHOD_OPT, inputConfig.get(INGESTION_METHOD_OPT)); + if (inputConfig + .get(INGESTION_METHOD_OPT) + .equalsIgnoreCase(IngestionMethodConfig.SNOWPIPE_STREAMING.toString())) { + invalidParams.putAll(validateConfigConverters(KEY_CONVERTER_CONFIG_FIELD, inputConfig)); + invalidParams.putAll(validateConfigConverters(VALUE_CONVERTER_CONFIG_FIELD, inputConfig)); + + // Validate if snowflake role is present + if (!inputConfig.containsKey(Utils.SF_ROLE) + || Strings.isNullOrEmpty(inputConfig.get(Utils.SF_ROLE))) { + invalidParams.put( + Utils.SF_ROLE, + Utils.formatString( + "Config:{} should be present if ingestionMethod is:{}", + Utils.SF_ROLE, + inputConfig.get(INGESTION_METHOD_OPT))); + } + + /** + * Only checking in streaming since we are utilizing the values before we send it to + * DLQ/output to log file + */ + if (inputConfig.containsKey(ERRORS_TOLERANCE_CONFIG)) { + SnowflakeSinkConnectorConfig.ErrorTolerance.VALIDATOR.ensureValid( + ERRORS_TOLERANCE_CONFIG, inputConfig.get(ERRORS_TOLERANCE_CONFIG)); + } + if (inputConfig.containsKey(ERRORS_LOG_ENABLE_CONFIG)) { + BOOLEAN_VALIDATOR.ensureValid( + ERRORS_LOG_ENABLE_CONFIG, inputConfig.get(ERRORS_LOG_ENABLE_CONFIG)); + } + if (inputConfig.containsKey(ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG)) { + BOOLEAN_VALIDATOR.ensureValid( + ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, + inputConfig.get(ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG)); + } + + if (inputConfig.containsKey(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER)) { + BOOLEAN_VALIDATOR.ensureValid( + SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, + inputConfig.get(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER)); + } + + if (inputConfig.containsKey(SNOWPIPE_STREAMING_MAX_CLIENT_LAG)) { + ensureValidLong(inputConfig, SNOWPIPE_STREAMING_MAX_CLIENT_LAG, invalidParams); + } + + if (inputConfig.containsKey(SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES)) { + ensureValidLong( + inputConfig, SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES, invalidParams); + } + + // Valid schematization for Snowpipe Streaming + invalidParams.putAll(validateSchematizationConfig(inputConfig)); + } + } catch (ConfigException exception) { + invalidParams.put( + INGESTION_METHOD_OPT, + Utils.formatString( + "Kafka config:{} error:{}", INGESTION_METHOD_OPT, exception.getMessage())); + } + } + + return ImmutableMap.copyOf(invalidParams); + } + + private static void ensureValidLong( + Map inputConfig, String param, Map invalidParams) { + try { + Long.parseLong(inputConfig.get(param)); + } catch (NumberFormatException exception) { + invalidParams.put( + param, + Utils.formatString( + param + " configuration must be a parsable long. Given configuration" + " was: {}", + inputConfig.get(param))); + } + } + + /** + * Validates if the configs are allowed values when schematization is enabled. + * + *
<p>Returns a map of invalid params.
+   */
+  private static Map<String, String> validateSchematizationConfig(
+      Map<String, String> inputConfig) {
+    Map<String, String> invalidParams = new HashMap<>();
+
+    if (inputConfig.containsKey(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG)) {
+      BOOLEAN_VALIDATOR.ensureValid(
+          SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG,
+          inputConfig.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG));
+
+      if (Boolean.parseBoolean(
+              inputConfig.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG))
+          && inputConfig.get(VALUE_CONVERTER_CONFIG_FIELD) != null
+          && (inputConfig.get(VALUE_CONVERTER_CONFIG_FIELD).contains(STRING_CONVERTER_KEYWORD)
+              || inputConfig
+                  .get(VALUE_CONVERTER_CONFIG_FIELD)
+                  .contains(BYTE_ARRAY_CONVERTER_KEYWORD))) {
+        invalidParams.put(
+            inputConfig.get(VALUE_CONVERTER_CONFIG_FIELD),
+            Utils.formatString(
+                "The value converter:{} is not supported with schematization.",
+                inputConfig.get(VALUE_CONVERTER_CONFIG_FIELD)));
+      }
+    }
+
+    return invalidParams;
+  }
+
+  /**
+   * Validates that the key and value converters are allowed values when {@link
+   * IngestionMethodConfig#SNOWPIPE_STREAMING} is used.
+   *
+   * <p>
Map if invalid parameters + */ + private static Map validateConfigConverters( + final String inputConfigConverterField, Map inputConfig) { + Map invalidParams = new HashMap<>(); + + if (inputConfig.containsKey(inputConfigConverterField) + && DISALLOWED_CONVERTERS_STREAMING.contains(inputConfig.get(inputConfigConverterField))) { + invalidParams.put( + inputConfigConverterField, + Utils.formatString( + "Config:{} has provided value:{}. If ingestionMethod is:{}, Snowflake Custom" + + " Converters are not allowed. \n" + + "Invalid Converters:{}", + inputConfigConverterField, + inputConfig.get(inputConfigConverterField), + IngestionMethodConfig.SNOWPIPE_STREAMING, + Iterables.toString(DISALLOWED_CONVERTERS_STREAMING))); + } + + return invalidParams; + } + + /** Config validations specific to single buffer architecture */ + private static class SingleBufferConfigValidator { + + private static final KCLogger LOGGER = + new KCLogger(SingleBufferConfigValidator.class.getName()); + + private void logDoubleBufferingParametersWarning(Map config) { + if (InternalBufferParameters.isSingleBufferEnabled(config)) { + List ignoredParameters = + Arrays.asList(BUFFER_FLUSH_TIME_SEC, BUFFER_SIZE_BYTES, BUFFER_COUNT_RECORDS); + ignoredParameters.stream() + .filter(config::containsKey) + .forEach( + param -> + LOGGER.warn( + "{} parameter value is ignored because internal buffer is disabled. To go" + + " back to previous behaviour set " + + SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER + + " to false", + param)); + } + } + } + + /** Config validations specific to double buffer architecture */ + private static class DoubleBufferConfigValidator { + private Map validate(Map inputConfig) { + Map invalidParams = new HashMap<>(); + + // check if buffer thresholds are within permissible range + invalidParams.putAll( + BufferThreshold.validateBufferThreshold( + inputConfig, IngestionMethodConfig.SNOWPIPE_STREAMING)); + + return invalidParams; + } + } +} diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingConfigValidator.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingConfigValidator.java new file mode 100644 index 000000000..1649cf550 --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingConfigValidator.java @@ -0,0 +1,14 @@ +package com.snowflake.kafka.connector.internal.streaming; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; + +/** Validates connector config for Snowpipe Streaming */ +public interface StreamingConfigValidator { + + /** + * @param inputConfig connector provided by user + * @return map of invalid parameters + */ + ImmutableMap validate(final Map inputConfig); +} diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java index 81931c1a6..42b54ff59 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java @@ -1,33 +1,18 @@ package com.snowflake.kafka.connector.internal.streaming; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BOOLEAN_VALIDATOR; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.CUSTOM_SNOWFLAKE_CONVERTERS; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG; import static 
com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_LOG_ENABLE_CONFIG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ErrorTolerance; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; import com.snowflake.kafka.connector.Utils; -import com.snowflake.kafka.connector.internal.BufferThreshold; import java.time.Duration; -import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.Set; import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction; import net.snowflake.ingest.utils.Constants; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.record.DefaultRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -188,161 +173,4 @@ public static boolean logErrors(Map sfConnectorConfig) { public static String getDlqTopicName(Map sfConnectorConfig) { return sfConnectorConfig.getOrDefault(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, ""); } - - /** - * Validate Streaming snowpipe related config provided by config(customer's config) - * - * @param inputConfig given in connector json file - * @return map of invalid parameters - */ - public static ImmutableMap validateStreamingSnowpipeConfig( - final Map inputConfig) { - Map invalidParams = new HashMap<>(); - - // For snowpipe_streaming, role should be non empty - if (inputConfig.containsKey(INGESTION_METHOD_OPT)) { - try { - // This throws an exception if config value is invalid. 
- IngestionMethodConfig.VALIDATOR.ensureValid( - INGESTION_METHOD_OPT, inputConfig.get(INGESTION_METHOD_OPT)); - if (inputConfig - .get(INGESTION_METHOD_OPT) - .equalsIgnoreCase(IngestionMethodConfig.SNOWPIPE_STREAMING.toString())) { - - // check if buffer thresholds are within permissible range - invalidParams.putAll( - BufferThreshold.validateBufferThreshold( - inputConfig, IngestionMethodConfig.SNOWPIPE_STREAMING)); - - invalidParams.putAll(validateConfigConverters(KEY_CONVERTER_CONFIG_FIELD, inputConfig)); - invalidParams.putAll(validateConfigConverters(VALUE_CONVERTER_CONFIG_FIELD, inputConfig)); - - // Validate if snowflake role is present - if (!inputConfig.containsKey(Utils.SF_ROLE) - || Strings.isNullOrEmpty(inputConfig.get(Utils.SF_ROLE))) { - invalidParams.put( - Utils.SF_ROLE, - Utils.formatString( - "Config:{} should be present if ingestionMethod is:{}", - Utils.SF_ROLE, - inputConfig.get(INGESTION_METHOD_OPT))); - } - - /** - * Only checking in streaming since we are utilizing the values before we send it to - * DLQ/output to log file - */ - if (inputConfig.containsKey(ERRORS_TOLERANCE_CONFIG)) { - SnowflakeSinkConnectorConfig.ErrorTolerance.VALIDATOR.ensureValid( - ERRORS_TOLERANCE_CONFIG, inputConfig.get(ERRORS_TOLERANCE_CONFIG)); - } - if (inputConfig.containsKey(ERRORS_LOG_ENABLE_CONFIG)) { - BOOLEAN_VALIDATOR.ensureValid( - ERRORS_LOG_ENABLE_CONFIG, inputConfig.get(ERRORS_LOG_ENABLE_CONFIG)); - } - if (inputConfig.containsKey(ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG)) { - BOOLEAN_VALIDATOR.ensureValid( - ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, - inputConfig.get(ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG)); - } - - if (inputConfig.containsKey(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER)) { - BOOLEAN_VALIDATOR.ensureValid( - SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, - inputConfig.get(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER)); - } - - if (inputConfig.containsKey(SNOWPIPE_STREAMING_MAX_CLIENT_LAG)) { - ensureValidLong(inputConfig, SNOWPIPE_STREAMING_MAX_CLIENT_LAG, invalidParams); - } - - if (inputConfig.containsKey(SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES)) { - ensureValidLong( - inputConfig, SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES, invalidParams); - } - - // Valid schematization for Snowpipe Streaming - invalidParams.putAll(validateSchematizationConfig(inputConfig)); - } - } catch (ConfigException exception) { - invalidParams.put( - INGESTION_METHOD_OPT, - Utils.formatString( - "Kafka config:{} error:{}", INGESTION_METHOD_OPT, exception.getMessage())); - } - } - - return ImmutableMap.copyOf(invalidParams); - } - - private static void ensureValidLong( - Map inputConfig, String param, Map invalidParams) { - try { - Long.parseLong(inputConfig.get(param)); - } catch (NumberFormatException exception) { - invalidParams.put( - param, - Utils.formatString( - param + " configuration must be a parsable long. Given configuration" + " was: {}", - inputConfig.get(param))); - } - } - - /** - * Validates if key and value converters are allowed values if {@link - * IngestionMethodConfig#SNOWPIPE_STREAMING} is used. - * - *
<p>
Map if invalid parameters - */ - private static Map validateConfigConverters( - final String inputConfigConverterField, Map inputConfig) { - Map invalidParams = new HashMap<>(); - - if (inputConfig.containsKey(inputConfigConverterField) - && DISALLOWED_CONVERTERS_STREAMING.contains(inputConfig.get(inputConfigConverterField))) { - invalidParams.put( - inputConfigConverterField, - Utils.formatString( - "Config:{} has provided value:{}. If ingestionMethod is:{}, Snowflake Custom" - + " Converters are not allowed. \n" - + "Invalid Converters:{}", - inputConfigConverterField, - inputConfig.get(inputConfigConverterField), - IngestionMethodConfig.SNOWPIPE_STREAMING, - Iterables.toString(DISALLOWED_CONVERTERS_STREAMING))); - } - - return invalidParams; - } - - /** - * Validates if the configs are allowed values when schematization is enabled. - * - *
<p>
return a map of invalid params - */ - private static Map validateSchematizationConfig(Map inputConfig) { - Map invalidParams = new HashMap<>(); - - if (inputConfig.containsKey(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG)) { - BOOLEAN_VALIDATOR.ensureValid( - SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, - inputConfig.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG)); - - if (Boolean.parseBoolean( - inputConfig.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG)) - && inputConfig.get(VALUE_CONVERTER_CONFIG_FIELD) != null - && (inputConfig.get(VALUE_CONVERTER_CONFIG_FIELD).contains(STRING_CONVERTER_KEYWORD) - || inputConfig - .get(VALUE_CONVERTER_CONFIG_FIELD) - .contains(BYTE_ARRAY_CONVERTER_KEYWORD))) { - invalidParams.put( - inputConfig.get(VALUE_CONVERTER_CONFIG_FIELD), - Utils.formatString( - "The value converter:{} is not supported with schematization.", - inputConfig.get(VALUE_CONVERTER_CONFIG_FIELD))); - } - } - - return invalidParams; - } } diff --git a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorLogsTest.java b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorLogsTest.java new file mode 100644 index 000000000..5669e927a --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorLogsTest.java @@ -0,0 +1,86 @@ +package com.snowflake.kafka.connector; + +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*; +import static com.snowflake.kafka.connector.internal.TestUtils.getConfig; + +import com.snowflake.kafka.connector.internal.EncryptionUtils; +import com.snowflake.kafka.connector.internal.FIPSTest; +import com.snowflake.kafka.connector.internal.TestUtils; +import com.snowflake.kafka.connector.internal.streaming.DefaultStreamingConfigValidator; +import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Map; +import org.bouncycastle.operator.OperatorCreationException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class ConnectorConfigValidatorLogsTest { + + private final ConnectorConfigValidator connectorConfigValidator = + new DefaultConnectorConfigValidator(new DefaultStreamingConfigValidator()); + + @Test + public void testRSAPasswordOutput() throws IOException, OperatorCreationException { + // given + String testPasswd = "TestPassword1234!"; + String testKey = FIPSTest.generateAESKey(TestUtils.getPrivateKey(), testPasswd.toCharArray()); + Map testConf = getConfig(); + testConf.remove(SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY); + testConf.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY, testKey); + testConf.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY_PASSPHRASE, testPasswd); + + // when + connectorConfigValidator.validateConfig(testConf); + + // then + EncryptionUtils.parseEncryptedPrivateKey(testKey, testPasswd); + Assertions.assertFalse(logFileContains(testPasswd)); + } + + @ParameterizedTest + @ValueSource(strings = {BUFFER_FLUSH_TIME_SEC, BUFFER_SIZE_BYTES, BUFFER_COUNT_RECORDS}) + public void shouldLogWarningIfBufferingPropertiesDefinedForSingleBuffer(String parameter) + throws IOException { + // given + Map config = TestUtils.getConfForStreaming(); + config.put(INGESTION_METHOD_OPT, 
IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); + config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "true"); + config.put(parameter, "1000"); + String expectedLog = + parameter + + " parameter value is ignored because internal buffer is disabled. To go back to" + + " previous behaviour set " + + SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER + + " to false"; + + // when + connectorConfigValidator.validateConfig(config); + + // then + Assertions.assertTrue(logFileContains(expectedLog)); + } + + // Note that sf.log accumulates logs between the consecutive test runs + // That's why it's very hard to test many scenarios without hacks like test ordering and deleting + // log file + private boolean logFileContains(String str) throws IOException { + String fileName = "sf.log"; + File log = new File(fileName); + FileReader fileReader = new FileReader(log); + BufferedReader buffer = new BufferedReader(fileReader); + String line; + while ((line = buffer.readLine()) != null) { + if (line.contains(str)) { + return true; + } + } + buffer.close(); + fileReader.close(); + return false; + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java similarity index 86% rename from src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java rename to src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java index 52068d571..c27277c09 100644 --- a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java +++ b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java @@ -19,9 +19,9 @@ import static org.assertj.core.api.Assertions.*; import static org.junit.Assert.assertEquals; -import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.TopicToTableValidator; import com.snowflake.kafka.connector.internal.SnowflakeErrors; import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException; +import com.snowflake.kafka.connector.internal.streaming.DefaultStreamingConfigValidator; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import com.snowflake.kafka.connector.internal.streaming.StreamingUtils; import java.util.ArrayList; @@ -29,8 +29,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; - -import org.apache.kafka.common.config.ConfigException; import java.util.stream.Stream; import org.apache.kafka.connect.storage.Converter; import org.junit.Assert; @@ -40,9 +38,11 @@ import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; -import static org.junit.Assert.assertThrows; +public class ConnectorConfigValidatorTest { + + private final ConnectorConfigValidator connectorConfigValidator = + new DefaultConnectorConfigValidator(new DefaultStreamingConfigValidator()); -public class ConnectorConfigTest { // subset of valid community converters public static final List COMMUNITY_CONVERTER_SUBSET = Arrays.asList( @@ -64,7 +64,7 @@ private static Stream customSnowflakeConverters() { @Test public void testConfig() { Map config = getConfig(); - Utils.validateConfig(config); + connectorConfigValidator.validateConfig(config); } @Test @@ -74,14 +74,14 @@ public void testConfig_ConvertedInvalidAppName() { Utils.convertAppName(config); - Utils.validateConfig(config); + connectorConfigValidator.validateConfig(config); } @Test public void testEmptyFlushTime() { Map config = getConfig(); 
config.remove(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC); - assertThatThrownBy(() -> Utils.validateConfig(config)) + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC); } @@ -92,7 +92,7 @@ public void testFlushTimeSmall() { config.put( SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, (SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_MIN - 1) + ""); - assertThatThrownBy(() -> Utils.validateConfig(config)) + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC); } @@ -101,7 +101,7 @@ public void testFlushTimeSmall() { public void testFlushTimeNotNumber() { Map config = getConfig(); config.put(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, "fdas"); - assertThatThrownBy(() -> Utils.validateConfig(config)) + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC); } @@ -120,7 +120,7 @@ public void testFlushTimeNotNumber() { public void shouldThrowExForEmptyProperty(String prop) { Map config = getConfig(); config.remove(prop); - assertThatThrownBy(() -> Utils.validateConfig(config)) + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(prop); } @@ -130,14 +130,14 @@ public void testCorrectProxyHost() { Map config = getConfig(); config.put(JVM_PROXY_HOST, "127.0.0.1"); config.put(JVM_PROXY_PORT, "3128"); - Utils.validateConfig(config); + connectorConfigValidator.validateConfig(config); } @Test public void testEmptyPort() { Map config = getConfig(); config.put(JVM_PROXY_HOST, "127.0.0.1"); - assertThatThrownBy(() -> Utils.validateConfig(config)) + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(JVM_PROXY_HOST); } @@ -146,7 +146,7 @@ public void testEmptyPort() { public void testEmptyHost() { Map config = getConfig(); config.put(JVM_PROXY_PORT, "3128"); - assertThatThrownBy(() -> Utils.validateConfig(config)) + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(JVM_PROXY_PORT); } @@ -182,7 +182,7 @@ public void testNonProxyHosts() { public void testIllegalTopicMap() { Map config = getConfig(); config.put(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, "$@#$#@%^$12312"); - assertThatThrownBy(() -> Utils.validateConfig(config)) + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP); } @@ -191,7 +191,7 @@ public void testIllegalTopicMap() { public void testIllegalTableName() { Map config = getConfig(); config.put(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, "topic1:!@#@!#!@"); - assertThatThrownBy(() -> Utils.validateConfig(config)) + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .matches( ex -> @@ -204,7 +204,7 @@ public void testIllegalTableName() { public void testDuplicatedTopic() { Map config = getConfig(); 
config.put(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, "topic1:table1,topic1:table2"); - assertThatThrownBy(() -> Utils.validateConfig(config)) + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .matches( ex -> @@ -217,23 +217,7 @@ public void testDuplicatedTopic() { public void testDuplicatedTableName() { Map config = getConfig(); config.put(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, "topic1:table1,topic2:table1"); - Utils.validateConfig(config); - } - - @Test - public void testTopicToTableValidatorOnlyThrowsConfigException() { - assertThrows(ConfigException.class, () -> { - new TopicToTableValidator().ensureValid(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, - "$@#$#@%^$12312"); - }); - assertThrows(ConfigException.class, () -> { - new TopicToTableValidator().ensureValid(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, - "topic1:!@#@!#!@"); - }); - assertThrows(ConfigException.class, () -> { - new TopicToTableValidator().ensureValid(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, - "topic1:table1,topic1:table2"); - }); + connectorConfigValidator.validateConfig(config); } @Test @@ -241,14 +225,14 @@ public void testNameMapCovered() { Map config = getConfig(); config.put(SnowflakeSinkConnectorConfig.TOPICS, "!@#,$%^,test"); config.put(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, "!@#:table1,$%^:table2"); - Utils.validateConfig(config); + connectorConfigValidator.validateConfig(config); } @Test public void testBufferSizeRange() { Map config = getConfig(); config.put(BUFFER_SIZE_BYTES, SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_MIN - 1 + ""); - assertThatThrownBy(() -> Utils.validateConfig(config)) + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(BUFFER_SIZE_BYTES); } @@ -257,7 +241,7 @@ public void testBufferSizeRange() { public void testBufferSizeValue() { Map config = getConfig(); config.put(BUFFER_SIZE_BYTES, "afdsa"); - assertThatThrownBy(() -> Utils.validateConfig(config)) + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(BUFFER_SIZE_BYTES); } @@ -266,7 +250,7 @@ public void testBufferSizeValue() { public void testEmptyBufferCountNegative() { Map config = getConfig(); config.put(BUFFER_COUNT_RECORDS, "-1"); - assertThatThrownBy(() -> Utils.validateConfig(config)) + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(BUFFER_COUNT_RECORDS); } @@ -275,7 +259,7 @@ public void testEmptyBufferCountNegative() { public void testBufferCountValue() { Map config = getConfig(); config.put(BUFFER_COUNT_RECORDS, "adssadsa"); - assertThatThrownBy(() -> Utils.validateConfig(config)) + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(BUFFER_COUNT_RECORDS); } @@ -284,34 +268,34 @@ public void testBufferCountValue() { public void testKafkaProviderConfigValue_valid_null() { Map config = getConfig(); config.put(SnowflakeSinkConnectorConfig.PROVIDER_CONFIG, null); - Utils.validateConfig(config); + connectorConfigValidator.validateConfig(config); } @Test public void testKafkaProviderConfigValue_valid_empty() { Map config = getConfig(); config.put(SnowflakeSinkConnectorConfig.PROVIDER_CONFIG, ""); - 
Utils.validateConfig(config); + connectorConfigValidator.validateConfig(config); } @Test public void testKafkaProviderConfigValue_valid_provider() { Map config = getConfig(); config.put(SnowflakeSinkConnectorConfig.PROVIDER_CONFIG, "self_hosted"); - Utils.validateConfig(config); + connectorConfigValidator.validateConfig(config); config.put(SnowflakeSinkConnectorConfig.PROVIDER_CONFIG, "CONFLUENT"); - Utils.validateConfig(config); + connectorConfigValidator.validateConfig(config); config.put(SnowflakeSinkConnectorConfig.PROVIDER_CONFIG, "UNKNOWN"); - Utils.validateConfig(config); + connectorConfigValidator.validateConfig(config); } @Test public void testKafkaProviderConfigValue_invalid_value() { Map config = getConfig(); config.put(SnowflakeSinkConnectorConfig.PROVIDER_CONFIG, "Something_which_is_not_supported"); - assertThatThrownBy(() -> Utils.validateConfig(config)) + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(SnowflakeSinkConnectorConfig.PROVIDER_CONFIG); } @@ -320,17 +304,17 @@ public void testKafkaProviderConfigValue_invalid_value() { public void testBehaviorOnNullValuesConfig_valid_value() { Map config = getConfig(); config.put(SnowflakeSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, "IGNORE"); - Utils.validateConfig(config); + connectorConfigValidator.validateConfig(config); config.put(SnowflakeSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, "DEFAULT"); - Utils.validateConfig(config); + connectorConfigValidator.validateConfig(config); } @Test public void testBehaviorOnNullValuesConfig_invalid_value() { Map config = getConfig(); config.put(SnowflakeSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, "INVALID"); - assertThatThrownBy(() -> Utils.validateConfig(config)) + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(SnowflakeSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG); } @@ -339,17 +323,17 @@ public void testBehaviorOnNullValuesConfig_invalid_value() { public void testJMX_valid_value() { Map config = getConfig(); config.put(SnowflakeSinkConnectorConfig.JMX_OPT, "true"); - Utils.validateConfig(config); + connectorConfigValidator.validateConfig(config); config.put(SnowflakeSinkConnectorConfig.JMX_OPT, "False"); - Utils.validateConfig(config); + connectorConfigValidator.validateConfig(config); } @Test public void testJMX_invalid_value() { Map config = getConfig(); config.put(SnowflakeSinkConnectorConfig.JMX_OPT, "INVALID"); - assertThatThrownBy(() -> Utils.validateConfig(config)) + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(SnowflakeSinkConnectorConfig.JMX_OPT); } @@ -360,7 +344,7 @@ public void testIngestionTypeConfig_valid_value_snowpipe() { config.put( SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT, IngestionMethodConfig.SNOWPIPE.toString()); - Utils.validateConfig(config); + connectorConfigValidator.validateConfig(config); } @Test @@ -371,7 +355,7 @@ public void testIngestionTypeConfig_valid_value_snowpipe_streaming() { SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT, IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); config.put(Utils.SF_ROLE, "ACCOUNTADMIN"); - Utils.validateConfig(config); + connectorConfigValidator.validateConfig(config); } @Test @@ -382,7 +366,7 @@ public void testIngestionTypeConfig_invalid_snowpipe_streaming() { 
         SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
         IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
     config.put(Utils.SF_ROLE, "");
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(Utils.SF_ROLE);
   }
@@ -391,7 +375,7 @@ public void testIngestionTypeConfig_invalid_snowpipe_streaming() {
   public void testIngestionTypeConfig_invalid_value() {
     Map<String, String> config = getConfig();
     config.put(SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT, "INVALID_VALUE");
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT);
   }
@@ -442,14 +426,14 @@ public void testErrorTolerance_AllowedValues() {
         SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
         IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
-    Utils.validateConfig(config);
+    connectorConfigValidator.validateConfig(config);
 
     config.put(
         ERRORS_TOLERANCE_CONFIG, SnowflakeSinkConnectorConfig.ErrorTolerance.NONE.toString());
-    Utils.validateConfig(config);
+    connectorConfigValidator.validateConfig(config);
 
     config.put(ERRORS_TOLERANCE_CONFIG, "all");
-    Utils.validateConfig(config);
+    connectorConfigValidator.validateConfig(config);
   }
 
   @Test
@@ -460,7 +444,7 @@ public void testErrorTolerance_DisallowedValues() {
         SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
         IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT);
   }
@@ -473,13 +457,13 @@ public void testErrorLog_AllowedValues() {
         SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
         IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
-    Utils.validateConfig(config);
+    connectorConfigValidator.validateConfig(config);
 
     config.put(ERRORS_LOG_ENABLE_CONFIG, "FALSE");
-    Utils.validateConfig(config);
+    connectorConfigValidator.validateConfig(config);
 
     config.put(ERRORS_LOG_ENABLE_CONFIG, "TRUE");
-    Utils.validateConfig(config);
+    connectorConfigValidator.validateConfig(config);
   }
 
   @Test
@@ -490,7 +474,7 @@ public void testErrorLog_DisallowedValues() {
         SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
         IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(SnowflakeSinkConnectorConfig.ERRORS_LOG_ENABLE_CONFIG);
   }
@@ -504,7 +488,7 @@ public void testStreamingEmptyFlushTime() {
         IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
     config.remove(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC);
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
        .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC);
   }
@@ -519,7 +503,7 @@ public void testStreamingFlushTimeSmall() {
     config.put(
         SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC,
         (StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_MINIMUM_SEC - 1) + "");
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC);
   }
@@ -532,7 +516,7 @@ public void testStreamingFlushTimeNotNumber() {
         IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
     config.put(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, "fdas");
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC);
   }
@@ -545,7 +529,7 @@ public void testStreamingEmptyBufferSize() {
         IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
     config.remove(BUFFER_SIZE_BYTES);
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES);
   }
@@ -558,7 +542,7 @@ public void testStreamingEmptyBufferCount() {
         IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
     config.remove(BUFFER_COUNT_RECORDS);
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(BUFFER_COUNT_RECORDS);
   }
@@ -571,7 +555,7 @@ public void testStreamingBufferCountNegative() {
         IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
     config.put(BUFFER_COUNT_RECORDS, "-1");
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(BUFFER_COUNT_RECORDS);
   }
@@ -584,7 +568,7 @@ public void testStreamingBufferCountValue() {
         IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
     config.put(BUFFER_COUNT_RECORDS, "adssadsa");
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(BUFFER_COUNT_RECORDS);
   }
@@ -602,7 +586,7 @@ public void testValidKeyAndValueConvertersForStreamingSnowpipe() {
           config.put(
               SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
               converter.getClass().toString());
-          Utils.validateConfig(config);
+          connectorConfigValidator.validateConfig(config);
         });
 
     COMMUNITY_CONVERTER_SUBSET.forEach(
@@ -610,7 +594,7 @@ public void testValidKeyAndValueConvertersForStreamingSnowpipe() {
           config.put(
               SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
               converter.getClass().toString());
-          Utils.validateConfig(config);
+          connectorConfigValidator.validateConfig(config);
         });
   }
 
@@ -629,7 +613,7 @@ public void testInvalidKeyConvertersForStreamingSnowpipe(Converter converter) {
         SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
         "org.apache.kafka.connect.storage.StringConverter");
 
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD);
   }
@@ -649,7 +633,7 @@ public void testInvalidValueConvertersForStreamingSnowpipe(Converter converter)
     config.put(
         SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
         converter.getClass().getName());
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD);
   }
@@ -670,7 +654,7 @@ public void shouldNotThrowExceptionForProperStreamingClientPropsValue(String pro
     config.put(prop, value);
 
     // WHEN/THEN
-    Utils.validateConfig(config);
+    connectorConfigValidator.validateConfig(config);
   }
 
   @ParameterizedTest
@@ -692,7 +676,7 @@ public void shouldThrowExceptionForInvalidStreamingClientPropsValue(String prop,
     config.put(prop, value);
 
     // WHEN/THEN
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(prop);
   }
@@ -713,7 +697,7 @@ public void shouldThrowExceptionForStreamingClientPropsWhenSnowpipeStreamingNotE
     config.put(prop, value);
 
     // WHEN/THEN
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(prop);
   }
@@ -726,7 +710,7 @@ public void testInvalidSchematizationForSnowpipe() {
         IngestionMethodConfig.SNOWPIPE.toString());
     config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true");
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
   }
@@ -739,7 +723,7 @@ public void testValidSchematizationForStreamingSnowpipe() {
         IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
     config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true");
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
-    Utils.validateConfig(config);
+    connectorConfigValidator.validateConfig(config);
   }
 
   @Test
@@ -753,7 +737,7 @@ public void testSchematizationWithUnsupportedConverter() {
         SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
         "org.apache.kafka.connect.storage.StringConverter");
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining("org.apache.kafka.connect.storage.StringConverter");
   }
@@ -769,7 +753,7 @@ public void testDisabledSchematizationWithUnsupportedConverter() {
         SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
         "org.apache.kafka.connect.storage.StringConverter");
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
-    Utils.validateConfig(config);
+    connectorConfigValidator.validateConfig(config);
   }
 
   @Test
@@ -781,7 +765,7 @@ public void testEnableOptimizeStreamingClientConfig() {
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
     config.put(SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG, "true");
-    Utils.validateConfig(config);
+    connectorConfigValidator.validateConfig(config);
   }
 
   @Test
@@ -792,7 +776,7 @@ public void testInvalidEnableOptimizeStreamingClientConfig() {
         IngestionMethodConfig.SNOWPIPE.toString());
     config.put(SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG, "true");
 
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(
             SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG);
@@ -807,7 +791,7 @@ public void testEnableStreamingChannelMigrationConfig() {
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
     config.put(SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, "true");
 
-    Utils.validateConfig(config);
+    connectorConfigValidator.validateConfig(config);
   }
 
   @Test
@@ -818,7 +802,7 @@ public void testEnableStreamingChannelMigrationConfig_invalidWithSnowpipe() {
         IngestionMethodConfig.SNOWPIPE.toString());
     config.put(SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, "true");
 
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
        .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(
             SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG);
@@ -834,7 +818,7 @@ public void testEnableStreamingChannelMigrationConfig_invalidWithSnowpipe() {
     config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
     config.put(
         SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, "INVALID");
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(
             SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG);
@@ -849,7 +833,7 @@ public void testStreamingProviderOverrideConfig_invalidWithSnowpipe() {
     config.put(
         SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP, "a:b,c:d");
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(
             SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP);
@@ -865,13 +849,13 @@ public void testStreamingProviderOverrideConfig_validWithSnowpipeStreaming() {
     config.put(
         SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP,
         "a:b,c:d,e:100,f:true");
-    Utils.validateConfig(config);
+    connectorConfigValidator.validateConfig(config);
   }
 
   @Test
   public void testInvalidEmptyConfig() {
     Map<String, String> config = new HashMap<>();
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(SNOWFLAKE_DATABASE)
         .hasMessageContaining(SNOWFLAKE_SCHEMA)
@@ -913,14 +897,14 @@ public void testOAuthAuthenticator() {
     config.put(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID, "client_id");
     config.put(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET, "client_secret");
     config.put(SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN, "refresh_token");
-    Utils.validateConfig(config);
+    connectorConfigValidator.validateConfig(config);
   }
 
   @Test
   public void testInvalidAuthenticator() {
     Map<String, String> config = getConfig();
     config.put(SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE, "invalid_authenticator");
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE);
   }
@@ -931,7 +915,7 @@ public void testEmptyClientId() {
     config.put(SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE, Utils.OAUTH);
     config.put(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET, "client_secret");
     config.put(SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN, "refresh_token");
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID);
   }
@@ -942,7 +926,7 @@ public void testEmptyClientSecret() {
     config.put(SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE, Utils.OAUTH);
     config.put(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID, "client_id");
     config.put(SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN, "refresh_token");
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET);
   }
@@ -953,7 +937,7 @@ public void testEmptyRefreshToken() {
     config.put(SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE, Utils.OAUTH);
     config.put(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID, "client_id");
     config.put(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET, "client_secret");
-    assertThatThrownBy(() -> Utils.validateConfig(config))
+    assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
        .isInstanceOf(SnowflakeKafkaConnectorException.class)
         .hasMessageContaining(SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN);
   }
@@ -965,18 +949,11 @@ private void invalidConfigRunner(List<String> paramsToRemove) {
     }
 
     try {
-      Utils.validateConfig(config);
+      connectorConfigValidator.validateConfig(config);
     } catch (SnowflakeKafkaConnectorException exception) {
       for (String configParam : paramsToRemove) {
         assert exception.getMessage().contains(configParam);
       }
     }
   }
-
-  @Test
-  public void testEmptyTopic2TableMap() {
-    Map<String, String> config = getConfig();
-    config.put(SnowflakeSinkConnectorConfig.TOPICS, "");
-    Utils.validateConfig(config);
-  }
 }
diff --git a/src/test/java/com/snowflake/kafka/connector/SecurityTest.java b/src/test/java/com/snowflake/kafka/connector/SecurityTest.java
deleted file mode 100644
index e85c4b478..000000000
--- a/src/test/java/com/snowflake/kafka/connector/SecurityTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.snowflake.kafka.connector;
-
-import static com.snowflake.kafka.connector.internal.TestUtils.getConfig;
-
-import com.snowflake.kafka.connector.internal.EncryptionUtils;
-import com.snowflake.kafka.connector.internal.FIPSTest;
-import com.snowflake.kafka.connector.internal.TestUtils;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Map;
-import org.bouncycastle.operator.OperatorCreationException;
-import org.junit.Test;
-
-public class SecurityTest {
-
-  @Test
-  public void testRSAPasswordOutput() throws IOException, OperatorCreationException {
-    String testPasswd = "TestPassword1234!";
-    String testKey =
-        FIPSTest.generateAESKey(TestUtils.getPrivateKey(), testPasswd.toCharArray());
-    Map<String, String> testConf = getConfig();
-    testConf.remove(SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY);
-    testConf.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY, testKey);
-    testConf.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY_PASSPHRASE, testPasswd);
-    Utils.validateConfig(testConf);
-    EncryptionUtils.parseEncryptedPrivateKey(testKey, testPasswd);
-    assert !searchInLogFile(testPasswd);
-  }
-
-  static boolean searchInLogFile(String str) throws IOException {
-    String fileName = "sf.log";
-    File log = new File(fileName);
-    FileReader fileReader = new FileReader(log);
-    BufferedReader buffer = new BufferedReader(fileReader);
-    String line;
-    while ((line = buffer.readLine()) != null) {
-      if (line.contains(str)) {
-        return true;
-      }
-    }
-    buffer.close();
-    fileReader.close();
-    return false;
-  }
-}
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/TombstoneRecordIngestionIT.java b/src/test/java/com/snowflake/kafka/connector/internal/TombstoneRecordIngestionIT.java
index db962cc3f..71401d067 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/TombstoneRecordIngestionIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/TombstoneRecordIngestionIT.java
@@ -1,7 +1,7 @@
 package com.snowflake.kafka.connector.internal;
 
-import static com.snowflake.kafka.connector.ConnectorConfigTest.COMMUNITY_CONVERTER_SUBSET;
-import static com.snowflake.kafka.connector.ConnectorConfigTest.CUSTOM_SNOWFLAKE_CONVERTERS;
+import static com.snowflake.kafka.connector.ConnectorConfigValidatorTest.COMMUNITY_CONVERTER_SUBSET;
+import static com.snowflake.kafka.connector.ConnectorConfigValidatorTest.CUSTOM_SNOWFLAKE_CONVERTERS;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
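For reference: after this refactoring, the renamed ConnectorConfigValidatorTest drives every assertion through an injected validator instance instead of the static Utils.validateConfig. The field declaration itself sits outside the hunks shown here; a minimal sketch of the wiring the test class would need, assuming the production StreamingConfigValidator implementation is named DefaultStreamingConfigValidator (that collaborator name is an assumption, not confirmed by this diff):

    // Hypothetical fixture for ConnectorConfigValidatorTest: build the validator once
    // and reuse it in every test method. DefaultStreamingConfigValidator is an assumed
    // name for the StreamingConfigValidator passed to DefaultConnectorConfigValidator.
    private final ConnectorConfigValidator connectorConfigValidator =
        new DefaultConnectorConfigValidator(new DefaultStreamingConfigValidator());

Because validateConfig throws SnowflakeKafkaConnectorException on invalid input and returns the connector name otherwise, the existing assertThatThrownBy chains keep working unchanged, while valid-config tests simply invoke the method for its side effect.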