Skip to content

Commit

Permalink
IGNITE-24175 Rework to syncRunningJobs map
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov committed Jan 9, 2025
1 parent 8b9dff4 commit febd532
Showing 1 changed file with 19 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ public GridJobProcessor(GridKernalContext ctx) {
metricsUpdateFreq = ctx.config().getMetricsUpdateFrequency();

syncRunningJobs = new ConcurrentHashMap<>();

activeJobs = initJobsMap(jobAlwaysActivate);

passiveJobs = jobAlwaysActivate ? null : new JobsMap(1024, 0.75f, 256);
Expand Down Expand Up @@ -379,7 +380,7 @@ public GridJobProcessor(GridKernalContext ctx) {
Arrays.asList(activeJobs, syncRunningJobs, passiveJobs, cancelledJobs),
ConcurrentMap::entrySet,
(map, e) -> {
ComputeJobState state = map == activeJobs ? ComputeJobState.ACTIVE :
ComputeJobState state = (map == activeJobs || map == syncRunningJobs) ? ComputeJobState.ACTIVE :
(map == passiveJobs ? ComputeJobState.PASSIVE : ComputeJobState.CANCELED);

return new ComputeJobView(e.getKey(), e.getValue(), state);
Expand Down Expand Up @@ -432,6 +433,8 @@ public GridJobProcessor(GridKernalContext ctx) {
/** {@inheritDoc} */
@Override public void stop(boolean cancel) {
// Clear collections.
syncRunningJobs = new ConcurrentHashMap<>();

activeJobs = initJobsMap(jobAlwaysActivate);

activeJobsMetric.reset();
Expand Down Expand Up @@ -802,10 +805,16 @@ public void cancelJob(@Nullable final IgniteUuid sesId, @Nullable final IgniteUu
cancelPassiveJob(job);
}
}

for (GridJobWorker job : activeJobs.values()) {
if (idsMatch.test(job))
cancelActiveJob(job, sys);
}

for (GridJobWorker job : syncRunningJobs.values()) {
if (idsMatch.test(job))
cancelJob(job, sys);
}
}
else {
if (!jobAlwaysActivate) {
Expand All @@ -817,8 +826,16 @@ public void cancelJob(@Nullable final IgniteUuid sesId, @Nullable final IgniteUu

GridJobWorker activeJob = activeJobs.get(jobId);

if (activeJob != null && idsMatch.test(activeJob))
if (activeJob != null && idsMatch.test(activeJob)) {
cancelActiveJob(activeJob, sys);

return;
}

activeJob = syncRunningJobs.get(jobId);

if (activeJob != null && idsMatch.test(activeJob))
cancelJob(activeJob, sys);
}
}
finally {
Expand Down

0 comments on commit febd532

Please sign in to comment.