One client change 3/3: Call into client provider and corresponding resilience tests (#609)

sfc-gh-rcheng committed May 18, 2023
1 parent 4b594b0 commit bc86610
Showing 6 changed files with 20 additions and 80 deletions.
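The change in a nutshell: instead of each task building and closing its own SnowflakeStreamingIngestClient, SnowflakeSinkServiceV2 now acquires and releases clients through the shared StreamingClientProvider. A minimal sketch of that call pattern follows; only the two provider calls are taken from the diff below, while the wrapper class, method name, and try/finally shape are illustrative.

import java.util.Map;
import com.snowflake.kafka.connector.internal.streaming.StreamingClientProvider;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;

public class ProviderCallPatternSketch {
  // Acquire a (possibly shared) client from the provider, use it, then hand it
  // back through closeClient, mirroring SnowflakeSinkServiceV2.closeAll() below.
  static void useClient(Map<String, String> connectorConfig) {
    SnowflakeStreamingIngestClient client =
        StreamingClientProvider.getStreamingClientProviderInstance()
            .getClient(connectorConfig);
    try {
      // ... open channels and insert rows with the client ...
    } finally {
      StreamingClientProvider.getStreamingClientProviderInstance()
          .closeClient(client);
    }
  }
}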
@@ -1,11 +1,9 @@
package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_COUNT_RECORDS_DEFAULT;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_DEFAULT_SEC;
import static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
import static net.snowflake.ingest.utils.ParameterProvider.BLOB_FORMAT_VERSION;

import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
@@ -19,17 +17,12 @@
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.records.RecordService;
import com.snowflake.kafka.connector.records.SnowflakeMetadataConfig;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import net.snowflake.ingest.utils.SFException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

Expand All @@ -52,8 +45,6 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {

private static final KCLogger LOGGER = new KCLogger(SnowflakeSinkServiceV2.class.getName());

private static String STREAMING_CLIENT_PREFIX_NAME = "KC_CLIENT_";

// Assume next three values are a threshold after which we will call insertRows API
// Set in config (Time based flush) in seconds
private long flushTimeSeconds;
@@ -94,10 +85,6 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
// Config set in JSON
private final Map<String, String> connectorConfig;

private final String taskId;

private final String streamingIngestClientName;

private boolean enableSchematization;

/**
@@ -130,10 +117,10 @@ public SnowflakeSinkServiceV2(
this.enableSchematization =
this.recordService.setAndGetEnableSchematizationFromConfig(this.connectorConfig);

this.taskId = connectorConfig.getOrDefault(Utils.TASK_ID, "-1");
this.streamingIngestClientName =
STREAMING_CLIENT_PREFIX_NAME + conn.getConnectorName() + "_" + taskId;
initStreamingClient();
this.streamingIngestClient =
StreamingClientProvider.getStreamingClientProviderInstance()
.getClient(this.connectorConfig);

this.partitionsToChannel = new HashMap<>();
}

@@ -152,8 +139,6 @@ public SnowflakeSinkServiceV2(
SinkTaskContext sinkTaskContext,
SnowflakeStreamingIngestClient streamingIngestClient,
Map<String, String> connectorConfig,
String taskId,
String streamingIngestClientName,
boolean enableSchematization,
Map<String, TopicPartitionChannel> partitionsToChannel) {
this.flushTimeSeconds = flushTimeSeconds;
@@ -169,8 +154,9 @@ public SnowflakeSinkServiceV2(
this.sinkTaskContext = sinkTaskContext;
this.streamingIngestClient = streamingIngestClient;
this.connectorConfig = connectorConfig;
this.taskId = taskId;
this.streamingIngestClientName = streamingIngestClientName;
this.streamingIngestClient =
StreamingClientProvider.getStreamingClientProviderInstance()
.getClient(this.connectorConfig);
this.enableSchematization = enableSchematization;
this.partitionsToChannel = partitionsToChannel;
}
@@ -308,7 +294,9 @@ public void closeAll() {
topicPartitionChannel.closeChannel();
});
partitionsToChannel.clear();
closeStreamingClient();

StreamingClientProvider.getStreamingClientProviderInstance()
.closeClient(this.streamingIngestClient);
}

/**
@@ -477,7 +465,8 @@ public static String partitionChannelKey(String topic, int partition) {
/* Used for testing */
@VisibleForTesting
SnowflakeStreamingIngestClient getStreamingIngestClient() {
return this.streamingIngestClient;
return StreamingClientProvider.getStreamingClientProviderInstance()
.getClient(this.connectorConfig);
}

/**
@@ -494,55 +483,6 @@ protected Optional<TopicPartitionChannel> getTopicPartitionChannelFromCacheKey(
}

// ------ Streaming Ingest Related Functions ------ //

/* Init Streaming client. It is also used to re-init the client if the client was closed before. */
private void initStreamingClient() {
Map<String, String> streamingPropertiesMap =
StreamingUtils.convertConfigForStreamingClient(new HashMap<>(this.connectorConfig));
Properties streamingClientProps = new Properties();
streamingClientProps.putAll(streamingPropertiesMap);
if (this.streamingIngestClient == null || this.streamingIngestClient.isClosed()) {
try {
// Override only if the bdec version is explicitly set in config; default to
// the version set inside the Ingest SDK
Map<String, Object> parameterOverrides = new HashMap<>();
Optional<String> snowpipeStreamingBdecVersion =
Optional.ofNullable(this.connectorConfig.get(SNOWPIPE_STREAMING_FILE_VERSION));
snowpipeStreamingBdecVersion.ifPresent(
overriddenValue -> {
LOGGER.info("Config is overridden for {} ", SNOWPIPE_STREAMING_FILE_VERSION);
parameterOverrides.put(BLOB_FORMAT_VERSION, overriddenValue);
});

LOGGER.info("Initializing Streaming Client. ClientName:{}", this.streamingIngestClientName);
this.streamingIngestClient =
SnowflakeStreamingIngestClientFactory.builder(this.streamingIngestClientName)
.setProperties(streamingClientProps)
.setParameterOverrides(parameterOverrides)
.build();
} catch (SFException ex) {
LOGGER.error(
"Exception creating streamingIngestClient with name:{}",
this.streamingIngestClientName);
throw new ConnectException(ex);
}
}
}

/** Closes the streaming client. */
private void closeStreamingClient() {
LOGGER.info("Closing Streaming Client:{}", this.streamingIngestClientName);
try {
streamingIngestClient.close();
} catch (Exception e) {
LOGGER.error(
"Failure closing Streaming client msg:{}, cause:{}",
e.getMessage(),
Arrays.toString(e.getCause().getStackTrace()));
}
}
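// Note: the catch block above dereferences e.getCause() unconditionally, and
// Throwable.getCause() may return null, which would NPE inside the error
// handler itself. A null-safe variant (hypothetical, same logger and format):
//
//   Throwable cause = e.getCause();
//   LOGGER.error(
//       "Failure closing Streaming client msg:{}, cause:{}",
//       e.getMessage(),
//       cause == null ? "no cause" : Arrays.toString(cause.getStackTrace()));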

private void createTableIfNotExists(final String tableName) {
if (this.conn.tableExist(tableName)) {
if (!this.enableSchematization) {
@@ -47,7 +47,7 @@ public static StreamingClientProvider getStreamingClientProviderInstance() {

/** ONLY FOR TESTING - to get a provider with injected properties */
@VisibleForTesting
public static StreamingClientProvider injectStreamingClientProviderForTests(
public static StreamingClientProvider getStreamingClientProviderForTests(
SnowflakeStreamingIngestClient parameterEnabledClient,
StreamingClientHandler streamingClientHandler) {
return new StreamingClientProvider(parameterEnabledClient, streamingClientHandler);
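The renamed hook keeps the injection shape the tests below rely on. A minimal usage sketch assembled from those test setups; the null first argument (no pre-built client) and the spied handler come straight from them, and config stands for a connector config map such as TestUtils.getConfForStreaming().

// Inside a test: inject a spied handler so the test can observe how the
// provider creates or recreates clients, then fetch a client through it.
StreamingClientHandler handlerSpy = Mockito.spy(StreamingClientHandler.class);
StreamingClientProvider provider =
    StreamingClientProvider.getStreamingClientProviderForTests(null, handlerSpy);
SnowflakeStreamingIngestClient client = provider.getClient(config);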
@@ -111,8 +111,6 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception {
inMemorySinkTaskContext,
mockStreamingClient,
config,
"0",
"TEST_CLIENT",
false,
topicPartitionChannelMap);

@@ -68,7 +68,7 @@ public void setup() {

this.streamingClientHandler = Mockito.spy(StreamingClientHandler.class);
this.streamingClientProvider =
StreamingClientProvider.injectStreamingClientProviderForTests(
StreamingClientProvider.getStreamingClientProviderForTests(
null, this.streamingClientHandler);

this.getClientFuturesTeardown = new ArrayList<>();
@@ -17,7 +17,7 @@

package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.internal.streaming.StreamingClientProvider.injectStreamingClientProviderForTests;
import static com.snowflake.kafka.connector.internal.streaming.StreamingClientProvider.getStreamingClientProviderForTests;

import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
@@ -74,7 +74,7 @@ public void setup() {

this.streamingClientHandler = Mockito.spy(StreamingClientHandler.class);
this.streamingClientProvider =
StreamingClientProvider.injectStreamingClientProviderForTests(
StreamingClientProvider.getStreamingClientProviderForTests(
null, this.streamingClientHandler);
}

@@ -110,7 +110,7 @@ public void testGetInvalidClient() {
this.invalidClient = Mockito.mock(SnowflakeStreamingIngestClient.class);
Mockito.when(this.invalidClient.isClosed()).thenReturn(true);
StreamingClientProvider injectedProvider =
injectStreamingClientProviderForTests(this.invalidClient, this.streamingClientHandler);
getStreamingClientProviderForTests(this.invalidClient, this.streamingClientHandler);

// test: getting invalid client with valid config
this.validClient = injectedProvider.getClient(validClientConfig);
@@ -169,7 +169,7 @@ public void testCloseClients() throws Exception {

// test closing all clients
StreamingClientProvider injectedProvider =
injectStreamingClientProviderForTests(this.client1, this.streamingClientHandler);
getStreamingClientProviderForTests(this.client1, this.streamingClientHandler);

injectedProvider.closeClient(this.client1);

@@ -258,6 +258,7 @@ public void testAutoChannelReopen_InsertRowsSFException() throws Exception {
public void testAutoChannelReopen_MultiplePartitionsInsertRowsSFException() throws Exception {
Map<String, String> config = TestUtils.getConfForStreaming();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG, "true");

InMemorySinkTaskContext inMemorySinkTaskContext =
new InMemorySinkTaskContext(Collections.singleton(topicPartition));
@@ -375,6 +376,7 @@ public void testAutoChannelReopen_MultiplePartitionsInsertRowsSFException() throws Exception {
public void testAutoChannelReopen_SinglePartitionsInsertRowsSFException() throws Exception {
Map<String, String> config = TestUtils.getConfForStreaming();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG, "true");

InMemorySinkTaskContext inMemorySinkTaskContext =
new InMemorySinkTaskContext(Collections.singleton(topicPartition));
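The two hunks above opt the resilience tests into streaming client reuse. For reference, the same opt-in as a standalone sketch; the constants and helpers are the ones used in those tests, and whether the provider actually returns one shared client for identical configs is the behavior these tests exercise rather than a documented guarantee.

Map<String, String> config = TestUtils.getConfForStreaming();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
// Opt into the optimized (shared) streaming client, as the tests above do.
config.put(SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG, "true");
SnowflakeStreamingIngestClient client =
    StreamingClientProvider.getStreamingClientProviderInstance().getClient(config);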
