Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature][plugin][hdfswriter] Add new item preShell and postShell #1175

Merged
merged 3 commits into from
Oct 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions docs/writer/hdfswriter.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ HDFS Writer 提供向 HDFS 文件系统指定路径中写入 `TextFile` , `ORC

## 参数说明

| 配置项 | 是否必须 | 数据类型 | 默认值 | 说明 |
| :--------------------- | :------: | ----------- | ------- | -------------------------------------------------------------------------------------------- |
| 配置项 | 是否必须 | 数据类型 | 默认值 | 说明 |
|:-----------------------| :------: |-------------| ------- | -------------------------------------------------------------------------------------------- |
| path | 是 | string | 无 | 要读取的文件路径 |
| defaultFS | 是 | string | 无 | 详述见下 |
| fileType | 是 | string | 无 | 文件的类型,详述见下 |
Expand All @@ -27,6 +27,8 @@ HDFS Writer 提供向 HDFS 文件系统指定路径中写入 `TextFile` , `ORC
| kerberosPrincipal | 否 | string | 无 | 用于 Kerberos 认证的凭证主体, 比如 `addax/node1@WGZHAO.COM` |
| compress | 否 | string | 无 | 文件的压缩格式,详见下文 |
| hadoopConfig | 否 | map | 无 | 里可以配置与 Hadoop 相关的一些高级参数,比如HA的配置 |
| preShell | 否 | `list` | 无 | 写入数据前执行的shell命令,比如 `hive -e "truncate table test.hello"` |
| postShell | 否 | `list` | 无 | 写入数据后执行的shell命令,比如 `hive -e "select count(1) from test.hello"` |

### path

Expand Down Expand Up @@ -123,6 +125,10 @@ Hadoop hdfs 文件系统 namenode 节点地址。格式:`hdfs://ip:port` ;

这里的 `cluster` 表示 HDFS 配置成HA时的名字,也是 `defaultFS` 配置项中的名字 如果实际环境中的名字不是 `cluster` ,则上述配置中所有写有 `cluster` 都需要替换

### preShell 与 postShell

引入 `preShell` 与 `postShell` 的目的是为了在写入数据前后执行一些额外的操作,比如在写入数据前清空表,写入数据后查询表的行数等。一个典型的生产环境场景时,采集的数据按日分区保存在 HDFS 上,
采集之前需要创建分区,这样就可以通过配置 `preShell` 来实现,比如 `hive -e "alter table test.hello add partition(dt='${logdate}')"`

## 类型转换

Expand Down
6 changes: 6 additions & 0 deletions plugin/writer/hdfswriter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<version>1.4.0</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.wgzhao.addax.common.spi.Writer;
import com.wgzhao.addax.common.util.Configuration;
import com.wgzhao.addax.storage.util.FileHelper;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.io.Charsets;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
Expand All @@ -45,8 +46,12 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.commons.exec.CommandLine;

import static com.wgzhao.addax.common.spi.ErrorCode.EXECUTE_FAIL;
import static com.wgzhao.addax.common.spi.ErrorCode.ILLEGAL_VALUE;
import static com.wgzhao.addax.common.spi.ErrorCode.REQUIRED_VALUE;
import static com.wgzhao.addax.common.spi.ErrorCode.RUNTIME_ERROR;

public class HdfsWriter
extends Writer
Expand Down Expand Up @@ -203,6 +208,13 @@ private void validateParameter()
@Override
public void prepare()
{
// check preShell item
List<String> preShells = this.writerSliceConfig.getList("preShell", String.class);
if (!preShells.isEmpty()) {
for (String preShell : preShells) {
execShell(preShell);
}
}

this.tmpStorePath = buildTmpFilePath(path);

Expand Down Expand Up @@ -249,6 +261,14 @@ public void post()

// 删除临时目录
hdfsHelper.deleteDir(new Path(tmpStorePath));

//check postShell item
List<String> postShells = this.writerSliceConfig.getList("postShell", String.class);
if (!postShells.isEmpty()) {
for (String postShell : postShells) {
execShell(postShell);
}
}
}

@Override
Expand Down Expand Up @@ -365,6 +385,22 @@ private static int getDecimalScale(String type)
return Integer.parseInt(type.split(",")[1].replace(")", "").trim());
}
}

private static void execShell(String command)
{
CommandLine cmdLine = CommandLine.parse(command);
DefaultExecutor executor = DefaultExecutor.builder().get();
LOG.info("Running command: {}", command);
try {
int retCode = executor.execute(cmdLine);
if (retCode != 0) {
throw AddaxException.asAddaxException(EXECUTE_FAIL, String.format("Command [%s] exited with code %d", command, retCode));
}
}
catch (Exception e) {
throw AddaxException.asAddaxException(RUNTIME_ERROR, e);
}
}
}

public static class Task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
"fileType": "orc",
"path": "/user/hive/warehouse",
"fileName": "addax",
"preShell": [],
"postShell": [],
"column": [
{
"name": "col1",
Expand Down