add tests for Config Validation function
binglihub committed Jul 9, 2019
1 parent 5f6e2bf commit d8dad80
Showing 7 changed files with 474 additions and 247 deletions.
@@ -89,10 +89,12 @@ public void start(final Map<String, String> parsedConfig)
connectorStartTime = System.currentTimeMillis();

config = new HashMap<>(parsedConfig);
if (!validateConfig())
{
throw SnowflakeErrors.ERROR_0001.getException();
}

Utils.ParameterValidationResult result = Utils.validateConfig(config);

connectorName = result.connectorName;
topics = result.topics;
topicsTablesMap = result.topicsTableMap;

// create a persisted connection, and validate snowflake connection
// config as a side effect
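
The inline checks removed in the next hunk have been extracted into Utils.validateConfig, which start() now consumes through the result object above. Below is a minimal sketch of how the extracted function might be exercised in a test, assuming it keeps the Map<String, String> signature shown above and that ParameterValidationResult exposes the connectorName, topics, and topicsTableMap fields read by start(); the test class, method name, and config values are illustrative only.

import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import static org.junit.Assert.assertEquals;

public class ConnectorConfigValidationTest
{
  // Illustrative sketch, not part of this commit's diff. Key names come from
  // SnowflakeSinkConnectorConfig; the values are made up for the example.
  @Test
  public void validConfigIsParsedIntoResult()
  {
    Map<String, String> config = new HashMap<>();
    config.put(SnowflakeSinkConnectorConfig.NAME, "test_connector");
    config.put(SnowflakeSinkConnectorConfig.TOPICS, "topic1,topic2");
    config.put(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, "topic1:table1");
    config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_DATABASE, "test_db");
    config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA, "test_schema");

    Utils.ParameterValidationResult result = Utils.validateConfig(config);

    assertEquals("test_connector", result.connectorName);
    assertEquals(2, result.topics.size());
    // topic1 is mapped explicitly; an unmapped topic falls back to its own name
    assertEquals("table1", result.topicsTableMap.get("topic1"));
    assertEquals("topic2", result.topicsTableMap.get("topic2"));
  }
}
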
@@ -202,219 +204,6 @@ public void start(final Map<String, String> parsedConfig)
setupComplete = true;
}

/**
* Validate the input configurations
*/
private boolean validateConfig()
{
boolean configIsValid = true; // verify all config

// define the input parameters / keys in one place as static constants,
// instead of using them directly
// define the thresholds statically in one place as static constants,
// instead of using the values directly

// unique name of this connector instance
connectorName = config.get(SnowflakeSinkConnectorConfig.NAME);
if (connectorName.isEmpty() || !isValidSnowflakeObjectIdentifier
(connectorName))
{
LOGGER.error(Logging.logMessage("{}: {} is empty or invalid. It " +
"should match Snowflake object identifier syntax. Please see the " +
"documentation.", SnowflakeSinkConnectorConfig.NAME, connectorName));
configIsValid = false;
}


// set buffer.count.records -- a Snowflake connector setting
// default : 10000 records
// Number of records buffered in memory per partition before ingesting to
// Snowflake
if (!config.containsKey(SnowflakeSinkConnectorConfig
.BUFFER_COUNT_RECORDS))//buffer.count.records
{
config.put(SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS, "10000");
LOGGER.info(Logging.logMessage("{} set to default 10000 records.",
SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS));
}

// set buffer.size.bytes -- a Snowflake connector setting
// default : 5000000 bytes
// Cumulative size of records buffered in memory per partition before
// ingesting to Snowflake
if (config.containsKey(SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES))
//buffer.size.bytes
{
long bsb = Long.parseLong(config.get(SnowflakeSinkConnectorConfig
.BUFFER_SIZE_BYTES));
if (bsb > 100000000) // 100mb
{
LOGGER.error(Logging.logMessage("{} is too high at {}. It must be " +
"100000000 (100MB) or smaller.", SnowflakeSinkConnectorConfig
.BUFFER_SIZE_BYTES, bsb));
configIsValid = false;
}
}
else
{
config.put(SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES, "5000000");
LOGGER.info(Logging.logMessage("{} set to default 5000000 bytes.",
SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES));
}

// validate topics
if (config.containsKey(SnowflakeSinkConnectorConfig.TOPICS))
{
topics = new ArrayList<>(Arrays.asList(config.get
(SnowflakeSinkConnectorConfig.TOPICS).split(",")));

LOGGER.info(Logging.logMessage("Connector {} consuming topics {}",
connectorName, topics.toString()));

// check for duplicates
HashSet<String> topicsHashSet = new HashSet<>(Arrays.asList(config.get
(SnowflakeSinkConnectorConfig.TOPICS).split(",")));
if (topics.size() != topicsHashSet.size())
{
LOGGER.error(Logging.logMessage("{}: {} contains duplicate " +
"entries.", SnowflakeSinkConnectorConfig.TOPICS,
config.get(SnowflakeSinkConnectorConfig.TOPICS)));
configIsValid = false;
}
}
else
{
LOGGER.error(Logging.logMessage("{} cannot be empty.",
SnowflakeSinkConnectorConfig.TOPICS));
configIsValid = false;
}

// validate snowflake.topic2table.map (optional parameter)
topicsTablesMap = new HashMap<>(); // initialize even if not present in
// config, as it is needed
if (config.containsKey(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP))
{
List<String> topicsTables = new ArrayList<>(Arrays.asList(config.get
(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP).split(",")));

for (String topicTable : topicsTables)
{
String[] tt = topicTable.split(":");

if (tt.length != 2 || tt[0].isEmpty() || tt[1].isEmpty())
{
LOGGER.error(Logging.logMessage("Invalid {} config format: {}",
SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, topicsTables));
configIsValid = false;
}

if (!isValidSnowflakeObjectIdentifier(tt[1]))
{
LOGGER.error(
Logging.logMessage("table name {} should have at least 2 " +
"characters, start with _a-zA-Z, and only contains " +
"_$a-zA-z0-9", tt[1]));
configIsValid = false;
}

if (!topics.contains(tt[0]))
{
LOGGER.error(Logging.logMessage("topic name {} has not been " +
"registered in this task", tt[0]));
configIsValid = false;
}

if (topicsTablesMap.containsKey(tt[0]))
{
LOGGER.error(Logging.logMessage("topic name {} is duplicated",
tt[0]));
configIsValid = false;
}

if (topicsTablesMap.containsKey(tt[1]))
{
//todo: support multiple topics map to one table ?
LOGGER.error(Logging.logMessage("table name {} is duplicated",
tt[1]));
configIsValid = false;
}

topicsTablesMap.put(tt[0], tt[1]);
}
}

// validate that topics which don't have a table name mapped, have a
// Snowflake compatible name
for (String topic : topics)
{
if (!topicsTablesMap.containsKey(topic))
{
if (isValidSnowflakeObjectIdentifier(topic))
{
topicsTablesMap.put(topic, topic); // use topic name as the table
// name
}
else
{
LOGGER.error(Logging.logMessage("topic: {} in {}: {} config " +
"should either match Snowflake object identifier syntax, or {}:" +
" {} config should contain a mapping table name.", topic,
SnowflakeSinkConnectorConfig.TOPICS,
config.get(SnowflakeSinkConnectorConfig.TOPICS),
SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP,
config.get(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP)));
configIsValid = false;
}
}
}

// sanity check
if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_DATABASE))
{
LOGGER.error(Logging.logMessage("{} cannot be empty.",
SnowflakeSinkConnectorConfig.SNOWFLAKE_DATABASE));
configIsValid = false;
}

// sanity check
if (!config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA))
{
LOGGER.error(Logging.logMessage("{} cannot be empty.",
SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA));
configIsValid = false;
}

// jvm proxy settings
configIsValid = Utils.enableJVMProxy(config);

//schemaRegistry

String authSource = config.getOrDefault(
SnowflakeSinkConnectorConfig.SCHEMA_REGISTRY_AUTH_CREDENTIALS_SOURCE, "");
String userInfo = config.getOrDefault(
SnowflakeSinkConnectorConfig.SCHEMA_REGISTRY_AUTH_USER_INFO, "");

if(authSource.isEmpty() ^ userInfo.isEmpty())
{
configIsValid = false;
LOGGER.error(Logging.logMessage("Parameters {} and {} should be defined at the same time",
SnowflakeSinkConnectorConfig.SCHEMA_REGISTRY_AUTH_USER_INFO,
SnowflakeSinkConnectorConfig.SCHEMA_REGISTRY_AUTH_CREDENTIALS_SOURCE));
}

return configIsValid;
}

/**
* validates that given name is a valid snowflake object identifier
*
* @param objName snowflake object name
* @return true if given object name is valid
*/
private boolean isValidSnowflakeObjectIdentifier(String objName)
{
return objName.matches("^[_a-zA-Z]{1}[_$a-zA-Z0-9]+$");
}

/**
* stop method will be called to stop a connector,
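The removed isValidSnowflakeObjectIdentifier above is the rule most of the deleted checks rely on (connector name, table names, and unmapped topic names). The following self-contained sketch shows its behavior, with the pattern copied verbatim from the removed method; the class and the example names are illustrative and not part of the diff.

public class IdentifierRuleSketch
{
  // Same pattern as the removed isValidSnowflakeObjectIdentifier above.
  private static boolean isValidSnowflakeObjectIdentifier(String objName)
  {
    return objName.matches("^[_a-zA-Z]{1}[_$a-zA-Z0-9]+$");
  }

  public static void main(String[] args)
  {
    // run with `java -ea` so the assertions are checked
    assert isValidSnowflakeObjectIdentifier("_my_table1"); // starts with '_' or a letter, length >= 2
    assert !isValidSnowflakeObjectIdentifier("1table");    // must not start with a digit
    assert !isValidSnowflakeObjectIdentifier("a");         // a single character is rejected
    assert !isValidSnowflakeObjectIdentifier("my-table");  // '-' is not an allowed character
  }
}
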
@@ -36,8 +36,8 @@ class SnowflakeSinkConnectorConfig

// Connector config
private static final String CONNECTOR_CONFIG = "Connector Config";
static final String BUFFER_COUNT_RECORDS = "buffer.count.records";
static final String BUFFER_SIZE_BYTES = "buffer.size.bytes";
// static final String BUFFER_COUNT_RECORDS = "buffer.count.records";
// static final String BUFFER_SIZE_BYTES = "buffer.size.bytes";
static final String TOPICS_TABLES_MAP = "snowflake.topic2table.map";


@@ -68,6 +68,7 @@ static ConfigDef newConfigDef()
//snowflake login info
.define(SNOWFLAKE_URL,
Type.STRING,
"",
Importance.HIGH,
"Snowflake account url",
SNOWFLAKE_LOGIN_INFO,
@@ -76,6 +77,7 @@ static ConfigDef newConfigDef()
SNOWFLAKE_URL)
.define(SNOWFLAKE_USER,
Type.STRING,
"",
Importance.HIGH,
"Snowflake user name",
SNOWFLAKE_LOGIN_INFO,
@@ -84,6 +86,7 @@ static ConfigDef newConfigDef()
SNOWFLAKE_USER)
.define(SNOWFLAKE_PRIVATE_KEY,
Type.STRING,
"",
Importance.HIGH,
"Private key for Snowflake user",
SNOWFLAKE_LOGIN_INFO,
@@ -92,6 +95,7 @@ static ConfigDef newConfigDef()
SNOWFLAKE_PRIVATE_KEY)
.define(SNOWFLAKE_DATABASE,
Type.STRING,
"",
Importance.HIGH,
"Snowflake database name",
SNOWFLAKE_LOGIN_INFO,
@@ -100,6 +104,7 @@ static ConfigDef newConfigDef()
SNOWFLAKE_DATABASE)
.define(SNOWFLAKE_SCHEMA,
Type.STRING,
"",
Importance.HIGH,
"Snowflake database schema name",
SNOWFLAKE_LOGIN_INFO,
@@ -109,6 +114,7 @@ static ConfigDef newConfigDef()
//proxy
.define(JVM_PROXY_HOST,
Type.STRING,
"",
Importance.LOW,
"JVM option: https.proxyHost",
PROXY_INFO,
@@ -118,6 +124,7 @@ static ConfigDef newConfigDef()
)
.define(JVM_PROXY_PORT,
Type.STRING,
"",
Importance.LOW,
"JVM option: https.proxyPort",
PROXY_INFO,
@@ -127,6 +134,7 @@ static ConfigDef newConfigDef()
//schema registry
.define(REGISTRY_URL,
Type.STRING,
"",
Importance.LOW,
"Required by SnowflakeAvroConnector if schema registry is used. Leave blank if schema is included in AVRO record",
SCHEMA_REGISTRY_INFO,
@@ -135,6 +143,7 @@ static ConfigDef newConfigDef()
REGISTRY_URL)
.define(SCHEMA_REGISTRY_AUTH_CREDENTIALS_SOURCE,
Type.STRING,
"",
Importance.LOW,
"Required by SnowflakeAvroConnector if schema registry authentication used. e.g USER_INFO",
SCHEMA_REGISTRY_INFO,
@@ -143,6 +152,7 @@ static ConfigDef newConfigDef()
SCHEMA_REGISTRY_AUTH_CREDENTIALS_SOURCE)
.define(SCHEMA_REGISTRY_AUTH_USER_INFO,
Type.STRING,
"",
Importance.LOW,
"User info of schema registry authentication, format: <user name>:<password>",
SCHEMA_REGISTRY_INFO,
@@ -153,28 +163,30 @@ static ConfigDef newConfigDef()
//Connector Config
.define(TOPICS_TABLES_MAP,
Type.STRING,
"",
Importance.LOW,
"Map of topics to tables (optional). Format : comma-seperated tuples, e.g. <topic-1>:<table-1>,<topic-2>:<table-2>,... ",
CONNECTOR_CONFIG,
0,
ConfigDef.Width.NONE,
TOPICS_TABLES_MAP)
.define(BUFFER_COUNT_RECORDS,
Type.LONG,
Importance.LOW,
"Number of records buffered in memory per partition before triggering Snowflake ingestion",
CONNECTOR_CONFIG,
1,
ConfigDef.Width.NONE,
BUFFER_COUNT_RECORDS)
.define(BUFFER_SIZE_BYTES,
Type.LONG,
Importance.LOW,
"Cumulative size of records buffered in memory per partition before triggering Snowflake ingestion",
CONNECTOR_CONFIG,
2,
ConfigDef.Width.NONE,
BUFFER_SIZE_BYTES)
//todo: support these parameters in next major update
// .define(BUFFER_COUNT_RECORDS,
// Type.LONG,
// Importance.LOW,
// "Number of records buffered in memory per partition before triggering Snowflake ingestion",
// CONNECTOR_CONFIG,
// 1,
// ConfigDef.Width.NONE,
// BUFFER_COUNT_RECORDS)
// .define(BUFFER_SIZE_BYTES,
// Type.LONG,
// Importance.LOW,
// "Cumulative size of records buffered in memory per partition before triggering Snowflake ingestion",
// CONNECTOR_CONFIG,
// 2,
// ConfigDef.Width.NONE,
// BUFFER_SIZE_BYTES)
;
}
}
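
The empty-string defaults added throughout newConfigDef() change where a missing key is caught: Kafka's ConfigDef treats a definition without a default value as required and rejects the config at parse time, whereas a defaulted key parses cleanly and can be reported later by the connector's own validation (now Utils.validateConfig) with a more specific message. Below is a minimal sketch of that difference using the standard org.apache.kafka.common.config API; the key name is a placeholder, not one of the connector's real parameters.

import java.util.HashMap;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

public class ConfigDefDefaultSketch
{
  public static void main(String[] args)
  {
    // No default: the key is required, so parsing a config without it throws ConfigException.
    ConfigDef required = new ConfigDef()
        .define("example.key", ConfigDef.Type.STRING,
                ConfigDef.Importance.HIGH, "example parameter");
    try
    {
      required.parse(new HashMap<String, String>());
    }
    catch (ConfigException e)
    {
      System.out.println("rejected at parse time: " + e.getMessage());
    }

    // Empty-string default: parsing succeeds, and the empty value can be
    // detected and reported later by the connector itself.
    ConfigDef defaulted = new ConfigDef()
        .define("example.key", ConfigDef.Type.STRING, "",
                ConfigDef.Importance.HIGH, "example parameter");
    Object value = defaulted.parse(new HashMap<String, String>()).get("example.key");
    System.out.println("parsed with default: \"" + value + "\"");
  }
}
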
@@ -102,11 +102,14 @@ public void start(final Map<String, String> parsedConfig)
connectorName = config.get("name");

recordService = new RecordService(); // default : include all

this.bufferCountRecords = Long.parseLong(config.get
(SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS));
this.bufferSizeBytes = Long.parseLong(config.get
(SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES));
// todo: enable these
// this.bufferCountRecords = Long.parseLong(config.get
// (SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS));
// this.bufferSizeBytes = Long.parseLong(config.get
// (SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES));

this.bufferCountRecords = 10000;
this.bufferSizeBytes = 5000000;

snowflakeConnection = new SnowflakeJDBCWrapper(config);

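
With the two buffer parameters commented out of the config definition, the task pins the thresholds to the former defaults (10000 records, 5000000 bytes). The actual buffering code of SnowflakeSinkTask is outside this diff; the sketch below only illustrates, under that caveat, how per-partition count and size thresholds of this kind are typically used to gate ingestion, with all names hypothetical.

import java.util.ArrayList;
import java.util.List;

class PartitionBufferSketch
{
  // Hard-coded to match the values set in start() above.
  private final long bufferCountRecords = 10000;
  private final long bufferSizeBytes = 5000000;

  private final List<String> records = new ArrayList<>();
  private long bufferedBytes = 0;

  void add(String record)
  {
    records.add(record);
    bufferedBytes += record.getBytes().length;
    // Ingest once either threshold is reached for this partition.
    if (records.size() >= bufferCountRecords || bufferedBytes >= bufferSizeBytes)
    {
      flush();
    }
  }

  private void flush()
  {
    // placeholder: hand the buffered records to the ingest path, then reset
    records.clear();
    bufferedBytes = 0;
  }
}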