[update][plugin][databendwriter] Refactor databendwriter with jdbc, support replace mode (#864)
wgzhao authored Nov 5, 2023
1 parent 4e5a8cc commit 784a057
Showing 15 changed files with 163 additions and 1,066 deletions.
17 changes: 9 additions & 8 deletions docs/assets/jobs/databendwriter.json
@@ -14,16 +14,17 @@
         ],
         "postSql": [
         ],
+        "connection": [
+          {
+            "jdbcUrl": "jdbc:databend://localhost:8000/addax",
+            "table": [
+              "table1"
+            ]
+          }
+        ],
         "username": "u1",
         "password": "123",
-        "database": "example_db",
-        "table": "table1",
-        "jdbcUrl": "jdbc:mysql://127.0.0.1:3307/example_db",
-        "loadUrl": ["127.0.0.1:8000","127.0.0.1:8000"],
-        "fieldDelimiter": "\\x01",
-        "lineDelimiter": "\\x02",
-        "column": ["*"],
-        "format": "csv"
+        "column": ["*"]
       }
     },
     "reader": {
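For orientation, the changed writer block in a complete, runnable job might look like the sketch below. The reader stub, the channel setting, and the surrounding `job` skeleton are assumptions based on Addax's usual job layout, not part of this diff; the jdbcUrl, credentials, and table come from the hunk above.

```json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": {
      "writer": {
        "name": "databendwriter",
        "parameter": {
          "preSql": [],
          "postSql": [],
          "connection": [
            {
              "jdbcUrl": "jdbc:databend://localhost:8000/addax",
              "table": ["table1"]
            }
          ],
          "username": "u1",
          "password": "123",
          "column": ["*"]
        }
      },
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {"value": "1", "type": "long"},
            {"value": "hello", "type": "string"}
          ],
          "sliceRecordCount": 10
        }
      }
    }
  }
}
```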
63 changes: 36 additions & 27 deletions docs/writer/databendwriter.md
@@ -1,6 +1,7 @@
 # DatabendWriter
 
-The Databend plugin writes data to the [Databend](https://databend.rs/zh-CN/doc/) database in a streaming fashion. It is implemented by connecting to the Databend HTTP port (8000)
+The Databend plugin writes data to the [Databend](https://databend.rs/zh-CN/doc/) database in a streaming fashion. It is implemented by connecting to the Databend
+HTTP port (8000)
 and then loading the data into the database via [stream load](https://databend.rs/zh-CN/doc/integrations/api/streaming-load),
 which is considerably more efficient than `insert into` and is the officially recommended way to load data in production.
 
@@ -13,11 +14,12 @@ Databend is a database backend compatible with the MySQL protocol, so Databend reads
 ```sql
 CREATE
 DATABASE example_db;
-CREATE TABLE `example_db`.`table1` (
-`siteid` INT DEFAULT CAST(10 AS INT),
-`citycode` INT,
-`username` VARCHAR,
-`pv` BIGINT
+CREATE TABLE `example_db`.`table1`
+(
+    `siteid`   INT DEFAULT CAST(10 AS INT),
+    `citycode` INT,
+    `username` VARCHAR,
+    `pv`       BIGINT
 );
 ```
 
@@ -37,27 +39,34 @@ bin/addax.sh job/stream2Databend.json
 
 ## Parameters
 
-| Option         | Required | Type   | Default   | Description |
-| :------------- | :------: | ------ | --------- | ----------- |
-| jdbcUrl        | no       | string | -         | JDBC connection info of the target database, used to execute `preSql` and `postSql` |
-| loadUrl        | yes      | string | -         | Databend query-node address(es) for StreamLoad, given as `query_ip:query_http_port`; multiple addresses are written to in round-robin |
-| username       | yes      | string | -         | account for HTTP signature authentication |
-| password       | yes      | string | -         | password for HTTP signature authentication |
-| database       | yes      | string | -         | database of the Databend table |
-| table          | yes      | string | -         | name of the Databend table |
-| column         | yes      | list   | -         | columns of the table to sync; see [rdbmswriter][1] for details |
-| maxBatchRows   | no       | int    | 500000    | rows per batch between the plugin and the server; raising it can cause OOM, or a hang if the target database's transaction commit fails |
-| maxBatchSize   | no       | int    | 104857600 | maximum bytes per StreamLoad request |
-| flushInterval  | no       | int    | 300000    | interval (ms) between the end of one StreamLoad and the start of the next |
-| endpoint       | yes      | string | -         | Databend HTTP endpoint, host and port only; the plugin assembles the full path |
-| username       | yes      | string | -         | account for HTTP signature authentication |
-| password       | yes      | string | -         | password for HTTP signature authentication |
-| table          | yes      | string | -         | name of the table to sync |
-| column         | yes      | list   | -         | columns of the table to sync; see [rdbmswriter][1] for details |
-| batchSize      | no       | int    | 1024      | |
-| lineDelimiter  | no       | string | `\n`      | row delimiter; high-byte values such as `\\x02` are supported |
-| fieldDelimiter | no       | string | `\t`      | column delimiter; high-byte values such as `\\x01` are supported |
-| format         | no       | string | `csv`     | format the imported data is converted to |
+| Option           | Required | Type   | Default  | Description |
+|:-----------------|:--------:|--------|----------|-------------|
+| jdbcUrl          | yes      | string | -        | JDBC connection info of the target database, used to execute `preSql` and `postSql` |
+| username         | yes      | string | -        | username of the JDBC data source |
+| password         | yes      | string | -        | password of the JDBC data source |
+| table            | yes      | string | -        | name of the Databend table |
+| column           | yes      | list   | -        | columns of the table to sync; see [rdbmswriter][1] for details |
+| preSql           | no       | list   | -        | SQL executed before the job starts; separate multiple statements with semicolons, and no statement may itself contain a semicolon |
+| postSql          | no       | list   | -        | SQL executed after the job finishes; same semicolon rules as `preSql` |
+| batchSize        | no       | int    | 1024     | number of records per batch |
+| writeMode        | no       | string | `insert` | write mode; `insert` and `replace` are supported, defaulting to `insert`. With `replace`, the `onConflictColumn` parameter must be set |
+| onConflictColumn | no       | list   | -        | conflict column(s); required when `writeMode` is `replace`, otherwise writes will fail |
+
+### writeMode
+
+This parameter was introduced in version `4.1.2` to support Databend's `replace into` syntax. When it is set to `replace`,
+the `onConflictColumn` parameter must be set as well; it is the basis for deciding whether a row is inserted or updated.
+
+The two parameters are used together like this:
+
+```json
+{
+  "writeMode": "replace",
+  "onConflictColumn": [
+    "id"
+  ]
+}
+```
 
 ## Type conversion
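Putting the new parameter table and the writeMode section together, a minimal sketch of a replace-mode writer block might look like the following. It reuses the connection layout from the job example earlier in this commit; the jdbcUrl, credentials, and the `id` conflict column are illustrative assumptions, not values from this diff.

```json
{
  "name": "databendwriter",
  "parameter": {
    "connection": [
      {
        "jdbcUrl": "jdbc:databend://localhost:8000/example_db",
        "table": ["table1"]
      }
    ],
    "username": "u1",
    "password": "123",
    "column": ["*"],
    "batchSize": 1024,
    "writeMode": "replace",
    "onConflictColumn": ["id"]
  }
}
```

Under `replace`, rows whose `onConflictColumn` values already exist in the target table are updated via Databend's `replace into`; rows with new values are inserted.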
15 changes: 5 additions & 10 deletions plugin/writer/databendwriter/pom.xml
@@ -1,4 +1,5 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>com.wgzhao.addax</groupId>
@@ -55,15 +56,9 @@
         </dependency>
 
         <dependency>
-            <groupId>mysql</groupId>
-            <artifactId>mysql-connector-java</artifactId>
-            <version>${mysql.jdbc.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>protobuf-java</artifactId>
-                </exclusion>
-            </exclusions>
+            <groupId>com.databend</groupId>
+            <artifactId>databend-jdbc</artifactId>
+            <version>0.1.2</version>
         </dependency>
 
         <dependency>
plugin/writer/databendwriter/src/main/java/com/wgzhao/addax/plugin/writer/databendwriter/DatabendWriter.java
@@ -1,162 +1,121 @@
 package com.wgzhao.addax.plugin.writer.databendwriter;
 
-import com.wgzhao.addax.common.element.Record;
-import com.wgzhao.addax.common.exception.AddaxException;
+import com.wgzhao.addax.common.element.Column;
 import com.wgzhao.addax.common.plugin.RecordReceiver;
 import com.wgzhao.addax.common.spi.Writer;
 import com.wgzhao.addax.common.util.Configuration;
-import com.wgzhao.addax.plugin.writer.databendwriter.manager.DatabendWriterManager;
-import com.wgzhao.addax.plugin.writer.databendwriter.row.DatabendISerializer;
-import com.wgzhao.addax.plugin.writer.databendwriter.row.DatabendSerializerFactory;
 import com.wgzhao.addax.plugin.writer.databendwriter.util.DatabendWriterUtil;
-import com.wgzhao.addax.rdbms.util.DBUtil;
-import com.wgzhao.addax.rdbms.util.DBUtilErrorCode;
 import com.wgzhao.addax.rdbms.util.DataBaseType;
+import com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
 import java.util.List;
 
-public class DatabendWriter
-        extends Writer
-{
+public class DatabendWriter extends Writer {
     private static final DataBaseType DATABASE_TYPE = DataBaseType.Databend;
 
     public static class Job
-            extends Writer.Job
-    {
-
+            extends Writer.Job {
         private static final Logger LOG = LoggerFactory.getLogger(Job.class);
-        private Configuration originalConfig = null;
-        private DatabendWriterOptions options;
+        private Configuration originalConfig;
+        private CommonRdbmsWriter.Job commonRdbmsWriterMaster;
 
         @Override
-        public void init()
-        {
+        public void init() {
             this.originalConfig = super.getPluginJobConf();
-            options = new DatabendWriterOptions(super.getPluginJobConf());
-            options.doPretreatment();
+            this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
+            this.commonRdbmsWriterMaster.init(this.originalConfig);
+            // placeholder currently not supported by databend driver, needs special treatment
+            DatabendWriterUtil.dealWriteMode(this.originalConfig);
         }
 
         @Override
-        public void preCheck()
-        {
+        public void preCheck() {
             this.init();
-            DatabendWriterUtil.preCheckPrePareSQL(options);
-            DatabendWriterUtil.preCheckPostSQL(options);
+            this.commonRdbmsWriterMaster.writerPreCheck(this.originalConfig, DATABASE_TYPE);
         }
 
         @Override
-        public void prepare()
-        {
-            String username = options.getUsername();
-            String password = options.getPassword();
-            String jdbcUrl = options.getJdbcUrl();
-            List<String> renderedPreSqls = DatabendWriterUtil.renderPreOrPostSqls(options.getPreSqlList(), options.getTable());
-            if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
-                Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
-                LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPreSqls), jdbcUrl);
-                DatabendWriterUtil.executeSqls(conn, renderedPreSqls);
-                DBUtil.closeDBResources(null, null, conn);
-            }
+        public void prepare() {
+            this.commonRdbmsWriterMaster.prepare(this.originalConfig);
         }
 
         @Override
-        public List<Configuration> split(int mandatoryNumber)
-        {
-            List<Configuration> configurations = new ArrayList<>(mandatoryNumber);
-            for (int i = 0; i < mandatoryNumber; i++) {
-                configurations.add(originalConfig);
-            }
-            return configurations;
+        public List<Configuration> split(int mandatoryNumber) {
+            return this.commonRdbmsWriterMaster.split(this.originalConfig, mandatoryNumber);
         }
 
         @Override
-        public void post()
-        {
-            String username = options.getUsername();
-            String password = options.getPassword();
-            String jdbcUrl = options.getJdbcUrl();
-            List<String> renderedPostSqls = DatabendWriterUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable());
-            if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
-                Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
-                LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl);
-                DatabendWriterUtil.executeSqls(conn, renderedPostSqls);
-                DBUtil.closeDBResources(null, null, conn);
-            }
+        public void post() {
+            this.commonRdbmsWriterMaster.post(this.originalConfig);
         }
 
         @Override
-        public void destroy()
-        {
+        public void destroy() {
+            this.commonRdbmsWriterMaster.destroy(this.originalConfig);
         }
     }
 
-    public static class Task
-            extends Writer.Task
-    {
-        private DatabendWriterManager writerManager;
-        private DatabendWriterOptions options;
-        private DatabendISerializer rowSerializer;
+    public static class Task extends Writer.Task {
+        private static final Logger LOG = LoggerFactory.getLogger(Task.class);
 
-        @Override
-        public void init()
-        {
-            options = new DatabendWriterOptions(super.getPluginJobConf());
-            if (options.isWildcardColumn()) {
-                options.setInfoCchemaColumns(Collections.singletonList("*"));
-            }
-            writerManager = new DatabendWriterManager(options);
-            rowSerializer = DatabendSerializerFactory.createSerializer(options);
-        }
+        private Configuration writerSliceConfig;
 
-        @Override
-        public void prepare()
-        {
-        }
+        private CommonRdbmsWriter.Task commonRdbmsWriterSlave;
 
-        public void startWrite(RecordReceiver recordReceiver)
-        {
-            try {
-                Record record;
-                while ((record = recordReceiver.getFromReader()) != null) {
-                    if (!options.isWildcardColumn() && record.getColumnNumber() != options.getColumns().size()) {
-                        throw AddaxException
-                                .asAddaxException(
-                                        DBUtilErrorCode.CONF_ERROR,
-                                        String.format(
-                                                "There is an error in the column configuration information. The source reads the number of fields:%s not equal with the number of fields to be written in the destination table:%s. Please check your configuration.",
-                                                record.getColumnNumber(),
-                                                options.getColumns().size()));
-                    }
-                    writerManager.writeRecord(rowSerializer.serialize(record));
-                }
-            }
-            catch (Exception e) {
-                throw AddaxException.asAddaxException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
-            }
-        }
+        @Override
+        public void init() {
+            this.writerSliceConfig = super.getPluginJobConf();
+
+            this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DataBaseType.Databend)
+            {
+                @Override
+                protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqlType, Column column)
+                        throws SQLException {
+                    switch (columnSqlType) {
+                        case Types.TINYINT:
+                        case Types.SMALLINT:
+                        case Types.INTEGER:
+                            preparedStatement.setInt(columnIndex, column.asBigInteger().intValue());
+                            return preparedStatement;
+                        case Types.BIGINT:
+                            preparedStatement.setLong(columnIndex, column.asLong());
+                            return preparedStatement;
+                        case Types.JAVA_OBJECT:
+                            // cast variant / array into string is fine.
+                            preparedStatement.setString(columnIndex, column.asString());
+                            return preparedStatement;
+                    }
+                    return super.fillPreparedStatementColumnType(preparedStatement, columnIndex, columnSqlType, column);
+                }
+            };
+            this.commonRdbmsWriterSlave.init(this.writerSliceConfig);
+        }
 
         @Override
-        public void post()
-        {
-            try {
-                writerManager.close();
-            }
-            catch (Exception e) {
-                throw AddaxException.asAddaxException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
-            }
+        public void destroy() {
+            this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);
         }
 
         @Override
-        public void destroy() {}
+        public void prepare() {
+            this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig);
+        }
 
         @Override
-        public boolean supportFailOver()
-        {
-            return false;
+        public void post() {
+            this.commonRdbmsWriterSlave.post(this.writerSliceConfig);
+        }
+
+        @Override
+        public void startWrite(RecordReceiver lineReceiver) {
+            this.commonRdbmsWriterSlave.startWrite(lineReceiver, this.writerSliceConfig, this.getTaskPluginCollector());
         }
     }
 }