Sink: Azure destination for FileSink (#671)
Add AzureDestination to FileSink
Showing 5 changed files with 203 additions and 19 deletions.
@@ -0,0 +1,91 @@
# Microsoft Azure File Sink

!!! info

    This is a **Community** connector. Test it before using in production.

    To learn more about the differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.

This sink writes batches of data to Microsoft Azure in various formats.
By default, the data will include the Kafka message key, value, and timestamp.
## How To Install

To use the Azure sink, install the required dependencies:

```bash
pip install quixstreams[azure-file]
```
## How It Works

`FileSink` with `AzureFileDestination` is a batching sink that writes data directly to Microsoft Azure.

It batches processed records in memory per topic partition and writes them to Azure objects in a specified container and prefix structure. Objects are organized by topic and partition, with each batch written to a separate object named by its starting offset.

Batches are written to Azure during the commit phase of processing. This means the size of each batch (and therefore each Azure object) is influenced by your application's commit settings, via either the `commit_interval` or the `commit_every` parameter.

!!! note

    The Azure container must already exist and be accessible. The sink does not create the container automatically. If the container does not exist or access is denied, an error is raised when the sink is initialized.
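The batching behavior described above can be sketched in plain Python. This is an illustrative simulation only, not the quixstreams implementation: records accumulate per `(topic, partition)` until a commit, and each flushed batch yields one object named by its starting offset (the `data/<topic>/<partition>/<offset>.jsonl` layout from this page).

```python
from collections import defaultdict

# Hypothetical in-memory buffers: (topic, partition) -> list of (offset, value)
batches = defaultdict(list)

def accumulate(topic: str, partition: int, offset: int, value: bytes) -> None:
    """Buffer a record until the next commit."""
    batches[(topic, partition)].append((offset, value))

def flush() -> list[str]:
    """On commit, name one object per partition batch and clear the buffers.
    (A real sink would also upload the serialized records.)"""
    object_names = []
    for (topic, partition), records in batches.items():
        start_offset = records[0][0]  # each batch is named by its first offset
        object_names.append(f"data/{topic}/{partition}/{start_offset:019d}.jsonl")
    batches.clear()
    return object_names

accumulate("sink_topic", 0, 123, b'{"x": 1}')
accumulate("sink_topic", 0, 124, b'{"x": 2}')
accumulate("sink_topic", 1, 789, b'{"y": 3}')
print(flush())
# → ['data/sink_topic/0/0000000000000000123.jsonl', 'data/sink_topic/1/0000000000000000789.jsonl']
```

A longer commit interval simply means more records land in each buffer before `flush()` runs, producing fewer, larger objects.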
## How To Use

Create an instance of `FileSink` with `AzureFileDestination` and pass it to the `StreamingDataFrame.sink()` method.

```python
from quixstreams import Application
from quixstreams.sinks.community.file import FileSink, JSONFormat
from quixstreams.sinks.community.file.destinations import AzureFileDestination


# Configure the sink to write JSON files to Azure
file_sink = FileSink(
    # Optional: defaults to current working directory
    directory="data",
    # Optional: defaults to "json"
    # Available formats: "json", "parquet" or an instance of Format
    format=JSONFormat(compress=True),
    destination=AzureFileDestination(
        container="<YOUR CONTAINER NAME>",
        connection_string="<YOUR CONNECTION STRING>",
    ),
)

app = Application(broker_address="localhost:9092", auto_offset_reset="earliest")
topic = app.topic("sink-topic")

sdf = app.dataframe(topic=topic)
sdf.sink(file_sink)

if __name__ == "__main__":
    app.run()
```
## Azure Object Organization

Objects in Azure follow this structure:

```
my-container/
└── data/
    └── sink_topic/
        ├── 0/
        │   ├── 0000000000000000000.jsonl
        │   ├── 0000000000000000123.jsonl
        │   └── 0000000000000001456.jsonl
        └── 1/
            ├── 0000000000000000000.jsonl
            ├── 0000000000000000789.jsonl
            └── 0000000000000001012.jsonl
```

Each object is named using the batch's starting offset (padded to 19 digits) and the appropriate file extension for the chosen format.
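A useful property of the 19-digit zero padding above: listing objects in lexicographic (string) order also lists them in numeric offset order. A small sketch, where `object_path` is a hypothetical helper rather than part of the connector:

```python
def object_path(directory: str, topic: str, partition: int,
                start_offset: int, ext: str) -> str:
    # 19 digits comfortably covers the full range of Kafka offsets (int64)
    return f"{directory}/{topic}/{partition}/{start_offset:019d}.{ext}"

# Build names out of offset order on purpose
names = [object_path("data", "sink_topic", 0, off, "jsonl")
         for off in (1456, 0, 123)]

# Plain string sorting recovers offset order, so a simple listing of the
# container prefix replays batches in the order they were produced.
assert sorted(names) == [
    "data/sink_topic/0/0000000000000000000.jsonl",
    "data/sink_topic/0/0000000000000000123.jsonl",
    "data/sink_topic/0/0000000000000001456.jsonl",
]
```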
## Supported Formats

- **JSON**: Supports appending to existing files
- **Parquet**: Does not support appending (a new file is created for each batch)

## Delivery Guarantees

`FileSink` provides at-least-once guarantees, so the results may contain duplicated data if there were errors during processing.
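Because delivery is at-least-once, a downstream reader can drop duplicates by keying on `(topic, partition, offset)`, which is unique per Kafka record. A sketch, assuming each re-read record carries those fields (e.g. because the written data includes key, value, and timestamp plus offset metadata; the `deduplicate` helper and record shape here are illustrative, not part of the connector):

```python
def deduplicate(records: list[dict]) -> list[dict]:
    """Keep the first occurrence of each (topic, partition, offset)."""
    seen = set()
    unique = []
    for rec in records:
        key = (rec["topic"], rec["partition"], rec["offset"])
        if key not in seen:
            seen.add(key)
            unique.append(rec)
    return unique

records = [
    {"topic": "sink_topic", "partition": 0, "offset": 5, "value": "a"},
    {"topic": "sink_topic", "partition": 0, "offset": 5, "value": "a"},  # duplicate from a retry
    {"topic": "sink_topic", "partition": 0, "offset": 6, "value": "b"},
]
assert [r["offset"] for r in deduplicate(records)] == [5, 6]
```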
@@ -1,14 +1,4 @@
-from .destinations import Destination, LocalDestination, S3Destination
-from .formats import Format, InvalidFormatError, JSONFormat, ParquetFormat
-from .sink import FileSink
-
-__all__ = [
-    "Destination",
-    "LocalDestination",
-    "S3Destination",
-    "Format",
-    "InvalidFormatError",
-    "JSONFormat",
-    "ParquetFormat",
-    "FileSink",
-]
+# ruff: noqa: F403
+from .destinations import *
+from .formats import *
+from .sink import *
@@ -1,5 +1,5 @@
-from .base import Destination
-from .local import LocalDestination
-from .s3 import S3Destination
-
-__all__ = ("Destination", "LocalDestination", "S3Destination")
+# ruff: noqa: F403
+from .azure import *
+from .base import *
+from .local import *
+from .s3 import *
@@ -0,0 +1,102 @@
import logging

from quixstreams.sinks import SinkBatch
from quixstreams.sinks.community.file.destinations.base import Destination

try:
    from azure.core.exceptions import HttpResponseError
    from azure.storage.blob import BlobServiceClient
    from azure.storage.blob._container_client import ContainerClient
except ImportError as exc:
    raise ImportError(
        f"Package {exc.name} is missing: "
        'run "pip install quixstreams[azure-file]" to use AzureFileDestination'
    ) from exc


__all__ = ("AzureFileDestination",)


logger = logging.getLogger(__name__)


class AzureContainerNotFoundError(Exception):
    """Raised when the specified Azure File container does not exist."""


class AzureContainerAccessDeniedError(Exception):
    """Raised when access to the specified Azure File container is denied."""


class AzureFileDestination(Destination):
    """
    A destination that writes data to Microsoft Azure File.

    Handles writing data to Azure containers using the Azure Blob SDK. Credentials
    can be provided directly or via environment variables.
    """

    def __init__(
        self,
        connection_string: str,
        container: str,
    ) -> None:
        """
        Initialize the Azure File destination.

        :param connection_string: Azure client authentication string.
        :param container: Azure container name.

        :raises AzureContainerNotFoundError: If the specified container doesn't exist.
        :raises AzureContainerAccessDeniedError: If access to the container is denied.
        """
        self._container = container
        self._client = self._get_client(connection_string)
        self._validate_container()

    def _get_client(self, auth: str) -> ContainerClient:
        """
        Get an Azure container client for the configured container.

        :param auth: Azure client authentication string.
        :return: An Azure ContainerClient.
        """
        storage_client = BlobServiceClient.from_connection_string(auth)
        container_client = storage_client.get_container_client(self._container)
        return container_client

    def _validate_container(self) -> None:
        """
        Validate that the container exists and is accessible.

        :raises AzureContainerNotFoundError: If the specified container doesn't exist.
        :raises AzureContainerAccessDeniedError: If access to the container is denied.
        """
        try:
            if self._client.exists(timeout=10):
                return
        except HttpResponseError as e:
            if e.status_code == 403:
                raise AzureContainerAccessDeniedError(
                    f"Container access denied: {self._container}"
                )
        except Exception as e:
            logger.error("An unexpected Azure client error occurred", exc_info=e)
            raise
        raise AzureContainerNotFoundError(f"Container not found: {self._container}")

    def write(self, data: bytes, batch: SinkBatch) -> None:
        """
        Write data to Azure.

        :param data: The serialized data to write.
        :param batch: The batch information containing topic and partition details.
        """
        file_name = str(self._path(batch))
        logger.debug(
            "Writing %d bytes to Azure container=%s, path=%s",
            len(data),
            self._container,
            file_name,
        )
        self._client.get_blob_client(file_name).upload_blob(data)
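The validation logic in `_validate_container` above can be exercised without an Azure account by substituting stub objects for the SDK client. A sketch (`FakeClient` and `FakeHttpError` are hypothetical stand-ins, and the function returns strings where the real method returns or raises): `exists()` returning `True` passes, a 403 error maps to access denied, and a clean `False` means the container was not found.

```python
class FakeHttpError(Exception):
    """Stand-in for azure.core.exceptions.HttpResponseError."""
    def __init__(self, status_code: int):
        self.status_code = status_code

class FakeClient:
    """Stand-in for the SDK's ContainerClient."""
    def __init__(self, exists_result=None, error=None):
        self._exists_result = exists_result
        self._error = error

    def exists(self, timeout=None):
        if self._error:
            raise self._error
        return self._exists_result

def validate(client, container: str) -> str:
    # Mirrors _validate_container's control flow with string outcomes
    try:
        if client.exists(timeout=10):
            return "ok"
    except FakeHttpError as e:
        if e.status_code == 403:
            return f"access denied: {container}"
    return f"not found: {container}"

assert validate(FakeClient(exists_result=True), "c") == "ok"
assert validate(FakeClient(error=FakeHttpError(403)), "c") == "access denied: c"
assert validate(FakeClient(exists_result=False), "c") == "not found: c"
```

Note that, as in the diff, a non-403 `HttpResponseError` falls through to the "not found" outcome rather than propagating.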