Skip to content

Commit

Permalink
[SPARK-48497][PYTHON][DOCS] Add an example for Python data source wri…
Browse files Browse the repository at this point in the history
…ter in user guide

### What changes were proposed in this pull request?

This PR adds an example for creating a simple data source writer in the user guide.

### Why are the changes needed?

To improve the PySpark documentation.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Verified locally.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#46833 from allisonwang-db/spark-48497-ds-write-doc.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
allisonwang-db authored and HyukjinKwon committed Jun 17, 2024
1 parent 042804a commit f0b7cfa
Showing 1 changed file with 60 additions and 6 deletions.
66 changes: 60 additions & 6 deletions python/docs/source/user_guide/sql/python_data_source.rst
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ Method that needs to be implemented for a capability:
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
def writer(self, schema: StructType, overwrite: bool):
return FakeDataSourceWriter(self.options)
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
Expand All @@ -83,8 +86,8 @@ Method that needs to be implemented for a capability:
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
Implementing Reader for Python Data Source
------------------------------------------
Implementing Batch Reader and Writer for Python Data Source
-----------------------------------------------------------
**Implement the Reader**

Define the reader logic to generate synthetic data. Use the `faker` library to populate each field in the schema.
Expand All @@ -109,6 +112,43 @@ Define the reader logic to generate synthetic data. Use the `faker` library to p
row.append(value)
yield tuple(row)
**Implement the Writer**

Create a fake data source writer that processes each partition of data, counts the rows, and either
prints the total count of rows after a successful write or the number of failed tasks if the writing process fails.

.. code-block:: python
from dataclasses import dataclass
from typing import Iterator, List
from pyspark.sql.types import Row
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage
@dataclass
class SimpleCommitMessage(WriterCommitMessage):
partition_id: int
count: int
class FakeDataSourceWriter(DataSourceWriter):
def write(self, rows: Iterator[Row]) -> SimpleCommitMessage:
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = sum(1 for _ in rows)
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages: List[SimpleCommitMessage]) -> None:
total_count = sum(message.count for message in messages)
print(f"Total number of rows: {total_count}")
def abort(self, messages: List[SimpleCommitMessage]) -> None:
failed_count = sum(message is None for message in messages)
print(f"Number of failed tasks: {failed_count}")
Implementing Streaming Reader and Writer for Python Data Source
---------------------------------------------------------------
**Implement the Stream Reader**
Expand Down Expand Up @@ -267,7 +307,9 @@ After defining your data source, it must be registered before usage.
spark.dataSource.register(FakeDataSource)
Use the fake datasource with the default schema and options:
**Read From a Python Data Source**

Read from the fake datasource with the default schema and options:

.. code-block:: python
Expand All @@ -281,7 +323,7 @@ Use the fake datasource with the default schema and options:
# | Amy Martin|1988-10-28| 68076| Oregon|
# +-----------+----------+-------+-------+
Use the fake datasource with a custom schema:
Read from the fake datasource with a custom schema:

.. code-block:: python
Expand All @@ -295,7 +337,7 @@ Use the fake datasource with a custom schema:
# |Mrs. Jacqueline Brown|Maynard Inc |
# +---------------------+--------------+
Use the fake datasource with a different number of rows:
Read from the fake datasource with a different number of rows:

.. code-block:: python
Expand All @@ -311,6 +353,18 @@ Use the fake datasource with a different number of rows:
# | Douglas James|2007-01-18| 46226| Alabama|
# +--------------+----------+-------+------------+
**Write To a Python Data Source**

To write data to a custom location, make sure that you specify the `mode()` clause. Supported modes are `append` and `overwrite`.

.. code-block:: python
df = spark.range(0, 10, 1, 5)
df.write.format("fake").mode("append").save()
# You can check the Spark log (standard error) to see the output of the write operation.
# Total number of rows: 10
**Use a Python Data Source in Streaming Query**

Once we register the python data source, we can also use it in streaming queries as source of readStream() or sink of writeStream() by passing short name or full name to format().
Expand Down Expand Up @@ -338,4 +392,4 @@ We can also use the same data source in streaming reader and writer

.. code-block:: python
query = spark.readStream.format("fake").load().writeStream().format("fake").start("/output_path")
query = spark.readStream.format("fake").load().writeStream.format("fake").start("/output_path")

0 comments on commit f0b7cfa

Please sign in to comment.