From bc86610da925937bdb97de25416d95837c629bc9 Mon Sep 17 00:00:00 2001 From: revi cheng Date: Thu, 18 May 2023 14:50:46 -0700 Subject: [PATCH] One client change 3/3: Call into client provider and corresponding resilience tests (#609) --- .../streaming/SnowflakeSinkServiceV2.java | 84 +++---------------- .../streaming/StreamingClientProvider.java | 2 +- .../SnowflakeSinkTaskStreamingTest.java | 2 - .../StreamingClientConcurrencyTest.java | 2 +- .../StreamingClientProviderTest.java | 8 +- .../streaming/TopicPartitionChannelIT.java | 2 + 6 files changed, 20 insertions(+), 80 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java index 84a0970e6..b1dc34530 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java @@ -1,11 +1,9 @@ package com.snowflake.kafka.connector.internal.streaming; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION; import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_COUNT_RECORDS_DEFAULT; import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_DEFAULT_SEC; import static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE; -import static net.snowflake.ingest.utils.ParameterProvider.BLOB_FORMAT_VERSION; import com.codahale.metrics.MetricRegistry; import com.google.common.annotations.VisibleForTesting; @@ -19,17 +17,12 @@ import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; import com.snowflake.kafka.connector.records.RecordService; import com.snowflake.kafka.connector.records.SnowflakeMetadataConfig; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Optional; -import java.util.Properties; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient; -import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory; -import net.snowflake.ingest.utils.SFException; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTaskContext; @@ -52,8 +45,6 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService { private static final KCLogger LOGGER = new KCLogger(SnowflakeSinkServiceV2.class.getName()); - private static String STREAMING_CLIENT_PREFIX_NAME = "KC_CLIENT_"; - // Assume next three values are a threshold after which we will call insertRows API // Set in config (Time based flush) in seconds private long flushTimeSeconds; @@ -94,10 +85,6 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService { // Config set in JSON private final Map connectorConfig; - private final String taskId; - - private final String streamingIngestClientName; - private boolean enableSchematization; /** @@ -130,10 +117,10 @@ public SnowflakeSinkServiceV2( this.enableSchematization = this.recordService.setAndGetEnableSchematizationFromConfig(this.connectorConfig); - this.taskId = connectorConfig.getOrDefault(Utils.TASK_ID, "-1"); - this.streamingIngestClientName = - STREAMING_CLIENT_PREFIX_NAME + conn.getConnectorName() + "_" + taskId; - initStreamingClient(); + this.streamingIngestClient = + StreamingClientProvider.getStreamingClientProviderInstance() + .getClient(this.connectorConfig); + this.partitionsToChannel = new HashMap<>(); } @@ -152,8 +139,6 @@ public SnowflakeSinkServiceV2( SinkTaskContext sinkTaskContext, SnowflakeStreamingIngestClient streamingIngestClient, Map connectorConfig, - String taskId, - String streamingIngestClientName, boolean enableSchematization, Map partitionsToChannel) { this.flushTimeSeconds = flushTimeSeconds; @@ -169,8 +154,9 @@ public SnowflakeSinkServiceV2( this.sinkTaskContext = sinkTaskContext; this.streamingIngestClient = streamingIngestClient; this.connectorConfig = connectorConfig; - this.taskId = taskId; - this.streamingIngestClientName = streamingIngestClientName; + this.streamingIngestClient = + StreamingClientProvider.getStreamingClientProviderInstance() + .getClient(this.connectorConfig); this.enableSchematization = enableSchematization; this.partitionsToChannel = partitionsToChannel; } @@ -308,7 +294,9 @@ public void closeAll() { topicPartitionChannel.closeChannel(); }); partitionsToChannel.clear(); - closeStreamingClient(); + + StreamingClientProvider.getStreamingClientProviderInstance() + .closeClient(this.streamingIngestClient); } /** @@ -477,7 +465,8 @@ public static String partitionChannelKey(String topic, int partition) { /* Used for testing */ @VisibleForTesting SnowflakeStreamingIngestClient getStreamingIngestClient() { - return this.streamingIngestClient; + return StreamingClientProvider.getStreamingClientProviderInstance() + .getClient(this.connectorConfig); } /** @@ -494,55 +483,6 @@ protected Optional getTopicPartitionChannelFromCacheKey( } // ------ Streaming Ingest Related Functions ------ // - - /* Init Streaming client. If is also used to re-init the client if client was closed before. */ - private void initStreamingClient() { - Map streamingPropertiesMap = - StreamingUtils.convertConfigForStreamingClient(new HashMap<>(this.connectorConfig)); - Properties streamingClientProps = new Properties(); - streamingClientProps.putAll(streamingPropertiesMap); - if (this.streamingIngestClient == null || this.streamingIngestClient.isClosed()) { - try { - // Override only if bdec version is explicitly set in config, default to the version set - // inside - // Ingest SDK - Map parameterOverrides = new HashMap<>(); - Optional snowpipeStreamingBdecVersion = - Optional.ofNullable(this.connectorConfig.get(SNOWPIPE_STREAMING_FILE_VERSION)); - snowpipeStreamingBdecVersion.ifPresent( - overriddenValue -> { - LOGGER.info("Config is overridden for {} ", SNOWPIPE_STREAMING_FILE_VERSION); - parameterOverrides.put(BLOB_FORMAT_VERSION, overriddenValue); - }); - - LOGGER.info("Initializing Streaming Client. ClientName:{}", this.streamingIngestClientName); - this.streamingIngestClient = - SnowflakeStreamingIngestClientFactory.builder(this.streamingIngestClientName) - .setProperties(streamingClientProps) - .setParameterOverrides(parameterOverrides) - .build(); - } catch (SFException ex) { - LOGGER.error( - "Exception creating streamingIngestClient with name:{}", - this.streamingIngestClientName); - throw new ConnectException(ex); - } - } - } - - /** Closes the streaming client. */ - private void closeStreamingClient() { - LOGGER.info("Closing Streaming Client:{}", this.streamingIngestClientName); - try { - streamingIngestClient.close(); - } catch (Exception e) { - LOGGER.error( - "Failure closing Streaming client msg:{}, cause:{}", - e.getMessage(), - Arrays.toString(e.getCause().getStackTrace())); - } - } - private void createTableIfNotExists(final String tableName) { if (this.conn.tableExist(tableName)) { if (!this.enableSchematization) { diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProvider.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProvider.java index 96241877e..066738fe9 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProvider.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProvider.java @@ -47,7 +47,7 @@ public static StreamingClientProvider getStreamingClientProviderInstance() { /** ONLY FOR TESTING - to get a provider with injected properties */ @VisibleForTesting - public static StreamingClientProvider injectStreamingClientProviderForTests( + public static StreamingClientProvider getStreamingClientProviderForTests( SnowflakeStreamingIngestClient parameterEnabledClient, StreamingClientHandler streamingClientHandler) { return new StreamingClientProvider(parameterEnabledClient, streamingClientHandler); diff --git a/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskStreamingTest.java b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskStreamingTest.java index b4bc0b197..3e8678645 100644 --- a/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskStreamingTest.java +++ b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskStreamingTest.java @@ -111,8 +111,6 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception { inMemorySinkTaskContext, mockStreamingClient, config, - "0", - "TEST_CLIENT", false, topicPartitionChannelMap); diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientConcurrencyTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientConcurrencyTest.java index d1b6ed1c8..236a482f1 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientConcurrencyTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientConcurrencyTest.java @@ -68,7 +68,7 @@ public void setup() { this.streamingClientHandler = Mockito.spy(StreamingClientHandler.class); this.streamingClientProvider = - StreamingClientProvider.injectStreamingClientProviderForTests( + StreamingClientProvider.getStreamingClientProviderForTests( null, this.streamingClientHandler); this.getClientFuturesTeardown = new ArrayList<>(); diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProviderTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProviderTest.java index 2f6d3af63..751d3c534 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProviderTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProviderTest.java @@ -17,7 +17,7 @@ package com.snowflake.kafka.connector.internal.streaming; -import static com.snowflake.kafka.connector.internal.streaming.StreamingClientProvider.injectStreamingClientProviderForTests; +import static com.snowflake.kafka.connector.internal.streaming.StreamingClientProvider.getStreamingClientProviderForTests; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; import com.snowflake.kafka.connector.Utils; @@ -74,7 +74,7 @@ public void setup() { this.streamingClientHandler = Mockito.spy(StreamingClientHandler.class); this.streamingClientProvider = - StreamingClientProvider.injectStreamingClientProviderForTests( + StreamingClientProvider.getStreamingClientProviderForTests( null, this.streamingClientHandler); } @@ -110,7 +110,7 @@ public void testGetInvalidClient() { this.invalidClient = Mockito.mock(SnowflakeStreamingIngestClient.class); Mockito.when(this.invalidClient.isClosed()).thenReturn(true); StreamingClientProvider injectedProvider = - injectStreamingClientProviderForTests(this.invalidClient, this.streamingClientHandler); + getStreamingClientProviderForTests(this.invalidClient, this.streamingClientHandler); // test: getting invalid client with valid config this.validClient = injectedProvider.getClient(validClientConfig); @@ -169,7 +169,7 @@ public void testCloseClients() throws Exception { // test closing all clients StreamingClientProvider injectedProvider = - injectStreamingClientProviderForTests(this.client1, this.streamingClientHandler); + getStreamingClientProviderForTests(this.client1, this.streamingClientHandler); injectedProvider.closeClient(this.client1); diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java index 4e311a382..a78b2e0c3 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java @@ -258,6 +258,7 @@ public void testAutoChannelReopen_InsertRowsSFException() throws Exception { public void testAutoChannelReopen_MultiplePartitionsInsertRowsSFException() throws Exception { Map config = TestUtils.getConfForStreaming(); SnowflakeSinkConnectorConfig.setDefaultValues(config); + config.put(SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG, "true"); InMemorySinkTaskContext inMemorySinkTaskContext = new InMemorySinkTaskContext(Collections.singleton(topicPartition)); @@ -375,6 +376,7 @@ public void testAutoChannelReopen_MultiplePartitionsInsertRowsSFException() thro public void testAutoChannelReopen_SinglePartitionsInsertRowsSFException() throws Exception { Map config = TestUtils.getConfForStreaming(); SnowflakeSinkConnectorConfig.setDefaultValues(config); + config.put(SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG, "true"); InMemorySinkTaskContext inMemorySinkTaskContext = new InMemorySinkTaskContext(Collections.singleton(topicPartition));