IGNITE-23975 Review comments fixed
alex-plekhanov committed Dec 27, 2024
1 parent b3d29c0 commit 4205a36
Showing 3 changed files with 39 additions and 39 deletions.
@@ -63,7 +63,7 @@ public class ColocationGroup implements MarshalableMessage {
* In case of {@code true} value we can skip assignment marshalling and calc assignment on remote nodes.
*/
@GridDirectTransient
- private boolean cacheAssignment;
+ private boolean primaryAssignment;

/** Marshalled assignments. */
private int[] marshalledAssignments;
@@ -73,14 +73,9 @@ public static ColocationGroup forNodes(List<UUID> nodeIds) {
return new ColocationGroup(null, nodeIds, null);
}

- /** */
- public static ColocationGroup forCacheAssignment(List<List<UUID>> assignments) {
- return new ColocationGroup(null, null, assignments, true);
- }
-
/** */
public static ColocationGroup forAssignments(List<List<UUID>> assignments) {
- return new ColocationGroup(null, null, assignments);
+ return new ColocationGroup(null, null, assignments, true);
}

/** */
@@ -113,10 +108,10 @@ private ColocationGroup(long[] sourceIds, List<UUID> nodeIds, List<List<UUID>> a
}

/** */
- private ColocationGroup(long[] sourceIds, List<UUID> nodeIds, List<List<UUID>> assignments, boolean cacheAssignment) {
+ private ColocationGroup(long[] sourceIds, List<UUID> nodeIds, List<List<UUID>> assignments, boolean primaryAssignment) {
this(sourceIds, nodeIds, assignments);

- this.cacheAssignment = cacheAssignment;
+ this.primaryAssignment = primaryAssignment;
}

/**
@@ -162,10 +157,10 @@ public boolean belongs(long sourceId) {
*/
public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingException {
long[] srcIds;
- if (this.sourceIds == null || other.sourceIds == null)
- srcIds = U.firstNotNull(this.sourceIds, other.sourceIds);
+ if (sourceIds == null || other.sourceIds == null)
+ srcIds = U.firstNotNull(sourceIds, other.sourceIds);
else
- srcIds = LongStream.concat(Arrays.stream(this.sourceIds), Arrays.stream(other.sourceIds)).distinct().toArray();
+ srcIds = LongStream.concat(Arrays.stream(sourceIds), Arrays.stream(other.sourceIds)).distinct().toArray();

List<UUID> nodeIds;
if (this.nodeIds == null || other.nodeIds == null)
@@ -178,7 +173,7 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE
"Replicated query parts are not co-located on all nodes");
}

- boolean cacheAssignment = this.cacheAssignment || other.cacheAssignment;
+ boolean primaryAssignment = this.primaryAssignment || other.primaryAssignment;

List<List<UUID>> assignments;
if (this.assignments == null || other.assignments == null) {
@@ -191,13 +186,13 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE
for (int i = 0; i < assignments.size(); i++) {
List<UUID> assignment = Commons.intersect(filter, assignments.get(i));

- if (assignment.isEmpty()) { // TODO check with partition filters
+ if (assignment.isEmpty()) {
throw new ColocationMappingException("Failed to map fragment to location. " +
"Partition mapping is empty [part=" + i + "]");
}

if (!assignment.get(0).equals(assignments.get(i).get(0)))
- cacheAssignment = false;
+ primaryAssignment = false;

assignments0.add(assignment);
}
@@ -215,17 +210,20 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE
if (filter != null)
assignment.retainAll(filter);

- if (assignment.isEmpty()) // TODO check with partition filters
- throw new ColocationMappingException("Failed to map fragment to location. Partition mapping is empty [part=" + i + "]");
+ if (assignment.isEmpty()) {
+ throw new ColocationMappingException("Failed to map fragment to location. " +
+ "Partition mapping is empty [part=" + i + "]");
+ }

- if (!assignment.get(0).equals(this.assignments.get(i).get(0)) || !assignment.get(0).equals(other.assignments.get(i).get(0)))
- cacheAssignment = false;
+ if (!assignment.get(0).equals(this.assignments.get(i).get(0))
+ || !assignment.get(0).equals(other.assignments.get(i).get(0)))
+ primaryAssignment = false;

assignments.add(assignment);
}
}

- return new ColocationGroup(srcIds, nodeIds, assignments, cacheAssignment);
+ return new ColocationGroup(srcIds, nodeIds, assignments, primaryAssignment);
}

/** */
@@ -243,12 +241,12 @@ public ColocationGroup finalizeMapping() {
assignments.add(first != null ? Collections.singletonList(first) : Collections.emptyList());
}

- return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments, cacheAssignment);
+ return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments, primaryAssignment);
}

/** */
public ColocationGroup explicitMapping() {
- if (assignments == null || !cacheAssignment)
+ if (assignments == null || !primaryAssignment)
return this;

// Make a shallow copy without cacheAssignment flag.
@@ -395,7 +393,7 @@ public int[] partitions(UUID nodeId) {

/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) {
- if (assignments != null && marshalledAssignments == null && !cacheAssignment) {
+ if (assignments != null && marshalledAssignments == null && !primaryAssignment) {
Map<UUID, Integer> nodeIdxs = new HashMap<>();

for (int i = 0; i < nodeIds.size(); i++)
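For context on the rename, a minimal self-contained Java sketch of the invariant the primaryAssignment flag tracks (the class and method names below are illustrative assumptions, not part of this commit or of Ignite's API): the flag may only stay set while every partition's merged node list still starts with the same node as the original assignment, i.e. the primary copy is preserved, which is what lets prepareMarshal skip marshalling and remote nodes recalculate the assignment.

import java.util.List;
import java.util.UUID;

final class PrimaryAssignmentInvariant {
    /**
     * Returns {@code true} if every partition of the merged assignment still starts with
     * the same node as the original assignment, i.e. the primary owner is preserved.
     * Mirrors the checks in colocate() that reset the primaryAssignment flag.
     */
    static boolean primariesPreserved(List<List<UUID>> original, List<List<UUID>> merged) {
        for (int part = 0; part < original.size(); part++) {
            List<UUID> mergedNodes = merged.get(part);

            if (mergedNodes.isEmpty())
                return false; // Mapping failed for this partition (the ColocationMappingException case).

            if (!mergedNodes.get(0).equals(original.get(part).get(0)))
                return false; // Primary owner changed, so assignments must be marshalled explicitly.
        }

        return true;
    }
}

While the invariant holds, the remote side can rebuild the same assignment from affinity, which is why forAssignments now sets the flag and explicitMapping() drops it before building an explicit copy.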
@@ -587,7 +587,7 @@ private ColocationGroup partitionedGroup(@NotNull AffinityTopologyVersion topVer
assignments0.add(F.isEmpty(partNodes) ? emptyList() : singletonList(F.first(partNodes).id()));
}

- return ColocationGroup.forCacheAssignment(assignments0);
+ return ColocationGroup.forAssignments(assignments0);
}

/** */
@@ -83,11 +83,11 @@ public PartitionReservationManager(GridKernalContext ctx) {

/**
* @param top Partition topology.
- * @param p Partition ID.
+ * @param partId Partition ID.
* @return Partition.
*/
- private static GridDhtLocalPartition partition(GridDhtPartitionTopology top, int p) {
- return top.localPartition(p, NONE, false);
+ private static GridDhtLocalPartition partition(GridDhtPartitionTopology top, int partId) {
+ return top.localPartition(partId, NONE, false);
}

/**
@@ -102,7 +102,7 @@ private static GridDhtLocalPartition partition(GridDhtPartitionTopology top, int
public PartitionReservation reservePartitions(
@Nullable List<Integer> cacheIds,
AffinityTopologyVersion reqTopVer,
- final int[] explicitParts,
+ int[] explicitParts,
UUID nodeId,
long reqId
) throws IgniteCheckedException {
@@ -154,7 +154,7 @@ public PartitionReservation reservePartitions(
public PartitionReservation reservePartitions(
GridCacheContext<?, ?> cctx,
AffinityTopologyVersion reqTopVer,
- final int[] explicitParts,
+ int[] explicitParts,
UUID nodeId,
String qryInfo
) throws IgniteCheckedException {
@@ -179,17 +179,17 @@ public PartitionReservation reservePartitions(
private @Nullable String reservePartitions(
List<GridReservable> reserved,
GridCacheContext<?, ?> cctx,
- Collection<Integer> partIds,
+ @Nullable Collection<Integer> explicitParts,
AffinityTopologyVersion topVer,
UUID nodeId,
String qryInfo
) throws IgniteCheckedException {
// For replicated cache topology version does not make sense.
- final PartitionReservationKey grpKey = new PartitionReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer);
+ PartitionReservationKey grpKey = new PartitionReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer);

GridReservable r = reservations.get(grpKey);

- if (partIds == null && r != null) // Try to reserve group partition if any and no explicits.
+ if (explicitParts == null && r != null) // Try to reserve group partition if any and no explicits.
return groupPartitionReservation(reserved, r, cctx, topVer, nodeId, qryInfo);
else { // Try to reserve partitions one by one.
int partsCnt = cctx.affinity().partitions();
@@ -237,7 +237,7 @@ public PartitionReservation reservePartitions(
}
}
else { // Reserve primary partitions for partitioned cache (if no explicit given).
- Collection<Integer> partIds0 = partIds != null ? partIds
+ Collection<Integer> partIds = explicitParts != null ? explicitParts
: cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);

int reservedCnt = 0;
@@ -247,7 +247,7 @@ public PartitionReservation reservePartitions(
top.readLock();

try {
- for (int partId : partIds0) {
+ for (int partId : partIds) {
GridDhtLocalPartition part = partition(top, partId);

GridDhtPartitionState partState = part != null ? part.state() : null;
@@ -303,9 +303,9 @@ public PartitionReservation reservePartitions(
}

MTC.span().addLog(() -> "Cache partitions were reserved [cache=" + cctx.name() +
- ", partitions=" + partIds0 + ", topology=" + topVer + ']');
+ ", partitions=" + partIds + ", topology=" + topVer + ']');

- if (partIds == null && reservedCnt > 0) {
+ if (explicitParts == null && reservedCnt > 0) {
// We reserved all the primary partitions for cache, attempt to add group reservation.
GridDhtPartitionsReservation grp = new GridDhtPartitionsReservation(topVer, cctx, "SQL");

@@ -349,16 +349,18 @@ public void onCacheStop(String cacheName) {
* @param cctx Cache context.
* @param part Partition.
*/
- private static void failQueryOnLostData(GridCacheContext<?, ?> cctx, GridDhtLocalPartition part)
- throws IgniteCheckedException {
+ private static void failQueryOnLostData(
+ GridCacheContext<?, ?> cctx,
+ GridDhtLocalPartition part
+ ) throws IgniteCheckedException {
throw new CacheInvalidStateException("Failed to execute query because cache partition has been " +
"lost [cacheName=" + cctx.name() + ", part=" + part + ']');
}

/**
* Cleanup group reservations cache on change affinity version.
*/
- @Override public void onDoneAfterTopologyUnlock(final GridDhtPartitionsExchangeFuture fut) {
+ @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
try {
// Must not do anything at the exchange thread. Dispatch to the management thread pool.
ctx.closure().runLocal(
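As above, a small sketch (illustrative only; the helper name and simplified collection types are assumptions, not Ignite API) of the partition-selection rule that the explicitParts rename spells out in reservePartitions: a cached whole-group reservation is only reused when no explicit partitions were requested; otherwise the explicit set, or failing that the local node's primary partitions, is reserved partition by partition.

import java.util.Collection;
import java.util.Collections;

final class ReservationTargets {
    /**
     * Mirrors the branching in reservePartitions: with no explicit partitions and an existing
     * group reservation there is nothing to reserve per partition; otherwise the explicit set
     * wins, falling back to the local primary partitions.
     */
    static Collection<Integer> partitionsToReserve(
        Collection<Integer> explicitParts,
        Collection<Integer> localPrimaryParts,
        boolean hasGroupReservation
    ) {
        if (explicitParts == null && hasGroupReservation)
            return Collections.emptyList(); // Whole-group reservation already covers the cache.

        return explicitParts != null ? explicitParts : localPrimaryParts;
    }
}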
