Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
ivandasch committed Sep 15, 2023
1 parent 50867c0 commit 7a99cc8
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -565,10 +565,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(

plan.init(mappingSvc, mapCtx);

List<Fragment> fragments = plan.fragments();

if (!F.isEmpty(qry.context().partitions())) {
fragments = Commons.transform(fragments, f -> {
plan.transformFragments(f -> {
try {
return f.filterByPartitions(qry.context().partitions());
}
Expand All @@ -578,39 +576,20 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
});
}
else if (!qry.context().isLocal()) {
// ImmutableList.Builder<Fragment> fragmentBuilder = ImmutableList.builder();
// //fragmentBuilder.add(F.first(fragments));
//
// for (int i = 0; i < fragments.size(); ++i) {
// int[] parts = new PartitionExtractor(partSvc, qry.context().typeFactory(),
// Commons.parametersMap(qry.parameters())).go(fragments.get(i));
//
// if (F.isEmpty(parts))
// fragmentBuilder.add(fragments.get(i));
// else {
// Fragment f = fragments.get(i);
//
// try {
// fragmentBuilder.add(f.filterByPartitions(parts));
// }
// catch (ColocationMappingException e) {
// throw new FragmentMappingException("Failed to calculate physical distribution", f, f.root(), e);
// }
// }
// }
//
// fragments = fragmentBuilder.build();

fragments = Commons.transform(fragments, f -> {
plan.transformFragments(f -> {
int[] parts = new PartitionExtractor(partSvc, qry.context().typeFactory(),
Commons.parametersMap(qry.parameters())).go(f);

//if (F.isEmpty(parts))
return f;

try {
return f.filterByPartitions(parts);
}
catch (ColocationMappingException e) {
throw new FragmentMappingException("Failed to calculate physical distribution", f, f.root(), e);
}
});
}

List<Fragment> fragments = plan.fragments();

// Local execution
Fragment fragment = F.first(fragments);

Expand All @@ -623,7 +602,7 @@ else if (!qry.context().isLocal()) {

List<UUID> nodes = mapping.nodeIds();

assert nodes != null && nodes.size() == 1 && F.first(nodes).equals(localNodeId())
assert nodes != null && (nodes.size() == 1 && F.first(nodes).equals(localNodeId()) || nodes.isEmpty())
: "nodes=" + nodes + ", localNode=" + localNodeId();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Function;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
Expand Down Expand Up @@ -129,4 +132,16 @@ private FragmentMapping mapping(long fragmentId) {
@Override public String textPlan() {
return textPlan;
}

/** {@inheritDoc} */
@Override public void transformFragments(@NotNull Function<Fragment, Fragment> clo) {
if (executionPlan != null) {
List<Fragment> fragments = executionPlan.fragments();

if (F.isEmpty(fragments))
return;

executionPlan = new ExecutionPlan(executionPlan.topologyVersion(), Commons.transform(fragments, clo));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.jetbrains.annotations.NotNull;

/**
* Regular query or DML
Expand Down Expand Up @@ -66,4 +68,11 @@ public interface MultiStepPlan extends QueryPlan {
* @return Text representation of query plan
*/
String textPlan();

/**
* Transform fragments of plan.
*
* @param clo Mapping closure to apply on.
*/
void transformFragments(@NotNull Function<Fragment, Fragment> clo);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.calcite.integration;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Stream;
import org.apache.ignite.IgniteException;
Expand Down Expand Up @@ -70,7 +69,7 @@ public class PartitionPruneTest extends AbstractBasicIntegrationTest {
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();

sql("CREATE TABLE T1(ID INT, IDX_VAL VARCHAR, VAL VARCHAR, PRIMARY KEY(ID, IDX_VAL)) WITH cache_name=t1_cache,backups=1");
sql("CREATE TABLE T1(ID INT, IDX_VAL VARCHAR, VAL VARCHAR, PRIMARY KEY(ID)) WITH cache_name=t1_cache,backups=1");
sql("CREATE TABLE T2(ID INT, T1_ID INT, IDX_VAL VARCHAR, VAL VARCHAR, PRIMARY KEY(ID, T1_ID)) WITH " +
"cache_name=t2_cache,backups=1,affinity_key=t1_id");
sql("CREATE TABLE DICT(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL VARCHAR) WITH template=replicated,cache_name=dict_cache");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import org.apache.calcite.util.Pair;
import org.apache.ignite.IgniteCache;
Expand Down Expand Up @@ -53,12 +52,16 @@ public class QueryWithPartitionsIntegrationTest extends AbstractBasicIntegration
public boolean local;

/** */
@Parameterized.Parameters(name = "local = {0}")
@Parameterized.Parameter(1)
public int partSz;

/** */
@Parameterized.Parameters(name = "local = {0}, partSz = {1}")
public static List<Object[]> parameters() {
return ImmutableList.of(
new Object[]{true},
new Object[]{false}
);
return Stream.of(true, false)
.flatMap(isLocal -> Stream.of(1, 2, 5, 10, 20)
.map(i -> new Object[]{isLocal, i}))
.collect(Collectors.toList());
}

/** {@inheritDoc} */
Expand All @@ -67,7 +70,7 @@ public static List<Object[]> parameters() {

List<Integer> parts0 = IntStream.range(0, 1024).boxed().collect(Collectors.toList());
Collections.shuffle(parts0);
parts = Ints.toArray(parts0.subList(0, 20));
parts = Ints.toArray(parts0.subList(0, partSz));

log.info("Running tests with parts=" + Arrays.toString(parts));
}
Expand Down Expand Up @@ -187,7 +190,7 @@ public void testJoinReplicated() {
/** */
private void testJoin(String table1, String table2, String joinCol) {
String sqlStr = "select * from " + table1 + " join " + table2 +
" on " + table1 + "." + joinCol + "=" + table2 + "." + joinCol;
" on " + table1 + "." + joinCol + "=" + table2 + "." + joinCol + " order by " + table1 + ".id";

List<?> res = sql(sqlStr);

Expand Down

0 comments on commit 7a99cc8

Please sign in to comment.