Skip to content

Commit

Permalink
[Feature][Transform-V2] Support transform with multi-table
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X committed Nov 7, 2024
1 parent a7837f1 commit 08ef91c
Show file tree
Hide file tree
Showing 58 changed files with 2,379 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@

import java.io.Serializable;
import java.net.URL;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -307,16 +306,15 @@ public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) {
return sinkOptionRule;
}

public static SeaTunnelTransform<?> createAndPrepareTransform(
CatalogTable catalogTable,
public static SeaTunnelTransform<?> createAndPrepareMultiTableTransform(
List<CatalogTable> catalogTables,
ReadonlyConfig options,
ClassLoader classLoader,
String factoryIdentifier) {
final TableTransformFactory factory =
discoverFactory(classLoader, TableTransformFactory.class, factoryIdentifier);
TableTransformFactoryContext context =
new TableTransformFactoryContext(
Collections.singletonList(catalogTable), options, classLoader);
new TableTransformFactoryContext(catalogTables, options, classLoader);
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
return factory.createTransform(context).createTransform();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.source.SeaTunnelJobAware;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;

import java.io.Serializable;
import java.util.List;

public interface SeaTunnelTransform<T>
extends Serializable, PluginIdentifierInterface, SeaTunnelJobAware {
Expand Down Expand Up @@ -53,6 +55,12 @@ default void setTypeInfo(SeaTunnelDataType<T> inputDataType) {
*/
T map(T row);

List<CatalogTable> getProducedCatalogTables();

default SchemaChangeEvent mapSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent) {
return schemaChangeEvent;
}

/** call it when Transformer completed */
default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public final class Constants {
+ "\\____/ \\___| \\__,_| \\_/ \\__,_||_| |_||_| |_| \\___||_|\n"
+ " \n";
public static final String COPYRIGHT_LINE =
"Copyright © 2021-2022 The Apache Software Foundation. Apache SeaTunnel, SeaTunnel, and its feather logo are trademarks of The Apache Software Foundation.";
"Copyright © 2021-2024 The Apache Software Foundation. Apache SeaTunnel, SeaTunnel, and its feather logo are trademarks of The Apache Software Foundation.";

private Constants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.io.Serializable;
import java.util.List;
import java.util.Objects;

import static org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorErrorCode.CATALOG_TABLE_FAILED;

Expand Down Expand Up @@ -137,15 +138,35 @@ public void checkRule(List<Column> check) {
if (CollectionUtils.isEmpty(check)) {
throw new AssertConnectorException(CATALOG_TABLE_FAILED, "columns is null");
}
if (CollectionUtils.isNotEmpty(columns)
&& !CollectionUtils.isEqualCollection(columns, check)) {

if (columns.size() != check.size()) {
throw new AssertConnectorException(
CATALOG_TABLE_FAILED,
String.format("columns: %s is not equal to %s", check, columns));
}
for (int i = 0; i < columns.size(); i++) {
if (!isColumnEqual(columns.get(i), check.get(i))) {
throw new AssertConnectorException(
CATALOG_TABLE_FAILED,
String.format(
"columns: %s is not equal to %s",
check.get(i), columns.get(i)));
}
}
}
}

private static boolean isColumnEqual(Column column1, Column column2) {
return Objects.equals(column1.getName(), column2.getName())
&& Objects.equals(column1.getDataType(), column2.getDataType())
&& Objects.equals(column1.getColumnLength(), column2.getColumnLength())
&& Objects.equals(column1.getScale(), column2.getScale())
&& column1.isNullable() == column2.isNullable()
&& Objects.equals(column1.getDefaultValue(), column2.getDefaultValue())
&& Objects.equals(column1.getComment(), column2.getComment())
&& Objects.equals(column1.getSourceType(), column2.getSourceType());
}

@Data
@AllArgsConstructor
public static class AssertTableIdentifierRule implements Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public AssertSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
assertFieldRules = new ConcurrentHashMap<>();
assertRowRules = new ConcurrentHashMap<>();
assertCatalogTableRule = new ConcurrentHashMap<>();
catalogTableName = catalogTable.getTablePath().getFullName();
Config ruleConfig = ConfigFactory.parseMap(pluginConfig.get(RULES));
if (ruleConfig.hasPath(TABLE_CONFIGS.key())) {
List<? extends Config> tableConfigs = ruleConfig.getConfigList(TABLE_CONFIGS.key());
Expand All @@ -78,7 +79,6 @@ public AssertSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
String tableName = catalogTable.getTablePath().getFullName();
initTableRule(catalogTable, ruleConfig, tableName);
}
catalogTableName = catalogTable.getTablePath().getFullName();

if (ruleConfig.hasPath(CatalogOptions.TABLE_NAMES.key())) {
assertTableRule =
Expand Down Expand Up @@ -114,7 +114,9 @@ private void initTableRule(CatalogTable catalogTable, Config tableConfig, String
AssertCatalogTableRule catalogTableRule =
new AssertRuleParser()
.parseCatalogTableRule(tableConfig.getConfig(CATALOG_TABLE_RULES));
catalogTableRule.checkRule(catalogTable);
if (tableName.equals(catalogTableName)) {
catalogTableRule.checkRule(catalogTable);
}
assertCatalogTableRule.put(tableName, catalogTableRule);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -125,7 +124,7 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
resultTableName,
new DataStreamTableInfo(
inputStream,
Collections.singletonList(transform.getProducedCatalogTable()),
transform.getProducedCatalogTables(),
resultTableName));
} catch (Exception e) {
throw new TaskExecuteException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ public class SeaTunnelConnectorTest extends TestSuiteBase implements TestResourc
};

// Match paimon source and paimon sink
private static final Pattern pattern1 =
private static final Pattern PATTERN1 =
Pattern.compile(
"(Paimon (source|sink))(.*?)(?=(Paimon (source|sink)|$))", Pattern.DOTALL);
// Match required options and optional options
private static final Pattern pattern2 =
private static final Pattern PATTERN2 =
Pattern.compile("Required Options:(.*?)(?:Optional Options: (.*?))?$", Pattern.DOTALL);

@Override
Expand Down Expand Up @@ -132,7 +132,7 @@ public void testExecCheck(TestContainer container) throws Exception {
}

private void checkStdOutForOptionRule(String stdout) {
Matcher matcher1 = pattern1.matcher(stdout.trim());
Matcher matcher1 = PATTERN1.matcher(stdout.trim());
String paimonSourceContent = StringUtils.EMPTY;
String paimonSinkContent = StringUtils.EMPTY;
Assertions.assertTrue(matcher1.groupCount() >= 3);
Expand All @@ -153,7 +153,7 @@ private void checkStdOutForOptionRule(String stdout) {

private void checkStdOutForOptionRuleOfSinglePluginTypeWithTransform(
String stdout, Factory factory) {
Matcher matcher2 = pattern2.matcher(stdout.trim());
Matcher matcher2 = PATTERN2.matcher(stdout.trim());
Assertions.assertTrue(matcher2.find());
Assertions.assertTrue(matcher2.groupCount() >= 2);
OptionRule optionRule = factory.optionRule();
Expand All @@ -169,11 +169,11 @@ private void checkStdOutForOptionRuleOfSinglePluginTypeWithTransform(
optionRule.getOptionalOptions().size(),
StringUtils.isBlank(optionalOptions)
? 0
: optionalOptions.split(StringUtils.LF).length);
: optionalOptions.trim().split(StringUtils.LF).length);
}

private void checkStdOutForOptionRuleOfSinglePluginTypeWithConnector(String stdout) {
Matcher matcher1 = pattern1.matcher(stdout.trim());
Matcher matcher1 = PATTERN1.matcher(stdout.trim());
Assertions.assertTrue(matcher1.find());
Assertions.assertTrue(matcher1.groupCount() >= 3);
String paimonPluginContent = matcher1.group(3).trim();
Expand All @@ -187,7 +187,7 @@ private void checkStdOutForOptionRuleOfSinglePluginTypeWithConnector(String stdo
}

private void checkOptionRuleOfSinglePluginType(Factory factory, String optionRules) {
Matcher matcher2 = pattern2.matcher(optionRules);
Matcher matcher2 = PATTERN2.matcher(optionRules);
Assertions.assertTrue(matcher2.find());
Assertions.assertTrue(matcher2.groupCount() >= 2);
String requiredOptions = matcher2.group(1).trim();
Expand All @@ -205,7 +205,7 @@ private void checkOptionRuleOfSinglePluginType(Factory factory, String optionRul
optionRule.getOptionalOptions().size(),
StringUtils.isBlank(optionalOptions)
? 0
: optionalOptions.split(StringUtils.LF).length);
: optionalOptions.trim().split(StringUtils.LF).length);
}

private void checkResultForCase1(Container.ExecResult execResult) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.seatunnel.e2e.transform;

import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
Expand All @@ -32,4 +34,15 @@ public void testCopy(TestContainer container) throws IOException, InterruptedExc
Container.ExecResult execResult = container.executeJob("/copy_transform.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

@DisabledOnContainer(
value = {},
type = {EngineType.SPARK},
disabledReason = "Currently SPARK do not multi table transform")
@TestTemplate
public void testCopyMultiTable(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/copy_transform_multi_table.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ public void testEmbedding(TestContainer container) throws IOException, Interrupt
Assertions.assertEquals(0, execResult.getExitCode());
}

@DisabledOnContainer(
value = {},
type = {EngineType.SPARK},
disabledReason = "Currently SPARK do not multi table transform")
@TestTemplate
public void testEmbeddingMultiTable(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/embedding_transform_multi_table.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

@TestTemplate
public void testEmbeddingWithCustomModel(TestContainer container)
throws IOException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.seatunnel.e2e.transform;

import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
Expand All @@ -32,4 +34,16 @@ public void testFilter(TestContainer container) throws IOException, InterruptedE
Container.ExecResult execResult = container.executeJob("/filter_transform.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

@DisabledOnContainer(
value = {},
type = {EngineType.SPARK},
disabledReason = "Currently SPARK do not multi table transform")
@TestTemplate
public void testFilterMultiTable(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/filter_transform_multi_table.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.seatunnel.e2e.transform;

import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
Expand All @@ -40,4 +42,16 @@ public void testFilterRowKind(TestContainer container)
container.executeJob("/filter_row_kind_include_insert.conf");
Assertions.assertEquals(0, execResult3.getExitCode());
}

@DisabledOnContainer(
value = {},
type = {EngineType.SPARK},
disabledReason = "Currently SPARK do not multi table transform")
@TestTemplate
public void testFilterRowKindMultiTable(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/filter_row_kind_exclude_insert_multi_table.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.seatunnel.e2e.transform;

import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -88,6 +90,18 @@ public void testLLMWithOpenAI(TestContainer container)
Assertions.assertEquals(0, execResult.getExitCode());
}

@DisabledOnContainer(
value = {},
type = {EngineType.SPARK},
disabledReason = "Currently SPARK do not multi table transform")
@TestTemplate
public void testLLMWithOpenAIMultiTable(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/llm_openai_transform_multi_table.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

@TestTemplate
public void testLLMWithMicrosoft(TestContainer container)
throws IOException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.seatunnel.e2e.transform;

import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
Expand All @@ -37,4 +39,16 @@ public void testRowKindExtractorTransform(TestContainer container)
container.executeJob("/rowkind_extractor_transform_case2.conf");
Assertions.assertEquals(0, execResult2.getExitCode());
}

@DisabledOnContainer(
value = {},
type = {EngineType.SPARK},
disabledReason = "Currently SPARK do not multi table transform")
@TestTemplate
public void testRowKindExtractorMultiTable(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/rowkind_extractor_transform_case1_multi_table.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.seatunnel.e2e.transform;

import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
Expand All @@ -32,4 +34,16 @@ public void testSplit(TestContainer container) throws IOException, InterruptedEx
Container.ExecResult execResult = container.executeJob("/split_transform.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

@DisabledOnContainer(
value = {},
type = {EngineType.SPARK},
disabledReason = "Currently SPARK do not multi table transform")
@TestTemplate
public void testSplitMultiTable(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/rowkind_extractor_transform_case1_multi_table.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
}
Loading

0 comments on commit 08ef91c

Please sign in to comment.