diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java index 9af2721e220..2b71980935b 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java @@ -21,9 +21,9 @@ import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.PluginType; @@ -109,9 +109,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException { case JSON: case EXCEL: case XML: - SeaTunnelRowType userDefinedSchema = - CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType(); - readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema); + CatalogTable userDefinedCatalogTable = + CatalogTableUtil.buildWithConfig(pluginConfig); + readStrategy.setCatalogTable(userDefinedCatalogTable); rowType = readStrategy.getActualSeaTunnelRowTypeInfo(); break; case ORC: diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java index 373ada564a8..10b969b0086 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java @@ -95,7 +95,7 @@ private CatalogTable parseCatalogTable(ReadonlyConfig readonlyConfig) { case JSON: case EXCEL: case XML: - readStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType()); + readStrategy.setCatalogTable(catalogTable); return newCatalogTable(catalogTable, readStrategy.getActualSeaTunnelRowTypeInfo()); case ORC: case PARQUET: diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java index 6686da98806..af6003c79ce 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java @@ -26,6 +26,8 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; @@ -110,7 +112,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException { protected WriteStrategy createWriteStrategy() { WriteStrategy writeStrategy = WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig); - writeStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType); + writeStrategy.setCatalogTable( + CatalogTableUtil.getCatalogTable( + "file", null, null, TablePath.DEFAULT.getTableName(), seaTunnelRowType)); return writeStrategy; } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java index a48368be448..b35c113f8da 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java @@ -112,7 +112,7 @@ public Optional> getWriterStateSerializer() { protected WriteStrategy createWriteStrategy() { WriteStrategy writeStrategy = WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig); - writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType()); + writeStrategy.setCatalogTable(catalogTable); return writeStrategy; } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java index 68476488a55..dd49c7f2d0c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.sink.writer; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -150,11 +151,11 @@ public Configuration getConfiguration(HadoopConf hadoopConf) { /** * set seaTunnelRowTypeInfo in writer * - * @param seaTunnelRowType seaTunnelRowType + * @param catalogTable seaTunnelRowType */ @Override - public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { - this.seaTunnelRowType = seaTunnelRowType; + public void setCatalogTable(CatalogTable catalogTable) { + this.seaTunnelRowType = catalogTable.getSeaTunnelRowType(); } /** diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java index 7f496b2927d..06d05d62505 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java @@ -17,8 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.file.sink.writer; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonError; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode; @@ -46,9 +46,9 @@ public BinaryWriteStrategy(FileSinkConfig fileSinkConfig) { } @Override - public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { - super.setSeaTunnelRowTypeInfo(seaTunnelRowType); - if (!seaTunnelRowType.equals(BinaryReadStrategy.binaryRowType)) { + public void setCatalogTable(CatalogTable catalogTable) { + super.setCatalogTable(catalogTable); + if (!catalogTable.getSeaTunnelRowType().equals(BinaryReadStrategy.binaryRowType)) { throw new FileConnectorException( FileConnectorErrorCode.FORMAT_NOT_SUPPORT, "BinaryWriteStrategy only supports binary format, please read file with `BINARY` format, and do not change schema in the transform."); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java index f95973f4cfc..23fb7893a8f 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java @@ -18,8 +18,8 @@ package org.apache.seatunnel.connectors.seatunnel.file.sink.writer; import org.apache.seatunnel.api.serialization.SerializationSchema; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonError; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.common.utils.EncodingUtils; @@ -55,11 +55,13 @@ public JsonWriteStrategy(FileSinkConfig textFileSinkConfig) { } @Override - public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { - super.setSeaTunnelRowTypeInfo(seaTunnelRowType); + public void setCatalogTable(CatalogTable catalogTable) { + super.setCatalogTable(catalogTable); this.serializationSchema = new JsonSerializationSchema( - buildSchemaWithRowType(seaTunnelRowType, sinkColumnsIndexInRow), charset); + buildSchemaWithRowType( + catalogTable.getSeaTunnelRowType(), sinkColumnsIndexInRow), + charset); } @Override diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index 621048fb39a..77e2eb5c5b0 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -18,8 +18,8 @@ package org.apache.seatunnel.connectors.seatunnel.file.sink.writer; import org.apache.seatunnel.api.serialization.SerializationSchema; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonError; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.common.utils.DateTimeUtils; @@ -71,12 +71,13 @@ public TextWriteStrategy(FileSinkConfig fileSinkConfig) { } @Override - public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { - super.setSeaTunnelRowTypeInfo(seaTunnelRowType); + public void setCatalogTable(CatalogTable catalogTable) { + super.setCatalogTable(catalogTable); this.serializationSchema = TextSerializationSchema.builder() .seaTunnelRowType( - buildSchemaWithRowType(seaTunnelRowType, sinkColumnsIndexInRow)) + buildSchemaWithRowType( + catalogTable.getSeaTunnelRowType(), sinkColumnsIndexInRow)) .delimiter(fieldDelimiter) .dateFormatter(dateFormat) .dateTimeFormatter(dateTimeFormat) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java index 6a1b1840b4d..24b23c9bfc3 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java @@ -17,8 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.file.sink.writer; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy; @@ -56,11 +56,11 @@ public interface WriteStrategy extends Transaction, Serializable, Closeable { void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException; /** - * set seaTunnelRowTypeInfo in writer + * set catalog table to write strategy * - * @param seaTunnelRowType seaTunnelRowType + * @param catalogTable catalogTable */ - void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType); + void setCatalogTable(CatalogTable catalogTable); /** * use seaTunnelRow generate partition directory diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java index 3e71a3b2932..00d90d84195 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -92,10 +93,10 @@ public void init(HadoopConf conf) { } @Override - public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { - this.seaTunnelRowType = seaTunnelRowType; + public void setCatalogTable(CatalogTable catalogTable) { + this.seaTunnelRowType = catalogTable.getSeaTunnelRowType(); this.seaTunnelRowTypeWithPartition = - mergePartitionTypes(fileNames.get(0), seaTunnelRowType); + mergePartitionTypes(fileNames.get(0), catalogTable.getSeaTunnelRowType()); } boolean checkFileType(String path) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java index c90b6d6659b..d7dfe206ab5 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -145,15 +146,15 @@ protected void readProcess( } @Override - public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { - if (isNullOrEmpty(seaTunnelRowType.getFieldNames()) - || isNullOrEmpty(seaTunnelRowType.getFieldTypes())) { + public void setCatalogTable(CatalogTable catalogTable) { + SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType(); + if (isNullOrEmpty(rowType.getFieldNames()) || isNullOrEmpty(rowType.getFieldTypes())) { throw new FileConnectorException( CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, "Schema information is not set or incorrect Schema settings"); } SeaTunnelRowType userDefinedRowTypeWithPartition = - mergePartitionTypes(fileNames.get(0), seaTunnelRowType); + mergePartitionTypes(fileNames.get(0), rowType); // column projection if (pluginConfig.hasPath(BaseSourceConfigOptions.READ_COLUMNS.key())) { // get the read column index from user-defined row type @@ -161,15 +162,15 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { String[] fields = new String[readColumns.size()]; SeaTunnelDataType[] types = new SeaTunnelDataType[readColumns.size()]; for (int i = 0; i < indexes.length; i++) { - indexes[i] = seaTunnelRowType.indexOf(readColumns.get(i)); - fields[i] = seaTunnelRowType.getFieldName(indexes[i]); - types[i] = seaTunnelRowType.getFieldType(indexes[i]); + indexes[i] = rowType.indexOf(readColumns.get(i)); + fields[i] = rowType.getFieldName(indexes[i]); + types[i] = rowType.getFieldType(indexes[i]); } this.seaTunnelRowType = new SeaTunnelRowType(fields, types); this.seaTunnelRowTypeWithPartition = mergePartitionTypes(fileNames.get(0), this.seaTunnelRowType); } else { - this.seaTunnelRowType = seaTunnelRowType; + this.seaTunnelRowType = rowType; this.seaTunnelRowTypeWithPartition = userDefinedRowTypeWithPartition; } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java index 982419266f5..dfd57363d9d 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.serialization.DeserializationSchema; import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; @@ -62,8 +63,8 @@ public void init(HadoopConf conf) { } @Override - public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { - super.setSeaTunnelRowTypeInfo(seaTunnelRowType); + public void setCatalogTable(CatalogTable catalogTable) { + super.setCatalogTable(catalogTable); if (isMergePartition) { deserializationSchema = new JsonDeserializationSchema(false, false, this.seaTunnelRowTypeWithPartition); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java index c5bdf281244..9389223814a 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -56,8 +57,7 @@ default SeaTunnelRowType getSeaTunnelRowTypeInfoWithUserConfigRowType( return getSeaTunnelRowTypeInfo(path); } - // todo: use CatalogTable - void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType); + void setCatalogTable(CatalogTable catalogTable); List getFileNamesByPath(String path) throws IOException; diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java index 2b722593770..1a7a7398a4f 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.serialization.DeserializationSchema; import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -170,9 +171,10 @@ public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) { } @Override - public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { + public void setCatalogTable(CatalogTable catalogTable) { + SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType(); SeaTunnelRowType userDefinedRowTypeWithPartition = - mergePartitionTypes(fileNames.get(0), seaTunnelRowType); + mergePartitionTypes(fileNames.get(0), rowType); Optional fieldDelimiterOptional = ReadonlyConfig.fromConfig(pluginConfig) .getOptional(BaseSourceConfigOptions.FIELD_DELIMITER); @@ -201,7 +203,7 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { deserializationSchema = builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build(); } else { - deserializationSchema = builder.seaTunnelRowType(seaTunnelRowType).build(); + deserializationSchema = builder.seaTunnelRowType(rowType).build(); } // column projection if (pluginConfig.hasPath(BaseSourceConfigOptions.READ_COLUMNS.key())) { @@ -210,15 +212,15 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { String[] fields = new String[readColumns.size()]; SeaTunnelDataType[] types = new SeaTunnelDataType[readColumns.size()]; for (int i = 0; i < indexes.length; i++) { - indexes[i] = seaTunnelRowType.indexOf(readColumns.get(i)); - fields[i] = seaTunnelRowType.getFieldName(indexes[i]); - types[i] = seaTunnelRowType.getFieldType(indexes[i]); + indexes[i] = rowType.indexOf(readColumns.get(i)); + fields[i] = rowType.getFieldName(indexes[i]); + types[i] = rowType.getFieldType(indexes[i]); } this.seaTunnelRowType = new SeaTunnelRowType(fields, types); this.seaTunnelRowTypeWithPartition = mergePartitionTypes(fileNames.get(0), this.seaTunnelRowType); } else { - this.seaTunnelRowType = seaTunnelRowType; + this.seaTunnelRowType = rowType; this.seaTunnelRowTypeWithPartition = userDefinedRowTypeWithPartition; } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java index a553a4f9d06..e012c46bdf5 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -173,20 +174,20 @@ public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws FileConnecto } @Override - public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { - if (ArrayUtils.isEmpty(seaTunnelRowType.getFieldNames()) - || ArrayUtils.isEmpty(seaTunnelRowType.getFieldTypes())) { + public void setCatalogTable(CatalogTable catalogTable) { + SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType(); + if (ArrayUtils.isEmpty(rowType.getFieldNames()) + || ArrayUtils.isEmpty(rowType.getFieldTypes())) { throw new FileConnectorException( CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Schema information is undefined or misconfigured, please check your configuration file."); } if (readColumns.isEmpty()) { - this.seaTunnelRowType = seaTunnelRowType; - this.seaTunnelRowTypeWithPartition = - mergePartitionTypes(fileNames.get(0), seaTunnelRowType); + this.seaTunnelRowType = rowType; + this.seaTunnelRowTypeWithPartition = mergePartitionTypes(fileNames.get(0), rowType); } else { - if (readColumns.retainAll(Arrays.asList(seaTunnelRowType.getFieldNames()))) { + if (readColumns.retainAll(Arrays.asList(rowType.getFieldNames()))) { log.warn( "The read columns configuration will be filtered by the schema configuration, this may cause the actual results to be inconsistent with expectations. This is due to read columns not being a subset of the schema, " + "maybe you should check the schema and read_columns!"); @@ -195,9 +196,9 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { String[] fields = new String[readColumns.size()]; SeaTunnelDataType[] types = new SeaTunnelDataType[readColumns.size()]; for (int i = 0; i < readColumns.size(); i++) { - indexes[i] = seaTunnelRowType.indexOf(readColumns.get(i)); - fields[i] = seaTunnelRowType.getFieldName(indexes[i]); - types[i] = seaTunnelRowType.getFieldType(indexes[i]); + indexes[i] = rowType.indexOf(readColumns.get(i)); + fields[i] = rowType.getFieldName(indexes[i]); + types[i] = rowType.getFieldType(indexes[i]); } this.seaTunnelRowType = new SeaTunnelRowType(fields, types); this.seaTunnelRowTypeWithPartition = diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ExcelReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ExcelReadStrategyTest.java index 8aa43a03bdc..149ee7648d5 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ExcelReadStrategyTest.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ExcelReadStrategyTest.java @@ -21,9 +21,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.utils.DateTimeUtils; import org.apache.seatunnel.common.utils.DateUtils; import org.apache.seatunnel.common.utils.TimeUtils; @@ -72,9 +72,8 @@ private void testExcelRead(String filePath) throws IOException, URISyntaxExcepti excelReadStrategy.init(localConf); List fileNamesByPath = excelReadStrategy.getFileNamesByPath(excelFilePath); - SeaTunnelRowType userDefinedSchema = - CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType(); - excelReadStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema); + CatalogTable userDefinedCatalogTable = CatalogTableUtil.buildWithConfig(pluginConfig); + excelReadStrategy.setCatalogTable(userDefinedCatalogTable); TestCollector testCollector = new TestCollector(); excelReadStrategy.read(fileNamesByPath.get(0), "", testCollector); for (SeaTunnelRow seaTunnelRow : testCollector.getRows()) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java index 236d6f5a037..e692d7294b7 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.LocalTimeType; import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; @@ -82,7 +83,8 @@ public void testParquetWriteInt96() throws Exception { ParquetWriteStrategy writeStrategy = new ParquetWriteStrategy(writeSinkConfig); ParquetReadStrategyTest.LocalConf hadoopConf = new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT); - writeStrategy.setSeaTunnelRowTypeInfo(writeRowType); + writeStrategy.setCatalogTable( + CatalogTableUtil.getCatalogTable("test", null, null, "test", writeRowType)); writeStrategy.init(hadoopConf, "test1", "test1", 0); writeStrategy.beginTransaction(1L); writeStrategy.write( diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java index 736ae590963..ad23dd0186f 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java @@ -21,9 +21,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.source.reader.AbstractReadStrategy; import org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy; @@ -121,11 +121,10 @@ private static void testRead( readStrategy.init(localConf); readStrategy.getFileNamesByPath(sourceFilePath); testCollector = new TestCollector(); - SeaTunnelRowType seaTunnelRowTypeInfo = - CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType(); - Assertions.assertNotNull(seaTunnelRowTypeInfo); - readStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowTypeInfo); - log.info(seaTunnelRowTypeInfo.toString()); + CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig); + Assertions.assertNotNull(catalogTable.getSeaTunnelRowType()); + readStrategy.setCatalogTable(catalogTable); + log.info(catalogTable.getSeaTunnelRowType().toString()); readStrategy.read(sourceFilePath, "", testCollector); assertRows(testCollector); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java index 8bb2e483896..fca8f68fd2c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java @@ -21,9 +21,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.utils.DateTimeUtils; import org.apache.seatunnel.common.utils.DateUtils; import org.apache.seatunnel.common.utils.TimeUtils; @@ -66,9 +66,8 @@ public void testXmlRead() throws IOException, URISyntaxException { xmlReadStrategy.setPluginConfig(pluginConfig); xmlReadStrategy.init(localConf); List fileNamesByPath = xmlReadStrategy.getFileNamesByPath(xmlFilePath); - SeaTunnelRowType userDefinedSchema = - CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType(); - xmlReadStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema); + CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig); + xmlReadStrategy.setCatalogTable(catalogTable); TestCollector testCollector = new TestCollector(); xmlReadStrategy.read(fileNamesByPath.get(0), "", testCollector); for (SeaTunnelRow seaTunnelRow : testCollector.getRows()) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java index 0690b2acebb..bd8df0261cb 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java @@ -22,9 +22,9 @@ import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.PluginType; @@ -95,9 +95,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException { case JSON: case EXCEL: case XML: - SeaTunnelRowType userDefinedSchema = - CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType(); - readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema); + CatalogTable userDefinedCatalogTable = + CatalogTableUtil.buildWithConfig(pluginConfig); + readStrategy.setCatalogTable(userDefinedCatalogTable); rowType = readStrategy.getActualSeaTunnelRowTypeInfo(); break; case ORC: diff --git a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSource.java index 335e3967808..ed9807729f1 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSource.java @@ -22,9 +22,9 @@ import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.PluginType; @@ -96,9 +96,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException { case JSON: case EXCEL: case XML: - SeaTunnelRowType userDefinedSchema = - CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType(); - readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema); + CatalogTable userDefinedCatalogTable = + CatalogTableUtil.buildWithConfig(pluginConfig); + readStrategy.setCatalogTable(userDefinedCatalogTable); rowType = readStrategy.getActualSeaTunnelRowTypeInfo(); break; case ORC: diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java index cf3061a44a3..8d2ae3d90bb 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java @@ -22,9 +22,9 @@ import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.PluginType; @@ -91,9 +91,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException { case TEXT: case JSON: case EXCEL: - SeaTunnelRowType userDefinedSchema = - CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType(); - readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema); + CatalogTable userDefinedCatalogTable = + CatalogTableUtil.buildWithConfig(pluginConfig); + readStrategy.setCatalogTable(userDefinedCatalogTable); rowType = readStrategy.getActualSeaTunnelRowTypeInfo(); break; case ORC: diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java index 997c42f9faf..6e91baf0013 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java @@ -240,7 +240,7 @@ private Table getTableInformation() { private WriteStrategy getWriteStrategy() { if (writeStrategy == null) { writeStrategy = WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig); - writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType()); + writeStrategy.setCatalogTable(catalogTable); } return writeStrategy; } diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java index e98143fcf0e..eba9b5a15be 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java @@ -279,7 +279,9 @@ private CatalogTable parseCatalogTableFromTable( } SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames, fieldTypes); - readStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType); + readStrategy.setCatalogTable( + CatalogTableUtil.getCatalogTable( + "hive", table.getDbName(), null, table.getTableName(), seaTunnelRowType)); final SeaTunnelRowType finalSeatunnelRowType = readStrategy.getActualSeaTunnelRowTypeInfo(); CatalogTable catalogTable = buildEmptyCatalogTable(readonlyConfig, table);