Skip to content

Commit

Permalink
SNOW-1658905 Create snowflake.streaming.iceberg.enabled parameter (#927)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski committed Sep 19, 2024
1 parent 7e93503 commit 6edba99
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ public class DefaultConnectorConfigValidator implements ConnectorConfigValidator
new KCLogger(DefaultConnectorConfigValidator.class.getName());

private final StreamingConfigValidator streamingConfigValidator;
private final StreamingConfigValidator icebergConfigValidator;

public DefaultConnectorConfigValidator(StreamingConfigValidator streamingConfigValidator) {
public DefaultConnectorConfigValidator(
StreamingConfigValidator streamingConfigValidator,
StreamingConfigValidator icebergConfigValidator) {
this.streamingConfigValidator = streamingConfigValidator;
this.icebergConfigValidator = icebergConfigValidator;
}

/**
Expand Down Expand Up @@ -253,6 +257,7 @@ && parseTopicToTableMap(config.get(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MA

// Check all config values for ingestion method == IngestionMethodConfig.SNOWPIPE_STREAMING
invalidConfigParams.putAll(streamingConfigValidator.validate(config));
invalidConfigParams.putAll(icebergConfigValidator.validate(config));

// logs and throws exception if there are invalid params
handleInvalidParameters(ImmutableMap.copyOf(invalidConfigParams));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.snowflake.kafka.connector;

import com.snowflake.kafka.connector.config.ConnectorConfigDefinition;
import com.snowflake.kafka.connector.config.IcebergConfigValidator;
import com.snowflake.kafka.connector.internal.KCLogger;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory;
Expand Down Expand Up @@ -66,7 +67,8 @@ public class SnowflakeSinkConnector extends SinkConnector {
private boolean setupComplete;

private final ConnectorConfigValidator connectorConfigValidator =
new DefaultConnectorConfigValidator(new DefaultStreamingConfigValidator());
new DefaultConnectorConfigValidator(
new DefaultStreamingConfigValidator(), new IcebergConfigValidator());

/** No-Arg constructor. Required by Kafka Connect framework */
public SnowflakeSinkConnector() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ public class SnowflakeSinkConnectorConfig {
public static final String SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP =
"snowflake.streaming.client.provider.override.map";

// Iceberg
public static final String ICEBERG_ENABLED = "snowflake.streaming.iceberg.enabled";
public static final boolean ICEBERG_ENABLED_DEFAULT_VALUE = false;

// TESTING
public static final String REBALANCING = "snowflake.test.rebalancing";
public static final boolean REBALANCING_DEFAULT = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,13 @@ public static ConfigDef getConfig() {
ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT,
ConfigDef.Importance.LOW,
"If set to true the Connector will fail its tasks when authorization error from"
+ " Snowflake occurred");
+ " Snowflake occurred")
.define(
ICEBERG_ENABLED,
ConfigDef.Type.BOOLEAN,
ICEBERG_ENABLED_DEFAULT_VALUE,
ConfigDef.Importance.HIGH,
"When set to true the connector will ingest data into the Iceberg table. Check the"
+ " official Snowflake documentation for the prerequisites.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.snowflake.kafka.connector.config;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*;
import static com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig.SNOWPIPE_STREAMING;

import com.google.common.collect.ImmutableMap;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import com.snowflake.kafka.connector.internal.streaming.StreamingConfigValidator;
import java.util.HashMap;
import java.util.Map;

/** Validates dependencies between parameters in Iceberg mode. */
public class IcebergConfigValidator implements StreamingConfigValidator {

private static final String NO_SCHEMATIZATION_ERROR_MESSAGE =
"Ingestion to Iceberg table requires " + ENABLE_SCHEMATIZATION_CONFIG + " set to true";
private static final String INCOMPATIBLE_INGESTION_METHOD =
"Ingestion to Iceberg table is supported only for Snowpipe Streaming";

@Override
public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {
boolean isIcebergEnabled = Boolean.parseBoolean(inputConfig.get(ICEBERG_ENABLED));

if (!isIcebergEnabled) {
return ImmutableMap.of();
}

Map<String, String> validationErrors = new HashMap<>();

boolean isSchematizationEnabled =
Boolean.parseBoolean(
inputConfig.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG));
IngestionMethodConfig ingestionMethod =
IngestionMethodConfig.valueOf(inputConfig.get(INGESTION_METHOD_OPT).toUpperCase());

if (!isSchematizationEnabled) {
validationErrors.put(ENABLE_SCHEMATIZATION_CONFIG, NO_SCHEMATIZATION_ERROR_MESSAGE);
}
if (ingestionMethod != SNOWPIPE_STREAMING) {
validationErrors.put(INGESTION_METHOD_OPT, INCOMPATIBLE_INGESTION_METHOD);
}

return ImmutableMap.copyOf(validationErrors);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Map;

/** Validates connector config for Snowpipe Streaming */
// TODO (separate PR) - rename to ConfigValidator and return an ordinary Map
public interface StreamingConfigValidator {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*;
import static com.snowflake.kafka.connector.internal.TestUtils.getConfig;

import com.snowflake.kafka.connector.config.IcebergConfigValidator;
import com.snowflake.kafka.connector.internal.EncryptionUtils;
import com.snowflake.kafka.connector.internal.FIPSTest;
import com.snowflake.kafka.connector.internal.TestUtils;
Expand All @@ -22,7 +23,8 @@
public class ConnectorConfigValidatorLogsTest {

private final ConnectorConfigValidator connectorConfigValidator =
new DefaultConnectorConfigValidator(new DefaultStreamingConfigValidator());
new DefaultConnectorConfigValidator(
new DefaultStreamingConfigValidator(), new IcebergConfigValidator());

@Test
public void testRSAPasswordOutput() throws IOException, OperatorCreationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static org.assertj.core.api.Assertions.*;
import static org.junit.Assert.assertEquals;

import com.snowflake.kafka.connector.config.IcebergConfigValidator;
import com.snowflake.kafka.connector.config.SnowflakeSinkConnectorConfigBuilder;
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
import com.snowflake.kafka.connector.internal.streaming.DefaultStreamingConfigValidator;
Expand All @@ -41,7 +43,8 @@
public class ConnectorConfigValidatorTest {

private final ConnectorConfigValidator connectorConfigValidator =
new DefaultConnectorConfigValidator(new DefaultStreamingConfigValidator());
new DefaultConnectorConfigValidator(
new DefaultStreamingConfigValidator(), new IcebergConfigValidator());

// subset of valid community converters
public static final List<Converter> COMMUNITY_CONVERTER_SUBSET =
Expand All @@ -63,14 +66,16 @@ private static Stream<Arguments> customSnowflakeConverters() {

@Test
public void testConfig() {
Map<String, String> config = getConfig();
Map<String, String> config = SnowflakeSinkConnectorConfigBuilder.snowpipeConfig().build();
connectorConfigValidator.validateConfig(config);
}

@Test
public void testConfig_ConvertedInvalidAppName() {
Map<String, String> config = getConfig();
config.put(NAME, "testConfig.snowflake-connector");
Map<String, String> config =
SnowflakeSinkConnectorConfigBuilder.snowpipeConfig()
.withName("testConfig.snowflake-connector")
.build();

Utils.convertAppName(config);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.snowflake.kafka.connector.config;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;

import com.google.common.collect.ImmutableMap;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import com.snowflake.kafka.connector.internal.streaming.StreamingConfigValidator;
import java.util.Map;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class IcebergConfigValidationTest {

private static final StreamingConfigValidator validator = new IcebergConfigValidator();

@ParameterizedTest
@MethodSource("validConfigs")
public void shouldValidateCorrectConfig(Map<String, String> config) {
// when
ImmutableMap<String, String> invalidParameters = validator.validate(config);

// then
Assertions.assertTrue(invalidParameters.isEmpty());
}

@ParameterizedTest
@MethodSource("invalidConfigs")
public void shouldReturnErrorOnInvalidConfig(Map<String, String> config, String errorKey) {
// when
ImmutableMap<String, String> invalidParameters = validator.validate(config);

// then
Assertions.assertTrue(invalidParameters.containsKey(errorKey));
}

public static Stream<Arguments> validConfigs() {
return Stream.of(
Arguments.of(SnowflakeSinkConnectorConfigBuilder.snowpipeConfig().build()),
Arguments.of(SnowflakeSinkConnectorConfigBuilder.icebergConfig().build()));
}

public static Stream<Arguments> invalidConfigs() {
return Stream.of(
Arguments.of(
SnowflakeSinkConnectorConfigBuilder.icebergConfig()
.withIngestionMethod(IngestionMethodConfig.SNOWPIPE)
.build(),
INGESTION_METHOD_OPT),
Arguments.of(
SnowflakeSinkConnectorConfigBuilder.icebergConfig()
.withSchematizationEnabled(false)
.build(),
ENABLE_SCHEMATIZATION_CONFIG));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.snowflake.kafka.connector.config;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG;
import static com.snowflake.kafka.connector.Utils.*;
import static com.snowflake.kafka.connector.Utils.SF_DATABASE;

import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import java.util.HashMap;
import java.util.Map;

/**
* This is a builder class for the connector config. For now it returns map. Let's change it to a
* more convenient abstraction when we have it.
*/
public class SnowflakeSinkConnectorConfigBuilder {

private final Map<String, String> config = new HashMap<String, String>();

private SnowflakeSinkConnectorConfigBuilder() {}

public static SnowflakeSinkConnectorConfigBuilder snowpipeConfig() {
return commonRequiredFields().withIngestionMethod(IngestionMethodConfig.SNOWPIPE);
}

public static SnowflakeSinkConnectorConfigBuilder icebergConfig() {
return commonRequiredFields()
.withIcebergEnabled()
.withIngestionMethod(IngestionMethodConfig.SNOWPIPE_STREAMING)
.withSchematizationEnabled(true)
.withRole("role");
}

private static SnowflakeSinkConnectorConfigBuilder commonRequiredFields() {
return new SnowflakeSinkConnectorConfigBuilder()
.withName("test")
.withTopics("topic1,topic2")
.withUrl("https://testaccount.snowflake.com:443")
.withSchema("testSchema")
.withDatabase("testDatabase")
.withUser("userName")
.withPrivateKey("fdsfsdfsdfdsfdsrqwrwewrwrew42314424")
.withDefaultBufferConfig();
}

public SnowflakeSinkConnectorConfigBuilder withName(String name) {
config.put(Utils.NAME, name);
return this;
}

public SnowflakeSinkConnectorConfigBuilder withTopics(String topics) {
config.put(SnowflakeSinkConnectorConfig.TOPICS, topics);
return this;
}

public SnowflakeSinkConnectorConfigBuilder withUrl(String url) {
config.put(SF_URL, url);
return this;
}

public SnowflakeSinkConnectorConfigBuilder withDatabase(String database) {
config.put(SF_DATABASE, database);
return this;
}

public SnowflakeSinkConnectorConfigBuilder withSchema(String schema) {
config.put(SF_SCHEMA, schema);
return this;
}

public SnowflakeSinkConnectorConfigBuilder withUser(String user) {
config.put(SF_USER, user);
return this;
}

public SnowflakeSinkConnectorConfigBuilder withPrivateKey(String privateKey) {
config.put(Utils.SF_PRIVATE_KEY, privateKey);
return this;
}

public SnowflakeSinkConnectorConfigBuilder withDefaultBufferConfig() {
config.put(
SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS,
SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS_DEFAULT + "");
config.put(
SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES,
SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT + "");
config.put(
SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC,
SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_DEFAULT + "");
return this;
}

public SnowflakeSinkConnectorConfigBuilder withIngestionMethod(
IngestionMethodConfig ingestionMethod) {
config.put(SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT, ingestionMethod.toString());
return this;
}

public SnowflakeSinkConnectorConfigBuilder withIcebergEnabled() {
config.put(SnowflakeSinkConnectorConfig.ICEBERG_ENABLED, "true");
return this;
}

public SnowflakeSinkConnectorConfigBuilder withRole(String role) {
config.put(SF_ROLE, role);
return this;
}

public SnowflakeSinkConnectorConfigBuilder withSchematizationEnabled(boolean enabled) {
config.put(ENABLE_SCHEMATIZATION_CONFIG, Boolean.toString(enabled));
return this;
}

public Map<String, String> build() {
return config;
}
}
Loading

0 comments on commit 6edba99

Please sign in to comment.