Skip to content

Commit

Permalink
IGNITE-12455 Minors.
Browse files Browse the repository at this point in the history
  • Loading branch information
ivandasch committed Sep 27, 2023
1 parent 5b0ccf5 commit 5ef268e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

package org.apache.ignite.internal.processors.query.calcite.exec;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Primitives;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
Expand All @@ -45,6 +46,7 @@
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
import org.apache.ignite.internal.processors.query.calcite.rel.ProjectableFilterableTableScan;
import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
Expand All @@ -57,7 +59,7 @@ public class PartitionExtractor extends IgniteRelShuttle {
private final IgniteTypeFactory typeFactory;

/** */
private PartitionNode partNode;
private final Deque<PartitionNode> stack = new ArrayDeque<>();

/** */
public PartitionExtractor(IgniteTypeFactory typeFactory) {
Expand All @@ -78,6 +80,22 @@ public PartitionExtractor(IgniteTypeFactory typeFactory) {
return rel;
}

/** {@inheritDoc} */
@Override protected IgniteRel processNode(IgniteRel rel) {
IgniteRel res = super.processNode(rel);

List<PartitionNode> operands = new ArrayList<>();
for (int i = 0; i < rel.getInputs().size(); ++i)
operands.add(stack.pop());

if (rel instanceof IgniteUnionAll)
stack.push(PartitionOperandNode.createOrOperandNode(operands));
else
stack.push(PartitionOperandNode.createAndOperandNode(operands));

return res;
}

/** */
public PartitionNode go(Fragment fragment) {
if (!(fragment.root() instanceof IgniteSender))
Expand All @@ -88,48 +106,37 @@ public PartitionNode go(Fragment fragment) {

visit(fragment.root());

return partNode != null ? partNode : PartitionAllNode.INSTANCE;
return stack.isEmpty() ? PartitionAllNode.INSTANCE : stack.pop();
}

/** */
private void processScan(IgniteRel rel) {
if (partNode == PartitionNoneNode.INSTANCE)
return;

assert rel instanceof ProjectableFilterableTableScan;

RexNode condition = ((ProjectableFilterableTableScan)rel).condition();

IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class);

if (!tbl.descriptor().distribution().function().affinity() || tbl.distribution().getKeys().size() != 1) {
partNode = PartitionAllNode.INSTANCE;
if (!rel.distribution().function().affinity() || rel.distribution().getKeys().size() != 1) {
stack.push(PartitionAllNode.INSTANCE);

return;
}

ImmutableIntList keys = ImmutableIntList.copyOf(rel.distribution().getKeys());
IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class);
RelDataType rowType = tbl.getRowType(typeFactory);
int cacheId = ((CacheTableDescriptor)tbl.descriptor()).cacheInfo().cacheId();

ImmutableIntList keys = ImmutableIntList.copyOf(rel.distribution().getKeys());
List<Class<?>> types = new ArrayList<>(rowType.getFieldCount());

for (RelDataTypeField field : rowType.getFieldList()) {
if (QueryUtils.KEY_FIELD_NAME.equals(field.getName()))
keys = keys.append(field.getIndex());

types.add(Primitives.wrap((Class<?>)typeFactory.getJavaClass(field.getType())));
}

int cacheId = ((CacheTableDescriptor)tbl.descriptor()).cacheInfo().cacheId();

PartitionNode partNode0 = processCondition(condition, types, keys, cacheId);
PartitionNode partNode = processCondition(condition, types, keys, cacheId);

if (partNode == null) {
partNode = partNode0;
}
else {
partNode = PartitionOperandNode.createAndOperandNode(ImmutableList.of(partNode0, partNode));
}
stack.push(partNode);
}

/** */
Expand All @@ -141,7 +148,7 @@ private PartitionNode processCondition(RexNode condition, List<Class<?>> types,
List<RexNode> operands = ((RexCall)condition).getOperands();

switch (opKind) {
case IS_NULL:
case IS_NULL: {
RexNode left = operands.get(0);

if (!left.isA(SqlKind.LOCAL_REF))
Expand All @@ -153,12 +160,12 @@ private PartitionNode processCondition(RexNode condition, List<Class<?>> types,
return PartitionAllNode.INSTANCE;
else
return PartitionNoneNode.INSTANCE;

case EQUALS:
}
case EQUALS: {
if (operands.size() != 2)
return PartitionAllNode.INSTANCE;

left = operands.get(0);
RexNode left = operands.get(0);
RexNode right = operands.get(1);

if (!left.isA(SqlKind.LOCAL_REF))
Expand All @@ -167,7 +174,7 @@ private PartitionNode processCondition(RexNode condition, List<Class<?>> types,
if (!right.isA(SqlKind.LITERAL) && !right.isA(SqlKind.DYNAMIC_PARAM))
return PartitionAllNode.INSTANCE;

idx = ((RexLocalRef)left).getIndex();
int idx = ((RexLocalRef)left).getIndex();

if (!keys.contains(idx))
return PartitionAllNode.INSTANCE;
Expand All @@ -178,6 +185,7 @@ private PartitionNode processCondition(RexNode condition, List<Class<?>> types,
return new PartitionLiteralNode(cacheId, (RexLiteral)right, fldType);
else
return new PartitionParameterNode(cacheId, (RexDynamicParam)right, fldType);
}
case SEARCH:
RexNode condition0 = RexUtil.expandSearch(Commons.emptyCluster().getRexBuilder(), null, condition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ public class PartitionPruneTest extends AbstractBasicIntegrationTest {
assertEquals(ENTRIES_COUNT, client.getOrCreateCache(tableName + "_CACHE").size(CachePeekMode.PRIMARY));
});

sql("ANALYZE PUBLIC.T1(ID), PUBLIC.T2(ID,AK), PUBLIC.DICT(ID) WITH \"NULLS=0,DISTINCT=10000,TOTAL=10000\"");

clearIntercepted();
}

Expand All @@ -176,6 +178,16 @@ public class PartitionPruneTest extends AbstractBasicIntegrationTest {
/** */
@Test
public void testSimple() {
execute("select count(*) from T1 where T1.ID = ?",
res -> {
assertPartitions(partition("T1_CACHE", 123));
assertNodes(node("T1_CACHE", 123));

assertEquals(1, res.size());
assertEquals(1L, res.get(0).get(0));
},
123);

execute("select * from T1 where T1.ID = ?",
res -> {
assertPartitions(partition("T1_CACHE", 123));
Expand Down Expand Up @@ -317,17 +329,6 @@ public void testSelectOr() {
@Test
public void testSimpleJoin() {
// Key (not alias).
// execute("SELECT * FROM T1 INNER JOIN DICT ON T1.ID = DICT.ID WHERE T1.ID = ?",
// (res) -> {
// assertPartitions(partition("T1_CACHE", 123));
// assertNodes(node("T1_CACHE", 123));
// assertEquals(1, res.size());
// assertEquals(123, res.get(0).get(0));
// },
// 123
// );

// Key (not alias).
execute("SELECT * FROM T1 INNER JOIN T2 ON T1.ID = T2.AK WHERE T1.ID = ?",
(res) -> {
assertPartitions(partition("T1_CACHE", 123));
Expand Down Expand Up @@ -402,7 +403,7 @@ private void testSelect(int sz, boolean withIn, String column) {
assertPartitions(IntStream.of(values).map(i -> partition("T1_CACHE", i)).toArray());
assertNodes(IntStream.of(values).mapToObj(i -> node("T1_CACHE", i)).toArray(ClusterNode[]::new));

assertTrue(res.size() == values.length);
assertEquals(values.length, res.size());

assertEquals(
IntStream.of(values).sorted().boxed().collect(Collectors.toList()),
Expand Down

0 comments on commit 5ef268e

Please sign in to comment.