diff --git a/spark/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala b/spark/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala index 50b9ac5b116..0d0b8e69964 100644 --- a/spark/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala +++ b/spark/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala @@ -337,7 +337,9 @@ class DeltaMergeBuilder private( private def mergePlan: DeltaMergeInto = { var targetPlan = targetTable.toDF.queryExecution.analyzed - val sourcePlan = source.queryExecution.analyzed + var sourcePlan = source.queryExecution.analyzed + var condition = onCondition.expr + var clauses = whenClauses // If source and target have duplicate, pre-resolved references (can happen with self-merge), // then rewrite the references in target with new exprId to avoid ambiguity. @@ -346,17 +348,25 @@ class DeltaMergeBuilder private( // optional SubqueryAlias. val duplicateResolvedRefs = targetPlan.outputSet.intersect(sourcePlan.outputSet) if (duplicateResolvedRefs.nonEmpty) { - val refReplacementMap = duplicateResolvedRefs.toSeq.flatMap { - case a: AttributeReference => - Some(a.exprId -> a.withExprId(NamedExpression.newExprId)) - case _ => None - }.toMap - targetPlan = targetPlan.transformAllExpressions { - case a: AttributeReference if refReplacementMap.contains(a.exprId) => - refReplacementMap(a.exprId) - } - logInfo("Rewritten duplicate refs between target and source plans: " - + refReplacementMap.toSeq.mkString(", ")) + val exprs = (condition +: clauses).map(_.transform { + // If any expression contains duplicate, pre-resolved references, we can't simply + // replace the references in the same way as the target because we don't know + // whether the user intended to refer to the source or the target columns. Instead, + // we unresolve them (only the duplicate refs) and let the analysis resolve the ambiguity + // and throw the usual error messages when needed. 
+ case a: AttributeReference if duplicateResolvedRefs.contains(a) => + UnresolvedAttribute(a.qualifier :+ a.name) + }) + // Deduplicate the attribute IDs in the target and source plans, and all the MERGE + // expressions (condition and MERGE clauses), so that we can avoid duplicated attribute ID + // when building the MERGE command later. + val fakePlan = AnalysisHelper.FakeLogicalPlan(exprs, Seq(sourcePlan, targetPlan)) + val newPlan = org.apache.spark.sql.catalyst.analysis.DeduplicateRelations(fakePlan) + .asInstanceOf[AnalysisHelper.FakeLogicalPlan] + sourcePlan = newPlan.children(0) + targetPlan = newPlan.children(1) + condition = newPlan.exprs.head + clauses = newPlan.exprs.takeRight(clauses.size).asInstanceOf[Seq[DeltaMergeIntoClause]] } // Note: The Scala API cannot generate MergeIntoTable just like the SQL parser because @@ -367,24 +377,9 @@ class DeltaMergeBuilder private( // (possible in Scala API, but syntactically not possible in SQL). This issue is tracked // by https://issues.apache.org/jira/browse/SPARK-34962. val merge = DeltaMergeInto( - targetPlan, - sourcePlan, - onCondition.expr, - whenClauses, - withSchemaEvolution = schemaEvolutionEnabled) - val finalMerge = if (duplicateResolvedRefs.nonEmpty) { - // If any expression contain duplicate, pre-resolved references, we can't simply - // replace the references in the same way as the target because we don't know - // whether the user intended to refer to the source or the target columns. Instead, - // we unresolve them (only the duplicate refs) and let the analysis resolve the ambiguity - // and throw the usual error messages when needed. 
- merge.transformExpressions { - case a: AttributeReference if duplicateResolvedRefs.contains(a) => - UnresolvedAttribute(a.qualifier :+ a.name) - } - } else merge - logDebug("Generated merged plan:\n" + finalMerge) - finalMerge + targetPlan, sourcePlan, condition, clauses, withSchemaEvolution = schemaEvolutionEnabled) + logDebug("Generated merged plan:\n" + merge) + merge } }