diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java index a8126fbce4d9a..1f46ed0d2da72 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java @@ -18,17 +18,18 @@ package org.apache.ignite.internal.processors.query.calcite.exec; import java.util.ArrayList; -import java.util.Collections; +import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.stream.IntStream; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; -import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation; /** */ public abstract class AbstractCacheScan implements Iterable, AutoCloseable { @@ -45,15 +46,35 @@ public abstract class AbstractCacheScan implements Iterable, AutoClose protected final int[] parts; /** */ - protected volatile List reserved; + protected final boolean explicitParts; + + /** */ + private PartitionReservation reservation; + + /** */ + protected volatile List reservedParts; /** */ AbstractCacheScan(ExecutionContext ectx, GridCacheContext cctx, int[] parts) { this.ectx = ectx; this.cctx = cctx; - this.parts = parts; topVer = ectx.topologyVersion(); + + explicitParts = parts != null; + + if (cctx.isReplicated()) + this.parts = IntStream.range(0, cctx.affinity().partitions()).toArray(); + else { + if (parts != null) + this.parts = parts; + else { + Collection primaryParts = cctx.affinity().primaryPartitions( + cctx.kernalContext().localNodeId(), topVer); + + this.parts = primaryParts.stream().mapToInt(Integer::intValue).toArray(); + } + } } /** {@inheritDoc} */ @@ -80,7 +101,7 @@ public abstract class AbstractCacheScan implements Iterable, AutoClose /** */ private synchronized void reserve() { - if (reserved != null) + if (reservation != null) return; GridDhtPartitionTopology top = cctx.topology(); @@ -98,61 +119,42 @@ private synchronized void reserve() { throw new ClusterTopologyException("Topology was changed. Please retry on stable topology."); } - List toReserve; - - if (cctx.isReplicated()) { - int partsCnt = cctx.affinity().partitions(); - - toReserve = new ArrayList<>(partsCnt); - - for (int i = 0; i < partsCnt; i++) - toReserve.add(top.localPartition(i)); - } - else if (cctx.isPartitioned()) { - assert parts != null; + try { + PartitionReservation reservation; - toReserve = new ArrayList<>(parts.length); + try { + reservation = cctx.kernalContext().query().partitionReservationManager().reservePartitions( + cctx, topVer, explicitParts ? parts : null, ectx.originatingNodeId(), "qryId=" + ectx.queryId()); + } + catch (IgniteCheckedException e) { + throw new ClusterTopologyException("Failed to reserve partition for query execution", e); + } - for (int i = 0; i < parts.length; i++) - toReserve.add(top.localPartition(parts[i])); - } - else - toReserve = Collections.emptyList(); + if (reservation.failed()) { + reservation.release(); - List reserved = new ArrayList<>(toReserve.size()); + throw new ClusterTopologyException(reservation.error()); + } - try { - for (GridDhtLocalPartition part : toReserve) { - if (part == null || !part.reserve()) - throw new ClusterTopologyException("Failed to reserve partition for query execution. Retry on stable topology."); - else if (part.state() != GridDhtPartitionState.OWNING) { - part.release(); + this.reservation = reservation; - throw new ClusterTopologyException("Failed to reserve partition for query execution. Retry on stable topology."); - } + List reservedParts = new ArrayList<>(parts.length); - reserved.add(part); - } - } - catch (Exception e) { - release(); + for (int i = 0; i < parts.length; i++) + reservedParts.add(top.localPartition(parts[i])); - throw e; + this.reservedParts = reservedParts; } finally { - this.reserved = reserved; - top.readUnlock(); } } /** */ private synchronized void release() { - if (F.isEmpty(reserved)) - return; - - reserved.forEach(GridDhtLocalPartition::release); + if (reservation != null) + reservation.release(); - reserved = null; + reservation = null; } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java index 2077996aa0909..d61f302b0ab1b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java @@ -116,7 +116,7 @@ public IndexScan( txChanges = ectx.transactionChanges( cctx.cacheId(), - parts, + cctx.isReplicated() ? null : this.parts, r -> new IndexRowImpl(rowHnd, r), this::compare ); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java index f008f3f6af3c1..c16252b902a0f 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java @@ -74,9 +74,9 @@ private class IteratorImpl extends GridIteratorAdapter { /** */ private IteratorImpl() { - assert reserved != null; + assert reservedParts != null; - parts = new ArrayDeque<>(reserved); + parts = new ArrayDeque<>(reservedParts); txChanges = F.isEmpty(ectx.getQryTxEntries()) ? TransactionChanges.empty() diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java index c5e652a00afe9..af9fadc572363 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java @@ -58,6 +58,13 @@ public class ColocationGroup implements MarshalableMessage { @GridDirectTransient private List> assignments; + /** + * Flag, indacating that assignment is formed by original cache assignment for given topology. + * In case of {@code true} value we can skip assignment marshalling and calc assignment on remote nodes. + */ + @GridDirectTransient + private boolean primaryAssignment; + /** Marshalled assignments. */ private int[] marshalledAssignments; @@ -68,7 +75,7 @@ public static ColocationGroup forNodes(List nodeIds) { /** */ public static ColocationGroup forAssignments(List> assignments) { - return new ColocationGroup(null, null, assignments); + return new ColocationGroup(null, null, assignments, true); } /** */ @@ -100,6 +107,13 @@ private ColocationGroup(long[] sourceIds, List nodeIds, List> a this.assignments = assignments; } + /** */ + private ColocationGroup(long[] sourceIds, List nodeIds, List> assignments, boolean primaryAssignment) { + this(sourceIds, nodeIds, assignments); + + this.primaryAssignment = primaryAssignment; + } + /** * @return Lists of nodes capable to execute a query fragment for what the mapping is calculated. */ @@ -143,10 +157,10 @@ public boolean belongs(long sourceId) { */ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingException { long[] srcIds; - if (this.sourceIds == null || other.sourceIds == null) - srcIds = U.firstNotNull(this.sourceIds, other.sourceIds); + if (sourceIds == null || other.sourceIds == null) + srcIds = U.firstNotNull(sourceIds, other.sourceIds); else - srcIds = LongStream.concat(Arrays.stream(this.sourceIds), Arrays.stream(other.sourceIds)).distinct().toArray(); + srcIds = LongStream.concat(Arrays.stream(sourceIds), Arrays.stream(other.sourceIds)).distinct().toArray(); List nodeIds; if (this.nodeIds == null || other.nodeIds == null) @@ -159,6 +173,8 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE "Replicated query parts are not co-located on all nodes"); } + boolean primaryAssignment = this.primaryAssignment || other.primaryAssignment; + List> assignments; if (this.assignments == null || other.assignments == null) { assignments = U.firstNotNull(this.assignments, other.assignments); @@ -170,11 +186,14 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE for (int i = 0; i < assignments.size(); i++) { List assignment = Commons.intersect(filter, assignments.get(i)); - if (assignment.isEmpty()) { // TODO check with partition filters + if (assignment.isEmpty()) { throw new ColocationMappingException("Failed to map fragment to location. " + "Partition mapping is empty [part=" + i + "]"); } + if (!assignment.get(0).equals(assignments.get(i).get(0))) + primaryAssignment = false; + assignments0.add(assignment); } @@ -191,14 +210,20 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE if (filter != null) assignment.retainAll(filter); - if (assignment.isEmpty()) // TODO check with partition filters - throw new ColocationMappingException("Failed to map fragment to location. Partition mapping is empty [part=" + i + "]"); + if (assignment.isEmpty()) { + throw new ColocationMappingException("Failed to map fragment to location. " + + "Partition mapping is empty [part=" + i + "]"); + } + + if (!assignment.get(0).equals(this.assignments.get(i).get(0)) + || !assignment.get(0).equals(other.assignments.get(i).get(0))) + primaryAssignment = false; assignments.add(assignment); } } - return new ColocationGroup(srcIds, nodeIds, assignments); + return new ColocationGroup(srcIds, nodeIds, assignments, primaryAssignment); } /** */ @@ -216,7 +241,16 @@ public ColocationGroup finalizeMapping() { assignments.add(first != null ? Collections.singletonList(first) : Collections.emptyList()); } - return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments); + return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments, primaryAssignment); + } + + /** */ + public ColocationGroup explicitMapping() { + if (assignments == null || !primaryAssignment) + return this; + + // Make a shallow copy without cacheAssignment flag. + return new ColocationGroup(sourceIds, nodeIds, assignments, false); } /** */ @@ -359,7 +393,7 @@ public int[] partitions(UUID nodeId) { /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) { - if (assignments != null && marshalledAssignments == null) { + if (assignments != null && marshalledAssignments == null && !primaryAssignment) { Map nodeIdxs = new HashMap<>(); for (int i = 0; i < nodeIds.size(); i++) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java index dfef48e33469a..f1dc050aa8225 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java @@ -192,8 +192,11 @@ public FragmentMapping mapping() { if (mapping != null) mapping.prepareMarshal(ctx); - if (target != null) + if (target != null) { + target = target.explicitMapping(); + target.prepareMarshal(ctx); + } if (remoteSources0 == null && remoteSources != null) { remoteSources0 = U.newHashMap(remoteSources.size()); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java index 21faaec82c9fe..04637c3a728a3 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java @@ -567,18 +567,25 @@ private ColocationGroup replicatedGroup(@NotNull AffinityTopologyVersion topVer) List nodes = cctx.discovery().discoCache(topVer).cacheGroupAffinityNodes(cctx.groupId()); List nodes0; - if (!top.rebalanceFinished(topVer)) { - nodes0 = new ArrayList<>(nodes.size()); + top.readLock(); - int parts = top.partitions(); + try { + if (!top.rebalanceFinished(topVer)) { + nodes0 = new ArrayList<>(nodes.size()); + + int parts = top.partitions(); - for (ClusterNode node : nodes) { - if (isOwner(node.id(), top, parts)) - nodes0.add(node.id()); + for (ClusterNode node : nodes) { + if (isOwner(node.id(), top, parts)) + nodes0.add(node.id()); + } } + else + nodes0 = Commons.transform(nodes, ClusterNode::id); + } + finally { + top.readUnlock(); } - else - nodes0 = Commons.transform(nodes, ClusterNode::id); return ColocationGroup.forNodes(nodes0); } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservation.java similarity index 96% rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservation.java index b5593e58d8eae..ae21192819dfb 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservation.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.h2.twostep; +package org.apache.ignite.internal.processors.cache.distributed.dht.topology; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationKey.java similarity index 96% rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationKey.java index 0fad2c4dd0926..60911e7ac8472 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationKey.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.h2.twostep; +package org.apache.ignite.internal.processors.cache.distributed.dht.topology; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.F; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationManager.java new file mode 100644 index 0000000000000..fa44fc6a10a76 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationManager.java @@ -0,0 +1,443 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.topology; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware; +import org.apache.ignite.internal.processors.tracing.MTC; +import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; +import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_PARTITIONS_RESERVE; + +/** + * Class responsible for partition reservation for queries executed on local node. Prevents partitions from being + * evicted from node during query execution. + */ +public class PartitionReservationManager implements PartitionsExchangeAware { + /** Special instance of reservable object for REPLICATED caches. */ + private static final ReplicatedReservable REPLICATED_RESERVABLE = new ReplicatedReservable(); + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** + * Group reservations cache. When affinity version is not changed and all primary partitions must be reserved we get + * group reservation from this map instead of create new reservation group. + */ + private final ConcurrentMap reservations = new ConcurrentHashMap<>(); + + /** Logger. */ + private final IgniteLogger log; + + /** + * Constructor. + * + * @param ctx Context. + */ + public PartitionReservationManager(GridKernalContext ctx) { + this.ctx = ctx; + + log = ctx.log(PartitionReservationManager.class); + + ctx.cache().context().exchange().registerExchangeAwareComponent(this); + } + + /** + * @param top Partition topology. + * @param partId Partition ID. + * @return Partition. + */ + private static GridDhtLocalPartition partition(GridDhtPartitionTopology top, int partId) { + return top.localPartition(partId, NONE, false); + } + + /** + * @param cacheIds Cache IDs. + * @param reqTopVer Topology version from request. + * @param explicitParts Explicit partitions list. + * @param nodeId Node ID. + * @param reqId Request ID. + * @return PartitionReservation instance with reservation result. + * @throws IgniteCheckedException If failed. + */ + public PartitionReservation reservePartitions( + @Nullable List cacheIds, + AffinityTopologyVersion reqTopVer, + int[] explicitParts, + UUID nodeId, + long reqId + ) throws IgniteCheckedException { + try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_PARTITIONS_RESERVE, MTC.span()))) { + assert reqTopVer != null; + + AffinityTopologyVersion topVer = ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(reqTopVer); + + if (F.isEmpty(cacheIds)) + return new PartitionReservation(Collections.emptyList()); + + Collection partIds = partsToCollection(explicitParts); + + List reserved = new ArrayList<>(); + + for (int i = 0; i < cacheIds.size(); i++) { + GridCacheContext cctx = ctx.cache().context().cacheContext(cacheIds.get(i)); + + // Cache was not found, probably was not deployed yet. + if (cctx == null) { + return new PartitionReservation(reserved, + String.format("Failed to reserve partitions for query (cache is not " + + "found on local node) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s]", + ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i))); + } + + if (!cctx.rebalanceEnabled()) + continue; + + String err = reservePartitions(reserved, cctx, partIds, topVer, nodeId, "reqId=" + reqId); + + if (err != null) + return new PartitionReservation(reserved, err); + } + + return new PartitionReservation(reserved); + } + } + + /** + * @param cctx Cache context. + * @param reqTopVer Topology version from request. + * @param explicitParts Explicit partitions list. + * @param nodeId Node ID. + * @param qryInfo Query info. + * @return PartitionReservation instance with reservation result. + * @throws IgniteCheckedException If failed. + */ + public PartitionReservation reservePartitions( + GridCacheContext cctx, + AffinityTopologyVersion reqTopVer, + int[] explicitParts, + UUID nodeId, + String qryInfo + ) throws IgniteCheckedException { + try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_PARTITIONS_RESERVE, MTC.span()))) { + assert reqTopVer != null; + + AffinityTopologyVersion topVer = ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(reqTopVer); + + Collection partIds = partsToCollection(explicitParts); + + List reserved = new ArrayList<>(); + + String err = reservePartitions(reserved, cctx, partIds, topVer, nodeId, qryInfo); + + return new PartitionReservation(reserved, err); + } + } + + /** + * @return Error message or {@code null}. + */ + private @Nullable String reservePartitions( + List reserved, + GridCacheContext cctx, + @Nullable Collection explicitParts, + AffinityTopologyVersion topVer, + UUID nodeId, + String qryInfo + ) throws IgniteCheckedException { + // For replicated cache topology version does not make sense. + PartitionReservationKey grpKey = new PartitionReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer); + + GridReservable r = reservations.get(grpKey); + + if (explicitParts == null && r != null) // Try to reserve group partition if any and no explicits. + return groupPartitionReservation(reserved, r, cctx, topVer, nodeId, qryInfo); + else { // Try to reserve partitions one by one. + int partsCnt = cctx.affinity().partitions(); + + if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache. + if (r == null) { // Check only once. + GridDhtPartitionTopology top = cctx.topology(); + + top.readLock(); + + try { + for (int p = 0; p < partsCnt; p++) { + GridDhtLocalPartition part = partition(top, p); + + // We don't need to reserve partitions because they will not be evicted in replicated caches. + GridDhtPartitionState partState = part != null ? part.state() : null; + + if (partState != OWNING) { + return String.format("Failed to reserve partitions for " + + "query (partition of REPLICATED cache is not in OWNING state) [" + + "localNodeId=%s, rmtNodeId=%s, %s, affTopVer=%s, cacheId=%s, " + + "cacheName=%s, part=%s, partFound=%s, partState=%s]", + ctx.localNodeId(), + nodeId, + qryInfo, + topVer, + cctx.cacheId(), + cctx.name(), + p, + (part != null), + partState + ); + } + } + } + finally { + top.readUnlock(); + } + + // Mark that we checked this replicated cache. + reservations.putIfAbsent(grpKey, REPLICATED_RESERVABLE); + + MTC.span().addLog(() -> "Cache partitions were reserved [cache=" + cctx.name() + + ", partitions=[0.." + partsCnt + ']'); + } + } + else { // Reserve primary partitions for partitioned cache (if no explicit given). + Collection partIds = explicitParts != null ? explicitParts + : cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); + + int reservedCnt = 0; + + GridDhtPartitionTopology top = cctx.topology(); + + top.readLock(); + + try { + for (int partId : partIds) { + GridDhtLocalPartition part = partition(top, partId); + + GridDhtPartitionState partState = part != null ? part.state() : null; + + if (partState != OWNING) { + if (partState == LOST) { + reserved.forEach(GridReservable::release); + + failQueryOnLostData(cctx, part); + } + else { + return String.format("Failed to reserve partitions " + + "for query (partition of PARTITIONED cache is not found or not in OWNING " + + "state) [localNodeId=%s, rmtNodeId=%s, %s, affTopVer=%s, cacheId=%s, " + + "cacheName=%s, part=%s, partFound=%s, partState=%s]", + ctx.localNodeId(), + nodeId, + qryInfo, + topVer, + cctx.cacheId(), + cctx.name(), + partId, + (part != null), + partState + ); + } + } + + if (!part.reserve()) { + return String.format("Failed to reserve partitions for query " + + "(partition of PARTITIONED cache cannot be reserved) [" + + "localNodeId=%s, rmtNodeId=%s, %s, affTopVer=%s, cacheId=%s, " + + "cacheName=%s, part=%s, partFound=%s, partState=%s]", + ctx.localNodeId(), + nodeId, + qryInfo, + topVer, + cctx.cacheId(), + cctx.name(), + partId, + true, + partState + ); + } + + reserved.add(part); + + reservedCnt++; + } + } + finally { + top.readUnlock(); + } + + MTC.span().addLog(() -> "Cache partitions were reserved [cache=" + cctx.name() + + ", partitions=" + partIds + ", topology=" + topVer + ']'); + + if (explicitParts == null && reservedCnt > 0) { + // We reserved all the primary partitions for cache, attempt to add group reservation. + GridDhtPartitionsReservation grp = new GridDhtPartitionsReservation(topVer, cctx, "SQL"); + + synchronized (this) { + // Double check under lock. + GridReservable grpReservation = reservations.get(grpKey); + + if (grpReservation != null) + return groupPartitionReservation(reserved, grpReservation, cctx, topVer, nodeId, qryInfo); + else { + if (grp.register(reserved.subList(reserved.size() - reservedCnt, reserved.size()))) { + reservations.put(grpKey, grp); + + grp.onPublish(new CI1<>() { + @Override public void apply(GridDhtPartitionsReservation r) { + reservations.remove(grpKey, r); + } + }); + } + } + } + } + } + } + + return null; + } + + /** + * @param cacheName Cache name. + */ + public void onCacheStop(String cacheName) { + // Drop group reservations. + for (PartitionReservationKey grpKey : reservations.keySet()) { + if (F.eq(grpKey.cacheName(), cacheName)) + reservations.remove(grpKey); + } + } + + /** + * @param cctx Cache context. + * @param part Partition. + */ + private static void failQueryOnLostData( + GridCacheContext cctx, + GridDhtLocalPartition part + ) throws IgniteCheckedException { + throw new CacheInvalidStateException("Failed to execute query because cache partition has been " + + "lost [cacheName=" + cctx.name() + ", part=" + part + ']'); + } + + /** + * Cleanup group reservations cache on change affinity version. + */ + @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) { + try { + // Must not do anything at the exchange thread. Dispatch to the management thread pool. + ctx.closure().runLocal( + new GridPlainRunnable() { + @Override public void run() { + AffinityTopologyVersion topVer = ctx.cache().context().exchange() + .lastAffinityChangedTopologyVersion(fut.topologyVersion()); + + reservations.forEach((key, r) -> { + if (r != REPLICATED_RESERVABLE && !F.eq(key.topologyVersion(), topVer)) { + assert r instanceof GridDhtPartitionsReservation; + + ((GridDhtPartitionsReservation)r).invalidate(); + } + }); + } + }, + GridIoPolicy.MANAGEMENT_POOL); + } + catch (Throwable e) { + log.error("Unexpected exception on start reservations cleanup."); + ctx.failure().process(new FailureContext(CRITICAL_ERROR, e)); + } + } + + /** */ + private static Collection partsToCollection(int[] explicitParts) { + if (explicitParts == null) + return null; + else if (explicitParts.length == 0) + return Collections.emptyList(); + else { + List partIds = new ArrayList<>(explicitParts.length); + + for (int explicitPart : explicitParts) + partIds.add(explicitPart); + + return partIds; + } + } + + /** */ + private String groupPartitionReservation( + List reserved, + GridReservable grpReservation, + GridCacheContext cctx, + AffinityTopologyVersion topVer, + UUID nodeId, + String qryInfo + ) { + if (grpReservation != REPLICATED_RESERVABLE) { + if (!grpReservation.reserve()) { + return String.format("Failed to reserve partitions for query (group " + + "reservation failed) [localNodeId=%s, rmtNodeId=%s, %s, affTopVer=%s, cacheId=%s, " + + "cacheName=%s]", ctx.localNodeId(), nodeId, qryInfo, topVer, cctx.cacheId(), cctx.name()); + } + + reserved.add(grpReservation); + + MTC.span().addLog(() -> "Cache partitions were reserved " + grpReservation); + } + + return null; + } + + /** + * Mapper fake reservation object for replicated caches. + */ + private static class ReplicatedReservable implements GridReservable { + /** {@inheritDoc} */ + @Override public boolean reserve() { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override public void release() { + throw new IllegalStateException(); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 54f81b4de7f1f..254e53e81d21b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -90,6 +90,7 @@ import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationManager; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; @@ -308,6 +309,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** Global schema SQL views manager. */ private final SchemaSqlViewManager schemaSqlViewMgr; + /** Partition reservation manager. */ + private final PartitionReservationManager partReservationMgr; + /** @see TransactionConfiguration#isTxAwareQueriesEnabled() */ private final boolean txAwareQueriesEnabled; @@ -331,6 +335,8 @@ public GridQueryProcessor(GridKernalContext ctx) throws IgniteCheckedException { schemaSqlViewMgr = new SchemaSqlViewManager(ctx); + partReservationMgr = new PartitionReservationManager(ctx); + idxProc = ctx.indexProcessor(); idxQryPrc = new IndexQueryProcessor(idxProc); @@ -1060,6 +1066,11 @@ public RunningQueryManager runningQueryManager() throws IgniteException { return runningQryMgr; } + /** Partition reservation manager. */ + public PartitionReservationManager partitionReservationManager() { + return partReservationMgr; + } + /** * Create type descriptors from schema and initialize indexing for given cache.

* Use with {@link #busyLock} where appropriate. @@ -1327,6 +1338,8 @@ public void onClientCacheStop(GridCacheContextInfo cacheInfo) { try { if (schemaMgr.clearCacheContext(cacheInfo.cacheContext())) { + partReservationMgr.onCacheStop(cacheInfo.name()); + if (idx != null) idx.unregisterCache(cacheInfo); } @@ -2444,6 +2457,8 @@ public void onCacheStop0(GridCacheContextInfo cacheInfo, boolean destroy, boolea schemaMgr.onCacheStopped(cacheName, destroy, clearIdx); + partReservationMgr.onCacheStop(cacheName); + // Notify indexing. if (idx != null) idx.unregisterCache(cacheInfo); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 5f5c79cff7c2a..30bff62421e40 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationManager; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; @@ -97,7 +98,6 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement; import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor; import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor; -import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; @@ -1559,7 +1559,7 @@ public GridReduceQueryExecutor reduceQueryExecutor() { this.ctx = ctx; - partReservationMgr = new PartitionReservationManager(ctx); + partReservationMgr = ctx.query().partitionReservationManager(); connMgr = new ConnectionManager(ctx); @@ -1833,10 +1833,6 @@ private JavaObjectSerializer h2Serializer() { /** {@inheritDoc} */ @Override public void unregisterCache(GridCacheContextInfo cacheInfo) { - String cacheName = cacheInfo.name(); - - partReservationMgr.onCacheStop(cacheName); - // Unregister connection. connMgr.onCacheDestroyed(); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java index 8930a89fd8cda..1b0d4511f0ffa 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java @@ -18,8 +18,8 @@ package org.apache.ignite.internal.processors.query.h2.opt; import java.util.Objects; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation; import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext; -import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservation; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.jetbrains.annotations.Nullable; diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 4bd579a7eb4fe..a3b5d3a9c9003 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.metric.IoStatisticsQueryHelper; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java deleted file mode 100644 index d092c3cd2d811..0000000000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java +++ /dev/null @@ -1,375 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.h2.twostep; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.failure.FailureContext; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation; -import org.apache.ignite.internal.processors.tracing.MTC; -import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings; -import org.apache.ignite.internal.util.lang.GridPlainRunnable; -import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.F; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; -import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; -import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST; -import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; -import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_PARTITIONS_RESERVE; - -/** - * Class responsible for partition reservation for queries executed on local node. Prevents partitions from being - * evicted from node during query execution. - */ -public class PartitionReservationManager implements PartitionsExchangeAware { - /** Special instance of reservable object for REPLICATED caches. */ - private static final ReplicatedReservable REPLICATED_RESERVABLE = new ReplicatedReservable(); - - /** Kernal context. */ - private final GridKernalContext ctx; - - /** - * Group reservations cache. When affinity version is not changed and all primary partitions must be reserved we get - * group reservation from this map instead of create new reservation group. - */ - private final ConcurrentMap reservations = new ConcurrentHashMap<>(); - - /** Logger. */ - private final IgniteLogger log; - - /** - * Constructor. - * - * @param ctx Context. - */ - public PartitionReservationManager(GridKernalContext ctx) { - this.ctx = ctx; - - log = ctx.log(PartitionReservationManager.class); - - ctx.cache().context().exchange().registerExchangeAwareComponent(this); - } - - /** - * @param cctx Cache context. - * @param p Partition ID. - * @return Partition. - */ - private static GridDhtLocalPartition partition(GridCacheContext cctx, int p) { - return cctx.topology().localPartition(p, NONE, false); - } - - /** - * @param cacheIds Cache IDs. - * @param reqTopVer Topology version from request. - * @param explicitParts Explicit partitions list. - * @param nodeId Node ID. - * @param reqId Request ID. - * @return String which is null in case of success or with causeMessage if failed - * @throws IgniteCheckedException If failed. - */ - public PartitionReservation reservePartitions( - @Nullable List cacheIds, - AffinityTopologyVersion reqTopVer, - final int[] explicitParts, - UUID nodeId, - long reqId - ) throws IgniteCheckedException { - try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_PARTITIONS_RESERVE, MTC.span()))) { - assert reqTopVer != null; - - AffinityTopologyVersion topVer = ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(reqTopVer); - - if (F.isEmpty(cacheIds)) - return new PartitionReservation(Collections.emptyList()); - - Collection partIds; - - if (explicitParts == null) - partIds = null; - else if (explicitParts.length == 0) - partIds = Collections.emptyList(); - else { - partIds = new ArrayList<>(explicitParts.length); - - for (int explicitPart : explicitParts) - partIds.add(explicitPart); - } - - List reserved = new ArrayList<>(); - - for (int i = 0; i < cacheIds.size(); i++) { - GridCacheContext cctx = ctx.cache().context().cacheContext(cacheIds.get(i)); - - // Cache was not found, probably was not deployed yet. - if (cctx == null) { - return new PartitionReservation(reserved, - String.format("Failed to reserve partitions for query (cache is not " + - "found on local node) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s]", - ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i))); - } - - if (!cctx.rebalanceEnabled()) - continue; - - // For replicated cache topology version does not make sense. - final PartitionReservationKey grpKey = new PartitionReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer); - - GridReservable r = reservations.get(grpKey); - - if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits. - if (r != REPLICATED_RESERVABLE) { - if (!r.reserve()) - return new PartitionReservation(reserved, - String.format("Failed to reserve partitions for query (group " + - "reservation failed) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " + - "cacheName=%s]", ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i), cctx.name())); - - reserved.add(r); - - MTC.span().addLog(() -> "Cache partitions were reserved " + r); - } - } - else { // Try to reserve partitions one by one. - int partsCnt = cctx.affinity().partitions(); - - if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache. - if (r == null) { // Check only once. - for (int p = 0; p < partsCnt; p++) { - GridDhtLocalPartition part = partition(cctx, p); - - // We don't need to reserve partitions because they will not be evicted in replicated caches. - GridDhtPartitionState partState = part != null ? part.state() : null; - - if (partState != OWNING) - return new PartitionReservation(reserved, - String.format("Failed to reserve partitions for " + - "query (partition of REPLICATED cache is not in OWNING state) [" + - "localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " + - "cacheName=%s, part=%s, partFound=%s, partState=%s]", - ctx.localNodeId(), - nodeId, - reqId, - topVer, - cacheIds.get(i), - cctx.name(), - p, - (part != null), - partState - )); - } - - // Mark that we checked this replicated cache. - reservations.putIfAbsent(grpKey, REPLICATED_RESERVABLE); - - MTC.span().addLog(() -> "Cache partitions were reserved [cache=" + cctx.name() + - ", partitions=[0.." + partsCnt + ']'); - } - } - else { // Reserve primary partitions for partitioned cache (if no explicit given). - if (explicitParts == null) - partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); - - int reservedCnt = 0; - - for (int partId : partIds) { - GridDhtLocalPartition part = partition(cctx, partId); - - GridDhtPartitionState partState = part != null ? part.state() : null; - - if (partState != OWNING) { - if (partState == LOST) - failQueryOnLostData(cctx, part); - else { - return new PartitionReservation(reserved, - String.format("Failed to reserve partitions " + - "for query (partition of PARTITIONED cache is not found or not in OWNING " + - "state) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " + - "cacheName=%s, part=%s, partFound=%s, partState=%s]", - ctx.localNodeId(), - nodeId, - reqId, - topVer, - cacheIds.get(i), - cctx.name(), - partId, - (part != null), - partState - )); - } - } - - if (!part.reserve()) { - return new PartitionReservation(reserved, - String.format("Failed to reserve partitions for query " + - "(partition of PARTITIONED cache cannot be reserved) [" + - "localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " + - "cacheName=%s, part=%s, partFound=%s, partState=%s]", - ctx.localNodeId(), - nodeId, - reqId, - topVer, - cacheIds.get(i), - cctx.name(), - partId, - true, - partState - )); - } - - reserved.add(part); - - reservedCnt++; - - // Double check that we are still in owning state and partition contents are not cleared. - partState = part.state(); - - if (partState != OWNING) { - if (partState == LOST) - failQueryOnLostData(cctx, part); - else { - return new PartitionReservation(reserved, - String.format("Failed to reserve partitions for " + - "query (partition of PARTITIONED cache is not in OWNING state after " + - "reservation) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, " + - "cacheId=%s, cacheName=%s, part=%s, partState=%s]", - ctx.localNodeId(), - nodeId, - reqId, - topVer, - cacheIds.get(i), - cctx.name(), - partId, - partState - )); - } - } - } - - final Collection finalPartIds = partIds; - - MTC.span().addLog(() -> "Cache partitions were reserved [cache=" + cctx.name() + - ", partitions=" + finalPartIds + ", topology=" + topVer + ']'); - - if (explicitParts == null && reservedCnt > 0) { - // We reserved all the primary partitions for cache, attempt to add group reservation. - GridDhtPartitionsReservation grp = new GridDhtPartitionsReservation(topVer, cctx, "SQL"); - - if (grp.register(reserved.subList(reserved.size() - reservedCnt, reserved.size()))) { - if (reservations.putIfAbsent(grpKey, grp) != null) - throw new IllegalStateException("Reservation already exists."); - - grp.onPublish(new CI1() { - @Override public void apply(GridDhtPartitionsReservation r) { - reservations.remove(grpKey, r); - } - }); - } - } - } - } - } - - return new PartitionReservation(reserved); - } - } - - /** - * @param cacheName Cache name. - */ - public void onCacheStop(String cacheName) { - // Drop group reservations. - for (PartitionReservationKey grpKey : reservations.keySet()) { - if (F.eq(grpKey.cacheName(), cacheName)) - reservations.remove(grpKey); - } - } - - /** - * @param cctx Cache context. - * @param part Partition. - */ - private static void failQueryOnLostData(GridCacheContext cctx, GridDhtLocalPartition part) - throws IgniteCheckedException { - throw new CacheInvalidStateException("Failed to execute query because cache partition has been " + - "lost [cacheName=" + cctx.name() + ", part=" + part + ']'); - } - - /** - * Cleanup group reservations cache on change affinity version. - */ - @Override public void onDoneAfterTopologyUnlock(final GridDhtPartitionsExchangeFuture fut) { - try { - // Must not do anything at the exchange thread. Dispatch to the management thread pool. - ctx.closure().runLocal( - new GridPlainRunnable() { - @Override public void run() { - AffinityTopologyVersion topVer = ctx.cache().context().exchange() - .lastAffinityChangedTopologyVersion(fut.topologyVersion()); - - reservations.forEach((key, r) -> { - if (r != REPLICATED_RESERVABLE && !F.eq(key.topologyVersion(), topVer)) { - assert r instanceof GridDhtPartitionsReservation; - - ((GridDhtPartitionsReservation)r).invalidate(); - } - }); - } - }, - GridIoPolicy.MANAGEMENT_POOL); - } - catch (Throwable e) { - log.error("Unexpected exception on start reservations cleanup."); - ctx.failure().process(new FailureContext(CRITICAL_ERROR, e)); - } - } - - /** - * Mapper fake reservation object for replicated caches. - */ - private static class ReplicatedReservable implements GridReservable { - /** {@inheritDoc} */ - @Override public boolean reserve() { - throw new IllegalStateException(); - } - - /** {@inheritDoc} */ - @Override public void release() { - throw new IllegalStateException(); - } - } -} diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java index 0c30057fd8f54..70847fabff366 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java @@ -69,11 +69,11 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationManager; import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor; -import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservation; -import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager; import org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapResult; import org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapper; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java index 0f5491934e9f1..7b3c83e5bc949 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java @@ -38,6 +38,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationKey; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationManager; import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; import org.apache.ignite.internal.processors.query.GridQueryProcessor;