Skip to content

Commit

Permalink
[Improve][Connector-V2] Change File Read/WriteStrategy `setSeaTunnelR…
Browse files Browse the repository at this point in the history
…owTypeInfo` to `setCatalogTable` (#7829)
  • Loading branch information
Hisoka-X authored Oct 16, 2024
1 parent 418759d commit 6b5f74e
Show file tree
Hide file tree
Showing 24 changed files with 102 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
Expand Down Expand Up @@ -109,9 +109,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
case JSON:
case EXCEL:
case XML:
SeaTunnelRowType userDefinedSchema =
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
CatalogTable userDefinedCatalogTable =
CatalogTableUtil.buildWithConfig(pluginConfig);
readStrategy.setCatalogTable(userDefinedCatalogTable);
rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
break;
case ORC:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private CatalogTable parseCatalogTable(ReadonlyConfig readonlyConfig) {
case JSON:
case EXCEL:
case XML:
readStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
readStrategy.setCatalogTable(catalogTable);
return newCatalogTable(catalogTable, readStrategy.getActualSeaTunnelRowTypeInfo());
case ORC:
case PARQUET:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
Expand Down Expand Up @@ -110,7 +112,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
protected WriteStrategy createWriteStrategy() {
WriteStrategy writeStrategy =
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
writeStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType);
writeStrategy.setCatalogTable(
CatalogTableUtil.getCatalogTable(
"file", null, null, TablePath.DEFAULT.getTableName(), seaTunnelRowType));
return writeStrategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
protected WriteStrategy createWriteStrategy() {
WriteStrategy writeStrategy =
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
writeStrategy.setCatalogTable(catalogTable);
return writeStrategy;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
Expand Down Expand Up @@ -150,11 +151,11 @@ public Configuration getConfiguration(HadoopConf hadoopConf) {
/**
* set seaTunnelRowTypeInfo in writer
*
* @param seaTunnelRowType seaTunnelRowType
* @param catalogTable seaTunnelRowType
*/
@Override
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
public void setCatalogTable(CatalogTable catalogTable) {
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
Expand Down Expand Up @@ -46,9 +46,9 @@ public BinaryWriteStrategy(FileSinkConfig fileSinkConfig) {
}

@Override
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
if (!seaTunnelRowType.equals(BinaryReadStrategy.binaryRowType)) {
public void setCatalogTable(CatalogTable catalogTable) {
super.setCatalogTable(catalogTable);
if (!catalogTable.getSeaTunnelRowType().equals(BinaryReadStrategy.binaryRowType)) {
throw new FileConnectorException(
FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
"BinaryWriteStrategy only supports binary format, please read file with `BINARY` format, and do not change schema in the transform.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;

import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.EncodingUtils;
Expand Down Expand Up @@ -55,11 +55,13 @@ public JsonWriteStrategy(FileSinkConfig textFileSinkConfig) {
}

@Override
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
public void setCatalogTable(CatalogTable catalogTable) {
super.setCatalogTable(catalogTable);
this.serializationSchema =
new JsonSerializationSchema(
buildSchemaWithRowType(seaTunnelRowType, sinkColumnsIndexInRow), charset);
buildSchemaWithRowType(
catalogTable.getSeaTunnelRowType(), sinkColumnsIndexInRow),
charset);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;

import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.DateTimeUtils;
Expand Down Expand Up @@ -71,12 +71,13 @@ public TextWriteStrategy(FileSinkConfig fileSinkConfig) {
}

@Override
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
public void setCatalogTable(CatalogTable catalogTable) {
super.setCatalogTable(catalogTable);
this.serializationSchema =
TextSerializationSchema.builder()
.seaTunnelRowType(
buildSchemaWithRowType(seaTunnelRowType, sinkColumnsIndexInRow))
buildSchemaWithRowType(
catalogTable.getSeaTunnelRowType(), sinkColumnsIndexInRow))
.delimiter(fieldDelimiter)
.dateFormatter(dateFormat)
.dateTimeFormatter(dateTimeFormat)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
Expand Down Expand Up @@ -56,11 +56,11 @@ public interface WriteStrategy extends Transaction, Serializable, Closeable {
void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException;

/**
* set seaTunnelRowTypeInfo in writer
* set catalog table to write strategy
*
* @param seaTunnelRowType seaTunnelRowType
* @param catalogTable catalogTable
*/
void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);
void setCatalogTable(CatalogTable catalogTable);

/**
* use seaTunnelRow generate partition directory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
Expand Down Expand Up @@ -92,10 +93,10 @@ public void init(HadoopConf conf) {
}

@Override
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
public void setCatalogTable(CatalogTable catalogTable) {
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
this.seaTunnelRowTypeWithPartition =
mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
mergePartitionTypes(fileNames.get(0), catalogTable.getSeaTunnelRowType());
}

boolean checkFileType(String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
Expand Down Expand Up @@ -145,31 +146,31 @@ protected void readProcess(
}

@Override
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
if (isNullOrEmpty(seaTunnelRowType.getFieldNames())
|| isNullOrEmpty(seaTunnelRowType.getFieldTypes())) {
public void setCatalogTable(CatalogTable catalogTable) {
SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
if (isNullOrEmpty(rowType.getFieldNames()) || isNullOrEmpty(rowType.getFieldTypes())) {
throw new FileConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
"Schema information is not set or incorrect Schema settings");
}
SeaTunnelRowType userDefinedRowTypeWithPartition =
mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
mergePartitionTypes(fileNames.get(0), rowType);
// column projection
if (pluginConfig.hasPath(BaseSourceConfigOptions.READ_COLUMNS.key())) {
// get the read column index from user-defined row type
indexes = new int[readColumns.size()];
String[] fields = new String[readColumns.size()];
SeaTunnelDataType<?>[] types = new SeaTunnelDataType[readColumns.size()];
for (int i = 0; i < indexes.length; i++) {
indexes[i] = seaTunnelRowType.indexOf(readColumns.get(i));
fields[i] = seaTunnelRowType.getFieldName(indexes[i]);
types[i] = seaTunnelRowType.getFieldType(indexes[i]);
indexes[i] = rowType.indexOf(readColumns.get(i));
fields[i] = rowType.getFieldName(indexes[i]);
types[i] = rowType.getFieldType(indexes[i]);
}
this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
this.seaTunnelRowTypeWithPartition =
mergePartitionTypes(fileNames.get(0), this.seaTunnelRowType);
} else {
this.seaTunnelRowType = seaTunnelRowType;
this.seaTunnelRowType = rowType;
this.seaTunnelRowTypeWithPartition = userDefinedRowTypeWithPartition;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
Expand Down Expand Up @@ -62,8 +63,8 @@ public void init(HadoopConf conf) {
}

@Override
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
public void setCatalogTable(CatalogTable catalogTable) {
super.setCatalogTable(catalogTable);
if (isMergePartition) {
deserializationSchema =
new JsonDeserializationSchema(false, false, this.seaTunnelRowTypeWithPartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
Expand Down Expand Up @@ -56,8 +57,7 @@ default SeaTunnelRowType getSeaTunnelRowTypeInfoWithUserConfigRowType(
return getSeaTunnelRowTypeInfo(path);
}

// todo: use CatalogTable
void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);
void setCatalogTable(CatalogTable catalogTable);

List<String> getFileNamesByPath(String path) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
Expand Down Expand Up @@ -170,9 +171,10 @@ public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
}

@Override
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
public void setCatalogTable(CatalogTable catalogTable) {
SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
SeaTunnelRowType userDefinedRowTypeWithPartition =
mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
mergePartitionTypes(fileNames.get(0), rowType);
Optional<String> fieldDelimiterOptional =
ReadonlyConfig.fromConfig(pluginConfig)
.getOptional(BaseSourceConfigOptions.FIELD_DELIMITER);
Expand Down Expand Up @@ -201,7 +203,7 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
deserializationSchema =
builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build();
} else {
deserializationSchema = builder.seaTunnelRowType(seaTunnelRowType).build();
deserializationSchema = builder.seaTunnelRowType(rowType).build();
}
// column projection
if (pluginConfig.hasPath(BaseSourceConfigOptions.READ_COLUMNS.key())) {
Expand All @@ -210,15 +212,15 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
String[] fields = new String[readColumns.size()];
SeaTunnelDataType<?>[] types = new SeaTunnelDataType[readColumns.size()];
for (int i = 0; i < indexes.length; i++) {
indexes[i] = seaTunnelRowType.indexOf(readColumns.get(i));
fields[i] = seaTunnelRowType.getFieldName(indexes[i]);
types[i] = seaTunnelRowType.getFieldType(indexes[i]);
indexes[i] = rowType.indexOf(readColumns.get(i));
fields[i] = rowType.getFieldName(indexes[i]);
types[i] = rowType.getFieldType(indexes[i]);
}
this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
this.seaTunnelRowTypeWithPartition =
mergePartitionTypes(fileNames.get(0), this.seaTunnelRowType);
} else {
this.seaTunnelRowType = seaTunnelRowType;
this.seaTunnelRowType = rowType;
this.seaTunnelRowTypeWithPartition = userDefinedRowTypeWithPartition;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
Expand Down Expand Up @@ -173,20 +174,20 @@ public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws FileConnecto
}

@Override
public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
if (ArrayUtils.isEmpty(seaTunnelRowType.getFieldNames())
|| ArrayUtils.isEmpty(seaTunnelRowType.getFieldTypes())) {
public void setCatalogTable(CatalogTable catalogTable) {
SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
if (ArrayUtils.isEmpty(rowType.getFieldNames())
|| ArrayUtils.isEmpty(rowType.getFieldTypes())) {
throw new FileConnectorException(
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
"Schema information is undefined or misconfigured, please check your configuration file.");
}

if (readColumns.isEmpty()) {
this.seaTunnelRowType = seaTunnelRowType;
this.seaTunnelRowTypeWithPartition =
mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
this.seaTunnelRowType = rowType;
this.seaTunnelRowTypeWithPartition = mergePartitionTypes(fileNames.get(0), rowType);
} else {
if (readColumns.retainAll(Arrays.asList(seaTunnelRowType.getFieldNames()))) {
if (readColumns.retainAll(Arrays.asList(rowType.getFieldNames()))) {
log.warn(
"The read columns configuration will be filtered by the schema configuration, this may cause the actual results to be inconsistent with expectations. This is due to read columns not being a subset of the schema, "
+ "maybe you should check the schema and read_columns!");
Expand All @@ -195,9 +196,9 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
String[] fields = new String[readColumns.size()];
SeaTunnelDataType<?>[] types = new SeaTunnelDataType[readColumns.size()];
for (int i = 0; i < readColumns.size(); i++) {
indexes[i] = seaTunnelRowType.indexOf(readColumns.get(i));
fields[i] = seaTunnelRowType.getFieldName(indexes[i]);
types[i] = seaTunnelRowType.getFieldType(indexes[i]);
indexes[i] = rowType.indexOf(readColumns.get(i));
fields[i] = rowType.getFieldName(indexes[i]);
types[i] = rowType.getFieldType(indexes[i]);
}
this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
this.seaTunnelRowTypeWithPartition =
Expand Down
Loading

0 comments on commit 6b5f74e

Please sign in to comment.