diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java index ee269d7c4e7be..ce9f0658de5ac 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.ignite.Ignite; import org.apache.ignite.cache.CacheAtomicityMode; @@ -43,6 +44,7 @@ import org.apache.ignite.internal.processors.query.schema.IndexRebuildCancelToken; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFuture; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; +import org.apache.ignite.internal.util.GridStringBuilder; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -729,20 +731,24 @@ private static String makeStringListWithIndent(String... strings) { /** * Makes formatted text for given caches. * + * @param header Output header. * @param cacheGroputToNames Cache groups mapping to non-existing cache names. - * @return Text for CLI print output for given caches. + * @return CLI output pattern for given caches. */ - private static String makeStringListForCacheGroupsAndNames(Map> cacheGroputToNames) { - SB sb = new SB(); + private static Pattern makePatternForCacheGroupsAndNames(String header, Map> cacheGroputToNames) { + GridStringBuilder sb = new SB(header).a("\\n"); for (Map.Entry> entry : cacheGroputToNames.entrySet()) { String cacheGrp = entry.getKey(); for (String cacheName : entry.getValue()) - sb.a(INDENT).a("groupName=").a(cacheGrp).a(", cacheName=").a(cacheName).a(U.nl()); + sb.a(INDENT) + .a("groupName=").a(cacheGrp) + .a(", cacheName=").a(cacheName) + .a(", indexBuildPartitionsLeftCount=(\\d+), totalPartitionsCount=(\\d+), progress=(\\d+)%\\n"); } - return sb.toString(); + return Pattern.compile(sb.toString()); } /** @@ -752,13 +758,12 @@ private static String makeStringListForCacheGroupsAndNames(Map> cacheGroputToNames) { - String caches = makeStringListForCacheGroupsAndNames(cacheGroputToNames); - - assertContains( - log, - outputStr, - "WARNING: These caches have indexes rebuilding in progress:" + U.nl() + caches + Pattern pattern = makePatternForCacheGroupsAndNames( + "WARNING: These caches have indexes rebuilding in progress:", + cacheGroputToNames ); + + assertTrue(pattern.matcher(outputStr).find()); } /** @@ -768,13 +773,12 @@ private static void validateOutputIndicesRebuildingInProgress(String outputStr, * @param cacheGroputToNames Cache groups mapping to non-existing cache names. */ private void validateOutputIndicesRebuildWasStarted(String outputStr, Map> cacheGroputToNames) { - String caches = makeStringListForCacheGroupsAndNames(cacheGroputToNames); - - assertContains( - log, - outputStr, - "Indexes rebuild was started for these caches:" + U.nl() + caches + Pattern pattern = makePatternForCacheGroupsAndNames( + "Indexes rebuild was started for these caches:", + cacheGroputToNames ); + + assertTrue(pattern.matcher(outputStr).find()); } /** diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexRebuildStatusTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexRebuildStatusTest.java index 9ed302c6bc75b..80913e9f3eb0b 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexRebuildStatusTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexRebuildStatusTest.java @@ -17,14 +17,21 @@ package org.apache.ignite.util; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.Ignition; import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -39,9 +46,11 @@ import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; import static org.apache.ignite.testframework.GridTestUtils.assertContains; import static org.apache.ignite.testframework.GridTestUtils.deleteIndexBin; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.createAndFillCache; import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.createAndFillThreeFieldsEntryCache; import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.simpleIndexEntity; @@ -90,6 +99,7 @@ public class GridCommandHandlerIndexRebuildStatusTest extends GridCommandHandler idxRebuildsStartedNum.set(0); statusRequestingFinished.set(false); + BlockingSchemaIndexCacheVisitorClosure.rowIndexListener = null; } /** {@inheritDoc} */ @@ -143,22 +153,45 @@ public void testCommandOutput() throws Exception { deleteIndexBin(getTestIgniteInstanceName(GRIDS_NUM - 2)); IndexProcessor.idxRebuildCls = BlockingIndexesRebuildTask.class; - IgniteEx ignite1 = startGrid(GRIDS_NUM - 1); + startGrid(GRIDS_NUM - 1); IndexProcessor.idxRebuildCls = BlockingIndexesRebuildTask.class; - IgniteEx ignite2 = startGrid(GRIDS_NUM - 2); + startGrid(GRIDS_NUM - 2); - final UUID id1 = ignite1.localNode().id(); - final UUID id2 = ignite2.localNode().id(); + grid(0).cache("cache1").enableStatistics(true); boolean allRebuildsStarted = GridTestUtils.waitForCondition(() -> idxRebuildsStartedNum.get() == 6, 30_000); assertTrue("Failed to wait for all indexes to start being rebuilt", allRebuildsStarted); assertEquals(EXIT_CODE_OK, execute(handler, "--cache", "indexes_rebuild_status")); + checkResult(handler, 1, 2); + statusRequestingFinished.set(true); - checkResult(handler, id1, id2); + CountDownLatch idxProgressBlockedLatch = new CountDownLatch(1); + CountDownLatch idxProgressUnblockedLatch = new CountDownLatch(1); + + BlockingSchemaIndexCacheVisitorClosure.rowIndexListener = () -> { + if (isIndexRebuildInProgress("cache1")) { + try { + idxProgressBlockedLatch.countDown(); + + idxProgressUnblockedLatch.await(getTestTimeout(), MILLISECONDS); + } + catch (InterruptedException e) { + throw new IgniteException(e); + } + } + }; + + assertTrue(idxProgressBlockedLatch.await(getTestTimeout(), MILLISECONDS)); + + assertEquals(EXIT_CODE_OK, execute(handler, "--cache", "indexes_rebuild_status")); + + checkRebuildInProgressOutputFor("cache1"); + + idxProgressUnblockedLatch.countDown(); } /** @@ -192,7 +225,7 @@ public void testNodeIdOption() throws Exception { statusRequestingFinished.set(true); - checkResult(handler, id1); + checkResult(handler, 2); } /** @@ -214,26 +247,82 @@ public void testEmptyResult() { * in {@code handler} last operation result and in {@code testOut}. * * @param handler CommandHandler used to run command. - * @param nodeIds Ids to check. + * @param nodeIdxs Indexes of node to check. */ - private void checkResult(TestCommandHandler handler, UUID... nodeIds) { + private void checkResult(TestCommandHandler handler, int... nodeIdxs) { String output = testOut.toString(); Map> cmdResult = handler.getLastOperationResult(); + assertNotNull(cmdResult); - assertEquals("Unexpected number of nodes in result", nodeIds.length, cmdResult.size()); + assertEquals("Unexpected number of nodes in result", nodeIdxs.length, cmdResult.size()); + + for (int nodeIdx : nodeIdxs) { + Set cacheInfos = cmdResult.get(grid(nodeIdx).localNode().id()); - for (UUID nodeId: nodeIds) { - Set cacheInfos = cmdResult.get(nodeId); assertNotNull(cacheInfos); assertEquals("Unexpected number of cacheInfos in result", 3, cacheInfos.size()); - final String nodeStr = "node_id=" + nodeId + ", groupName=group1, cacheName=cache2\n" + - "node_id=" + nodeId + ", groupName=group2, cacheName=cache1\n" + - "node_id=" + nodeId + ", groupName=no_group, cacheName=cache_no_group"; + checkRebuildStartOutput(output, nodeIdx, "group1", "cache2"); + checkRebuildStartOutput(output, nodeIdx, "group2", "cache1"); + checkRebuildStartOutput(output, nodeIdx, "no_group", "cache_no_group"); + } + } + + /** */ + private void checkRebuildStartOutput(String output, int nodeIdx, String grpName, String cacheName) { + IgniteEx ignite = grid(nodeIdx); + + int locPartsCount = ignite.context().cache().cache(cacheName).context().topology().localPartitions().size(); + + assertContains( + log, + output, + "node_id=" + ignite.localNode().id() + + ", groupName=" + grpName + + ", cacheName=" + cacheName + + ", indexBuildPartitionsLeftCount=" + locPartsCount + + ", totalPartitionsCount=" + locPartsCount + + ", progress=0%"); + } + + /** */ + private void checkRebuildInProgressOutputFor(String cacheName) throws Exception { + Matcher matcher = Pattern.compile( + "cacheName=" + cacheName + ", indexBuildPartitionsLeftCount=(\\d+), totalPartitionsCount=(\\d+), progress=(\\d+)%" + ).matcher(testOut.toString()); + + List rebuildProgressStatuses = new ArrayList<>(); + List indexBuildPartitionsLeftCounts = new ArrayList<>(); + + while (matcher.find()) { + indexBuildPartitionsLeftCounts.add(Integer.parseInt(matcher.group(1))); + + rebuildProgressStatuses.add(Integer.parseInt(matcher.group(3))); + } + + assertTrue(rebuildProgressStatuses.stream().anyMatch(progress -> progress > 0)); + + int cacheTotalRebuildingPartsCnt = indexBuildPartitionsLeftCounts.stream().mapToInt(Integer::intValue).sum(); - assertContains(log, output, nodeStr); + assertTrue(waitForCondition( + () -> grid(0).cache(cacheName).metrics().getIndexBuildPartitionsLeftCount() == cacheTotalRebuildingPartsCnt, + getTestTimeout()) + ); + } + + /** */ + private boolean isIndexRebuildInProgress(String cacheName) { + for (Ignite ignite : Ignition.allGrids()) { + GridCacheContext cctx = ((IgniteEx)ignite).context().cache().cache(cacheName).context(); + + long parts = cctx.cache().metrics0().getIndexBuildPartitionsLeftCount(); + + if (parts > 0 && parts < cctx.topology().partitions() / 2) + return true; } + + return false; } /** @@ -256,6 +345,9 @@ private static class BlockingSchemaIndexCacheVisitorClosure implements SchemaInd /** */ private SchemaIndexCacheVisitorClosure original; + /** */ + public static Runnable rowIndexListener; + /** * @param original Original. */ @@ -274,6 +366,9 @@ private static class BlockingSchemaIndexCacheVisitorClosure implements SchemaInd statusRequestingFinished.set(true); } + if (rowIndexListener != null) + rowIndexListener.run(); + original.apply(row); } } diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java index 15e8ac9e2822d..5ede778637346 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java @@ -735,4 +735,10 @@ public interface CacheMetrics { * @return Number of keys processed during index rebuilding. */ public long getIndexRebuildKeysProcessed(); + + /** + * @return The number of partitions that remain to be processed to complete indexing. + * Note that this metric includes backup partitions, which also participate in index building on each node. + */ + public int getIndexBuildPartitionsLeftCount(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexForceRebuildTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexForceRebuildTask.java index 74ce6c787d5f3..85479d93eeb3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexForceRebuildTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexForceRebuildTask.java @@ -101,12 +101,12 @@ protected IndexForceRebuildJob(CacheIndexesForceRebuildCommandArg arg, boolean d Set cachesWithRebuildingInProgress = cachesCtxWithRebuildingInProgress.stream() - .map(c -> new IndexRebuildStatusInfoContainer(c.config())) + .map(IndexRebuildStatusInfoContainer::new) .collect(Collectors.toSet()); Set cachesWithStartedRebuild = cachesToRebuild.stream() - .map(c -> new IndexRebuildStatusInfoContainer(c.config())) + .map(IndexRebuildStatusInfoContainer::new) .filter(c -> !cachesWithRebuildingInProgress.contains(c)) .collect(Collectors.toSet()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexRebuildStatusInfoContainer.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexRebuildStatusInfoContainer.java index 057ba7cdb1f9e..a2754c7e5ab33 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexRebuildStatusInfoContainer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexRebuildStatusInfoContainer.java @@ -23,6 +23,7 @@ import java.util.Comparator; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.dto.IgniteDataTransferObject; +import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -42,6 +43,12 @@ public class IndexRebuildStatusInfoContainer extends IgniteDataTransferObject { /** Cache name. */ private String cacheName; + /** */ + private int indexBuildPartitionsLeftCount; + + /** Local partitions count. */ + private int totalPartitionsCount; + /** * Empty constructor required for Serializable. */ @@ -50,23 +57,31 @@ public IndexRebuildStatusInfoContainer() { } /** */ - public IndexRebuildStatusInfoContainer(CacheConfiguration cfg) { - assert cfg != null; + public IndexRebuildStatusInfoContainer(GridCacheContext cctx) { + assert cctx != null; + + CacheConfiguration cfg = cctx.config(); groupName = cfg.getGroupName() == null ? EMPTY_GROUP_NAME : cfg.getGroupName(); cacheName = cfg.getName(); + indexBuildPartitionsLeftCount = cctx.cache().metrics0().getIndexBuildPartitionsLeftCount(); + totalPartitionsCount = cctx.topology().localPartitions().size(); } /** {@inheritDoc} */ @Override protected void writeExternalData(ObjectOutput out) throws IOException { U.writeString(out, groupName); U.writeString(out, cacheName); + out.writeInt(indexBuildPartitionsLeftCount); + out.writeInt(totalPartitionsCount); } /** {@inheritDoc} */ @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException { groupName = U.readString(in); cacheName = U.readString(in); + indexBuildPartitionsLeftCount = in.readInt(); + totalPartitionsCount = in.readInt(); } /** {@inheritDoc} */ @@ -98,11 +113,32 @@ public String cacheName() { return cacheName; } + /** + * @return Total local node partitions count. + */ + public int totalPartitionsCount() { + return totalPartitionsCount; + } + + /** + * @return The number of local node partitions that remain to be processed to complete indexing. + */ + public int indexBuildPartitionsLeftCount() { + return indexBuildPartitionsLeftCount; + } + /** * @return default string object representation without {@code IndexRebuildStatusInfoContainer} and brackets. */ @Override public String toString() { - String dfltImpl = S.toString(IndexRebuildStatusInfoContainer.class, this); + float progress = (float)(totalPartitionsCount - indexBuildPartitionsLeftCount) / totalPartitionsCount; + + String dfltImpl = S.toString( + IndexRebuildStatusInfoContainer.class, + this, + "progress", + (int)(Math.max(0, progress) * 100) + "%" + ); return dfltImpl.substring(IndexRebuildStatusInfoContainer.class.getSimpleName().length() + 2, dfltImpl.length() - 1); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexRebuildStatusTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexRebuildStatusTask.java index d50f07710ad8f..31111f8e198bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexRebuildStatusTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IndexRebuildStatusTask.java @@ -26,11 +26,10 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; -import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJobResult; -import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.visor.VisorJob; import org.apache.ignite.internal.visor.VisorMultiNodeTask; @@ -99,15 +98,15 @@ protected IndexRebuildStatusJob(@Nullable CacheIndexesRebuildStatusCommandArg ar @Override protected Set run( @Nullable CacheIndexesRebuildStatusCommandArg arg ) throws IgniteException { - Set rebuildIdxCaches = + Set> rebuildIdxCaches = ignite.context().cache().publicCaches().stream() .filter(c -> !c.indexReadyFuture().isDone()) .collect(Collectors.toSet()); Set res = new HashSet<>(); - for (IgniteCache cache : rebuildIdxCaches) - res.add(new IndexRebuildStatusInfoContainer(cache.getConfiguration(CacheConfiguration.class))); + for (IgniteCacheProxy cache : rebuildIdxCaches) + res.add(new IndexRebuildStatusInfoContainer(cache.context())); return res; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsImpl.java index eac03b8ecd355..6fbe537974ead 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsImpl.java @@ -55,9 +55,6 @@ public class CacheGroupMetricsImpl { /** Cache group metrics prefix. */ public static final String CACHE_GROUP_METRICS_PREFIX = "cacheGroups"; - /** Number of partitions need processed for finished indexes create or rebuilding. */ - private final AtomicLongMetric idxBuildCntPartitionsLeft; - /** Cache group context. */ private final CacheGroupContext ctx; @@ -111,8 +108,11 @@ public CacheGroupMetricsImpl(CacheGroupContext ctx) { () -> persistenceEnabled ? database().forGroupPageStores(ctx, PageStore::getSparseSize) : 0, "Storage space allocated for group adjusted for possible sparsity, in bytes."); - idxBuildCntPartitionsLeft = mreg.longMetric("IndexBuildCountPartitionsLeft", - "Number of partitions need processed for finished indexes create or rebuilding."); + mreg.register( + "IndexBuildCountPartitionsLeft", + this::getIndexBuildCountPartitionsLeft, + "Number of partitions need processed for finished indexes create or rebuilding." + ); initLocPartitionsNum = mreg.longMetric("InitializedLocalPartitionsNumber", "Number of local partitions initialized on current node."); @@ -196,19 +196,7 @@ public void onTopologyInitialized() { /** */ public long getIndexBuildCountPartitionsLeft() { - return idxBuildCntPartitionsLeft.value(); - } - - /** Add number of partitions need processed for finished indexes create or rebuilding. */ - public void addIndexBuildCountPartitionsLeft(long idxBuildCntPartitionsLeft) { - this.idxBuildCntPartitionsLeft.add(idxBuildCntPartitionsLeft); - } - - /** - * Decrement number of partitions need processed for finished indexes create or rebuilding. - */ - public void decrementIndexBuildCountPartitionsLeft() { - idxBuildCntPartitionsLeft.decrement(); + return ctx.caches().stream().mapToLong(cctx -> cctx.cache().metrics0().getIndexBuildPartitionsLeftCount()).sum(); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index be49398e4cf7c..80a27045abbfb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl; import org.apache.ignite.internal.processors.metric.impl.HitRateMetric; +import org.apache.ignite.internal.processors.metric.impl.IntMetricImpl; import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; import org.apache.ignite.internal.processors.metric.impl.LongGauge; import org.apache.ignite.internal.processors.metric.impl.MetricUtils; @@ -245,6 +246,9 @@ public class CacheMetricsImpl implements CacheMetrics { /** Number of keys processed during index rebuilding. */ private final LongAdderMetric idxRebuildKeyProcessed; + /** The number of local node partitions that remain to be processed to complete indexing. */ + private final IntMetricImpl idxBuildPartitionsLeftCnt; + /** * Creates cache metrics. * @@ -431,6 +435,9 @@ public CacheMetricsImpl(GridCacheContext cctx, boolean isNear) { idxRebuildKeyProcessed = mreg.longAdderMetric("IndexRebuildKeyProcessed", "Number of keys processed during the index rebuilding."); + + idxBuildPartitionsLeftCnt = mreg.intMetric("IndexBuildPartitionsLeftCount", + "The number of local node partitions that remain to be processed to complete indexing."); } /** @@ -1658,6 +1665,26 @@ public void addIndexRebuildKeyProcessed(long val) { idxRebuildKeyProcessed.add(val); } + /** */ + public void decrementIndexBuildPartitionsLeftCount() { + idxBuildPartitionsLeftCnt.decrement(); + } + + /** */ + public void addIndexBuildPartitionsLeftCount(int val) { + idxBuildPartitionsLeftCnt.add(val); + } + + /** */ + public void resetIndexBuildPartitionsLeftCount() { + idxBuildPartitionsLeftCnt.reset(); + } + + /** {@inheritDoc} */ + @Override public int getIndexBuildPartitionsLeftCount() { + return idxBuildPartitionsLeftCnt.value(); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheMetricsImpl.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java index 2d7f62c625b77..e50a1810213d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java @@ -1046,6 +1046,11 @@ public CacheMetricsSnapshot(CacheMetrics loc, Collection metrics) return idxRebuildKeyProcessed; } + /** {@inheritDoc} */ + @Override public int getIndexBuildPartitionsLeftCount() { + return 0; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheMetricsSnapshot.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java index f792499619f0c..3249381fd6b6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java @@ -329,6 +329,9 @@ public class CacheMetricsSnapshotV2 extends IgniteDataTransferObject implements /** Number of keys processed during index rebuilding. */ private long idxRebuildKeyProcessed; + /** The number of local node partitions that remain to be processed to complete indexing. */ + private int idxBuildPartitionsLeftCount; + /** * Default constructor. */ @@ -442,6 +445,8 @@ public CacheMetricsSnapshotV2(CacheMetricsImpl m) { idxRebuildInProgress = m.isIndexRebuildInProgress(); idxRebuildKeyProcessed = m.getIndexRebuildKeysProcessed(); + + idxBuildPartitionsLeftCount = m.getIndexBuildPartitionsLeftCount(); } /** @@ -587,6 +592,7 @@ public CacheMetricsSnapshotV2(CacheMetrics loc, Collection metrics keysToRebalanceLeft += e.getKeysToRebalanceLeft(); rebalancingBytesRate += e.getRebalancingBytesRate(); rebalancingKeysRate += e.getRebalancingKeysRate(); + idxBuildPartitionsLeftCount += e.getIndexBuildPartitionsLeftCount(); } int size = metrics.size(); @@ -1072,6 +1078,11 @@ public CacheMetricsSnapshotV2(CacheMetrics loc, Collection metrics return idxRebuildKeyProcessed; } + /** {@inheritDoc} */ + @Override public int getIndexBuildPartitionsLeftCount() { + return idxBuildPartitionsLeftCount; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheMetricsSnapshotV2.class, this); @@ -1154,6 +1165,7 @@ public CacheMetricsSnapshotV2(CacheMetrics loc, Collection metrics out.writeInt(size); out.writeInt(keySize); U.writeLongString(out, txKeyCollisions); + out.writeInt(idxBuildPartitionsLeftCount); } /** {@inheritDoc} */ @@ -1233,5 +1245,6 @@ public CacheMetricsSnapshotV2(CacheMetrics loc, Collection metrics size = in.readInt(); keySize = in.readInt(); txKeyCollisions = U.readLongString(in); + idxBuildPartitionsLeftCount = in.readInt(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/IntMetricImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/IntMetricImpl.java index 2c4927706cb4a..c5c6d995ad557 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/IntMetricImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/IntMetricImpl.java @@ -55,6 +55,11 @@ public void increment() { add(1); } + /** Adds -1 to the metric. */ + public void decrement() { + add(-1); + } + /** * Sets value. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java index e37fa54f8ba55..222551cd8eff1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCachePartitionWorker.java @@ -139,7 +139,7 @@ public SchemaIndexCachePartitionWorker( int cnt = partsCnt.getAndSet(0); if (cnt > 0) - cctx.group().metrics().addIndexBuildCountPartitionsLeft(-cnt); + cctx.cache().metrics0().resetIndexBuildPartitionsLeftCount(); } finally { fut.onDone(wrappedClo.indexCacheStat, err); @@ -213,7 +213,7 @@ private void processPartition() throws IgniteCheckedException { locPart.release(); if (partsCnt.getAndUpdate(v -> v > 0 ? v - 1 : 0) > 0) - cctx.group().metrics().decrementIndexBuildCountPartitionsLeft(); + cctx.cache().metrics0().decrementIndexBuildPartitionsLeftCount(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java index d00691f0fc4a1..241838f9a0ec9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java @@ -102,7 +102,7 @@ public SchemaIndexCacheVisitorImpl( return; } - cctx.group().metrics().addIndexBuildCountPartitionsLeft(locParts.size()); + cctx.cache().metrics0().addIndexBuildPartitionsLeftCount(locParts.size()); cctx.cache().metrics0().resetIndexRebuildKeyProcessed(); beforeExecute(); diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java index 6d9a557dc490a..8b85f6be36e59 100644 --- a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java +++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java @@ -552,5 +552,10 @@ private static class TestCacheMetrics implements CacheMetrics { @Override public long getIndexRebuildKeysProcessed() { return 0; } + + /** {@inheritDoc} */ + @Override public int getIndexBuildPartitionsLeftCount() { + return 0; + } } }