Skip to content

Commit

Permalink
[dagster-airbyte] Implement airbyte_assets and build_airbyte_assets_d…
Browse files Browse the repository at this point in the history
…efinitions
  • Loading branch information
maximearmstrong committed Dec 18, 2024
1 parent 1b64ddb commit 144064d
Showing 1 changed file with 115 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from typing import Any, Callable, Optional

from dagster import AssetsDefinition, multi_asset
from dagster._annotations import experimental

from dagster_airbyte.resources import AirbyteCloudResource
from dagster_airbyte.translator import AirbyteMetadataSet, DagsterAirbyteTranslator


@experimental
def airbyte_assets(
*,
connection_id: str,
workspace: AirbyteCloudResource,
name: Optional[str] = None,
group_name: Optional[str] = None,
dagster_airbyte_translator: Optional[DagsterAirbyteTranslator] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
"""Create a definition for how to sync the tables of a given Airbyte connection.
Args:
connection_id (str): The Airbyte Connection ID.
workspace (AirbyteCloudWorkspace): The Airbyte workspace to fetch assets from.
name (Optional[str], optional): The name of the op.
group_name (Optional[str], optional): The name of the asset group.
dagster_airbyte_translator (Optional[DagsterAirbyteTranslator], optional): The translator to use
to convert Airbyte content into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterAirbyteTranslator`.
Examples:
Sync the tables of an Airbyte connection:
.. code-block:: python
from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets
import dagster as dg
airbyte_workspace = AirbyteCloudWorkspace(
workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)
@airbyte_assets(
connection_id="airbyte_connection_id",
name="airbyte_connection_id",
group_name="airbyte_connection_id",
workspace=airbyte_workspace,
)
def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
yield from airbyte.sync_and_poll(context=context)
defs = dg.Definitions(
assets=[airbyte_connection_assets],
resources={"airbyte": airbyte_workspace},
)
Sync the tables of an Airbyte connection with a custom translator:
.. code-block:: python
from dagster_airbyte import (
DagsterAirbyteTranslator,
AirbyteConnectionTableProps,
AirbyteCloudWorkspace,
airbyte_assets
)
import dagster as dg
class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator):
def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
default_spec = super().get_asset_spec(props)
return default_spec.replace_attributes(
key=asset_spec.key.with_prefix("my_prefix"),
)
airbyte_workspace = AirbyteCloudWorkspace(
workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)
@airbyte_assets(
connection_id="airbyte_connection_id",
name="airbyte_connection_id",
group_name="airbyte_connection_id",
workspace=airbyte_workspace,
dagster_airbyte_translator=CustomDagsterAirbyteTranslator()
)
def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
yield from airbyte.sync_and_poll(context=context)
defs = dg.Definitions(
assets=[airbyte_connection_assets],
resources={"airbyte": airbyte_workspace},
)
"""
return multi_asset(
name=name,
group_name=group_name,
can_subset=False,
specs=[
spec
for spec in workspace.load_asset_specs(
dagster_airbyte_translator=dagster_airbyte_translator or DagsterAirbyteTranslator()
)
if AirbyteMetadataSet.extract(spec.metadata).connection_id == connection_id
],
)

0 comments on commit 144064d

Please sign in to comment.