Commit

Fix.
andreaschat-db committed Mar 25, 2024
1 parent 36f95dd · commit 1a37f73
Showing 1 changed file with 25 additions and 30 deletions.
55 changes: 25 additions & 30 deletions spark/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala
@@ -337,7 +337,9 @@ class DeltaMergeBuilder private(
 
   private def mergePlan: DeltaMergeInto = {
     var targetPlan = targetTable.toDF.queryExecution.analyzed
-    val sourcePlan = source.queryExecution.analyzed
+    var sourcePlan = source.queryExecution.analyzed
+    var condition = onCondition.expr
+    var clauses = whenClauses
 
     // If source and target have duplicate, pre-resolved references (can happen with self-merge),
     // then rewrite the references in target with new exprId to avoid ambiguity.
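
For context, the self-merge mentioned in the comment above arises when the merge source is derived from the target table itself. A minimal sketch of such a call through the public Scala API (the table path, column names, and aliases are illustrative, and a SparkSession named `spark` is assumed):

import io.delta.tables.DeltaTable

// Source and target are built from the same Delta table, so their analyzed
// plans expose attribute references with identical expression IDs.
val target = DeltaTable.forPath(spark, "/tmp/delta/events")
val source = target.toDF.filter("value > 0").as("s")

target.as("t")
  .merge(source, "t.id = s.id")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()
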
@@ -346,17 +348,25 @@
     // optional SubqueryAlias.
     val duplicateResolvedRefs = targetPlan.outputSet.intersect(sourcePlan.outputSet)
     if (duplicateResolvedRefs.nonEmpty) {
-      val refReplacementMap = duplicateResolvedRefs.toSeq.flatMap {
-        case a: AttributeReference =>
-          Some(a.exprId -> a.withExprId(NamedExpression.newExprId))
-        case _ => None
-      }.toMap
-      targetPlan = targetPlan.transformAllExpressions {
-        case a: AttributeReference if refReplacementMap.contains(a.exprId) =>
-          refReplacementMap(a.exprId)
-      }
-      logInfo("Rewritten duplicate refs between target and source plans: "
-        + refReplacementMap.toSeq.mkString(", "))
+      val exprs = (condition +: clauses).map(_.transform {
+        // If any expression contain duplicate, pre-resolved references, we can't simply
+        // replace the references in the same way as the target because we don't know
+        // whether the user intended to refer to the source or the target columns. Instead,
+        // we unresolve them (only the duplicate refs) and let the analysis resolve the ambiguity
+        // and throw the usual error messages when needed.
+        case a: AttributeReference if duplicateResolvedRefs.contains(a) =>
+          UnresolvedAttribute(a.qualifier :+ a.name)
+      })
+      // Deduplicate the attribute IDs in the target and source plans, and all the MERGE
+      // expressions (condition and MERGE clauses), so that we can avoid duplicated attribute ID
+      // when building the MERGE command later.
+      val fakePlan = AnalysisHelper.FakeLogicalPlan(exprs, Seq(sourcePlan, targetPlan))
+      val newPlan = org.apache.spark.sql.catalyst.analysis.DeduplicateRelations(fakePlan)
+        .asInstanceOf[AnalysisHelper.FakeLogicalPlan]
+      sourcePlan = newPlan.children(0)
+      targetPlan = newPlan.children(1)
+      condition = newPlan.exprs.head
+      clauses = newPlan.exprs.takeRight(clauses.size).asInstanceOf[Seq[DeltaMergeIntoClause]]
     }
 
     // Note: The Scala API cannot generate MergeIntoTable just like the SQL parser because
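
The essence of the change above: when both sides of the merge carry the same pre-resolved attributes, the condition and clause expressions are "unresolved" back to plain names, and the plans and expressions are deduplicated together, so that analysis can later decide whether a name refers to the source or the target column. A standalone sketch of the underlying Catalyst mechanics, for illustration only (it uses internal Spark APIs and assumes a SparkSession named `spark`):

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.AttributeReference

// Two plans derived from the same DataFrame share attribute references,
// i.e. the same name *and* the same ExprId, so their output sets overlap.
val df = spark.range(5).toDF("id")
val targetPlan = df.queryExecution.analyzed
val sourcePlan = df.queryExecution.analyzed
val duplicates = targetPlan.outputSet.intersect(sourcePlan.outputSet)
assert(duplicates.nonEmpty)

// Unresolving a duplicated reference (e.g. something like id#0L) turns it back
// into the bare name "id", deferring the source-vs-target decision to the analyzer.
val unresolvedCondition = df("id").expr.transform {
  case a: AttributeReference if duplicates.contains(a) =>
    UnresolvedAttribute(a.qualifier :+ a.name)
}

In the patched mergePlan, DeduplicateRelations performs the corresponding deduplication across sourcePlan and targetPlan (reassigning fresh expression IDs on one side) before the DeltaMergeInto node is built.
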
@@ -367,24 +377,9 @@
     // (possible in Scala API, but syntactically not possible in SQL). This issue is tracked
     // by https://issues.apache.org/jira/browse/SPARK-34962.
     val merge = DeltaMergeInto(
-      targetPlan,
-      sourcePlan,
-      onCondition.expr,
-      whenClauses,
-      withSchemaEvolution = schemaEvolutionEnabled)
-    val finalMerge = if (duplicateResolvedRefs.nonEmpty) {
-      // If any expression contain duplicate, pre-resolved references, we can't simply
-      // replace the references in the same way as the target because we don't know
-      // whether the user intended to refer to the source or the target columns. Instead,
-      // we unresolve them (only the duplicate refs) and let the analysis resolve the ambiguity
-      // and throw the usual error messages when needed.
-      merge.transformExpressions {
-        case a: AttributeReference if duplicateResolvedRefs.contains(a) =>
-          UnresolvedAttribute(a.qualifier :+ a.name)
-      }
-    } else merge
-    logDebug("Generated merged plan:\n" + finalMerge)
-    finalMerge
+      targetPlan, sourcePlan, condition, clauses, withSchemaEvolution = schemaEvolutionEnabled)
+    logDebug("Generated merged plan:\n" + merge)
+    merge
   }
 }
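
For reference, the condition and whenClauses that end up in the DeltaMergeInto node above come from the builder's fluent API; a typical invocation looks roughly like this (a sketch only: the paths, aliases, and column names are made up, and a SparkSession named `spark` is assumed):

import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.col

// Clause conditions and assignments may mix source ("s") and target ("t")
// columns; they become the DeltaMergeIntoClause expressions rewritten above.
DeltaTable.forPath(spark, "/tmp/delta/events").as("t")
  .merge(spark.read.format("delta").load("/tmp/delta/updates").as("s"), "t.id = s.id")
  .whenMatched("t.value != s.value")
  .update(Map("value" -> col("s.value")))
  .whenNotMatched()
  .insert(Map("id" -> col("s.id"), "value" -> col("s.value")))
  .execute()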
