From f0b7cfa56cb90ef70132d9656299956cbde00b53 Mon Sep 17 00:00:00 2001 From: allisonwang-db Date: Tue, 18 Jun 2024 08:46:10 +0900 Subject: [PATCH] [SPARK-48497][PYTHON][DOCS] Add an example for Python data source writer in user guide ### What changes were proposed in this pull request? This PR adds an example for creating a simple data source writer in the user guide. ### Why are the changes needed? To improve the PySpark documentation. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Verified locally. ### Was this patch authored or co-authored using generative AI tooling? No Closes #46833 from allisonwang-db/spark-48497-ds-write-doc. Authored-by: allisonwang-db Signed-off-by: Hyukjin Kwon --- .../user_guide/sql/python_data_source.rst | 66 +++++++++++++++++-- 1 file changed, 60 insertions(+), 6 deletions(-) diff --git a/python/docs/source/user_guide/sql/python_data_source.rst b/python/docs/source/user_guide/sql/python_data_source.rst index 2386303f0da85..cdbc706993119 100644 --- a/python/docs/source/user_guide/sql/python_data_source.rst +++ b/python/docs/source/user_guide/sql/python_data_source.rst @@ -73,6 +73,9 @@ Method that needs to be implemented for a capability: def reader(self, schema: StructType): return FakeDataSourceReader(schema, self.options) + def writer(self, schema: StructType, overwrite: bool): + return FakeDataSourceWriter(self.options) + def streamReader(self, schema: StructType): return FakeStreamReader(schema, self.options) @@ -83,8 +86,8 @@ Method that needs to be implemented for a capability: def streamWriter(self, schema: StructType, overwrite: bool): return FakeStreamWriter(self.options) -Implementing Reader for Python Data Source ------------------------------------------- +Implementing Batch Reader and Writer for Python Data Source +----------------------------------------------------------- **Implement the Reader** Define the reader logic to generate synthetic data. Use the `faker` library to populate each field in the schema. @@ -109,6 +112,43 @@ Define the reader logic to generate synthetic data. Use the `faker` library to p row.append(value) yield tuple(row) +**Implement the Writer** + +Create a fake data source writer that processes each partition of data, counts the rows, and either +prints the total count of rows after a successful write or the number of failed tasks if the writing process fails. + +.. code-block:: python + + from dataclasses import dataclass + from typing import Iterator, List + + from pyspark.sql.types import Row + from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage + + @dataclass + class SimpleCommitMessage(WriterCommitMessage): + partition_id: int + count: int + + class FakeDataSourceWriter(DataSourceWriter): + + def write(self, rows: Iterator[Row]) -> SimpleCommitMessage: + from pyspark import TaskContext + + context = TaskContext.get() + partition_id = context.partitionId() + cnt = sum(1 for _ in rows) + return SimpleCommitMessage(partition_id=partition_id, count=cnt) + + def commit(self, messages: List[SimpleCommitMessage]) -> None: + total_count = sum(message.count for message in messages) + print(f"Total number of rows: {total_count}") + + def abort(self, messages: List[SimpleCommitMessage]) -> None: + failed_count = sum(message is None for message in messages) + print(f"Number of failed tasks: {failed_count}") + + Implementing Streaming Reader and Writer for Python Data Source --------------------------------------------------------------- **Implement the Stream Reader** @@ -267,7 +307,9 @@ After defining your data source, it must be registered before usage. spark.dataSource.register(FakeDataSource) -Use the fake datasource with the default schema and options: +**Read From a Python Data Source** + +Read from the fake datasource with the default schema and options: .. code-block:: python @@ -281,7 +323,7 @@ Use the fake datasource with the default schema and options: # | Amy Martin|1988-10-28| 68076| Oregon| # +-----------+----------+-------+-------+ -Use the fake datasource with a custom schema: +Read from the fake datasource with a custom schema: .. code-block:: python @@ -295,7 +337,7 @@ Use the fake datasource with a custom schema: # |Mrs. Jacqueline Brown|Maynard Inc | # +---------------------+--------------+ -Use the fake datasource with a different number of rows: +Read from the fake datasource with a different number of rows: .. code-block:: python @@ -311,6 +353,18 @@ Use the fake datasource with a different number of rows: # | Douglas James|2007-01-18| 46226| Alabama| # +--------------+----------+-------+------------+ +**Write To a Python Data Source** + +To write data to a custom location, make sure that you specify the `mode()` clause. Supported modes are `append` and `overwrite`. + +.. code-block:: python + + df = spark.range(0, 10, 1, 5) + df.write.format("fake").mode("append").save() + + # You can check the Spark log (standard error) to see the output of the write operation. + # Total number of rows: 10 + **Use a Python Data Source in Streaming Query** Once we register the python data source, we can also use it in streaming queries as source of readStream() or sink of writeStream() by passing short name or full name to format(). @@ -338,4 +392,4 @@ We can also use the same data source in streaming reader and writer .. code-block:: python - query = spark.readStream.format("fake").load().writeStream().format("fake").start("/output_path") + query = spark.readStream.format("fake").load().writeStream.format("fake").start("/output_path")