diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java index cd6290e9495a7..8a1f1c2055503 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java @@ -46,8 +46,9 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node; import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode; import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext; +import org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.FieldsMetadata; import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment; -import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan; import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext; import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.processors.query.running.TrackableQuery; @@ -193,14 +194,14 @@ public void mapping() { /** * Starts execution phase for the query and setup remote fragments. */ - public void run(ExecutionContext ctx, MultiStepPlan plan, Node root) { + public void run(ExecutionContext ctx, ExecutionPlan plan, FieldsMetadata metadata, Node root) { synchronized (mux) { if (state == QueryState.CLOSED) throw queryCanceledException(); planningTime = U.currentTimeMillis() - startTs; - RootNode rootNode = new RootNode<>(ctx, plan.fieldsMetadata().rowType(), this::tryClose); + RootNode rootNode = new RootNode<>(ctx, metadata.rowType(), this::tryClose); rootNode.register(root); addFragment(new RunningFragment<>(F.first(plan.fragments()).root(), rootNode, ctx)); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java index e7d1a594b9b99..38667c7c25af9 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java @@ -76,15 +76,14 @@ import org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest; import org.apache.ignite.internal.processors.query.calcite.message.QueryStartResponse; import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService; -import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationMappingException; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping; -import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingException; import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService; import org.apache.ignite.internal.processors.query.calcite.metadata.RemoteException; import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext; import org.apache.ignite.internal.processors.query.calcite.prepare.CacheKey; import org.apache.ignite.internal.processors.query.calcite.prepare.DdlPlan; +import org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan; import org.apache.ignite.internal.processors.query.calcite.prepare.ExplainPlan; import org.apache.ignite.internal.processors.query.calcite.prepare.FieldsMetadataImpl; import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment; @@ -561,22 +560,11 @@ private ListFieldsQueryCursor mapAndExecutePlan( ) { qry.mapping(); - MappingQueryContext mapCtx = Commons.mapContext(locNodeId, topologyVersion(), qry.context().isLocal()); + MappingQueryContext mapCtx = Commons.mapContext(locNodeId, topologyVersion(), qry.context()); - plan.init(mappingSvc, mapCtx); + ExecutionPlan execPlan = plan.init(mappingSvc, mapCtx); - List fragments = plan.fragments(); - - if (!F.isEmpty(qry.context().partitions())) { - fragments = Commons.transform(fragments, f -> { - try { - return f.filterByPartitions(qry.context().partitions()); - } - catch (ColocationMappingException e) { - throw new FragmentMappingException("Failed to calculate physical distribution", f, f.root(), e); - } - }); - } + List fragments = execPlan.fragments(); // Local execution Fragment fragment = F.first(fragments); @@ -584,13 +572,13 @@ private ListFieldsQueryCursor mapAndExecutePlan( if (U.assertionsEnabled()) { assert fragment != null; - FragmentMapping mapping = plan.mapping(fragment); + FragmentMapping mapping = execPlan.mapping(fragment); assert mapping != null; List nodes = mapping.nodeIds(); - assert nodes != null && nodes.size() == 1 && F.first(nodes).equals(localNodeId()) + assert nodes != null && (nodes.size() == 1 && F.first(nodes).equals(localNodeId()) || nodes.isEmpty()) : "nodes=" + nodes + ", localNode=" + localNodeId(); } @@ -603,9 +591,9 @@ private ListFieldsQueryCursor mapAndExecutePlan( FragmentDescription fragmentDesc = new FragmentDescription( fragment.fragmentId(), - plan.mapping(fragment), - plan.target(fragment), - plan.remotes(fragment)); + execPlan.mapping(fragment), + execPlan.target(fragment), + execPlan.remotes(fragment)); ExecutionContext ectx = new ExecutionContext<>( qry.context(), @@ -624,7 +612,7 @@ private ListFieldsQueryCursor mapAndExecutePlan( Node node = new LogicalRelImplementor<>(ectx, partitionService(), mailboxRegistry(), exchangeService(), failureProcessor()).go(fragment.root()); - qry.run(ectx, plan, node); + qry.run(ectx, execPlan, plan.fieldsMetadata(), node); Map fragmentsPerNode = fragments.stream() .skip(1) @@ -636,9 +624,9 @@ private ListFieldsQueryCursor mapAndExecutePlan( fragment = fragments.get(i); fragmentDesc = new FragmentDescription( fragment.fragmentId(), - plan.mapping(fragment), - plan.target(fragment), - plan.remotes(fragment)); + execPlan.mapping(fragment), + execPlan.target(fragment), + execPlan.remotes(fragment)); Throwable ex = null; byte[] parametersMarshalled = null; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java index 0184ba91f869f..f6a8be350edbb 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java @@ -17,18 +17,12 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.UUID; -import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; -import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping; +import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationMappingException; +import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingException; import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; /** @@ -44,9 +38,6 @@ public abstract class AbstractMultiStepPlan extends AbstractQueryPlan implements /** */ protected final QueryTemplate queryTemplate; - /** */ - protected ExecutionPlan executionPlan; - /** */ private final String textPlan; @@ -66,11 +57,6 @@ protected AbstractMultiStepPlan( this.paramsMetadata = paramsMetadata; } - /** {@inheritDoc} */ - @Override public List fragments() { - return Objects.requireNonNull(executionPlan).fragments(); - } - /** {@inheritDoc} */ @Override public FieldsMetadata fieldsMetadata() { return fieldsMetadata; @@ -82,47 +68,25 @@ protected AbstractMultiStepPlan( } /** {@inheritDoc} */ - @Override public FragmentMapping mapping(Fragment fragment) { - return fragment.mapping(); - } - - /** {@inheritDoc} */ - @Override public ColocationGroup target(Fragment fragment) { - if (fragment.rootFragment()) - return null; - - IgniteSender sender = (IgniteSender)fragment.root(); - return mapping(sender.targetFragmentId()).findGroup(sender.exchangeId()); - } - - /** {@inheritDoc} */ - @Override public Map> remotes(Fragment fragment) { - List remotes = fragment.remotes(); + @Override public ExecutionPlan init(MappingService mappingService, MappingQueryContext ctx) { + ExecutionPlan executionPlan0 = queryTemplate.map(mappingService, ctx); - if (F.isEmpty(remotes)) - return null; + if (!F.isEmpty(ctx.partitions()) && !F.isEmpty(executionPlan0.fragments())) { + List fragments = executionPlan0.fragments(); - HashMap> res = U.newHashMap(remotes.size()); + fragments = Commons.transform(fragments, f -> { + try { + return f.filterByPartitions(ctx.partitions()); + } + catch (ColocationMappingException e) { + throw new FragmentMappingException("Failed to calculate physical distribution", f, f.root(), e); + } + }); - for (IgniteReceiver remote : remotes) - res.put(remote.exchangeId(), mapping(remote.sourceFragmentId()).nodeIds()); - - return res; - } + return new ExecutionPlan(executionPlan0.topologyVersion(), fragments); + } - /** {@inheritDoc} */ - @Override public void init(MappingService mappingService, MappingQueryContext ctx) { - executionPlan = queryTemplate.map(mappingService, ctx); - } - - /** */ - private FragmentMapping mapping(long fragmentId) { - return Objects.requireNonNull(executionPlan).fragments().stream() - .filter(f -> f.fragmentId() == fragmentId) - .findAny().orElseThrow(() -> new IllegalStateException("Cannot find fragment with given ID. [" + - "fragmentId=" + fragmentId + ", " + - "fragments=" + fragments() + "]")) - .mapping(); + return executionPlan0; } /** {@inheritDoc} */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java index 75fc70d741b80..5a25502ea278f 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java @@ -17,14 +17,23 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.UUID; import com.google.common.collect.ImmutableList; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; +import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; /** * */ -class ExecutionPlan { +public class ExecutionPlan { /** */ private final AffinityTopologyVersion ver; @@ -46,4 +55,42 @@ public AffinityTopologyVersion topologyVersion() { public List fragments() { return fragments; } + + /** */ + public FragmentMapping mapping(Fragment fragment) { + return fragment.mapping(); + } + + /** */ + public ColocationGroup target(Fragment fragment) { + if (fragment.rootFragment()) + return null; + + IgniteSender sender = (IgniteSender)fragment.root(); + return mapping(sender.targetFragmentId()).findGroup(sender.exchangeId()); + } + + /** */ + public Map> remotes(Fragment fragment) { + List remotes = fragment.remotes(); + + if (F.isEmpty(remotes)) + return null; + + HashMap> res = U.newHashMap(remotes.size()); + + for (IgniteReceiver remote : remotes) + res.put(remote.exchangeId(), mapping(remote.sourceFragmentId()).nodeIds()); + + return res; + } + + /** */ + private FragmentMapping mapping(long fragmentId) { + return fragments().stream() + .filter(f -> f.fragmentId() == fragmentId) + .findAny().orElseThrow(() -> new IllegalStateException("Cannot find fragment with given ID. [" + + "fragmentId=" + fragmentId + ", " + "fragments=" + fragments() + "]")) + .mapping(); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java index 2bdd1a0c913a5..f8b6f43128f3c 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java @@ -41,16 +41,20 @@ public class MappingQueryContext { /** */ private final boolean isLocal; + /** */ + private final int[] parts; + /** */ public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer) { - this(locNodeId, topVer, false); + this(locNodeId, topVer, false, null); } /** */ - public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer, boolean isLocal) { + public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer, boolean isLocal, int[] parts) { this.locNodeId = locNodeId; this.topVer = topVer; this.isLocal = isLocal; + this.parts = parts; } /** */ @@ -68,6 +72,11 @@ public boolean isLocal() { return isLocal; } + /** */ + public int[] partitions() { + return parts; + } + /** Creates a cluster. */ RelOptCluster cluster() { if (cluster == null) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java index e738d9b92a035..692f713c9398e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java @@ -17,22 +17,12 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; -import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping; import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService; /** * Regular query or DML */ public interface MultiStepPlan extends QueryPlan { - /** - * @return Query fragments. - */ - List fragments(); - /** * @return Fields metadata. */ @@ -43,24 +33,12 @@ public interface MultiStepPlan extends QueryPlan { */ FieldsMetadata paramsMetadata(); - /** - * @param fragment Fragment. - * @return Mapping for a given fragment. - */ - FragmentMapping mapping(Fragment fragment); - - /** */ - ColocationGroup target(Fragment fragment); - - /** */ - Map> remotes(Fragment fragment); - /** * Inits query fragments. * * @param ctx Planner context. */ - void init(MappingService mappingService, MappingQueryContext ctx); + ExecutionPlan init(MappingService mappingService, MappingQueryContext ctx); /** * @return Text representation of query plan diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java index fcafc98b301a9..4476ebc379323 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java @@ -444,11 +444,11 @@ public static IgniteTypeFactory typeFactory() { /** */ public static MappingQueryContext mapContext(UUID locNodeId, AffinityTopologyVersion topVer) { - return mapContext(locNodeId, topVer, false); + return new MappingQueryContext(locNodeId, topVer, false, null); } /** */ - public static MappingQueryContext mapContext(UUID locNodeId, AffinityTopologyVersion topVer, boolean isLocal) { - return new MappingQueryContext(locNodeId, topVer, isLocal); + public static MappingQueryContext mapContext(UUID locNodeId, AffinityTopologyVersion topVer, BaseQueryContext ctx) { + return new MappingQueryContext(locNodeId, topVer, ctx.isLocal(), ctx.partitions()); } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java index 16655187dc835..660ee91dc3eb9 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/QueryWithPartitionsIntegrationTest.java @@ -23,7 +23,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import com.google.common.collect.ImmutableList; import com.google.common.primitives.Ints; import org.apache.calcite.util.Pair; import org.apache.ignite.IgniteCache; @@ -53,12 +52,16 @@ public class QueryWithPartitionsIntegrationTest extends AbstractBasicIntegration public boolean local; /** */ - @Parameterized.Parameters(name = "local = {0}") + @Parameterized.Parameter(1) + public int partSz; + + /** */ + @Parameterized.Parameters(name = "local = {0}, partSz = {1}") public static List parameters() { - return ImmutableList.of( - new Object[]{true}, - new Object[]{false} - ); + return Stream.of(true, false) + .flatMap(isLocal -> Stream.of(1, 2, 5, 10, 20) + .map(i -> new Object[]{isLocal, i})) + .collect(Collectors.toList()); } /** {@inheritDoc} */ @@ -67,7 +70,7 @@ public static List parameters() { List parts0 = IntStream.range(0, 1024).boxed().collect(Collectors.toList()); Collections.shuffle(parts0); - parts = Ints.toArray(parts0.subList(0, 20)); + parts = Ints.toArray(parts0.subList(0, partSz)); log.info("Running tests with parts=" + Arrays.toString(parts)); } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java index aa4cd3e95f22e..597869d237ca1 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription; import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory; import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext; +import org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan; import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment; import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner; import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext; @@ -314,7 +315,7 @@ private List executeQuery(IgniteSchema publicSchema, String sql, Objec IgniteRel phys = physicalPlan(ctx); - MultiStepPlan plan = splitPlan(phys); + ExecutionPlan plan = splitPlan(phys); List fragments = plan.fragments(); assertEquals(2, fragments.size()); @@ -359,16 +360,14 @@ private List executeQuery(IgniteSchema publicSchema, String sql, Objec } /** */ - private MultiStepPlan splitPlan(IgniteRel phys) { + private ExecutionPlan splitPlan(IgniteRel phys) { assertNotNull(phys); MultiStepPlan plan = new MultiStepQueryPlan(null, null, new QueryTemplate(new Splitter().go(phys)), null, null); assertNotNull(plan); - plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE)); - - return plan; + return plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE)); } /** @@ -378,7 +377,7 @@ private Node implementFragment( BaseQueryContext qctx, PlanningContext ctx, TestIoManager mgr, - MultiStepPlan plan, + ExecutionPlan plan, Fragment fragment, UUID qryId, UUID nodeId