Skip to content

Commit

Permalink
Enable topic regexes for table2topic mapping (#628)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-rcheng committed May 18, 2023
1 parent bc86610 commit f26fb2f
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 0 deletions.
10 changes: 10 additions & 0 deletions src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ public SnowflakeSinkTask(
this.conn = connectionService;
}

@VisibleForTesting
// @codeCoverageIgnore
public SnowflakeSinkTask(
SnowflakeSinkService service,
SnowflakeConnectionService connectionService,
Map<String, String> topic2table) {
this(service, connectionService);
this.topic2table = topic2table;
}

private SnowflakeConnectionService getConnection() {
try {
waitFor(() -> conn != null);
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -547,13 +547,24 @@ public static String generateValidName(String topic, Map<String, String> topic2t
if (topic2table.containsKey(topic)) {
return topic2table.get(topic);
}

// try matching regex tables
for (String regexTopic : topic2table.keySet()) {
if (topic.matches(regexTopic)) {
return topic2table.get(regexTopic);
}
}

if (Utils.isValidSnowflakeObjectIdentifier(topic)) {
return topic;
}
int hash = Math.abs(topic.hashCode());

StringBuilder result = new StringBuilder();

// remove wildcard regex from topic name to generate table name
topic = topic.replaceAll("\\.\\*", "");

int index = 0;
// first char
if (topic.substring(index, index + 1).matches("[_a-zA-Z]")) {
Expand Down Expand Up @@ -606,6 +617,15 @@ public static Map<String, String> parseTopicToTableMap(String input) {
isInvalid = true;
}

// check that regexes don't overlap
for (String parsedTopic : topic2Table.keySet()) {
if (parsedTopic.matches(topic) || topic.matches(parsedTopic)) {
LOGGER.error(
"topic regexes cannot overlap. overlapping regexes: {}, {}", parsedTopic, topic);
isInvalid = true;
}
}

topic2Table.put(tt[0].trim(), tt[1].trim());
}
if (isInvalid) {
Expand Down
8 changes: 8 additions & 0 deletions src/test/java/com/snowflake/kafka/connector/SinkTaskIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -305,4 +305,12 @@ public void testMultipleSinkTasksWithLogs() throws Exception {
assert offsetMap1.get(topicPartitions0.get(0)).offset() == BUFFER_COUNT_RECORDS_DEFAULT;
assert offsetMap0.get(topicPartitions1.get(0)).offset() == BUFFER_COUNT_RECORDS_DEFAULT;
}

@Test
public void testTopicToTableRegex() {
Map<String, String> config = TestUtils.getConf();
SnowflakeSinkConnectorConfig.setDefaultValues(config);

SnowflakeSinkTaskForStreamingIT.testTopicToTableRegexMain(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;

import com.snowflake.kafka.connector.internal.KCLogger;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import com.snowflake.kafka.connector.internal.SnowflakeSinkService;
import com.snowflake.kafka.connector.internal.TestUtils;
import com.snowflake.kafka.connector.internal.streaming.InMemorySinkTaskContext;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -192,4 +196,135 @@ public void testSinkTaskWithMultipleOpenClose() throws Exception {

assert partitionsInTable.size() == 2;
}

@Test
public void testTopicToTableRegex() {
Map<String, String> config = TestUtils.getConfForStreaming();

testTopicToTableRegexMain(config);
}

// runner for topic to table regex testing, used to test both streaming and snowpipe scenarios.
// Unfortunately cannot be moved to test utils due to the scope of some static variables
public static void testTopicToTableRegexMain(Map<String, String> config) {
// constants
String catTable = "cat_table";
String catTopicRegex = ".*_cat";
String catTopicStr1 = "calico_cat";
String catTopicStr2 = "orange_cat";

String bigCatTable = "big_cat_table";
String bigCatTopicRegex = "big.*_.*_cat";
String bigCatTopicStr1 = "big_calico_cat";
String bigCatTopicStr2 = "biggest_orange_cat";

String dogTable = "dog_table";
String dogTopicRegex = ".*_dog";
String dogTopicStr1 = "corgi_dog";

String catchallTable = "animal_table";
String catchAllRegex = ".*";
String birdTopicStr1 = "bird";

// test two regexes. bird should create its own table
String twoRegexConfig =
Utils.formatString("{}:{}, {}:{}", bigCatTopicRegex, bigCatTable, dogTopicRegex, dogTable);
List<String> twoRegexPartitionStrs =
Arrays.asList(bigCatTopicStr1, bigCatTopicStr2, dogTopicStr1, birdTopicStr1);
Map<String, String> twoRegexExpected = new HashMap<>();
twoRegexExpected.put(bigCatTopicStr1, bigCatTable);
twoRegexExpected.put(bigCatTopicStr2, bigCatTable);
twoRegexExpected.put(dogTopicStr1, dogTable);
twoRegexExpected.put(birdTopicStr1, birdTopicStr1);
testTopicToTableRegexRunner(config, twoRegexConfig, twoRegexPartitionStrs, twoRegexExpected);

// test two regexes with catchall. catchall should overlap both regexes and fail the test
String twoRegexCatchAllConfig =
Utils.formatString(
"{}:{}, {}:{},{}:{}",
catchAllRegex,
catchallTable,
catTopicRegex,
catTable,
dogTopicRegex,
dogTable);
List<String> twoRegexCatchAllPartitionStrs =
Arrays.asList(catTopicStr1, catTopicStr2, bigCatTopicStr1, dogTopicStr1, birdTopicStr1);
Map<String, String> twoRegexCatchAllExpected = new HashMap<>();
assert TestUtils.assertError(
SnowflakeErrors.ERROR_0021,
() ->
testTopicToTableRegexRunner(
config,
twoRegexCatchAllConfig,
twoRegexCatchAllPartitionStrs,
twoRegexCatchAllExpected));

// test invalid overlapping regexes
String invalidTwoRegexConfig =
Utils.formatString("{}:{}, {}:{}", catTopicRegex, catTable, bigCatTopicRegex, bigCatTable);
List<String> invalidTwoRegexPartitionStrs =
Arrays.asList(catTopicStr1, catTopicStr2, dogTopicStr1, birdTopicStr1);
Map<String, String> invalidTwoRegexExpected = new HashMap<>();
assert TestUtils.assertError(
SnowflakeErrors.ERROR_0021,
() ->
testTopicToTableRegexRunner(
config,
invalidTwoRegexConfig,
invalidTwoRegexPartitionStrs,
invalidTwoRegexExpected));

// test catchall regex
String catchAllConfig = Utils.formatString("{}:{}", catchAllRegex, catchallTable);
List<String> catchAllPartitionStrs =
Arrays.asList(catTopicStr1, catTopicStr2, dogTopicStr1, birdTopicStr1);
Map<String, String> catchAllExpected = new HashMap<>();
catchAllExpected.put(catTopicStr1, catchallTable);
catchAllExpected.put(catTopicStr2, catchallTable);
catchAllExpected.put(dogTopicStr1, catchallTable);
catchAllExpected.put(birdTopicStr1, catchallTable);
testTopicToTableRegexRunner(config, catchAllConfig, catchAllPartitionStrs, catchAllExpected);
}

private static void testTopicToTableRegexRunner(
Map<String, String> connectorBaseConfig,
String topic2tableRegex,
List<String> partitionStrList,
Map<String, String> expectedTopic2TableConfig) {
// setup
connectorBaseConfig.put(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, topic2tableRegex);

// setup partitions
List<TopicPartition> testPartitions = new ArrayList<>();
for (int i = 0; i < partitionStrList.size(); i++) {
testPartitions.add(new TopicPartition(partitionStrList.get(i), i));
}

// mocks
SnowflakeSinkService serviceSpy = Mockito.spy(SnowflakeSinkService.class);
SnowflakeConnectionService connSpy = Mockito.spy(SnowflakeConnectionService.class);
Map<String, String> parsedConfig = SnowflakeSinkTask.getTopicToTableMap(connectorBaseConfig);

SnowflakeSinkTask sinkTask = new SnowflakeSinkTask(serviceSpy, connSpy, parsedConfig);

// test topics were mapped correctly
sinkTask.open(testPartitions);

// verify expected num tasks opened
Mockito.verify(serviceSpy, Mockito.times(expectedTopic2TableConfig.size()))
.startTask(Mockito.anyString(), Mockito.any(TopicPartition.class));

for (String topicStr : expectedTopic2TableConfig.keySet()) {
TopicPartition topic = null;
String table = expectedTopic2TableConfig.get(topicStr);
for (TopicPartition currTp : testPartitions) {
if (currTp.topic().equals(topicStr)) {
topic = currTp;
Mockito.verify(serviceSpy, Mockito.times(1)).startTask(table, topic);
}
}
Assert.assertNotNull("Expected topic partition was not opened by the tast", topic);
}
}
}
60 changes: 60 additions & 0 deletions src/test/java/com/snowflake/kafka/connector/UtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,44 @@ public void testParseTopicToTable() {
SnowflakeErrors.ERROR_0021, () -> Utils.parseTopicToTableMap("abc:@123,bvd:adsa"));
}

@Test
public void testParseTopicToTableRegex() {
String catTable = "cat_table";
String dogTable = "dog_table";
String catTopicRegex = ".*_cat";
String dogTopicRegex = ".*_dog";

// test two different regexs
Map<String, String> topic2table =
Utils.parseTopicToTableMap(
Utils.formatString("{}:{},{}:{}", catTopicRegex, catTable, dogTopicRegex, dogTable));
assert topic2table.containsKey(catTopicRegex);
assert topic2table.containsKey(dogTopicRegex);
assert topic2table.containsValue(catTable);
assert topic2table.containsValue(dogTable);
assert topic2table.keySet().size() == 2;

// error: overlapping regex, same table
assert TestUtils.assertError(
SnowflakeErrors.ERROR_0021,
() ->
Utils.parseTopicToTableMap(
Utils.formatString(
"{}:{},{}:{}", catTopicRegex, catTable, "big_" + catTopicRegex, catTable)));

// error: overlapping regex, different table
assert TestUtils.assertError(
SnowflakeErrors.ERROR_0021,
() ->
Utils.parseTopicToTableMap(
Utils.formatString(
"{}:{},{}:{}",
catTopicRegex,
catTable,
dogTopicRegex + catTopicRegex,
dogTable)));
}

@Test
public void testTableName() {
Map<String, String> topic2table = Utils.parseTopicToTableMap("ab@cd:abcd, 1234:_1234");
Expand All @@ -71,6 +109,28 @@ public void testTableName() {
assert Utils.tableName(topic, topic2table).equals("_12345_" + Math.abs(topic.hashCode()));
}

@Test
public void testTableNameRegex() {
String catTable = "cat_table";
String dogTable = "dog_table";
String catTopicRegex = ".*_cat";
String dogTopicRegex = ".*_dog";

// test two different regexs
Map<String, String> topic2table =
Utils.parseTopicToTableMap(
Utils.formatString("{}:{},{}:{}", catTopicRegex, catTable, dogTopicRegex, dogTable));

assert Utils.tableName("calico_cat", topic2table).equals(catTable);
assert Utils.tableName("orange_cat", topic2table).equals(catTable);
assert Utils.tableName("_cat", topic2table).equals(catTable);
assert Utils.tableName("corgi_dog", topic2table).equals(dogTable);

// test new topic should not have wildcard
String topic = "bird.*";
assert Utils.tableName(topic, topic2table).equals("bird_" + Math.abs(topic.hashCode()));
}

@Test
public void testTableFullName() {
assert Utils.isValidSnowflakeTableName("_1342dfsaf$");
Expand Down

0 comments on commit f26fb2f

Please sign in to comment.