From 6b6b52e8510bc8e7e3e85d34f09e8753df1186bd Mon Sep 17 00:00:00 2001 From: Pandas886 Date: Wed, 20 Mar 2024 16:20:40 +0800 Subject: [PATCH] [Optimization] optimization execute jar params parse (#3277) Co-authored-by: Pandas886 --- .../dinky/trans/dml/ExecuteJarOperation.java | 32 +++++++++- .../trans/dml/ExecuteJarOperationTest.java | 60 +++++++++++++++++++ 2 files changed, 89 insertions(+), 3 deletions(-) create mode 100644 dinky-client/dinky-client-base/src/test/java/org/dinky/trans/dml/ExecuteJarOperationTest.java diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java index d745144553..ac7bf1f3fb 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/trans/dml/ExecuteJarOperation.java @@ -23,9 +23,9 @@ import org.dinky.trans.AbstractOperation; import org.dinky.trans.ExtendOperation; import org.dinky.trans.parse.ExecuteJarParseStrategy; -import org.dinky.utils.RunTimeUtil; import org.dinky.utils.URLUtils; +import org.apache.commons.lang.StringUtils; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramUtils; @@ -37,7 +37,10 @@ import java.io.File; import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Optional; @@ -99,7 +102,7 @@ public static StreamGraph getStreamGraph( .setEntryPointClassName(submitParam.getMainClass()) .setConfiguration(configuration) .setSavepointRestoreSettings(savepointRestoreSettings) - .setArguments(RunTimeUtil.handleCmds(submitParam.getArgs())) + .setArguments(extractArgs(submitParam.getArgs().trim()).toArray(new String[0])) .setUserClassPaths(classpaths) .build(); int parallelism = StrUtil.isNumeric(submitParam.getParallelism()) @@ -114,6 +117,30 @@ public static StreamGraph getStreamGraph( } } + public static List extractArgs(String args) { + List programArgs = new ArrayList<>(); + if (StringUtils.isNotEmpty(args)) { + String[] array = args.split("\\s+"); + Iterator iter = Arrays.asList(array).iterator(); + while (iter.hasNext()) { + String v = iter.next(); + String p = v.substring(0, 1); + if (p.equals("'") || p.equals("\"")) { + String value = v; + if (!v.endsWith(p)) { + while (!value.endsWith(p) && iter.hasNext()) { + value += " " + iter.next(); + } + } + programArgs.add(value.substring(1, value.length() - 1)); + } else { + programArgs.add(v); + } + } + } + return programArgs; + } + @Override public String asSummaryString() { return statement; @@ -142,7 +169,6 @@ protected JarSubmitParam() {} public static JarSubmitParam build(String statement) { JarSubmitParam submitParam = ExecuteJarParseStrategy.getInfo(statement); Assert.notBlank(submitParam.getUri()); - Assert.notBlank(submitParam.getMainClass()); return submitParam; } } diff --git a/dinky-client/dinky-client-base/src/test/java/org/dinky/trans/dml/ExecuteJarOperationTest.java b/dinky-client/dinky-client-base/src/test/java/org/dinky/trans/dml/ExecuteJarOperationTest.java new file mode 100644 index 0000000000..4a6e175316 --- /dev/null +++ b/dinky-client/dinky-client-base/src/test/java/org/dinky/trans/dml/ExecuteJarOperationTest.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.trans.dml; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.List; + +import org.junit.Assert; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.Lists; + +class ExecuteJarOperationTest { + + public static final List RESULT1 = Lists.newArrayList( + "merge_into", + "--warehouse", + "hdfs:///tmp/paimon", + "--database", + "default", + "--table", + "T", + "--source_table", + "S", + "--on", + "T.id = S.order_id", + "--merge_actions", + "matched-upsert,matched-delete", + "--matched_upsert_condition", + "T.price > 100", + "--matched_upsert_set", + "mark = 'important'", + "--matched_delete_condition", + "T.price < 10"); + + @Test + void extractArgs() { + List args1 = ExecuteJarOperation.extractArgs( + "merge_into --warehouse hdfs:///tmp/paimon --database default --table T --source_table S --on \"T.id = S.order_id\" --merge_actions matched-upsert,matched-delete --matched_upsert_condition \"T.price > 100\" --matched_upsert_set \"mark = 'important'\" --matched_delete_condition \"T.price < 10\""); + Assert.assertArrayEquals(args1.toArray(new String[0]), RESULT1.toArray(new String[0])); + } +}