Skip to content

Commit

Permalink
Merge branch 'master' into IGNITE-15083-5
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov committed Dec 27, 2024
2 parents 9e5fe36 + a373291 commit a885623
Show file tree
Hide file tree
Showing 16 changed files with 582 additions and 454 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@
package org.apache.ignite.internal.processors.query.calcite.exec;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation;

/** */
public abstract class AbstractCacheScan<Row> implements Iterable<Row>, AutoCloseable {
Expand All @@ -45,15 +46,35 @@ public abstract class AbstractCacheScan<Row> implements Iterable<Row>, AutoClose
protected final int[] parts;

/** */
protected volatile List<GridDhtLocalPartition> reserved;
protected final boolean explicitParts;

/** */
private PartitionReservation reservation;

/** */
protected volatile List<GridDhtLocalPartition> reservedParts;

/** */
AbstractCacheScan(ExecutionContext<Row> ectx, GridCacheContext<?, ?> cctx, int[] parts) {
this.ectx = ectx;
this.cctx = cctx;
this.parts = parts;

topVer = ectx.topologyVersion();

explicitParts = parts != null;

if (cctx.isReplicated())
this.parts = IntStream.range(0, cctx.affinity().partitions()).toArray();
else {
if (parts != null)
this.parts = parts;
else {
Collection<Integer> primaryParts = cctx.affinity().primaryPartitions(
cctx.kernalContext().localNodeId(), topVer);

this.parts = primaryParts.stream().mapToInt(Integer::intValue).toArray();
}
}
}

/** {@inheritDoc} */
Expand All @@ -80,7 +101,7 @@ public abstract class AbstractCacheScan<Row> implements Iterable<Row>, AutoClose

/** */
private synchronized void reserve() {
if (reserved != null)
if (reservation != null)
return;

GridDhtPartitionTopology top = cctx.topology();
Expand All @@ -98,61 +119,42 @@ private synchronized void reserve() {
throw new ClusterTopologyException("Topology was changed. Please retry on stable topology.");
}

List<GridDhtLocalPartition> toReserve;

if (cctx.isReplicated()) {
int partsCnt = cctx.affinity().partitions();

toReserve = new ArrayList<>(partsCnt);

for (int i = 0; i < partsCnt; i++)
toReserve.add(top.localPartition(i));
}
else if (cctx.isPartitioned()) {
assert parts != null;
try {
PartitionReservation reservation;

toReserve = new ArrayList<>(parts.length);
try {
reservation = cctx.kernalContext().query().partitionReservationManager().reservePartitions(
cctx, topVer, explicitParts ? parts : null, ectx.originatingNodeId(), "qryId=" + ectx.queryId());
}
catch (IgniteCheckedException e) {
throw new ClusterTopologyException("Failed to reserve partition for query execution", e);
}

for (int i = 0; i < parts.length; i++)
toReserve.add(top.localPartition(parts[i]));
}
else
toReserve = Collections.emptyList();
if (reservation.failed()) {
reservation.release();

List<GridDhtLocalPartition> reserved = new ArrayList<>(toReserve.size());
throw new ClusterTopologyException(reservation.error());
}

try {
for (GridDhtLocalPartition part : toReserve) {
if (part == null || !part.reserve())
throw new ClusterTopologyException("Failed to reserve partition for query execution. Retry on stable topology.");
else if (part.state() != GridDhtPartitionState.OWNING) {
part.release();
this.reservation = reservation;

throw new ClusterTopologyException("Failed to reserve partition for query execution. Retry on stable topology.");
}
List<GridDhtLocalPartition> reservedParts = new ArrayList<>(parts.length);

reserved.add(part);
}
}
catch (Exception e) {
release();
for (int i = 0; i < parts.length; i++)
reservedParts.add(top.localPartition(parts[i]));

throw e;
this.reservedParts = reservedParts;
}
finally {
this.reserved = reserved;

top.readUnlock();
}
}

/** */
private synchronized void release() {
if (F.isEmpty(reserved))
return;

reserved.forEach(GridDhtLocalPartition::release);
if (reservation != null)
reservation.release();

reserved = null;
reservation = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public IndexScan(

txChanges = ectx.transactionChanges(
cctx.cacheId(),
parts,
cctx.isReplicated() ? null : this.parts,
r -> new IndexRowImpl(rowHnd, r),
this::compare
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ private class IteratorImpl extends GridIteratorAdapter<Row> {

/** */
private IteratorImpl() {
assert reserved != null;
assert reservedParts != null;

parts = new ArrayDeque<>(reserved);
parts = new ArrayDeque<>(reservedParts);

txChanges = F.isEmpty(ectx.getQryTxEntries())
? TransactionChanges.empty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ public class ColocationGroup implements MarshalableMessage {
@GridDirectTransient
private List<List<UUID>> assignments;

/**
* Flag, indacating that assignment is formed by original cache assignment for given topology.
* In case of {@code true} value we can skip assignment marshalling and calc assignment on remote nodes.
*/
@GridDirectTransient
private boolean primaryAssignment;

/** Marshalled assignments. */
private int[] marshalledAssignments;

Expand All @@ -68,7 +75,7 @@ public static ColocationGroup forNodes(List<UUID> nodeIds) {

/** */
public static ColocationGroup forAssignments(List<List<UUID>> assignments) {
return new ColocationGroup(null, null, assignments);
return new ColocationGroup(null, null, assignments, true);
}

/** */
Expand Down Expand Up @@ -100,6 +107,13 @@ private ColocationGroup(long[] sourceIds, List<UUID> nodeIds, List<List<UUID>> a
this.assignments = assignments;
}

/** */
private ColocationGroup(long[] sourceIds, List<UUID> nodeIds, List<List<UUID>> assignments, boolean primaryAssignment) {
this(sourceIds, nodeIds, assignments);

this.primaryAssignment = primaryAssignment;
}

/**
* @return Lists of nodes capable to execute a query fragment for what the mapping is calculated.
*/
Expand Down Expand Up @@ -143,10 +157,10 @@ public boolean belongs(long sourceId) {
*/
public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingException {
long[] srcIds;
if (this.sourceIds == null || other.sourceIds == null)
srcIds = U.firstNotNull(this.sourceIds, other.sourceIds);
if (sourceIds == null || other.sourceIds == null)
srcIds = U.firstNotNull(sourceIds, other.sourceIds);
else
srcIds = LongStream.concat(Arrays.stream(this.sourceIds), Arrays.stream(other.sourceIds)).distinct().toArray();
srcIds = LongStream.concat(Arrays.stream(sourceIds), Arrays.stream(other.sourceIds)).distinct().toArray();

List<UUID> nodeIds;
if (this.nodeIds == null || other.nodeIds == null)
Expand All @@ -159,6 +173,8 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE
"Replicated query parts are not co-located on all nodes");
}

boolean primaryAssignment = this.primaryAssignment || other.primaryAssignment;

List<List<UUID>> assignments;
if (this.assignments == null || other.assignments == null) {
assignments = U.firstNotNull(this.assignments, other.assignments);
Expand All @@ -170,11 +186,14 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE
for (int i = 0; i < assignments.size(); i++) {
List<UUID> assignment = Commons.intersect(filter, assignments.get(i));

if (assignment.isEmpty()) { // TODO check with partition filters
if (assignment.isEmpty()) {
throw new ColocationMappingException("Failed to map fragment to location. " +
"Partition mapping is empty [part=" + i + "]");
}

if (!assignment.get(0).equals(assignments.get(i).get(0)))
primaryAssignment = false;

assignments0.add(assignment);
}

Expand All @@ -191,14 +210,20 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE
if (filter != null)
assignment.retainAll(filter);

if (assignment.isEmpty()) // TODO check with partition filters
throw new ColocationMappingException("Failed to map fragment to location. Partition mapping is empty [part=" + i + "]");
if (assignment.isEmpty()) {
throw new ColocationMappingException("Failed to map fragment to location. " +
"Partition mapping is empty [part=" + i + "]");
}

if (!assignment.get(0).equals(this.assignments.get(i).get(0))
|| !assignment.get(0).equals(other.assignments.get(i).get(0)))
primaryAssignment = false;

assignments.add(assignment);
}
}

return new ColocationGroup(srcIds, nodeIds, assignments);
return new ColocationGroup(srcIds, nodeIds, assignments, primaryAssignment);
}

/** */
Expand All @@ -216,7 +241,16 @@ public ColocationGroup finalizeMapping() {
assignments.add(first != null ? Collections.singletonList(first) : Collections.emptyList());
}

return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments);
return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments, primaryAssignment);
}

/** */
public ColocationGroup explicitMapping() {
if (assignments == null || !primaryAssignment)
return this;

// Make a shallow copy without cacheAssignment flag.
return new ColocationGroup(sourceIds, nodeIds, assignments, false);
}

/** */
Expand Down Expand Up @@ -359,7 +393,7 @@ public int[] partitions(UUID nodeId) {

/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) {
if (assignments != null && marshalledAssignments == null) {
if (assignments != null && marshalledAssignments == null && !primaryAssignment) {
Map<UUID, Integer> nodeIdxs = new HashMap<>();

for (int i = 0; i < nodeIds.size(); i++)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,11 @@ public FragmentMapping mapping() {
if (mapping != null)
mapping.prepareMarshal(ctx);

if (target != null)
if (target != null) {
target = target.explicitMapping();

target.prepareMarshal(ctx);
}

if (remoteSources0 == null && remoteSources != null) {
remoteSources0 = U.newHashMap(remoteSources.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,18 +567,25 @@ private ColocationGroup replicatedGroup(@NotNull AffinityTopologyVersion topVer)
List<ClusterNode> nodes = cctx.discovery().discoCache(topVer).cacheGroupAffinityNodes(cctx.groupId());
List<UUID> nodes0;

if (!top.rebalanceFinished(topVer)) {
nodes0 = new ArrayList<>(nodes.size());
top.readLock();

int parts = top.partitions();
try {
if (!top.rebalanceFinished(topVer)) {
nodes0 = new ArrayList<>(nodes.size());

int parts = top.partitions();

for (ClusterNode node : nodes) {
if (isOwner(node.id(), top, parts))
nodes0.add(node.id());
for (ClusterNode node : nodes) {
if (isOwner(node.id(), top, parts))
nodes0.add(node.id());
}
}
else
nodes0 = Commons.transform(nodes, ClusterNode::id);
}
finally {
top.readUnlock();
}
else
nodes0 = Commons.transform(nodes, ClusterNode::id);

return ColocationGroup.forNodes(nodes0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.h2.twostep;
package org.apache.ignite.internal.processors.cache.distributed.dht.topology;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.h2.twostep;
package org.apache.ignite.internal.processors.cache.distributed.dht.topology;

import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.F;
Expand Down
Loading

0 comments on commit a885623

Please sign in to comment.