[PySpark] Add schema evolution config to PySpark DeltaMergeBuilder (delta-io#2778)

#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [X] PySpark

## Description

This PR continues delta-io#2737 by adding a `withSchemaEvolution()` method to `DeltaMergeBuilder` in PySpark.


## How was this patch tested?

New unit tests.

## Does this PR introduce _any_ user-facing changes?

Yes. This PR lets users enable schema evolution for MERGE in PySpark by calling `table.merge(...).withSchemaEvolution()`.
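
A minimal end-to-end sketch of the new API (this assumes an active `SparkSession` named `spark` with Delta configured; the table path and data below are illustrative, not taken from this PR):

```python
from delta.tables import DeltaTable

# Hypothetical target table with columns (key, value); the source carries
# an additional "extra" column that the target does not have yet.
target = DeltaTable.forPath(spark, "/tmp/delta/events")
source = spark.createDataFrame([("a", -1), ("e", -5)], ["key", "extra"])

# withSchemaEvolution() asks MERGE to evolve the target schema, so the
# source-only "extra" column is added instead of causing an analysis error.
target.alias("t") \
    .merge(source.alias("s"), "t.key = s.key") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .withSchemaEvolution() \
    .execute()
```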
xupefei authored Mar 21, 2024
1 parent 9f040d4 commit 8618388
Showing 2 changed files with 27 additions and 0 deletions.
13 changes: 13 additions & 0 deletions python/delta/tables.py
@@ -1013,6 +1013,19 @@ def whenNotMatchedBySourceDelete(
        new_jbuilder = self.__getNotMatchedBySourceBuilder(condition).delete()
        return DeltaMergeBuilder(self._spark, new_jbuilder)

    @since(3.2)  # type: ignore[arg-type]
    def withSchemaEvolution(self) -> "DeltaMergeBuilder":
        """
        Enable schema evolution for the merge operation. This allows the target table schema
        to be automatically updated based on the schema of the source DataFrame.

        See :py:class:`~delta.tables.DeltaMergeBuilder` for complete usage details.

        :return: this builder
        """
        new_jbuilder = self._jbuilder.withSchemaEvolution()
        return DeltaMergeBuilder(self._spark, new_jbuilder)

    @since(0.4)  # type: ignore[arg-type]
    def execute(self) -> None:
        """
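Like the other `DeltaMergeBuilder` methods, `withSchemaEvolution()` delegates to the underlying JVM builder and wraps the result in a new `DeltaMergeBuilder`, so it can be chained anywhere in the clause chain before `execute()`.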
14 changes: 14 additions & 0 deletions python/delta/tests/test_deltatable.py
@@ -335,6 +335,20 @@ def reset_table() -> None:
            .execute()
        self.__checkAnswer(dt.toDF(), ([('a', -1), ('b', 2), ('c', 3), ('d', 4), ('e', -5)]))

        # Schema evolution
        reset_table()
        dt.alias("t") \
            .merge(source.toDF("key", "extra").alias("s"), expr("t.key = s.key")) \
            .whenMatchedUpdate(set={"extra": "-1"}) \
            .whenNotMatchedInsertAll() \
            .withSchemaEvolution() \
            .execute()
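        # Expected: matched keys 'a' and 'b' get extra = -1 from the update
        # clause; target-only rows 'c' and 'd' get None for the evolved
        # "extra" column; source-only rows 'e' and 'f' are inserted with
        # None for "value".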
        self.__checkAnswer(
            DeltaTable.forPath(self.spark, self.tempFile).toDF(),  # reload the table
            ([('a', 1, -1), ('b', 2, -1), ('c', 3, None), ('d', 4, None), ('e', None, -5),
              ('f', None, -6)]),
            ["key", "value", "extra"])

        # ============== Test bad args ==============
        # ---- bad args in merge()
        with self.assertRaisesRegex(TypeError, "must be DataFrame"):
