[SPARK-48566][PYTHON] Fix bug where partition indices are incorrect when UDTF analyze() uses both select and partitionColumns
### What changes were proposed in this pull request?

This PR fixes a bug that caused an internal error when a Python UDTF's "analyze" method returned both the "select" and "partitionBy" options. Specifically, this logic in `Analyzer.scala` was wrong because it did not update the usage of `partitioningExpressionIndexes` to take the "select" expressions into account when the latter were introduced in apache#45007:

```
val tvfWithTableColumnIndexes = tvf match {
  case g @ Generate(pyudtf: PythonUDTF, _, _, _, _, _)
      if tableArgs.head._1.partitioningExpressionIndexes.nonEmpty =>
    //////////////////////////////////////////////////////////////////////////////
    // The bug is here: the 'partitioningExpressionIndexes' are not valid
    // if the UDTF "select" expressions are non-empty, since that prompts
    // us to add a new projection (of a possibly different number of
    // expressions) to evaluate them.
    //////////////////////////////////////////////////////////////////////////////
    val partitionColumnIndexes =
      PythonUDTFPartitionColumnIndexes(tableArgs.head._1.partitioningExpressionIndexes)
    g.copy(generator = pyudtf.copy(
      pythonUDTFPartitionColumnIndexes = Some(partitionColumnIndexes)))
  case _ => tvf
}
```

In other words, the stored indices point into the columns of the original table argument, but once the "select" expressions trigger a new projection (possibly with a different number of columns), the partition columns may sit at different positions; a standalone sketch of this index mismatch appears at the end of this description.

To reproduce:

```
from pyspark.sql.functions import (
    AnalyzeArgument,
    AnalyzeResult,
    PartitioningColumn,
    SelectedColumn,
    udtf,
)
from pyspark.sql.types import (
    DoubleType,
    StringType,
    StructType,
)


@udtf
class TestTvf:
    @staticmethod
    def analyze(observed: AnalyzeArgument) -> AnalyzeResult:
        out_schema = StructType()
        out_schema.add("partition_col", StringType())
        out_schema.add("double_col", DoubleType())
        return AnalyzeResult(
            schema=out_schema,
            partitionBy=[PartitioningColumn("partition_col")],
            select=[
                SelectedColumn("partition_col"),
                SelectedColumn("double_col"),
            ],
        )

    def eval(self, *args, **kwargs):
        pass

    def terminate(self):
        for _ in range(10):
            yield {
                "partition_col": None,
                "double_col": 1.0,
            }


spark.udtf.register("serialize_test", TestTvf)

# Fails
(
    spark
    .sql(
        """
        SELECT * FROM serialize_test(
            TABLE(
                SELECT 5 AS unused_col, 'hi' AS partition_col, 1.0 AS double_col
                UNION ALL
                SELECT 4 AS unused_col, 'hi' AS partition_col, 1.0 AS double_col
            )
        )
        """
    )
    .toPandas()
)
```

### Why are the changes needed?

The above query returned internal errors before, but works now.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Additional golden file coverage

### Was this patch authored or co-authored using generative AI tooling?

Some light GitHub Copilot usage
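For illustration, the following is a minimal sketch of the index mismatch described above; it is not code from this patch, and the `remap_partition_indexes` helper is hypothetical. It only shows why an index computed against the table argument's columns no longer identifies the partition column after the "select" projection, using the column names from the reproduction:

```
# Minimal sketch (hypothetical helper, not from this patch): partition indices
# computed against the original table argument go stale once a "select"
# projection drops or reorders columns.

def remap_partition_indexes(table_columns, select_columns, partition_indexes):
    # Translate indexes over `table_columns` into indexes over the projection.
    return [select_columns.index(table_columns[i]) for i in partition_indexes]

# The table argument exposes three columns...
table_columns = ["unused_col", "partition_col", "double_col"]
# ...but analyze() selects only two of them.
select_columns = ["partition_col", "double_col"]

# 'partition_col' sits at index 1 in the table argument but at index 0 in the
# projection; reusing the stale index 1 would point at 'double_col' instead.
stale_indexes = [1]
print(remap_partition_indexes(table_columns, select_columns, stale_indexes))  # [0]
```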
Closes apache#46918 from dtenedor/fix-udtf-bug.

Authored-by: Daniel Tenedorio <daniel.tenedorio@databricks.com>

Signed-off-by: Takuya Ueshin <ueshin@databricks.com>