-
Notifications
You must be signed in to change notification settings - Fork 98
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
SNOW-1658905 Create snowflake.streaming.iceberg.enabled parameter
- Loading branch information
1 parent
cea8f1f
commit 4de5cff
Showing
11 changed files
with
258 additions
and
30 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
43 changes: 43 additions & 0 deletions
43
src/main/java/com/snowflake/kafka/connector/config/IcebergConfigValidator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package com.snowflake.kafka.connector.config; | ||
|
||
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*; | ||
import static com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig.SNOWPIPE_STREAMING; | ||
|
||
import com.google.common.collect.ImmutableMap; | ||
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; | ||
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; | ||
import com.snowflake.kafka.connector.internal.streaming.StreamingConfigValidator; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
/** Validates dependencies between parameters in Iceberg mode. */ | ||
public class IcebergConfigValidator implements StreamingConfigValidator { | ||
|
||
private static final String NO_SCHEMATIZATION_ERROR_MESSAGE = | ||
"Ingestion to Iceberg table requires " + ENABLE_SCHEMATIZATION_CONFIG + " set to true"; | ||
private static final String INCOMPATIBLE_INGESTION_METHOD = | ||
"Ingestion to Iceberg table is supported only for " + SNOWPIPE_STREAMING; | ||
|
||
@Override | ||
public ImmutableMap<String, String> validate(Map<String, String> inputConfig) { | ||
boolean isIcebergEnabled = Boolean.parseBoolean(inputConfig.get(ICEBERG_ENABLED)); | ||
Map<String, String> validationErrors = new HashMap<>(); | ||
|
||
if (isIcebergEnabled) { | ||
boolean isSchematizationEnabled = | ||
Boolean.parseBoolean( | ||
inputConfig.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG)); | ||
IngestionMethodConfig ingestionMethod = | ||
IngestionMethodConfig.valueOf(inputConfig.get(INGESTION_METHOD_OPT).toUpperCase()); | ||
|
||
if (!isSchematizationEnabled) { | ||
validationErrors.put(ENABLE_SCHEMATIZATION_CONFIG, NO_SCHEMATIZATION_ERROR_MESSAGE); | ||
} | ||
if (ingestionMethod != SNOWPIPE_STREAMING) { | ||
validationErrors.put(INGESTION_METHOD_OPT, INCOMPATIBLE_INGESTION_METHOD); | ||
} | ||
} | ||
|
||
return ImmutableMap.copyOf(validationErrors); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
59 changes: 59 additions & 0 deletions
59
src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
package com.snowflake.kafka.connector.config; | ||
|
||
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG; | ||
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT; | ||
|
||
import com.google.common.collect.ImmutableMap; | ||
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; | ||
import com.snowflake.kafka.connector.internal.streaming.StreamingConfigValidator; | ||
import java.util.Map; | ||
import java.util.stream.Stream; | ||
import org.junit.jupiter.api.Assertions; | ||
import org.junit.jupiter.params.ParameterizedTest; | ||
import org.junit.jupiter.params.provider.Arguments; | ||
import org.junit.jupiter.params.provider.MethodSource; | ||
|
||
public class IcebergConfigValidationTest { | ||
|
||
private static final StreamingConfigValidator validator = new IcebergConfigValidator(); | ||
|
||
@ParameterizedTest | ||
@MethodSource("validConfigs") | ||
public void shouldValidateCorrectConfig(Map<String, String> config) { | ||
// when | ||
ImmutableMap<String, String> invalidParameters = validator.validate(config); | ||
|
||
// then | ||
Assertions.assertTrue(invalidParameters.isEmpty()); | ||
} | ||
|
||
@ParameterizedTest | ||
@MethodSource("invalidConfigs") | ||
public void shouldReturnErrorOnInvalidConfig(Map<String, String> config, String errorKey) { | ||
// when | ||
ImmutableMap<String, String> invalidParameters = validator.validate(config); | ||
|
||
// then | ||
Assertions.assertTrue(invalidParameters.containsKey(errorKey)); | ||
} | ||
|
||
public static Stream<Arguments> validConfigs() { | ||
return Stream.of( | ||
Arguments.of(SnowflakeSinkConnectorConfigBuilder.snowpipeConfig().build()), | ||
Arguments.of(SnowflakeSinkConnectorConfigBuilder.icebergConfig().build())); | ||
} | ||
|
||
public static Stream<Arguments> invalidConfigs() { | ||
return Stream.of( | ||
Arguments.of( | ||
SnowflakeSinkConnectorConfigBuilder.icebergConfig() | ||
.withIngestionMethod(IngestionMethodConfig.SNOWPIPE) | ||
.build(), | ||
INGESTION_METHOD_OPT), | ||
Arguments.of( | ||
SnowflakeSinkConnectorConfigBuilder.icebergConfig() | ||
.withSchematizationEnabled(false) | ||
.build(), | ||
ENABLE_SCHEMATIZATION_CONFIG)); | ||
} | ||
} |
119 changes: 119 additions & 0 deletions
119
src/test/java/com/snowflake/kafka/connector/config/SnowflakeSinkConnectorConfigBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
package com.snowflake.kafka.connector.config; | ||
|
||
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG; | ||
import static com.snowflake.kafka.connector.Utils.*; | ||
import static com.snowflake.kafka.connector.Utils.SF_DATABASE; | ||
|
||
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; | ||
import com.snowflake.kafka.connector.Utils; | ||
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
/** | ||
* This is a builder class for the connector config For now it returns map. Let's change it to a | ||
* more convenient abstraction when we have it. | ||
*/ | ||
public class SnowflakeSinkConnectorConfigBuilder { | ||
|
||
private final Map<String, String> config = new HashMap<String, String>(); | ||
|
||
private SnowflakeSinkConnectorConfigBuilder() {} | ||
|
||
public static SnowflakeSinkConnectorConfigBuilder snowpipeConfig() { | ||
return commonRequiredFields().withIngestionMethod(IngestionMethodConfig.SNOWPIPE); | ||
} | ||
|
||
public static SnowflakeSinkConnectorConfigBuilder icebergConfig() { | ||
return commonRequiredFields() | ||
.withIcebergEnabled() | ||
.withIngestionMethod(IngestionMethodConfig.SNOWPIPE_STREAMING) | ||
.withSchematizationEnabled(true) | ||
.withRole("role"); | ||
} | ||
|
||
private static SnowflakeSinkConnectorConfigBuilder commonRequiredFields() { | ||
return new SnowflakeSinkConnectorConfigBuilder() | ||
.withName("test") | ||
.withTopics("topic1,topic2") | ||
.withUrl("https://testaccount.snowflake.com:443") | ||
.withSchema("testSchema") | ||
.withDatabase("testDatabase") | ||
.withUser("userName") | ||
.withPrivateKey("fdsfsdfsdfdsfdsrqwrwewrwrew42314424") | ||
.withDefaultBufferConfig(); | ||
} | ||
|
||
public SnowflakeSinkConnectorConfigBuilder withName(String name) { | ||
config.put(Utils.NAME, name); | ||
return this; | ||
} | ||
|
||
public SnowflakeSinkConnectorConfigBuilder withTopics(String topics) { | ||
config.put(SnowflakeSinkConnectorConfig.TOPICS, topics); | ||
return this; | ||
} | ||
|
||
public SnowflakeSinkConnectorConfigBuilder withUrl(String url) { | ||
config.put(SF_URL, url); | ||
return this; | ||
} | ||
|
||
public SnowflakeSinkConnectorConfigBuilder withDatabase(String database) { | ||
config.put(SF_DATABASE, database); | ||
return this; | ||
} | ||
|
||
public SnowflakeSinkConnectorConfigBuilder withSchema(String schema) { | ||
config.put(SF_SCHEMA, schema); | ||
return this; | ||
} | ||
|
||
public SnowflakeSinkConnectorConfigBuilder withUser(String user) { | ||
config.put(SF_USER, user); | ||
return this; | ||
} | ||
|
||
public SnowflakeSinkConnectorConfigBuilder withPrivateKey(String privateKey) { | ||
config.put(Utils.SF_PRIVATE_KEY, privateKey); | ||
return this; | ||
} | ||
|
||
public SnowflakeSinkConnectorConfigBuilder withDefaultBufferConfig() { | ||
config.put( | ||
SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS, | ||
SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS_DEFAULT + ""); | ||
config.put( | ||
SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES, | ||
SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT + ""); | ||
config.put( | ||
SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, | ||
SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_DEFAULT + ""); | ||
return this; | ||
} | ||
|
||
public SnowflakeSinkConnectorConfigBuilder withIngestionMethod( | ||
IngestionMethodConfig ingestionMethod) { | ||
config.put(SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT, ingestionMethod.toString()); | ||
return this; | ||
} | ||
|
||
public SnowflakeSinkConnectorConfigBuilder withIcebergEnabled() { | ||
config.put(SnowflakeSinkConnectorConfig.ICEBERG_ENABLED, "true"); | ||
return this; | ||
} | ||
|
||
public SnowflakeSinkConnectorConfigBuilder withRole(String role) { | ||
config.put(SF_ROLE, role); | ||
return this; | ||
} | ||
|
||
public SnowflakeSinkConnectorConfigBuilder withSchematizationEnabled(boolean enabled) { | ||
config.put(ENABLE_SCHEMATIZATION_CONFIG, Boolean.toString(enabled)); | ||
return this; | ||
} | ||
|
||
public Map<String, String> build() { | ||
return config; | ||
} | ||
} |
Oops, something went wrong.