diff --git a/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java b/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java index a5cc624196..f0c710c544 100644 --- a/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java +++ b/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2009-2023 the original author or authors. + * Copyright 2009-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -251,24 +251,22 @@ protected Set doHandle(StepExecution managerStepExecution, private Set pollReplies(final StepExecution managerStepExecution, final Set split) throws Exception { - final Set result = new HashSet<>(split.size()); + Set partitionStepExecutionIds = split.stream().map(StepExecution::getId).collect(Collectors.toSet()); Callable> callback = () -> { - Set currentStepExecutionIds = split.stream().map(StepExecution::getId).collect(Collectors.toSet()); JobExecution jobExecution = jobExplorer.getJobExecution(managerStepExecution.getJobExecutionId()); - jobExecution.getStepExecutions() + Set finishedStepExecutions = jobExecution.getStepExecutions() .stream() - .filter(stepExecution -> currentStepExecutionIds.contains(stepExecution.getId())) - .filter(stepExecution -> !result.contains(stepExecution)) + .filter(stepExecution -> partitionStepExecutionIds.contains(stepExecution.getId())) .filter(stepExecution -> !stepExecution.getStatus().isRunning()) - .forEach(result::add); + .collect(Collectors.toSet()); if (logger.isDebugEnabled()) { logger.debug(String.format("Currently waiting on %s partitions to finish", split.size())); } - if (result.size() == split.size()) { - return result; + if (finishedStepExecutions.size() == split.size()) { + return finishedStepExecutions; } else { return null;