Skip to content

Commit

Permalink
[Fix-1503][executor] Fix use yarn application mode to submit job repo…
Browse files Browse the repository at this point in the history
…rted null pointer exception (#1562)

Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
  • Loading branch information
aiwenmo and aiwenmo authored Jan 15, 2023
1 parent 8ab0e6d commit 6a2ff0b
Showing 1 changed file with 20 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,23 @@ public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFr
this(checkpoint, parallelism, useSqlFragment, null, null);
}

public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath, String jobName) {
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath,
String jobName) {
this(checkpoint, parallelism, useSqlFragment, savePointPath, jobName, null);
}

public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath) {
this(checkpoint, parallelism, useSqlFragment, savePointPath, null, null);
}

public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath, String jobName, Map<String, String> config) {
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, String savePointPath,
String jobName, Map<String, String> config) {
this(checkpoint, parallelism, useSqlFragment, false, false, savePointPath, jobName, config);
}

public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, boolean useStatementSet, boolean useBatchModel, String savePointPath, String jobName,
Map<String, String> config) {
public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFragment, boolean useStatementSet,
boolean useBatchModel, String savePointPath, String jobName,
Map<String, String> config) {
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.useSqlFragment = useSqlFragment;
Expand All @@ -107,8 +110,9 @@ public ExecutorSetting(Integer checkpoint, Integer parallelism, boolean useSqlFr
this.config = config;
}

public static ExecutorSetting build(Integer checkpoint, Integer parallelism, boolean useSqlFragment, boolean useStatementSet, boolean useBatchModel, String savePointPath, String jobName,
String configJson) {
public static ExecutorSetting build(Integer checkpoint, Integer parallelism, boolean useSqlFragment,
boolean useStatementSet, boolean useBatchModel, String savePointPath, String jobName,
String configJson) {
List<Map<String, String>> configList = new ArrayList<>();
if (Asserts.isNotNullString(configJson)) {
try {
Expand All @@ -120,7 +124,9 @@ public static ExecutorSetting build(Integer checkpoint, Integer parallelism, boo

Map<String, String> config = new HashMap<>();
for (Map<String, String> item : configList) {
config.put(item.get("key"), item.get("value"));
if (Asserts.isNotNull(item) && Asserts.isAllNotNullString(item.get("key"), item.get("value"))) {
config.put(item.get("key"), item.get("value"));
}
}
return new ExecutorSetting(checkpoint, parallelism, useSqlFragment, useStatementSet, useBatchModel,
savePointPath, jobName, config);
Expand All @@ -130,8 +136,9 @@ public static ExecutorSetting build(Map<String, String> settingMap) {
Integer checkpoint = Integer.valueOf(settingMap.get(CHECKPOINT_CONST));
Integer parallelism = Integer.valueOf(settingMap.get(PARALLELISM_CONST));

return build(checkpoint, parallelism, "1".equals(settingMap.get(USE_SQL_FRAGMENT)), "1".equals(settingMap.get(USE_STATEMENT_SET)), "1".equals(settingMap.get(USE_BATCH_MODEL)),
settingMap.get(SAVE_POINT_PATH), settingMap.get(JOB_NAME), settingMap.get(CONFIG_CONST));
return build(checkpoint, parallelism, "1".equals(settingMap.get(USE_SQL_FRAGMENT)),
"1".equals(settingMap.get(USE_STATEMENT_SET)), "1".equals(settingMap.get(USE_BATCH_MODEL)),
settingMap.get(SAVE_POINT_PATH), settingMap.get(JOB_NAME), settingMap.get(CONFIG_CONST));
}

public boolean isValidParallelism() {
Expand All @@ -144,7 +151,9 @@ public boolean isValidJobName() {

@Override
public String toString() {
return String.format("ExecutorSetting{checkpoint=%d, parallelism=%d, useSqlFragment=%s, useStatementSet=%s, savePointPath='%s', jobName='%s', config=%s}", checkpoint, parallelism,
useSqlFragment, useStatementSet, savePointPath, jobName, config);
return String.format(
"ExecutorSetting{checkpoint=%d, parallelism=%d, useSqlFragment=%s, useStatementSet=%s, savePointPath='%s', jobName='%s', config=%s}",
checkpoint, parallelism,
useSqlFragment, useStatementSet, savePointPath, jobName, config);
}
}

0 comments on commit 6a2ff0b

Please sign in to comment.