Skip to content

Commit

Permalink
[BugFix][Flink] Fix some bugs that occurred when Flink was submitted …
Browse files Browse the repository at this point in the history
…in local mode (#3762)

Co-authored-by: zackyoungh <zackyoungh@users.noreply.github.com>
  • Loading branch information
zackyoungh and zackyoungh authored Aug 30, 2024
1 parent 5af58a1 commit 2c95957
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,29 @@

import java.net.URLStreamHandler;
import java.net.URLStreamHandlerFactory;
import java.util.Arrays;
import java.util.List;

import org.springframework.context.annotation.Profile;

import cn.hutool.core.lang.Singleton;
import cn.hutool.core.util.StrUtil;

@Profile("!test")
public class RsURLStreamHandlerFactory implements URLStreamHandlerFactory {
private final List<String> notContains = Arrays.asList("jar", "file");

@Override
public URLStreamHandler createURLStreamHandler(String protocol) {
if ("rs".equals(protocol)) {
return new RsURLStreamHandler();
}
for (String tempProtocol : notContains) {
if (tempProtocol.equals(StrUtil.sub(protocol, 0, tempProtocol.length()))) {
return null;
}
}

try {
Class.forName("org.apache.hadoop.fs.FsUrlStreamHandlerFactory");
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.List;
import java.util.stream.Collectors;

import cn.hutool.core.util.URLUtil;

public class ClusterDescriptorAdapterImpl extends ClusterDescriptorAdapter {

public ClusterDescriptorAdapterImpl() {}
Expand All @@ -41,7 +43,7 @@ public ClusterDescriptorAdapterImpl(YarnClusterDescriptor yarnClusterDescriptor)
@Override
public void addShipFiles(List<File> shipFiles) {
yarnClusterDescriptor.addShipFiles(shipFiles.stream()
.map(file -> new Path("file://" + file.getPath()))
.map(file -> new Path(URLUtil.getURL(file).toString()))
.collect(Collectors.toList()));
}

Expand Down
4 changes: 3 additions & 1 deletion dinky-core/src/main/java/org/dinky/job/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,9 @@ public void setAddress(String address) {
if (colonIndex == -1) {
this.address = address + NetConstant.COLON + configJson.get(RestOptions.PORT.key());
} else {
this.address = address.replaceAll("(?<=:)\\d{0,6}$", configJson.get(RestOptions.PORT.key()));
String port =
configJson.getOrDefault(RestOptions.BIND_PORT.key(), configJson.get(RestOptions.PORT.key()));
this.address = address.replaceAll("(?<=:)\\d{0,6}$", port);
}
} else {
this.address = address;
Expand Down
12 changes: 0 additions & 12 deletions dinky-metadata/dinky-metadata-paimon/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,6 @@
<artifactId>paimon-s3</artifactId>
<version>0.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-s3-fs-presto</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2-uber -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.8.3-10.0</version>
</dependency>

<!-- test dependencies -->
<dependency>
Expand Down

0 comments on commit 2c95957

Please sign in to comment.