Skip to content

Commit

Permalink
Add TemporaryTableInfo to tablescan and tablewrite planNodes
Browse files Browse the repository at this point in the history
  • Loading branch information
jaystarshot committed Dec 3, 2024
1 parent 4479bcc commit 170b377
Show file tree
Hide file tree
Showing 45 changed files with 183 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ public PlanNode visitFilter(FilterNode node, RewriteContext<Void> context)
oldTableScanNode.getAssignments(),
oldTableScanNode.getTableConstraints(),
oldTableScanNode.getCurrentConstraint(),
oldTableScanNode.getEnforcedConstraint());
oldTableScanNode.getEnforcedConstraint(),
oldTableScanNode.getTemporaryTableInfo());

return new FilterNode(node.getSourceLocation(), idAllocator.getNextId(), newTableScanNode, node.getPredicate());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ private Optional<PlanNode> tryCreatingNewScanNode(PlanNode plan)
ImmutableList.copyOf(assignments.keySet()),
assignments.entrySet().stream().collect(toImmutableMap(Map.Entry::getKey, (e) -> (ColumnHandle) (e.getValue()))),
tableScanNode.getCurrentConstraint(),
tableScanNode.getEnforcedConstraint()));
tableScanNode.getEnforcedConstraint(),
tableScanNode.getTemporaryTableInfo()));
}

@Override
Expand Down Expand Up @@ -288,7 +289,8 @@ public PlanNode visitFilter(FilterNode node, Void context)
oldTableScanNode.getOutputVariables(),
oldTableScanNode.getAssignments(),
oldTableScanNode.getCurrentConstraint(),
oldTableScanNode.getEnforcedConstraint());
oldTableScanNode.getEnforcedConstraint(),
oldTableScanNode.getTemporaryTableInfo());

return new FilterNode(node.getSourceLocation(), idAllocator.getNextId(), newTableScanNode, node.getPredicate());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ private Optional<PlanNode> tryCreatingNewScanNode(PlanNode plan)
assignments.entrySet().stream().collect(toImmutableMap(Map.Entry::getKey, (e) -> (ColumnHandle) (e.getValue()))),
tableScanNode.getTableConstraints(),
tableScanNode.getCurrentConstraint(),
tableScanNode.getEnforcedConstraint()));
tableScanNode.getEnforcedConstraint(),
tableScanNode.getTemporaryTableInfo()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ protected TableScanNode tableScan(PlanBuilder planBuilder, DruidTableHandle conn
variables,
assignments.build(),
TupleDomain.all(),
TupleDomain.all());
TupleDomain.all(), Optional.empty());
}

protected FilterNode filter(PlanBuilder planBuilder, PlanNode source, RowExpression predicate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,8 @@ private static TableScanNode getTableScanNode(
tableScan.getAssignments(),
tableScan.getTableConstraints(),
pushdownFilterResult.getLayout().getPredicate(),
TupleDomain.all());
TupleDomain.all(),
tableScan.getTemporaryTableInfo());
}

private static ExtractionResult intersectExtractionResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public PlanNode visitTableScan(TableScanNode tableScan, RewriteContext<Void> con
tableScan.getAssignments(),
tableScan.getTableConstraints(),
tableScan.getCurrentConstraint(),
tableScan.getEnforcedConstraint());
tableScan.getEnforcedConstraint(),
tableScan.getTemporaryTableInfo());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@ private Optional<PlanNode> tryPartialAggregationPushdown(PlanNode plan)
ImmutableMap.copyOf(assignments),
oldTableScanNode.getTableConstraints(),
oldTableScanNode.getCurrentConstraint(),
oldTableScanNode.getEnforcedConstraint()));
oldTableScanNode.getEnforcedConstraint(),
oldTableScanNode.getTemporaryTableInfo()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ private TableScanNode createDeletesTableScan(ImmutableMap<VariableReferenceExpre
outputs,
deleteColumnAssignments,
TupleDomain.all(),
TupleDomain.all());
TupleDomain.all(),
Optional.empty());
}

/**
Expand Down Expand Up @@ -382,7 +383,8 @@ private TableScanNode createNewRoot(TableScanNode node, IcebergTableHandle icebe
assignmentsBuilder.build(),
node.getTableConstraints(),
node.getCurrentConstraint(),
node.getEnforcedConstraint());
node.getEnforcedConstraint(),
node.getTemporaryTableInfo());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ public PlanNode visitFilter(FilterNode filter, RewriteContext<Void> context)
.intersect(tableScan.getCurrentConstraint()),
predicateNotChangedBySimplification ?
identityPartitionColumnPredicate.intersect(tableScan.getEnforcedConstraint()) :
tableScan.getEnforcedConstraint());
tableScan.getEnforcedConstraint(),
tableScan.getTemporaryTableInfo());

if (TRUE_CONSTANT.equals(remainingFilterExpression) && predicateNotChangedBySimplification) {
return newTableScan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.facebook.presto.spi.plan.TableFinishNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.TableWriterNode;
import com.facebook.presto.spi.plan.TemporaryTableInfo;
import com.facebook.presto.spi.relation.CallExpression;
import com.facebook.presto.spi.relation.ConstantExpression;
import com.facebook.presto.spi.relation.RowExpression;
Expand Down Expand Up @@ -82,6 +83,7 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.concat;
import static java.lang.String.format;
import static java.util.Collections.emptyList;
import static java.util.function.Function.identity;

// Planner Util for creating temporary tables
Expand All @@ -99,7 +101,8 @@ public static TableScanNode createTemporaryTableScan(
TableHandle tableHandle,
List<VariableReferenceExpression> outputVariables,
Map<VariableReferenceExpression, ColumnMetadata> variableToColumnMap,
Optional<PartitioningMetadata> expectedPartitioningMetadata)
Optional<PartitioningMetadata> expectedPartitioningMetadata,
Optional<String> cteId)
{
Map<String, ColumnHandle> columnHandles = metadata.getColumnHandles(session, tableHandle);
Map<VariableReferenceExpression, ColumnMetadata> outputColumns = outputVariables.stream()
Expand All @@ -126,11 +129,14 @@ public static TableScanNode createTemporaryTableScan(
return new TableScanNode(
sourceLocation,
idAllocator.getNextId(),
Optional.empty(),
selectedLayout.getLayout().getNewTableHandle(),
outputVariables,
assignments,
emptyList(),
TupleDomain.all(),
TupleDomain.all(),
TupleDomain.all());
cteId.map(TemporaryTableInfo::new));
}

public static Map<VariableReferenceExpression, ColumnMetadata> assignTemporaryTableColumnNames(Collection<VariableReferenceExpression> outputVariables,
Expand Down Expand Up @@ -181,7 +187,8 @@ public static TableFinishNode createTemporaryTableWriteWithoutExchanges(
TableHandle tableHandle,
List<VariableReferenceExpression> outputs,
Map<VariableReferenceExpression, ColumnMetadata> variableToColumnMap,
VariableReferenceExpression outputVar)
VariableReferenceExpression outputVar,
Optional<String> cteId)
{
SchemaTableName schemaTableName = metadata.getTableMetadata(session, tableHandle).getTable();
TableWriterNode.InsertReference insertReference = new TableWriterNode.InsertReference(tableHandle, schemaTableName);
Expand Down Expand Up @@ -211,11 +218,12 @@ public static TableFinishNode createTemporaryTableWriteWithoutExchanges(
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.of(Boolean.TRUE)),
cteId.map(TemporaryTableInfo::new)),
Optional.of(insertReference),
outputVar,
Optional.empty(),
Optional.empty());
Optional.empty(),
cteId.map(TemporaryTableInfo::new));
}

public static TableFinishNode createTemporaryTableWriteWithExchanges(
Expand Down Expand Up @@ -347,12 +355,11 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges(
Optional.of(partitioningScheme),
enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getPartialAggregation()) : Optional.empty(),
Optional.empty(),
Optional.of(Boolean.TRUE))),
Optional.empty())),
variableAllocator.newVariable("intermediaterows", BIGINT),
variableAllocator.newVariable("intermediatefragments", VARBINARY),
variableAllocator.newVariable("intermediatetablecommitcontext", VARBINARY),
enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getIntermediateAggregation()) : Optional.empty());

return new TableFinishNode(
sourceLocation,
idAllocator.getNextId(),
Expand All @@ -363,7 +370,8 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges(
Optional.of(insertReference),
variableAllocator.newVariable("rows", BIGINT),
enableStatsCollectionForTemporaryTable ? Optional.of(aggregations.getFinalAggregation()) : Optional.empty(),
enableStatsCollectionForTemporaryTable ? Optional.of(statisticsResult.getDescriptor()) : Optional.empty());
enableStatsCollectionForTemporaryTable ? Optional.of(statisticsResult.getDescriptor()) : Optional.empty(),
Optional.empty());
}

public static StatisticAggregations.Parts splitIntoPartialAndFinal(StatisticAggregations statisticAggregations, VariableAllocator variableAllocator, FunctionAndTypeManager functionAndTypeManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ private PlanNode createRemoteMaterializedExchange(ExchangeNode exchange, Rewrite
temporaryTableHandle,
exchange.getOutputVariables(),
variableToColumnMap,
Optional.of(partitioningMetadata));
Optional.of(partitioningMetadata),
Optional.empty());

checkArgument(
!exchange.getPartitioningScheme().isReplicateNullsAndAny(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ public Optional<PlanNode> visitTableFinish(TableFinishNode node, Context context
node.getTarget().map(target -> CanonicalWriterTarget.from(target)),
node.getRowCountVariable(),
Optional.empty(),
Optional.empty());
Optional.empty(),
node.getTemporaryTableInfo());
context.addPlan(node, new CanonicalPlan(result, strategy));
return Optional.of(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ private RelationPlan createAnalyzePlan(Analysis analysis, Analyze analyzeStateme
.putAll(tableScanOutputs.stream().collect(toImmutableMap(identity(), identity())))
.putAll(tableStatisticAggregation.getAdditionalVariables())
.build();
TableScanNode scanNode = new TableScanNode(getSourceLocation(analyzeStatement), idAllocator.getNextId(), targetTable, tableScanOutputs, variableToColumnHandle.build(), TupleDomain.all(), TupleDomain.all());
TableScanNode scanNode = new TableScanNode(getSourceLocation(analyzeStatement), idAllocator.getNextId(), targetTable, tableScanOutputs, variableToColumnHandle.build(), TupleDomain.all(), TupleDomain.all(), Optional.empty());
PlanNode project = PlannerUtils.addProjections(scanNode, idAllocator, assignments);
PlanNode planNode = new StatisticsWriterNode(
getSourceLocation(analyzeStatement),
Expand Down Expand Up @@ -435,13 +435,14 @@ private RelationPlan createTableWriterPlan(
// the data consumed by the TableWriteOperator
Optional.of(aggregations.getPartialAggregation()),
Optional.empty(),
Optional.of(Boolean.FALSE)),
Optional.empty()),
Optional.of(target),
variableAllocator.newVariable("rows", BIGINT),
// final aggregation is run within the TableFinishOperator to summarize collected statistics
// by the partial aggregation from all of the writer nodes
Optional.of(aggregations.getFinalAggregation()),
Optional.of(result.getDescriptor()));
Optional.of(result.getDescriptor()),
Optional.empty());

return new RelationPlan(commitNode, analysis.getRootScope(), commitNode.getOutputVariables());
}
Expand All @@ -463,10 +464,11 @@ private RelationPlan createTableWriterPlan(
tablePartitioningScheme,
Optional.empty(),
Optional.empty(),
Optional.of(Boolean.FALSE)),
Optional.empty()),
Optional.of(target),
variableAllocator.newVariable("rows", BIGINT),
Optional.empty(),
Optional.empty(),
Optional.empty());
return new RelationPlan(commitNode, analysis.getRootScope(), commitNode.getOutputVariables());
}
Expand All @@ -486,6 +488,7 @@ private RelationPlan createDeletePlan(Analysis analysis, Delete node)
Optional.of(deleteHandle),
variableAllocator.newVariable("rows", BIGINT),
Optional.empty(),
Optional.empty(),
Optional.empty());

return new RelationPlan(commitNode, analysis.getScope(node), commitNode.getOutputVariables());
Expand Down Expand Up @@ -527,6 +530,7 @@ private RelationPlan createUpdatePlan(Analysis analysis, Update node)
Optional.of(updateTarget),
variableAllocator.newVariable("rows", BIGINT),
Optional.empty(),
Optional.empty(),
Optional.empty());

return new RelationPlan(commitNode, analysis.getScope(node), commitNode.getOutputVariables());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public static Set<PlanNodeId> getOutputTableWriterNodeIds(PlanNode plan)
return stream(forTree(PlanNode::getSources).depthFirstPreOrder(plan))
.filter(node -> node instanceof TableWriterNode)
.map(node -> (TableWriterNode) node)
.filter(tableWriterNode -> !tableWriterNode.getIsTemporaryTableWriter().orElse(false))
.filter(tableWriterNode -> !tableWriterNode.getTemporaryTableInfo().isPresent())
.map(TableWriterNode::getId)
.collect(toImmutableSet());
}
Expand Down Expand Up @@ -304,7 +304,8 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
node.getOutputVariables(),
node.getAssignments(),
node.getCurrentConstraint(),
node.getEnforcedConstraint());
node.getEnforcedConstraint(),
node.getTemporaryTableInfo());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ private static TableScanNode cloneTableScan(TableScanNode scanNode, Session sess
newAssignments,
scanNode.getTableConstraints(),
scanNode.getCurrentConstraint(),
scanNode.getEnforcedConstraint());
scanNode.getEnforcedConstraint(), scanNode.getTemporaryTableInfo());
}

public static PlanNode clonePlanNode(PlanNode planNode, Session session, Metadata metadata, PlanNodeIdAllocator planNodeIdAllocator, List<VariableReferenceExpression> fieldsToKeep, Map<VariableReferenceExpression, VariableReferenceExpression> varMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public DeleteNode plan(Delete node)

// create table scan
List<VariableReferenceExpression> outputVariables = outputVariablesBuilder.build();
PlanNode tableScan = new TableScanNode(getSourceLocation(node), idAllocator.getNextId(), handle, outputVariables, columns.build(), TupleDomain.all(), TupleDomain.all());
PlanNode tableScan = new TableScanNode(getSourceLocation(node), idAllocator.getNextId(), handle, outputVariables, columns.build(), TupleDomain.all(), TupleDomain.all(), Optional.empty());
Scope scope = Scope.builder().withRelationType(RelationId.anonymous(), new RelationType(fields.build())).build();
RelationPlan relationPlan = new RelationPlan(tableScan, scope, outputVariables);

Expand Down Expand Up @@ -344,7 +344,7 @@ public UpdateNode plan(Update node)

// create table scan
List<VariableReferenceExpression> outputVariables = outputVariablesBuilder.build();
PlanNode tableScan = new TableScanNode(getSourceLocation(node), idAllocator.getNextId(), handle, outputVariables, columns.build(), TupleDomain.all(), TupleDomain.all());
PlanNode tableScan = new TableScanNode(getSourceLocation(node), idAllocator.getNextId(), handle, outputVariables, columns.build(), TupleDomain.all(), TupleDomain.all(), Optional.empty());
Scope scope = Scope.builder().withRelationType(RelationId.anonymous(), new RelationType(fields.build())).build();
RelationPlan relationPlan = new RelationPlan(tableScan, scope, outputVariables);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ protected RelationPlan visitTable(Table node, SqlPlannerContext context)
List<VariableReferenceExpression> outputVariables = outputVariablesBuilder.build();
List<TableConstraint<ColumnHandle>> tableConstraints = metadata.getTableMetadata(session, handle).getMetadata().getTableConstraintsHolder().getTableConstraintsWithColumnHandles();
context.incrementLeafNodes(session);
PlanNode root = new TableScanNode(getSourceLocation(node.getLocation()), idAllocator.getNextId(), handle, outputVariables, columns.build(), tableConstraints, TupleDomain.all(), TupleDomain.all());
PlanNode root = new TableScanNode(getSourceLocation(node.getLocation()), idAllocator.getNextId(), handle, outputVariables, columns.build(),
tableConstraints, TupleDomain.all(), TupleDomain.all(), Optional.empty());

return new RelationPlan(root, scope, outputVariables);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ public Result apply(TableScanNode tableScanNode, Captures captures, Context cont
tableScanNode.getAssignments(),
tableScanNode.getTableConstraints(),
layout.getLayout().getPredicate(),
TupleDomain.all()));
TupleDomain.all(),
tableScanNode.getTemporaryTableInfo()));
}
}

Expand Down Expand Up @@ -324,7 +325,8 @@ private static PlanNode pushPredicateIntoTableScan(
node.getAssignments(),
node.getTableConstraints(),
layout.getLayout().getPredicate(),
computeEnforced(newDomain, layout.getUnenforcedConstraint()));
computeEnforced(newDomain, layout.getUnenforcedConstraint()),
node.getTemporaryTableInfo());

// The order of the arguments to combineConjuncts matters:
// * Unenforced constraints go first because they can only be simple column references,
Expand Down
Loading

0 comments on commit 170b377

Please sign in to comment.