Skip to content

Commit

Permalink
One client change 2/3: Add client provider (#590)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-rcheng committed May 15, 2023
1 parent f8dd9f5 commit 4b594b0
Show file tree
Hide file tree
Showing 7 changed files with 915 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.net.PasswordAuthentication;
import java.net.URL;
import java.net.URLConnection;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -100,6 +101,10 @@ public class Utils {
public static final String TABLE_COLUMN_CONTENT = "RECORD_CONTENT";
public static final String TABLE_COLUMN_METADATA = "RECORD_METADATA";

public static final String GET_EXCEPTION_FORMAT = "{}, Exception message: {}, cause: {}";
public static final String GET_EXCEPTION_MISSING_MESSAGE = "missing exception message";
public static final String GET_EXCEPTION_MISSING_CAUSE = "missing exception cause";

private static final KCLogger LOGGER = new KCLogger(Utils.class.getName());

/**
Expand Down Expand Up @@ -679,6 +684,26 @@ public static String formatString(String format, Object... vars) {
return format;
}

/**
* Get the message and cause of a missing exception, handling the null or empty cases of each
*
* @param customMessage A custom message to prepend to the exception
* @param ex The message to parse through
* @return A string with the custom message and the exceptions message or cause, if exists
*/
public static String getExceptionMessage(String customMessage, Exception ex) {
String message =
ex.getMessage() == null || ex.getMessage().isEmpty()
? GET_EXCEPTION_MISSING_MESSAGE
: ex.getMessage();
String cause =
ex.getCause() == null || ex.getCause().getStackTrace() == null
? GET_EXCEPTION_MISSING_CAUSE
: Arrays.toString(ex.getCause().getStackTrace());

return formatString(GET_EXCEPTION_FORMAT, customMessage, message, cause);
}

private static void handleInvalidParameters(ImmutableMap<String, String> invalidConfigParams) {
// log all invalid params and throw exception
if (!invalidConfigParams.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright (c) 2023 Snowflake Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION;
import static net.snowflake.ingest.utils.ParameterProvider.BLOB_FORMAT_VERSION;

import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.internal.KCLogger;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import net.snowflake.ingest.utils.SFException;
import org.apache.kafka.connect.errors.ConnectException;

/** This class handles all calls to manage the streaming ingestion client */
public class StreamingClientHandler {
private static final KCLogger LOGGER = new KCLogger(StreamingClientHandler.class.getName());
private static final String STREAMING_CLIENT_PREFIX_NAME = "KC_CLIENT_";
private static final String TEST_CLIENT_NAME = "TEST_CLIENT";

private AtomicInteger createdClientId = new AtomicInteger(0);

/**
* Checks if a given client is valid (not null, open and has a name)
*
* @param client The client to validate
* @return If the client is valid
*/
public static boolean isClientValid(SnowflakeStreamingIngestClient client) {
return client != null && !client.isClosed() && client.getName() != null;
}

/**
* Creates a streaming client from the given config
*
* @param connectorConfig The config to create the client
* @return A newly created client
*/
public SnowflakeStreamingIngestClient createClient(Map<String, String> connectorConfig) {
LOGGER.info("Initializing Streaming Client...");

// get streaming properties from config
Properties streamingClientProps = new Properties();
streamingClientProps.putAll(
StreamingUtils.convertConfigForStreamingClient(new HashMap<>(connectorConfig)));

try {
// Override only if bdec version is explicitly set in config, default to the version set
// inside Ingest SDK
Map<String, Object> parameterOverrides = new HashMap<>();
Optional<String> snowpipeStreamingBdecVersion =
Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_FILE_VERSION));
snowpipeStreamingBdecVersion.ifPresent(
overriddenValue -> {
LOGGER.info("Config is overridden for {} ", SNOWPIPE_STREAMING_FILE_VERSION);
parameterOverrides.put(BLOB_FORMAT_VERSION, overriddenValue);
});

String clientName = this.getNewClientName(connectorConfig);

SnowflakeStreamingIngestClient createdClient =
SnowflakeStreamingIngestClientFactory.builder(clientName)
.setProperties(streamingClientProps)
.setParameterOverrides(parameterOverrides)
.build();

LOGGER.info("Successfully initialized Streaming Client:{}", clientName);

return createdClient;
} catch (SFException ex) {
LOGGER.error("Exception creating streamingIngestClient");
throw new ConnectException(ex);
}
}

/**
* Closes the given client. Swallows any exceptions
*
* @param client The client to be closed
*/
public void closeClient(SnowflakeStreamingIngestClient client) {
LOGGER.info("Closing Streaming Client...");

// don't do anything if client is already invalid
if (!isClientValid(client)) {
LOGGER.info("Streaming Client is already closed");
return;
}

try {
String clientName = client.getName();
client.close();
LOGGER.info("Successfully closed Streaming Client:{}", clientName);
} catch (Exception e) {
LOGGER.error(Utils.getExceptionMessage("Failure closing Streaming client", e));
}
}

private String getNewClientName(Map<String, String> connectorConfig) {
return STREAMING_CLIENT_PREFIX_NAME
+ connectorConfig.getOrDefault(Utils.NAME, TEST_CLIENT_NAME)
+ "_"
+ createdClientId.getAndIncrement();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright (c) 2023 Snowflake Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.snowflake.kafka.connector.internal.streaming;

import com.google.common.annotations.VisibleForTesting;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.internal.KCLogger;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;

/**
* Factory that provides the streaming client(s). There should only be one provider, but it may
* provide multiple clients if optimizations are disabled - see
* ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG in the {@link SnowflakeSinkConnectorConfig }
*/
public class StreamingClientProvider {
private static class StreamingClientProviderSingleton {
private static final StreamingClientProvider streamingClientProvider =
new StreamingClientProvider();
}

/**
* Gets the current streaming provider
*
* @return The streaming client provider
*/
public static StreamingClientProvider getStreamingClientProviderInstance() {
return StreamingClientProviderSingleton.streamingClientProvider;
}

/** ONLY FOR TESTING - to get a provider with injected properties */
@VisibleForTesting
public static StreamingClientProvider injectStreamingClientProviderForTests(
SnowflakeStreamingIngestClient parameterEnabledClient,
StreamingClientHandler streamingClientHandler) {
return new StreamingClientProvider(parameterEnabledClient, streamingClientHandler);
}

/** ONLY FOR TESTING - private constructor to inject properties for testing */
private StreamingClientProvider(
SnowflakeStreamingIngestClient parameterEnabledClient,
StreamingClientHandler streamingClientHandler) {
this();
this.parameterEnabledClient = parameterEnabledClient;
this.streamingClientHandler = streamingClientHandler;
}

private static final KCLogger LOGGER = new KCLogger(StreamingClientProvider.class.getName());
private SnowflakeStreamingIngestClient parameterEnabledClient;
private StreamingClientHandler streamingClientHandler;
private Lock providerLock;

// private constructor for singleton
private StreamingClientProvider() {
this.streamingClientHandler = new StreamingClientHandler();
providerLock = new ReentrantLock(true);
}

/**
* Gets the current client or creates a new one from the given connector config. If client
* optimization is not enabled, it will create a new streaming client and the caller is
* responsible for closing it
*
* @param connectorConfig The connector config
* @return A streaming client
*/
public SnowflakeStreamingIngestClient getClient(Map<String, String> connectorConfig) {
if (Boolean.parseBoolean(
connectorConfig.getOrDefault(
SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG, "false"))) {
LOGGER.debug(
"Streaming client optimization is enabled, returning the existing streaming client if"
+ " valid");
this.providerLock.lock();
// recreate streaming client if needed
if (!StreamingClientHandler.isClientValid(this.parameterEnabledClient)) {
LOGGER.error("Current streaming client is invalid, recreating client");
this.parameterEnabledClient = this.streamingClientHandler.createClient(connectorConfig);
}
this.providerLock.unlock();
return this.parameterEnabledClient;
} else {
LOGGER.debug("Streaming client optimization is disabled, creating a new streaming client");
return this.streamingClientHandler.createClient(connectorConfig);
}
}

/**
* Closes the given client
*
* @param client The client to be closed
*/
public void closeClient(SnowflakeStreamingIngestClient client) {
this.providerLock.lock();
this.streamingClientHandler.closeClient(client);
this.providerLock.unlock();
}
}
34 changes: 34 additions & 0 deletions src/test/java/com/snowflake/kafka/connector/UtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,4 +185,38 @@ public void testLogMessageMultiLines() {
"{} test message\n{} test message\n{} test " + "message\n{} test message", 1, 2, 3, 4)
.equals(expected);
}

@Test
public void testGetExceptionMessage() throws Exception {
String customMessage = "customMessage";
String exceptionMessage = "exceptionMessage";
Exception cause = new Exception("cause");
StackTraceElement[] stackTrace = new StackTraceElement[0];

Exception nullMessageEx = new Exception();
assert Utils.getExceptionMessage(customMessage, nullMessageEx)
.equals(
Utils.formatString(
Utils.GET_EXCEPTION_FORMAT,
customMessage,
Utils.GET_EXCEPTION_MISSING_MESSAGE,
Utils.GET_EXCEPTION_MISSING_CAUSE));

Exception nullCauseEx = new Exception(exceptionMessage);
nullCauseEx.initCause(null);
assert Utils.getExceptionMessage(customMessage, nullCauseEx)
.equals(
Utils.formatString(
Utils.GET_EXCEPTION_FORMAT,
customMessage,
exceptionMessage,
Utils.GET_EXCEPTION_MISSING_CAUSE));

Exception stacktraceEx = new Exception(exceptionMessage);
stacktraceEx.initCause(cause);
stacktraceEx.getCause().setStackTrace(stackTrace);
assert Utils.getExceptionMessage(customMessage, stacktraceEx)
.equals(
Utils.formatString(Utils.GET_EXCEPTION_FORMAT, customMessage, exceptionMessage, "[]"));
}
}
Loading

0 comments on commit 4b594b0

Please sign in to comment.