Skip to content

Commit

Permalink
IGNITE-20627 Added number of partitions processed by the index worker…
Browse files Browse the repository at this point in the history
… to the output of the index commands. (#10993)
  • Loading branch information
petrov-mg authored Oct 26, 2023
1 parent c861ac5 commit a0d074d
Show file tree
Hide file tree
Showing 14 changed files with 246 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.CacheAtomicityMode;
Expand All @@ -43,6 +44,7 @@
import org.apache.ignite.internal.processors.query.schema.IndexRebuildCancelToken;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFuture;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.util.GridStringBuilder;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
Expand Down Expand Up @@ -729,20 +731,24 @@ private static String makeStringListWithIndent(String... strings) {
/**
* Makes formatted text for given caches.
*
* @param header Output header.
* @param cacheGroputToNames Cache groups mapping to non-existing cache names.
* @return Text for CLI print output for given caches.
* @return CLI output pattern for given caches.
*/
private static String makeStringListForCacheGroupsAndNames(Map<String, List<String>> cacheGroputToNames) {
SB sb = new SB();
private static Pattern makePatternForCacheGroupsAndNames(String header, Map<String, List<String>> cacheGroputToNames) {
GridStringBuilder sb = new SB(header).a("\\n");

for (Map.Entry<String, List<String>> entry : cacheGroputToNames.entrySet()) {
String cacheGrp = entry.getKey();

for (String cacheName : entry.getValue())
sb.a(INDENT).a("groupName=").a(cacheGrp).a(", cacheName=").a(cacheName).a(U.nl());
sb.a(INDENT)
.a("groupName=").a(cacheGrp)
.a(", cacheName=").a(cacheName)
.a(", indexBuildPartitionsLeftCount=(\\d+), totalPartitionsCount=(\\d+), progress=(\\d+)%\\n");
}

return sb.toString();
return Pattern.compile(sb.toString());
}

/**
Expand All @@ -752,13 +758,12 @@ private static String makeStringListForCacheGroupsAndNames(Map<String, List<Stri
* @param cacheGroputToNames Cache groups mapping to non-existing cache names.
*/
private static void validateOutputIndicesRebuildingInProgress(String outputStr, Map<String, List<String>> cacheGroputToNames) {
String caches = makeStringListForCacheGroupsAndNames(cacheGroputToNames);

assertContains(
log,
outputStr,
"WARNING: These caches have indexes rebuilding in progress:" + U.nl() + caches
Pattern pattern = makePatternForCacheGroupsAndNames(
"WARNING: These caches have indexes rebuilding in progress:",
cacheGroputToNames
);

assertTrue(pattern.matcher(outputStr).find());
}

/**
Expand All @@ -768,13 +773,12 @@ private static void validateOutputIndicesRebuildingInProgress(String outputStr,
* @param cacheGroputToNames Cache groups mapping to non-existing cache names.
*/
private void validateOutputIndicesRebuildWasStarted(String outputStr, Map<String, List<String>> cacheGroputToNames) {
String caches = makeStringListForCacheGroupsAndNames(cacheGroputToNames);

assertContains(
log,
outputStr,
"Indexes rebuild was started for these caches:" + U.nl() + caches
Pattern pattern = makePatternForCacheGroupsAndNames(
"Indexes rebuild was started for these caches:",
cacheGroputToNames
);

assertTrue(pattern.matcher(outputStr).find());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@

package org.apache.ignite.util;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
Expand All @@ -39,9 +46,11 @@
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
import static org.apache.ignite.testframework.GridTestUtils.deleteIndexBin;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.createAndFillCache;
import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.createAndFillThreeFieldsEntryCache;
import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.simpleIndexEntity;
Expand Down Expand Up @@ -90,6 +99,7 @@ public class GridCommandHandlerIndexRebuildStatusTest extends GridCommandHandler

idxRebuildsStartedNum.set(0);
statusRequestingFinished.set(false);
BlockingSchemaIndexCacheVisitorClosure.rowIndexListener = null;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -143,22 +153,45 @@ public void testCommandOutput() throws Exception {
deleteIndexBin(getTestIgniteInstanceName(GRIDS_NUM - 2));

IndexProcessor.idxRebuildCls = BlockingIndexesRebuildTask.class;
IgniteEx ignite1 = startGrid(GRIDS_NUM - 1);
startGrid(GRIDS_NUM - 1);

IndexProcessor.idxRebuildCls = BlockingIndexesRebuildTask.class;
IgniteEx ignite2 = startGrid(GRIDS_NUM - 2);
startGrid(GRIDS_NUM - 2);

final UUID id1 = ignite1.localNode().id();
final UUID id2 = ignite2.localNode().id();
grid(0).cache("cache1").enableStatistics(true);

boolean allRebuildsStarted = GridTestUtils.waitForCondition(() -> idxRebuildsStartedNum.get() == 6, 30_000);
assertTrue("Failed to wait for all indexes to start being rebuilt", allRebuildsStarted);

assertEquals(EXIT_CODE_OK, execute(handler, "--cache", "indexes_rebuild_status"));

checkResult(handler, 1, 2);

statusRequestingFinished.set(true);

checkResult(handler, id1, id2);
CountDownLatch idxProgressBlockedLatch = new CountDownLatch(1);
CountDownLatch idxProgressUnblockedLatch = new CountDownLatch(1);

BlockingSchemaIndexCacheVisitorClosure.rowIndexListener = () -> {
if (isIndexRebuildInProgress("cache1")) {
try {
idxProgressBlockedLatch.countDown();

idxProgressUnblockedLatch.await(getTestTimeout(), MILLISECONDS);
}
catch (InterruptedException e) {
throw new IgniteException(e);
}
}
};

assertTrue(idxProgressBlockedLatch.await(getTestTimeout(), MILLISECONDS));

assertEquals(EXIT_CODE_OK, execute(handler, "--cache", "indexes_rebuild_status"));

checkRebuildInProgressOutputFor("cache1");

idxProgressUnblockedLatch.countDown();
}

/**
Expand Down Expand Up @@ -192,7 +225,7 @@ public void testNodeIdOption() throws Exception {

statusRequestingFinished.set(true);

checkResult(handler, id1);
checkResult(handler, 2);
}

/**
Expand All @@ -214,26 +247,82 @@ public void testEmptyResult() {
* in {@code handler} last operation result and in {@code testOut}.
*
* @param handler CommandHandler used to run command.
* @param nodeIds Ids to check.
* @param nodeIdxs Indexes of node to check.
*/
private void checkResult(TestCommandHandler handler, UUID... nodeIds) {
private void checkResult(TestCommandHandler handler, int... nodeIdxs) {
String output = testOut.toString();

Map<UUID, Set<IndexRebuildStatusInfoContainer>> cmdResult = handler.getLastOperationResult();

assertNotNull(cmdResult);
assertEquals("Unexpected number of nodes in result", nodeIds.length, cmdResult.size());
assertEquals("Unexpected number of nodes in result", nodeIdxs.length, cmdResult.size());

for (int nodeIdx : nodeIdxs) {
Set<IndexRebuildStatusInfoContainer> cacheInfos = cmdResult.get(grid(nodeIdx).localNode().id());

for (UUID nodeId: nodeIds) {
Set<IndexRebuildStatusInfoContainer> cacheInfos = cmdResult.get(nodeId);
assertNotNull(cacheInfos);
assertEquals("Unexpected number of cacheInfos in result", 3, cacheInfos.size());

final String nodeStr = "node_id=" + nodeId + ", groupName=group1, cacheName=cache2\n" +
"node_id=" + nodeId + ", groupName=group2, cacheName=cache1\n" +
"node_id=" + nodeId + ", groupName=no_group, cacheName=cache_no_group";
checkRebuildStartOutput(output, nodeIdx, "group1", "cache2");
checkRebuildStartOutput(output, nodeIdx, "group2", "cache1");
checkRebuildStartOutput(output, nodeIdx, "no_group", "cache_no_group");
}
}

/** */
private void checkRebuildStartOutput(String output, int nodeIdx, String grpName, String cacheName) {
IgniteEx ignite = grid(nodeIdx);

int locPartsCount = ignite.context().cache().cache(cacheName).context().topology().localPartitions().size();

assertContains(
log,
output,
"node_id=" + ignite.localNode().id() +
", groupName=" + grpName +
", cacheName=" + cacheName +
", indexBuildPartitionsLeftCount=" + locPartsCount +
", totalPartitionsCount=" + locPartsCount +
", progress=0%");
}

/** */
private void checkRebuildInProgressOutputFor(String cacheName) throws Exception {
Matcher matcher = Pattern.compile(
"cacheName=" + cacheName + ", indexBuildPartitionsLeftCount=(\\d+), totalPartitionsCount=(\\d+), progress=(\\d+)%"
).matcher(testOut.toString());

List<Integer> rebuildProgressStatuses = new ArrayList<>();
List<Integer> indexBuildPartitionsLeftCounts = new ArrayList<>();

while (matcher.find()) {
indexBuildPartitionsLeftCounts.add(Integer.parseInt(matcher.group(1)));

rebuildProgressStatuses.add(Integer.parseInt(matcher.group(3)));
}

assertTrue(rebuildProgressStatuses.stream().anyMatch(progress -> progress > 0));

int cacheTotalRebuildingPartsCnt = indexBuildPartitionsLeftCounts.stream().mapToInt(Integer::intValue).sum();

assertContains(log, output, nodeStr);
assertTrue(waitForCondition(
() -> grid(0).cache(cacheName).metrics().getIndexBuildPartitionsLeftCount() == cacheTotalRebuildingPartsCnt,
getTestTimeout())
);
}

/** */
private boolean isIndexRebuildInProgress(String cacheName) {
for (Ignite ignite : Ignition.allGrids()) {
GridCacheContext<Object, Object> cctx = ((IgniteEx)ignite).context().cache().cache(cacheName).context();

long parts = cctx.cache().metrics0().getIndexBuildPartitionsLeftCount();

if (parts > 0 && parts < cctx.topology().partitions() / 2)
return true;
}

return false;
}

/**
Expand All @@ -256,6 +345,9 @@ private static class BlockingSchemaIndexCacheVisitorClosure implements SchemaInd
/** */
private SchemaIndexCacheVisitorClosure original;

/** */
public static Runnable rowIndexListener;

/**
* @param original Original.
*/
Expand All @@ -274,6 +366,9 @@ private static class BlockingSchemaIndexCacheVisitorClosure implements SchemaInd
statusRequestingFinished.set(true);
}

if (rowIndexListener != null)
rowIndexListener.run();

original.apply(row);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,4 +735,10 @@ public interface CacheMetrics {
* @return Number of keys processed during index rebuilding.
*/
public long getIndexRebuildKeysProcessed();

/**
* @return The number of partitions that remain to be processed to complete indexing.
* Note that this metric includes backup partitions, which also participate in index building on each node.
*/
public int getIndexBuildPartitionsLeftCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ protected IndexForceRebuildJob(CacheIndexesForceRebuildCommandArg arg, boolean d

Set<IndexRebuildStatusInfoContainer> cachesWithRebuildingInProgress =
cachesCtxWithRebuildingInProgress.stream()
.map(c -> new IndexRebuildStatusInfoContainer(c.config()))
.map(IndexRebuildStatusInfoContainer::new)
.collect(Collectors.toSet());

Set<IndexRebuildStatusInfoContainer> cachesWithStartedRebuild =
cachesToRebuild.stream()
.map(c -> new IndexRebuildStatusInfoContainer(c.config()))
.map(IndexRebuildStatusInfoContainer::new)
.filter(c -> !cachesWithRebuildingInProgress.contains(c))
.collect(Collectors.toSet());

Expand Down
Loading

0 comments on commit a0d074d

Please sign in to comment.