From af5d1ddbee2a6216e9f5a6b17b499df22b1e9aa0 Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Fri, 27 Dec 2024 21:22:05 +0300 Subject: [PATCH 1/2] IGNITE-23975 SQL Calcite: Add group partitions reservation - Fixes #11758. Signed-off-by: Aleksey Plekhanov --- .../query/calcite/exec/AbstractCacheScan.java | 94 ++-- .../query/calcite/exec/IndexScan.java | 2 +- .../query/calcite/exec/TableScan.java | 4 +- .../calcite/metadata/ColocationGroup.java | 54 ++- .../calcite/metadata/FragmentDescription.java | 5 +- .../schema/CacheTableDescriptorImpl.java | 23 +- .../dht/topology}/PartitionReservation.java | 2 +- .../topology}/PartitionReservationKey.java | 2 +- .../topology/PartitionReservationManager.java | 443 ++++++++++++++++++ .../processors/query/GridQueryProcessor.java | 15 + .../processors/query/h2/IgniteH2Indexing.java | 8 +- .../processors/query/h2/opt/QueryContext.java | 2 +- .../h2/twostep/GridMapQueryExecutor.java | 1 + .../twostep/PartitionReservationManager.java | 375 --------------- .../processors/query/KillQueryTest.java | 4 +- .../h2/twostep/RetryCauseMessageSelfTest.java | 2 + 16 files changed, 582 insertions(+), 454 deletions(-) rename modules/{indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep => core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology}/PartitionReservation.java (96%) rename modules/{indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep => core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology}/PartitionReservationKey.java (96%) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationManager.java delete mode 100644 modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java index a8126fbce4d9a..1f46ed0d2da72 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java @@ -18,17 +18,18 @@ package org.apache.ignite.internal.processors.query.calcite.exec; import java.util.ArrayList; -import java.util.Collections; +import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.stream.IntStream; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; -import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation; /** */ public abstract class AbstractCacheScan implements Iterable, AutoCloseable { @@ -45,15 +46,35 @@ public abstract class AbstractCacheScan implements Iterable, AutoClose protected final int[] parts; /** */ - protected volatile List reserved; + protected final boolean explicitParts; + + /** */ + private PartitionReservation reservation; + + /** */ + protected volatile List reservedParts; /** */ AbstractCacheScan(ExecutionContext ectx, GridCacheContext cctx, int[] parts) { this.ectx = ectx; this.cctx = cctx; - this.parts = parts; topVer = ectx.topologyVersion(); + + explicitParts = parts != null; + + if (cctx.isReplicated()) + this.parts = IntStream.range(0, cctx.affinity().partitions()).toArray(); + else { + if (parts != null) + this.parts = parts; + else { + Collection primaryParts = cctx.affinity().primaryPartitions( + cctx.kernalContext().localNodeId(), topVer); + + this.parts = primaryParts.stream().mapToInt(Integer::intValue).toArray(); + } + } } /** {@inheritDoc} */ @@ -80,7 +101,7 @@ public abstract class AbstractCacheScan implements Iterable, AutoClose /** */ private synchronized void reserve() { - if (reserved != null) + if (reservation != null) return; GridDhtPartitionTopology top = cctx.topology(); @@ -98,61 +119,42 @@ private synchronized void reserve() { throw new ClusterTopologyException("Topology was changed. Please retry on stable topology."); } - List toReserve; - - if (cctx.isReplicated()) { - int partsCnt = cctx.affinity().partitions(); - - toReserve = new ArrayList<>(partsCnt); - - for (int i = 0; i < partsCnt; i++) - toReserve.add(top.localPartition(i)); - } - else if (cctx.isPartitioned()) { - assert parts != null; + try { + PartitionReservation reservation; - toReserve = new ArrayList<>(parts.length); + try { + reservation = cctx.kernalContext().query().partitionReservationManager().reservePartitions( + cctx, topVer, explicitParts ? parts : null, ectx.originatingNodeId(), "qryId=" + ectx.queryId()); + } + catch (IgniteCheckedException e) { + throw new ClusterTopologyException("Failed to reserve partition for query execution", e); + } - for (int i = 0; i < parts.length; i++) - toReserve.add(top.localPartition(parts[i])); - } - else - toReserve = Collections.emptyList(); + if (reservation.failed()) { + reservation.release(); - List reserved = new ArrayList<>(toReserve.size()); + throw new ClusterTopologyException(reservation.error()); + } - try { - for (GridDhtLocalPartition part : toReserve) { - if (part == null || !part.reserve()) - throw new ClusterTopologyException("Failed to reserve partition for query execution. Retry on stable topology."); - else if (part.state() != GridDhtPartitionState.OWNING) { - part.release(); + this.reservation = reservation; - throw new ClusterTopologyException("Failed to reserve partition for query execution. Retry on stable topology."); - } + List reservedParts = new ArrayList<>(parts.length); - reserved.add(part); - } - } - catch (Exception e) { - release(); + for (int i = 0; i < parts.length; i++) + reservedParts.add(top.localPartition(parts[i])); - throw e; + this.reservedParts = reservedParts; } finally { - this.reserved = reserved; - top.readUnlock(); } } /** */ private synchronized void release() { - if (F.isEmpty(reserved)) - return; - - reserved.forEach(GridDhtLocalPartition::release); + if (reservation != null) + reservation.release(); - reserved = null; + reservation = null; } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java index 2077996aa0909..d61f302b0ab1b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java @@ -116,7 +116,7 @@ public IndexScan( txChanges = ectx.transactionChanges( cctx.cacheId(), - parts, + cctx.isReplicated() ? null : this.parts, r -> new IndexRowImpl(rowHnd, r), this::compare ); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java index f008f3f6af3c1..c16252b902a0f 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java @@ -74,9 +74,9 @@ private class IteratorImpl extends GridIteratorAdapter { /** */ private IteratorImpl() { - assert reserved != null; + assert reservedParts != null; - parts = new ArrayDeque<>(reserved); + parts = new ArrayDeque<>(reservedParts); txChanges = F.isEmpty(ectx.getQryTxEntries()) ? TransactionChanges.empty() diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java index c5e652a00afe9..af9fadc572363 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java @@ -58,6 +58,13 @@ public class ColocationGroup implements MarshalableMessage { @GridDirectTransient private List> assignments; + /** + * Flag, indacating that assignment is formed by original cache assignment for given topology. + * In case of {@code true} value we can skip assignment marshalling and calc assignment on remote nodes. + */ + @GridDirectTransient + private boolean primaryAssignment; + /** Marshalled assignments. */ private int[] marshalledAssignments; @@ -68,7 +75,7 @@ public static ColocationGroup forNodes(List nodeIds) { /** */ public static ColocationGroup forAssignments(List> assignments) { - return new ColocationGroup(null, null, assignments); + return new ColocationGroup(null, null, assignments, true); } /** */ @@ -100,6 +107,13 @@ private ColocationGroup(long[] sourceIds, List nodeIds, List> a this.assignments = assignments; } + /** */ + private ColocationGroup(long[] sourceIds, List nodeIds, List> assignments, boolean primaryAssignment) { + this(sourceIds, nodeIds, assignments); + + this.primaryAssignment = primaryAssignment; + } + /** * @return Lists of nodes capable to execute a query fragment for what the mapping is calculated. */ @@ -143,10 +157,10 @@ public boolean belongs(long sourceId) { */ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingException { long[] srcIds; - if (this.sourceIds == null || other.sourceIds == null) - srcIds = U.firstNotNull(this.sourceIds, other.sourceIds); + if (sourceIds == null || other.sourceIds == null) + srcIds = U.firstNotNull(sourceIds, other.sourceIds); else - srcIds = LongStream.concat(Arrays.stream(this.sourceIds), Arrays.stream(other.sourceIds)).distinct().toArray(); + srcIds = LongStream.concat(Arrays.stream(sourceIds), Arrays.stream(other.sourceIds)).distinct().toArray(); List nodeIds; if (this.nodeIds == null || other.nodeIds == null) @@ -159,6 +173,8 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE "Replicated query parts are not co-located on all nodes"); } + boolean primaryAssignment = this.primaryAssignment || other.primaryAssignment; + List> assignments; if (this.assignments == null || other.assignments == null) { assignments = U.firstNotNull(this.assignments, other.assignments); @@ -170,11 +186,14 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE for (int i = 0; i < assignments.size(); i++) { List assignment = Commons.intersect(filter, assignments.get(i)); - if (assignment.isEmpty()) { // TODO check with partition filters + if (assignment.isEmpty()) { throw new ColocationMappingException("Failed to map fragment to location. " + "Partition mapping is empty [part=" + i + "]"); } + if (!assignment.get(0).equals(assignments.get(i).get(0))) + primaryAssignment = false; + assignments0.add(assignment); } @@ -191,14 +210,20 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE if (filter != null) assignment.retainAll(filter); - if (assignment.isEmpty()) // TODO check with partition filters - throw new ColocationMappingException("Failed to map fragment to location. Partition mapping is empty [part=" + i + "]"); + if (assignment.isEmpty()) { + throw new ColocationMappingException("Failed to map fragment to location. " + + "Partition mapping is empty [part=" + i + "]"); + } + + if (!assignment.get(0).equals(this.assignments.get(i).get(0)) + || !assignment.get(0).equals(other.assignments.get(i).get(0))) + primaryAssignment = false; assignments.add(assignment); } } - return new ColocationGroup(srcIds, nodeIds, assignments); + return new ColocationGroup(srcIds, nodeIds, assignments, primaryAssignment); } /** */ @@ -216,7 +241,16 @@ public ColocationGroup finalizeMapping() { assignments.add(first != null ? Collections.singletonList(first) : Collections.emptyList()); } - return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments); + return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments, primaryAssignment); + } + + /** */ + public ColocationGroup explicitMapping() { + if (assignments == null || !primaryAssignment) + return this; + + // Make a shallow copy without cacheAssignment flag. + return new ColocationGroup(sourceIds, nodeIds, assignments, false); } /** */ @@ -359,7 +393,7 @@ public int[] partitions(UUID nodeId) { /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) { - if (assignments != null && marshalledAssignments == null) { + if (assignments != null && marshalledAssignments == null && !primaryAssignment) { Map nodeIdxs = new HashMap<>(); for (int i = 0; i < nodeIds.size(); i++) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java index dfef48e33469a..f1dc050aa8225 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java @@ -192,8 +192,11 @@ public FragmentMapping mapping() { if (mapping != null) mapping.prepareMarshal(ctx); - if (target != null) + if (target != null) { + target = target.explicitMapping(); + target.prepareMarshal(ctx); + } if (remoteSources0 == null && remoteSources != null) { remoteSources0 = U.newHashMap(remoteSources.size()); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java index ab704a6850da3..11dca7c96ff93 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java @@ -599,18 +599,25 @@ private ColocationGroup replicatedGroup(@NotNull AffinityTopologyVersion topVer) List nodes = cctx.discovery().discoCache(topVer).cacheGroupAffinityNodes(cctx.groupId()); List nodes0; - if (!top.rebalanceFinished(topVer)) { - nodes0 = new ArrayList<>(nodes.size()); + top.readLock(); - int parts = top.partitions(); + try { + if (!top.rebalanceFinished(topVer)) { + nodes0 = new ArrayList<>(nodes.size()); + + int parts = top.partitions(); - for (ClusterNode node : nodes) { - if (isOwner(node.id(), top, parts)) - nodes0.add(node.id()); + for (ClusterNode node : nodes) { + if (isOwner(node.id(), top, parts)) + nodes0.add(node.id()); + } } + else + nodes0 = Commons.transform(nodes, ClusterNode::id); + } + finally { + top.readUnlock(); } - else - nodes0 = Commons.transform(nodes, ClusterNode::id); return ColocationGroup.forNodes(nodes0); } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservation.java similarity index 96% rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservation.java index b5593e58d8eae..ae21192819dfb 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservation.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.h2.twostep; +package org.apache.ignite.internal.processors.cache.distributed.dht.topology; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationKey.java similarity index 96% rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationKey.java index 0fad2c4dd0926..60911e7ac8472 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationKey.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.h2.twostep; +package org.apache.ignite.internal.processors.cache.distributed.dht.topology; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.F; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationManager.java new file mode 100644 index 0000000000000..fa44fc6a10a76 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationManager.java @@ -0,0 +1,443 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.topology; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware; +import org.apache.ignite.internal.processors.tracing.MTC; +import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; +import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_PARTITIONS_RESERVE; + +/** + * Class responsible for partition reservation for queries executed on local node. Prevents partitions from being + * evicted from node during query execution. + */ +public class PartitionReservationManager implements PartitionsExchangeAware { + /** Special instance of reservable object for REPLICATED caches. */ + private static final ReplicatedReservable REPLICATED_RESERVABLE = new ReplicatedReservable(); + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** + * Group reservations cache. When affinity version is not changed and all primary partitions must be reserved we get + * group reservation from this map instead of create new reservation group. + */ + private final ConcurrentMap reservations = new ConcurrentHashMap<>(); + + /** Logger. */ + private final IgniteLogger log; + + /** + * Constructor. + * + * @param ctx Context. + */ + public PartitionReservationManager(GridKernalContext ctx) { + this.ctx = ctx; + + log = ctx.log(PartitionReservationManager.class); + + ctx.cache().context().exchange().registerExchangeAwareComponent(this); + } + + /** + * @param top Partition topology. + * @param partId Partition ID. + * @return Partition. + */ + private static GridDhtLocalPartition partition(GridDhtPartitionTopology top, int partId) { + return top.localPartition(partId, NONE, false); + } + + /** + * @param cacheIds Cache IDs. + * @param reqTopVer Topology version from request. + * @param explicitParts Explicit partitions list. + * @param nodeId Node ID. + * @param reqId Request ID. + * @return PartitionReservation instance with reservation result. + * @throws IgniteCheckedException If failed. + */ + public PartitionReservation reservePartitions( + @Nullable List cacheIds, + AffinityTopologyVersion reqTopVer, + int[] explicitParts, + UUID nodeId, + long reqId + ) throws IgniteCheckedException { + try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_PARTITIONS_RESERVE, MTC.span()))) { + assert reqTopVer != null; + + AffinityTopologyVersion topVer = ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(reqTopVer); + + if (F.isEmpty(cacheIds)) + return new PartitionReservation(Collections.emptyList()); + + Collection partIds = partsToCollection(explicitParts); + + List reserved = new ArrayList<>(); + + for (int i = 0; i < cacheIds.size(); i++) { + GridCacheContext cctx = ctx.cache().context().cacheContext(cacheIds.get(i)); + + // Cache was not found, probably was not deployed yet. + if (cctx == null) { + return new PartitionReservation(reserved, + String.format("Failed to reserve partitions for query (cache is not " + + "found on local node) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s]", + ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i))); + } + + if (!cctx.rebalanceEnabled()) + continue; + + String err = reservePartitions(reserved, cctx, partIds, topVer, nodeId, "reqId=" + reqId); + + if (err != null) + return new PartitionReservation(reserved, err); + } + + return new PartitionReservation(reserved); + } + } + + /** + * @param cctx Cache context. + * @param reqTopVer Topology version from request. + * @param explicitParts Explicit partitions list. + * @param nodeId Node ID. + * @param qryInfo Query info. + * @return PartitionReservation instance with reservation result. + * @throws IgniteCheckedException If failed. + */ + public PartitionReservation reservePartitions( + GridCacheContext cctx, + AffinityTopologyVersion reqTopVer, + int[] explicitParts, + UUID nodeId, + String qryInfo + ) throws IgniteCheckedException { + try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_PARTITIONS_RESERVE, MTC.span()))) { + assert reqTopVer != null; + + AffinityTopologyVersion topVer = ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(reqTopVer); + + Collection partIds = partsToCollection(explicitParts); + + List reserved = new ArrayList<>(); + + String err = reservePartitions(reserved, cctx, partIds, topVer, nodeId, qryInfo); + + return new PartitionReservation(reserved, err); + } + } + + /** + * @return Error message or {@code null}. + */ + private @Nullable String reservePartitions( + List reserved, + GridCacheContext cctx, + @Nullable Collection explicitParts, + AffinityTopologyVersion topVer, + UUID nodeId, + String qryInfo + ) throws IgniteCheckedException { + // For replicated cache topology version does not make sense. + PartitionReservationKey grpKey = new PartitionReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer); + + GridReservable r = reservations.get(grpKey); + + if (explicitParts == null && r != null) // Try to reserve group partition if any and no explicits. + return groupPartitionReservation(reserved, r, cctx, topVer, nodeId, qryInfo); + else { // Try to reserve partitions one by one. + int partsCnt = cctx.affinity().partitions(); + + if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache. + if (r == null) { // Check only once. + GridDhtPartitionTopology top = cctx.topology(); + + top.readLock(); + + try { + for (int p = 0; p < partsCnt; p++) { + GridDhtLocalPartition part = partition(top, p); + + // We don't need to reserve partitions because they will not be evicted in replicated caches. + GridDhtPartitionState partState = part != null ? part.state() : null; + + if (partState != OWNING) { + return String.format("Failed to reserve partitions for " + + "query (partition of REPLICATED cache is not in OWNING state) [" + + "localNodeId=%s, rmtNodeId=%s, %s, affTopVer=%s, cacheId=%s, " + + "cacheName=%s, part=%s, partFound=%s, partState=%s]", + ctx.localNodeId(), + nodeId, + qryInfo, + topVer, + cctx.cacheId(), + cctx.name(), + p, + (part != null), + partState + ); + } + } + } + finally { + top.readUnlock(); + } + + // Mark that we checked this replicated cache. + reservations.putIfAbsent(grpKey, REPLICATED_RESERVABLE); + + MTC.span().addLog(() -> "Cache partitions were reserved [cache=" + cctx.name() + + ", partitions=[0.." + partsCnt + ']'); + } + } + else { // Reserve primary partitions for partitioned cache (if no explicit given). + Collection partIds = explicitParts != null ? explicitParts + : cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); + + int reservedCnt = 0; + + GridDhtPartitionTopology top = cctx.topology(); + + top.readLock(); + + try { + for (int partId : partIds) { + GridDhtLocalPartition part = partition(top, partId); + + GridDhtPartitionState partState = part != null ? part.state() : null; + + if (partState != OWNING) { + if (partState == LOST) { + reserved.forEach(GridReservable::release); + + failQueryOnLostData(cctx, part); + } + else { + return String.format("Failed to reserve partitions " + + "for query (partition of PARTITIONED cache is not found or not in OWNING " + + "state) [localNodeId=%s, rmtNodeId=%s, %s, affTopVer=%s, cacheId=%s, " + + "cacheName=%s, part=%s, partFound=%s, partState=%s]", + ctx.localNodeId(), + nodeId, + qryInfo, + topVer, + cctx.cacheId(), + cctx.name(), + partId, + (part != null), + partState + ); + } + } + + if (!part.reserve()) { + return String.format("Failed to reserve partitions for query " + + "(partition of PARTITIONED cache cannot be reserved) [" + + "localNodeId=%s, rmtNodeId=%s, %s, affTopVer=%s, cacheId=%s, " + + "cacheName=%s, part=%s, partFound=%s, partState=%s]", + ctx.localNodeId(), + nodeId, + qryInfo, + topVer, + cctx.cacheId(), + cctx.name(), + partId, + true, + partState + ); + } + + reserved.add(part); + + reservedCnt++; + } + } + finally { + top.readUnlock(); + } + + MTC.span().addLog(() -> "Cache partitions were reserved [cache=" + cctx.name() + + ", partitions=" + partIds + ", topology=" + topVer + ']'); + + if (explicitParts == null && reservedCnt > 0) { + // We reserved all the primary partitions for cache, attempt to add group reservation. + GridDhtPartitionsReservation grp = new GridDhtPartitionsReservation(topVer, cctx, "SQL"); + + synchronized (this) { + // Double check under lock. + GridReservable grpReservation = reservations.get(grpKey); + + if (grpReservation != null) + return groupPartitionReservation(reserved, grpReservation, cctx, topVer, nodeId, qryInfo); + else { + if (grp.register(reserved.subList(reserved.size() - reservedCnt, reserved.size()))) { + reservations.put(grpKey, grp); + + grp.onPublish(new CI1<>() { + @Override public void apply(GridDhtPartitionsReservation r) { + reservations.remove(grpKey, r); + } + }); + } + } + } + } + } + } + + return null; + } + + /** + * @param cacheName Cache name. + */ + public void onCacheStop(String cacheName) { + // Drop group reservations. + for (PartitionReservationKey grpKey : reservations.keySet()) { + if (F.eq(grpKey.cacheName(), cacheName)) + reservations.remove(grpKey); + } + } + + /** + * @param cctx Cache context. + * @param part Partition. + */ + private static void failQueryOnLostData( + GridCacheContext cctx, + GridDhtLocalPartition part + ) throws IgniteCheckedException { + throw new CacheInvalidStateException("Failed to execute query because cache partition has been " + + "lost [cacheName=" + cctx.name() + ", part=" + part + ']'); + } + + /** + * Cleanup group reservations cache on change affinity version. + */ + @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) { + try { + // Must not do anything at the exchange thread. Dispatch to the management thread pool. + ctx.closure().runLocal( + new GridPlainRunnable() { + @Override public void run() { + AffinityTopologyVersion topVer = ctx.cache().context().exchange() + .lastAffinityChangedTopologyVersion(fut.topologyVersion()); + + reservations.forEach((key, r) -> { + if (r != REPLICATED_RESERVABLE && !F.eq(key.topologyVersion(), topVer)) { + assert r instanceof GridDhtPartitionsReservation; + + ((GridDhtPartitionsReservation)r).invalidate(); + } + }); + } + }, + GridIoPolicy.MANAGEMENT_POOL); + } + catch (Throwable e) { + log.error("Unexpected exception on start reservations cleanup."); + ctx.failure().process(new FailureContext(CRITICAL_ERROR, e)); + } + } + + /** */ + private static Collection partsToCollection(int[] explicitParts) { + if (explicitParts == null) + return null; + else if (explicitParts.length == 0) + return Collections.emptyList(); + else { + List partIds = new ArrayList<>(explicitParts.length); + + for (int explicitPart : explicitParts) + partIds.add(explicitPart); + + return partIds; + } + } + + /** */ + private String groupPartitionReservation( + List reserved, + GridReservable grpReservation, + GridCacheContext cctx, + AffinityTopologyVersion topVer, + UUID nodeId, + String qryInfo + ) { + if (grpReservation != REPLICATED_RESERVABLE) { + if (!grpReservation.reserve()) { + return String.format("Failed to reserve partitions for query (group " + + "reservation failed) [localNodeId=%s, rmtNodeId=%s, %s, affTopVer=%s, cacheId=%s, " + + "cacheName=%s]", ctx.localNodeId(), nodeId, qryInfo, topVer, cctx.cacheId(), cctx.name()); + } + + reserved.add(grpReservation); + + MTC.span().addLog(() -> "Cache partitions were reserved " + grpReservation); + } + + return null; + } + + /** + * Mapper fake reservation object for replicated caches. + */ + private static class ReplicatedReservable implements GridReservable { + /** {@inheritDoc} */ + @Override public boolean reserve() { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override public void release() { + throw new IllegalStateException(); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 78b47d4f0fd15..255910bfcf064 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -90,6 +90,7 @@ import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationManager; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; @@ -308,6 +309,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** Global schema SQL views manager. */ private final SchemaSqlViewManager schemaSqlViewMgr; + /** Partition reservation manager. */ + private final PartitionReservationManager partReservationMgr; + /** @see TransactionConfiguration#isTxAwareQueriesEnabled() */ private final boolean txAwareQueriesEnabled; @@ -331,6 +335,8 @@ public GridQueryProcessor(GridKernalContext ctx) throws IgniteCheckedException { schemaSqlViewMgr = new SchemaSqlViewManager(ctx); + partReservationMgr = new PartitionReservationManager(ctx); + idxProc = ctx.indexProcessor(); idxQryPrc = new IndexQueryProcessor(idxProc); @@ -1060,6 +1066,11 @@ public RunningQueryManager runningQueryManager() throws IgniteException { return runningQryMgr; } + /** Partition reservation manager. */ + public PartitionReservationManager partitionReservationManager() { + return partReservationMgr; + } + /** * Create type descriptors from schema and initialize indexing for given cache.

* Use with {@link #busyLock} where appropriate. @@ -1327,6 +1338,8 @@ public void onClientCacheStop(GridCacheContextInfo cacheInfo) { try { if (schemaMgr.clearCacheContext(cacheInfo.cacheContext())) { + partReservationMgr.onCacheStop(cacheInfo.name()); + if (idx != null) idx.unregisterCache(cacheInfo); } @@ -2448,6 +2461,8 @@ public void onCacheStop0(GridCacheContextInfo cacheInfo, boolean destroy, boolea schemaMgr.onCacheStopped(cacheName, destroy, clearIdx); + partReservationMgr.onCacheStop(cacheName); + // Notify indexing. if (idx != null) idx.unregisterCache(cacheInfo); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index fa0804b555a9a..86ee2d79e8ad3 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationManager; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; @@ -97,7 +98,6 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement; import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor; import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor; -import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; @@ -1565,7 +1565,7 @@ public GridReduceQueryExecutor reduceQueryExecutor() { this.ctx = ctx; - partReservationMgr = new PartitionReservationManager(ctx); + partReservationMgr = ctx.query().partitionReservationManager(); connMgr = new ConnectionManager(ctx); @@ -1839,10 +1839,6 @@ private JavaObjectSerializer h2Serializer() { /** {@inheritDoc} */ @Override public void unregisterCache(GridCacheContextInfo cacheInfo) { - String cacheName = cacheInfo.name(); - - partReservationMgr.onCacheStop(cacheName); - // Unregister connection. connMgr.onCacheDestroyed(); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java index 8930a89fd8cda..1b0d4511f0ffa 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java @@ -18,8 +18,8 @@ package org.apache.ignite.internal.processors.query.h2.opt; import java.util.Objects; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation; import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext; -import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservation; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.jetbrains.annotations.Nullable; diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 4bd579a7eb4fe..a3b5d3a9c9003 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.metric.IoStatisticsQueryHelper; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java deleted file mode 100644 index d092c3cd2d811..0000000000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java +++ /dev/null @@ -1,375 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.h2.twostep; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.failure.FailureContext; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation; -import org.apache.ignite.internal.processors.tracing.MTC; -import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings; -import org.apache.ignite.internal.util.lang.GridPlainRunnable; -import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.F; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; -import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; -import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST; -import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; -import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_PARTITIONS_RESERVE; - -/** - * Class responsible for partition reservation for queries executed on local node. Prevents partitions from being - * evicted from node during query execution. - */ -public class PartitionReservationManager implements PartitionsExchangeAware { - /** Special instance of reservable object for REPLICATED caches. */ - private static final ReplicatedReservable REPLICATED_RESERVABLE = new ReplicatedReservable(); - - /** Kernal context. */ - private final GridKernalContext ctx; - - /** - * Group reservations cache. When affinity version is not changed and all primary partitions must be reserved we get - * group reservation from this map instead of create new reservation group. - */ - private final ConcurrentMap reservations = new ConcurrentHashMap<>(); - - /** Logger. */ - private final IgniteLogger log; - - /** - * Constructor. - * - * @param ctx Context. - */ - public PartitionReservationManager(GridKernalContext ctx) { - this.ctx = ctx; - - log = ctx.log(PartitionReservationManager.class); - - ctx.cache().context().exchange().registerExchangeAwareComponent(this); - } - - /** - * @param cctx Cache context. - * @param p Partition ID. - * @return Partition. - */ - private static GridDhtLocalPartition partition(GridCacheContext cctx, int p) { - return cctx.topology().localPartition(p, NONE, false); - } - - /** - * @param cacheIds Cache IDs. - * @param reqTopVer Topology version from request. - * @param explicitParts Explicit partitions list. - * @param nodeId Node ID. - * @param reqId Request ID. - * @return String which is null in case of success or with causeMessage if failed - * @throws IgniteCheckedException If failed. - */ - public PartitionReservation reservePartitions( - @Nullable List cacheIds, - AffinityTopologyVersion reqTopVer, - final int[] explicitParts, - UUID nodeId, - long reqId - ) throws IgniteCheckedException { - try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_PARTITIONS_RESERVE, MTC.span()))) { - assert reqTopVer != null; - - AffinityTopologyVersion topVer = ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(reqTopVer); - - if (F.isEmpty(cacheIds)) - return new PartitionReservation(Collections.emptyList()); - - Collection partIds; - - if (explicitParts == null) - partIds = null; - else if (explicitParts.length == 0) - partIds = Collections.emptyList(); - else { - partIds = new ArrayList<>(explicitParts.length); - - for (int explicitPart : explicitParts) - partIds.add(explicitPart); - } - - List reserved = new ArrayList<>(); - - for (int i = 0; i < cacheIds.size(); i++) { - GridCacheContext cctx = ctx.cache().context().cacheContext(cacheIds.get(i)); - - // Cache was not found, probably was not deployed yet. - if (cctx == null) { - return new PartitionReservation(reserved, - String.format("Failed to reserve partitions for query (cache is not " + - "found on local node) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s]", - ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i))); - } - - if (!cctx.rebalanceEnabled()) - continue; - - // For replicated cache topology version does not make sense. - final PartitionReservationKey grpKey = new PartitionReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer); - - GridReservable r = reservations.get(grpKey); - - if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits. - if (r != REPLICATED_RESERVABLE) { - if (!r.reserve()) - return new PartitionReservation(reserved, - String.format("Failed to reserve partitions for query (group " + - "reservation failed) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " + - "cacheName=%s]", ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i), cctx.name())); - - reserved.add(r); - - MTC.span().addLog(() -> "Cache partitions were reserved " + r); - } - } - else { // Try to reserve partitions one by one. - int partsCnt = cctx.affinity().partitions(); - - if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache. - if (r == null) { // Check only once. - for (int p = 0; p < partsCnt; p++) { - GridDhtLocalPartition part = partition(cctx, p); - - // We don't need to reserve partitions because they will not be evicted in replicated caches. - GridDhtPartitionState partState = part != null ? part.state() : null; - - if (partState != OWNING) - return new PartitionReservation(reserved, - String.format("Failed to reserve partitions for " + - "query (partition of REPLICATED cache is not in OWNING state) [" + - "localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " + - "cacheName=%s, part=%s, partFound=%s, partState=%s]", - ctx.localNodeId(), - nodeId, - reqId, - topVer, - cacheIds.get(i), - cctx.name(), - p, - (part != null), - partState - )); - } - - // Mark that we checked this replicated cache. - reservations.putIfAbsent(grpKey, REPLICATED_RESERVABLE); - - MTC.span().addLog(() -> "Cache partitions were reserved [cache=" + cctx.name() + - ", partitions=[0.." + partsCnt + ']'); - } - } - else { // Reserve primary partitions for partitioned cache (if no explicit given). - if (explicitParts == null) - partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); - - int reservedCnt = 0; - - for (int partId : partIds) { - GridDhtLocalPartition part = partition(cctx, partId); - - GridDhtPartitionState partState = part != null ? part.state() : null; - - if (partState != OWNING) { - if (partState == LOST) - failQueryOnLostData(cctx, part); - else { - return new PartitionReservation(reserved, - String.format("Failed to reserve partitions " + - "for query (partition of PARTITIONED cache is not found or not in OWNING " + - "state) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " + - "cacheName=%s, part=%s, partFound=%s, partState=%s]", - ctx.localNodeId(), - nodeId, - reqId, - topVer, - cacheIds.get(i), - cctx.name(), - partId, - (part != null), - partState - )); - } - } - - if (!part.reserve()) { - return new PartitionReservation(reserved, - String.format("Failed to reserve partitions for query " + - "(partition of PARTITIONED cache cannot be reserved) [" + - "localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " + - "cacheName=%s, part=%s, partFound=%s, partState=%s]", - ctx.localNodeId(), - nodeId, - reqId, - topVer, - cacheIds.get(i), - cctx.name(), - partId, - true, - partState - )); - } - - reserved.add(part); - - reservedCnt++; - - // Double check that we are still in owning state and partition contents are not cleared. - partState = part.state(); - - if (partState != OWNING) { - if (partState == LOST) - failQueryOnLostData(cctx, part); - else { - return new PartitionReservation(reserved, - String.format("Failed to reserve partitions for " + - "query (partition of PARTITIONED cache is not in OWNING state after " + - "reservation) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, " + - "cacheId=%s, cacheName=%s, part=%s, partState=%s]", - ctx.localNodeId(), - nodeId, - reqId, - topVer, - cacheIds.get(i), - cctx.name(), - partId, - partState - )); - } - } - } - - final Collection finalPartIds = partIds; - - MTC.span().addLog(() -> "Cache partitions were reserved [cache=" + cctx.name() + - ", partitions=" + finalPartIds + ", topology=" + topVer + ']'); - - if (explicitParts == null && reservedCnt > 0) { - // We reserved all the primary partitions for cache, attempt to add group reservation. - GridDhtPartitionsReservation grp = new GridDhtPartitionsReservation(topVer, cctx, "SQL"); - - if (grp.register(reserved.subList(reserved.size() - reservedCnt, reserved.size()))) { - if (reservations.putIfAbsent(grpKey, grp) != null) - throw new IllegalStateException("Reservation already exists."); - - grp.onPublish(new CI1() { - @Override public void apply(GridDhtPartitionsReservation r) { - reservations.remove(grpKey, r); - } - }); - } - } - } - } - } - - return new PartitionReservation(reserved); - } - } - - /** - * @param cacheName Cache name. - */ - public void onCacheStop(String cacheName) { - // Drop group reservations. - for (PartitionReservationKey grpKey : reservations.keySet()) { - if (F.eq(grpKey.cacheName(), cacheName)) - reservations.remove(grpKey); - } - } - - /** - * @param cctx Cache context. - * @param part Partition. - */ - private static void failQueryOnLostData(GridCacheContext cctx, GridDhtLocalPartition part) - throws IgniteCheckedException { - throw new CacheInvalidStateException("Failed to execute query because cache partition has been " + - "lost [cacheName=" + cctx.name() + ", part=" + part + ']'); - } - - /** - * Cleanup group reservations cache on change affinity version. - */ - @Override public void onDoneAfterTopologyUnlock(final GridDhtPartitionsExchangeFuture fut) { - try { - // Must not do anything at the exchange thread. Dispatch to the management thread pool. - ctx.closure().runLocal( - new GridPlainRunnable() { - @Override public void run() { - AffinityTopologyVersion topVer = ctx.cache().context().exchange() - .lastAffinityChangedTopologyVersion(fut.topologyVersion()); - - reservations.forEach((key, r) -> { - if (r != REPLICATED_RESERVABLE && !F.eq(key.topologyVersion(), topVer)) { - assert r instanceof GridDhtPartitionsReservation; - - ((GridDhtPartitionsReservation)r).invalidate(); - } - }); - } - }, - GridIoPolicy.MANAGEMENT_POOL); - } - catch (Throwable e) { - log.error("Unexpected exception on start reservations cleanup."); - ctx.failure().process(new FailureContext(CRITICAL_ERROR, e)); - } - } - - /** - * Mapper fake reservation object for replicated caches. - */ - private static class ReplicatedReservable implements GridReservable { - /** {@inheritDoc} */ - @Override public boolean reserve() { - throw new IllegalStateException(); - } - - /** {@inheritDoc} */ - @Override public void release() { - throw new IllegalStateException(); - } - } -} diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java index 0c30057fd8f54..70847fabff366 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java @@ -69,11 +69,11 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationManager; import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor; -import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservation; -import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager; import org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapResult; import org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapper; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java index 0f5491934e9f1..7b3c83e5bc949 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java @@ -38,6 +38,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationKey; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservationManager; import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; import org.apache.ignite.internal.processors.query.GridQueryProcessor; From a37329185366aab227956ff2fd4144fdc5ce5214 Mon Sep 17 00:00:00 2001 From: Nikolay Date: Fri, 27 Dec 2024 22:55:50 +0300 Subject: [PATCH 2/2] IGNITE-15803 Remove "instanceof BinaryMarshaller" from tests (#11780) --- .../internal/jdbc2/JdbcResultSetSelfTest.java | 4 +- .../store/jdbc/CacheJdbcPojoStoreTest.java | 140 ++++++------------ .../CacheEntryProcessorCopySelfTest.java | 4 +- .../CacheEnumOperationsAbstractTest.java | 14 +- .../CacheStartupInDeploymentModesTest.java | 10 -- ...ridCacheConditionalDeploymentSelfTest.java | 43 +----- .../cache/IgniteCacheGroupsTest.java | 8 +- ...heReadThroughEvictionsVariationsSuite.java | 10 +- .../IgniteCacheStoreValueAbstractTest.java | 43 +----- .../GridCacheReplicatedPreloadSelfTest.java | 34 +---- .../GridSessionCheckpointSelfTest.java | 10 +- ...CacheConfigVariationsFullApiTestSuite.java | 13 -- ...gniteCacheAbstractFieldsQuerySelfTest.java | 44 +----- ...teCacheAbstractInsertSqlQuerySelfTest.java | 67 ++------- .../IgniteCacheAbstractQuerySelfTest.java | 41 +++-- ...gniteCacheAbstractSqlDmlQuerySelfTest.java | 51 +------ ...niteCacheJoinQueryWithAffinityKeyTest.java | 5 +- 17 files changed, 107 insertions(+), 434 deletions(-) diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcResultSetSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcResultSetSelfTest.java index 1470a5cfcf3bf..536744a519526 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcResultSetSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcResultSetSelfTest.java @@ -42,7 +42,6 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.ConnectorConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.testframework.GridTestUtils; @@ -820,8 +819,7 @@ public void testUrl() throws Exception { @Test public void testObject() throws Exception { final Ignite ignite = ignite(0); - final boolean binaryMarshaller = ignite.configuration().getMarshaller() instanceof BinaryMarshaller; - final IgniteBinary binary = binaryMarshaller ? ignite.binary() : null; + final IgniteBinary binary = ignite.binary(); ResultSet rs = stmt.executeQuery(SQL); TestObjectField f1 = new TestObjectField(100, "AAAA"); diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java index a1b02a75126bf..a3760ab032dbc 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java @@ -48,7 +48,6 @@ import org.apache.ignite.cache.store.jdbc.model.Person; import org.apache.ignite.cache.store.jdbc.model.PersonComplexKey; import org.apache.ignite.cache.store.jdbc.model.PersonKey; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.internal.U; @@ -74,9 +73,6 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest c = new CI2() { @Override public void apply(Object k, Object v) { - if (binaryEnable) { - if (k instanceof BinaryObject && v instanceof BinaryObject) { - BinaryObject key = (BinaryObject)k; - BinaryObject val = (BinaryObject)v; - - String keyType = key.type().typeName(); - String valType = val.type().typeName(); - - if (OrganizationKey.class.getName().equals(keyType) - && Organization.class.getName().equals(valType)) - orgKeys.add(key); - - if (PersonKey.class.getName().equals(keyType) - && Person.class.getName().equals(valType)) - prnKeys.add(key); - - if (PersonComplexKey.class.getName().equals(keyType) - && Person.class.getName().equals(valType)) - prnComplexKeys.add(key); - - if (BinaryTestKey.class.getName().equals(keyType) - && BinaryTest.class.getName().equals(valType)) - binaryTestVals.add(val.field("bytes")); - } - } - else { - if (k instanceof OrganizationKey && v instanceof Organization) - orgKeys.add(k); - else if (k instanceof PersonKey && v instanceof Person) - prnKeys.add(k); - else if (k instanceof BinaryTestKey && v instanceof BinaryTest) - binaryTestVals.add(((BinaryTest)v).getBytes()); - else if (k instanceof PersonComplexKey && v instanceof Person) { - PersonComplexKey key = (PersonComplexKey)k; - - Person val = (Person)v; - - assertTrue("Key ID should be the same as value ID", key.getId() == val.getId()); - assertTrue("Key orgID should be the same as value orgID", key.getOrgId() == val.getOrgId()); - assertEquals("name" + key.getId(), val.getName()); - - prnComplexKeys.add(k); - } + if (k instanceof BinaryObject && v instanceof BinaryObject) { + BinaryObject key = (BinaryObject)k; + BinaryObject val = (BinaryObject)v; + + String keyType = key.type().typeName(); + String valType = val.type().typeName(); + + if (OrganizationKey.class.getName().equals(keyType) + && Organization.class.getName().equals(valType)) + orgKeys.add(key); + + if (PersonKey.class.getName().equals(keyType) + && Person.class.getName().equals(valType)) + prnKeys.add(key); + + if (PersonComplexKey.class.getName().equals(keyType) + && Person.class.getName().equals(valType)) + prnComplexKeys.add(key); + + if (BinaryTestKey.class.getName().equals(keyType) + && BinaryTest.class.getName().equals(valType)) + binaryTestVals.add(val.field("bytes")); } } }; @@ -505,24 +478,16 @@ public void testParallelLoad() throws Exception { IgniteBiInClosure c = new CI2() { @Override public void apply(Object k, Object v) { - if (binaryEnable) { - if (k instanceof BinaryObject && v instanceof BinaryObject) { - BinaryObject key = (BinaryObject)k; - BinaryObject val = (BinaryObject)v; - - String keyType = key.type().typeName(); - String valType = val.type().typeName(); - - if (PersonComplexKey.class.getName().equals(keyType) - && Person.class.getName().equals(valType)) - prnComplexKeys.add(key); - } - } - else { - if (k instanceof PersonComplexKey && v instanceof Person) - prnComplexKeys.add(k); - else - fail("Unexpected entry [key=" + k + ", value=" + v + "]"); + if (k instanceof BinaryObject && v instanceof BinaryObject) { + BinaryObject key = (BinaryObject)k; + BinaryObject val = (BinaryObject)v; + + String keyType = key.type().typeName(); + String valType = val.type().typeName(); + + if (PersonComplexKey.class.getName().equals(keyType) + && Person.class.getName().equals(valType)) + prnComplexKeys.add(key); } } }; @@ -648,24 +613,13 @@ public void testLoadCacheLobFields() throws Exception { IgniteBiInClosure c = new CI2() { @Override public void apply(Object k, Object v) { - if (binaryEnable) { - assertTrue(k instanceof BinaryObject); - assertTrue(v instanceof BinaryObject); - - BinaryObject val = (BinaryObject)v; + assertTrue(k instanceof BinaryObject); + assertTrue(v instanceof BinaryObject); - assertTrue(Arrays.equals(picture, val.field("picture"))); - assertEquals(longDescription, val.field("description")); - } - else { - assertTrue(k instanceof LogoKey); - assertTrue(v instanceof Logo); - - Logo val = (Logo)v; + BinaryObject val = (BinaryObject)v; - assertTrue(Arrays.equals(picture, val.getPicture())); - assertEquals(longDescription, val.getDescription()); - } + assertTrue(Arrays.equals(picture, val.field("picture"))); + assertEquals(longDescription, val.field("description")); } }; @@ -720,23 +674,19 @@ public void testLobWrite() throws Exception { * @param obj Object. */ private Object wrap(Object obj) throws IllegalAccessException { - if (binaryEnable) { - Class cls = obj.getClass(); + Class cls = obj.getClass(); - BinaryObjectBuilder builder = ig.binary().builder(cls.getName()); + BinaryObjectBuilder builder = ig.binary().builder(cls.getName()); - for (Field f : cls.getDeclaredFields()) { - if (f.getName().contains("serialVersionUID")) - continue; + for (Field f : cls.getDeclaredFields()) { + if (f.getName().contains("serialVersionUID")) + continue; - f.setAccessible(true); - - builder.setField(f.getName(), f.get(obj)); - } + f.setAccessible(true); - return builder.build(); + builder.setField(f.getName(), f.get(obj)); } - return obj; + return builder.build(); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java index 9902d03b1dae4..4bb57665a7797 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java @@ -30,7 +30,6 @@ import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -102,8 +101,7 @@ private void doTestMutableEntry(boolean p2pEnabled) throws Exception { // One deserialization due to copyOnRead == true. // Additional deserialization in case p2p enabled and not BinaryMarshaller due to storeValue == true on update entry. - doTest(true, true, NEW_VAL, p2pEnabled && - !(grid.configuration().getMarshaller() instanceof BinaryMarshaller) ? 2 : 1); + doTest(true, true, NEW_VAL, 1); // No deserialization. doTest(false, false, NEW_VAL, 0); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEnumOperationsAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEnumOperationsAbstractTest.java index 22710b6fa9121..1d80d6ea049a9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEnumOperationsAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEnumOperationsAbstractTest.java @@ -26,8 +26,6 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.binary.BinaryEnumObjectImpl; -import org.apache.ignite.internal.binary.BinaryMarshaller; -import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -187,15 +185,11 @@ private void enumOperations(IgniteCache cache, int key) { * @param expVal Expected value. */ private static void assertBinaryEnum(IgniteCache cache, int key, TestEnum expVal) { - Marshaller marsh = ((IgniteCacheProxy)cache).context().marshaller(); + BinaryObject enumObj = (BinaryObject)cache.withKeepBinary().get(key); - if (marsh instanceof BinaryMarshaller) { - BinaryObject enumObj = (BinaryObject)cache.withKeepBinary().get(key); - - assertEquals(expVal.ordinal(), enumObj.enumOrdinal()); - assertTrue(enumObj.type().isEnum()); - assertTrue(enumObj instanceof BinaryEnumObjectImpl); - } + assertEquals(expVal.ordinal(), enumObj.enumOrdinal()); + assertTrue(enumObj.type().isEnum()); + assertTrue(enumObj instanceof BinaryEnumObjectImpl); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStartupInDeploymentModesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStartupInDeploymentModesTest.java index f1c341302faa5..66d3e23638e62 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStartupInDeploymentModesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStartupInDeploymentModesTest.java @@ -22,8 +22,6 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.binary.BinaryMarshaller; -import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -40,14 +38,10 @@ public class CacheStartupInDeploymentModesTest extends GridCommonAbstractTest { /** */ private DeploymentMode deploymentMode; - /** */ - private Marshaller marshaller; - /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - cfg.setMarshaller(marshaller); cfg.setDeploymentMode(deploymentMode); CacheConfiguration cacheCfg1 = new CacheConfiguration(DEFAULT_CACHE_NAME); @@ -76,7 +70,6 @@ public class CacheStartupInDeploymentModesTest extends GridCommonAbstractTest { @Test public void testStartedInIsolatedMode() throws Exception { deploymentMode = DeploymentMode.ISOLATED; - marshaller = new BinaryMarshaller(); doCheckStarted(deploymentMode); } @@ -87,7 +80,6 @@ public void testStartedInIsolatedMode() throws Exception { @Test public void testStartedInPrivateMode() throws Exception { deploymentMode = DeploymentMode.PRIVATE; - marshaller = new BinaryMarshaller(); doCheckStarted(deploymentMode); } @@ -103,8 +95,6 @@ private void doCheckStarted(DeploymentMode mode) throws Exception { assertEquals(mode, ignite(0).configuration().getDeploymentMode()); - assert ignite(0).configuration().getMarshaller() instanceof BinaryMarshaller; - IgniteCache rCache = ignite(0).cache(REPLICATED_CACHE); checkPutCache(rCache); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java index 6d35499b40598..d1533d46ff5a4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java @@ -21,7 +21,6 @@ import org.apache.ignite.Ignition; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.plugin.AbstractTestPluginProvider; import org.apache.ignite.plugin.ExtensionRegistry; @@ -108,47 +107,7 @@ public void testNoDeploymentInfo() throws Exception { public void testAddedDeploymentInfo() throws Exception { GridCacheContext ctx = cacheContext(); - if (grid(0).configuration().getMarshaller() instanceof BinaryMarshaller) - assertFalse(ctx.deploymentEnabled()); - else { - GridCacheIoManager ioMgr = cacheIoManager(); - - TestMessage msg = new TestMessage(); - - assertNull(msg.deployInfo()); - - msg.addDepInfo = true; - - IgniteUtils.invoke(GridCacheIoManager.class, ioMgr, "onSend", msg, grid(1).cluster().localNode().id()); - - assertNotNull(msg.deployInfo()); - } - } - - /** - * @throws Exception In case of error. - */ - @Test - public void testAddedDeploymentInfo2() throws Exception { - GridCacheContext ctx = cacheContext(); - - if (grid(0).configuration().getMarshaller() instanceof BinaryMarshaller) - assertFalse(ctx.deploymentEnabled()); - else { - assertTrue(ctx.deploymentEnabled()); - - GridCacheIoManager ioMgr = cacheIoManager(); - - TestMessage msg = new TestMessage(); - - assertNull(msg.deployInfo()); - - msg.addDepInfo = false; - - IgniteUtils.invoke(GridCacheIoManager.class, ioMgr, "onSend", msg, grid(1).cluster().localNode().id()); - - assertNull(msg.deployInfo()); - } + assertFalse(ctx.deploymentEnabled()); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index 79ad985183e71..360d571e3a595 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -77,7 +77,6 @@ import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory; @@ -3460,12 +3459,7 @@ private void checkAffinityMappers(Ignite node) { assertEquals(i + 10, aff2.partition(k)); assertEquals(i + 10, aff4.partition(k)); - int part; - - if (node.configuration().getMarshaller() instanceof BinaryMarshaller) - part = func.partition(node.binary().toBinary(k)); - else - part = func.partition(k); + int part = func.partition(node.binary().toBinary(k)); assertEquals(part, aff5.partition(k)); assertEquals(part, aff6.partition(k)); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheReadThroughEvictionsVariationsSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheReadThroughEvictionsVariationsSuite.java index c180ea66f2265..949dfad16f3ec 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheReadThroughEvictionsVariationsSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheReadThroughEvictionsVariationsSuite.java @@ -19,7 +19,6 @@ import java.util.List; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.testframework.configvariations.ConfigVariationsTestSuiteBuilder; import org.apache.ignite.testframework.junits.DynamicSuite; @@ -34,15 +33,8 @@ public static List> suite() { .withBasicCacheParams() .withIgniteConfigFilters(new IgnitePredicate() { /** {@inheritDoc} */ - @SuppressWarnings("RedundantIfStatement") @Override public boolean apply(IgniteConfiguration cfg) { - if (cfg.getMarshaller() != null && !(cfg.getMarshaller() instanceof BinaryMarshaller)) - return false; - - if (cfg.isPeerClassLoadingEnabled()) - return false; - - return true; + return !cfg.isPeerClassLoadingEnabled(); } }) .skipWaitPartitionMapExchange() diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreValueAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreValueAbstractTest.java index 770f749fc2a22..f48f929fd7f63 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreValueAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreValueAbstractTest.java @@ -27,7 +27,6 @@ import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; import javax.cache.processor.MutableEntry; -import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CacheEntryProcessor; @@ -38,13 +37,13 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.binary.BinaryObjectImpl; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; + import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; /** @@ -257,20 +256,6 @@ private void checkNoValue(Affinity aff, Object key) { assertNotNull(keyObj); - if (!isBinaryMarshallerUsed(ig)) { - assertNotNull("Unexpected value, node: " + g, - GridTestUtils.getFieldValue(keyObj, CacheObjectAdapter.class, "val")); - - Object key0 = keyObj.value(cache0.context().cacheObjectContext(), true); - Object key1 = keyObj.value(cache0.context().cacheObjectContext(), false); - Object key2 = keyObj.value(cache0.context().cacheObjectContext(), true); - Object key3 = keyObj.value(cache0.context().cacheObjectContext(), false); - - assertSame(key0, key1); - assertSame(key1, key2); - assertSame(key2, key3); - } - CacheObject obj = e.rawGet(); if (obj != null) { @@ -390,26 +375,9 @@ private void checkHasValue(Affinity aff, Object key) { assertNotNull(keyObj); - if (!isBinaryMarshallerUsed(ig)) { - assertNotNull("Unexpected value, node: " + g, - GridTestUtils.getFieldValue(keyObj, CacheObjectAdapter.class, "val")); - - Object key0 = keyObj.value(cache0.context().cacheObjectContext(), true); - Object key1 = keyObj.value(cache0.context().cacheObjectContext(), false); - Object key2 = keyObj.value(cache0.context().cacheObjectContext(), true); - Object key3 = keyObj.value(cache0.context().cacheObjectContext(), false); - - assertSame(key0, key1); - assertSame(key1, key2); - assertSame(key2, key3); - } - CacheObject obj = e.rawGet(); if (obj != null) { - if (!isBinaryMarshallerUsed(ig)) - assertNotNull("Unexpected value, node: " + g, reflectiveValue(obj)); - Object val0 = obj.value(cache0.context().cacheObjectContext(), true); assertNotNull("Unexpected value after value() requested1: " + g, reflectiveValue(obj)); @@ -434,15 +402,6 @@ private void checkHasValue(Affinity aff, Object key) { } } - /** - * @param ig Ignite. - * @return If binary marshaller is used. - */ - private boolean isBinaryMarshallerUsed(Ignite ig) { - return ig.configuration().getMarshaller() == null || - ig.configuration().getMarshaller() instanceof BinaryMarshaller; - } - /** * @param obj Object to extract value from. * @return Cache object. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java index 0cd0ba9f437f0..bb82bf0008740 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java @@ -43,7 +43,6 @@ import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.binary.BinaryEnumObjectImpl; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.util.lang.GridAbsPredicate; @@ -81,9 +80,6 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { /** */ private volatile boolean extClassloadingAtCfg = false; - /** */ - private volatile boolean useExtClassLoader = false; - /** Disable p2p. */ private volatile boolean disableP2p = false; @@ -121,9 +117,7 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest { if (disableP2p) cfg.setPeerClassLoadingEnabled(false); - if (getTestIgniteInstanceName(1).equals(igniteInstanceName) || useExtClassLoader || - cfg.getMarshaller() instanceof BinaryMarshaller) - cfg.setClassLoader(getExternalClassLoader()); + cfg.setClassLoader(getExternalClassLoader()); if (cutromEvt) { int[] evts = new int[EVTS_ALL.length + 1]; @@ -320,8 +314,6 @@ public void testDeployment() throws Exception { assert v2 != null; assert v2.toString().equals(v1.toString()); assert !v2.getClass().getClassLoader().equals(getClass().getClassLoader()); - assert v2.getClass().getClassLoader().getClass().getName().contains("GridDeploymentClassLoader") || - grid(2).configuration().getMarshaller() instanceof BinaryMarshaller; Object e1 = ldr.loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestEnumValue").getEnumConstants()[0]; @@ -329,19 +321,15 @@ public void testDeployment() throws Exception { Object e2 = cache2.get(2); - if (g1.configuration().getMarshaller() instanceof BinaryMarshaller) { - BinaryObject enumObj = (BinaryObject)cache2.withKeepBinary().get(2); + BinaryObject enumObj = (BinaryObject)cache2.withKeepBinary().get(2); - assertEquals(0, enumObj.enumOrdinal()); - assertTrue(enumObj.type().isEnum()); - assertTrue(enumObj instanceof BinaryEnumObjectImpl); - } + assertEquals(0, enumObj.enumOrdinal()); + assertTrue(enumObj.type().isEnum()); + assertTrue(enumObj instanceof BinaryEnumObjectImpl); assert e2 != null; assert e2.toString().equals(e1.toString()); assert !e2.getClass().getClassLoader().equals(getClass().getClassLoader()); - assert e2.getClass().getClassLoader().getClass().getName().contains("GridDeploymentClassLoader") || - grid(2).configuration().getMarshaller() instanceof BinaryMarshaller; stopGrid(1); @@ -358,8 +346,6 @@ public void testDeployment() throws Exception { assert v3 != null; assert v3.toString().equals(v1.toString()); assert !v3.getClass().getClassLoader().equals(getClass().getClassLoader()); - assert v3.getClass().getClassLoader().getClass().getName().contains("GridDeploymentClassLoader") || - grid(3).configuration().getMarshaller() instanceof BinaryMarshaller; } finally { stopAllGrids(); @@ -373,7 +359,6 @@ public void testDeployment() throws Exception { public void testExternalClassesAtConfiguration() throws Exception { try { extClassloadingAtCfg = true; - useExtClassLoader = true; Ignite g1 = startGrid(1); @@ -415,7 +400,6 @@ public void testExternalClassesAtConfiguration() throws Exception { } finally { extClassloadingAtCfg = false; - useExtClassLoader = false; } } @@ -426,7 +410,6 @@ public void testExternalClassesAtConfiguration() throws Exception { public void testExternalClassesAtConfigurationDynamicStart() throws Exception { try { extClassloadingAtCfg = false; - useExtClassLoader = true; Ignite g1 = startGrid(1); Ignite g2 = startGrid(2); @@ -451,7 +434,6 @@ public void testExternalClassesAtConfigurationDynamicStart() throws Exception { } finally { extClassloadingAtCfg = false; - useExtClassLoader = false; } } @@ -462,7 +444,6 @@ public void testExternalClassesAtConfigurationDynamicStart() throws Exception { public void testExternalClassesAtConfigurationDynamicStart2() throws Exception { try { extClassloadingAtCfg = false; - useExtClassLoader = true; Ignite g1 = startGrid(1); Ignite g2 = startGrid(2); @@ -487,7 +468,6 @@ public void testExternalClassesAtConfigurationDynamicStart2() throws Exception { } finally { extClassloadingAtCfg = false; - useExtClassLoader = false; } } @@ -497,7 +477,6 @@ public void testExternalClassesAtConfigurationDynamicStart2() throws Exception { @Test public void testExternalClassesAtMessage() throws Exception { try { - useExtClassLoader = true; disableP2p = true; final Class cls = (Class)getExternalClassLoader(). @@ -539,7 +518,6 @@ public void testExternalClassesAtMessage() throws Exception { } finally { - useExtClassLoader = false; disableP2p = false; } } @@ -565,7 +543,6 @@ public void testExternalClassesAtEvent() throws Exception { */ private void testExternalClassesAtEvent0(boolean p2p) throws Exception { try { - useExtClassLoader = true; cutromEvt = true; if (p2p) @@ -592,7 +569,6 @@ private void testExternalClassesAtEvent0(boolean p2p) throws Exception { latch.await(); } finally { - useExtClassLoader = false; cutromEvt = false; if (p2p) diff --git a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCheckpointSelfTest.java b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCheckpointSelfTest.java index b1d7fbb778541..e5ac797dbf8a7 100644 --- a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCheckpointSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCheckpointSelfTest.java @@ -98,15 +98,13 @@ public void testCacheCheckpoint() throws Exception { cfg.setCheckpointSpi(spi); - if (cfg.getMarshaller() instanceof BinaryMarshaller) { - BinaryMarshaller marsh = (BinaryMarshaller)cfg.getMarshaller(); + BinaryMarshaller marsh = (BinaryMarshaller)cfg.getMarshaller(); - BinaryContext ctx = new BinaryContext(BinaryCachingMetadataHandler.create(), cfg, new NullLogger()); + BinaryContext ctx = new BinaryContext(BinaryCachingMetadataHandler.create(), cfg, new NullLogger()); - marsh.setContext(new MarshallerContextTestImpl(null)); + marsh.setContext(new MarshallerContextTestImpl(null)); - marsh.setBinaryContext(ctx, cfg); - } + marsh.setBinaryContext(ctx, cfg); GridSessionCheckpointSelfTest.spi = spi; diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/WithKeepBinaryCacheConfigVariationsFullApiTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/WithKeepBinaryCacheConfigVariationsFullApiTestSuite.java index cc072ec311f0f..dc9f289091a1e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/WithKeepBinaryCacheConfigVariationsFullApiTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/WithKeepBinaryCacheConfigVariationsFullApiTestSuite.java @@ -20,10 +20,7 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.cache.WithKeepBinaryCacheFullApiTest; -import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.testframework.configvariations.ConfigVariationsTestSuiteBuilder; import org.apache.ignite.testframework.junits.DynamicSuite; import org.junit.runner.RunWith; @@ -39,11 +36,6 @@ public static List> suite() { return Stream.concat( new ConfigVariationsTestSuiteBuilder(WithKeepBinaryCacheFullApiTest.class) .withBasicCacheParams() - .withIgniteConfigFilters(new IgnitePredicate() { - @Override public boolean apply(IgniteConfiguration cfg) { - return cfg.getMarshaller() instanceof BinaryMarshaller; - } - }) .gridsCount(5) .backups(1) .testedNodesCount(3).withClients() @@ -51,11 +43,6 @@ public static List> suite() { new ConfigVariationsTestSuiteBuilder(WithKeepBinaryCacheFullApiTest.class) .withBasicCacheParams() - .withIgniteConfigFilters(new IgnitePredicate() { - @Override public boolean apply(IgniteConfiguration cfg) { - return cfg.getMarshaller() instanceof BinaryMarshaller; - } - }) .gridsCount(5) .backups(1) .testedNodesCount(3).withClients() diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java index 77ab338fd792b..400d24d993ca7 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java @@ -45,7 +45,6 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlIndexMetadata; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlMetadata; @@ -86,9 +85,6 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA /** Flag indicating if starting node should have cache. */ protected boolean hasCache; - /** Whether BinaryMarshaller is set. */ - protected boolean binaryMarshaller; - /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -179,11 +175,6 @@ protected IgniteCache jcache(String name, Class clsK, Class c noOpCache = grid(0).getOrCreateCache("noop"); } - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - binaryMarshaller = grid(0).configuration().getMarshaller() instanceof BinaryMarshaller; - } - /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { orgCache = null; @@ -230,29 +221,16 @@ public void testCacheMetaData() throws Exception { if (personCache.getName().equals(meta.cacheName())) { assertEquals("Invalid types size", 1, types.size()); assert types.contains("Person"); - - if (binaryMarshaller) { - assert Object.class.getName().equals(meta.keyClass("Person")); - assert Object.class.getName().equals(meta.valueClass("Person")); - } - else { - assert AffinityKey.class.getName().equals(meta.keyClass("Person")); - assert Person.class.getName().equals(meta.valueClass("Person")); - } + assert Object.class.getName().equals(meta.keyClass("Person")); + assert Object.class.getName().equals(meta.valueClass("Person")); Map fields = meta.fields("Person"); assert fields != null; assert fields.size() == 3; - if (binaryMarshaller) { - assert Integer.class.getName().equals(fields.get("AGE")); - assert Integer.class.getName().equals(fields.get("ORGID")); - } - else { - assert int.class.getName().equals(fields.get("AGE")); - assert int.class.getName().equals(fields.get("ORGID")); - } + assert Integer.class.getName().equals(fields.get("AGE")); + assert Integer.class.getName().equals(fields.get("ORGID")); assert String.class.getName().equals(fields.get("NAME")); @@ -289,12 +267,7 @@ public void testCacheMetaData() throws Exception { else if (orgCache.getName().equals(meta.cacheName())) { assertEquals("Invalid types size", 1, types.size()); assert types.contains("Organization"); - - if (binaryMarshaller) - assert Object.class.getName().equals(meta.valueClass("Organization")); - else - assert Organization.class.getName().equals(meta.valueClass("Organization")); - + assert Object.class.getName().equals(meta.valueClass("Organization")); assert String.class.getName().equals(meta.keyClass("Organization")); Map fields = meta.fields("Organization"); @@ -302,12 +275,7 @@ else if (orgCache.getName().equals(meta.cacheName())) { assert fields != null; assertEquals("Fields: " + fields, 2, fields.size()); - if (binaryMarshaller) { - assert Integer.class.getName().equals(fields.get("ID")); - } - else { - assert int.class.getName().equals(fields.get("ID")); - } + assert Integer.class.getName().equals(fields.get("ID")); assert String.class.getName().equals(fields.get("NAME")); } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractInsertSqlQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractInsertSqlQuerySelfTest.java index 6237b895be0cf..5a249c572489c 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractInsertSqlQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractInsertSqlQuerySelfTest.java @@ -22,7 +22,6 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.binary.BinaryTypeConfiguration; import org.apache.ignite.cache.CacheAtomicityMode; @@ -35,11 +34,7 @@ import org.apache.ignite.configuration.BinaryConfiguration; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.query.QueryUtils; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.internal.processors.cache.IgniteCacheUpdateSqlQuerySelfTest.AllTypes; @@ -49,28 +44,6 @@ */ @SuppressWarnings("unchecked") public abstract class IgniteCacheAbstractInsertSqlQuerySelfTest extends GridCommonAbstractTest { - /** */ - protected final Marshaller marsh; - - /** - * - */ - IgniteCacheAbstractInsertSqlQuerySelfTest() { - try { - marsh = IgniteTestResources.getMarshaller(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - /** - * @return whether {@link #marsh} is an instance of {@link BinaryMarshaller} or not. - */ - boolean isBinaryMarshaller() { - return marsh instanceof BinaryMarshaller; - } - /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -101,10 +74,7 @@ boolean isBinaryMarshaller() { @Override protected void beforeTest() throws Exception { super.beforeTest(); - if (!isBinaryMarshaller()) - createCaches(); - else - createBinaryCaches(); + createBinaryCaches(); ignite(0).createCache(cacheConfig("I2AT", true, false, Integer.class, AllTypes.class)); } @@ -244,40 +214,23 @@ final void createBinaryCaches() { * */ Object createPerson(int id, String name) { - if (!isBinaryMarshaller()) { - Person p = new Person(id); - p.name = name; - - return p; - } - else { - BinaryObjectBuilder o = grid(0).binary().builder("Person"); - o.setField("id", id); - o.setField("name", name); + BinaryObjectBuilder o = grid(0).binary().builder("Person"); + o.setField("id", id); + o.setField("name", name); - return o.build(); - } + return o.build(); } /** * */ Object createPerson2(int id, String name, int valFld) { - if (!isBinaryMarshaller()) { - Person2 p = new Person2(id); - p.name = name; - p.IntVal = valFld; - - return p; - } - else { - BinaryObjectBuilder o = grid(0).binary().builder("Person2"); - o.setField("id", id); - o.setField("name", name); - o.setField("IntVal", valFld); + BinaryObjectBuilder o = grid(0).binary().builder("Person2"); + o.setField("id", id); + o.setField("name", name); + o.setField("IntVal", valFld); - return o.build(); - } + return o.build(); } /** diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java index d50bc44b2c1f4..fe6a67abf322b 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java @@ -78,7 +78,6 @@ import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; import org.apache.ignite.events.SqlQueryExecutionEvent; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.cache.query.QueryCursorEx; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.QueryUtils; @@ -541,35 +540,33 @@ public void testComplexType() throws Exception { */ @Test public void testComplexTypeKeepBinary() throws Exception { - if (ignite().configuration().getMarshaller() == null || ignite().configuration().getMarshaller() instanceof BinaryMarshaller) { - IgniteCache cache = jcache(Key.class, GridCacheQueryTestValue.class); + IgniteCache cache = jcache(Key.class, GridCacheQueryTestValue.class); - GridCacheQueryTestValue val1 = new GridCacheQueryTestValue(); + GridCacheQueryTestValue val1 = new GridCacheQueryTestValue(); - val1.setField1("field1"); - val1.setField2(1); - val1.setField3(1L); + val1.setField1("field1"); + val1.setField2(1); + val1.setField3(1L); - GridCacheQueryTestValue val2 = new GridCacheQueryTestValue(); + GridCacheQueryTestValue val2 = new GridCacheQueryTestValue(); - val2.setField1("field2"); - val2.setField2(2); - val2.setField3(2L); - val2.setField6(null); + val2.setField1("field2"); + val2.setField2(2); + val2.setField3(2L); + val2.setField6(null); - cache.put(new Key(100500), val1); - cache.put(new Key(100501), val2); + cache.put(new Key(100500), val1); + cache.put(new Key(100501), val2); - QueryCursor> qry = cache.withKeepBinary() - .query(new SqlQuery(GridCacheQueryTestValue.class, - "fieldName='field1' and field2=1 and field3=1 and id=100500 and embeddedField2=11 and x=3")); + QueryCursor> qry = cache.withKeepBinary() + .query(new SqlQuery(GridCacheQueryTestValue.class, + "fieldName='field1' and field2=1 and field3=1 and id=100500 and embeddedField2=11 and x=3")); - Cache.Entry entry = F.first(qry.getAll()); + Cache.Entry entry = F.first(qry.getAll()); - assertNotNull(entry); - assertEquals(Long.valueOf(100500L), entry.getKey().field("id")); - assertEquals(val1, entry.getValue().deserialize()); - } + assertNotNull(entry); + assertEquals(Long.valueOf(100500L), entry.getKey().field("id")); + assertEquals(val1, entry.getValue().deserialize()); } /** diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractSqlDmlQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractSqlDmlQuerySelfTest.java index 72a38b48e00f4..acf21c17abc2c 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractSqlDmlQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractSqlDmlQuerySelfTest.java @@ -21,7 +21,6 @@ import java.util.Collections; import java.util.LinkedHashMap; import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; @@ -31,38 +30,12 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.binary.AbstractBinaryArraysTest; -import org.apache.ignite.internal.binary.BinaryMarshaller; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.testframework.junits.IgniteTestResources; /** * */ @SuppressWarnings("unchecked") public abstract class IgniteCacheAbstractSqlDmlQuerySelfTest extends AbstractBinaryArraysTest { - /** */ - protected final Marshaller marsh; - - /** - * - */ - IgniteCacheAbstractSqlDmlQuerySelfTest() { - try { - marsh = IgniteTestResources.getMarshaller(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - /** - * @return whether {@link #marsh} is an instance of {@link BinaryMarshaller} or not. - */ - protected boolean isBinaryMarshaller() { - return marsh instanceof BinaryMarshaller; - } - /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); @@ -77,9 +50,7 @@ protected boolean isBinaryMarshaller() { startGridsMultiThreaded(3, true); ignite(0).createCache(cacheConfig("S2P", true, false).setIndexedTypes(String.class, Person.class)); - - if (isBinaryMarshaller()) - ignite(0).createCache(createBinCacheConfig()); + ignite(0).createCache(createBinCacheConfig()); } /** {@inheritDoc} */ @@ -92,12 +63,10 @@ protected boolean isBinaryMarshaller() { ignite(0).cache("S2P").put("k3", new Person(3, "Sylvia", "Green")); ignite(0).cache("S2P").put("f0u4thk3y", new Person(4, "Jane", "Silver")); - if (isBinaryMarshaller()) { - ignite(0).cache("S2P-bin").put("FirstKey", createBinPerson(1, "John", "White")); - ignite(0).cache("S2P-bin").put("SecondKey", createBinPerson(2, "Joe", "Black")); - ignite(0).cache("S2P-bin").put("k3", createBinPerson(3, "Sylvia", "Green")); - ignite(0).cache("S2P-bin").put("f0u4thk3y", createBinPerson(4, "Jane", "Silver")); - } + ignite(0).cache("S2P-bin").put("FirstKey", createBinPerson(1, "John", "White")); + ignite(0).cache("S2P-bin").put("SecondKey", createBinPerson(2, "Joe", "Black")); + ignite(0).cache("S2P-bin").put("k3", createBinPerson(3, "Sylvia", "Green")); + ignite(0).cache("S2P-bin").put("f0u4thk3y", createBinPerson(4, "Jane", "Silver")); } /** @@ -109,10 +78,7 @@ protected boolean isBinaryMarshaller() { * @return Person. */ Object createPerson(int id, String name, String secondName) { - if (!isBinaryMarshaller()) - return new Person(id, name, secondName); - else - return createBinPerson(id, name, secondName); + return createBinPerson(id, name, secondName); } /** @@ -137,10 +103,7 @@ private Object createBinPerson(int id, String name, String secondName) { * @return Cache. */ protected IgniteCache cache() { - if (!isBinaryMarshaller()) - return ignite(0).cache("S2P"); - else - return ignite(0).cache("S2P-bin").withKeepBinary(); + return ignite(0).cache("S2P-bin").withKeepBinary(); } /** diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java index 2a0b6402e20f2..ec703cac9f9dd 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java @@ -36,7 +36,6 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -265,12 +264,10 @@ private void checkPersonAccountsJoin(IgniteCache cache, Map cnt Ignite ignite = (Ignite)cache.unwrap(Ignite.class); - boolean binary = ignite.configuration().getMarshaller() instanceof BinaryMarshaller; - long total = 0; for (Map.Entry e : cnts.entrySet()) { - Object arg = binary ? ignite.binary().toBinary(e.getKey()) : e.getKey(); + Object arg = ignite.binary().toBinary(e.getKey()); qry1.setArgs(arg);