From 1ba83f5d9126f619693269c62083134267d315fc Mon Sep 17 00:00:00 2001 From: Nikita Amelchev Date: Thu, 23 May 2024 21:28:49 +0300 Subject: [PATCH 01/25] IGNITE-22319 Node crashes if a snapshot restore cancelled due to network issues --- .../cache/persistence/snapshot/IgniteSnapshotManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 30b54fb3180a0..0a4fa8626c499 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -3875,7 +3875,7 @@ public synchronized void stop() { /** * @param nodeId A node left the cluster. */ - public void onNodeLeft(UUID nodeId) { + public synchronized void onNodeLeft(UUID nodeId) { Set futs = activeTasks(); ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("The node from which a snapshot has been " + "requested left the grid"); @@ -4019,7 +4019,7 @@ else if (msg instanceof SnapshotFilesFailureMessage) { if (task == null) return; - assert task.rmtNodeId.equals(nodeId); + assert task.isDone() || task.stopChecker.getAsBoolean() || task.rmtNodeId.equals(nodeId); task.acceptException(ex); } From 803c1905d53058b9a5f0b608d5b46df502b41aff Mon Sep 17 00:00:00 2001 From: Nikita Amelchev Date: Sat, 25 May 2024 00:49:41 +0300 Subject: [PATCH 02/25] IGNITE-22319 Node crashes if a snapshot restore cancelled due to network issues --- .../snapshot/IgniteSnapshotManager.java | 10 +++--- .../IgniteSnapshotRestoreFromRemoteTest.java | 35 +++++++++++++++++++ 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 0a4fa8626c499..ab427875026de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -46,13 +46,13 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; import java.util.Optional; -import java.util.Queue; import java.util.Set; import java.util.TreeMap; import java.util.UUID; @@ -3806,7 +3806,7 @@ private class SequentialRemoteSnapshotManager implements TransmissionHandler, Gr private volatile RemoteSnapshotFilesRecevier active; /** Queue of asynchronous tasks to execute. */ - private final Queue queue = new ConcurrentLinkedDeque<>(); + private final Deque queue = new ConcurrentLinkedDeque<>(); /** {@code true} if the node is stopping. */ private boolean stopping; @@ -3890,7 +3890,9 @@ public synchronized void onNodeLeft(UUID nodeId) { * @return The set of currently scheduled tasks, some of them may be already completed. */ private Set activeTasks() { - Set futs = new HashSet<>(queue); + Set futs = new LinkedHashSet<>(); + + queue.descendingIterator().forEachRemaining(futs::add); RemoteSnapshotFilesRecevier active0 = active; @@ -4013,7 +4015,7 @@ else if (msg instanceof SnapshotFilesFailureMessage) { } /** {@inheritDoc} */ - @Override public void onException(UUID nodeId, Throwable ex) { + @Override public synchronized void onException(UUID nodeId, Throwable ex) { RemoteSnapshotFilesRecevier task = active; if (task == null) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java index 40bd60290e6aa..0cf7bd5062ca6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java @@ -64,6 +64,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory; import static org.apache.ignite.testframework.GridTestUtils.assertContains; +import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause; /** */ public class IgniteSnapshotRestoreFromRemoteTest extends IgniteClusterSnapshotRestoreBaseTest { @@ -323,6 +324,40 @@ public void testSnapshotCachesStoppedIfLoadingFailOnRemote() throws Exception { ensureCacheAbsent(dfltCacheCfg); } + /** @throws Exception If failed. */ + @Test + public void testRestoreConnectionLost() throws Exception { + IgniteEx coord = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2); + + copyAndShuffle(snpParts, G.allGrids()); + + // Start a new node without snapshot working directory. + IgniteEx emptyNode = startDedicatedGrid(SECOND_CLUSTER_PREFIX, 2); + + emptyNode.cluster().state(ClusterState.ACTIVE); + + emptyNode.cache(DEFAULT_CACHE_NAME).destroy(); + awaitPartitionMapExchange(); + + IgniteSnapshotManager mgr = snp(coord); + mgr.remoteSnapshotSenderFactory(new BiFunction() { + @Override public SnapshotSender apply(String s, UUID uuid) { + return new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.remoteSnapshotSenderFactory(s, uuid)) { + @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) { + delegate.sendPart0(part, cacheDirName, pair, length); + + GridTestUtils.runAsync(coord::close); + } + }; + } + }); + + // Restore all cache groups. + IgniteFuture fut = emptyNode.snapshot().restoreSnapshot(SNAPSHOT_NAME, null); + + assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class); + } + /** * @param snpParts Snapshot parts. * @param toNodes List of toNodes to copy parts to. From 4b55d3145d7ec2c00298ad8d9c4387bcff8d2ecf Mon Sep 17 00:00:00 2001 From: Nikita Amelchev Date: Thu, 6 Jun 2024 00:20:34 +0300 Subject: [PATCH 03/25] IGNITE-22319 Node crashes if a snapshot restore cancelled due to network issues --- .../cache/persistence/snapshot/SnapshotRestoreProcess.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java index d1a1ac3586898..0d6ac7ee52cdc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java @@ -1231,8 +1231,7 @@ private IgniteInternalFuture preload(UUID reqId) { } catch (Exception ex) { opCtx0.errHnd.accept(ex); - - return new GridFinishedFuture<>(ex); + retFut.onDone(ex); } return retFut; @@ -1916,7 +1915,7 @@ private static class SnapshotRestoreContext { private volatile Map cfgs = Collections.emptyMap(); /** Graceful shutdown future. */ - private volatile IgniteFuture stopFut; + private volatile IgniteFuture stopFut; /** Operation start time. */ private final long startTime; From b5077d1df55ee08c136f8ac8dcd8d6fb975f399b Mon Sep 17 00:00:00 2001 From: Nikita Amelchev Date: Thu, 6 Jun 2024 19:02:43 +0300 Subject: [PATCH 04/25] IGNITE-22319 Node crashes if a snapshot restore cancelled due to network issues --- .../managers/communication/GridIoManager.java | 22 ++++++++++ .../snapshot/IgniteSnapshotManager.java | 42 ++++++++----------- 2 files changed, 39 insertions(+), 25 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index f4efa8493001d..14a2365cb006a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1958,6 +1958,28 @@ public void removeTransmissionHandler(Object topic) { "on local node [nodeId=" + ctx.localNodeId() + ']')); } + /** + * @param nodeId The remote node ID. + * @param ex Exception. + */ + public void interruptTransmissionReceiver(UUID nodeId, Exception ex) { + synchronized (rcvMux) { + Iterator> it = rcvCtxs.entrySet().iterator(); + + while (it.hasNext()) { + Map.Entry e = it.next(); + + if (nodeId.equals(e.getValue().rmtNodeId)) { + it.remove(); + + log.info("MY interruptTransmissionReceiver=" + e.getValue().rmtNodeId); + + interruptReceiver(e.getValue(), ex); + } + } + } + } + /** * This method must be used prior to opening a {@link TransmissionSender} by calling * {@link #openTransmissionSender(UUID, Object)} to ensure that remote and local nodes diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index ab427875026de..341a13bb38335 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -164,7 +164,6 @@ import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.distributed.DistributedProcess; import org.apache.ignite.internal.util.distributed.InitMessage; -import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -3857,19 +3856,6 @@ public synchronized void stop() { while ((r = queue.poll()) != null) r.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG)); - - Set futs = activeTasks(); - GridCompoundFuture stopFut = new GridCompoundFuture<>(); - - try { - for (IgniteInternalFuture fut : futs) - stopFut.add(fut); - - stopFut.markInitialized().get(); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } } /** @@ -3969,20 +3955,26 @@ private Set activeTasks() { else if (msg instanceof SnapshotFilesFailureMessage) { SnapshotFilesFailureMessage respMsg0 = (SnapshotFilesFailureMessage)msg; - RemoteSnapshotFilesRecevier task = active; + synchronized (this) { + RemoteSnapshotFilesRecevier task = active; - if (task == null || !task.reqId.equals(respMsg0.id())) { - if (log.isInfoEnabled()) { - log.info("A stale snapshot response message has been received. Will be ignored " + - "[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']'); + if (task == null || !task.reqId.equals(respMsg0.id())) { + if (log.isInfoEnabled()) { + log.info("A stale snapshot response message has been received. Will be ignored " + + "[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']'); + } + + return; } - return; - } + if (respMsg0.errorMessage() != null) { + IgniteCheckedException e = new IgniteCheckedException("Request cancelled. The snapshot operation stopped " + + "on the remote node with an error: " + respMsg0.errorMessage()); + + cctx.kernalContext().io().interruptTransmissionReceiver(task.rmtNodeId, e); - if (respMsg0.errorMessage() != null) { - task.acceptException(new IgniteCheckedException("Request cancelled. The snapshot operation stopped " + - "on the remote node with an error: " + respMsg0.errorMessage())); + task.acceptException(e); + } } } } @@ -4021,7 +4013,7 @@ else if (msg instanceof SnapshotFilesFailureMessage) { if (task == null) return; - assert task.isDone() || task.stopChecker.getAsBoolean() || task.rmtNodeId.equals(nodeId); + assert task.rmtNodeId.equals(nodeId); task.acceptException(ex); } From 8b88ea9abe5b69e32a06f6f79396e347ef4c7ead Mon Sep 17 00:00:00 2001 From: Nikita Amelchev Date: Thu, 6 Jun 2024 20:37:23 +0300 Subject: [PATCH 05/25] IGNITE-22319 Node crashes if a snapshot restore cancelled due to network issues --- .../managers/communication/GridIoManager.java | 20 ++-------- .../snapshot/IgniteSnapshotManager.java | 38 ++++++++----------- 2 files changed, 18 insertions(+), 40 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 14a2365cb006a..66ae2d547566e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1959,25 +1959,11 @@ public void removeTransmissionHandler(Object topic) { } /** - * @param nodeId The remote node ID. + * @param topic The topic to interrupt receiver from. * @param ex Exception. */ - public void interruptTransmissionReceiver(UUID nodeId, Exception ex) { - synchronized (rcvMux) { - Iterator> it = rcvCtxs.entrySet().iterator(); - - while (it.hasNext()) { - Map.Entry e = it.next(); - - if (nodeId.equals(e.getValue().rmtNodeId)) { - it.remove(); - - log.info("MY interruptTransmissionReceiver=" + e.getValue().rmtNodeId); - - interruptReceiver(e.getValue(), ex); - } - } - } + public void interruptTransmissionReceiver(Object topic, Exception ex) { + interruptReceiver(rcvCtxs.remove(topic), ex); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 1b579707d2a02..aa245db1c55fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -46,13 +46,13 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; import java.util.Optional; +import java.util.Queue; import java.util.Set; import java.util.TreeMap; import java.util.UUID; @@ -3733,7 +3733,7 @@ private class SequentialRemoteSnapshotManager implements TransmissionHandler, Gr private volatile RemoteSnapshotFilesRecevier active; /** Queue of asynchronous tasks to execute. */ - private final Deque queue = new ConcurrentLinkedDeque<>(); + private final Queue queue = new ConcurrentLinkedDeque<>(); /** {@code true} if the node is stopping. */ private boolean stopping; @@ -3790,30 +3790,26 @@ public synchronized void stop() { * @param nodeId A node left the cluster. */ public synchronized void onNodeLeft(UUID nodeId) { - Set futs = activeTasks(); + if (active == null || active.isDone()) + return; + ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("The node from which a snapshot has been " + "requested left the grid"); - futs.forEach(t -> { + if (active.rmtNodeId.equals(nodeId)) + interruptActive(ex); + + queue.forEach(t -> { if (t.rmtNodeId.equals(nodeId)) t.acceptException(ex); }); } - /** - * @return The set of currently scheduled tasks, some of them may be already completed. - */ - private Set activeTasks() { - Set futs = new LinkedHashSet<>(); - - queue.descendingIterator().forEachRemaining(futs::add); - - RemoteSnapshotFilesRecevier active0 = active; - - if (active0 != null) - futs.add(active0); + /** Interrupts current active task (if present) to safely shedule next. */ + private void interruptActive(Exception e) { + cctx.kernalContext().io().interruptTransmissionReceiver(DFLT_INITIAL_SNAPSHOT_TOPIC, e); - return futs; + active.acceptException(e); } /** {@inheritDoc} */ @@ -3896,12 +3892,8 @@ else if (msg instanceof SnapshotFilesFailureMessage) { } if (respMsg0.errorMessage() != null) { - IgniteCheckedException e = new IgniteCheckedException("Request cancelled. The snapshot operation stopped " + - "on the remote node with an error: " + respMsg0.errorMessage()); - - cctx.kernalContext().io().interruptTransmissionReceiver(task.rmtNodeId, e); - - task.acceptException(e); + interruptActive(new IgniteCheckedException("Request cancelled. The snapshot operation " + + "stopped on the remote node with an error: " + respMsg0.errorMessage())); } } } From e461ba47b442147fc1530c5dd6e45babcc2d8822 Mon Sep 17 00:00:00 2001 From: Nikita Amelchev Date: Thu, 6 Jun 2024 20:52:50 +0300 Subject: [PATCH 06/25] IGNITE-22319 Node crashes if a snapshot restore cancelled due to network issues --- .../snapshot/IgniteSnapshotRestoreFromRemoteTest.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java index 0cf7bd5062ca6..2814ae5ebac23 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java @@ -30,6 +30,8 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Function; @@ -339,6 +341,8 @@ public void testRestoreConnectionLost() throws Exception { emptyNode.cache(DEFAULT_CACHE_NAME).destroy(); awaitPartitionMapExchange(); + CountDownLatch latch = new CountDownLatch(1); + IgniteSnapshotManager mgr = snp(coord); mgr.remoteSnapshotSenderFactory(new BiFunction() { @Override public SnapshotSender apply(String s, UUID uuid) { @@ -346,7 +350,7 @@ public void testRestoreConnectionLost() throws Exception { @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) { delegate.sendPart0(part, cacheDirName, pair, length); - GridTestUtils.runAsync(coord::close); + latch.countDown(); } }; } @@ -355,6 +359,10 @@ public void testRestoreConnectionLost() throws Exception { // Restore all cache groups. IgniteFuture fut = emptyNode.snapshot().restoreSnapshot(SNAPSHOT_NAME, null); + latch.await(TIMEOUT, TimeUnit.MILLISECONDS); + + coord.close(); + assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class); } From 6dbebea32407fb7aefb85f70d6efa9573b13c6e4 Mon Sep 17 00:00:00 2001 From: Nikita Amelchev Date: Mon, 10 Jun 2024 14:52:06 +0300 Subject: [PATCH 07/25] Review fixes --- .../cache/persistence/snapshot/IgniteSnapshotManager.java | 8 +++----- .../snapshot/IgniteSnapshotRestoreFromRemoteTest.java | 2 ++ 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index aa245db1c55fb..cd83615ef1975 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -3928,14 +3928,12 @@ else if (msg instanceof SnapshotFilesFailureMessage) { /** {@inheritDoc} */ @Override public synchronized void onException(UUID nodeId, Throwable ex) { - RemoteSnapshotFilesRecevier task = active; - - if (task == null) + if (active == null) return; - assert task.rmtNodeId.equals(nodeId); + assert active.rmtNodeId.equals(nodeId); - task.acceptException(ex); + active.acceptException(ex); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java index 2814ae5ebac23..dd861618ef3ba 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java @@ -339,11 +339,13 @@ public void testRestoreConnectionLost() throws Exception { emptyNode.cluster().state(ClusterState.ACTIVE); emptyNode.cache(DEFAULT_CACHE_NAME).destroy(); + awaitPartitionMapExchange(); CountDownLatch latch = new CountDownLatch(1); IgniteSnapshotManager mgr = snp(coord); + mgr.remoteSnapshotSenderFactory(new BiFunction() { @Override public SnapshotSender apply(String s, UUID uuid) { return new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.remoteSnapshotSenderFactory(s, uuid)) { From 4df1317d26cecd27f997e51d2dceae07dab0bca7 Mon Sep 17 00:00:00 2001 From: Nikita Amelchev Date: Mon, 10 Jun 2024 15:36:13 +0300 Subject: [PATCH 08/25] Review fixes --- .../snapshot/IgniteSnapshotManager.java | 4 +--- .../IgniteSnapshotRestoreFromRemoteTest.java | 16 +++++++++++++--- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index cd83615ef1975..74fa00ccbf71a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -3880,9 +3880,7 @@ else if (msg instanceof SnapshotFilesFailureMessage) { SnapshotFilesFailureMessage respMsg0 = (SnapshotFilesFailureMessage)msg; synchronized (this) { - RemoteSnapshotFilesRecevier task = active; - - if (task == null || !task.reqId.equals(respMsg0.id())) { + if (active == null || !active.reqId.equals(respMsg0.id())) { if (log.isInfoEnabled()) { log.info("A stale snapshot response message has been received. Will be ignored " + "[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']'); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java index dd861618ef3ba..cbb5767e9c45c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java @@ -342,7 +342,8 @@ public void testRestoreConnectionLost() throws Exception { awaitPartitionMapExchange(); - CountDownLatch latch = new CountDownLatch(1); + CountDownLatch restoreStarted = new CountDownLatch(1); + CountDownLatch nodeStopped = new CountDownLatch(1); IgniteSnapshotManager mgr = snp(coord); @@ -352,7 +353,14 @@ public void testRestoreConnectionLost() throws Exception { @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) { delegate.sendPart0(part, cacheDirName, pair, length); - latch.countDown(); + restoreStarted.countDown(); + + try { + nodeStopped.await(TIMEOUT, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } } }; } @@ -361,10 +369,12 @@ public void testRestoreConnectionLost() throws Exception { // Restore all cache groups. IgniteFuture fut = emptyNode.snapshot().restoreSnapshot(SNAPSHOT_NAME, null); - latch.await(TIMEOUT, TimeUnit.MILLISECONDS); + restoreStarted.await(TIMEOUT, TimeUnit.MILLISECONDS); coord.close(); + nodeStopped.countDown(); + assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class); } From b877084fb4fdf80ec4ab3e1f8e32a2c62e5a3ee2 Mon Sep 17 00:00:00 2001 From: Nikita Amelchev Date: Mon, 10 Jun 2024 15:49:07 +0300 Subject: [PATCH 09/25] Review fixes --- .../cache/persistence/snapshot/IgniteSnapshotManager.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 74fa00ccbf71a..bba915f83c068 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -3777,13 +3777,15 @@ private synchronized void scheduleNext() { public synchronized void stop() { stopping = true; + IgniteException err = new IgniteException(SNP_NODE_STOPPING_ERR_MSG); + if (active != null) - active.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG)); + active.acceptException(err); RemoteSnapshotFilesRecevier r; while ((r = queue.poll()) != null) - r.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG)); + r.acceptException(err); } /** From 72678c6d45db3463f0ce3875f79985a79052cf6c Mon Sep 17 00:00:00 2001 From: Nikita Amelchev Date: Tue, 11 Jun 2024 11:09:59 +0300 Subject: [PATCH 10/25] Review fixes --- .../cache/persistence/snapshot/SnapshotRestoreProcess.java | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java index e1b0999f027b0..c334ff90688d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java @@ -1200,6 +1200,7 @@ private IgniteInternalFuture preload(UUID reqId) { } catch (Exception ex) { opCtx0.errHnd.accept(ex); + retFut.onDone(ex); } From 1b01fae04f1ad56060d41eee3b8e3bb66732913b Mon Sep 17 00:00:00 2001 From: Nikita Amelchev Date: Thu, 11 Jul 2024 18:48:34 +0300 Subject: [PATCH 11/25] wip --- .../managers/communication/GridIoManager.java | 8 -- .../snapshot/IgniteSnapshotManager.java | 117 +++++++++--------- 2 files changed, 59 insertions(+), 66 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 3066ea98cbf12..2e721239ceda2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1958,14 +1958,6 @@ public void removeTransmissionHandler(Object topic) { "on local node [nodeId=" + ctx.localNodeId() + ']')); } - /** - * @param topic The topic to interrupt receiver from. - * @param ex Exception. - */ - public void interruptTransmissionReceiver(Object topic, Exception ex) { - interruptReceiver(rcvCtxs.remove(topic), ex); - } - /** * This method must be used prior to opening a {@link TransmissionSender} by calling * {@link #openTransmissionSender(UUID, Object)} to ensure that remote and local nodes diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index bba915f83c068..f9fec415613da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -159,6 +159,7 @@ import org.apache.ignite.internal.processors.marshaller.MappedName; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl; import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.util.BasicRateLimiter; import org.apache.ignite.internal.util.GridBusyLock; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; @@ -721,6 +722,8 @@ public static String partDeltaFileName(int partId) { busyLock.block(); try { + snpRmtMgr.stop(); + restoreCacheGrpProc.interrupt(new NodeStoppingException("Node is stopping.")); // Try stop all snapshot processing if not yet. @@ -729,8 +732,6 @@ public static String partDeltaFileName(int partId) { locSnpTasks.clear(); - snpRmtMgr.stop(); - synchronized (snpOpMux) { if (clusterSnpFut != null) { clusterSnpFut.onDone(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG)); @@ -3738,33 +3739,33 @@ private class SequentialRemoteSnapshotManager implements TransmissionHandler, Gr /** {@code true} if the node is stopping. */ private boolean stopping; - /** - * @param next New task for scheduling. - */ - public synchronized void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) { + /** @param next New task for scheduling. */ + public void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) { assert next != null; - if (stopping) { - next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG)); + synchronized (this) { + if (stopping) { + next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG)); - return; - } + return; + } - RemoteSnapshotFilesRecevier curr = active; + if (active != null && !active.isDone()) { + queue.offer(next); - if (curr == null || curr.isDone()) { - next.listen(this::scheduleNext); + return; + } active = next; - next.init(); + active.listen(this::scheduleNext); } - else - queue.offer(next); + + next.init(); } /** Schedule next async receiver. */ - private synchronized void scheduleNext() { + private void scheduleNext() { RemoteSnapshotFilesRecevier next = queue.poll(); if (next == null) @@ -3774,44 +3775,42 @@ private synchronized void scheduleNext() { } /** Stopping handler. */ - public synchronized void stop() { - stopping = true; - - IgniteException err = new IgniteException(SNP_NODE_STOPPING_ERR_MSG); - - if (active != null) - active.acceptException(err); + public void stop() { + synchronized (this) { + stopping = true; + } RemoteSnapshotFilesRecevier r; while ((r = queue.poll()) != null) - r.acceptException(err); - } + r.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG)); - /** - * @param nodeId A node left the cluster. - */ - public synchronized void onNodeLeft(UUID nodeId) { - if (active == null || active.isDone()) - return; + if (active != null) + active.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG)); + } + /** @param nodeId A node left the cluster. */ + public void onNodeLeft(UUID nodeId) { ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("The node from which a snapshot has been " + "requested left the grid"); - if (active.rmtNodeId.equals(nodeId)) - interruptActive(ex); - - queue.forEach(t -> { - if (t.rmtNodeId.equals(nodeId)) - t.acceptException(ex); + queue.forEach(r -> { + if (r.rmtNodeId.equals(nodeId)) + r.acceptException(ex); }); - } - /** Interrupts current active task (if present) to safely shedule next. */ - private void interruptActive(Exception e) { - cctx.kernalContext().io().interruptTransmissionReceiver(DFLT_INITIAL_SNAPSHOT_TOPIC, e); + RemoteSnapshotFilesRecevier active0 = active; - active.acceptException(e); + if (active0 != null && !active0.isDone() && active0.rmtNodeId.equals(nodeId)) { + // If the task was started, but the remote node did not start the transmission and did not send SnapshotFilesFailureMessage. + // Otherwise, the task will fail via TransmissionHandler#onException. + cctx.kernalContext().timeout().addTimeoutObject( + new GridTimeoutObjectAdapter(2 * cctx.kernalContext().config().getNetworkTimeout()) { + @Override public void onTimeout() { + active0.acceptException(ex); + } + }); + } } /** {@inheritDoc} */ @@ -3881,20 +3880,20 @@ private void interruptActive(Exception e) { else if (msg instanceof SnapshotFilesFailureMessage) { SnapshotFilesFailureMessage respMsg0 = (SnapshotFilesFailureMessage)msg; - synchronized (this) { - if (active == null || !active.reqId.equals(respMsg0.id())) { - if (log.isInfoEnabled()) { - log.info("A stale snapshot response message has been received. Will be ignored " + - "[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']'); - } + RemoteSnapshotFilesRecevier task = active; - return; + if (task == null || !task.reqId.equals(respMsg0.id())) { + if (log.isInfoEnabled()) { + log.info("A stale snapshot response message has been received. Will be ignored " + + "[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']'); } - if (respMsg0.errorMessage() != null) { - interruptActive(new IgniteCheckedException("Request cancelled. The snapshot operation " + - "stopped on the remote node with an error: " + respMsg0.errorMessage())); - } + return; + } + + if (respMsg0.errorMessage() != null) { + task.acceptException(new IgniteCheckedException("Request cancelled. The snapshot operation stopped " + + "on the remote node with an error: " + respMsg0.errorMessage())); } } } @@ -3927,13 +3926,15 @@ else if (msg instanceof SnapshotFilesFailureMessage) { } /** {@inheritDoc} */ - @Override public synchronized void onException(UUID nodeId, Throwable ex) { - if (active == null) + @Override public void onException(UUID nodeId, Throwable ex) { + RemoteSnapshotFilesRecevier task = active; + + if (task == null) return; - assert active.rmtNodeId.equals(nodeId); + assert task.rmtNodeId.equals(nodeId); - active.acceptException(ex); + task.acceptException(ex); } /** {@inheritDoc} */ From 53bde1d79480473d1c57429e9d305942ec326adf Mon Sep 17 00:00:00 2001 From: Nikita Amelchev Date: Thu, 11 Jul 2024 22:16:57 +0300 Subject: [PATCH 12/25] wip --- .../persistence/snapshot/IgniteSnapshotRemoteRequestTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java index fcde905bc8b70..8be03f0195a18 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java @@ -267,7 +267,7 @@ public void testSnapshotRequestRemoteSourceNodeLeft() throws Exception { latch.countDown(); assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), ClusterTopologyCheckedException.class, - "he node from which a snapshot has been requested left the grid"); + "Remote node left the grid."); } /** @throws Exception If fails. */ From ccef631dfe67f54256f3031f881b0255a5a83925 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 19 Jul 2024 09:31:58 +0300 Subject: [PATCH 13/25] test --- .../query/stat/StatisticsAbstractTest.java | 15 +- .../stat/StatisticsObsolescenceTest.java | 135 ++++++++++-------- 2 files changed, 92 insertions(+), 58 deletions(-) diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java index 8c87170a466be..388251ff31f0f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java @@ -261,12 +261,23 @@ protected List> sql(String sql) { } /** - * Create SQL table with the given index. + * Create SQL table with the given index and fill with samll amount of data. * * @param suffix Table idx, if {@code null} - name "SMALL" without index will be used. * @return Table name. */ protected String createSmallTable(String suffix) { + return createSmallTable(SMALL_SIZE, suffix); + } + + /** + * Create SQL table with the given index and preload size. + * + * @param preloadCnt Records cnt to load after creation. + * @param suffix Table idx, if {@code null} - name "SMALL" without index will be used. + * @return Table name. + */ + protected String createSmallTable(int preloadCnt, String suffix) { String tblName = "small" + ((suffix != null) ? suffix : ""); sql("DROP TABLE IF EXISTS " + tblName); @@ -279,7 +290,7 @@ protected String createSmallTable(String suffix) { sql(String.format("CREATE INDEX %s_c ON %s(c)", tblName, tblName)); - for (int i = 0; i < SMALL_SIZE; i++) + for (int i = 0; i < preloadCnt; i++) sql(String.format("INSERT INTO %s(a, b, c) VALUES(%d, %d, %d)", tblName, i, i, i % 10)); return tblName; diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java index 1baa806d1a202..c2ff6fa7770e2 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java @@ -17,14 +17,12 @@ package org.apache.ignite.internal.processors.query.stat; -import java.util.Map; - -import org.apache.ignite.Ignite; -import org.apache.ignite.cluster.ClusterState; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.collection.IntMap; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; @@ -34,76 +32,101 @@ * Test for statistics obsolescence. */ public class StatisticsObsolescenceTest extends StatisticsAbstractTest { - /** - * Test statistics refreshing after significant changes of base table: - * 1) Create and populate small table - * 2) Analyze it and get local statistics - * 3) Insert same number of rows into small table - * 4) Check that statistics refreshed and its values changed. - * - * @throws Exception In case of error. - */ + /** Ensured IgniteStatisticsManagerImpl#OBSOLESCENCE_INTERVAL. 5 minutes. */ + @Override protected long getTestTimeout() { + return 5L * 60L * 1000L; + } + + /** */ @Test - public void testObsolescence() throws Exception { - startGridsMultiThreaded(1); + public void testObsolescenceUnderLoad() throws Exception { + int servers = 2; - createSmallTable(null); + AtomicBoolean stop = new AtomicBoolean(); - statisticsMgr(0).collectStatistics(buildDefaultConfigurations(SMALL_TARGET)); + try { + startGridsMultiThreaded(servers); - assertTrue(GridTestUtils.waitForCondition(() -> statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, TIMEOUT)); + long top = grid(0).cluster().topologyVersion(); - ObjectStatisticsImpl stat1 = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); + createSmallTable(0, null); - assertNotNull(stat1); + statisticsMgr(0).usageState(StatisticsUsageState.ON); - for (int i = SMALL_SIZE; i < 2 * SMALL_SIZE; i++) - sql(String.format("INSERT INTO small(a, b, c) VALUES(%d, %d, %d)", i, i, i % 10)); + if (log.isInfoEnabled()) + log.info("Enabling statistic for the table " + SMALL_TARGET); - statisticsMgr(0).processObsolescence(); + statisticsMgr(0).collectStatistics(buildDefaultConfigurations(SMALL_TARGET)); - assertTrue(GridTestUtils.waitForCondition(() -> { - ObjectStatisticsImpl stat2 = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); + // Initialized, empty statistics. + assertTrue(GridTestUtils.waitForCondition(() -> statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, getTestTimeout())); + assertTrue(GridTestUtils.waitForCondition(() -> statisticsMgr(1).getLocalStatistics(SMALL_KEY) != null, getTestTimeout())); - return stat2 != null && stat2.rowCount() > stat1.rowCount(); - }, TIMEOUT)); - } + if (log.isInfoEnabled()) + log.info("Got first, empty statistic of the table " + SMALL_TARGET); - /** - * Test activation with statistics with topology changes. - * - * 1) Start two node cluster. - * 2) Activate cluster. - * 3) Create table and analyze it. - * 4) Inactivate cluster and change it's topology. - * 5) Get obsolescence map size for created table. - * 6) Activate cluster again. - * 7) Check that obsolescence map size changed due to new topology. - * - * @throws Exception In case of errors. - */ - @Test - public void testInactiveLoad() throws Exception { - Ignite ignite = startGrid(0); - Ignite ignite1 = startGrid(1); + ObjectStatisticsImpl emptyStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); + + assertTrue(emptyStat.rowCount() == 0); + + GridTestUtils.runAsync(() -> { + AtomicLong key = new AtomicLong(); + + if (log.isInfoEnabled()) + log.info("Starting the loading..."); + + long time = System.nanoTime(); + + while (!stop.get()) { + sql(String.format("INSERT INTO small(a, b, c) VALUES(%d, %d, %d)", key.incrementAndGet(), key.get(), key.get() % 10)); + + if (U.nanosToMillis(System.nanoTime() - time) > 10_000L) { + time = System.nanoTime(); + + if (log.isInfoEnabled()) + log.info("Loaded " + grid(0).cache("SMALLnull").size() + " records."); + } + } + + if (log.isInfoEnabled()) + log.info("The loading stopped."); + }); + + // Here we get non-zero, updated statistics. + assertTrue(GridTestUtils.waitForCondition(() -> { + ObjectStatisticsImpl updatedStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); + + return updatedStat != null && updatedStat.rowCount() > emptyStat.rowCount(); + }, getTestTimeout())); - ignite.cluster().state(ClusterState.ACTIVE); + // First not empty statistics (rowCount > 0). + ObjectStatisticsImpl firstNotEmpty = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); - createSmallTable(null); - collectStatistics(StatisticsType.GLOBAL, "SMALL"); + assertTrue(firstNotEmpty.rowCount() > 0); - ignite.cluster().state(ClusterState.INACTIVE); + if (log.isInfoEnabled()) + log.info("Got first not empty statistic: " + firstNotEmpty); - ignite1.close(); + // FAILS here with a timeout. + // Continuing data loading, the table is being updated. Since the row count is inreasing, we must obtain a + // new statistics, greather than {@code firstNotEmpty}. + assertTrue(GridTestUtils.waitForCondition(() -> { + ObjectStatisticsImpl updatedStat2 = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); - Map> statObs = GridTestUtils - .getFieldValue(statisticsMgr(0).statisticsRepository(), "statObs"); + return updatedStat2 != null && updatedStat2.rowCount() > firstNotEmpty.rowCount(); + }, getTestTimeout())); - Integer oldSize = statObs.get(SMALL_KEY).size(); + // Ensure the topology hasn't changed and didn't trigger the statistics. + assertEquals(top, grid(0).cluster().topologyVersion()); - ignite.cluster().state(ClusterState.ACTIVE); + ObjectStatisticsImpl secondNotEmpty = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); - assertTrue(GridTestUtils.waitForCondition(() -> statObs.get(SMALL_KEY).size() > oldSize, TIMEOUT)); + if (log.isInfoEnabled()) + log.info("Got second not empty statistic: " + secondNotEmpty); + } + finally { + stop.set(true); + } } /** {@inheritDoc} */ From 447a15cfd5baa019a2583c3991fd753ad9cdc7fa Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 19 Jul 2024 17:16:04 +0300 Subject: [PATCH 14/25] clients and sql cmds research --- .../cache/query/GridCacheQueryManager.java | 23 +++++++++++++ .../stat/StatisticsObsolescenceTest.java | 33 +++++++++++++++---- 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 48a0aaa2006d0..165401285f55f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -427,6 +427,29 @@ public void store(CacheDataRow newRow, @Nullable CacheDataRow prevRow, } } +// /** +// * Update key statistics. +// * +// * @param key Updated key. +// */ +// private void updateStatistics(KeyCacheObject key) { +// qryProc.statsManager().onRowUpdated(); +// +// GridCacheContext cacheCtx = cacheInfo.cacheContext(); +// if (cacheCtx == null) +// return; +// +// IgniteH2Indexing indexing = (IgniteH2Indexing)cacheCtx.kernalContext().query().getIndexing(); +// try { +// indexing.statsManager().onRowUpdated(this.identifier().schema(), +// this.identifier.table(), key.partition(), key.valueBytes(this.cacheContext().cacheObjectContext())); +// } +// catch (IgniteCheckedException e) { +// if (log.isDebugEnabled()) +// log.debug("Error while updating statistics obsolescence due to " + e.getMessage()); +// } +// } + /** * @param key Key. * @param prevRow Previous row. diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java index 66d160ddf3f13..543b55ff5ee6f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java @@ -19,16 +19,18 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.configuration.ClientConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; -import static org.apache.ignite.internal.processors.query.stat.IgniteStatisticsHelper.buildDefaultConfigurations; - /** * Test for statistics obsolescence. */ @@ -48,8 +50,14 @@ public void testObsolescenceUnderLoad() throws Exception { try { startGridsMultiThreaded(servers); + Ignite thick = startClientGrid(servers); + + IgniteClient thin = Ignition.startClient(new ClientConfiguration().setAddresses("127.0.0.1:10800")); + long top = grid(0).cluster().topologyVersion(); + grid(0).cluster().setBaselineTopology(top); + createSmallTable(0, null); statisticsMgr(0).usageState(StatisticsUsageState.ON); @@ -57,12 +65,15 @@ public void testObsolescenceUnderLoad() throws Exception { if (log.isInfoEnabled()) log.info("Enabling statistic for the table " + SMALL_TARGET); - StatisticsObjectConfiguration[] statCfgs = buildDefaultConfigurations(SMALL_TARGET); - +// StatisticsObjectConfiguration[] statCfgs = buildDefaultConfigurations(SMALL_TARGET); // This FIXES the test (workaround). // statCfgs[0] = new StatisticsObjectConfiguration(statCfgs[0].key(), statCfgs[0].columns().values(), (byte)-1); +// statisticsMgr(0).collectStatistics(statCfgs); - statisticsMgr(0).collectStatistics(statCfgs); +// client.cache("SMALLnull").query(new SqlFieldsQuery("ANALYZE SMALL")).getAll(); + thin.cache("SMALLnull").query(new SqlFieldsQuery("ANALYZE SMALL")).getAll(); +// thick.cache("SMALLnull").query(new SqlFieldsQuery("REFRESH STATISTICS PUBLIC.SMALL")).getAll(); + thin.cache("SMALLnull").query(new SqlFieldsQuery("REFRESH STATISTICS PUBLIC.SMALL")).getAll(); // Initialized, empty statistics. assertTrue(GridTestUtils.waitForCondition(() -> statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, getTestTimeout())); @@ -98,7 +109,15 @@ public void testObsolescenceUnderLoad() throws Exception { log.info("The loading stopped."); }); - // Here we get non-zero, updated statistics. +// GridTestUtils.runAsync(() -> { +// for (int i = 0; i < 10; ++i) { +// thin.cache("SMALLnull").query(new SqlFieldsQuery("REFRESH STATISTICS PUBLIC.SMALL")).getAll(); +// +// Thread.sleep(100); +// } +// }); + + // Here we get not empty, updated statistics. assertTrue(GridTestUtils.waitForCondition(() -> { ObjectStatisticsImpl updatedStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); From 9d0a18cf446027a71f6de4602799fe01f80ce84b Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 19 Jul 2024 19:18:58 +0300 Subject: [PATCH 15/25] + tests --- .../StatisticsCommandDdlIntegrationTest.java | 6 +- .../cache/query/GridCacheQueryManager.java | 23 -- .../processors/query/GridQueryProcessor.java | 4 + .../stat/IgniteStatisticsManagerImpl.java | 2 +- .../query/stat/StatisticsAbstractTest.java | 2 +- .../stat/StatisticsObsolescenceTest.java | 222 +++++++++++------- 6 files changed, 150 insertions(+), 109 deletions(-) diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/StatisticsCommandDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/StatisticsCommandDdlIntegrationTest.java index 0652c00392aa3..3817d0131df3f 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/StatisticsCommandDdlIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/StatisticsCommandDdlIntegrationTest.java @@ -119,7 +119,7 @@ public void testAnalyzeOptions() { // MAX_CHANGED_PARTITION_ROWS_PERCENT overrides old settings for all columns. sql(String.format( - "ANALYZE %s(%s) WITH DISTINCT=6,NULLS=7,TOTAL=8,MAX_CHANGED_PARTITION_ROWS_PERCENT=-1", + "ANALYZE %s(%s) WITH DISTINCT=6,NULLS=7,TOTAL=8,MAX_CHANGED_PARTITION_ROWS_PERCENT=2", TABLE_NAME, NAME_FIELD )); @@ -131,8 +131,8 @@ public void testAnalyzeOptions() { long ver = (Long)res.get(0).get(9); assertThat(res, hasItems( - Lists.newArrayList(PUBLIC_SCHEMA, "TABLE", TABLE_NAME, ID_FIELD, (byte)-1, 6L, 5L, 7L, 8, ver), - Lists.newArrayList(PUBLIC_SCHEMA, "TABLE", TABLE_NAME, NAME_FIELD, (byte)-1, 7L, 6L, 8L, null, ver) + Lists.newArrayList(PUBLIC_SCHEMA, "TABLE", TABLE_NAME, ID_FIELD, (byte)2, 6L, 5L, 7L, 8, ver), + Lists.newArrayList(PUBLIC_SCHEMA, "TABLE", TABLE_NAME, NAME_FIELD, (byte)2, 7L, 6L, 8L, null, ver) )); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 165401285f55f..48a0aaa2006d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -427,29 +427,6 @@ public void store(CacheDataRow newRow, @Nullable CacheDataRow prevRow, } } -// /** -// * Update key statistics. -// * -// * @param key Updated key. -// */ -// private void updateStatistics(KeyCacheObject key) { -// qryProc.statsManager().onRowUpdated(); -// -// GridCacheContext cacheCtx = cacheInfo.cacheContext(); -// if (cacheCtx == null) -// return; -// -// IgniteH2Indexing indexing = (IgniteH2Indexing)cacheCtx.kernalContext().query().getIndexing(); -// try { -// indexing.statsManager().onRowUpdated(this.identifier().schema(), -// this.identifier.table(), key.partition(), key.valueBytes(this.cacheContext().cacheObjectContext())); -// } -// catch (IgniteCheckedException e) { -// if (log.isDebugEnabled()) -// log.debug("Error while updating statistics obsolescence due to " + e.getMessage()); -// } -// } - /** * @param key Key. * @param prevRow Previous row. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 208ee4f9c67e3..9e8897f2ef010 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -2747,6 +2747,8 @@ public void store(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDat if (idx != null) idx.store(cctx, desc, newRow, prevRow, prevRowAvailable); + + statsMgr.onRowUpdated(desc.schemaName(), desc.tableName(), newRow.partition(), key.valueBytes(coctx)); } /** @@ -3614,6 +3616,8 @@ public void remove(GridCacheContext cctx, CacheDataRow row) if (indexingEnabled()) idx.remove(cctx, desc, row); + + statsMgr.onRowUpdated(desc.schemaName(), desc.tableName(), row.partition(), row.key().valueBytes(cctx.cacheObjectContext())); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java index 66f5af303b865..a7f75faa2173f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java @@ -60,7 +60,7 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager { private static final StatisticsUsageState DEFAULT_STATISTICS_USAGE_STATE = ON; /** Interval to check statistics obsolescence in seconds. */ - private static final int OBSOLESCENCE_INTERVAL = 60; + static final int OBSOLESCENCE_INTERVAL = 20; /** Logger. */ private final IgniteLogger log; diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java index 388251ff31f0f..7edfe2c0fc1c2 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java @@ -261,7 +261,7 @@ protected List> sql(String sql) { } /** - * Create SQL table with the given index and fill with samll amount of data. + * Create SQL table with the given index and fill with small amount of data. * * @param suffix Table idx, if {@code null} - name "SMALL" without index will be used. * @return Table name. diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java index 543b55ff5ee6f..eb0b43b66ed04 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java @@ -17,141 +17,201 @@ package org.apache.ignite.internal.processors.query.stat; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import org.apache.ignite.Ignite; -import org.apache.ignite.Ignition; -import org.apache.ignite.cache.query.SqlFieldsQuery; -import org.apache.ignite.client.IgniteClient; -import org.apache.ignite.configuration.ClientConfiguration; +import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.collection.IntMap; import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; +import static org.apache.ignite.internal.processors.query.stat.IgniteStatisticsHelper.buildDefaultConfigurations; +import static org.apache.ignite.internal.processors.query.stat.IgniteStatisticsManagerImpl.OBSOLESCENCE_INTERVAL; + /** * Test for statistics obsolescence. */ public class StatisticsObsolescenceTest extends StatisticsAbstractTest { - /** Ensured IgniteStatisticsManagerImpl#OBSOLESCENCE_INTERVAL. 5 minutes. */ + /** */ + private long testTimeout = super.getTestTimeout(); + + /** {@inheritDoc} */ @Override protected long getTestTimeout() { - return 5L * 60L * 1000L; + return testTimeout; + } + + /** */ + @Test + public void testObsolescenceWithInserting() throws Exception { + doTestObsolescenceUnderLoad(0, 0L, 0L, 1, + key -> sql(String.format("INSERT INTO small(a, b, c) VALUES(%d, %d, %d)", key, key, key))); } /** */ @Test - public void testObsolescenceUnderLoad() throws Exception { - int servers = 2; + public void testObsolescenceWithUpdating() throws Exception { + doTestObsolescenceUnderLoad(10000, 10000L, 1L, 0, key -> sql("UPDATE small set b=b+1 where a=" + key)); + } + + /** */ + private void doTestObsolescenceUnderLoad(int preloadCnt, long maxKey, long firstKey, int rowCntCmp, Consumer crud) throws Exception { + // Ensured IgniteStatisticsManagerImpl#OBSOLESCENCE_INTERVAL. + testTimeout = 3L * OBSOLESCENCE_INTERVAL * 1000L; AtomicBoolean stop = new AtomicBoolean(); try { - startGridsMultiThreaded(servers); + startGridsMultiThreaded(2); - Ignite thick = startClientGrid(servers); + createSmallTable(preloadCnt, null); - IgniteClient thin = Ignition.startClient(new ClientConfiguration().setAddresses("127.0.0.1:10800")); + statisticsMgr(0).usageState(StatisticsUsageState.ON); + statisticsMgr(0).collectStatistics(buildDefaultConfigurations(SMALL_TARGET)); - long top = grid(0).cluster().topologyVersion(); + // Initialized statistics. + assertTrue(GridTestUtils.waitForCondition(() -> statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, getTestTimeout())); + assertTrue(GridTestUtils.waitForCondition(() -> statisticsMgr(1).getLocalStatistics(SMALL_KEY) != null, getTestTimeout())); - grid(0).cluster().setBaselineTopology(top); + ObjectStatisticsImpl initStat1 = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); + ObjectStatisticsImpl initStat2 = (ObjectStatisticsImpl)statisticsMgr(1).getLocalStatistics(SMALL_KEY); - createSmallTable(0, null); + assertEquals(preloadCnt, initStat1.rowCount() + initStat2.rowCount()); - statisticsMgr(0).usageState(StatisticsUsageState.ON); + GridTestUtils.runAsync(() -> { + AtomicLong key = new AtomicLong(firstKey); - if (log.isInfoEnabled()) - log.info("Enabling statistic for the table " + SMALL_TARGET); + long a; -// StatisticsObjectConfiguration[] statCfgs = buildDefaultConfigurations(SMALL_TARGET); - // This FIXES the test (workaround). - // statCfgs[0] = new StatisticsObjectConfiguration(statCfgs[0].key(), statCfgs[0].columns().values(), (byte)-1); -// statisticsMgr(0).collectStatistics(statCfgs); + while (!stop.get()) { + a = key.incrementAndGet(); -// client.cache("SMALLnull").query(new SqlFieldsQuery("ANALYZE SMALL")).getAll(); - thin.cache("SMALLnull").query(new SqlFieldsQuery("ANALYZE SMALL")).getAll(); -// thick.cache("SMALLnull").query(new SqlFieldsQuery("REFRESH STATISTICS PUBLIC.SMALL")).getAll(); - thin.cache("SMALLnull").query(new SqlFieldsQuery("REFRESH STATISTICS PUBLIC.SMALL")).getAll(); + if (maxKey > 0 && a > maxKey) + key.set(a = firstKey); - // Initialized, empty statistics. - assertTrue(GridTestUtils.waitForCondition(() -> statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, getTestTimeout())); - assertTrue(GridTestUtils.waitForCondition(() -> statisticsMgr(1).getLocalStatistics(SMALL_KEY) != null, getTestTimeout())); + crud.accept(a); + } + }); - if (log.isInfoEnabled()) - log.info("Got first, empty statistic of the table " + SMALL_TARGET); + waitForStatsUpdates(initStat1); - ObjectStatisticsImpl emptyStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); + ObjectStatisticsImpl updatedStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); - assertTrue(emptyStat.rowCount() == 0); + assertTrue(rowCntCmp > 0 ? updatedStat.rowCount() > initStat1.rowCount() : + (rowCntCmp < 0 ? initStat1.rowCount() < updatedStat.rowCount() : initStat1.rowCount() == updatedStat.rowCount())); - GridTestUtils.runAsync(() -> { - AtomicLong key = new AtomicLong(); + // Continuing data loading, the table is being updated. Since the row count is inreasing, we must obtain a + // new statistics, greather than {@code firstNotEmpty}. + waitForStatsUpdates(updatedStat); - if (log.isInfoEnabled()) - log.info("Starting the loading..."); + ObjectStatisticsImpl updatedStat2 = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); - long time = System.nanoTime(); + assertTrue(rowCntCmp > 0 ? updatedStat2.rowCount() > updatedStat.rowCount() : + (rowCntCmp < 0 ? updatedStat2.rowCount() < updatedStat.rowCount() : updatedStat2.rowCount() == updatedStat.rowCount())); + } + finally { + stop.set(true); + } + } - while (!stop.get()) { - sql(String.format("INSERT INTO small(a, b, c) VALUES(%d, %d, %d)", key.incrementAndGet(), key.get(), key.get() % 10)); + /** */ + private void waitForStatsUpdates(ObjectStatisticsImpl compareTo) throws IgniteInterruptedCheckedException { + assertTrue(GridTestUtils.waitForCondition(() -> { + ObjectStatisticsImpl updatedStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); - if (U.nanosToMillis(System.nanoTime() - time) > 10_000L) { - time = System.nanoTime(); + if (updatedStat == null) + return false; - if (log.isInfoEnabled()) - log.info("Loaded " + grid(0).cache("SMALLnull").size() + " records."); - } - } + AtomicBoolean passed = new AtomicBoolean(true); + + updatedStat.columnsStatistics().forEach((col, stat) -> { + ColumnStatistics compared = compareTo.columnStatistics(col); + + assert compared != null; - if (log.isInfoEnabled()) - log.info("The loading stopped."); + if (compared.createdAt() >= stat.createdAt()) + passed.set(false); }); -// GridTestUtils.runAsync(() -> { -// for (int i = 0; i < 10; ++i) { -// thin.cache("SMALLnull").query(new SqlFieldsQuery("REFRESH STATISTICS PUBLIC.SMALL")).getAll(); -// -// Thread.sleep(100); -// } -// }); + return passed.get(); + }, getTestTimeout())); + } - // Here we get not empty, updated statistics. - assertTrue(GridTestUtils.waitForCondition(() -> { - ObjectStatisticsImpl updatedStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); + /** + * Test statistics refreshing after significant changes of base table: + * 1) Create and populate small table + * 2) Analyze it and get local statistics + * 3) Insert same number of rows into small table + * 4) Check that statistics refreshed and its values changed. + * + * @throws Exception In case of error. + */ + @Test + public void testObsolescence() throws Exception { + startGridsMultiThreaded(1); - return updatedStat != null && updatedStat.rowCount() > emptyStat.rowCount(); - }, getTestTimeout())); + createSmallTable(null); - // First not empty statistics (rowCount > 0). - ObjectStatisticsImpl firstNotEmpty = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); + statisticsMgr(0).collectStatistics(buildDefaultConfigurations(SMALL_TARGET)); - assertTrue(firstNotEmpty.rowCount() > 0); + assertTrue(GridTestUtils.waitForCondition(() -> statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, TIMEOUT)); - if (log.isInfoEnabled()) - log.info("Got first not empty statistic: " + firstNotEmpty); + ObjectStatisticsImpl stat1 = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); - // FAILS here with a timeout. - // Continuing data loading, the table is being updated. Since the row count is inreasing, we must obtain a - // new statistics, greather than {@code firstNotEmpty}. - assertTrue(GridTestUtils.waitForCondition(() -> { - ObjectStatisticsImpl updatedStat2 = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); + assertNotNull(stat1); - return updatedStat2 != null && updatedStat2.rowCount() > firstNotEmpty.rowCount(); - }, getTestTimeout())); + for (int i = SMALL_SIZE; i < 2 * SMALL_SIZE; i++) + sql(String.format("INSERT INTO small(a, b, c) VALUES(%d, %d, %d)", i, i, i % 10)); - // Ensure the topology hasn't changed and didn't trigger the statistics. - assertEquals(top, grid(0).cluster().topologyVersion()); + statisticsMgr(0).processObsolescence(); - ObjectStatisticsImpl secondNotEmpty = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); + assertTrue(GridTestUtils.waitForCondition(() -> { + ObjectStatisticsImpl stat2 = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); - if (log.isInfoEnabled()) - log.info("Got second not empty statistic: " + secondNotEmpty); - } - finally { - stop.set(true); - } + return stat2 != null && stat2.rowCount() > stat1.rowCount(); + }, TIMEOUT)); + } + + /** + * Test activation with statistics with topology changes. + * + * 1) Start two node cluster. + * 2) Activate cluster. + * 3) Create table and analyze it. + * 4) Inactivate cluster and change it's topology. + * 5) Get obsolescence map size for created table. + * 6) Activate cluster again. + * 7) Check that obsolescence map size changed due to new topology. + * + * @throws Exception In case of errors. + */ + @Test + public void testInactiveLoad() throws Exception { + Ignite ignite = startGrid(0); + Ignite ignite1 = startGrid(1); + + ignite.cluster().state(ClusterState.ACTIVE); + + createSmallTable(null); + collectStatistics(StatisticsType.GLOBAL, "SMALL"); + + ignite.cluster().state(ClusterState.INACTIVE); + + ignite1.close(); + + Map> statObs = GridTestUtils + .getFieldValue(statisticsMgr(0).statisticsRepository(), "statObs"); + + Integer oldSize = statObs.get(SMALL_KEY).size(); + + ignite.cluster().state(ClusterState.ACTIVE); + + assertTrue(GridTestUtils.waitForCondition(() -> statObs.get(SMALL_KEY).size() > oldSize, TIMEOUT)); } /** {@inheritDoc} */ From 4a386a968290704b33b3a744c1f146e1d7feb5a1 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Mon, 22 Jul 2024 14:26:59 +0300 Subject: [PATCH 16/25] + delete --- .../stat/IgniteStatisticsManagerImpl.java | 2 +- .../query/stat/StatisticsAbstractTest.java | 4 +- .../stat/StatisticsObsolescenceTest.java | 74 ++++++++++++------- 3 files changed, 51 insertions(+), 29 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java index a7f75faa2173f..50a55ed8fb4ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java @@ -60,7 +60,7 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager { private static final StatisticsUsageState DEFAULT_STATISTICS_USAGE_STATE = ON; /** Interval to check statistics obsolescence in seconds. */ - static final int OBSOLESCENCE_INTERVAL = 20; + static final int OBSOLESCENCE_INTERVAL = 60; /** Logger. */ private final IgniteLogger log; diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java index 7edfe2c0fc1c2..a631e99723730 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java @@ -261,7 +261,7 @@ protected List> sql(String sql) { } /** - * Create SQL table with the given index and fill with small amount of data. + * Creates SQL table with the given index and fills with small amount of data. * * @param suffix Table idx, if {@code null} - name "SMALL" without index will be used. * @return Table name. @@ -271,7 +271,7 @@ protected String createSmallTable(String suffix) { } /** - * Create SQL table with the given index and preload size. + * Creates SQL table with the given index and fills with data of the passed amount. * * @param preloadCnt Records cnt to load after creation. * @param suffix Table idx, if {@code null} - name "SMALL" without index will be used. diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java index eb0b43b66ed04..130868f2f2a04 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java @@ -33,34 +33,40 @@ import static org.apache.ignite.internal.processors.query.stat.IgniteStatisticsHelper.buildDefaultConfigurations; import static org.apache.ignite.internal.processors.query.stat.IgniteStatisticsManagerImpl.OBSOLESCENCE_INTERVAL; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** * Test for statistics obsolescence. */ public class StatisticsObsolescenceTest extends StatisticsAbstractTest { /** */ - private long testTimeout = super.getTestTimeout(); + private long testTimeout; - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return testTimeout; + /** */ + @Test + public void testObsolescenceWithInsert() throws Exception { + doTestObsolescenceUnderLoad(false, 1, + key -> sql(String.format("insert into SMALL(A, B, C) values(%d, %d, %d)", key, key, key))); } /** */ @Test - public void testObsolescenceWithInserting() throws Exception { - doTestObsolescenceUnderLoad(0, 0L, 0L, 1, - key -> sql(String.format("INSERT INTO small(a, b, c) VALUES(%d, %d, %d)", key, key, key))); + public void testObsolescenceWithUpdate() throws Exception { + doTestObsolescenceUnderLoad(true, 0, key -> sql("update SMALL set B=B+1 where A=" + key)); } /** */ @Test - public void testObsolescenceWithUpdating() throws Exception { - doTestObsolescenceUnderLoad(10000, 10000L, 1L, 0, key -> sql("UPDATE small set b=b+1 where a=" + key)); + public void testObsolescenceWithDelete() throws Exception { + doTestObsolescenceUnderLoad(true, -1, key -> sql("delete from SMALL where A=" + key)); } /** */ - private void doTestObsolescenceUnderLoad(int preloadCnt, long maxKey, long firstKey, int rowCntCmp, Consumer crud) throws Exception { + private void doTestObsolescenceUnderLoad(boolean preload, int rowCntCmp, Consumer op) throws Exception { + int preloadCnt = preload ? 1000 : 0; + long opFirstKey = 1L; + long opDelayKeysCnt = 300L; + // Ensured IgniteStatisticsManagerImpl#OBSOLESCENCE_INTERVAL. testTimeout = 3L * OBSOLESCENCE_INTERVAL * 1000L; @@ -75,44 +81,55 @@ private void doTestObsolescenceUnderLoad(int preloadCnt, long maxKey, long first statisticsMgr(0).collectStatistics(buildDefaultConfigurations(SMALL_TARGET)); // Initialized statistics. - assertTrue(GridTestUtils.waitForCondition(() -> statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, getTestTimeout())); - assertTrue(GridTestUtils.waitForCondition(() -> statisticsMgr(1).getLocalStatistics(SMALL_KEY) != null, getTestTimeout())); + assertTrue(waitForCondition(() -> statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, getTestTimeout())); + assertTrue(waitForCondition(() -> statisticsMgr(1).getLocalStatistics(SMALL_KEY) != null, getTestTimeout())); ObjectStatisticsImpl initStat1 = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); ObjectStatisticsImpl initStat2 = (ObjectStatisticsImpl)statisticsMgr(1).getLocalStatistics(SMALL_KEY); assertEquals(preloadCnt, initStat1.rowCount() + initStat2.rowCount()); + AtomicBoolean statReadyFlag = new AtomicBoolean(false); + GridTestUtils.runAsync(() -> { - AtomicLong key = new AtomicLong(firstKey); + AtomicLong key = new AtomicLong(opFirstKey); - long a; + long opCnt = 0; while (!stop.get()) { - a = key.incrementAndGet(); + op.accept(key.getAndIncrement()); + + if (++opCnt == opDelayKeysCnt) { + opCnt = 0; - if (maxKey > 0 && a > maxKey) - key.set(a = firstKey); + statReadyFlag.set(true); - crud.accept(a); + assertTrue(waitForCondition(() -> stop.get() || !statReadyFlag.get(), getTestTimeout())); + } } }); + assertTrue(waitForCondition(statReadyFlag::get, getTestTimeout())); + waitForStatsUpdates(initStat1); ObjectStatisticsImpl updatedStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); assertTrue(rowCntCmp > 0 ? updatedStat.rowCount() > initStat1.rowCount() : - (rowCntCmp < 0 ? initStat1.rowCount() < updatedStat.rowCount() : initStat1.rowCount() == updatedStat.rowCount())); + (rowCntCmp < 0 ? updatedStat.rowCount() < initStat1.rowCount() : updatedStat.rowCount() == initStat1.rowCount())); + + statReadyFlag.set(false); + + assertTrue(waitForCondition(statReadyFlag::get, getTestTimeout())); // Continuing data loading, the table is being updated. Since the row count is inreasing, we must obtain a // new statistics, greather than {@code firstNotEmpty}. waitForStatsUpdates(updatedStat); - ObjectStatisticsImpl updatedStat2 = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); + ObjectStatisticsImpl finalStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); - assertTrue(rowCntCmp > 0 ? updatedStat2.rowCount() > updatedStat.rowCount() : - (rowCntCmp < 0 ? updatedStat2.rowCount() < updatedStat.rowCount() : updatedStat2.rowCount() == updatedStat.rowCount())); + assertTrue(rowCntCmp > 0 ? finalStat.rowCount() > updatedStat.rowCount() : + (rowCntCmp < 0 ? finalStat.rowCount() < updatedStat.rowCount() : finalStat.rowCount() == updatedStat.rowCount())); } finally { stop.set(true); @@ -121,7 +138,7 @@ private void doTestObsolescenceUnderLoad(int preloadCnt, long maxKey, long first /** */ private void waitForStatsUpdates(ObjectStatisticsImpl compareTo) throws IgniteInterruptedCheckedException { - assertTrue(GridTestUtils.waitForCondition(() -> { + assertTrue(waitForCondition(() -> { ObjectStatisticsImpl updatedStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); if (updatedStat == null) @@ -159,7 +176,7 @@ public void testObsolescence() throws Exception { statisticsMgr(0).collectStatistics(buildDefaultConfigurations(SMALL_TARGET)); - assertTrue(GridTestUtils.waitForCondition(() -> statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, TIMEOUT)); + assertTrue(waitForCondition(() -> statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, TIMEOUT)); ObjectStatisticsImpl stat1 = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); @@ -170,7 +187,7 @@ public void testObsolescence() throws Exception { statisticsMgr(0).processObsolescence(); - assertTrue(GridTestUtils.waitForCondition(() -> { + assertTrue(waitForCondition(() -> { ObjectStatisticsImpl stat2 = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); return stat2 != null && stat2.rowCount() > stat1.rowCount(); @@ -211,7 +228,7 @@ public void testInactiveLoad() throws Exception { ignite.cluster().state(ClusterState.ACTIVE); - assertTrue(GridTestUtils.waitForCondition(() -> statObs.get(SMALL_KEY).size() > oldSize, TIMEOUT)); + assertTrue(waitForCondition(() -> statObs.get(SMALL_KEY).size() > oldSize, TIMEOUT)); } /** {@inheritDoc} */ @@ -228,6 +245,11 @@ public void testInactiveLoad() throws Exception { return cfg; } + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return testTimeout > 0 ? testTimeout : super.getTestTimeout(); + } + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { stopAllGrids(); From 90b9a98ee8cb99de3362ecfe50a347a8dd1badaa Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Tue, 23 Jul 2024 17:31:04 +0300 Subject: [PATCH 17/25] + master. Test fix --- .../stat/IgniteStatisticsManagerImpl.java | 32 +++++++++---- .../stat/StatisticsObsolescenceTest.java | 47 +++++++++++-------- 2 files changed, 49 insertions(+), 30 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java index 50a55ed8fb4ce..fa3ef3936d14a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -60,7 +61,7 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager { private static final StatisticsUsageState DEFAULT_STATISTICS_USAGE_STATE = ON; /** Interval to check statistics obsolescence in seconds. */ - static final int OBSOLESCENCE_INTERVAL = 60; + private static final int OBSOLESCENCE_INTERVAL = 60; /** Logger. */ private final IgniteLogger log; @@ -111,7 +112,7 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager { private volatile boolean started; /** Schedule to process obsolescence statistics. */ - private GridTimeoutProcessor.CancelableTask obsolescenceSchedule; + private final AtomicReference obsolescenceSchedule = new AtomicReference<>(); /** Exchange listener. */ private final PartitionsExchangeAware exchAwareLsnr = new PartitionsExchangeAware() { @@ -236,16 +237,25 @@ else if (db == null) tryStart(); - if (serverNode) { - // Use mgmt pool to work with statistics repository in busy lock to schedule some tasks. - obsolescenceSchedule = ctx.timeout().schedule(() -> { - obsolescenceBusyExecutor.execute(() -> processObsolescence()); - }, OBSOLESCENCE_INTERVAL * 1000, OBSOLESCENCE_INTERVAL * 1000); - } + if (serverNode) + scheduleObsolescence(OBSOLESCENCE_INTERVAL); ctx.cache().context().exchange().registerExchangeAwareComponent(exchAwareLsnr); } + /** */ + public void scheduleObsolescence(int seconds) { + assert seconds > 1; + + obsolescenceSchedule.getAndUpdate(curSchedule -> { + if (curSchedule != null) + curSchedule.close(); + + return ctx.timeout().schedule(() -> obsolescenceBusyExecutor.execute(this::processObsolescence), + seconds * 1000, seconds * 1000); + }); + } + /** * Check all preconditions and stop if started and have reason to stop. */ @@ -384,8 +394,10 @@ public ObjectStatisticsImpl getLocalStatistics(StatisticsKey key, AffinityTopolo @Override public void stop() { stopX(); - if (obsolescenceSchedule != null) - obsolescenceSchedule.close(); + GridTimeoutProcessor.CancelableTask obsolescenceTask = obsolescenceSchedule.getAndSet(null); + + if (obsolescenceTask != null) + obsolescenceTask.close(); if (gatherPool != null) { List unfinishedTasks = gatherPool.shutdownNow(); diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java index 130868f2f2a04..5da941dc2c5e6 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java @@ -22,17 +22,19 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import org.apache.ignite.Ignite; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.util.collection.IntMap; +import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; import static org.apache.ignite.internal.processors.query.stat.IgniteStatisticsHelper.buildDefaultConfigurations; -import static org.apache.ignite.internal.processors.query.stat.IgniteStatisticsManagerImpl.OBSOLESCENCE_INTERVAL; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** @@ -63,68 +65,73 @@ public void testObsolescenceWithDelete() throws Exception { /** */ private void doTestObsolescenceUnderLoad(boolean preload, int rowCntCmp, Consumer op) throws Exception { - int preloadCnt = preload ? 1000 : 0; - long opFirstKey = 1L; - long opDelayKeysCnt = 300L; + // Keep enough data to touch every partition. The statistics collection is sensitive to a partition's empty rows num + // and is able to reassemble in this case. This would give false-positive result. + int workingRowsNum = RendezvousAffinityFunction.DFLT_PARTITION_COUNT * 10; + int preloadCnt = preload ? workingRowsNum : 0; - // Ensured IgniteStatisticsManagerImpl#OBSOLESCENCE_INTERVAL. - testTimeout = 3L * OBSOLESCENCE_INTERVAL * 1000L; + int osbInterval = 7; + + testTimeout = osbInterval * 5 * 1000; AtomicBoolean stop = new AtomicBoolean(); try { startGridsMultiThreaded(2); + for (Ignite ig : G.allGrids()) + ((IgniteStatisticsManagerImpl)((IgniteEx)ig).context().query().statsManager()).scheduleObsolescence(osbInterval); + createSmallTable(preloadCnt, null); statisticsMgr(0).usageState(StatisticsUsageState.ON); statisticsMgr(0).collectStatistics(buildDefaultConfigurations(SMALL_TARGET)); // Initialized statistics. - assertTrue(waitForCondition(() -> statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, getTestTimeout())); - assertTrue(waitForCondition(() -> statisticsMgr(1).getLocalStatistics(SMALL_KEY) != null, getTestTimeout())); + assertTrue(waitForCondition(() -> statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, osbInterval * 1000)); + assertTrue(waitForCondition(() -> statisticsMgr(1).getLocalStatistics(SMALL_KEY) != null, osbInterval * 1000)); ObjectStatisticsImpl initStat1 = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); ObjectStatisticsImpl initStat2 = (ObjectStatisticsImpl)statisticsMgr(1).getLocalStatistics(SMALL_KEY); assertEquals(preloadCnt, initStat1.rowCount() + initStat2.rowCount()); - AtomicBoolean statReadyFlag = new AtomicBoolean(false); + AtomicBoolean enoughDataChanged = new AtomicBoolean(false); GridTestUtils.runAsync(() -> { - AtomicLong key = new AtomicLong(opFirstKey); + AtomicLong key = new AtomicLong(1L); long opCnt = 0; while (!stop.get()) { op.accept(key.getAndIncrement()); - if (++opCnt == opDelayKeysCnt) { + if (++opCnt == workingRowsNum / 3) { opCnt = 0; - statReadyFlag.set(true); + enoughDataChanged.set(true); - assertTrue(waitForCondition(() -> stop.get() || !statReadyFlag.get(), getTestTimeout())); + assertTrue(waitForCondition(() -> stop.get() || !enoughDataChanged.get(), getTestTimeout())); } } }); - assertTrue(waitForCondition(statReadyFlag::get, getTestTimeout())); + assertTrue(waitForCondition(enoughDataChanged::get, getTestTimeout())); - waitForStatsUpdates(initStat1); + waitForStatsUpdates(initStat1, osbInterval); ObjectStatisticsImpl updatedStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); assertTrue(rowCntCmp > 0 ? updatedStat.rowCount() > initStat1.rowCount() : (rowCntCmp < 0 ? updatedStat.rowCount() < initStat1.rowCount() : updatedStat.rowCount() == initStat1.rowCount())); - statReadyFlag.set(false); + enoughDataChanged.set(false); - assertTrue(waitForCondition(statReadyFlag::get, getTestTimeout())); + assertTrue(waitForCondition(enoughDataChanged::get, getTestTimeout())); // Continuing data loading, the table is being updated. Since the row count is inreasing, we must obtain a // new statistics, greather than {@code firstNotEmpty}. - waitForStatsUpdates(updatedStat); + waitForStatsUpdates(updatedStat, osbInterval); ObjectStatisticsImpl finalStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); @@ -137,7 +144,7 @@ private void doTestObsolescenceUnderLoad(boolean preload, int rowCntCmp, Consume } /** */ - private void waitForStatsUpdates(ObjectStatisticsImpl compareTo) throws IgniteInterruptedCheckedException { + private void waitForStatsUpdates(ObjectStatisticsImpl compareTo, long timeoutSec) throws IgniteInterruptedCheckedException { assertTrue(waitForCondition(() -> { ObjectStatisticsImpl updatedStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); @@ -156,7 +163,7 @@ private void waitForStatsUpdates(ObjectStatisticsImpl compareTo) throws IgniteIn }); return passed.get(); - }, getTestTimeout())); + }, timeoutSec * 1000)); } /** From 155223e6aaf94298ee963150dc146546ee33c834 Mon Sep 17 00:00:00 2001 From: Nikita Amelchev Date: Tue, 23 Jul 2024 20:22:07 +0300 Subject: [PATCH 18/25] review fixes --- .../cache/persistence/snapshot/SnapshotRestoreProcess.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java index c334ff90688d7..c02deccd103e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java @@ -1887,7 +1887,7 @@ private static class SnapshotRestoreContext { private volatile Map cfgs = Collections.emptyMap(); /** Graceful shutdown future. */ - private volatile IgniteFuture stopFut; + private volatile IgniteFuture stopFut; /** Operation start time. */ private final long startTime; From 4117e2e0de98f002b39376c625237742c0defa9c Mon Sep 17 00:00:00 2001 From: Nikita Amelchev Date: Thu, 25 Jul 2024 10:24:28 +0300 Subject: [PATCH 19/25] review fixes --- .../cache/persistence/snapshot/IgniteSnapshotManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index f9fec415613da..e6732144efe17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -3929,7 +3929,7 @@ else if (msg instanceof SnapshotFilesFailureMessage) { @Override public void onException(UUID nodeId, Throwable ex) { RemoteSnapshotFilesRecevier task = active; - if (task == null) + if (task == null || task.isDone()) return; assert task.rmtNodeId.equals(nodeId); From 4e5476d1e9948e6e1993d39ae7c31efcc1f2161e Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 25 Jul 2024 15:36:48 +0300 Subject: [PATCH 20/25] research --- .../managers/communication/GridIoManager.java | 2 ++ .../snapshot/IgniteSnapshotManager.java | 26 ++++++++++++++----- .../IgniteSnapshotRestoreFromRemoteTest.java | 2 ++ 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 2e721239ceda2..93a3e02fd8ff8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -2795,6 +2795,8 @@ private void interruptReceiver(ReceiverContext rctx, Exception ex) { else U.error(log, "Receiver has been interrupted due to an exception occurred [rctx=" + rctx + ']', ex); + log.error("TEST | interrupt receiver, e:" + ex.getMessage()); + rctx.hnd.onException(rctx.rmtNodeId, ex); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index e6732144efe17..3563b287253ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -159,7 +159,6 @@ import org.apache.ignite.internal.processors.marshaller.MappedName; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl; import org.apache.ignite.internal.processors.task.GridInternal; -import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.util.BasicRateLimiter; import org.apache.ignite.internal.util.GridBusyLock; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; @@ -2538,6 +2537,8 @@ public IgniteInternalFuture requestRemoteSnapshotFiles( RemoteSnapshotFilesRecevier fut = new RemoteSnapshotFilesRecevier(this, rmtNodeId, reqId, snpName, rmtSnpPath, parts, stopChecker, partHnd); + log.error("TEST | requestRemoteSnapshotFiles"); + snpRmtMgr.submit(fut); return fut; @@ -3696,6 +3697,8 @@ public synchronized void acceptFile(File part) { /** {@inheritDoc} */ @Override protected synchronized boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) { + System.err.println("TEST | receiver onDone, err=" + err + ", res="+res); + U.delete(dir); return super.onDone(res, err, cancel); @@ -3751,11 +3754,15 @@ public void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) { } if (active != null && !active.isDone()) { + log.error("TEST | queue: " + next.rmtNodeId); + queue.offer(next); return; } + log.error("TEST | active: " + next.rmtNodeId); + active = next; active.listen(this::scheduleNext); @@ -3771,6 +3778,8 @@ private void scheduleNext() { if (next == null) return; + log.error("TEST | scheduleNext"); + submit(next); } @@ -3802,14 +3811,15 @@ public void onNodeLeft(UUID nodeId) { RemoteSnapshotFilesRecevier active0 = active; if (active0 != null && !active0.isDone() && active0.rmtNodeId.equals(nodeId)) { + active0.acceptException(ex); // If the task was started, but the remote node did not start the transmission and did not send SnapshotFilesFailureMessage. // Otherwise, the task will fail via TransmissionHandler#onException. - cctx.kernalContext().timeout().addTimeoutObject( - new GridTimeoutObjectAdapter(2 * cctx.kernalContext().config().getNetworkTimeout()) { - @Override public void onTimeout() { - active0.acceptException(ex); - } - }); +// cctx.kernalContext().timeout().addTimeoutObject( +// new GridTimeoutObjectAdapter(2 * cctx.kernalContext().config().getNetworkTimeout()) { +// @Override public void onTimeout() { +// active0.acceptException(ex); +// } +// }); } } @@ -3927,6 +3937,8 @@ else if (msg instanceof SnapshotFilesFailureMessage) { /** {@inheritDoc} */ @Override public void onException(UUID nodeId, Throwable ex) { + log.error("TEST | onException: nodeId=" + nodeId + ", e: " + ex.getMessage() + ", active: " + active); + RemoteSnapshotFilesRecevier task = active; if (task == null || task.isDone()) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java index cbb5767e9c45c..2235e767d6db7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java @@ -371,6 +371,8 @@ public void testRestoreConnectionLost() throws Exception { restoreStarted.await(TIMEOUT, TimeUnit.MILLISECONDS); + log.error("TEST | stopping node: " + coord.localNode().id()); + coord.close(); nodeStopped.countDown(); From e7aff567b7616e1ac9f67f76de58f17669a1aae9 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 25 Jul 2024 18:24:17 +0300 Subject: [PATCH 21/25] my --- .../managers/communication/GridIoManager.java | 2 -- .../snapshot/IgniteSnapshotManager.java | 33 +++++-------------- .../snapshot/SnapshotRestoreProcess.java | 7 ++++ 3 files changed, 15 insertions(+), 27 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 93a3e02fd8ff8..2e721239ceda2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -2795,8 +2795,6 @@ private void interruptReceiver(ReceiverContext rctx, Exception ex) { else U.error(log, "Receiver has been interrupted due to an exception occurred [rctx=" + rctx + ']', ex); - log.error("TEST | interrupt receiver, e:" + ex.getMessage()); - rctx.hnd.onException(rctx.rmtNodeId, ex); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 3563b287253ed..71853b8654924 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -2537,8 +2537,6 @@ public IgniteInternalFuture requestRemoteSnapshotFiles( RemoteSnapshotFilesRecevier fut = new RemoteSnapshotFilesRecevier(this, rmtNodeId, reqId, snpName, rmtSnpPath, parts, stopChecker, partHnd); - log.error("TEST | requestRemoteSnapshotFiles"); - snpRmtMgr.submit(fut); return fut; @@ -3697,8 +3695,6 @@ public synchronized void acceptFile(File part) { /** {@inheritDoc} */ @Override protected synchronized boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) { - System.err.println("TEST | receiver onDone, err=" + err + ", res="+res); - U.delete(dir); return super.onDone(res, err, cancel); @@ -3754,15 +3750,11 @@ public void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) { } if (active != null && !active.isDone()) { - log.error("TEST | queue: " + next.rmtNodeId); - queue.offer(next); return; } - log.error("TEST | active: " + next.rmtNodeId); - active = next; active.listen(this::scheduleNext); @@ -3778,8 +3770,6 @@ private void scheduleNext() { if (next == null) return; - log.error("TEST | scheduleNext"); - submit(next); } @@ -3803,24 +3793,19 @@ public void onNodeLeft(UUID nodeId) { ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("The node from which a snapshot has been " + "requested left the grid"); + Collection requiredNodes = restoreCacheGrpProc.nodes(); + + boolean cleanAll = requiredNodes != null && requiredNodes.contains(nodeId); + queue.forEach(r -> { - if (r.rmtNodeId.equals(nodeId)) + if (cleanAll || r.rmtNodeId.equals(nodeId)) r.acceptException(ex); }); RemoteSnapshotFilesRecevier active0 = active; - if (active0 != null && !active0.isDone() && active0.rmtNodeId.equals(nodeId)) { + if (active0 != null && !active0.isDone() && (cleanAll || active0.rmtNodeId.equals(nodeId))) active0.acceptException(ex); - // If the task was started, but the remote node did not start the transmission and did not send SnapshotFilesFailureMessage. - // Otherwise, the task will fail via TransmissionHandler#onException. -// cctx.kernalContext().timeout().addTimeoutObject( -// new GridTimeoutObjectAdapter(2 * cctx.kernalContext().config().getNetworkTimeout()) { -// @Override public void onTimeout() { -// active0.acceptException(ex); -// } -// }); - } } /** {@inheritDoc} */ @@ -3937,16 +3922,14 @@ else if (msg instanceof SnapshotFilesFailureMessage) { /** {@inheritDoc} */ @Override public void onException(UUID nodeId, Throwable ex) { - log.error("TEST | onException: nodeId=" + nodeId + ", e: " + ex.getMessage() + ", active: " + active); - RemoteSnapshotFilesRecevier task = active; if (task == null || task.isDone()) return; - assert task.rmtNodeId.equals(nodeId); - task.acceptException(ex); + + assert task.rmtNodeId.equals(nodeId); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java index c02deccd103e1..269c776c644a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java @@ -1840,6 +1840,13 @@ private String partitionsMapToString(Map> map, Map nodes() { + SnapshotRestoreContext ctx = opCtx; + + return ctx == null ? null : ctx.nodes(); + } + /** * Cache group restore from snapshot operation context. */ From 7678c0883aa11f352daaa24a634cc52710ad4cc7 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 26 Jul 2024 10:56:37 +0300 Subject: [PATCH 22/25] review fixes --- .../stat/IgniteStatisticsManagerImpl.java | 18 ++++++---- .../stat/StatisticsObsolescenceTest.java | 34 +++++++------------ 2 files changed, 23 insertions(+), 29 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java index fa3ef3936d14a..c56495efe5bce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java @@ -244,16 +244,20 @@ else if (db == null) } /** */ - public void scheduleObsolescence(int seconds) { - assert seconds > 1; + void scheduleObsolescence(int seconds) { + assert seconds >= 1; - obsolescenceSchedule.getAndUpdate(curSchedule -> { - if (curSchedule != null) - curSchedule.close(); + synchronized (obsolescenceSchedule) { + GridTimeoutProcessor.CancelableTask cur = obsolescenceSchedule.get(); - return ctx.timeout().schedule(() -> obsolescenceBusyExecutor.execute(this::processObsolescence), + if (cur != null) + cur.close(); + + cur = ctx.timeout().schedule(() -> obsolescenceBusyExecutor.execute(this::processObsolescence), seconds * 1000, seconds * 1000); - }); + + obsolescenceSchedule.set(cur); + } } /** diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java index 5da941dc2c5e6..40b3af410add6 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.stat; import java.util.Map; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -41,9 +42,6 @@ * Test for statistics obsolescence. */ public class StatisticsObsolescenceTest extends StatisticsAbstractTest { - /** */ - private long testTimeout; - /** */ @Test public void testObsolescenceWithInsert() throws Exception { @@ -72,9 +70,7 @@ private void doTestObsolescenceUnderLoad(boolean preload, int rowCntCmp, Consume int osbInterval = 7; - testTimeout = osbInterval * 5 * 1000; - - AtomicBoolean stop = new AtomicBoolean(); + CyclicBarrier barrier = new CyclicBarrier(2); try { startGridsMultiThreaded(2); @@ -96,42 +92,41 @@ private void doTestObsolescenceUnderLoad(boolean preload, int rowCntCmp, Consume assertEquals(preloadCnt, initStat1.rowCount() + initStat2.rowCount()); - AtomicBoolean enoughDataChanged = new AtomicBoolean(false); - GridTestUtils.runAsync(() -> { AtomicLong key = new AtomicLong(1L); long opCnt = 0; - while (!stop.get()) { + while (!barrier.isBroken()) { op.accept(key.getAndIncrement()); + // Enough updates to trigger the statistics. if (++opCnt == workingRowsNum / 3) { opCnt = 0; - enoughDataChanged.set(true); + barrier.await(); - assertTrue(waitForCondition(() -> stop.get() || !enoughDataChanged.get(), getTestTimeout())); + barrier.await(); } } }); - assertTrue(waitForCondition(enoughDataChanged::get, getTestTimeout())); + barrier.await(); - waitForStatsUpdates(initStat1, osbInterval); + waitForStatsUpdates(initStat1, osbInterval * 2); ObjectStatisticsImpl updatedStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); assertTrue(rowCntCmp > 0 ? updatedStat.rowCount() > initStat1.rowCount() : (rowCntCmp < 0 ? updatedStat.rowCount() < initStat1.rowCount() : updatedStat.rowCount() == initStat1.rowCount())); - enoughDataChanged.set(false); + barrier.await(); - assertTrue(waitForCondition(enoughDataChanged::get, getTestTimeout())); + barrier.await(); // Continuing data loading, the table is being updated. Since the row count is inreasing, we must obtain a // new statistics, greather than {@code firstNotEmpty}. - waitForStatsUpdates(updatedStat, osbInterval); + waitForStatsUpdates(updatedStat, osbInterval * 2); ObjectStatisticsImpl finalStat = (ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY); @@ -139,7 +134,7 @@ private void doTestObsolescenceUnderLoad(boolean preload, int rowCntCmp, Consume (rowCntCmp < 0 ? finalStat.rowCount() < updatedStat.rowCount() : finalStat.rowCount() == updatedStat.rowCount())); } finally { - stop.set(true); + barrier.reset(); } } @@ -252,11 +247,6 @@ public void testInactiveLoad() throws Exception { return cfg; } - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return testTimeout > 0 ? testTimeout : super.getTestTimeout(); - } - /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { stopAllGrids(); From ac1daf1a2fefe398cdb7407e95e1db2022038a30 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 26 Jul 2024 10:59:12 +0300 Subject: [PATCH 23/25] minor --- .../processors/query/stat/StatisticsObsolescenceTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java index 40b3af410add6..417f837431770 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java @@ -105,7 +105,6 @@ private void doTestObsolescenceUnderLoad(boolean preload, int rowCntCmp, Consume opCnt = 0; barrier.await(); - barrier.await(); } } @@ -121,7 +120,6 @@ private void doTestObsolescenceUnderLoad(boolean preload, int rowCntCmp, Consume (rowCntCmp < 0 ? updatedStat.rowCount() < initStat1.rowCount() : updatedStat.rowCount() == initStat1.rowCount())); barrier.await(); - barrier.await(); // Continuing data loading, the table is being updated. Since the row count is inreasing, we must obtain a From b0fad1817d988cd19571a540f33fe98fbe3627d3 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 26 Jul 2024 14:29:00 +0300 Subject: [PATCH 24/25] reverts --- .../snapshot/IgniteSnapshotManager.java | 92 +++++++++++-------- .../snapshot/SnapshotRestoreProcess.java | 9 +- .../IgniteSnapshotRemoteRequestTest.java | 2 +- .../IgniteSnapshotRestoreFromRemoteTest.java | 57 ------------ 4 files changed, 58 insertions(+), 102 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 71853b8654924..be0d9afd6c993 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -164,6 +164,7 @@ import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.distributed.DistributedProcess; import org.apache.ignite.internal.util.distributed.InitMessage; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -721,8 +722,6 @@ public static String partDeltaFileName(int partId) { busyLock.block(); try { - snpRmtMgr.stop(); - restoreCacheGrpProc.interrupt(new NodeStoppingException("Node is stopping.")); // Try stop all snapshot processing if not yet. @@ -731,6 +730,8 @@ public static String partDeltaFileName(int partId) { locSnpTasks.clear(); + snpRmtMgr.stop(); + synchronized (snpOpMux) { if (clusterSnpFut != null) { clusterSnpFut.onDone(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG)); @@ -3738,33 +3739,33 @@ private class SequentialRemoteSnapshotManager implements TransmissionHandler, Gr /** {@code true} if the node is stopping. */ private boolean stopping; - /** @param next New task for scheduling. */ - public void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) { + /** + * @param next New task for scheduling. + */ + public synchronized void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) { assert next != null; - synchronized (this) { - if (stopping) { - next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG)); + if (stopping) { + next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG)); - return; - } + return; + } - if (active != null && !active.isDone()) { - queue.offer(next); + RemoteSnapshotFilesRecevier curr = active; - return; - } + if (curr == null || curr.isDone()) { + next.listen(this::scheduleNext); active = next; - active.listen(this::scheduleNext); + next.init(); } - - next.init(); + else + queue.offer(next); } /** Schedule next async receiver. */ - private void scheduleNext() { + private synchronized void scheduleNext() { RemoteSnapshotFilesRecevier next = queue.poll(); if (next == null) @@ -3774,38 +3775,57 @@ private void scheduleNext() { } /** Stopping handler. */ - public void stop() { - synchronized (this) { - stopping = true; - } + public synchronized void stop() { + stopping = true; + + if (active != null) + active.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG)); RemoteSnapshotFilesRecevier r; while ((r = queue.poll()) != null) r.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG)); - if (active != null) - active.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG)); + Set futs = activeTasks(); + GridCompoundFuture stopFut = new GridCompoundFuture<>(); + + try { + for (IgniteInternalFuture fut : futs) + stopFut.add(fut); + + stopFut.markInitialized().get(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } } - /** @param nodeId A node left the cluster. */ + /** + * @param nodeId A node left the cluster. + */ public void onNodeLeft(UUID nodeId) { + Set futs = activeTasks(); ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("The node from which a snapshot has been " + "requested left the grid"); - Collection requiredNodes = restoreCacheGrpProc.nodes(); - - boolean cleanAll = requiredNodes != null && requiredNodes.contains(nodeId); - - queue.forEach(r -> { - if (cleanAll || r.rmtNodeId.equals(nodeId)) - r.acceptException(ex); + futs.forEach(t -> { + if (t.rmtNodeId.equals(nodeId)) + t.acceptException(ex); }); + } + + /** + * @return The set of currently scheduled tasks, some of them may be already completed. + */ + private Set activeTasks() { + Set futs = new HashSet<>(queue); RemoteSnapshotFilesRecevier active0 = active; - if (active0 != null && !active0.isDone() && (cleanAll || active0.rmtNodeId.equals(nodeId))) - active0.acceptException(ex); + if (active0 != null) + futs.add(active0); + + return futs; } /** {@inheritDoc} */ @@ -3924,12 +3944,12 @@ else if (msg instanceof SnapshotFilesFailureMessage) { @Override public void onException(UUID nodeId, Throwable ex) { RemoteSnapshotFilesRecevier task = active; - if (task == null || task.isDone()) + if (task == null) return; - task.acceptException(ex); - assert task.rmtNodeId.equals(nodeId); + + task.acceptException(ex); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java index 269c776c644a9..16511cb3c4e71 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java @@ -1201,7 +1201,7 @@ private IgniteInternalFuture preload(UUID reqId) { catch (Exception ex) { opCtx0.errHnd.accept(ex); - retFut.onDone(ex); + return new GridFinishedFuture<>(ex); } return retFut; @@ -1840,13 +1840,6 @@ private String partitionsMapToString(Map> map, Map nodes() { - SnapshotRestoreContext ctx = opCtx; - - return ctx == null ? null : ctx.nodes(); - } - /** * Cache group restore from snapshot operation context. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java index 8be03f0195a18..fcde905bc8b70 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java @@ -267,7 +267,7 @@ public void testSnapshotRequestRemoteSourceNodeLeft() throws Exception { latch.countDown(); assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), ClusterTopologyCheckedException.class, - "Remote node left the grid."); + "he node from which a snapshot has been requested left the grid"); } /** @throws Exception If fails. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java index 2235e767d6db7..40bd60290e6aa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java @@ -30,8 +30,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Function; @@ -66,7 +64,6 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory; import static org.apache.ignite.testframework.GridTestUtils.assertContains; -import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause; /** */ public class IgniteSnapshotRestoreFromRemoteTest extends IgniteClusterSnapshotRestoreBaseTest { @@ -326,60 +323,6 @@ public void testSnapshotCachesStoppedIfLoadingFailOnRemote() throws Exception { ensureCacheAbsent(dfltCacheCfg); } - /** @throws Exception If failed. */ - @Test - public void testRestoreConnectionLost() throws Exception { - IgniteEx coord = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2); - - copyAndShuffle(snpParts, G.allGrids()); - - // Start a new node without snapshot working directory. - IgniteEx emptyNode = startDedicatedGrid(SECOND_CLUSTER_PREFIX, 2); - - emptyNode.cluster().state(ClusterState.ACTIVE); - - emptyNode.cache(DEFAULT_CACHE_NAME).destroy(); - - awaitPartitionMapExchange(); - - CountDownLatch restoreStarted = new CountDownLatch(1); - CountDownLatch nodeStopped = new CountDownLatch(1); - - IgniteSnapshotManager mgr = snp(coord); - - mgr.remoteSnapshotSenderFactory(new BiFunction() { - @Override public SnapshotSender apply(String s, UUID uuid) { - return new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.remoteSnapshotSenderFactory(s, uuid)) { - @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) { - delegate.sendPart0(part, cacheDirName, pair, length); - - restoreStarted.countDown(); - - try { - nodeStopped.await(TIMEOUT, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - }; - } - }); - - // Restore all cache groups. - IgniteFuture fut = emptyNode.snapshot().restoreSnapshot(SNAPSHOT_NAME, null); - - restoreStarted.await(TIMEOUT, TimeUnit.MILLISECONDS); - - log.error("TEST | stopping node: " + coord.localNode().id()); - - coord.close(); - - nodeStopped.countDown(); - - assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class); - } - /** * @param snpParts Snapshot parts. * @param toNodes List of toNodes to copy parts to. From c4693410901d72a791a5269aef277f5ceca459df Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 26 Jul 2024 17:38:40 +0300 Subject: [PATCH 25/25] review fix --- .../stat/IgniteStatisticsManagerImpl.java | 23 ++++++------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java index c56495efe5bce..1e2ceb3c593e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -112,7 +111,7 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager { private volatile boolean started; /** Schedule to process obsolescence statistics. */ - private final AtomicReference obsolescenceSchedule = new AtomicReference<>(); + private volatile GridTimeoutProcessor.CancelableTask obsolescenceSchedule; /** Exchange listener. */ private final PartitionsExchangeAware exchAwareLsnr = new PartitionsExchangeAware() { @@ -247,17 +246,11 @@ else if (db == null) void scheduleObsolescence(int seconds) { assert seconds >= 1; - synchronized (obsolescenceSchedule) { - GridTimeoutProcessor.CancelableTask cur = obsolescenceSchedule.get(); + if (obsolescenceSchedule != null) + obsolescenceSchedule.close(); - if (cur != null) - cur.close(); - - cur = ctx.timeout().schedule(() -> obsolescenceBusyExecutor.execute(this::processObsolescence), - seconds * 1000, seconds * 1000); - - obsolescenceSchedule.set(cur); - } + obsolescenceSchedule = ctx.timeout().schedule(() -> obsolescenceBusyExecutor.execute(this::processObsolescence), + seconds * 1000, seconds * 1000); } /** @@ -398,10 +391,8 @@ public ObjectStatisticsImpl getLocalStatistics(StatisticsKey key, AffinityTopolo @Override public void stop() { stopX(); - GridTimeoutProcessor.CancelableTask obsolescenceTask = obsolescenceSchedule.getAndSet(null); - - if (obsolescenceTask != null) - obsolescenceTask.close(); + if (obsolescenceSchedule != null) + obsolescenceSchedule.close(); if (gatherPool != null) { List unfinishedTasks = gatherPool.shutdownNow();