From f26fb2f2340ebf4a64b1c3641be5783c6c98783f Mon Sep 17 00:00:00 2001 From: revi cheng Date: Thu, 18 May 2023 15:33:55 -0700 Subject: [PATCH] Enable topic regexes for table2topic mapping (#628) --- .../kafka/connector/SnowflakeSinkTask.java | 10 ++ .../com/snowflake/kafka/connector/Utils.java | 20 +++ .../snowflake/kafka/connector/SinkTaskIT.java | 8 ++ .../SnowflakeSinkTaskForStreamingIT.java | 135 ++++++++++++++++++ .../snowflake/kafka/connector/UtilsTest.java | 60 ++++++++ 5 files changed, 233 insertions(+) diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java index a31c3050e..91a3bc078 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java @@ -102,6 +102,16 @@ public SnowflakeSinkTask( this.conn = connectionService; } + @VisibleForTesting + // @codeCoverageIgnore + public SnowflakeSinkTask( + SnowflakeSinkService service, + SnowflakeConnectionService connectionService, + Map topic2table) { + this(service, connectionService); + this.topic2table = topic2table; + } + private SnowflakeConnectionService getConnection() { try { waitFor(() -> conn != null); diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java index cd66e391f..e1fea0645 100644 --- a/src/main/java/com/snowflake/kafka/connector/Utils.java +++ b/src/main/java/com/snowflake/kafka/connector/Utils.java @@ -547,6 +547,14 @@ public static String generateValidName(String topic, Map topic2t if (topic2table.containsKey(topic)) { return topic2table.get(topic); } + + // try matching regex tables + for (String regexTopic : topic2table.keySet()) { + if (topic.matches(regexTopic)) { + return topic2table.get(regexTopic); + } + } + if (Utils.isValidSnowflakeObjectIdentifier(topic)) { return topic; } @@ -554,6 +562,9 @@ public static String generateValidName(String topic, Map topic2t StringBuilder result = new StringBuilder(); + // remove wildcard regex from topic name to generate table name + topic = topic.replaceAll("\\.\\*", ""); + int index = 0; // first char if (topic.substring(index, index + 1).matches("[_a-zA-Z]")) { @@ -606,6 +617,15 @@ public static Map parseTopicToTableMap(String input) { isInvalid = true; } + // check that regexes don't overlap + for (String parsedTopic : topic2Table.keySet()) { + if (parsedTopic.matches(topic) || topic.matches(parsedTopic)) { + LOGGER.error( + "topic regexes cannot overlap. overlapping regexes: {}, {}", parsedTopic, topic); + isInvalid = true; + } + } + topic2Table.put(tt[0].trim(), tt[1].trim()); } if (isInvalid) { diff --git a/src/test/java/com/snowflake/kafka/connector/SinkTaskIT.java b/src/test/java/com/snowflake/kafka/connector/SinkTaskIT.java index a9c2c1279..d49512312 100644 --- a/src/test/java/com/snowflake/kafka/connector/SinkTaskIT.java +++ b/src/test/java/com/snowflake/kafka/connector/SinkTaskIT.java @@ -305,4 +305,12 @@ public void testMultipleSinkTasksWithLogs() throws Exception { assert offsetMap1.get(topicPartitions0.get(0)).offset() == BUFFER_COUNT_RECORDS_DEFAULT; assert offsetMap0.get(topicPartitions1.get(0)).offset() == BUFFER_COUNT_RECORDS_DEFAULT; } + + @Test + public void testTopicToTableRegex() { + Map config = TestUtils.getConf(); + SnowflakeSinkConnectorConfig.setDefaultValues(config); + + SnowflakeSinkTaskForStreamingIT.testTopicToTableRegexMain(config); + } } diff --git a/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskForStreamingIT.java b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskForStreamingIT.java index 278df9f09..bc87d1919 100644 --- a/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskForStreamingIT.java +++ b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskForStreamingIT.java @@ -4,11 +4,15 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT; import com.snowflake.kafka.connector.internal.KCLogger; +import com.snowflake.kafka.connector.internal.SnowflakeConnectionService; +import com.snowflake.kafka.connector.internal.SnowflakeErrors; +import com.snowflake.kafka.connector.internal.SnowflakeSinkService; import com.snowflake.kafka.connector.internal.TestUtils; import com.snowflake.kafka.connector.internal.streaming.InMemorySinkTaskContext; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import java.sql.ResultSet; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -192,4 +196,135 @@ public void testSinkTaskWithMultipleOpenClose() throws Exception { assert partitionsInTable.size() == 2; } + + @Test + public void testTopicToTableRegex() { + Map config = TestUtils.getConfForStreaming(); + + testTopicToTableRegexMain(config); + } + + // runner for topic to table regex testing, used to test both streaming and snowpipe scenarios. + // Unfortunately cannot be moved to test utils due to the scope of some static variables + public static void testTopicToTableRegexMain(Map config) { + // constants + String catTable = "cat_table"; + String catTopicRegex = ".*_cat"; + String catTopicStr1 = "calico_cat"; + String catTopicStr2 = "orange_cat"; + + String bigCatTable = "big_cat_table"; + String bigCatTopicRegex = "big.*_.*_cat"; + String bigCatTopicStr1 = "big_calico_cat"; + String bigCatTopicStr2 = "biggest_orange_cat"; + + String dogTable = "dog_table"; + String dogTopicRegex = ".*_dog"; + String dogTopicStr1 = "corgi_dog"; + + String catchallTable = "animal_table"; + String catchAllRegex = ".*"; + String birdTopicStr1 = "bird"; + + // test two regexes. bird should create its own table + String twoRegexConfig = + Utils.formatString("{}:{}, {}:{}", bigCatTopicRegex, bigCatTable, dogTopicRegex, dogTable); + List twoRegexPartitionStrs = + Arrays.asList(bigCatTopicStr1, bigCatTopicStr2, dogTopicStr1, birdTopicStr1); + Map twoRegexExpected = new HashMap<>(); + twoRegexExpected.put(bigCatTopicStr1, bigCatTable); + twoRegexExpected.put(bigCatTopicStr2, bigCatTable); + twoRegexExpected.put(dogTopicStr1, dogTable); + twoRegexExpected.put(birdTopicStr1, birdTopicStr1); + testTopicToTableRegexRunner(config, twoRegexConfig, twoRegexPartitionStrs, twoRegexExpected); + + // test two regexes with catchall. catchall should overlap both regexes and fail the test + String twoRegexCatchAllConfig = + Utils.formatString( + "{}:{}, {}:{},{}:{}", + catchAllRegex, + catchallTable, + catTopicRegex, + catTable, + dogTopicRegex, + dogTable); + List twoRegexCatchAllPartitionStrs = + Arrays.asList(catTopicStr1, catTopicStr2, bigCatTopicStr1, dogTopicStr1, birdTopicStr1); + Map twoRegexCatchAllExpected = new HashMap<>(); + assert TestUtils.assertError( + SnowflakeErrors.ERROR_0021, + () -> + testTopicToTableRegexRunner( + config, + twoRegexCatchAllConfig, + twoRegexCatchAllPartitionStrs, + twoRegexCatchAllExpected)); + + // test invalid overlapping regexes + String invalidTwoRegexConfig = + Utils.formatString("{}:{}, {}:{}", catTopicRegex, catTable, bigCatTopicRegex, bigCatTable); + List invalidTwoRegexPartitionStrs = + Arrays.asList(catTopicStr1, catTopicStr2, dogTopicStr1, birdTopicStr1); + Map invalidTwoRegexExpected = new HashMap<>(); + assert TestUtils.assertError( + SnowflakeErrors.ERROR_0021, + () -> + testTopicToTableRegexRunner( + config, + invalidTwoRegexConfig, + invalidTwoRegexPartitionStrs, + invalidTwoRegexExpected)); + + // test catchall regex + String catchAllConfig = Utils.formatString("{}:{}", catchAllRegex, catchallTable); + List catchAllPartitionStrs = + Arrays.asList(catTopicStr1, catTopicStr2, dogTopicStr1, birdTopicStr1); + Map catchAllExpected = new HashMap<>(); + catchAllExpected.put(catTopicStr1, catchallTable); + catchAllExpected.put(catTopicStr2, catchallTable); + catchAllExpected.put(dogTopicStr1, catchallTable); + catchAllExpected.put(birdTopicStr1, catchallTable); + testTopicToTableRegexRunner(config, catchAllConfig, catchAllPartitionStrs, catchAllExpected); + } + + private static void testTopicToTableRegexRunner( + Map connectorBaseConfig, + String topic2tableRegex, + List partitionStrList, + Map expectedTopic2TableConfig) { + // setup + connectorBaseConfig.put(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, topic2tableRegex); + + // setup partitions + List testPartitions = new ArrayList<>(); + for (int i = 0; i < partitionStrList.size(); i++) { + testPartitions.add(new TopicPartition(partitionStrList.get(i), i)); + } + + // mocks + SnowflakeSinkService serviceSpy = Mockito.spy(SnowflakeSinkService.class); + SnowflakeConnectionService connSpy = Mockito.spy(SnowflakeConnectionService.class); + Map parsedConfig = SnowflakeSinkTask.getTopicToTableMap(connectorBaseConfig); + + SnowflakeSinkTask sinkTask = new SnowflakeSinkTask(serviceSpy, connSpy, parsedConfig); + + // test topics were mapped correctly + sinkTask.open(testPartitions); + + // verify expected num tasks opened + Mockito.verify(serviceSpy, Mockito.times(expectedTopic2TableConfig.size())) + .startTask(Mockito.anyString(), Mockito.any(TopicPartition.class)); + + for (String topicStr : expectedTopic2TableConfig.keySet()) { + TopicPartition topic = null; + String table = expectedTopic2TableConfig.get(topicStr); + for (TopicPartition currTp : testPartitions) { + if (currTp.topic().equals(topicStr)) { + topic = currTp; + Mockito.verify(serviceSpy, Mockito.times(1)).startTask(table, topic); + } + } + Assert.assertNotNull("Expected topic partition was not opened by the tast", topic); + } + } } diff --git a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java index 42680c542..3c831966b 100644 --- a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java +++ b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java @@ -54,6 +54,44 @@ public void testParseTopicToTable() { SnowflakeErrors.ERROR_0021, () -> Utils.parseTopicToTableMap("abc:@123,bvd:adsa")); } + @Test + public void testParseTopicToTableRegex() { + String catTable = "cat_table"; + String dogTable = "dog_table"; + String catTopicRegex = ".*_cat"; + String dogTopicRegex = ".*_dog"; + + // test two different regexs + Map topic2table = + Utils.parseTopicToTableMap( + Utils.formatString("{}:{},{}:{}", catTopicRegex, catTable, dogTopicRegex, dogTable)); + assert topic2table.containsKey(catTopicRegex); + assert topic2table.containsKey(dogTopicRegex); + assert topic2table.containsValue(catTable); + assert topic2table.containsValue(dogTable); + assert topic2table.keySet().size() == 2; + + // error: overlapping regex, same table + assert TestUtils.assertError( + SnowflakeErrors.ERROR_0021, + () -> + Utils.parseTopicToTableMap( + Utils.formatString( + "{}:{},{}:{}", catTopicRegex, catTable, "big_" + catTopicRegex, catTable))); + + // error: overlapping regex, different table + assert TestUtils.assertError( + SnowflakeErrors.ERROR_0021, + () -> + Utils.parseTopicToTableMap( + Utils.formatString( + "{}:{},{}:{}", + catTopicRegex, + catTable, + dogTopicRegex + catTopicRegex, + dogTable))); + } + @Test public void testTableName() { Map topic2table = Utils.parseTopicToTableMap("ab@cd:abcd, 1234:_1234"); @@ -71,6 +109,28 @@ public void testTableName() { assert Utils.tableName(topic, topic2table).equals("_12345_" + Math.abs(topic.hashCode())); } + @Test + public void testTableNameRegex() { + String catTable = "cat_table"; + String dogTable = "dog_table"; + String catTopicRegex = ".*_cat"; + String dogTopicRegex = ".*_dog"; + + // test two different regexs + Map topic2table = + Utils.parseTopicToTableMap( + Utils.formatString("{}:{},{}:{}", catTopicRegex, catTable, dogTopicRegex, dogTable)); + + assert Utils.tableName("calico_cat", topic2table).equals(catTable); + assert Utils.tableName("orange_cat", topic2table).equals(catTable); + assert Utils.tableName("_cat", topic2table).equals(catTable); + assert Utils.tableName("corgi_dog", topic2table).equals(dogTable); + + // test new topic should not have wildcard + String topic = "bird.*"; + assert Utils.tableName(topic, topic2table).equals("bird_" + Math.abs(topic.hashCode())); + } + @Test public void testTableFullName() { assert Utils.isValidSnowflakeTableName("_1342dfsaf$");