Skip to content

Commit

Permalink
[BugFix][Jar] Fix Flink jar submission and set parallelism failure is…
Browse files Browse the repository at this point in the history
…sue (#3165)

Co-authored-by: zackyoungh <zackyoungh@users.noreply.github.com>
  • Loading branch information
zackyoungh and zackyoungh authored Feb 19, 2024
1 parent a62813c commit 3c5e021
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.io.File;
import java.util.Optional;

import cn.hutool.core.convert.Convert;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Opt;
import cn.hutool.core.util.StrUtil;
Expand Down Expand Up @@ -91,7 +92,10 @@ public static StreamGraph getStreamGraph(JarSubmitParam submitParam, CustomTable
.setSavepointRestoreSettings(savepointRestoreSettings)
.setArguments(RunTimeUtil.handleCmds(submitParam.getArgs()))
.build();
Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, configuration, 1, true);
int parallelism = StrUtil.isNumeric(submitParam.getParallelism())
? Convert.toInt(submitParam.getParallelism())
: tEnv.getStreamExecutionEnvironment().getParallelism();
Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, configuration, parallelism, true);
program.close();
Assert.isTrue(pipeline instanceof StreamGraph, "can not translate");
return (StreamGraph) pipeline;
Expand Down
10 changes: 5 additions & 5 deletions docs/docs/extend/expand_statements/execute_jar.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ title: EXECUTE JAR

如果使用 `Checkpoint``Savepoint` ,请在右边作业,选择 `Savepoint策略`,其次检查点 跳过 请使用 execution.savepoint.ignore-unclaimed-state: true 参数控制


此flink sql jar任务支持 `set``add customjar` 联动使用
:::

## 语法结构
Expand All @@ -23,8 +25,7 @@ title: EXECUTE JAR
EXECUTE JAR WITH (
'uri'='<jar_path>.jar', -- 该参数 必填
'main-class'='<main_class>', -- 该参数 必填
'args'='<args>', -- 主类入参 该参数可选
'parallelism'='<parallelism>', -- 任务并行度 该参数可选
'args'='<args>' -- 主类入参 该参数可选
);

```
Expand All @@ -35,15 +36,14 @@ EXECUTE JAR WITH (
EXECUTE JAR WITH (
'uri'='rs:/jar/flink/demo/SocketWindowWordCount.jar',
'main-class'='org.apache.flink.streaming.examples.socket',
'args'=' --hostname localhost ',
'parallelism'=''
'args'=' --hostname localhost '
);
```
:::warning 注意

1. 以上示例中, uri 的值为 rs:/jar/flink/demo/SocketWindowWordCount.jar, 该值为资源中心中的资源路径,
请确保资源中心中存在该资源,请忽略资源中心 Root 节点(该节点为虚拟节点)
2. 如果要读取S3,HDFS,LCOAL等存储上面的文件均可通过rs协议进行桥接使用,请参考 [资源管理](../../user_guide/register_center/resource) 中 rs 协议使用方式
2. 如果要读取S3,HDFS,LOCAL等存储上面的文件均可通过rs协议进行桥接使用,请参考 [资源管理](../../user_guide/register_center/resource) 中 rs 协议使用方式
:::

## PyFlink 任务提交:
Expand Down

0 comments on commit 3c5e021

Please sign in to comment.