Skip to content

Commit

Permalink
NO-SNOW Enable single buffering in Snowpipe Streaming by default (#924)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski authored Sep 19, 2024
1 parent 39eae66 commit f0e5f23
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public class SnowflakeSinkConnectorConfig {
public static final String SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER =
"snowflake.streaming.enable.single.buffer";

public static final boolean SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER_DEFAULT = false;
public static final boolean SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER_DEFAULT = true;
public static final String SNOWPIPE_STREAMING_MAX_CLIENT_LAG =
"snowflake.streaming.max.client.lag";

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,6 @@
package com.snowflake.kafka.connector;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_LOG_ENABLE_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.JVM_PROXY_HOST;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.JVM_PROXY_PORT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.NAME;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_DATABASE;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_URL;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_USER;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*;
import static com.snowflake.kafka.connector.Utils.HTTP_NON_PROXY_HOSTS;
import static com.snowflake.kafka.connector.internal.TestUtils.getConfig;
import static org.assertj.core.api.Assertions.*;
Expand Down Expand Up @@ -493,6 +480,7 @@ public void testStreamingEmptyFlushTime() {
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.remove(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC);
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC);
Expand All @@ -508,6 +496,7 @@ public void testStreamingFlushTimeSmall() {
config.put(
SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC,
(StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_MINIMUM_SEC - 1) + "");
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC);
Expand All @@ -521,6 +510,7 @@ public void testStreamingFlushTimeNotNumber() {
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.put(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, "fdas");
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC);
Expand All @@ -533,6 +523,7 @@ public void testStreamingEmptyBufferSize() {
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
config.remove(BUFFER_SIZE_BYTES);
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
Expand All @@ -547,6 +538,7 @@ public void testStreamingEmptyBufferCount() {
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.remove(BUFFER_COUNT_RECORDS);
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(BUFFER_COUNT_RECORDS);
Expand All @@ -560,6 +552,7 @@ public void testStreamingBufferCountNegative() {
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.put(BUFFER_COUNT_RECORDS, "-1");
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(BUFFER_COUNT_RECORDS);
Expand All @@ -573,6 +566,7 @@ public void testStreamingBufferCountValue() {
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.put(BUFFER_COUNT_RECORDS, "adssadsa");
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(BUFFER_COUNT_RECORDS);
Expand Down Expand Up @@ -920,6 +914,7 @@ public void testEmptyClientId() {
config.put(SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE, Utils.OAUTH);
config.put(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET, "client_secret");
config.put(SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN, "refresh_token");
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ void shouldNotPropagateStreamingClientProperties_SingleBufferDisabled() {

connectorConfig.put(BUFFER_SIZE_BYTES, "10000000");
connectorConfig.put(SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES, "20000000");
connectorConfig.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");

// WHEN
StreamingClientProperties resultProperties = new StreamingClientProperties(connectorConfig);
Expand Down

0 comments on commit f0e5f23

Please sign in to comment.