From 4205a360cc4643e50414038e0a815fba6ee27efe Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Fri, 27 Dec 2024 10:31:47 +0300 Subject: [PATCH] IGNITE-23975 Review comments fixed --- .../calcite/metadata/ColocationGroup.java | 44 +++++++++---------- .../schema/CacheTableDescriptorImpl.java | 2 +- .../topology/PartitionReservationManager.java | 32 +++++++------- 3 files changed, 39 insertions(+), 39 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java index 21ea5e4a29ca8..af9fadc572363 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java @@ -63,7 +63,7 @@ public class ColocationGroup implements MarshalableMessage { * In case of {@code true} value we can skip assignment marshalling and calc assignment on remote nodes. */ @GridDirectTransient - private boolean cacheAssignment; + private boolean primaryAssignment; /** Marshalled assignments. */ private int[] marshalledAssignments; @@ -73,14 +73,9 @@ public static ColocationGroup forNodes(List nodeIds) { return new ColocationGroup(null, nodeIds, null); } - /** */ - public static ColocationGroup forCacheAssignment(List> assignments) { - return new ColocationGroup(null, null, assignments, true); - } - /** */ public static ColocationGroup forAssignments(List> assignments) { - return new ColocationGroup(null, null, assignments); + return new ColocationGroup(null, null, assignments, true); } /** */ @@ -113,10 +108,10 @@ private ColocationGroup(long[] sourceIds, List nodeIds, List> a } /** */ - private ColocationGroup(long[] sourceIds, List nodeIds, List> assignments, boolean cacheAssignment) { + private ColocationGroup(long[] sourceIds, List nodeIds, List> assignments, boolean primaryAssignment) { this(sourceIds, nodeIds, assignments); - this.cacheAssignment = cacheAssignment; + this.primaryAssignment = primaryAssignment; } /** @@ -162,10 +157,10 @@ public boolean belongs(long sourceId) { */ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingException { long[] srcIds; - if (this.sourceIds == null || other.sourceIds == null) - srcIds = U.firstNotNull(this.sourceIds, other.sourceIds); + if (sourceIds == null || other.sourceIds == null) + srcIds = U.firstNotNull(sourceIds, other.sourceIds); else - srcIds = LongStream.concat(Arrays.stream(this.sourceIds), Arrays.stream(other.sourceIds)).distinct().toArray(); + srcIds = LongStream.concat(Arrays.stream(sourceIds), Arrays.stream(other.sourceIds)).distinct().toArray(); List nodeIds; if (this.nodeIds == null || other.nodeIds == null) @@ -178,7 +173,7 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE "Replicated query parts are not co-located on all nodes"); } - boolean cacheAssignment = this.cacheAssignment || other.cacheAssignment; + boolean primaryAssignment = this.primaryAssignment || other.primaryAssignment; List> assignments; if (this.assignments == null || other.assignments == null) { @@ -191,13 +186,13 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE for (int i = 0; i < assignments.size(); i++) { List assignment = Commons.intersect(filter, assignments.get(i)); - if (assignment.isEmpty()) { // TODO check with partition filters + if (assignment.isEmpty()) { throw new ColocationMappingException("Failed to map fragment to location. " + "Partition mapping is empty [part=" + i + "]"); } if (!assignment.get(0).equals(assignments.get(i).get(0))) - cacheAssignment = false; + primaryAssignment = false; assignments0.add(assignment); } @@ -215,17 +210,20 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE if (filter != null) assignment.retainAll(filter); - if (assignment.isEmpty()) // TODO check with partition filters - throw new ColocationMappingException("Failed to map fragment to location. Partition mapping is empty [part=" + i + "]"); + if (assignment.isEmpty()) { + throw new ColocationMappingException("Failed to map fragment to location. " + + "Partition mapping is empty [part=" + i + "]"); + } - if (!assignment.get(0).equals(this.assignments.get(i).get(0)) || !assignment.get(0).equals(other.assignments.get(i).get(0))) - cacheAssignment = false; + if (!assignment.get(0).equals(this.assignments.get(i).get(0)) + || !assignment.get(0).equals(other.assignments.get(i).get(0))) + primaryAssignment = false; assignments.add(assignment); } } - return new ColocationGroup(srcIds, nodeIds, assignments, cacheAssignment); + return new ColocationGroup(srcIds, nodeIds, assignments, primaryAssignment); } /** */ @@ -243,12 +241,12 @@ public ColocationGroup finalizeMapping() { assignments.add(first != null ? Collections.singletonList(first) : Collections.emptyList()); } - return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments, cacheAssignment); + return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments, primaryAssignment); } /** */ public ColocationGroup explicitMapping() { - if (assignments == null || !cacheAssignment) + if (assignments == null || !primaryAssignment) return this; // Make a shallow copy without cacheAssignment flag. @@ -395,7 +393,7 @@ public int[] partitions(UUID nodeId) { /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) { - if (assignments != null && marshalledAssignments == null && !cacheAssignment) { + if (assignments != null && marshalledAssignments == null && !primaryAssignment) { Map nodeIdxs = new HashMap<>(); for (int i = 0; i < nodeIds.size(); i++) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java index 70f4b1d5b61a3..11dca7c96ff93 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java @@ -587,7 +587,7 @@ private ColocationGroup partitionedGroup(@NotNull AffinityTopologyVersion topVer assignments0.add(F.isEmpty(partNodes) ? emptyList() : singletonList(F.first(partNodes).id())); } - return ColocationGroup.forCacheAssignment(assignments0); + return ColocationGroup.forAssignments(assignments0); } /** */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationManager.java index 58fd80abc1450..fa44fc6a10a76 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationManager.java @@ -83,11 +83,11 @@ public PartitionReservationManager(GridKernalContext ctx) { /** * @param top Partition topology. - * @param p Partition ID. + * @param partId Partition ID. * @return Partition. */ - private static GridDhtLocalPartition partition(GridDhtPartitionTopology top, int p) { - return top.localPartition(p, NONE, false); + private static GridDhtLocalPartition partition(GridDhtPartitionTopology top, int partId) { + return top.localPartition(partId, NONE, false); } /** @@ -102,7 +102,7 @@ private static GridDhtLocalPartition partition(GridDhtPartitionTopology top, int public PartitionReservation reservePartitions( @Nullable List cacheIds, AffinityTopologyVersion reqTopVer, - final int[] explicitParts, + int[] explicitParts, UUID nodeId, long reqId ) throws IgniteCheckedException { @@ -154,7 +154,7 @@ public PartitionReservation reservePartitions( public PartitionReservation reservePartitions( GridCacheContext cctx, AffinityTopologyVersion reqTopVer, - final int[] explicitParts, + int[] explicitParts, UUID nodeId, String qryInfo ) throws IgniteCheckedException { @@ -179,17 +179,17 @@ public PartitionReservation reservePartitions( private @Nullable String reservePartitions( List reserved, GridCacheContext cctx, - Collection partIds, + @Nullable Collection explicitParts, AffinityTopologyVersion topVer, UUID nodeId, String qryInfo ) throws IgniteCheckedException { // For replicated cache topology version does not make sense. - final PartitionReservationKey grpKey = new PartitionReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer); + PartitionReservationKey grpKey = new PartitionReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer); GridReservable r = reservations.get(grpKey); - if (partIds == null && r != null) // Try to reserve group partition if any and no explicits. + if (explicitParts == null && r != null) // Try to reserve group partition if any and no explicits. return groupPartitionReservation(reserved, r, cctx, topVer, nodeId, qryInfo); else { // Try to reserve partitions one by one. int partsCnt = cctx.affinity().partitions(); @@ -237,7 +237,7 @@ public PartitionReservation reservePartitions( } } else { // Reserve primary partitions for partitioned cache (if no explicit given). - Collection partIds0 = partIds != null ? partIds + Collection partIds = explicitParts != null ? explicitParts : cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); int reservedCnt = 0; @@ -247,7 +247,7 @@ public PartitionReservation reservePartitions( top.readLock(); try { - for (int partId : partIds0) { + for (int partId : partIds) { GridDhtLocalPartition part = partition(top, partId); GridDhtPartitionState partState = part != null ? part.state() : null; @@ -303,9 +303,9 @@ public PartitionReservation reservePartitions( } MTC.span().addLog(() -> "Cache partitions were reserved [cache=" + cctx.name() + - ", partitions=" + partIds0 + ", topology=" + topVer + ']'); + ", partitions=" + partIds + ", topology=" + topVer + ']'); - if (partIds == null && reservedCnt > 0) { + if (explicitParts == null && reservedCnt > 0) { // We reserved all the primary partitions for cache, attempt to add group reservation. GridDhtPartitionsReservation grp = new GridDhtPartitionsReservation(topVer, cctx, "SQL"); @@ -349,8 +349,10 @@ public void onCacheStop(String cacheName) { * @param cctx Cache context. * @param part Partition. */ - private static void failQueryOnLostData(GridCacheContext cctx, GridDhtLocalPartition part) - throws IgniteCheckedException { + private static void failQueryOnLostData( + GridCacheContext cctx, + GridDhtLocalPartition part + ) throws IgniteCheckedException { throw new CacheInvalidStateException("Failed to execute query because cache partition has been " + "lost [cacheName=" + cctx.name() + ", part=" + part + ']'); } @@ -358,7 +360,7 @@ private static void failQueryOnLostData(GridCacheContext cctx, GridDhtLoca /** * Cleanup group reservations cache on change affinity version. */ - @Override public void onDoneAfterTopologyUnlock(final GridDhtPartitionsExchangeFuture fut) { + @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) { try { // Must not do anything at the exchange thread. Dispatch to the management thread pool. ctx.closure().runLocal(