Skip to content

Commit

Permalink
[Feature][UDF] udf develops modification logic (#3139)
Browse files Browse the repository at this point in the history
  • Loading branch information
zackyoungh authored Feb 4, 2024
1 parent 8af14c9 commit 6ae9fdb
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 3 deletions.
2 changes: 1 addition & 1 deletion dinky-admin/src/main/java/org/dinky/init/SystemInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public static Project getProject() {
}

public void registerUDF() {
List<Task> allUDF = taskService.getAllUDF();
List<Task> allUDF = taskService.getReleaseUDF();
if (CollUtil.isNotEmpty(allUDF)) {
UdfCodePool.registerPool(allUDF.stream().map(UDFUtils::taskToUDF).collect(Collectors.toList()));
}
Expand Down
6 changes: 6 additions & 0 deletions dinky-admin/src/main/java/org/dinky/service/TaskService.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ public interface TaskService extends ISuperService<Task> {
*/
List<Task> getAllUDF();

/**
* Get a list of all release user-defined functions (UDFs) in the system.
* @return A list of {@link Task} objects representing the release UDFs.
*/
List<Task> getReleaseUDF();

/**
* Get the API address of the given task.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,9 @@ public boolean changeTaskLifeRecyle(Integer taskId, JobLifeCycle lifeCycle) thro
if (lifeCycle == JobLifeCycle.PUBLISH) {
Integer taskVersionId = taskVersionService.createTaskVersionSnapshot(task);
task.setVersionId(taskVersionId);
UdfCodePool.addOrUpdate(UDFUtils.taskToUDF(task.buildTask()));
} else {
UdfCodePool.remove(task.getConfigJson().getUdfConfig().getClassName());
}
boolean saved = saveOrUpdate(task.buildTask());
if (saved && Asserts.isNotNull(task.getJobInstanceId())) {
Expand Down Expand Up @@ -613,7 +616,11 @@ public boolean saveOrUpdateTask(Task task) {
UdfCodePool.remove(task.getConfigJson().getUdfConfig().getClassName());
}
task.getConfigJson().getUdfConfig().setClassName(className);
UdfCodePool.addOrUpdate(UDFUtils.taskToUDF(task));
if (task.getStep().equals(JobLifeCycle.PUBLISH.getValue())) {
UdfCodePool.addOrUpdate(UDFUtils.taskToUDF(task));
} else {
UdfCodePool.remove(task.getConfigJson().getUdfConfig().getClassName());
}
}

return this.saveOrUpdate(task);
Expand Down Expand Up @@ -691,6 +698,15 @@ public List<Task> getAllUDF() {
.isNotNull("save_point_path"));
}

@Override
public List<Task> getReleaseUDF() {
return list(new LambdaQueryWrapper<Task>()
.in(Task::getDialect, Dialect.JAVA.getValue(), Dialect.SCALA.getValue(), Dialect.PYTHON.getValue())
.eq(Task::getEnabled, 1)
.eq(Task::getStep, JobLifeCycle.PUBLISH.getValue())
.isNotNull(Task::getSavePointPath));
}

@Override
public boolean rollbackTask(TaskRollbackVersionDTO dto) {
if (Asserts.isNull(dto.getVersionId()) || Asserts.isNull(dto.getTaskId())) {
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
<hamcrest.version>1.3</hamcrest.version>
<hibernate-validator.version>6.2.0.Final</hibernate-validator.version>
<httpclient.version>4.5.13</httpclient.version>
<hutool.version>5.8.24</hutool.version>
<hutool.version>5.8.25</hutool.version>
<jackson.version>2.14.1</jackson.version>
<jakarta.ws.rs-api.version>2.1.6</jakarta.ws.rs-api.version>
<jasypt.version>1.9.3</jasypt.version>
Expand Down

0 comments on commit 6ae9fdb

Please sign in to comment.