Skip to content

Commit

Permalink
[dagster-airbyte] remove deprecated load_assets_from_airbyte_project (#…
Browse files Browse the repository at this point in the history
…25516)

## Summary

Removes `load_assets_from_airbyte_project`, which has been deprecated
for quite a while.
  • Loading branch information
benpankow authored Oct 24, 2024
1 parent bae7fe4 commit 5b1391e
Show file tree
Hide file tree
Showing 12 changed files with 4 additions and 417 deletions.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
49 changes: 0 additions & 49 deletions docs/content/integrations/airbyte.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -94,55 +94,6 @@ airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance)

The `load_assets_from_airbyte_instance` function retrieves all of the connections you have defined in the Airbyte interface, creating asset definitions for each data stream. Each connection has an associated [op](https://docs.dagster.io/concepts/ops-jobs-graphs/ops#ops) which triggers a sync of that connection.

</TabItem>

<TabItem name="Loading from YAML config">

### Loading Airbyte asset definitions from YAML config <Deprecated />

<strong>
<PyObject
module="dagster_airbyte"
object="load_assets_from_airbyte_project"
/>{" "}
has been deprecated as the Octavia CLI is no longer maintained. Consider using{" "}
<PyObject
module="dagster_airbyte"
object="load_assets_from_airbyte_instance"
/>{" "}
instead.
</strong>

To load Airbyte assets into Dagster from a set of YAML configuration files, specify the Octavia project directory, which contains the `sources`, `destinations`, and `connections` subfolders. This is the directory where you first ran `octavia init`. Here, the YAML files are treated as the source of truth for building Dagster assets.

```python startafter=start_load_assets_from_airbyte_project endbefore=end_load_assets_from_airbyte_project file=/integrations/airbyte/airbyte.py dedent=4
from dagster_airbyte import load_assets_from_airbyte_project

airbyte_assets = load_assets_from_airbyte_project(
project_dir="path/to/airbyte/project",
)
```

The `load_assets_from_airbyte_project` function parses the YAML metadata, generating a set of asset definitions which reflect each of the data streams synced by your connections. Each connection has an associated [op](https://docs.dagster.io/concepts/ops-jobs-graphs/ops#ops) which triggers a sync of that connection.

#### Adding a resource

Assets loaded from Airbyte require an `AirbyteResource`, which defines how to connect and interact with your Airbyte instance.

We can add the Airbyte resource we configured above to our Airbyte assets by doing the following:

```python startafter=start_airbyte_project_config endbefore=end_airbyte_project_config file=/integrations/airbyte/airbyte.py dedent=4
from dagster_airbyte import load_assets_from_airbyte_project

from dagster import with_resources

# Use the airbyte_instance resource we defined in Step 1
airbyte_assets = with_resources(
[load_assets_from_airbyte_project(project_dir="path/to/airbyte/project")],
{"airbyte": airbyte_instance},
)
```

</TabItem>
<TabItem name="Manually building assets">

Expand Down
Binary file modified docs/next/public/objects.inv
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ Assets

.. autofunction:: load_assets_from_airbyte_instance

.. autofunction:: load_assets_from_airbyte_project

.. autofunction:: build_airbyte_assets


Ops
===

.. autoconfigurable:: airbyte_sync_op
.. autoconfigurable:: airbyte_sync_op
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ Resources (Sling)

.. autoclass:: SlingConnectionResource

----

*******************************
dlt (dagster-embedded-elt.dlt)
*******************************
Expand Down
3 changes: 3 additions & 0 deletions docs/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ deps =
-e ../python_modules/libraries/dagster-tableau
-e ../python_modules/libraries/dagster-powerbi

sling

commands =
make --directory=sphinx clean
make --directory=sphinx json SPHINXOPTS="-W --keep-going"
Expand Down Expand Up @@ -85,6 +87,7 @@ deps =
-e ../python_modules/libraries/dagster-sigma
-e ../python_modules/libraries/dagster-tableau
-e ../python_modules/libraries/dagster-powerbi
sling

commands =
make --directory=sphinx clean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,6 @@ def scope_define_cloud_instance() -> None:
# end_define_cloud_instance


def scope_load_assets_from_airbyte_project():
# start_load_assets_from_airbyte_project
from dagster_airbyte import load_assets_from_airbyte_project

airbyte_assets = load_assets_from_airbyte_project(
project_dir="path/to/airbyte/project",
)
# end_load_assets_from_airbyte_project


def scope_load_assets_from_airbyte_instance():
from dagster_airbyte import AirbyteResource
from dagster import EnvVar
Expand All @@ -57,26 +47,6 @@ def scope_load_assets_from_airbyte_instance():
# end_load_assets_from_airbyte_instance


def scope_airbyte_project_config():
from dagster_airbyte import AirbyteResource

airbyte_instance = AirbyteResource(
host="localhost",
port="8000",
)
# start_airbyte_project_config
from dagster_airbyte import load_assets_from_airbyte_project

from dagster import with_resources

# Use the airbyte_instance resource we defined in Step 1
airbyte_assets = with_resources(
[load_assets_from_airbyte_project(project_dir="path/to/airbyte/project")],
{"airbyte": airbyte_instance},
)
# end_airbyte_project_config


def scope_manually_define_airbyte_assets():
# start_manually_define_airbyte_assets
from dagster_airbyte import build_airbyte_assets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from dagster_airbyte.asset_defs import (
build_airbyte_assets as build_airbyte_assets,
load_assets_from_airbyte_instance as load_assets_from_airbyte_instance,
load_assets_from_airbyte_project as load_assets_from_airbyte_project,
)
from dagster_airbyte.ops import airbyte_sync_op as airbyte_sync_op
from dagster_airbyte.resources import (
Expand Down
127 changes: 0 additions & 127 deletions python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
SourceAsset,
_check as check,
)
from dagster._annotations import deprecated
from dagster._core.definitions import AssetsDefinition, multi_asset
from dagster._core.definitions.cacheable_assets import (
AssetsDefinitionCacheableData,
Expand Down Expand Up @@ -1033,129 +1032,3 @@ def load_assets_from_airbyte_instance(
connection_to_freshness_policy_fn=connection_to_freshness_policy_fn,
connection_to_auto_materialize_policy_fn=connection_to_auto_materialize_policy_fn,
)


@deprecated(
breaking_version="1.9",
additional_warn_text="The Airbyte Octavia CLI has been deprecated. Consider using load_assets_from_airbyte_instance instead.",
)
def load_assets_from_airbyte_project(
project_dir: str,
workspace_id: Optional[str] = None,
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
create_assets_for_normalization_tables: bool = True,
connection_to_group_fn: Optional[Callable[[str], Optional[str]]] = _clean_name,
connection_meta_to_group_fn: Optional[
Callable[[AirbyteConnectionMetadata], Optional[str]]
] = None,
io_manager_key: Optional[str] = None,
connection_to_io_manager_key_fn: Optional[Callable[[str], Optional[str]]] = None,
connection_filter: Optional[Callable[[AirbyteConnectionMetadata], bool]] = None,
connection_directories: Optional[Sequence[str]] = None,
connection_to_asset_key_fn: Optional[
Callable[[AirbyteConnectionMetadata, str], AssetKey]
] = None,
connection_to_freshness_policy_fn: Optional[
Callable[[AirbyteConnectionMetadata], Optional[FreshnessPolicy]]
] = None,
connection_to_auto_materialize_policy_fn: Optional[
Callable[[AirbyteConnectionMetadata], Optional[AutoMaterializePolicy]]
] = None,
) -> CacheableAssetsDefinition:
"""Loads an Airbyte project into a set of Dagster assets.
Point to the root folder of an Airbyte project synced using the Octavia CLI. For
more information, see https://airbyte.com/tutorials/version-control-airbyte-configurations.
Args:
project_dir (str): The path to the root of your Airbyte project, containing sources, destinations,
and connections folders.
workspace_id (Optional[str]): The ID of the Airbyte workspace to load connections from. Only
required if multiple workspace state YAMLfiles exist in the project.
key_prefix (Optional[CoercibleToAssetKeyPrefix]): A prefix for the asset keys created.
create_assets_for_normalization_tables (bool): If True, assets will be created for tables
created by Airbyte's normalization feature. If False, only the destination tables
will be created. Defaults to True.
connection_to_group_fn (Optional[Callable[[str], Optional[str]]]): Function which returns an asset
group name for a given Airbyte connection name. If None, no groups will be created. Defaults
to a basic sanitization function.
connection_meta_to_group_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[str]]]): Function
which returns an asset group name for a given Airbyte connection metadata. If None and connection_to_group_fn
is None, no groups will be created. Defaults to None.
io_manager_key (Optional[str]): The I/O manager key to use for all assets. Defaults to "io_manager".
Use this if all assets should be loaded from the same source, otherwise use connection_to_io_manager_key_fn.
connection_to_io_manager_key_fn (Optional[Callable[[str], Optional[str]]]): Function which returns an
I/O manager key for a given Airbyte connection name. When other ops are downstream of the loaded assets,
the IOManager specified determines how the inputs to those ops are loaded. Defaults to "io_manager".
connection_filter (Optional[Callable[[AirbyteConnectionMetadata], bool]]): Optional function which
takes in connection metadata and returns False if the connection should be excluded from the output assets.
connection_directories (Optional[List[str]]): Optional list of connection directories to load assets from.
If omitted, all connections in the Airbyte project are loaded. May be faster than connection_filter
if the project has many connections or if the connection yaml files are large.
connection_to_asset_key_fn (Optional[Callable[[AirbyteConnectionMetadata, str], AssetKey]]): Optional function which
takes in connection metadata and table name and returns an asset key for the table. If None, the default asset
key is based on the table name. Any asset key prefix will be applied to the output of this function.
connection_to_freshness_policy_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[FreshnessPolicy]]]):
Optional function which takes in connection metadata and returns a freshness policy for the connection's assets.
If None, no freshness policies will be applied to the assets.
connection_to_auto_materialize_policy_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[AutoMaterializePolicy]]]):
Optional function which takes in connection metadata and returns an auto materialization policy for the connection's assets.
If None, no auto materialization policies will be applied to the assets.
**Examples:**
Loading all Airbyte connections as assets:
.. code-block:: python
from dagster_airbyte import load_assets_from_airbyte_project
airbyte_assets = load_assets_from_airbyte_project(
project_dir="path/to/airbyte/project",
)
Filtering the set of loaded connections:
.. code-block:: python
from dagster_airbyte import load_assets_from_airbyte_project
airbyte_assets = load_assets_from_airbyte_project(
project_dir="path/to/airbyte/project",
connection_filter=lambda meta: "snowflake" in meta.name,
)
"""
if isinstance(key_prefix, str):
key_prefix = [key_prefix]
key_prefix = check.list_param(key_prefix or [], "key_prefix", of_type=str)

check.invariant(
not io_manager_key or not connection_to_io_manager_key_fn,
"Cannot specify both io_manager_key and connection_to_io_manager_key_fn",
)
if not connection_to_io_manager_key_fn:
connection_to_io_manager_key_fn = lambda _: io_manager_key

check.invariant(
not connection_meta_to_group_fn
or not connection_to_group_fn
or connection_to_group_fn == _clean_name,
"Cannot specify both connection_meta_to_group_fn and connection_to_group_fn",
)

if not connection_meta_to_group_fn and connection_to_group_fn:
connection_meta_to_group_fn = lambda meta: connection_to_group_fn(meta.name)

return AirbyteYAMLCacheableAssetsDefinition(
project_dir=project_dir,
workspace_id=workspace_id,
key_prefix=key_prefix,
create_assets_for_normalization_tables=create_assets_for_normalization_tables,
connection_meta_to_group_fn=connection_meta_to_group_fn,
connection_to_io_manager_key_fn=connection_to_io_manager_key_fn,
connection_filter=connection_filter,
connection_directories=connection_directories,
connection_to_asset_key_fn=connection_to_asset_key_fn,
connection_to_freshness_policy_fn=connection_to_freshness_policy_fn,
connection_to_auto_materialize_policy_fn=connection_to_auto_materialize_policy_fn,
)
Loading

2 comments on commit 5b1391e

@github-actions
Copy link

@github-actions github-actions bot commented on 5b1391e Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs-beta ready!

✅ Preview
https://dagster-docs-beta-c9zlnhufg-elementl.vercel.app

Built with commit 5b1391e.
This pull request is being automatically deployed with vercel-action

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-pbm8oazrm-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit 5b1391e.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.