From a966b0272916c480ccc4f50f90c836d7855a715c Mon Sep 17 00:00:00 2001 From: Ivan Daschinskiy Date: Tue, 17 Oct 2023 20:48:00 +0300 Subject: [PATCH] IGNITE-12455 SQL Calcite: Partition pruning (#10928) --- .../calcite/exec/ExecutionServiceImpl.java | 8 +- .../calcite/exec/PartitionExtractor.java | 249 ++++++++ .../exec/partition/PartitionAllNode.java | 46 ++ .../exec/partition/PartitionLiteralNode.java | 42 ++ .../calcite/exec/partition/PartitionNode.java | 36 ++ .../exec/partition/PartitionNoneNode.java | 39 ++ .../exec/partition/PartitionOperandNode.java | 104 ++++ .../partition/PartitionParameterNode.java | 52 ++ .../partition/PartitionPruningContext.java | 58 ++ .../exec/partition/PartitionSingleNode.java | 47 ++ .../calcite/metadata/ColocationGroup.java | 29 +- .../calcite/metadata/FragmentMapping.java | 14 +- .../metadata/IgniteMdFragmentMapping.java | 2 +- .../prepare/AbstractMultiStepPlan.java | 55 +- .../query/calcite/prepare/ExecutionPlan.java | 12 +- .../calcite/prepare/MappingQueryContext.java | 64 +- .../query/calcite/prepare/MultiStepPlan.java | 3 +- .../query/calcite/prepare/QueryTemplate.java | 8 +- .../query/calcite/util/Commons.java | 12 +- .../LocalQueryIntegrationTest.java | 2 +- .../integration/PartitionPruneTest.java | 585 ++++++++++++++++++ .../calcite/planner/AbstractPlannerTest.java | 15 +- .../query/calcite/planner/PlannerTest.java | 2 +- .../testsuites/IntegrationTestSuite.java | 4 + 24 files changed, 1430 insertions(+), 58 deletions(-) create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PartitionExtractor.java create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionAllNode.java create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionLiteralNode.java create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionNode.java create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionNoneNode.java create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionOperandNode.java create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionParameterNode.java create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionPruningContext.java create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionSingleNode.java create mode 100644 modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PartitionPruneTest.java diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java index 38667c7c25af96..835c13ede19bca 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java @@ -560,9 +560,11 @@ private ListFieldsQueryCursor mapAndExecutePlan( ) { qry.mapping(); - MappingQueryContext mapCtx = Commons.mapContext(locNodeId, topologyVersion(), qry.context()); + Map qryParams = Commons.parametersMap(qry.parameters()); - ExecutionPlan execPlan = plan.init(mappingSvc, mapCtx); + MappingQueryContext mapCtx = Commons.mapContext(locNodeId, topologyVersion(), qry.context(), qryParams); + + ExecutionPlan execPlan = plan.init(mappingSvc, partSvc, mapCtx); List fragments = execPlan.fragments(); @@ -607,7 +609,7 @@ private ListFieldsQueryCursor mapAndExecutePlan( qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota()), createIoTracker(locNodeId, qry.localQueryId()), timeout, - Commons.parametersMap(qry.parameters())); + qryParams); Node node = new LogicalRelImplementor<>(ectx, partitionService(), mailboxRegistry(), exchangeService(), failureProcessor()).go(fragment.root()); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PartitionExtractor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PartitionExtractor.java new file mode 100644 index 00000000000000..a769e28cbb0b19 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PartitionExtractor.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Deque; +import java.util.List; +import java.util.stream.Collectors; +import com.google.common.primitives.Primitives; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexDynamicParam; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexLocalRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionAllNode; +import org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionLiteralNode; +import org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionNode; +import org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionNoneNode; +import org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionOperandNode; +import org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionParameterNode; +import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment; +import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteRelShuttle; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange; +import org.apache.ignite.internal.processors.query.calcite.rel.ProjectableFilterableTableScan; +import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor; +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable; +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; + +/** */ +public class PartitionExtractor extends IgniteRelShuttle { + /** */ + private final IgniteTypeFactory typeFactory; + + /** */ + private final Deque stack = new ArrayDeque<>(); + + /** */ + public PartitionExtractor(IgniteTypeFactory typeFactory) { + this.typeFactory = typeFactory; + } + + /** {@inheritDoc} */ + @Override public IgniteRel visit(IgniteIndexScan rel) { + processScan(rel); + + return rel; + } + + /** {@inheritDoc} */ + @Override public IgniteRel visit(IgniteTableScan rel) { + processScan(rel); + + return rel; + } + + /** {@inheritDoc} */ + @Override public IgniteRel visit(IgniteTrimExchange rel) { + stack.push(PartitionAllNode.IGNORE); + + return rel; + } + + /** {@inheritDoc} */ + @Override public IgniteRel visit(IgniteReceiver rel) { + stack.push(PartitionAllNode.IGNORE); + + return rel; + } + + /** {@inheritDoc} */ + @Override protected IgniteRel processNode(IgniteRel rel) { + IgniteRel res = super.processNode(rel); + + if (rel.getInputs().size() > 1) { + List operands = new ArrayList<>(); + for (int i = 0; i < rel.getInputs().size(); ++i) + operands.add(stack.pop()); + + stack.push(PartitionOperandNode.createOrOperandNode(operands)); + } + else if (rel.getInputs().isEmpty()) + stack.push(PartitionAllNode.INSTANCE); + + return res; + } + + /** */ + public PartitionNode go(Fragment fragment) { + if (!(fragment.root() instanceof IgniteSender)) + return PartitionAllNode.INSTANCE; + + if (fragment.mapping() == null || !fragment.mapping().colocated()) + return PartitionAllNode.INSTANCE; + + processNode(fragment.root()); + + if (stack.isEmpty()) + return PartitionAllNode.INSTANCE; + + return stack.pop(); + } + + /** */ + private void processScan(IgniteRel rel) { + assert rel instanceof ProjectableFilterableTableScan; + + RexNode condition = ((ProjectableFilterableTableScan)rel).condition(); + + IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class); + + if (!tbl.distribution().function().affinity() || tbl.distribution().getKeys().size() != 1) { + stack.push(PartitionAllNode.INSTANCE); + + return; + } + + ImmutableIntList keys = tbl.distribution().getKeys(); + RelDataType rowType = tbl.getRowType(typeFactory); + int cacheId = ((CacheTableDescriptor)tbl.descriptor()).cacheInfo().cacheId(); + + List> types = new ArrayList<>(rowType.getFieldCount()); + for (RelDataTypeField field : rowType.getFieldList()) { + if (QueryUtils.KEY_FIELD_NAME.equals(field.getName())) + keys = keys.append(field.getIndex()); + + types.add(Primitives.wrap((Class)typeFactory.getJavaClass(field.getType()))); + } + + List requiredCols; + if (((ProjectableFilterableTableScan)rel).requiredColumns() != null) + requiredCols = ((ProjectableFilterableTableScan)rel).requiredColumns().asList(); + else + requiredCols = Collections.emptyList(); + + PartitionNode partNode = processCondition(condition, types, keys, requiredCols, cacheId); + + stack.push(partNode); + } + + /** */ + private PartitionNode processCondition( + RexNode condition, + List> types, + ImmutableIntList keys, + List requiredCols, + int cacheId + ) { + if (!(condition instanceof RexCall)) + return PartitionAllNode.INSTANCE; + + SqlKind opKind = condition.getKind(); + List operands = ((RexCall)condition).getOperands(); + + switch (opKind) { + case IS_NULL: { + RexNode left = operands.get(0); + + if (!left.isA(SqlKind.LOCAL_REF)) + return PartitionAllNode.INSTANCE; + + int idx = ((RexLocalRef)left).getIndex(); + if (!requiredCols.isEmpty()) + idx = requiredCols.get(idx); + + if (!keys.contains(idx)) + return PartitionAllNode.INSTANCE; + else + return PartitionNoneNode.INSTANCE; + } + case EQUALS: { + if (operands.size() != 2) + return PartitionAllNode.INSTANCE; + + RexNode left, right; + if (operands.get(0).isA(SqlKind.LOCAL_REF)) { + left = operands.get(0); + right = operands.get(1); + } + else { + left = operands.get(1); + right = operands.get(0); + } + + if (!left.isA(SqlKind.LOCAL_REF)) + return PartitionAllNode.INSTANCE; + + if (!right.isA(SqlKind.LITERAL) && !right.isA(SqlKind.DYNAMIC_PARAM)) + return PartitionAllNode.INSTANCE; + + int idx = ((RexLocalRef)left).getIndex(); + if (!requiredCols.isEmpty()) + idx = requiredCols.get(idx); + + if (!keys.contains(idx)) + return PartitionAllNode.INSTANCE; + + Class fldType = types.get(idx); + + if (right.isA(SqlKind.LITERAL)) + return new PartitionLiteralNode(cacheId, (RexLiteral)right, fldType); + else + return new PartitionParameterNode(cacheId, (RexDynamicParam)right, fldType); + } + case SEARCH: + RexNode condition0 = RexUtil.expandSearch(Commons.emptyCluster().getRexBuilder(), null, condition); + + return processCondition(condition0, types, keys, requiredCols, cacheId); + case OR: + case AND: + List operands0 = ((RexCall)condition).getOperands().stream() + .map(node -> processCondition(node, types, keys, requiredCols, cacheId)) + .collect(Collectors.toList()); + + return opKind == SqlKind.OR ? PartitionOperandNode.createOrOperandNode(operands0) + : PartitionOperandNode.createAndOperandNode(operands0); + default: + return PartitionAllNode.INSTANCE; + } + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionAllNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionAllNode.java new file mode 100644 index 00000000000000..da51c350539f03 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionAllNode.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.query.calcite.exec.partition; + +import java.util.Collection; + +/** */ +public class PartitionAllNode implements PartitionNode { + /** Exclude this node from partition calculation if other nodes present in {@link PartitionParameterNode }. */ + public static final PartitionAllNode IGNORE = new PartitionAllNode(true); + + /** */ + public static final PartitionAllNode INSTANCE = new PartitionAllNode(false); + + /** */ + private final boolean replicated; + + /** */ + private PartitionAllNode(boolean replicated) { + this.replicated = replicated; + } + + /** */ + public boolean isReplicated() { + return replicated; + } + + /** {@inheritDoc} */ + @Override public Collection apply(PartitionPruningContext ctx) { + return null; + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionLiteralNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionLiteralNode.java new file mode 100644 index 00000000000000..04da8eca998d30 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionLiteralNode.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.partition; + +import org.apache.calcite.rex.RexLiteral; +import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService; + +/** */ +public class PartitionLiteralNode extends PartitionSingleNode { + /** */ + private final Object val; + + /** */ + public PartitionLiteralNode(int cacheId, RexLiteral literal, Class colType) { + super(cacheId); + + val = literal.getValueAs(colType); + + } + + /** {@inheritDoc} */ + @Override Integer applySingle(PartitionPruningContext ctx) { + AffinityService affSvc = ctx.affinityService(); + + return affSvc.affinity(cacheId()).applyAsInt(val); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionNode.java new file mode 100644 index 00000000000000..9f65f79f892c55 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionNode.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.partition; + +import java.util.Collection; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.jetbrains.annotations.Nullable; + +/** */ +public interface PartitionNode { + /** + * @param ctx Partition pruning context. + * @return Collection of partitions after pruning or {@code null} if all partitions required. + */ + @Nullable Collection apply(PartitionPruningContext ctx); + + /** */ + default int cacheId() { + return CU.UNDEFINED_CACHE_ID; + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionNoneNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionNoneNode.java new file mode 100644 index 00000000000000..07609c20f331f7 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionNoneNode.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.partition; + +import java.util.Collection; +import java.util.Collections; + +/** */ +public class PartitionNoneNode implements PartitionNode { + /** */ + public static final PartitionNoneNode INSTANCE = new PartitionNoneNode(); + + /** + * Constructor. + */ + private PartitionNoneNode() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Collection apply(PartitionPruningContext ctx) { + return Collections.emptyList(); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionOperandNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionOperandNode.java new file mode 100644 index 00000000000000..9ed555affb0320 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionOperandNode.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.partition; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** */ +public class PartitionOperandNode implements PartitionNode { + /** */ + private final Operand op; + + /** */ + private final List operands; + + /** */ + private PartitionOperandNode(Operand op, List operands) { + this.op = op; + this.operands = Collections.unmodifiableList(operands); + } + + /** {@inheritDoc} */ + @Override public Collection apply(PartitionPruningContext ctx) { + Set allParts = null; + + if (op == Operand.AND) { + for (PartitionNode operand : operands) { + if (operand == PartitionAllNode.IGNORE) + continue; + + Collection parts = operand.apply(ctx); + + if (parts == null) + continue; + + if (allParts == null) + allParts = new HashSet<>(parts); + else + allParts.retainAll(parts); + } + } + else { + for (PartitionNode operand: operands) { + if (operand == PartitionAllNode.IGNORE) + continue; + + Collection parts = operand.apply(ctx); + + if (parts == null) + return null; + + if (allParts == null) + allParts = new HashSet<>(parts); + else + allParts.addAll(parts); + } + } + + return allParts != null ? Collections.unmodifiableCollection(allParts) : null; + } + + /** */ + public static PartitionNode createAndOperandNode(List operands) { + if (operands.stream().anyMatch(n -> n == PartitionNoneNode.INSTANCE)) + return PartitionNoneNode.INSTANCE; + + return new PartitionOperandNode(Operand.AND, operands); + } + + /** */ + public static PartitionNode createOrOperandNode(List operands) { + if (operands.stream().anyMatch(n -> n == PartitionAllNode.INSTANCE)) + return PartitionAllNode.INSTANCE; + + return new PartitionOperandNode(Operand.OR, operands); + } + + /** */ + private enum Operand { + /** */ + AND, + + /** */ + OR, + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionParameterNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionParameterNode.java new file mode 100644 index 00000000000000..f8869404d0285a --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionParameterNode.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.partition; + +import org.apache.calcite.rex.RexDynamicParam; +import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService; +import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils; + +/** */ +public class PartitionParameterNode extends PartitionSingleNode { + /** */ + private final Class colType; + + /** */ + private final RexDynamicParam param; + + /** */ + public PartitionParameterNode(int cacheId, RexDynamicParam param, Class colType) { + super(cacheId); + this.param = param; + this.colType = colType; + } + + /** {@inheritDoc} */ + @Override Integer applySingle(PartitionPruningContext ctx) { + int idx = param.getIndex(); + + Object val = TypeUtils.toInternal(ctx.dataContext(), ctx.resolveParameter(idx), colType); + + if (val == null) + return null; + + AffinityService affSvc = ctx.affinityService(); + + return affSvc.affinity(cacheId()).applyAsInt(val); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionPruningContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionPruningContext.java new file mode 100644 index 00000000000000..3c3d45bf1a0718 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionPruningContext.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.partition; + +import java.util.Collections; +import java.util.Map; +import org.apache.calcite.DataContext; +import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService; + +/** */ +public class PartitionPruningContext { + /** */ + private final AffinityService affSvc; + + /** */ + private final Map params; + + /** */ + private final DataContext dataCtx; + + /** */ + public PartitionPruningContext(AffinityService affSvc, DataContext dataCtx, Map params) { + this.affSvc = affSvc; + this.dataCtx = dataCtx; + this.params = Collections.unmodifiableMap(params); + } + + /** */ + public DataContext dataContext() { + return dataCtx; + } + + /** */ + public AffinityService affinityService() { + return affSvc; + } + + /** */ + public Object resolveParameter(int idx) { + return params.get("?" + idx); + + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionSingleNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionSingleNode.java new file mode 100644 index 00000000000000..22f2ef3c480cf5 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionSingleNode.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.partition; + +import java.util.Collection; +import com.google.common.collect.ImmutableList; + +/** */ +abstract class PartitionSingleNode implements PartitionNode { + /** */ + private final int cacheId; + + /** */ + protected PartitionSingleNode(int cacheId) { + this.cacheId = cacheId; + } + + /** {@inheritDoc} */ + @Override public Collection apply(PartitionPruningContext ctx) { + Integer part = applySingle(ctx); + + return part == null ? ImmutableList.of() : ImmutableList.of(part); + } + + /** */ + abstract Integer applySingle(PartitionPruningContext ctx); + + /** {@inheritDoc} */ + @Override public int cacheId() { + return cacheId; + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java index 962f6a793f3af5..baa36c26aef0ac 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java @@ -36,7 +36,6 @@ import org.apache.ignite.internal.processors.query.calcite.message.MarshalableMessage; import org.apache.ignite.internal.processors.query.calcite.message.MarshallingContext; import org.apache.ignite.internal.processors.query.calcite.message.MessageType; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.util.GridIntIterator; import org.apache.ignite.internal.util.GridIntList; @@ -122,16 +121,6 @@ public List> assignments() { return Collections.emptyList(); } - /** - * Prunes involved partitions (hence nodes, involved in query execution) on the basis of filter, - * its distribution, query parameters and original nodes mapping. - * @param rel Filter. - * @return Resulting nodes mapping. - */ - public ColocationGroup prune(IgniteRel rel) { - return this; // TODO https://issues.apache.org/jira/browse/IGNITE-12455 - } - /** */ public boolean belongs(long sourceId) { if (sourceIds == null) @@ -236,16 +225,22 @@ public ColocationGroup filterByPartitions(int[] parts) { List> assignments = new ArrayList<>(this.assignments.size()); Set nodes = new HashSet<>(); - if (F.isEmpty(parts)) + if (parts == null) return this; - for (int i = 0; i < this.assignments.size(); ++i) { - UUID first = Arrays.binarySearch(parts, i) >= 0 ? F.first(this.assignments.get(i)) : null; + if (parts.length > 0) { + for (int i = 0; i < this.assignments.size(); ++i) { + UUID first = Arrays.binarySearch(parts, i) >= 0 ? F.first(this.assignments.get(i)) : null; - if (first != null) - nodes.add(first); + if (first != null) + nodes.add(first); - assignments.add(first != null ? this.assignments.get(i) : Collections.emptyList()); + assignments.add(first != null ? this.assignments.get(i) : Collections.emptyList()); + } + } + else { + for (int i = 0; i < this.assignments.size(); ++i) + assignments.add(Collections.emptyList()); } return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java index 7d023558c649c1..f31107d1d3a206 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java @@ -27,7 +27,6 @@ import org.apache.ignite.internal.processors.query.calcite.message.MarshalableMessage; import org.apache.ignite.internal.processors.query.calcite.message.MarshallingContext; import org.apache.ignite.internal.processors.query.calcite.message.MessageType; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; @@ -88,14 +87,6 @@ public boolean colocated() { return colocationGroups.isEmpty() || colocationGroups.size() == 1; } - /** */ - public FragmentMapping prune(IgniteRel rel) { - if (colocationGroups.size() != 1) - return this; - - return new FragmentMapping(F.first(colocationGroups).prune(rel)); - } - /** */ public FragmentMapping combine(FragmentMapping other) { return new FragmentMapping(Commons.combine(colocationGroups, other.colocationGroups)); @@ -131,6 +122,11 @@ public List nodeIds() { .distinct().collect(Collectors.toList()); } + /** */ + public List colocationGroups() { + return Collections.unmodifiableList(colocationGroups); + } + /** */ public FragmentMapping finalizeMapping(Supplier> nodesSource) { if (colocationGroups.isEmpty()) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java index b4efd07ffcd1a3..51b0ca886ada5e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentMapping.java @@ -159,7 +159,7 @@ public FragmentMapping fragmentMapping(SetOp rel, RelMetadataQuery mq, MappingQu * Prunes involved partitions (hence nodes, involved in query execution) if possible. */ public FragmentMapping fragmentMapping(IgniteFilter rel, RelMetadataQuery mq, MappingQueryContext ctx) { - return _fragmentMapping(rel.getInput(), mq, ctx).prune(rel); + return _fragmentMapping(rel.getInput(), mq, ctx); } /** diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java index f6a8be350edbbe..dd1a05ebdef609 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java @@ -17,12 +17,20 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; +import java.util.Arrays; +import java.util.Collection; import java.util.List; +import com.google.common.primitives.Ints; +import org.apache.calcite.util.Pair; +import org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionNode; +import org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionPruningContext; +import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService; import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationMappingException; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingException; import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService; import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; /** @@ -68,22 +76,59 @@ protected AbstractMultiStepPlan( } /** {@inheritDoc} */ - @Override public ExecutionPlan init(MappingService mappingService, MappingQueryContext ctx) { - ExecutionPlan executionPlan0 = queryTemplate.map(mappingService, ctx); + @Override public ExecutionPlan init( + MappingService mappingService, + AffinityService affSvc, + MappingQueryContext mapCtx + ) { + ExecutionPlan executionPlan0 = queryTemplate.map(mappingService, mapCtx); + + if (F.isEmpty(executionPlan0.fragments())) + return executionPlan0; - if (!F.isEmpty(ctx.partitions()) && !F.isEmpty(executionPlan0.fragments())) { + if (!F.isEmpty(mapCtx.partitions())) { List fragments = executionPlan0.fragments(); fragments = Commons.transform(fragments, f -> { try { - return f.filterByPartitions(ctx.partitions()); + return f.filterByPartitions(mapCtx.partitions()); } catch (ColocationMappingException e) { throw new FragmentMappingException("Failed to calculate physical distribution", f, f.root(), e); } }); - return new ExecutionPlan(executionPlan0.topologyVersion(), fragments); + return new ExecutionPlan(executionPlan0.topologyVersion(), fragments, executionPlan0.partitionNodes()); + } + else if (!mapCtx.isLocal() && mapCtx.unwrap(BaseQueryContext.class) != null) { + BaseQueryContext qryCtx = mapCtx.unwrap(BaseQueryContext.class); + + List fragments = executionPlan0.fragments(); + List partNodes = executionPlan0.partitionNodes(); + + fragments = Commons.transform(Pair.zip(fragments, partNodes), pair -> { + Fragment fragment = pair.left; + PartitionNode partNode = pair.right; + + Collection parts0 = partNode.apply(new PartitionPruningContext(affSvc, + new BaseDataContext(qryCtx.typeFactory()), mapCtx.queryParameters())); + + if (parts0 == null) + return fragment; + + int[] parts = !parts0.isEmpty() ? Ints.toArray(parts0) : U.EMPTY_INTS; + if (parts.length > 1) + Arrays.sort(parts); + + try { + return fragment.filterByPartitions(parts); + } + catch (ColocationMappingException e) { + throw new FragmentMappingException("Failed to calculate physical distribution", fragment, fragment.root(), e); + } + }); + + return new ExecutionPlan(executionPlan0.topologyVersion(), fragments, partNodes); } return executionPlan0; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java index 5a25502ea278fa..e129333da0e84b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExecutionPlan.java @@ -23,6 +23,7 @@ import java.util.UUID; import com.google.common.collect.ImmutableList; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionNode; import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; @@ -41,9 +42,13 @@ public class ExecutionPlan { private final ImmutableList fragments; /** */ - ExecutionPlan(AffinityTopologyVersion ver, List fragments) { + private final ImmutableList partNodes; + + /** */ + ExecutionPlan(AffinityTopologyVersion ver, List fragments, List partNodes) { this.ver = ver; this.fragments = ImmutableList.copyOf(fragments); + this.partNodes = ImmutableList.copyOf(partNodes); } /** */ @@ -56,6 +61,11 @@ public List fragments() { return fragments; } + /** */ + public List partitionNodes() { + return partNodes; + } + /** */ public FragmentMapping mapping(Fragment fragment) { return fragment.mapping(); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java index f8b6f43128f3c3..2ab7c6c0fcc2d8 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java @@ -17,44 +17,58 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; +import java.util.Collections; +import java.util.Map; import java.util.UUID; +import org.apache.calcite.plan.Context; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.rel.metadata.CachingRelMetadataProvider; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata; import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx; +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.apache.ignite.internal.util.typedef.F; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Query mapping context. */ -public class MappingQueryContext { +public class MappingQueryContext implements Context { /** */ - private final UUID locNodeId; + private final Context parent; /** */ - private final AffinityTopologyVersion topVer; + private final UUID locNodeId; /** */ - private RelOptCluster cluster; + private final AffinityTopologyVersion topVer; /** */ - private final boolean isLocal; + private final Map params; /** */ - private final int[] parts; + private RelOptCluster cluster; /** */ - public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer) { - this(locNodeId, topVer, false, null); + public MappingQueryContext( + UUID locNodeId, + AffinityTopologyVersion topVer + ) { + this(null, locNodeId, topVer, null); } /** */ - public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer, boolean isLocal, int[] parts) { + public MappingQueryContext( + BaseQueryContext parent, + UUID locNodeId, + AffinityTopologyVersion topVer, + Map params + ) { this.locNodeId = locNodeId; this.topVer = topVer; - this.isLocal = isLocal; - this.parts = parts; + this.parent = parent; + this.params = !F.isEmpty(params) ? Collections.unmodifiableMap(params) : Collections.emptyMap(); } /** */ @@ -69,12 +83,28 @@ public AffinityTopologyVersion topologyVersion() { /** */ public boolean isLocal() { - return isLocal; + BaseQueryContext qryCtx = unwrap(BaseQueryContext.class); + + return qryCtx != null && qryCtx.isLocal(); } /** */ public int[] partitions() { - return parts; + BaseQueryContext qryCtx = unwrap(BaseQueryContext.class); + + return qryCtx != null ? qryCtx.partitions() : null; + } + + /** */ + public Map queryParameters() { + return params; + } + + /** */ + public IgniteTypeFactory typeFactory() { + BaseQueryContext qryCtx = unwrap(BaseQueryContext.class); + + return qryCtx != null ? qryCtx.typeFactory() : BaseQueryContext.TYPE_FACTORY; } /** Creates a cluster. */ @@ -88,4 +118,12 @@ RelOptCluster cluster() { return cluster; } + + /** {@inheritDoc} */ + @Override public @Nullable C unwrap(Class aCls) { + if (aCls == getClass()) + return aCls.cast(this); + + return parent != null ? parent.unwrap(aCls) : null; + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java index 692f713c9398ec..6e174019b86cb7 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; +import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService; import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService; /** @@ -38,7 +39,7 @@ public interface MultiStepPlan extends QueryPlan { * * @param ctx Planner context. */ - ExecutionPlan init(MappingService mappingService, MappingQueryContext ctx); + ExecutionPlan init(MappingService mappingService, AffinityService affSvc, MappingQueryContext ctx); /** * @return Text representation of query plan diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java index b1c121d2187922..33a173846617cc 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java @@ -27,6 +27,8 @@ import com.google.common.collect.ImmutableList; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.processors.query.calcite.exec.PartitionExtractor; +import org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionNode; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingException; import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; @@ -66,7 +68,11 @@ public ExecutionPlan map(MappingService mappingService, MappingQueryContext ctx) for (int i = 0; i < 3; i++) { try { - ExecutionPlan executionPlan0 = new ExecutionPlan(ctx.topologyVersion(), map(mappingService, fragments, ctx, mq)); + fragments = map(mappingService, fragments, ctx, mq); + + List partNodes = Commons.transform(fragments, f -> new PartitionExtractor(ctx.typeFactory()).go(f)); + + ExecutionPlan executionPlan0 = new ExecutionPlan(ctx.topologyVersion(), fragments, partNodes); if (executionPlan == null || executionPlan.topologyVersion().before(executionPlan0.topologyVersion())) this.executionPlan.compareAndSet(executionPlan, executionPlan0); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java index 4476ebc3793234..a88d52c8865093 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java @@ -33,7 +33,6 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; - import org.apache.calcite.config.CalciteSystemProperty; import org.apache.calcite.linq4j.Ord; import org.apache.calcite.plan.Context; @@ -444,11 +443,16 @@ public static IgniteTypeFactory typeFactory() { /** */ public static MappingQueryContext mapContext(UUID locNodeId, AffinityTopologyVersion topVer) { - return new MappingQueryContext(locNodeId, topVer, false, null); + return mapContext(locNodeId, topVer, null, null); } /** */ - public static MappingQueryContext mapContext(UUID locNodeId, AffinityTopologyVersion topVer, BaseQueryContext ctx) { - return new MappingQueryContext(locNodeId, topVer, ctx.isLocal(), ctx.partitions()); + public static MappingQueryContext mapContext( + UUID locNodeId, + AffinityTopologyVersion topVer, + BaseQueryContext ctx, + Map qryParams + ) { + return new MappingQueryContext(ctx, locNodeId, topVer, qryParams); } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/LocalQueryIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/LocalQueryIntegrationTest.java index 7302eb6bd73ea8..fa0bccaa1add7d 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/LocalQueryIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/LocalQueryIntegrationTest.java @@ -73,7 +73,7 @@ public class LocalQueryIntegrationTest extends AbstractBasicIntegrationTest { StringBuilder sb = new StringBuilder("INSERT INTO ").append(tableName) .append("(ID, IDX_VAL, VAL) VALUES "); - for (int i = 0; i < 10000; ++i) { + for (int i = 0; i < ENTRIES_COUNT; ++i) { sb.append("(").append(i).append(", ") .append("'name_").append(i).append("', ") .append("'name_").append(i).append("')"); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PartitionPruneTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PartitionPruneTest.java new file mode 100644 index 00000000000000..d0ed39d2d3e9f5 --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PartitionPruneTest.java @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.integration; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import com.google.common.collect.ImmutableList; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.calcite.CalciteQueryEngineConfiguration; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest; +import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; +import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.junit.Test; + +/** */ +public class PartitionPruneTest extends AbstractBasicIntegrationTest { + /** */ + private static final int ENTRIES_COUNT = 10000; + + /** */ + private static final int LONG_QUERY_WARNING_TIMEOUT = 20000; + + /** */ + private static final LongAdder INTERCEPTED_START_REQUEST_COUNT = new LongAdder(); + + /** */ + private static final ConcurrentSkipListSet INTERCEPTED_PARTS = new ConcurrentSkipListSet<>(); + + /** */ + private static final ConcurrentSkipListSet INTERCEPTED_NODES = new ConcurrentSkipListSet<>( + Comparator.comparing(ClusterNode::id)); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.getSqlConfiguration().setQueryEnginesConfiguration(new CalciteQueryEngineConfiguration()); + cfg.getSqlConfiguration().setLongQueryWarningTimeout(LONG_QUERY_WARNING_TIMEOUT); + + cfg.setCommunicationSpi(new TcpCommunicationSpi() { + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure ackC) { + assert msg != null; + + if (msg instanceof GridIoMessage) { + GridIoMessage msg0 = (GridIoMessage)msg; + + if (msg0.message() instanceof QueryStartRequest) { + INTERCEPTED_START_REQUEST_COUNT.increment(); + INTERCEPTED_NODES.add(node); + + QueryStartRequest startReq = (QueryStartRequest)msg0.message(); + + assertNotNull(startReq.fragmentDescription()); + + FragmentMapping mapping = startReq.fragmentDescription().mapping(); + + assertNotNull(mapping); + + List groups = mapping.colocationGroups(); + + for (ColocationGroup group: groups) { + int[] parts = group.partitions(node.id()); + + if (!F.isEmpty(parts)) { + for (int part : parts) + INTERCEPTED_PARTS.add(part); + } + } + } + } + + super.sendMessage(node, msg, ackC); + } + }); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + sql("CREATE TABLE T1(ID INT, IDX_VAL VARCHAR, VAL VARCHAR, PRIMARY KEY(ID)) WITH cache_name=t1_cache,backups=1"); + sql("CREATE TABLE T2(ID INT, IDX_VAL VARCHAR, VAL VARCHAR, PRIMARY KEY(ID)) WITH cache_name=t2_cache,backups=1"); + sql("CREATE TABLE DICT(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL VARCHAR) WITH template=replicated,cache_name=dict_cache"); + + sql("CREATE INDEX T1_IDX ON T1(IDX_VAL)"); + sql("CREATE INDEX T2_IDX ON T2(IDX_VAL)"); + sql("CREATE INDEX DICT_IDX ON DICT(IDX_VAL)"); + + Stream.of("T1", "T2", "DICT").forEach(tableName -> { + StringBuilder sb = new StringBuilder("INSERT INTO ").append(tableName) + .append("(ID, IDX_VAL, VAL) VALUES "); + + for (int i = 0; i < ENTRIES_COUNT; ++i) { + sb.append("(").append(i).append(", ") + .append("'name_").append(i).append("', ") + .append("'name_").append(i).append("')"); + + if (i < ENTRIES_COUNT - 1) + sb.append(","); + } + + sql(sb.toString()); + + assertEquals(ENTRIES_COUNT, client.getOrCreateCache(tableName + "_CACHE").size(CachePeekMode.PRIMARY)); + }); + + sql("ANALYZE PUBLIC.T1(ID), PUBLIC.T2(ID), PUBLIC.DICT(ID) WITH \"NULLS=0,DISTINCT=" + ENTRIES_COUNT + + ",TOTAL=" + ENTRIES_COUNT + "\""); + + clearIntercepted(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + clearIntercepted(); + } + + /** {@inheritDoc} */ + @Override protected int nodeCount() { + return 8; + } + + /** */ + @Test + public void testSimple() { + execute("select count(*) from T1 where T1.ID = ?", + res -> { + assertPartitions(partition("T1_CACHE", 123)); + assertNodes(node("T1_CACHE", 123)); + + assertEquals(1, res.size()); + assertEquals(1L, res.get(0).get(0)); + }, + 123); + + execute("select * from T1 where T1.ID = ?", + res -> { + assertPartitions(partition("T1_CACHE", 123)); + assertNodes(node("T1_CACHE", 123)); + + assertEquals(1, res.size()); + assertEquals("name_123", res.get(0).get(1)); + }, + 123); + + execute("select * from T1 where ? = T1.ID", + res -> { + assertPartitions(partition("T1_CACHE", 123)); + assertNodes(node("T1_CACHE", 123)); + + assertEquals(1, res.size()); + assertEquals("name_123", res.get(0).get(1)); + }, + 123); + + execute("select VAL, IDX_VAL, ID from T1 where T1.ID = ?", + res -> { + assertPartitions(partition("T1_CACHE", 123)); + assertNodes(node("T1_CACHE", 123)); + + assertEquals(1, res.size()); + assertEquals("name_123", res.get(0).get(1)); + }, + 123); + + execute("select * from T1 where T1.ID = ? and T1.ID = ?", + res -> { + assertPartitions(partition("T1_CACHE", 123)); + assertNodes(node("T1_CACHE", 123)); + + assertEquals(1, res.size()); + assertEquals("name_123", res.get(0).get(1)); + }, + 123, 123); + } + + /** */ + @Test + public void testNullsInCondition() { + execute("select * from T1 where T1.ID is NULL", + res -> { + assertPartitions(); + assertNodes(); + + assertTrue(res.isEmpty()); + }); + + execute("select * from T1 where T1.ID = ?", + res -> { + assertPartitions(); + assertNodes(); + + assertTrue(res.isEmpty()); + }, new Object[]{ null }); + + execute("select * from T1 where T1.ID is NULL and T1.ID = ?", + res -> { + assertPartitions(); + assertNodes(); + + assertTrue(res.isEmpty()); + }, 123); + + execute("select * from T1 where T1.ID = ? and T1.ID = ?", + res -> { + assertPartitions(); + assertNodes(); + + assertTrue(res.isEmpty()); + }, null, 123); + + execute("select * from T1 where T1.ID is NULL or T1.ID = ?", + res -> { + assertPartitions(partition("T1_CACHE", 123)); + assertNodes(node("T1_CACHE", 123)); + + assertEquals(1, res.size()); + assertEquals("name_123", res.get(0).get(1)); + }, 123); + + execute("select * from T1 where T1.ID = ? or T1.ID = ?", + res -> { + assertPartitions(partition("T1_CACHE", 123)); + assertNodes(node("T1_CACHE", 123)); + + assertEquals(1, res.size()); + assertEquals("name_123", res.get(0).get(1)); + }, null, 123); + } + + /** */ + @Test + public void testEmptyConditions() { + execute("select * from T1 where T1.ID = ? and T1.ID = ?", + res -> { + assertPartitions(); + assertNodes(); + + assertTrue(res.isEmpty()); + }, + 123, 518); + + execute("select * from T1 where T1.ID = ? and (T1.ID = ? OR T1.ID = ? OR T1.ID = ?)", + res -> { + assertPartitions(); + assertNodes(); + + assertTrue(res.isEmpty()); + }, + 123, 518, 781, 295); + + execute("select * from T1 where (T1.ID = ? OR T1.ID = ?) AND (T1.ID = ? OR T1.ID = ?)", + res -> { + assertPartitions(); + assertNodes(); + + assertTrue(res.isEmpty()); + }, + 123, 518, 781, 295); + + execute("select * from T1 where (T1.ID = ? AND T1.ID = ?) OR (T1.ID = ? AND T1.ID = ?)", + res -> { + assertPartitions(); + assertNodes(); + + assertTrue(res.isEmpty()); + }, + 123, 518, 781, 295); + } + + /** */ + @Test + public void testSelectIn() { + IntStream.of(2, 6).forEach(i -> { + testSelect(i, true, "_KEY"); + testSelect(i, true, "ID"); + }); + } + + + /** */ + @Test + public void testSelectOr() { + testSelect(1, false, "_KEY"); + testSelect(1, false, "ID"); + + IntStream.of(2, 6).forEach(i -> { + testSelect(i, false, "_KEY"); + testSelect(i, false, "ID"); + }); + } + + /** */ + @Test + public void testSetOperations() { + execute("SELECT ID, VAL FROM T1 WHERE T1.ID = ? UNION SELECT ID, VAL FROM T1 WHERE T1.ID = ?", + (res) -> { + assertPartitions(partition("T1_CACHE", 123), partition("T1_CACHE", 125)); + assertNodes(node("T1_CACHE", 123), node("T1_CACHE", 125)); + assertEquals(2, res.size()); + assertEquals(ImmutableList.of(123, 125), res.stream().map(row -> (Integer)row.get(0)).sorted() + .collect(Collectors.toList())); + }, + 123, 125 + ); + + execute("SELECT ID, VAL FROM T1 WHERE T1.ID = ? INTERSECT SELECT ID, VAL FROM T1 WHERE T1.ID = ?", + (res) -> { + assertPartitions(partition("T1_CACHE", 123), partition("T1_CACHE", 125)); + assertNodes(node("T1_CACHE", 123), node("T1_CACHE", 125)); + assertEquals(0, res.size()); + }, + 123, 125 + ); + + execute("SELECT ID, VAL FROM T1 WHERE T1.ID = ? UNION SELECT ID, VAL FROM T2 WHERE T2.ID = ?", + (res) -> { + assertPartitions(partition("T1_CACHE", 123), partition("T2_CACHE", 125)); + assertNodes(node("T1_CACHE", 123), node("T2_CACHE", 125)); + assertEquals(2, res.size()); + assertEquals(ImmutableList.of(123, 125), res.stream().map(row -> (Integer)row.get(0)).sorted() + .collect(Collectors.toList())); + }, + 123, 125 + ); + + execute("SELECT ID, VAL FROM T1 WHERE T1.ID = ? INTERSECT SELECT ID, VAL FROM T2 WHERE T2.ID = ?", + (res) -> { + assertPartitions(partition("T1_CACHE", 123), partition("T2_CACHE", 125)); + assertNodes(node("T1_CACHE", 123), node("T2_CACHE", 125)); + assertEquals(0, res.size()); + }, + 123, 125 + ); + + execute("SELECT ID, VAL FROM T1 WHERE T1.ID = ? UNION SELECT ID, VAL FROM DICT WHERE DICT.ID = ?", + (res) -> { + assertPartitions(partition("T1_CACHE", 123)); + assertContainsNodes(node("T1_CACHE", 123)); + + assertEquals(2, res.size()); + assertEquals(ImmutableList.of(123, 125), res.stream().map(row -> (Integer)row.get(0)).sorted() + .collect(Collectors.toList())); + }, + 123, 125 + ); + + execute("SELECT ID, VAL FROM T1 WHERE T1.ID = ? INTERSECT SELECT ID, VAL FROM DICT WHERE DICT.ID = ?", + (res) -> { + assertPartitions(partition("T1_CACHE", 123)); + assertContainsNodes(node("T1_CACHE", 123)); + + assertEquals(0, res.size()); + }, + 123, 125 + ); + } + + /** */ + private void testSelect(int sz, boolean withIn, String column) { + assertTrue(sz >= 1); + int[] values = ThreadLocalRandom.current().ints(0, ENTRIES_COUNT).distinct().limit(sz).toArray(); + + StringBuilder query; + + if (!withIn || sz == 1) + query = new StringBuilder("select * from T1 where "); + else + query = new StringBuilder("select * from T1 where T1.").append(column).append(" in ("); + + for (int i = 0; i < sz; ++i) { + if (!withIn || sz == 1) + query.append("T1.").append(column).append("= ?"); + else + query.append('?'); + + if (sz == 1) + break; + + if (i == sz - 1) + query.append(!withIn ? "" : ")"); + else + query.append(!withIn ? " OR " : ", "); + } + + execute(query.toString(), + res -> { + assertPartitions(IntStream.of(values).map(i -> partition("T1_CACHE", i)).toArray()); + assertNodes(IntStream.of(values).mapToObj(i -> node("T1_CACHE", i)).toArray(ClusterNode[]::new)); + + assertEquals(values.length, res.size()); + + assertEquals( + IntStream.of(values).sorted().boxed().collect(Collectors.toList()), + res.stream().map(row -> row.get(0)).sorted().collect(Collectors.toList()) + ); + }, + IntStream.of(values).boxed().toArray(Integer[]::new)); + } + + /** */ + public void execute(String sql, Consumer>> resConsumer, Object... args) { + log.info(">>> TEST COMBINATION: \"" + sql + "\""); + + // Execute query as is. + log.info("Execute \"" + sql + "\" with args " + Arrays.toString(args)); + + List> res = sql(sql, args); + + resConsumer.accept(res); + clearIntercepted(); + + // Start filling arguments recursively. + if (args != null && args.length > 0) + executeCombinations0(sql, resConsumer, new HashSet<>(), args); + } + + /** */ + private void executeCombinations0( + String sql, + Consumer>> resConsumer, + Set executedSqls, + Object... args + ) { + assert args != null && args.length > 0; + + // Get argument positions. + List paramPoss = new ArrayList<>(); + + int pos = 0; + + while (true) { + int paramPos = sql.indexOf('?', pos); + + if (paramPos == -1) + break; + + paramPoss.add(paramPos); + + pos = paramPos + 1; + } + + for (int i = 0; i < args.length; i++) { + // Prepare new SQL and arguments. + int paramPos = paramPoss.get(i); + + String newSql = sql.substring(0, paramPos) + (args[i] instanceof String ? "'" + args[i] + "'" : args[i]) + + sql.substring(paramPos + 1); + + Object[] newArgs = new Object[args.length - 1]; + + int newArgsPos = 0; + + for (int j = 0; j < args.length; j++) { + if (j != i) + newArgs[newArgsPos++] = args[j]; + } + + // Execute if this combination was never executed before. + if (executedSqls.add(newSql)) { + log.info("Execute sql \"" + newSql + "\""); + + List> res = sql(newSql, newArgs); + + resConsumer.accept(res); + clearIntercepted(); + } + + // Continue recursively. + if (newArgs.length > 0) + executeCombinations0(newSql, resConsumer, executedSqls, newArgs); + } + } + + /** */ + protected static void assertPartitions(int... expParts) { + Collection expParts0 = new TreeSet<>(); + + if (!F.isEmpty(expParts)) { + for (int expPart : expParts) + expParts0.add(expPart); + } + + TreeSet actualParts = new TreeSet<>(INTERCEPTED_PARTS); + + assertEquals("Unexpected partitions [exp=" + expParts0 + ", actual=" + actualParts + ']', + expParts0, actualParts); + } + + /** */ + protected int partition(String cacheName, Object key) { + return client.affinity(cacheName).partition(key); + } + + /** */ + protected int[] allPartitions(String cacheName) { + return IntStream.range(0, client.affinity(cacheName).partitions()).toArray(); + } + + /** */ + protected ClusterNode node(String cacheName, Object key) { + return G.allGrids().stream() + .filter(ign -> ign.affinity(cacheName).isPrimary(ign.cluster().localNode(), key)) + .map(ign -> ign.cluster().localNode()).findFirst().orElse(null); + } + + /** */ + protected ClusterNode[] allNodes(String cacheName) { + return G.allGrids().stream() + .map(ign -> ign.cluster().localNode()) + .filter(n -> client.affinity(cacheName).allPartitions(n).length != 0) + .toArray(ClusterNode[]::new); + } + + /** */ + protected static void assertContainsNodes(ClusterNode... expNodes) { + TreeSet actualNodes = new TreeSet<>(INTERCEPTED_NODES); + + if (!F.isEmpty(expNodes)) { + for (ClusterNode node: expNodes) + assertTrue("Actual nodes doesn't contain node [actual=" + actualNodes + ", node= " + node, + actualNodes.contains(node)); + } + } + + /** */ + protected static void assertNodes(ClusterNode... expNodes) { + Collection expNodes0 = new TreeSet<>(Comparator.comparing(ClusterNode::id)); + + if (!F.isEmpty(expNodes)) + expNodes0.addAll(Arrays.asList(expNodes)); + + TreeSet actualNodes = new TreeSet<>(INTERCEPTED_NODES); + + assertEquals("Unexpected nodes [exp=" + Arrays.toString(expNodes) + ", actual=" + actualNodes + ']', + expNodes0, actualNodes); + } + + /** */ + protected static void clearIntercepted() { + INTERCEPTED_START_REQUEST_COUNT.reset(); + INTERCEPTED_PARTS.clear(); + INTERCEPTED_NODES.clear(); + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java index e87ac9520ab796..72db75bedcc561 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java @@ -52,6 +52,7 @@ import org.apache.calcite.util.Util; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.failure.FailureContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -87,6 +88,7 @@ import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem; import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.ListeningTestLogger; @@ -96,6 +98,7 @@ import org.jetbrains.annotations.Nullable; import org.junit.After; import org.junit.Before; +import org.mockito.Mockito; import static org.apache.calcite.tools.Frameworks.createRootSchema; import static org.apache.calcite.tools.Frameworks.newConfigBuilder; @@ -697,15 +700,25 @@ static class TestTableDescriptor implements CacheTableDescriptor { /** */ private final RelDataType rowType; + /** */ + private final GridCacheContextInfo cacheInfo; + /** */ public TestTableDescriptor(Supplier distribution, RelDataType rowType) { this.distributionSupp = distribution; this.rowType = rowType; + cacheInfo = Mockito.mock(GridCacheContextInfo.class); + + CacheConfiguration cfg = Mockito.mock(CacheConfiguration.class); + Mockito.when(cfg.isEagerTtl()).thenReturn(true); + + Mockito.when(cacheInfo.cacheId()).thenReturn(CU.cacheId("TEST")); + Mockito.when(cacheInfo.config()).thenReturn(cfg); } /** {@inheritDoc} */ @Override public GridCacheContextInfo cacheInfo() { - return null; + return cacheInfo; } /** {@inheritDoc} */ diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java index 597869d237ca1c..fcd8731a19c553 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java @@ -367,7 +367,7 @@ private ExecutionPlan splitPlan(IgniteRel phys) { assertNotNull(plan); - return plan.init(this::intermediateMapping, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE)); + return plan.init(this::intermediateMapping, null, Commons.mapContext(F.first(nodes), AffinityTopologyVersion.NONE)); } /** diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java index 25178aaef39dc0..c71493ce606e72 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.query.calcite.integration.LocalQueryIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.MemoryQuotasIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.MetadataIntegrationTest; +import org.apache.ignite.internal.processors.query.calcite.integration.PartitionPruneTest; import org.apache.ignite.internal.processors.query.calcite.integration.QueryEngineConfigurationIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.QueryMetadataIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.QueryWithPartitionsIntegrationTest; @@ -125,6 +126,9 @@ DynamicParametersIntegrationTest.class, ExpiredEntriesIntegrationTest.class, TimeoutIntegrationTest.class, + + // Partition pruning + PartitionPruneTest.class }) public class IntegrationTestSuite { }