Merge branch 'dev' into edit_width
SbloodyS authored Nov 15, 2024
2 parents 1d45e6f + 9a64534 commit 54b3ac6
Showing 23 changed files with 148 additions and 55 deletions.
12 changes: 6 additions & 6 deletions docs/docs/en/contribute/development-environment-setup.md
@@ -128,7 +128,7 @@ Use different Git branch to develop different codes

### Start backend server

-Find the class `org.apache.dolphinscheduler.StandaloneServer` in Intellij IDEA and clikc run main function to startup.
+Find the class `org.apache.dolphinscheduler.StandaloneServer` in IntelliJ IDEA and clikc run main function to startup.

> Note: Please check the option `Add dependencies with "provided" scope to classpath` in the startup configuration before starting, so as to avoid the problem that no dependencies can be found during startup.
@@ -176,7 +176,7 @@ Following steps will guide how to start the DolphinScheduler backend service
 ##### Backend Start Prepare
-- Open project: Use IDE open the project, here we use Intellij IDEA as an example, after opening it will take a while for Intellij IDEA to complete the dependent download
+- Open project: Use IDE open the project, here we use IntelliJ IDEA as an example, after opening it will take a while for IntelliJ IDEA to complete the dependent download
 - File change
@@ -211,15 +211,15 @@ spring:
</root>
```
-> **_Note:_** Only DolphinScheduler 2.0 and later versions need to inatall plugin before start server. It not need before version 2.0.
+> **_Note:_** Only DolphinScheduler 2.0 and later versions need to install plugin before start server. It doesn't need it before version 2.0.

##### Server start

There are three services that need to be started, including MasterServer, WorkerServer, ApiApplicationServer.

-- MasterServer:Execute function `main` in the class `org.apache.dolphinscheduler.server.master.MasterServer` by Intellij IDEA, with the configuration _VM Options_ `-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql`
-- WorkerServer:Execute function `main` in the class `org.apache.dolphinscheduler.server.worker.WorkerServer` by Intellij IDEA, with the configuration _VM Options_ `-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql`
-- ApiApplicationServer:Execute function `main` in the class `org.apache.dolphinscheduler.api.ApiApplicationServer` by Intellij IDEA, with the configuration _VM Options_ `-Dlogging.config=classpath:logback-spring.xml -Dspring.profiles.active=api,mysql`. After it started, you could find Open API documentation in http://localhost:12345/dolphinscheduler/swagger-ui/index.html
+- MasterServer:Execute function `main` in the class `org.apache.dolphinscheduler.server.master.MasterServer` by IntelliJ IDEA, with the configuration _VM Options_ `-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql`
+- WorkerServer:Execute function `main` in the class `org.apache.dolphinscheduler.server.worker.WorkerServer` by IntelliJ IDEA, with the configuration _VM Options_ `-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql`
+- ApiApplicationServer:Execute function `main` in the class `org.apache.dolphinscheduler.api.ApiApplicationServer` by IntelliJ IDEA, with the configuration _VM Options_ `-Dlogging.config=classpath:logback-spring.xml -Dspring.profiles.active=api,mysql`. After it started, you could find Open API documentation in http://localhost:12345/dolphinscheduler/swagger-ui/index.html

> The `mysql` in the VM Options `-Dspring.profiles.active=mysql` means specified configuration file

6 changes: 3 additions & 3 deletions docs/docs/en/contribute/release.md
@@ -287,7 +287,7 @@ If there is any problem in gpg signature, `Close` will fail, but you can see the
#### Checkout Dolphinscheduler Release Directory
-We need too checkout Dolphinscheduler dev release directory to local, and
+We need to checkout Dolphinscheduler dev release directory to local, and
```shell
SVN_DIR_DEV="${SVN_DIR}/dolphinscheduler/dev"
@@ -305,8 +305,8 @@ svn --username="${A_USERNAME}" update "${SVN_DIR_DEV}"
#### Export New GPG Key to KEYS(Optional)
-Only if the first time you release with this gpg KEY, including it is you first release, or you change your KEY. You should
-change working directory to another one because this step need checkout and change KEYS in release directory.
+Only if the first time you release with this gpg KEY, including it is your first release, or you change your KEY. You should
+change the working directory to another one because this step needs checkout and change KEYS in the release directory.
```shell
# Optional, only if the SVN root path not exists.
11 changes: 5 additions & 6 deletions docs/docs/en/guide/task/seatunnel.md
@@ -26,17 +26,16 @@ Click [here](https://seatunnel.apache.org/) for more information about `Apache S
 - SEATUNNEL_ENGINE
 - Deployment mode: specify the deployment mode, `cluster` `local`

-> Click [here](https://seatunnel.apache.org/docs/2.3.3/command/usage) for more information on the usage of
-
-`Apache SeaTunnel command`
+> Click [here](https://seatunnel.apache.org/docs/command/usage) for more information on the usage of Apache SeaTunnel command`
 - Custom Configuration: Supports custom configuration or select configuration file from Resource Center

-> Click [here](https://seatunnel.apache.org/docs/2.3.3/concept/config) for more information about `Apache
->
->> SeaTunnel config` file
+> Click [here](https://seatunnel.apache.org/docs/concept/config) for more information about `Apache SeaTunnel config` file
 - Script: Customize configuration information on the task node, including four parts: `env` `source` `transform` `sink`
+- Custom Parameters/Global Parameters: When custom parameters/global parameters are defined, the parameters will be passed to the SeaTunnel task, and the parameter value can be dynamically replaced during task execution by referencing the parameter with `${}` in the SeaTunnel task.
+
+> Click [here](https://seatunnel.apache.org/docs/concept/config/#config-variable-substitution) for more information on `Apache SeaTunnel variable substitution`
 ## Task Example

10 changes: 5 additions & 5 deletions docs/docs/zh/contribute/development-environment-setup.md
@@ -123,7 +123,7 @@ DolphinScheduler 开发环境配置有两个方式,分别是standalone模式

### 启动后端

-Intellij IDEA 找到并启动类 `org.apache.dolphinscheduler.StandaloneServer` 即可完成后端启动
+IntelliJ IDEA 找到并启动类 `org.apache.dolphinscheduler.StandaloneServer` 即可完成后端启动

> 注意:启动前请在启动配置里将 `Add dependencies with "provided" scope to classpath` 选项勾选上,这样可以避免启动时找不到依赖的问题
@@ -170,7 +170,7 @@ DolphinScheduler 的元数据存储在关系型数据库中,目前支持的关

##### 必要的准备工作

-* 打开项目:使用开发工具打开项目,这里以 Intellij IDEA 为例,打开后需要一段时间,让 Intellij IDEA 完成以依赖的下载
+* 打开项目:使用开发工具打开项目,这里以 IntelliJ IDEA 为例,打开后需要一段时间,让 IntelliJ IDEA 完成以依赖的下载

* 必要的修改

@@ -208,9 +208,9 @@ DolphinScheduler 的元数据存储在关系型数据库中,目前支持的关

我们需要启动三个服务,包括 MasterServer,WorkerServer,ApiApplicationServer

-* MasterServer:在 Intellij IDEA 中执行 `org.apache.dolphinscheduler.server.master.MasterServer` 中的 `main` 方法,并配置 *VM Options* `-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql`
-* WorkerServer:在 Intellij IDEA 中执行 `org.apache.dolphinscheduler.server.worker.WorkerServer` 中的 `main` 方法,并配置 *VM Options* `-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql`
-* ApiApplicationServer:在 Intellij IDEA 中执行 `org.apache.dolphinscheduler.api.ApiApplicationServer` 中的 `main` 方法,并配置 *VM Options* `-Dlogging.config=classpath:logback-spring.xml -Dspring.profiles.active=api,mysql`。启动完成可以浏览 Open API 文档,地址为 http://localhost:12345/dolphinscheduler/swagger-ui/index.html
+* MasterServer:在 IntelliJ IDEA 中执行 `org.apache.dolphinscheduler.server.master.MasterServer` 中的 `main` 方法,并配置 *VM Options* `-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql`
+* WorkerServer:在 IntelliJ IDEA 中执行 `org.apache.dolphinscheduler.server.worker.WorkerServer` 中的 `main` 方法,并配置 *VM Options* `-Dlogging.config=classpath:logback-spring.xml -Ddruid.mysql.usePingMethod=false -Dspring.profiles.active=mysql`
+* ApiApplicationServer:在 IntelliJ IDEA 中执行 `org.apache.dolphinscheduler.api.ApiApplicationServer` 中的 `main` 方法,并配置 *VM Options* `-Dlogging.config=classpath:logback-spring.xml -Dspring.profiles.active=api,mysql`。启动完成可以浏览 Open API 文档,地址为 http://localhost:12345/dolphinscheduler/swagger-ui/index.html

> VM Options `-Dspring.profiles.active=mysql``mysql` 表示指定的配置文件

7 changes: 5 additions & 2 deletions docs/docs/zh/guide/task/seatunnel.md
@@ -26,13 +26,16 @@
 - SEATUNNEL_ENGINE
 - 部署方式:指定部署模式,`cluster` `local`

-> 点击 [这里](https://seatunnel.apache.org/docs/2.3.3/command/usage) 获取更多关于`Apache SeaTunnel command` 使用的信息
+> 点击 [这里](https://seatunnel.apache.org/docs/command/usage) 获取更多关于`Apache SeaTunnel command` 使用的信息
 - 自定义配置:支持自定义配置或从资源中心选择配置文件

-> 点击 [这里](https://seatunnel.apache.org/docs/2.3.3/concept/config) 获取更多关于`Apache SeaTunnel config` 文件介绍
+> 点击 [这里](https://seatunnel.apache.org/docs/concept/config) 获取更多关于`Apache SeaTunnel config` 文件介绍
 - 脚本:在任务节点那自定义配置信息,包括四部分:`env` `source` `transform` `sink`
+- 自定义参数/全局参数: 当定义了自定义参数/全局参数时, 会将该参数传递给SeaTunnel任务, 可以在SeaTunnel任务中通过`${}`引用该参数, 从而在任务运行时动态替换参数值.
+
+> 点击 [这里](https://seatunnel.apache.org/docs/concept/config/#config-variable-substitution) 获取更多关于`Apache SeaTunnel 变量替换` 使用的信息
 ## 任务样例

@@ -22,7 +22,7 @@
 -XX:+IgnoreUnrecognizedVMOptions
 -XX:+PrintGCDateStamps
 -XX:+PrintGCDetails
--Xloggc:gc.log
+-Xloggc:gc-%t.log

 -XX:-OmitStackTraceInFastThrow
 -XX:+ExitOnOutOfMemoryError
2 changes: 1 addition & 1 deletion dolphinscheduler-api/src/main/bin/jvm_args_env.sh
@@ -22,7 +22,7 @@
 -XX:+IgnoreUnrecognizedVMOptions
 -XX:+PrintGCDateStamps
 -XX:+PrintGCDetails
--Xloggc:gc.log
+-Xloggc:gc-%t.log

 -XX:-OmitStackTraceInFastThrow
 -XX:+ExitOnOutOfMemoryError
@@ -101,6 +101,7 @@ public Result createProjectParameter(User loginUser, long projectCode, String pr
.code(CodeGenerateUtils.genCode())
.projectCode(projectCode)
.userId(loginUser.getId())
+.operator(loginUser.getId())
.createTime(now)
.updateTime(now)
.build();
@@ -114,6 +114,9 @@ public void testCreateProjectParameter() {
result = projectParameterService.createProjectParameter(loginUser, projectCode, "key1", "value",
DataType.VARCHAR.name());
assertEquals(Status.SUCCESS.getCode(), result.getCode());
+
+ProjectParameter projectParameter = (ProjectParameter) result.getData();
+assertEquals(loginUser.getId(), projectParameter.getOperator());
}

@Test
2 changes: 1 addition & 1 deletion dolphinscheduler-master/src/main/bin/jvm_args_env.sh
@@ -22,7 +22,7 @@
 -XX:+IgnoreUnrecognizedVMOptions
 -XX:+PrintGCDateStamps
 -XX:+PrintGCDetails
--Xloggc:gc.log
+-Xloggc:gc-%t.log

 -XX:-OmitStackTraceInFastThrow
 -XX:+ExitOnOutOfMemoryError
@@ -51,6 +51,7 @@ public void start() {
this.registryClient.subscribe(RegistryNodeType.MASTER.getRegistryPath(), masterClusters);
this.registryClient.subscribe(RegistryNodeType.WORKER.getRegistryPath(), workerClusters);
this.workerGroupChangeNotifier.subscribeWorkerGroupsChange(workerClusters);
+this.workerGroupChangeNotifier.start();
log.info("ClusterManager started...");
}

@@ -20,11 +20,11 @@
import org.apache.dolphinscheduler.common.utils.MapComparator;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory;

import org.apache.commons.collections4.CollectionUtils;

-import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -34,6 +34,7 @@

import lombok.extern.slf4j.Slf4j;

+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
@@ -43,7 +44,8 @@
@Component
public class WorkerGroupChangeNotifier {

-private static final long DEFAULT_REFRESH_WORKER_INTERVAL = Duration.ofMinutes(1).toMillis();
+@Autowired
+private MasterConfig masterConfig;

private final WorkerGroupDao workerGroupDao;
private final List<WorkerGroupListener> listeners = new CopyOnWriteArrayList<>();
@@ -52,11 +54,15 @@ public class WorkerGroupChangeNotifier {

public WorkerGroupChangeNotifier(WorkerGroupDao workerGroupDao) {
this.workerGroupDao = workerGroupDao;
+}
+
+public void start() {
detectWorkerGroupChanges();
+final long workerGroupRefreshIntervalSeconds = masterConfig.getWorkerGroupRefreshInterval().getSeconds();
MasterThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay(
this::detectWorkerGroupChanges,
-DEFAULT_REFRESH_WORKER_INTERVAL,
-DEFAULT_REFRESH_WORKER_INTERVAL,
+workerGroupRefreshIntervalSeconds,
+workerGroupRefreshIntervalSeconds,
TimeUnit.SECONDS);
}
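
Review note: the removed constant was computed with `Duration.ofMinutes(1).toMillis()` (60000) but passed with `TimeUnit.SECONDS`, so the old refresh delay was 60000 seconds (roughly 16.7 hours) rather than one minute. The new code keeps the value and the unit consistent, and moves scheduling out of the constructor, presumably because a field injected with `@Autowired` is not yet set while the constructor runs. The pattern can be sketched in isolation as follows; `PeriodicRefresher` and its members are hypothetical names for illustration, not DolphinScheduler classes.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicRefresher {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Duration refreshInterval;
    private final Runnable refreshTask;

    public PeriodicRefresher(Duration refreshInterval, Runnable refreshTask) {
        // The constructor only stores its inputs; nothing is scheduled yet.
        this.refreshInterval = refreshInterval;
        this.refreshTask = refreshTask;
    }

    public void start() {
        // Run once synchronously so callers see fresh state right after start().
        refreshTask.run();
        // Derive the number and name the unit together so they cannot drift apart.
        long intervalMillis = refreshInterval.toMillis();
        scheduler.scheduleWithFixedDelay(refreshTask, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger runs = new AtomicInteger();
        PeriodicRefresher refresher = new PeriodicRefresher(Duration.ofMillis(100), runs::incrementAndGet);
        refresher.start();
        Thread.sleep(350);
        refresher.stop();
        System.out.println("refresh ran " + runs.get() + " times");
    }
}
```

The interval must be positive, since `scheduleWithFixedDelay` rejects a delay of zero or less with `IllegalArgumentException`.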

@@ -80,7 +80,7 @@ void doDispatch() {
// If dispatch failed, will put the task back to the queue
// The task will be dispatched after waiting time.
// the waiting time will increase multiple of times, but will not exceed 60 seconds
-long waitingTimeMills = Math.max(
+long waitingTimeMills = Math.min(
taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L);
globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable,
waitingTimeMills);
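
Review note: with `Math.max`, 60 seconds acted as a lower bound, so every retry waited at least a minute; `Math.min` makes it the upper bound the comment describes. A minimal sketch of the capped linear backoff, assuming the same one-second step and 60-second cap as the hunk (`DispatchBackoff` and `waitingTimeMillis` are illustrative names, not part of the codebase):

```java
public class DispatchBackoff {

    // Assumed values mirroring the hunk: 1 second per failed attempt, capped at 60 seconds.
    private static final long STEP_MILLIS = 1_000L;
    private static final long MAX_WAIT_MILLIS = 60_000L;

    /** Returns the delay before the next dispatch attempt for the given failure count. */
    public static long waitingTimeMillis(int dispatchFailTimes) {
        // Math.min enforces the cap; Math.max would turn the cap into a floor.
        return Math.min(dispatchFailTimes * STEP_MILLIS, MAX_WAIT_MILLIS);
    }

    public static void main(String[] args) {
        for (int failTimes : new int[]{1, 5, 60, 120}) {
            System.out.println(failTimes + " failures -> " + waitingTimeMillis(failTimes) + " ms");
        }
    }
}
```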
@@ -22,7 +22,7 @@
 -XX:+IgnoreUnrecognizedVMOptions
 -XX:+PrintGCDateStamps
 -XX:+PrintGCDetails
--Xloggc:gc.log
+-Xloggc:gc-%t.log

 -XX:-OmitStackTraceInFastThrow
 -XX:+ExitOnOutOfMemoryError
@@ -32,9 +32,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;

@@ -235,10 +233,12 @@ public List<StorageEntity> listFileStorageEntityRecursively(String resourceAbsol
if (!fs.exists(path)) {
continue;
}
-RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(path, true);
-while (remoteIterator.hasNext()) {
-LocatedFileStatus locatedFileStatus = remoteIterator.next();
-result.add(transformFileStatusToResourceMetadata(locatedFileStatus));
+FileStatus[] fileStatuses = fs.listStatus(path);
+for (FileStatus fileStatus : fileStatuses) {
+if (fileStatus.isDirectory()) {
+foldersToFetch.addLast(fileStatus.getPath().toString());
+}
+result.add(transformFileStatusToResourceMetadata(fileStatus));
}
}
return result;
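
Review note: `fs.listFiles(path, true)` returns files only, so directories never appeared in the result; the new loop lists one level at a time with `listStatus` and queues each directory for a later pass, which is why the updated test expects three entries instead of one. The same breadth-first traversal can be sketched against the local file system with `java.nio.file` standing in for the Hadoop `FileSystem` API (an assumption made so the example runs without Hadoop; `RecursiveListing` is a hypothetical name):

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;

public class RecursiveListing {

    /** Lists every file and directory below root, breadth first, directories included. */
    public static List<Path> listRecursively(Path root) throws IOException {
        List<Path> result = new ArrayList<>();
        Deque<Path> foldersToFetch = new LinkedList<>();
        foldersToFetch.addLast(root);
        while (!foldersToFetch.isEmpty()) {
            Path folder = foldersToFetch.pollFirst();
            // Skip entries that vanished or are not directories; only directories have children.
            if (!Files.isDirectory(folder)) {
                continue;
            }
            try (DirectoryStream<Path> children = Files.newDirectoryStream(folder)) {
                for (Path child : children) {
                    if (Files.isDirectory(child)) {
                        // Queue the directory so its own children are listed in a later pass.
                        foldersToFetch.addLast(child);
                    }
                    // Record the entry itself whether it is a file or a directory.
                    result.add(child);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        listRecursively(root).forEach(System.out::println);
    }
}
```

An explicit queue keeps the traversal iterative, so deep directory trees do not grow the call stack.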
@@ -296,10 +296,14 @@ public void testListStorageEntityRecursively_directory() {
String resourceFileAbsolutePath = "file:" + baseDir;
List<StorageEntity> storageEntities =
storageOperator.listFileStorageEntityRecursively(resourceFileAbsolutePath);
-assertThat(storageEntities.size()).isEqualTo(1);
-
-StorageEntity storageEntity2 = storageEntities.get(0);
-assertThat(storageEntity2.getFullName()).isEqualTo("file:" + Paths.get(baseDir, "sqlDirectory", "demo.sql"));
+assertThat(storageEntities.size()).isEqualTo(3);
+
+StorageEntity storageEntity2 = storageEntities.stream()
+.filter(storageEntity -> storageEntity.getFileName().equals("demo.sql"))
+.findFirst()
+.get();
+assertThat(storageEntity2.getFullName())
+.isEqualTo("file:" + Paths.get(baseDir, "sqlDirectory", "demo.sql"));
assertThat(storageEntity2.getFileName()).isEqualTo("demo.sql");
assertThat(storageEntity2.getPfullName()).isEqualTo("file:" + Paths.get(baseDir, "sqlDirectory"));
assertThat(storageEntity2.isDirectory()).isFalse();
@@ -27,6 +27,7 @@
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
@@ -45,6 +46,7 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -169,9 +171,35 @@ protected List<String> buildOptions() throws Exception {
String filePath = buildConfigFilePath();
createConfigFileIfNotExists(scriptContent, filePath);
args.add(filePath);
+args.addAll(generateTaskParameters());
return args;
}

+private List<String> generateTaskParameters() {
+Map<String, String> variables = new HashMap<>();
+Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
+List<Property> propertyList = JSONUtils.toList(taskExecutionContext.getGlobalParams(), Property.class);
+if (propertyList != null && !propertyList.isEmpty()) {
+for (Property property : propertyList) {
+variables.put(property.getProp(), paramsMap.get(property.getProp()).getValue());
+}
+}
+List<Property> localParams = this.seatunnelParameters.getLocalParams();
+if (localParams != null && !localParams.isEmpty()) {
+for (Property property : localParams) {
+if (property.getDirect().equals(Direct.IN)) {
+variables.put(property.getProp(), paramsMap.get(property.getProp()).getValue());
+}
+}
+}
+List<String> parameters = new ArrayList<>();
+variables.forEach((k, v) -> {
+parameters.add("-i");
+parameters.add(String.format("%s='%s'", k, v));
+});
+return parameters;
+}
+
private String buildCustomConfigContent() {
log.info("raw custom config content : {}", seatunnelParameters.getRawScript());
String script = seatunnelParameters.getRawScript().replaceAll("\\r\\n", System.lineSeparator());
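
Review note: `generateTaskParameters` flattens global and `IN` local parameters into one map and emits each entry as a `-i` flag followed by `key='value'`. The final step can be sketched on its own as below. `VariableArgs` and `toCliArgs` are hypothetical names, and skipping `null` values is an assumption added for this example: the hunk formats every entry and would throw a `NullPointerException` earlier if `paramsMap` lacked a key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class VariableArgs {

    /** Turns a variable map into alternating "-i" and key='value' command-line arguments. */
    public static List<String> toCliArgs(Map<String, String> variables) {
        List<String> parameters = new ArrayList<>();
        variables.forEach((key, value) -> {
            if (value == null) {
                // Assumption for this sketch: unresolved variables are skipped, not passed as 'null'.
                return;
            }
            parameters.add("-i");
            parameters.add(String.format("%s='%s'", key, value));
        });
        return parameters;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order so the printed arguments are predictable.
        Map<String, String> variables = new LinkedHashMap<>();
        variables.put("date", "20241115");
        variables.put("table", "orders");
        System.out.println(String.join(" ", toCliArgs(variables)));
    }
}
```

Note that a value containing a single quote would break the `'%s'` quoting in both the sketch and the hunk, so such values need escaping before they reach the command line.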