Skip to content

Commit

Permalink
[issue_1045][taier-schedule] self-dependence day tasks fill data lose… (
Browse files Browse the repository at this point in the history
#1046)

… dependencies #1045
  • Loading branch information
vainhope authored Apr 17, 2023
1 parent 151b262 commit 726c356
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package com.dtstack.taier.scheduler.server.builder;

import com.dtstack.taier.common.enums.EScheduleJobType;
import com.dtstack.taier.common.enums.Deleted;
import com.dtstack.taier.common.enums.EScheduleJobType;
import com.dtstack.taier.common.enums.Restarted;
import com.dtstack.taier.common.env.EnvironmentContext;
import com.dtstack.taier.common.exception.TaierDefineException;
Expand All @@ -32,8 +32,8 @@
import com.dtstack.taier.scheduler.server.ScheduleJobDetails;
import com.dtstack.taier.scheduler.server.builder.cron.ScheduleConfManager;
import com.dtstack.taier.scheduler.server.builder.cron.ScheduleCorn;
import com.dtstack.taier.scheduler.server.builder.dependency.JobDependency;
import com.dtstack.taier.scheduler.server.builder.dependency.DependencyManager;
import com.dtstack.taier.scheduler.server.builder.dependency.JobDependency;
import com.dtstack.taier.scheduler.service.ScheduleActionService;
import com.dtstack.taier.scheduler.service.ScheduleJobService;
import com.dtstack.taier.scheduler.service.ScheduleTaskShadeService;
Expand Down Expand Up @@ -157,6 +157,9 @@ public List<ScheduleJobDetails> buildJob(ScheduleTaskShade batchTaskShade, Strin
* @return 名称
*/
private String getName(ScheduleTaskShade scheduleTaskShade, String name, String cycTime) {
if (StringUtils.isBlank(name)) {
return getPrefix() + "_" + scheduleTaskShade.getName() + "_" + cycTime;
}
return getPrefix() + "_" + name + "_" + scheduleTaskShade.getName() + "_" + cycTime;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -75,7 +76,6 @@ public class FillDataJobBuilder extends AbstractJobBuilder {
* @param endDay 每天时间范围 结束范围
* @throws Exception
*/
@Transactional(rollbackFor = Exception.class)
public void createFillJob(Set<Long> all, Set<Long> run, Long fillId, String fillName, String beginTime, String endTime,
String startDay, String endDay) throws Exception {
Date startDate = DateUtil.parseDate(startDay, DateUtil.DATE_FORMAT, Locale.CHINA);
Expand All @@ -102,54 +102,52 @@ public void createFillJob(Set<Long> all, Set<Long> run, Long fillId, String fill
* @param endTime 每天时间范围 结束范围
* @throws Exception
*/
@Transactional(rollbackFor = Exception.class)
public void buildFillDataJobGraph(String fillName, Long fillId, Set<Long> all, Set<Long> run, String triggerDay,
String beginTime, String endTime) throws Exception {
String beginTime, String endTime) throws Exception {
List<Long> allList = Lists.newArrayList(all);
List<List<Long>> partition = Lists.partition(allList, environmentContext.getJobGraphTaskLimitSize());
AtomicJobSortWorker sortWorker = new AtomicJobSortWorker();
List<ScheduleJobDetails> saveList = Lists.newArrayList();
CompletableFuture.allOf(partition.stream()
.map(taskKey ->
CompletableFuture.runAsync(() ->
fillTaskPartition(fillName, fillId, run, triggerDay, beginTime, endTime, allList, sortWorker, saveList, taskKey),
jobGraphBuildPool))
.toArray(CompletableFuture[]::new)).thenAccept(a -> savaFillJob(saveList)).join();
}

for (List<Long> taskKey : partition) {
jobGraphBuildPool.submit(() -> {
try {
List<ScheduleJobDetails> saveList = Lists.newArrayList();
for (Long taskId : taskKey) {
try {
ScheduleTaskShade scheduleTaskShade = scheduleTaskService
.lambdaQuery()
.eq(ScheduleTaskShade::getTaskId, taskId)
.eq(ScheduleTaskShade::getIsDeleted, Deleted.NORMAL.getStatus())
.one();

if (scheduleTaskShade != null) {
List<ScheduleJobDetails> jobBuilderBeanList = Lists.newArrayList();
// 非工作流任务子任务
if (scheduleTaskShade.getFlowId() == 0) {
// 生成补数据实例
jobBuilderBeanList = RetryUtil.executeWithRetry(() -> buildJob(scheduleTaskShade, fillName, triggerDay, beginTime, endTime, fillId, sortWorker),
environmentContext.getBuildJobErrorRetry(), 200, false);
} else {
Long flowId = scheduleTaskShade.getFlowId();
if (!allList.contains(flowId)) {
// 生成周期实例
jobBuilderBeanList = RetryUtil.executeWithRetry(() -> buildJob(scheduleTaskShade, fillName, triggerDay, beginTime, beginTime, fillId, sortWorker),
environmentContext.getBuildJobErrorRetry(), 200, false);
}
}

for (ScheduleJobDetails jobBuilderBean : jobBuilderBeanList) {
addMap(run, saveList, taskId, jobBuilderBean);
}
}
} catch (Exception e) {
LOGGER.error("taskKey : {} error:", taskId, e);
private void fillTaskPartition(String fillName, Long fillId, Set<Long> run, String triggerDay, String beginTime, String endTime, List<Long> allList, AtomicJobSortWorker sortWorker, List<ScheduleJobDetails> saveList, List<Long> taskKey) {
for (Long taskId : taskKey) {
try {
ScheduleTaskShade scheduleTaskShade = scheduleTaskService
.lambdaQuery()
.eq(ScheduleTaskShade::getTaskId, taskId)
.eq(ScheduleTaskShade::getIsDeleted, Deleted.NORMAL.getStatus())
.one();

if (scheduleTaskShade != null) {
List<ScheduleJobDetails> jobBuilderBeanList = Lists.newArrayList();
// 非工作流任务子任务
if (scheduleTaskShade.getFlowId() == 0) {
// 生成补数据实例
jobBuilderBeanList = RetryUtil.executeWithRetry(() -> buildJob(scheduleTaskShade, fillName, triggerDay, beginTime, endTime, fillId, sortWorker),
environmentContext.getBuildJobErrorRetry(), 200, false);
} else {
Long flowId = scheduleTaskShade.getFlowId();
if (!allList.contains(flowId)) {
// 生成周期实例
jobBuilderBeanList = RetryUtil.executeWithRetry(() -> buildJob(scheduleTaskShade, fillName, triggerDay, beginTime, endTime, fillId, sortWorker),
environmentContext.getBuildJobErrorRetry(), 200, false);
}
}
savaFillJob(saveList);
} catch (Exception e) {
LOGGER.error("fill error:", e);

for (ScheduleJobDetails jobBuilderBean : jobBuilderBeanList) {
addMap(run, saveList, taskId, jobBuilderBean);
}
}
});
} catch (Exception e) {
LOGGER.error("taskKey : {} error:", taskId, e);
}
}
}

Expand Down Expand Up @@ -178,7 +176,8 @@ private void addMap(Set<Long> run, List<ScheduleJobDetails> saveList, Long taskI
*
* @param allJobList 所有集合
*/
private void savaFillJob(List<ScheduleJobDetails> allJobList) {
@Transactional(rollbackFor = Exception.class)
public void savaFillJob(List<ScheduleJobDetails> allJobList) {
scheduleJobService.insertJobList(allJobList, EScheduleType.FILL_DATA.getType());
Set<ScheduleJobOperatorRecord> operatorJobIds = allJobList
.stream()
Expand Down

0 comments on commit 726c356

Please sign in to comment.