Skip to content

Commit

Permalink
Keep heap lean during remote partition polling
Browse files Browse the repository at this point in the history
Resolves #4598
  • Loading branch information
hpoettker authored and fmbenhassine committed May 21, 2024
1 parent b94f7cf commit 93fd2c9
Showing 1 changed file with 7 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2009-2023 the original author or authors.
* Copyright 2009-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -251,24 +251,22 @@ protected Set<StepExecution> doHandle(StepExecution managerStepExecution,

private Set<StepExecution> pollReplies(final StepExecution managerStepExecution, final Set<StepExecution> split)
throws Exception {
final Set<StepExecution> result = new HashSet<>(split.size());
Set<Long> partitionStepExecutionIds = split.stream().map(StepExecution::getId).collect(Collectors.toSet());

Callable<Set<StepExecution>> callback = () -> {
Set<Long> currentStepExecutionIds = split.stream().map(StepExecution::getId).collect(Collectors.toSet());
JobExecution jobExecution = jobExplorer.getJobExecution(managerStepExecution.getJobExecutionId());
jobExecution.getStepExecutions()
Set<StepExecution> finishedStepExecutions = jobExecution.getStepExecutions()
.stream()
.filter(stepExecution -> currentStepExecutionIds.contains(stepExecution.getId()))
.filter(stepExecution -> !result.contains(stepExecution))
.filter(stepExecution -> partitionStepExecutionIds.contains(stepExecution.getId()))
.filter(stepExecution -> !stepExecution.getStatus().isRunning())
.forEach(result::add);
.collect(Collectors.toSet());

if (logger.isDebugEnabled()) {
logger.debug(String.format("Currently waiting on %s partitions to finish", split.size()));
}

if (result.size() == split.size()) {
return result;
if (finishedStepExecutions.size() == split.size()) {
return finishedStepExecutions;
}
else {
return null;
Expand Down

0 comments on commit 93fd2c9

Please sign in to comment.