
[BUG][Spark] issue when merge using autoMerge property #3336

Open

londrake opened this issue Jul 4, 2024 · 2 comments
Labels
bug Something isn't working

Comments


londrake commented Jul 4, 2024

Bug [Spark] issue when merge using autoMerge property

Which Delta project/connector is this regarding?

  • [x] Spark
  • [ ] Standalone
  • [ ] Flink
  • [ ] Kernel
  • [ ] Other (fill in here)

Describe the problem

A merge operation whose updateExpr/insertExpr map references a target column through the target table's alias fails when the conf "spark.databricks.delta.schema.autoMerge.enabled" is enabled. Aliasing works fine when the parameter is off.

Suppose the target table has the alias t and the source has the alias s, and new_col is a new column that schema evolution should add to the target. Passing a map such as Map("t.new_col" -> "s.new_col") raises an error.

Steps to reproduce

Run the code below. Swap goodColumnsMap for badColumnsMap in the updateExpr/insertExpr calls to reproduce the issue.
```scala
import io.delta.tables.DeltaTable

// spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
// <- this cannot be enabled at run time; it has to be set when the Spark
//    session is initialized.

val basePath = "My amazing path"
val targetPath = s"$basePath/target_table/"
val updatesDF = sparkSession.range(20).selectExpr("id", "(id * 1000) as new_col")
sparkSession.range(10).write.format("delta").save(targetPath)
val target = DeltaTable.forPath(sparkSession, targetPath)

val badColumnsMap = Map("target.new_col" -> "source.new_col")
val goodColumnsMap = Map("new_col" -> "source.new_col")

target
  .alias("target")
  .merge(updatesDF.alias("source"), "target.id = source.id")
  .whenMatched()
  .updateExpr(goodColumnsMap) // pass badColumnsMap to reproduce the error
  .whenNotMatched()
  .insertExpr(goodColumnsMap) // pass badColumnsMap to reproduce the error
  .execute()
DeltaTable.forPath(sparkSession, targetPath).toDF.show()
```
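Since the comment above notes the flag must be set when the session is initialized, here is a minimal sketch of such a setup (the app name and master are illustrative; the extension and catalog configs are the standard Delta Lake ones):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative session setup: the autoMerge flag is supplied at build time,
// matching the note above that it cannot be enabled later at run time.
val sparkSession = SparkSession.builder()
  .appName("automerge-repro") // hypothetical app name
  .master("local[*]")
  .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
```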

Observed results

When goodColumnsMap (Map("new_col" -> "source.new_col")) is passed to updateExpr/insertExpr, the merge runs smoothly as expected.

When badColumnsMap (Map("target.new_col" -> "source.new_col")) is passed to updateExpr/insertExpr, an error is raised.

Merge Op

    target
      .alias("target")
      .merge(updatesDF.alias("source"), "target.id = source.id")
      .whenMatched()
      .updateExpr(badColumnsMap)
      .whenNotMatched()
      .insertExpr(badColumnsMap)
      .execute()
    DeltaTable.forPath(sparkSession, targetPath).toDF.show()

ERROR LOG

[DELTA_MERGE_UNRESOLVED_EXPRESSION] Cannot resolve target.new_col in UPDATE clause given columns source.id, source.new_col
org.apache.spark.sql.delta.DeltaAnalysisException: [DELTA_MERGE_UNRESOLVED_EXPRESSION] Cannot resolve target.new_col in UPDATE clause given columns source.id, source.new_col
	at org.apache.spark.sql.delta.ResolveDeltaMergeInto$.$anonfun$resolveReferencesAndSchema$4(ResolveDeltaMergeInto.scala:81)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.delta.ResolveDeltaMergeInto$.assertResolved$1(ResolveDeltaMergeInto.scala:74)
	at org.apache.spark.sql.delta.ResolveDeltaMergeInto$.$anonfun$resolveReferencesAndSchema$1(ResolveDeltaMergeInto.scala:60)
	at org.apache.spark.sql.delta.ResolveDeltaMergeInto$.$anonfun$resolveReferencesAndSchema$1$adapted(ResolveDeltaMergeInto.scala:60)
	at scala.collection.immutable.List.foreach(List.scala:431)

Expected results

I expect no difference in behavior between the two cases, so the merge should run smoothly either way.

Further details

Environment information

  • Delta Lake version: 3.2.0
  • Spark version: 3.5.0
  • Scala version: 2.13.0

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.

xupefei commented Jul 5, 2024

I can confirm that this issue is valid. This is why it can happen:

  1. When the source and target tables are aliased, the output columns of their plans are aliased, i.e., prefixed by the aliases. In the given example, the source table's column is source.new_col while the target table's column is target.id.
  2. During analysis, we first resolve the UPDATE clause attribute target.new_col against the target plan. Obviously, no column can match.
  3. Because schema evolution is turned on, we try again against the source plan, which is the culprit (code pointer). Since columns in this plan are prefixed with the source alias, target.new_col will not match anything: the only existing column is source.new_col. (See the sketch after this list.)
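A minimal sketch of that two-phase lookup (my own simplification, with plain strings standing in for Catalyst attributes; the names are illustrative and this is not Delta's actual code):

```scala
// Simplified model of the resolution steps above.
def resolveAssignmentKey(
    key: String,              // e.g. "target.new_col"
    targetCols: Seq[String],  // e.g. Seq("target.id") -- no new_col yet
    sourceCols: Seq[String],  // e.g. Seq("source.id", "source.new_col")
    schemaEvolution: Boolean): Option[String] = {
  // Step 2: resolve against the target plan first; "target.new_col" finds
  // no match because new_col does not exist on the target yet.
  targetCols.find(_ == key).orElse {
    // Step 3: with schema evolution on, retry against the source plan,
    // whose columns carry the *source* alias, so "target.new_col" again
    // matches nothing and the merge fails with
    // DELTA_MERGE_UNRESOLVED_EXPRESSION.
    if (schemaEvolution) sourceCols.find(_ == key) else None
  }
}
```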

The fix, however, is not trivial, and it is risky. We could change the match logic to treat the first part of the column name as an alias, but this would fail in the following example:

Source schema: col1 int, col2 int, t struct<col1: int, col2: int>
Target schema: col3 int, col4 int
Query: source.alias('s').merge(target.alias('t').update(Map('t.col2' -> 's.col2'))

What will 't.col2' match? The col2 column in the source table, or the nested field t.col2?
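To make the clash concrete, here is a small illustrative snippet (my own construction, not from the discussion) that builds DataFrames with those schemas:

```scala
// Illustrative only: frames matching the schemas in the example above.
val sourceDF = sparkSession.range(1).selectExpr(
  "1 as col1", "2 as col2",
  "named_struct('col1', 3, 'col2', 4) as t") // nested field t.col2
val targetDF = sparkSession.range(1).selectExpr("5 as col3", "6 as col4")

// With the alias 't' in play, an assignment key 't.col2' could be read as
// "column col2 via the alias t" or as "the nested field t.col2" -- the two
// readings resolve to different things.
```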

I'll do some research to see how people run this kind of query and decide the next step.


londrake commented Jul 5, 2024

Query: source.alias('s').merge(target.alias('t').update(Map('t.col2' -> 's.col2'))

I think you meant

Query: target.alias('t').merge(source.alias('s').update(Map('t.col2' -> 's.col2'))

Well, in this case there is a clash between the alias for the Delta table and the struct column name. Making sure that the alias does not match any column name would probably solve the issue, but it sounds like a workaround.
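For illustration only, the kind of guard being suggested might look like this hypothetical pre-check (not proposed code):

```scala
// Hypothetical guard: an alias is safe only if it collides with no
// top-level column on either side, keeping qualified keys unambiguous.
def aliasIsUnambiguous(
    alias: String, sourceCols: Seq[String], targetCols: Seq[String]): Boolean =
  !sourceCols.contains(alias) && !targetCols.contains(alias)
```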
