Skip to content

Commit

Permalink
IGNITE-23655 Added testIdleVerifyCancelCommandOnCheckpoint, it passes…
Browse files Browse the repository at this point in the history
…. Also added testIdleVerifyTrackedForkJoinPool that waits on beforeCancelLatch.await(). Overriden submit method for custom ForkJoinPool is not reached although pool.submit in calculatePartitionHashAsync is reached.
  • Loading branch information
vladnovoren committed Dec 26, 2024
1 parent 188a7d7 commit 38b3245
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -99,6 +101,7 @@
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.db.IgniteCacheGroupsWithRestartsTest;
import org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.dumpprocessors.ToFileDumpProcessor;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
Expand Down Expand Up @@ -232,9 +235,9 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb
@Override protected void beforeTest() throws Exception {
super.beforeTest();

initDiagnosticDir();

cleanPersistenceDir();

initDiagnosticDir();
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -774,13 +777,43 @@ public void testIdleVerifyOnInactiveClusterWithPersistence() throws Exception {
*
*/
@Test
public void testIdleVerifyCancelCommand() throws Exception {
public void testIdleVerifyCancelCommandOnCheckpoint() throws Exception {
final int gridsCnt = 4;

IgniteEx srv = startGrids(gridsCnt);

srv.cluster().state(ACTIVE);

CountDownLatch beforeCancelLatch = new CountDownLatch(1);

CountDownLatch afterCancelLatch = new CountDownLatch(1);

GridCacheDatabaseSharedManager dbMgr =
(GridCacheDatabaseSharedManager)grid(1).context().cache().context().database();

dbMgr.addCheckpointListener(new CheckpointListener() {
@Override public void beforeCheckpointBegin(Context ctx) {
if (ctx.progress().reason().equals("VerifyBackupPartitions"))
beforeCancelLatch.countDown();
}

@Override public void afterCheckpointEnd(Context ctx) throws IgniteCheckedException {
if (ctx.progress().reason().equals("VerifyBackupPartitions"))
try {
afterCancelLatch.await();
} catch (InterruptedException e) {
throw new IgniteInterruptedCheckedException(e);
}
}

@Override public void onMarkCheckpointBegin(Context ctx) {
}

@Override public void onCheckpointBegin(Context ctx) {

}
});

LogListener idleVerifyCancelListener = LogListener.matches("Idle verify was cancelled.").build();

LogListener verifyBackupCancelListener = LogListener.matches("Cancel request sent to VerifyBackupPartitionsJobV2.").build();
Expand All @@ -796,12 +829,102 @@ public void testIdleVerifyCancelCommand() throws Exception {

IgniteInternalFuture<Integer> idleVerifyFut = GridTestUtils.runAsync(() -> execute("--cache", "idle_verify"));

beforeCancelLatch.await();

assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify", "--cancel"));

afterCancelLatch.countDown();

for (int i = 0; i < gridsCnt; i++) {
int finalI = i;

waitForCondition(() -> {
for (ComputeTaskView taskView : grid(finalI).context().systemView().<ComputeTaskView>view(TASKS_VIEW)) {
if (IdleVerifyTaskV2.class.getName().equals(taskView.taskName()))
return false;
}

return true;
}, 1000);

waitForCondition(() -> {
for (ComputeJobView jobView : grid(finalI).context().systemView().<ComputeJobView>view(JOBS_VIEW)) {
if (IdleVerifyTaskV2.class.getName().equals(jobView.taskName()))
return false;
}

return true;
}, 1000);
}

idleVerifyFut.get(getTestTimeout());

assertTrue(idleVerifyCancelListener.check());

assertTrue(verifyBackupCancelListener.check());
}

/**
*
*/
@Test
public void testIdleVerifyTrackedForkJoinPool() throws Exception {
final int gridsCnt = 4;

IgniteEx srv = startGrids(gridsCnt);

srv.cluster().state(ACTIVE);

LogListener idleVerifyCancelListener = LogListener.matches("Idle verify was cancelled.").build();

LogListener verifyBackupCancelListener = LogListener.matches("Cancel request sent to VerifyBackupPartitionsJobV2.").build();

listeningLog.registerListener(idleVerifyCancelListener);

listeningLog.registerListener(verifyBackupCancelListener);

IgniteCache<Integer, Integer> cache = srv.createCache(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME).setBackups(3));

for (int i = 0; i < 10000; i++)
cache.put(i, i);

CountDownLatch beforeCancelLatch = new CountDownLatch(1);
CountDownLatch afterCancelLatch = new CountDownLatch(1);

ForkJoinPool forkJoinPool = new ForkJoinPool() {
@Override public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
System.out.println("GridCommandHandlerTest.submit");

beforeCancelLatch.countDown();

ForkJoinTask<T> submitted = super.submit(task);

try {
afterCancelLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

return submitted;
}
};

VerifyBackupPartitionsTaskV2.poolSupplier = () -> forkJoinPool;

IgniteInternalFuture<Integer> idleVerifyFut = GridTestUtils.runAsync(() -> execute("--cache", "idle_verify"));

doSleep(1000);

assertFalse(idleVerifyFut.isDone());

beforeCancelLatch.await();

System.out.println("reached execute cancel");

assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify", "--cancel"));

afterCancelLatch.countDown();

for (int i = 0; i < gridsCnt; i++) {
int finalI = i;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
Expand Down Expand Up @@ -90,6 +91,8 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<CacheIdleVe
public static final String IDLE_VERIFY_ON_INACTIVE_CLUSTER_ERROR_MESSAGE = "Cannot perform the operation because " +
"the cluster is inactive.";

public static Supplier<ForkJoinPool> poolSupplier = ForkJoinPool::commonPool;

/** Injected logger. */
@LoggerResource
private IgniteLogger log;
Expand Down Expand Up @@ -357,8 +360,10 @@ private List<Future<Map<PartitionKeyV2, PartitionHashRecordV2>>> calcPartitionHa

List<GridDhtLocalPartition> parts = grpCtx.topology().localPartitions();

ForkJoinPool pool = poolSupplier.get();

for (GridDhtLocalPartition part : parts)
partHashCalcFutures.add(calculatePartitionHashAsync(grpCtx, part));
partHashCalcFutures.add(calculatePartitionHashAsync(pool, grpCtx, part));
}

return partHashCalcFutures;
Expand Down Expand Up @@ -497,10 +502,11 @@ private boolean isCacheMatchFilter(DynamicCacheDescriptor desc) {
* @param part Local partition.
*/
private Future<Map<PartitionKeyV2, PartitionHashRecordV2>> calculatePartitionHashAsync(
final ForkJoinPool pool,
final CacheGroupContext gctx,
final GridDhtLocalPartition part
) {
return ForkJoinPool.commonPool().submit(() -> {
return pool.submit(() -> {
Map<PartitionKeyV2, PartitionHashRecordV2> res = emptyMap();

if (!part.reserve())
Expand Down

0 comments on commit 38b3245

Please sign in to comment.