Skip to content

Commit

Permalink
[add][plugin][txtfilewriter] Add support for writing to a file using …
Browse files Browse the repository at this point in the history
…SQL statements (#934)
  • Loading branch information
wgzhao authored Nov 4, 2023
1 parent 08b12c6 commit 8a4335e
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ public class Constant
public static final String ENC_PASSWORD_PREFIX = "${enc:";

public static final Set<String> SUPPORTED_WRITE_MODE = new HashSet<>(Arrays.asList("append", "nonConflict", "overwrite", "truncate"));
public static final Set<String> SUPPORTED_FILE_FORMAT = new HashSet<>(Arrays.asList("csv", "text"));

public static final String SQL_FORMAT = "sql";
public static final Set<String> SUPPORTED_FILE_FORMAT = new HashSet<>(Arrays.asList("csv", "text", "sql"));

public static final Set<String> SQL_RESERVED_WORDS = new HashSet<>(Arrays.asList("ALL", "ALTER", "AND", "ANY", "AS", "ASC", "AUTHORIZATION",
"BACKUP", "BEFORE", "BETWEEN", "BREAK", "BROWSE", "BULK", "BY", "CASCADE", "CASE", "CAST", "CATALOG", "CHECK", "CHECKPOINT",
Expand Down
2 changes: 1 addition & 1 deletion common/src/main/java/com/wgzhao/addax/common/base/Key.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public class Key
public static final String MANDATORY_ENCODING = "mandatoryEncoding";
public static final String HEADER = "header";
public static final String IS_TABLE_MODE = "isTableMode";

public static final String EXTENDED_INSERT = "extendedInsert";
public Key()
{

Expand Down
70 changes: 49 additions & 21 deletions docs/writer/txtfilewriter.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,25 @@ TxtFileWriter 提供了向本地文件写入类 CSV 格式的一个或者多个

## 参数说明

| 配置项 | 是否必须 | 默认值 | 描述 |
| :------------- | :------: | ------ | ----------------------------------------------------------------------- |
| path ||| 本地文件系统的路径信息,写入 Path 目录下属多个文件 |
| fileName ||| 写入的文件名,该文件名会添加随机的后缀作为每个线程写入实际文件名 |
| writeMode ||| FtpWriter 写入前数据清理处理模式,,详见下文 |
| column ||| 读取字段列表,type 指定源数据的类型,详见下文 |
| fieldDelimiter || `,` | 描述:读取的字段分隔符 |
| compress ||| 文本压缩类型,默认不压缩,支持压缩类型为 zip、lzo、lzop、tgz、bzip2 |
| encoding || utf-8 | 读取文件的编码配置 |
| nullFormat || `\N` | 定义哪些字符串可以表示为 null |
| dateFormat ||| 日期类型的数据序列化到文件中时的格式,例如 `"dateFormat": "yyyy-MM-dd"` |
| fileFormat || text | 文件写出的格式,包括 csv, text 两种, 详见下文 |
| header ||| text 写出时的表头,示例 `['id', 'name', 'age']` |
| 配置项 | 是否必须 | 类型 | 默认值 | 描述 |
|:---------------|:----:|--------|-------|-------------------------------------------------------|
| path || String || 本地文件系统的路径信息,写入 Path 目录下属多个文件 |
| fileName || String || 写入的文件名,该文件名会添加随机的后缀作为每个线程写入实际文件名 |
| writeMode || String || FtpWriter 写入前数据清理处理模式,,详见下文 |
| column || List || 读取字段列表,type 指定源数据的类型,详见下文 |
| fieldDelimiter || Char | `,` | 描述:读取的字段分隔符 |
| compress || String || 文本压缩类型,默认不压缩,支持压缩类型为 `zip``lzo``lzop``tgz``bzip2` |
| encoding || String | utf-8 | 读取文件的编码配置 |
| nullFormat || Char | `\N` | 定义哪些字符串可以表示为 null |
| dateFormat || String || 日期类型的数据序列化到文件中时的格式,例如 `"dateFormat": "yyyy-MM-dd"` |
| fileFormat || String | text | 文件写出的格式,包括 csv, text, sql[^1] 三种, 详见下文 |
| table || String || sql 模式时需要指定表名, |
| column || List || sql 模式时可选指定列名, |
| extendedInsert || Bool | true | sql 模式时是否使用批量插入语法,默认为真详见下文 |
| batchSize || Int | 2048 | sql 模式时批量插入语法的批次大小,详见下文 |
| header || List || text 写出时的表头,示例 `['id', 'name', 'age']` |

[^1]: sql 格式为 4.1.3 版本引入

### writeMode

Expand All @@ -34,16 +40,38 @@ TxtFileWriter 提供了向本地文件写入类 CSV 格式的一个或者多个

### fileFormat

文件写出的格式,包括 csv 和 text 两种,csv 是严格的 csv 格式,如果待写数据包括列分隔符,则会按照 csv 的转义语法转义,转义符号为双引号 `"`
文件写出的格式,包括 csv 和 text 和 `4.1.3` 版本引入的 sql 三种,csv 是严格的 csv 格式,如果待写数据包括列分隔符,则会按照
csv
的转义语法转义,转义符号为双引号 `"`
text 格式是用列分隔符简单分割待写数据,对于待写数据包括列分隔符情况下不做转义。
sql 格式表示将数据以 SQL 语句 (`INSERT INTO ... VALUES`) 的方式写入到文件

### table

仅在 sql 文件格式下需要,用来指定写入的表名

### column

在 sql 文件格式下,可以指定写入的列名,如果指定,则 sql
语句类似 `INSERT INTO table (col1, col2, col3) VALUES (val1, val2, val3)`,模式
否则为 `INSERT INTO table VALUES (val1, val2, val3)`。模式

### extendedInsert

是否启用批量插入语法,如果启用,则会将 batchSize 个数据一次性写入到文件中,否则每个数据一行。该参数借鉴了 `mysqldump` 工具的
[extended-insert](https://dev.mysql.com/doc/refman/8.0/en/mysqldump.html#option_mysqldump_extended-insert) 参数语法

### batchSize

批量插入语法的批次大小,如果 extendedInsert 为 true,则每 batchSize 个数据一次性写入到文件中,否则每个数据一行。

## 类型转换

| Addax 内部类型 | 本地文件 数据类型 |
| -------------- | ----------------- |
| |
| Long | Long |
| Double | Double |
| String | String |
| Boolean | Boolean |
| Date | Date |
|------------|-----------|
| |
| Long | Long |
| Double | Double |
| String | String |
| Boolean | Boolean |
| Date | Date |
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public enum StorageWriterErrorCode
WRITE_FILE_IO_ERROR("UnstructuredStorageWriter-02", "您配置的文件在写入时出现IO异常."),
RUNTIME_EXCEPTION("UnstructuredStorageWriter-03", "出现运行时异常, 请联系我们"),
REQUIRED_VALUE("UnstructuredStorageWriter-04", "您缺失了必须填写的参数值."),
SQL_REQUIRED_TABLE_NAME("UnstructuredStorageWriter-05", "sql format required table name."),
;

private final String code;
Expand Down
Loading

0 comments on commit 8a4335e

Please sign in to comment.