diff --git a/chunjun-connectors/chunjun-connector-s3/pom.xml b/chunjun-connectors/chunjun-connector-s3/pom.xml
index 51faa129a1..5f62bdf3bc 100644
--- a/chunjun-connectors/chunjun-connector-s3/pom.xml
+++ b/chunjun-connectors/chunjun-connector-s3/pom.xml
@@ -68,6 +68,18 @@
1.11-8
test
+
+ com.dtstack.chunjun
+ chunjun-format-tika
+ ${project.version}
+ provided
+
+
+ com.dtstack.chunjun
+ chunjun-format-excel
+ ${project.version}
+ provided
+
diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java
index 2fe73532da..2d71c4e59b 100644
--- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java
+++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java
@@ -19,6 +19,8 @@
package com.dtstack.chunjun.connector.s3.config;
import com.dtstack.chunjun.config.CommonConfig;
+import com.dtstack.chunjun.format.excel.config.ExcelFormatConfig;
+import com.dtstack.chunjun.format.tika.config.TikaReadConfig;
import com.amazonaws.regions.Regions;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -86,4 +88,23 @@ public class S3Config extends CommonConfig implements Serializable {
/** 生成的文件名后缀 */
private String suffix;
+
+ /** 对象匹配规则 */
+ private String objectsRegex;
+
+ /** 是否使用文本限定符 */
+ private boolean useTextQualifier = true;
+
+ /** 是否开启每条记录生成一个对应的文件 */
+ private boolean enableWriteSingleRecordAsFile = false;
+
+ /** 保留原始文件名 */
+ private boolean keepOriginalFilename = false;
+
+ /** 禁用 Bucket 名称注入到 endpoint 前缀 */
+ private boolean disableBucketNameInEndpoint = false;
+
+ private TikaReadConfig tikaReadConfig = new TikaReadConfig();
+
+ private ExcelFormatConfig excelFormatConfig = new ExcelFormatConfig();
}
diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3DynamicTableSink.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3DynamicTableSink.java
index e50a2e1125..bea54bc953 100644
--- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3DynamicTableSink.java
+++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3DynamicTableSink.java
@@ -84,6 +84,6 @@ public DynamicTableSink copy() {
@Override
public String asSummaryString() {
- return "StreamDynamicTableSink";
+ return S3DynamicTableSink.class.getName();
}
}
diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3OutputFormat.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3OutputFormat.java
index 7abd7d4c87..94b8609a76 100644
--- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3OutputFormat.java
+++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3OutputFormat.java
@@ -27,6 +27,7 @@
import com.dtstack.chunjun.sink.format.BaseRichOutputFormat;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
import com.dtstack.chunjun.throwable.WriteRecordException;
+import com.dtstack.chunjun.util.GsonUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.RowData;
@@ -34,15 +35,20 @@
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PartETag;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.UUID;
import java.util.stream.Collectors;
+import static com.dtstack.chunjun.format.tika.config.TikaReadConfig.ORIGINAL_FILENAME;
+
/** The OutputFormat Implementation which write data to Amazon S3. */
@Slf4j
public class S3OutputFormat extends BaseRichOutputFormat {
@@ -137,7 +143,8 @@ private void checkOutputDir() {
amazonS3,
s3Config.getBucket(),
s3Config.getObject(),
- s3Config.getFetchSize());
+ s3Config.getFetchSize(),
+ s3Config.getObjectsRegex());
} else {
subObjects =
S3Util.listObjectsByv1(
@@ -166,11 +173,17 @@ private void nextBlock() {
sw = new StringWriter();
}
this.writerUtil = new WriterUtil(sw, s3Config.getFieldDelimiter());
+ if (!s3Config.isUseTextQualifier()) {
+ writerUtil.setUseTextQualifier(false);
+ }
this.currentPartNumber = this.currentPartNumber + 1;
}
/** Create file multipart upload ID */
private void createActionFinishedTag() {
+ if (s3Config.isEnableWriteSingleRecordAsFile()) {
+ return;
+ }
if (!StringUtils.isNotBlank(currentUploadId)) {
this.currentUploadId =
S3Util.initiateMultipartUploadAndGetId(
@@ -193,8 +206,11 @@ private void beforeWriteRecords() {
}
protected void flushDataInternal() {
+ if (sw == null) {
+ return;
+ }
StringBuffer sb = sw.getBuffer();
- if (sb.length() > MIN_SIZE || willClose) {
+ if (sb.length() > MIN_SIZE || willClose || s3Config.isEnableWriteSingleRecordAsFile()) {
byte[] byteArray;
try {
byteArray = sb.toString().getBytes(s3Config.getEncoding());
@@ -202,17 +218,23 @@ protected void flushDataInternal() {
throw new ChunJunRuntimeException(e);
}
log.info("Upload part size:" + byteArray.length);
- PartETag partETag =
- S3Util.uploadPart(
- amazonS3,
- s3Config.getBucket(),
- s3Config.getObject(),
- this.currentUploadId,
- this.currentPartNumber,
- byteArray);
-
- MyPartETag myPartETag = new MyPartETag(partETag);
- myPartETags.add(myPartETag);
+
+ if (s3Config.isEnableWriteSingleRecordAsFile()) {
+ S3Util.putStringObject(
+ amazonS3, s3Config.getBucket(), s3Config.getObject(), sb.toString());
+ } else {
+ PartETag partETag =
+ S3Util.uploadPart(
+ amazonS3,
+ s3Config.getBucket(),
+ s3Config.getObject(),
+ this.currentUploadId,
+ this.currentPartNumber,
+ byteArray);
+
+ MyPartETag myPartETag = new MyPartETag(partETag);
+ myPartETags.add(myPartETag);
+ }
log.debug(
"task-{} upload etag:[{}]",
@@ -225,6 +247,9 @@ protected void flushDataInternal() {
}
private void completeMultipartUploadFile() {
+ if (s3Config.isEnableWriteSingleRecordAsFile()) {
+ return;
+ }
if (this.currentPartNumber > 10000) {
throw new IllegalArgumentException("part can not bigger than 10000");
}
@@ -282,7 +307,11 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordExce
// convert row to string
stringRecord = (String[]) rowConverter.toExternal(rowData, stringRecord);
try {
- for (int i = 0; i < columnNameList.size(); ++i) {
+ int columnSize = columnNameList.size();
+ if (s3Config.isEnableWriteSingleRecordAsFile()) {
+ columnSize = 1;
+ }
+ for (int i = 0; i < columnSize; ++i) {
String column = stringRecord[i];
@@ -292,6 +321,25 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordExce
writerUtil.write(column);
}
writerUtil.endRecord();
+
+ if (s3Config.isEnableWriteSingleRecordAsFile()) {
+ Map metadataMap =
+ GsonUtil.GSON.fromJson(stringRecord[1], Map.class);
+ String key = FilenameUtils.getPath(s3Config.getObject());
+ // 是否保留原始文件名
+ if (s3Config.isKeepOriginalFilename()) {
+ key += metadataMap.get(ORIGINAL_FILENAME) + getExtension();
+ } else {
+ key +=
+ jobId
+ + "_"
+ + taskNumber
+ + "_"
+ + UUID.randomUUID().toString()
+ + getExtension();
+ }
+ s3Config.setObject(key);
+ }
flushDataInternal();
} catch (Exception ex) {
String msg = "RowData2string error RowData(" + rowData + ")";
diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3DynamicTableSource.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3DynamicTableSource.java
index 5ee4beec7b..01986de2b8 100644
--- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3DynamicTableSource.java
+++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3DynamicTableSource.java
@@ -66,7 +66,13 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
field.setName(column.getName());
field.setType(
TypeConfig.fromString(column.getDataType().getLogicalType().asSummaryString()));
- field.setIndex(i);
+ int index =
+ s3Config.getExcelFormatConfig().getColumnIndex() != null
+ ? s3Config.getExcelFormatConfig()
+ .getColumnIndex()
+ .get(columns.indexOf(column))
+ : columns.indexOf(column);
+ field.setIndex(index);
columnList.add(field);
}
s3Config.setColumn(columnList);
diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3InputFormat.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3InputFormat.java
index c450c96854..9b1ac7ee32 100644
--- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3InputFormat.java
+++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3InputFormat.java
@@ -18,12 +18,17 @@
package com.dtstack.chunjun.connector.s3.source;
+import com.dtstack.chunjun.config.FieldConfig;
import com.dtstack.chunjun.config.RestoreConfig;
import com.dtstack.chunjun.connector.s3.config.S3Config;
import com.dtstack.chunjun.connector.s3.enums.CompressType;
import com.dtstack.chunjun.connector.s3.util.ReaderUtil;
import com.dtstack.chunjun.connector.s3.util.S3SimpleObject;
import com.dtstack.chunjun.connector.s3.util.S3Util;
+import com.dtstack.chunjun.format.excel.common.ExcelData;
+import com.dtstack.chunjun.format.excel.source.ExcelInputFormat;
+import com.dtstack.chunjun.format.tika.common.TikaData;
+import com.dtstack.chunjun.format.tika.source.TikaInputFormat;
import com.dtstack.chunjun.restore.FormatState;
import com.dtstack.chunjun.source.format.BaseRichInputFormat;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
@@ -38,6 +43,8 @@
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
@@ -71,6 +78,12 @@ public class S3InputFormat extends BaseRichInputFormat {
private RestoreConfig restoreConf;
+ private transient TikaData tikaData;
+ private TikaInputFormat tikaInputFormat;
+
+ private transient ExcelData excelData;
+ private ExcelInputFormat excelInputFormat;
+
@Override
public void openInputFormat() throws IOException {
super.openInputFormat();
@@ -137,7 +150,31 @@ protected InputSplit[] createInputSplitsInternal(int minNumSplits) {
protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException {
String[] fields;
try {
- fields = readerUtil.getValues();
+ if (s3Config.getTikaReadConfig().isUseExtract() && tikaData != null) {
+ fields = tikaData.getData();
+ } else if (s3Config.getExcelFormatConfig().isUseExcelFormat() && excelData != null) {
+ fields = excelData.getData();
+ } else {
+ fields = readerUtil.getValues();
+ }
+ // 处理字段配置了对应的列索引
+ if (s3Config.getExcelFormatConfig().getColumnIndex() != null) {
+ List columns = s3Config.getColumn();
+ String[] fieldsData = new String[columns.size()];
+ for (int i = 0; i < CollectionUtils.size(columns); i++) {
+ FieldConfig fieldConfig = columns.get(i);
+ if (fieldConfig.getIndex() >= fields.length) {
+ String errorMessage =
+ String.format(
+ "The column index is greater than the data size."
+ + " The current column index is [%s], but the data size is [%s]. Data loss may occur.",
+ fieldConfig.getIndex(), fields.length);
+ throw new IllegalArgumentException(errorMessage);
+ }
+ fieldsData[i] = fields[fieldConfig.getIndex()];
+ }
+ fields = fieldsData;
+ }
rowData = rowConverter.toInternal(fields);
} catch (IOException e) {
throw new ChunJunRuntimeException(e);
@@ -164,9 +201,82 @@ protected void closeInternal() {
@Override
public boolean reachedEnd() throws IOException {
+ if (s3Config.getTikaReadConfig().isUseExtract()) {
+ tikaData = getTikaData();
+ return tikaData == null || tikaData.getData() == null;
+ } else if (s3Config.getExcelFormatConfig().isUseExcelFormat()) {
+ excelData = getExcelData();
+ return excelData == null || excelData.getData() == null;
+ }
return reachedEndWithoutCheckState();
}
+ public ExcelData getExcelData() {
+ if (excelInputFormat == null) {
+ nextExcelDataStream();
+ }
+ if (excelInputFormat != null) {
+ if (!excelInputFormat.hasNext()) {
+ excelInputFormat.close();
+ excelInputFormat = null;
+ return getExcelData();
+ }
+ String[] record = excelInputFormat.nextRecord();
+ return new ExcelData(record);
+ } else {
+ return null;
+ }
+ }
+
+ private void nextExcelDataStream() {
+ if (splits.hasNext()) {
+ currentObject = splits.next();
+ GetObjectRequest rangeObjectRequest =
+ new GetObjectRequest(s3Config.getBucket(), currentObject);
+ log.info("Current read file {}", currentObject);
+ S3Object o = amazonS3.getObject(rangeObjectRequest);
+ S3ObjectInputStream s3is = o.getObjectContent();
+ excelInputFormat = new ExcelInputFormat();
+ excelInputFormat.open(s3is, s3Config.getExcelFormatConfig());
+ } else {
+ excelInputFormat = null;
+ }
+ }
+
+ public TikaData getTikaData() {
+ if (tikaInputFormat == null) {
+ nextTikaDataStream();
+ }
+ if (tikaInputFormat != null) {
+ if (!tikaInputFormat.hasNext()) {
+ tikaInputFormat.close();
+ tikaInputFormat = null;
+ return getTikaData();
+ }
+ String[] record = tikaInputFormat.nextRecord();
+ return new TikaData(record);
+ } else {
+ return null;
+ }
+ }
+
+ private void nextTikaDataStream() {
+ if (splits.hasNext()) {
+ currentObject = splits.next();
+ GetObjectRequest rangeObjectRequest =
+ new GetObjectRequest(s3Config.getBucket(), currentObject);
+ log.info("Current read file {}", currentObject);
+ S3Object o = amazonS3.getObject(rangeObjectRequest);
+ S3ObjectInputStream s3is = o.getObjectContent();
+ tikaInputFormat =
+ new TikaInputFormat(
+ s3Config.getTikaReadConfig(), s3Config.getFieldNameList().size());
+ tikaInputFormat.open(s3is, FilenameUtils.getName(currentObject));
+ } else {
+ tikaInputFormat = null;
+ }
+ }
+
public boolean reachedEndWithoutCheckState() throws IOException {
// br is empty, indicating that a new file needs to be read
if (readerUtil == null) {
@@ -259,7 +369,11 @@ public List resolveObjects() {
if (s3Config.isUseV2()) {
subObjects =
S3Util.listObjectsKeyByPrefix(
- amazonS3, bucket, prefix, s3Config.getFetchSize());
+ amazonS3,
+ bucket,
+ prefix,
+ s3Config.getFetchSize(),
+ s3Config.getObjectsRegex());
} else {
subObjects =
S3Util.listObjectsByv1(
diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/S3DynamicTableFactory.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/S3DynamicTableFactory.java
index fa05abaf7f..1d793e74fe 100644
--- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/S3DynamicTableFactory.java
+++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/S3DynamicTableFactory.java
@@ -22,11 +22,17 @@
import com.dtstack.chunjun.connector.s3.sink.S3DynamicTableSink;
import com.dtstack.chunjun.connector.s3.source.S3DynamicTableSource;
import com.dtstack.chunjun.connector.s3.table.options.S3Options;
+import com.dtstack.chunjun.format.excel.config.ExcelFormatConfig;
+import com.dtstack.chunjun.format.excel.options.ExcelFormatOptions;
+import com.dtstack.chunjun.format.tika.config.TikaReadConfig;
+import com.dtstack.chunjun.format.tika.options.TikaOptions;
import com.dtstack.chunjun.table.options.SinkOptions;
import com.dtstack.chunjun.util.GsonUtil;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -34,9 +40,14 @@
import org.apache.flink.table.factories.FactoryUtil;
import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
public class S3DynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
private static final String IDENTIFIER = "s3-x";
@@ -61,7 +72,46 @@ public DynamicTableSource createDynamicTableSource(Context context) {
s3Config.setFirstLineHeader(options.get(S3Options.IS_FIRST_LINE_HEADER));
s3Config.setEndpoint(options.get(S3Options.ENDPOINT));
s3Config.setCompress(options.get(S3Options.COMPRESS));
- return new S3DynamicTableSource(context.getCatalogTable().getResolvedSchema(), s3Config);
+ s3Config.setObjectsRegex(options.get(S3Options.OBJECTS_REGEX));
+ s3Config.setDisableBucketNameInEndpoint(
+ options.get(S3Options.DISABLE_BUCKET_NAME_IN_ENDPOINT));
+ TikaReadConfig tikaReadConfig = new TikaReadConfig();
+ tikaReadConfig.setUseExtract(options.get(TikaOptions.USE_EXTRACT));
+ tikaReadConfig.setOverlapRatio(options.get(TikaOptions.OVERLAP_RATIO));
+ tikaReadConfig.setChunkSize(options.get(TikaOptions.CHUNK_SIZE));
+ s3Config.setTikaReadConfig(tikaReadConfig);
+ ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
+ List columns = resolvedSchema.getColumns();
+ ExcelFormatConfig excelFormatConfig = new ExcelFormatConfig();
+ excelFormatConfig.setUseExcelFormat(options.get(ExcelFormatOptions.USE_EXCEL_FORMAT));
+ excelFormatConfig.setFirstLineHeader(options.get(S3Options.IS_FIRST_LINE_HEADER));
+ if (StringUtils.isNotBlank(options.get(ExcelFormatOptions.SHEET_NO))) {
+ List sheetNo =
+ Arrays.stream(options.get(ExcelFormatOptions.SHEET_NO).split(","))
+ .map(Integer::parseInt)
+ .collect(Collectors.toList());
+ excelFormatConfig.setSheetNo(sheetNo);
+ }
+ if (StringUtils.isNotBlank(options.get(ExcelFormatOptions.COLUMN_INDEX))) {
+ List columnIndex =
+ Arrays.stream(options.get(ExcelFormatOptions.COLUMN_INDEX).split(","))
+ .map(Integer::parseInt)
+ .collect(Collectors.toList());
+ excelFormatConfig.setColumnIndex(columnIndex);
+ }
+ final String[] fields = new String[columns.size()];
+ IntStream.range(0, fields.length).forEach(i -> fields[i] = columns.get(i).getName());
+ excelFormatConfig.setFields(fields);
+ s3Config.setExcelFormatConfig(excelFormatConfig);
+ if (s3Config.getExcelFormatConfig().getColumnIndex() != null
+ && columns.size() != s3Config.getExcelFormatConfig().getColumnIndex().size()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The number of fields (%s) is inconsistent with the number of indexes (%s).",
+ columns.size(),
+ s3Config.getExcelFormatConfig().getColumnIndex().size()));
+ }
+ return new S3DynamicTableSource(resolvedSchema, s3Config);
}
@Override
@@ -94,6 +144,17 @@ public Set> optionalOptions() {
options.add(S3Options.SUFFIX);
options.add(SinkOptions.SINK_PARALLELISM);
options.add(S3Options.WRITE_MODE);
+ options.add(S3Options.OBJECTS_REGEX);
+ options.add(S3Options.USE_TEXT_QUALIFIER);
+ options.add(S3Options.ENABLE_WRITE_SINGLE_RECORD_AS_FILE);
+ options.add(S3Options.KEEP_ORIGINAL_FILENAME);
+ options.add(S3Options.DISABLE_BUCKET_NAME_IN_ENDPOINT);
+ options.add(TikaOptions.USE_EXTRACT);
+ options.add(TikaOptions.CHUNK_SIZE);
+ options.add(TikaOptions.OVERLAP_RATIO);
+ options.add(ExcelFormatOptions.SHEET_NO);
+ options.add(ExcelFormatOptions.COLUMN_INDEX);
+ options.add(ExcelFormatOptions.USE_EXCEL_FORMAT);
return options;
}
@@ -121,6 +182,12 @@ public DynamicTableSink createDynamicTableSink(Context context) {
s3Config.setSuffix(options.get(S3Options.SUFFIX));
s3Config.setParallelism(options.get(SinkOptions.SINK_PARALLELISM));
s3Config.setWriteMode(options.get(S3Options.WRITE_MODE));
+ s3Config.setUseTextQualifier(options.get(S3Options.USE_TEXT_QUALIFIER));
+ s3Config.setEnableWriteSingleRecordAsFile(
+ options.get(S3Options.ENABLE_WRITE_SINGLE_RECORD_AS_FILE));
+ s3Config.setKeepOriginalFilename(options.get(S3Options.KEEP_ORIGINAL_FILENAME));
+ s3Config.setDisableBucketNameInEndpoint(
+ options.get(S3Options.DISABLE_BUCKET_NAME_IN_ENDPOINT));
return new S3DynamicTableSink(context.getCatalogTable().getResolvedSchema(), s3Config);
}
diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/options/S3Options.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/options/S3Options.java
index 1f6236438f..a8fff32659 100644
--- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/options/S3Options.java
+++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/options/S3Options.java
@@ -95,4 +95,31 @@ public class S3Options {
public static final ConfigOption WRITE_MODE =
key("writeMode").stringType().defaultValue("overwrite").withDescription("writeMode");
+
+ public static final ConfigOption OBJECTS_REGEX =
+ key("objectsRegex").stringType().noDefaultValue().withDescription("objects regex rule");
+
+ public static final ConfigOption USE_TEXT_QUALIFIER =
+ key("useTextQualifier")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("use text qualifier");
+
+ public static final ConfigOption ENABLE_WRITE_SINGLE_RECORD_AS_FILE =
+ key("enableWriteSingleRecordAsFile")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("enable write single record as each file");
+
+ public static final ConfigOption KEEP_ORIGINAL_FILENAME =
+ key("keepOriginalFilename")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("keep original filename");
+
+ public static final ConfigOption DISABLE_BUCKET_NAME_IN_ENDPOINT =
+ key("disableBucketNameInEndpoint")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("disable Bucket Name In Endpoint");
}
diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/util/S3Util.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/util/S3Util.java
index a7d849a4a0..02d708f17e 100644
--- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/util/S3Util.java
+++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/util/S3Util.java
@@ -30,6 +30,7 @@
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
@@ -46,12 +47,14 @@
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
+import java.util.regex.Pattern;
@Slf4j
public class S3Util {
@@ -75,6 +78,10 @@ public static AmazonS3 getS3Client(S3Config s3Config) {
} else {
builder = builder.withRegion(clientRegion.getName());
}
+ // 禁用 Bucket 名称注入到 endpoint 前缀
+ if (s3Config.isDisableBucketNameInEndpoint()) {
+ builder = builder.withPathStyleAccessEnabled(true);
+ }
return builder.build();
} else {
@@ -89,6 +96,11 @@ public static AmazonS3 getS3Client(S3Config s3Config) {
}
AmazonS3Client client = new AmazonS3Client(cred, ccfg);
client.setEndpoint(s3Config.getEndpoint());
+ // 禁用 Bucket 名称注入到 endpoint 前缀
+ if (s3Config.isDisableBucketNameInEndpoint()) {
+ client.setS3ClientOptions(
+ S3ClientOptions.builder().setPathStyleAccess(true).build());
+ }
return client;
}
} else {
@@ -103,18 +115,29 @@ public static PutObjectResult putStringObject(
}
public static List listObjectsKeyByPrefix(
- AmazonS3 s3Client, String bucketName, String prefix, int fetchSize) {
+ AmazonS3 s3Client, String bucketName, String prefix, int fetchSize, String regex) {
List objects = new ArrayList<>(fetchSize);
ListObjectsV2Request req =
new ListObjectsV2Request().withBucketName(bucketName).withMaxKeys(fetchSize);
if (StringUtils.isNotBlank(prefix)) {
req.setPrefix(prefix);
}
+ // 定义正则表达式
+ Pattern pattern = null;
+ if (StringUtils.isNotBlank(regex)) {
+ pattern = Pattern.compile(regex);
+ }
+
ListObjectsV2Result result;
do {
result = s3Client.listObjectsV2(req);
for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
+ // 如果对象键与正则表达式匹配,则进行相应处理
+ if (pattern != null
+ && !pattern.matcher(FilenameUtils.getName(objectSummary.getKey())).find()) {
+ continue;
+ }
objects.add(objectSummary.getKey());
}
String token = result.getNextContinuationToken();
diff --git a/chunjun-formats/chunjun-format-excel/pom.xml b/chunjun-formats/chunjun-format-excel/pom.xml
new file mode 100644
index 0000000000..035d992ed5
--- /dev/null
+++ b/chunjun-formats/chunjun-format-excel/pom.xml
@@ -0,0 +1,59 @@
+
+
+
+
+ 4.0.0
+
+ com.dtstack.chunjun
+ chunjun-formats
+ ${revision}
+
+
+ chunjun-format-excel
+ ChunJun : Formats : Excel
+
+
+ excel
+
+
+
+
+ com.alibaba
+ easyexcel
+ 3.2.0
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+
+
+
+
+
diff --git a/chunjun-formats/chunjun-format-excel/src/main/java/com/dtstack/chunjun/format/excel/client/ExcelReadListener.java b/chunjun-formats/chunjun-format-excel/src/main/java/com/dtstack/chunjun/format/excel/client/ExcelReadListener.java
new file mode 100644
index 0000000000..677dd37297
--- /dev/null
+++ b/chunjun-formats/chunjun-format-excel/src/main/java/com/dtstack/chunjun/format/excel/client/ExcelReadListener.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.format.excel.client;
+
+import com.dtstack.chunjun.util.DateUtil;
+
+import com.alibaba.excel.context.AnalysisContext;
+import com.alibaba.excel.read.listener.ReadListener;
+
+import java.time.LocalDateTime;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class ExcelReadListener implements ReadListener