Skip to content

Commit

Permalink
[feature][plugin][hdfswriter] Add new item preShell and postShell (
Browse files Browse the repository at this point in the history
  • Loading branch information
wgzhao authored Oct 27, 2024
1 parent 4442889 commit 69c1c46
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 2 deletions.
10 changes: 8 additions & 2 deletions docs/writer/hdfswriter.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ HDFS Writer 提供向 HDFS 文件系统指定路径中写入 `TextFile` , `ORC

## 参数说明

| 配置项 | 是否必须 | 数据类型 | 默认值 | 说明 |
| :--------------------- | :------: | ----------- | ------- | -------------------------------------------------------------------------------------------- |
| 配置项 | 是否必须 | 数据类型 | 默认值 | 说明 |
|:-----------------------| :------: |-------------| ------- | -------------------------------------------------------------------------------------------- |
| path || string || 要写入的文件路径 |
| defaultFS || string || 详述见下 |
| fileType || string || 文件的类型,详述见下 |
Expand All @@ -27,6 +27,8 @@ HDFS Writer 提供向 HDFS 文件系统指定路径中写入 `TextFile` , `ORC
| kerberosPrincipal || string || 用于 Kerberos 认证的凭证主体, 比如 `addax/node1@WGZHAO.COM` |
| compress || string || 文件的压缩格式,详见下文 |
| hadoopConfig || map || 里可以配置与 Hadoop 相关的一些高级参数,比如HA的配置 |
| preShell || `list` || 写入数据前执行的shell命令,比如 `hive -e "truncate table test.hello"` |
| postShell || `list` || 写入数据后执行的shell命令,比如 `hive -e "select count(1) from test.hello"` |

### path

Expand Down Expand Up @@ -123,6 +125,10 @@ Hadoop hdfs 文件系统 namenode 节点地址。格式:`hdfs://ip:port` ;

这里的 `cluster` 表示 HDFS 配置成HA时的名字,也是 `defaultFS` 配置项中的名字 如果实际环境中的名字不是 `cluster` ,则上述配置中所有写有 `cluster` 都需要替换

### preShell 与 postShell

引入 `preShell` 与 `postShell` 的目的是为了在写入数据前后执行一些额外的操作,比如在写入数据前清空表,写入数据后查询表的行数等。一个典型的生产环境场景是,采集的数据按日分区保存在 HDFS 上,
采集之前需要创建分区,这样就可以通过配置 `preShell` 来实现,比如 `hive -e "alter table test.hello add partition(dt='${logdate}')"`

## 类型转换

Expand Down
6 changes: 6 additions & 0 deletions plugin/writer/hdfswriter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<version>1.4.0</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.wgzhao.addax.common.spi.Writer;
import com.wgzhao.addax.common.util.Configuration;
import com.wgzhao.addax.storage.util.FileHelper;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.io.Charsets;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
Expand All @@ -45,8 +46,12 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.commons.exec.CommandLine;

import static com.wgzhao.addax.common.spi.ErrorCode.EXECUTE_FAIL;
import static com.wgzhao.addax.common.spi.ErrorCode.ILLEGAL_VALUE;
import static com.wgzhao.addax.common.spi.ErrorCode.REQUIRED_VALUE;
import static com.wgzhao.addax.common.spi.ErrorCode.RUNTIME_ERROR;

public class HdfsWriter
extends Writer
Expand Down Expand Up @@ -203,6 +208,13 @@ private void validateParameter()
@Override
public void prepare()
{
// check preShell item
List<String> preShells = this.writerSliceConfig.getList("preShell", String.class);
if (!preShells.isEmpty()) {
for (String preShell : preShells) {
execShell(preShell);
}
}

this.tmpStorePath = buildTmpFilePath(path);

Expand Down Expand Up @@ -249,6 +261,14 @@ public void post()

// 删除临时目录
hdfsHelper.deleteDir(new Path(tmpStorePath));

//check postShell item
List<String> postShells = this.writerSliceConfig.getList("postShell", String.class);
if (!postShells.isEmpty()) {
for (String postShell : postShells) {
execShell(postShell);
}
}
}

@Override
Expand Down Expand Up @@ -365,6 +385,22 @@ private static int getDecimalScale(String type)
return Integer.parseInt(type.split(",")[1].replace(")", "").trim());
}
}

/**
 * Executes a single shell command synchronously (used for {@code preShell}/{@code postShell} items)
 * and fails the job if the command cannot be run or exits with a non-zero code.
 *
 * @param command the full shell command line to run, e.g. {@code hive -e "truncate table t"}
 * @throws AddaxException with {@code EXECUTE_FAIL} when the command exits non-zero,
 *         or {@code RUNTIME_ERROR} when the command cannot be launched at all
 */
private static void execShell(String command)
{
    CommandLine cmdLine = CommandLine.parse(command);
    DefaultExecutor executor = DefaultExecutor.builder().get();
    LOG.info("Running command: {}", command);
    try {
        // NOTE: DefaultExecutor#execute throws ExecuteException for non-zero exit
        // values by default, so the explicit check below is a defensive fallback.
        int retCode = executor.execute(cmdLine);
        if (retCode != 0) {
            throw AddaxException.asAddaxException(EXECUTE_FAIL, String.format("Command [%s] exited with code %d", command, retCode));
        }
    }
    catch (AddaxException ae) {
        // Re-throw as-is: without this clause the EXECUTE_FAIL exception above would be
        // caught by the generic handler below and rewrapped as RUNTIME_ERROR,
        // losing the intended error code.
        throw ae;
    }
    catch (Exception e) {
        // Launch failure (command not found, I/O error, non-zero exit via ExecuteException, ...)
        throw AddaxException.asAddaxException(RUNTIME_ERROR, e);
    }
}
}

public static class Task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
"fileType": "orc",
"path": "/user/hive/warehouse",
"fileName": "addax",
"preShell": [],
"postShell": [],
"column": [
{
"name": "col1",
Expand Down

0 comments on commit 69c1c46

Please sign in to comment.