[BugFix] [Flink] Fix flink jar submit (#4028)
Co-authored-by: zackyoungh <zackyoungh@users.noreply.github.com>
zackyoungh and zackyoungh authored Dec 8, 2024
1 parent 3d01224 commit 062f553
Showing 10 changed files with 66 additions and 13 deletions.
@@ -92,7 +92,7 @@ default void addJar(File... jarPath) {
         List<String> pathList =
                 Arrays.stream(URLUtil.getURLs(jarPath)).map(URL::toString).collect(Collectors.toList());
         List<String> jars = configuration.get(PipelineOptions.JARS);
-        if (jars == null) {
+        if (CollUtil.isEmpty(jars)) {
            addConfiguration(PipelineOptions.JARS, pathList);
         } else {
             CollUtil.addAll(jars, pathList);
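Note on this hunk: Hutool's CollUtil.isEmpty returns true for both a null and an empty collection, so an empty pipeline.jars entry now takes the overwrite branch instead of appending to the existing empty list. A minimal standalone sketch of those semantics (not part of the commit; the jar names are made up):

import cn.hutool.core.collection.CollUtil;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CollUtilIsEmptySketch {
    public static void main(String[] args) {
        List<String> nullJars = null;
        System.out.println(CollUtil.isEmpty(nullJars)); // true: the old null check caught this
        System.out.println(CollUtil.isEmpty(Collections.<String>emptyList())); // true: the old check missed this

        List<String> jars = new ArrayList<>(Arrays.asList("a.jar"));
        CollUtil.addAll(jars, Arrays.asList("b.jar")); // merge path for a non-empty list
        System.out.println(jars); // [a.jar, b.jar]
    }
}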
@@ -21,6 +21,8 @@
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+import org.dinky.executor.CustomTableEnvironment;
+
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.PlanTranslator;
@@ -80,6 +82,41 @@ public static JobClient executeAsync(Pipeline pipeline, StreamExecutionEnvironme
         }
     }
 
+    public static JobClient executeAsync(Pipeline pipeline, CustomTableEnvironment env) throws Exception {
+        Configuration configuration = new Configuration(env.getRootConfiguration());
+        checkNotNull(pipeline, "pipeline cannot be null.");
+        checkNotNull(
+                configuration.get(DeploymentOptions.TARGET),
+                "No execution.target specified in your configuration file.");
+
+        PipelineExecutorServiceLoader executorServiceLoader = (PipelineExecutorServiceLoader)
+                ReflectUtil.getFieldValue(env.getStreamExecutionEnvironment(), "executorServiceLoader");
+        ClassLoader userClassloader = (ClassLoader) ReflectUtil.getFieldValue(env, "userClassloader");
+        final PipelineExecutorFactory executorFactory = executorServiceLoader.getExecutorFactory(configuration);
+
+        checkNotNull(
+                executorFactory,
+                "Cannot find compatible factory for specified execution.target (=%s)",
+                configuration.get(DeploymentOptions.TARGET));
+
+        CompletableFuture<JobClient> jobClientFuture =
+                executorFactory.getExecutor(configuration).execute(pipeline, configuration, userClassloader);
+
+        List<JobListener> jobListeners = env.getStreamExecutionEnvironment().getJobListeners();
+        try {
+            JobClient jobClient = jobClientFuture.get();
+            jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
+            return jobClient;
+        } catch (ExecutionException executionException) {
+            final Throwable strippedException = ExceptionUtils.stripExecutionException(executionException);
+            jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, strippedException));
+
+            throw new FlinkException(
+                    String.format("Failed to execute job '%s'.", ReflectUtil.invoke(pipeline, "getJobName")),
+                    strippedException);
+        }
+    }
+
     public static String getStreamingPlanAsJSON(Pipeline pipeline) {
         if (pipeline instanceof StreamGraph) {
             return ((StreamGraph) pipeline).getStreamingPlanAsJSON();
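The overload added above parallels Flink's StreamExecutionEnvironment.executeAsync, but derives the effective Configuration from the table environment's root configuration (the place where addJar, in the first hunk, registers pipeline.jars), and pulls the private executorServiceLoader and userClassloader fields out via Hutool reflection. A self-contained sketch of just that reflection mechanism (the Env class and field below are illustrative, not Flink's):

import cn.hutool.core.util.ReflectUtil;

public class ReflectFieldSketch {
    static class Env {
        private final String userClassloader = "app-classloader"; // stand-in for the real field
    }

    public static void main(String[] args) {
        // ReflectUtil.getFieldValue reads a private field without a getter,
        // which is how executeAsync above reaches Flink's internal state.
        Object value = ReflectUtil.getFieldValue(new Env(), "userClassloader");
        System.out.println(value); // app-classloader
    }
}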
@@ -24,4 +24,5 @@ public enum JobStatementType {
     DDL,
     SQL,
     PIPELINE,
+    EXECUTE_JAR
 }
2 changes: 2 additions & 0 deletions dinky-common/src/main/java/org/dinky/data/job/SqlType.java
@@ -66,6 +66,8 @@ public enum SqlType {
 
     RESET("RESET", "^RESET.*", SqlCategory.DDL),
 
+    EXECUTE_JAR("EXECUTE_JAR", "^EXECUTE\\s+JAR\\s+WITH.*", SqlCategory.DML),
+
     EXECUTE("EXECUTE", "^EXECUTE.*", SqlCategory.DML),
 
     ADD_JAR("ADD_JAR", "^ADD\\s+JAR\\s+\\S+", SqlCategory.DDL),
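Ordering note: ^EXECUTE.* also matches EXECUTE JAR statements, so EXECUTE_JAR is declared ahead of EXECUTE, on the assumption that statement classification walks the enum constants in declaration order and returns the first match. A quick check with plain JDK regex (the statement below is hypothetical):

import java.util.regex.Pattern;

public class ExecuteJarPatternCheck {
    public static void main(String[] args) {
        Pattern executeJar = Pattern.compile("^EXECUTE\\s+JAR\\s+WITH.*");
        Pattern execute = Pattern.compile("^EXECUTE.*");

        String stmt = "EXECUTE JAR WITH ('uri' = 'file:///opt/jobs/demo.jar')";

        System.out.println(executeJar.matcher(stmt).matches()); // true
        System.out.println(execute.matcher(stmt).matches());    // true -> the broader pattern must come later
    }
}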
2 changes: 2 additions & 0 deletions dinky-core/src/main/java/org/dinky/executor/Executor.java
@@ -213,6 +213,8 @@ public JobStatementPlan parseStatementIntoJobStatementPlan(String[] statements)
             SqlType operationType = Operations.getOperationType(statement);
             if (operationType.equals(SqlType.SET) || operationType.equals(SqlType.RESET)) {
                 jobStatementPlan.addJobStatement(statement, JobStatementType.SET, operationType);
+            } else if (operationType.equals(SqlType.EXECUTE_JAR)) {
+                jobStatementPlan.addJobStatement(statement, JobStatementType.EXECUTE_JAR, operationType);
             } else if (operationType.equals(SqlType.EXECUTE)) {
                 jobStatementPlan.addJobStatement(statement, JobStatementType.PIPELINE, operationType);
             } else if (operationType.equals(SqlType.PRINT)) {
7 changes: 2 additions & 5 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
@@ -52,7 +52,6 @@
 import org.dinky.gateway.result.GatewayResult;
 import org.dinky.gateway.result.SavePointResult;
 import org.dinky.gateway.result.TestResult;
-import org.dinky.job.runner.JobJarRunner;
 import org.dinky.trans.Operations;
 import org.dinky.trans.parse.AddFileSqlParseStrategy;
 import org.dinky.trans.parse.AddJarSqlParseStrategy;
@@ -252,12 +251,10 @@ public JobResult executeJarSql(String statement) throws Exception {
         jobStatementPlan.buildFinalStatement();
         job = Job.build(runMode, config, executorConfig, executor, statement, useGateway);
         ready();
+        JobRunnerFactory jobRunnerFactory = JobRunnerFactory.create(this);
         try {
-            // Only one is executed.
             for (JobStatement jobStatement : jobStatementPlan.getJobStatementList()) {
-                JobJarRunner jobJarRunner = new JobJarRunner(this);
-                jobJarRunner.run(jobStatement);
-                break;
+                jobRunnerFactory.getJobRunner(jobStatement.getStatementType()).run(jobStatement);
             }
             if (job.isFailed()) {
                 failed();
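Behavior change: executeJarSql previously ran only the first statement, unconditionally through JobJarRunner (note the removed break). It now dispatches every statement in the plan to a runner chosen by type, so SET or DDL statements placed before the EXECUTE JAR take effect prior to submission. A stripped-down model of that dispatch (illustrative types, not the real dinky-core classes):

import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class DispatchSketch {
    enum StatementType { SET, DDL, EXECUTE_JAR }

    static Consumer<String> runnerFor(StatementType type) {
        switch (type) {
            case EXECUTE_JAR:
                return s -> System.out.println("JobJarRunner.run: " + s);
            case SET:
                return s -> System.out.println("JobSetRunner.run: " + s);
            default:
                return s -> System.out.println("JobDDLRunner.run: " + s);
        }
    }

    public static void main(String[] args) {
        // Before the fix only the first entry ran; now each one is routed.
        List<StatementType> plan = Arrays.asList(StatementType.SET, StatementType.EXECUTE_JAR);
        plan.forEach(type -> runnerFor(type).accept(type.name()));
    }
}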
5 changes: 5 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/JobRunnerFactory.java
@@ -21,6 +21,7 @@
 
 import org.dinky.data.job.JobStatementType;
 import org.dinky.job.runner.JobDDLRunner;
+import org.dinky.job.runner.JobJarRunner;
 import org.dinky.job.runner.JobPipelineRunner;
 import org.dinky.job.runner.JobSetRunner;
 import org.dinky.job.runner.JobSqlRunner;
@@ -31,12 +32,14 @@ public class JobRunnerFactory {
     private JobSqlRunner jobSqlRunner;
     private JobPipelineRunner jobPipelineRunner;
     private JobDDLRunner jobDDLRunner;
+    private JobJarRunner jobJarRunner;
 
     public JobRunnerFactory(JobManager jobManager) {
         this.jobSetRunner = new JobSetRunner(jobManager);
         this.jobSqlRunner = new JobSqlRunner(jobManager);
         this.jobPipelineRunner = new JobPipelineRunner(jobManager);
         this.jobDDLRunner = new JobDDLRunner(jobManager);
+        this.jobJarRunner = new JobJarRunner(jobManager);
     }
 
     public JobRunner getJobRunner(JobStatementType jobStatementType) {
@@ -47,6 +50,8 @@ public JobRunner getJobRunner(JobStatementType jobStatementType) {
                 return jobSqlRunner;
             case PIPELINE:
                 return jobPipelineRunner;
+            case EXECUTE_JAR:
+                return jobJarRunner;
             case DDL:
             default:
                 return jobDDLRunner;
3 changes: 2 additions & 1 deletion dinky-core/src/main/java/org/dinky/job/JobStatementPlan.java
@@ -108,7 +108,8 @@ private void checkEmptyStatement() {
                 throw new DinkyException("The statement cannot be empty. Please check your statements.");
             }
             if (jobStatement.getStatementType().equals(JobStatementType.SQL)
-                    || jobStatement.getStatementType().equals(JobStatementType.PIPELINE)) {
+                    || jobStatement.getStatementType().equals(JobStatementType.PIPELINE)
+                    || jobStatement.getStatementType().equals(JobStatementType.EXECUTE_JAR)) {
                 hasSqlStatement = true;
             }
         }
11 changes: 5 additions & 6 deletions dinky-core/src/main/java/org/dinky/job/runner/JobJarRunner.java
@@ -59,6 +59,7 @@
 import java.net.URL;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -174,14 +175,12 @@ private Pipeline getPipeline(JobStatement jobStatement) {
 
     private void submitNormal(JobStatement jobStatement) throws Exception {
         JobClient jobClient = FlinkStreamEnvironmentUtil.executeAsync(
-                getPipeline(jobStatement), jobManager.getExecutor().getStreamExecutionEnvironment());
+                getPipeline(jobStatement), jobManager.getExecutor().getCustomTableEnvironment());
         if (Asserts.isNotNull(jobClient)) {
             jobManager.getJob().setJobId(jobClient.getJobID().toHexString());
-            jobManager.getJob().setJids(new ArrayList<String>() {
-                {
-                    add(jobManager.getJob().getJobId());
-                }
-            });
+            jobManager
+                    .getJob()
+                    .setJids(Collections.singletonList(jobManager.getJob().getJobId()));
             jobManager.getJob().setStatus(Job.JobStatus.SUCCESS);
         } else {
             jobManager.getJob().setStatus(Job.JobStatus.FAILED);
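Besides switching to the CustomTableEnvironment overload, this hunk swaps the double-brace ArrayList idiom (an anonymous subclass per call site) for Collections.singletonList. The two compare equal as lists, but the singleton is fixed-size, which is fine here assuming nothing mutates jids afterwards:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SingletonListNote {
    public static void main(String[] args) {
        List<String> doubleBrace = new ArrayList<String>() {
            {
                add("jobId"); // extra anonymous class via the instance-initializer trick
            }
        };
        List<String> singleton = Collections.singletonList("jobId"); // one immutable allocation

        System.out.println(doubleBrace.equals(singleton)); // true
        // singleton.add("x") would throw UnsupportedOperationException
    }
}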
@@ -184,6 +184,15 @@ export const SqlTask = memo((props: FlinkSqlProps & any) => {
   }));
   const [isRunning, setIsRunning] = useState<boolean>(false);
 
+  useEffect(() => {
+    if (sqlForm.enable) {
+      setSqlForm((prevState) => ({
+        ...prevState,
+        initSqlStatement: currentState.statement
+      }));
+    }
+  }, [sqlForm.enable, currentState.statement]);
+
   useAsyncEffect(async () => {
     const taskDetail = await getTaskDetails(params.taskId);
     if (taskDetail) {
