Skip to content

Commit

Permalink
IGNITE-20428 SQL Calcite: fix query freezes when partitions are set. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ivandasch authored Sep 18, 2023
1 parent 7e51cb9 commit 0080ae9
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.FieldsMetadata;
import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.running.TrackableQuery;
Expand Down Expand Up @@ -193,14 +194,14 @@ public void mapping() {
/**
* Starts execution phase for the query and setup remote fragments.
*/
public void run(ExecutionContext<RowT> ctx, MultiStepPlan plan, Node<RowT> root) {
public void run(ExecutionContext<RowT> ctx, ExecutionPlan plan, FieldsMetadata metadata, Node<RowT> root) {
synchronized (mux) {
if (state == QueryState.CLOSED)
throw queryCanceledException();

planningTime = U.currentTimeMillis() - startTs;

RootNode<RowT> rootNode = new RootNode<>(ctx, plan.fieldsMetadata().rowType(), this::tryClose);
RootNode<RowT> rootNode = new RootNode<>(ctx, metadata.rowType(), this::tryClose);
rootNode.register(root);

addFragment(new RunningFragment<>(F.first(plan.fragments()).root(), rootNode, ctx));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,14 @@
import org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
import org.apache.ignite.internal.processors.query.calcite.message.QueryStartResponse;
import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationMappingException;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingException;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.metadata.RemoteException;
import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.CacheKey;
import org.apache.ignite.internal.processors.query.calcite.prepare.DdlPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.ExecutionPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.ExplainPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.FieldsMetadataImpl;
import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
Expand Down Expand Up @@ -561,36 +560,25 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
) {
qry.mapping();

MappingQueryContext mapCtx = Commons.mapContext(locNodeId, topologyVersion(), qry.context().isLocal());
MappingQueryContext mapCtx = Commons.mapContext(locNodeId, topologyVersion(), qry.context());

plan.init(mappingSvc, mapCtx);
ExecutionPlan execPlan = plan.init(mappingSvc, mapCtx);

List<Fragment> fragments = plan.fragments();

if (!F.isEmpty(qry.context().partitions())) {
fragments = Commons.transform(fragments, f -> {
try {
return f.filterByPartitions(qry.context().partitions());
}
catch (ColocationMappingException e) {
throw new FragmentMappingException("Failed to calculate physical distribution", f, f.root(), e);
}
});
}
List<Fragment> fragments = execPlan.fragments();

// Local execution
Fragment fragment = F.first(fragments);

if (U.assertionsEnabled()) {
assert fragment != null;

FragmentMapping mapping = plan.mapping(fragment);
FragmentMapping mapping = execPlan.mapping(fragment);

assert mapping != null;

List<UUID> nodes = mapping.nodeIds();

assert nodes != null && nodes.size() == 1 && F.first(nodes).equals(localNodeId())
assert nodes != null && (nodes.size() == 1 && F.first(nodes).equals(localNodeId()) || nodes.isEmpty())
: "nodes=" + nodes + ", localNode=" + localNodeId();
}

Expand All @@ -603,9 +591,9 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(

FragmentDescription fragmentDesc = new FragmentDescription(
fragment.fragmentId(),
plan.mapping(fragment),
plan.target(fragment),
plan.remotes(fragment));
execPlan.mapping(fragment),
execPlan.target(fragment),
execPlan.remotes(fragment));

ExecutionContext<Row> ectx = new ExecutionContext<>(
qry.context(),
Expand All @@ -624,7 +612,7 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(), mailboxRegistry(),
exchangeService(), failureProcessor()).go(fragment.root());

qry.run(ectx, plan, node);
qry.run(ectx, execPlan, plan.fieldsMetadata(), node);

Map<UUID, Long> fragmentsPerNode = fragments.stream()
.skip(1)
Expand All @@ -636,9 +624,9 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
fragment = fragments.get(i);
fragmentDesc = new FragmentDescription(
fragment.fragmentId(),
plan.mapping(fragment),
plan.target(fragment),
plan.remotes(fragment));
execPlan.mapping(fragment),
execPlan.target(fragment),
execPlan.remotes(fragment));

Throwable ex = null;
byte[] parametersMarshalled = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,12 @@

package org.apache.ignite.internal.processors.query.calcite.prepare;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationMappingException;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingException;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

/**
Expand All @@ -44,9 +38,6 @@ public abstract class AbstractMultiStepPlan extends AbstractQueryPlan implements
/** */
protected final QueryTemplate queryTemplate;

/** */
protected ExecutionPlan executionPlan;

/** */
private final String textPlan;

Expand All @@ -66,11 +57,6 @@ protected AbstractMultiStepPlan(
this.paramsMetadata = paramsMetadata;
}

/** {@inheritDoc} */
@Override public List<Fragment> fragments() {
return Objects.requireNonNull(executionPlan).fragments();
}

/** {@inheritDoc} */
@Override public FieldsMetadata fieldsMetadata() {
return fieldsMetadata;
Expand All @@ -82,47 +68,25 @@ protected AbstractMultiStepPlan(
}

/** {@inheritDoc} */
@Override public FragmentMapping mapping(Fragment fragment) {
return fragment.mapping();
}

/** {@inheritDoc} */
@Override public ColocationGroup target(Fragment fragment) {
if (fragment.rootFragment())
return null;

IgniteSender sender = (IgniteSender)fragment.root();
return mapping(sender.targetFragmentId()).findGroup(sender.exchangeId());
}

/** {@inheritDoc} */
@Override public Map<Long, List<UUID>> remotes(Fragment fragment) {
List<IgniteReceiver> remotes = fragment.remotes();
@Override public ExecutionPlan init(MappingService mappingService, MappingQueryContext ctx) {
ExecutionPlan executionPlan0 = queryTemplate.map(mappingService, ctx);

if (F.isEmpty(remotes))
return null;
if (!F.isEmpty(ctx.partitions()) && !F.isEmpty(executionPlan0.fragments())) {
List<Fragment> fragments = executionPlan0.fragments();

HashMap<Long, List<UUID>> res = U.newHashMap(remotes.size());
fragments = Commons.transform(fragments, f -> {
try {
return f.filterByPartitions(ctx.partitions());
}
catch (ColocationMappingException e) {
throw new FragmentMappingException("Failed to calculate physical distribution", f, f.root(), e);
}
});

for (IgniteReceiver remote : remotes)
res.put(remote.exchangeId(), mapping(remote.sourceFragmentId()).nodeIds());

return res;
}
return new ExecutionPlan(executionPlan0.topologyVersion(), fragments);
}

/** {@inheritDoc} */
@Override public void init(MappingService mappingService, MappingQueryContext ctx) {
executionPlan = queryTemplate.map(mappingService, ctx);
}

/** */
private FragmentMapping mapping(long fragmentId) {
return Objects.requireNonNull(executionPlan).fragments().stream()
.filter(f -> f.fragmentId() == fragmentId)
.findAny().orElseThrow(() -> new IllegalStateException("Cannot find fragment with given ID. [" +
"fragmentId=" + fragmentId + ", " +
"fragments=" + fragments() + "]"))
.mapping();
return executionPlan0;
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,23 @@

package org.apache.ignite.internal.processors.query.calcite.prepare;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import com.google.common.collect.ImmutableList;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;

/**
*
*/
class ExecutionPlan {
public class ExecutionPlan {
/** */
private final AffinityTopologyVersion ver;

Expand All @@ -46,4 +55,42 @@ public AffinityTopologyVersion topologyVersion() {
public List<Fragment> fragments() {
return fragments;
}

/** */
public FragmentMapping mapping(Fragment fragment) {
return fragment.mapping();
}

/** */
public ColocationGroup target(Fragment fragment) {
if (fragment.rootFragment())
return null;

IgniteSender sender = (IgniteSender)fragment.root();
return mapping(sender.targetFragmentId()).findGroup(sender.exchangeId());
}

/** */
public Map<Long, List<UUID>> remotes(Fragment fragment) {
List<IgniteReceiver> remotes = fragment.remotes();

if (F.isEmpty(remotes))
return null;

HashMap<Long, List<UUID>> res = U.newHashMap(remotes.size());

for (IgniteReceiver remote : remotes)
res.put(remote.exchangeId(), mapping(remote.sourceFragmentId()).nodeIds());

return res;
}

/** */
private FragmentMapping mapping(long fragmentId) {
return fragments().stream()
.filter(f -> f.fragmentId() == fragmentId)
.findAny().orElseThrow(() -> new IllegalStateException("Cannot find fragment with given ID. [" +
"fragmentId=" + fragmentId + ", " + "fragments=" + fragments() + "]"))
.mapping();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,20 @@ public class MappingQueryContext {
/** */
private final boolean isLocal;

/** */
private final int[] parts;

/** */
public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer) {
this(locNodeId, topVer, false);
this(locNodeId, topVer, false, null);
}

/** */
public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer, boolean isLocal) {
public MappingQueryContext(UUID locNodeId, AffinityTopologyVersion topVer, boolean isLocal, int[] parts) {
this.locNodeId = locNodeId;
this.topVer = topVer;
this.isLocal = isLocal;
this.parts = parts;
}

/** */
Expand All @@ -68,6 +72,11 @@ public boolean isLocal() {
return isLocal;
}

/** */
public int[] partitions() {
return parts;
}

/** Creates a cluster. */
RelOptCluster cluster() {
if (cluster == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,12 @@

package org.apache.ignite.internal.processors.query.calcite.prepare;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;

/**
* Regular query or DML
*/
public interface MultiStepPlan extends QueryPlan {
/**
* @return Query fragments.
*/
List<Fragment> fragments();

/**
* @return Fields metadata.
*/
Expand All @@ -43,24 +33,12 @@ public interface MultiStepPlan extends QueryPlan {
*/
FieldsMetadata paramsMetadata();

/**
* @param fragment Fragment.
* @return Mapping for a given fragment.
*/
FragmentMapping mapping(Fragment fragment);

/** */
ColocationGroup target(Fragment fragment);

/** */
Map<Long, List<UUID>> remotes(Fragment fragment);

/**
* Inits query fragments.
*
* @param ctx Planner context.
*/
void init(MappingService mappingService, MappingQueryContext ctx);
ExecutionPlan init(MappingService mappingService, MappingQueryContext ctx);

/**
* @return Text representation of query plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,11 +444,11 @@ public static IgniteTypeFactory typeFactory() {

/** */
public static MappingQueryContext mapContext(UUID locNodeId, AffinityTopologyVersion topVer) {
return mapContext(locNodeId, topVer, false);
return new MappingQueryContext(locNodeId, topVer, false, null);
}

/** */
public static MappingQueryContext mapContext(UUID locNodeId, AffinityTopologyVersion topVer, boolean isLocal) {
return new MappingQueryContext(locNodeId, topVer, isLocal);
public static MappingQueryContext mapContext(UUID locNodeId, AffinityTopologyVersion topVer, BaseQueryContext ctx) {
return new MappingQueryContext(locNodeId, topVer, ctx.isLocal(), ctx.partitions());
}
}
Loading

0 comments on commit 0080ae9

Please sign in to comment.