Skip to content

Commit

Permalink
[dagster-looker] Add ability to filter incoming dashboards, models (#…
Browse files Browse the repository at this point in the history
…25522)

## Summary

Adds the ability to filter incoming dashboards and models to the Looker
API resource loading method.

```python
load_looker_asset_specs(
    looker_resource,
    looker_filter=LookerFilter(
        dashboard_folders=[["my_folder", "my_subfolder"]],
        only_fetch_explores_used_in_dashboards=True,
    ),
)
```

The optional `LookerFilter` object can include config to only fetch
dashboards from specific folder paths, and also whether to optionally
only fetch explores that dashboards reference.

## Test Plan

New unit test.
  • Loading branch information
benpankow authored Oct 24, 2024
1 parent 5b1391e commit a78210c
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
RequestStartPdtBuild as RequestStartPdtBuild,
)
from dagster_looker.api.resource import (
LookerFilter as LookerFilter,
LookerResource as LookerResource,
load_looker_asset_specs as load_looker_asset_specs,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def get_explore_asset_key(self, looker_structure: LookerStructureData) -> AssetK

def get_explore_asset_spec(self, looker_structure: LookerStructureData) -> AssetSpec:
lookml_explore = check.inst(looker_structure.data, (LookmlModelExplore, DashboardFilter))

if isinstance(lookml_explore, LookmlModelExplore):
explore_base_view = LookmlView(
view_name=check.not_none(lookml_explore.view_name),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, List, Mapping, Optional, Sequence, Tuple, Type, cast
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Tuple, Type, cast

from dagster import (
AssetSpec,
Expand All @@ -10,6 +10,7 @@
)
from dagster._annotations import deprecated, experimental, public
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._record import record
from dagster._utils.cached_method import cached_method
from dagster._utils.log import get_dagster_logger
from looker_sdk import init40
Expand All @@ -26,7 +27,7 @@
)

if TYPE_CHECKING:
from looker_sdk.sdk.api40.models import LookmlModelExplore
from looker_sdk.sdk.api40.models import Folder, LookmlModelExplore


logger = get_dagster_logger("dagster_looker")
Expand All @@ -35,6 +36,22 @@
LOOKER_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-looker/reconstruction_metadata"


@record
class LookerFilter:
"""Filters the set of Looker objects to fetch.
Args:
dashboard_folders (Optional[List[List[str]]]): A list of folder paths to fetch dashboards from.
Each folder path is a list of folder names, starting from the root folder. All dashboards
contained in the specified folders will be fetched. If not provided, all dashboards will be fetched.
only_fetch_explores_used_in_dashboards (bool): If True, only explores used in the fetched dashboards
will be fetched. If False, all explores will be fetched. Defaults to False.
"""

dashboard_folders: Optional[List[List[str]]] = None
only_fetch_explores_used_in_dashboards: bool = False


@experimental
class LookerResource(ConfigurableResource):
"""Represents a connection to a Looker instance and provides methods
Expand Down Expand Up @@ -71,6 +88,7 @@ def build_defs(
*,
request_start_pdt_builds: Optional[Sequence[RequestStartPdtBuild]] = None,
dagster_looker_translator: Optional[DagsterLookerApiTranslator] = None,
looker_filter: Optional[LookerFilter] = None,
) -> Definitions:
"""Returns a Definitions object which will load structures from the Looker instance
and translate it into assets, using the provided translator.
Expand Down Expand Up @@ -101,14 +119,15 @@ def build_defs(
)

return Definitions(
assets=[*pdts, *load_looker_asset_specs(self, translator_cls)],
assets=[*pdts, *load_looker_asset_specs(self, translator_cls, looker_filter)],
resources={resource_key: self},
)


def load_looker_asset_specs(
looker_resource: LookerResource,
dagster_looker_translator: Type[DagsterLookerApiTranslator] = DagsterLookerApiTranslator,
looker_filter: Optional[LookerFilter] = None,
) -> Sequence[AssetSpec]:
"""Returns a list of AssetSpecs representing the Looker structures.
Expand All @@ -122,18 +141,30 @@ def load_looker_asset_specs(
"""
return check.is_list(
LookerApiDefsLoader(
looker_resource=looker_resource, translator_cls=dagster_looker_translator
looker_resource=looker_resource,
translator_cls=dagster_looker_translator,
looker_filter=looker_filter or LookerFilter(),
)
.build_defs()
.assets,
AssetSpec,
)


def build_folder_path(folder_id_to_folder: Dict[str, "Folder"], folder_id: str) -> List[str]:
curr = folder_id
result = []
while curr in folder_id_to_folder:
result = [folder_id_to_folder[curr].name] + result
curr = folder_id_to_folder[curr].parent_id
return result


@dataclass(frozen=True)
class LookerApiDefsLoader(StateBackedDefinitionsLoader[Mapping[str, Any]]):
looker_resource: LookerResource
translator_cls: Type[DagsterLookerApiTranslator]
looker_filter: LookerFilter

@property
def defs_key(self) -> str:
Expand Down Expand Up @@ -177,26 +208,59 @@ def fetch_looker_instance_data(self) -> LookerInstanceData:
"""
sdk = self.looker_resource.get_sdk()

folders = sdk.all_folders()
folder_by_id = {folder.id: folder for folder in folders if folder.id is not None}

# Get dashboards
dashboards = sdk.all_dashboards(
fields=",".join(
[
"id",
"hidden",
"folder",
]
)
)

folder_filter_strings = (
[
"/".join(folder_filter).lower()
for folder_filter in self.looker_filter.dashboard_folders
]
if self.looker_filter.dashboard_folders
else []
)

dashboard_ids_to_fetch = []
if len(folder_filter_strings) == 0:
dashboard_ids_to_fetch = [
dashboard.id for dashboard in dashboards if not dashboard.hidden
]
else:
for dashboard in dashboards:
if (
not dashboard.hidden
and dashboard.folder is not None
and dashboard.folder.id is not None
):
folder_string = "/".join(
build_folder_path(folder_by_id, dashboard.folder.id)
).lower()
if any(
folder_string.startswith(folder_filter_string)
for folder_filter_string in folder_filter_strings
):
dashboard_ids_to_fetch.append(dashboard.id)

with ThreadPoolExecutor(max_workers=None) as executor:
dashboards_by_id = dict(
list(
executor.map(
lambda dashboard: (dashboard.id, sdk.dashboard(dashboard_id=dashboard.id)),
(
dashboard
for dashboard in dashboards
if dashboard.id and not dashboard.hidden
lambda dashboard_id: (
dashboard_id,
sdk.dashboard(dashboard_id=dashboard_id),
),
(dashboard_id for dashboard_id in dashboard_ids_to_fetch),
)
)
)
Expand All @@ -215,6 +279,21 @@ def fetch_looker_instance_data(self) -> LookerInstanceData:
if model.name
}

if self.looker_filter.only_fetch_explores_used_in_dashboards:
used_explores = set()
for dashboard in dashboards_by_id.values():
for dash_filter in dashboard.dashboard_filters or []:
used_explores.add((dash_filter.model, dash_filter.explore))

explores_for_model = {
model_name: [
explore_name
for explore_name in explore_names
if (model_name, explore_name) in used_explores
]
for model_name, explore_names in explores_for_model.items()
}

def fetch_explore(model_name, explore_name) -> Optional[Tuple[str, "LookmlModelExplore"]]:
try:
lookml_explore = sdk.lookml_model_explore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Dashboard,
DashboardBase,
DashboardFilter,
FolderBase,
LookmlModel,
LookmlModelExplore,
LookmlModelNavExplore,
Expand All @@ -14,6 +15,7 @@
LookmlModel(
explores=[
LookmlModelNavExplore(name="my_explore"),
LookmlModelNavExplore(name="my_other_explore"),
],
name="my_model",
)
Expand All @@ -22,10 +24,24 @@
mock_lookml_explore = LookmlModelExplore(
id="my_model::my_explore", view_name="my_view", sql_table_name="my_table"
)
mock_lookml_other_explore = LookmlModelExplore(
id="my_model::my_other_explore", view_name="my_view", sql_table_name="my_table"
)

mock_folders = [
FolderBase(parent_id=None, name="my_folder", id="1"),
FolderBase(parent_id="1", name="my_subfolder", id="2"),
FolderBase(parent_id="1", name="my_other_subfolder", id="3"),
]

mock_looker_dashboard_bases = [
DashboardBase(id="1", hidden=False),
DashboardBase(id="2", hidden=True),
DashboardBase(
id="1", hidden=False, folder=FolderBase(name="my_subfolder", id="2", parent_id="1")
),
DashboardBase(
id="2", hidden=False, folder=FolderBase(name="my_other_subfolder", id="3", parent_id="1")
),
DashboardBase(id="3", hidden=True),
]

mock_looker_dashboard = Dashboard(
Expand All @@ -36,6 +52,14 @@
],
)

mock_other_looker_dashboard = Dashboard(
title="my_dashboard_2",
id="2",
dashboard_filters=[
DashboardFilter(model="my_model", explore="my_other_explore"),
],
)

mock_start_pdt_build = MaterializePDT(
materialization_id="100",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest
import responses
from dagster import AssetKey, AssetSpec, Definitions, materialize
from dagster_looker import LookerFilter
from dagster_looker.api.assets import build_looker_pdt_assets_definitions
from dagster_looker.api.dagster_looker_api_translator import (
DagsterLookerApiTranslator,
Expand All @@ -13,10 +14,13 @@

from dagster_looker_tests.api.mock_looker_data import (
mock_check_pdt_build,
mock_folders,
mock_looker_dashboard,
mock_looker_dashboard_bases,
mock_lookml_explore,
mock_lookml_models,
mock_lookml_other_explore,
mock_other_looker_dashboard,
mock_start_pdt_build,
)

Expand Down Expand Up @@ -53,6 +57,18 @@ def looker_instance_data_mocks_fixture(
url=f"{TEST_BASE_URL}/api/4.0/lookml_models/my_model/explores/my_explore",
body=sdk.serialize(api_model=mock_lookml_explore), # type: ignore
)
responses.add(
method=responses.GET,
url=f"{TEST_BASE_URL}/api/4.0/lookml_models/my_model/explores/my_other_explore",
body=sdk.serialize(api_model=mock_lookml_other_explore), # type: ignore
)

# Mock the request for all looker dashboards
responses.add(
method=responses.GET,
url=f"{TEST_BASE_URL}/api/4.0/folders",
body=sdk.serialize(api_model=mock_folders), # type: ignore
)

# Mock the request for all looker dashboards
responses.add(
Expand All @@ -68,16 +84,42 @@ def looker_instance_data_mocks_fixture(
body=sdk.serialize(api_model=mock_looker_dashboard), # type: ignore
)

responses.add(
method=responses.GET,
url=f"{TEST_BASE_URL}/api/4.0/dashboards/2",
body=sdk.serialize(api_model=mock_other_looker_dashboard), # type: ignore
)

yield response


@responses.activate
def test_load_asset_specs_filter(
looker_resource: LookerResource, looker_instance_data_mocks: responses.RequestsMock
) -> None:
asset_specs_by_key = {
spec.key: spec
for spec in load_looker_asset_specs(
looker_resource,
looker_filter=LookerFilter(
dashboard_folders=[["my_folder", "my_subfolder"]],
only_fetch_explores_used_in_dashboards=True,
),
)
}

assert len(asset_specs_by_key) == 2
assert AssetKey(["my_dashboard_2"]) not in asset_specs_by_key
assert AssetKey(["my_model::my_other_explore"]) not in asset_specs_by_key


@responses.activate
def test_load_asset_specs(
looker_resource: LookerResource, looker_instance_data_mocks: responses.RequestsMock
) -> None:
asset_specs_by_key = {spec.key: spec for spec in load_looker_asset_specs(looker_resource)}

assert len(asset_specs_by_key) == 2
assert len(asset_specs_by_key) == 4

expected_lookml_view_asset_dep_key = AssetKey(["view", "my_view"])
expected_lookml_explore_asset_key = AssetKey(["my_model::my_explore"])
Expand Down Expand Up @@ -112,7 +154,7 @@ def test_build_defs_with_pdts(
resources={resource_key: looker_resource},
)

assert len(defs.get_all_asset_specs()) == 3
assert len(defs.get_all_asset_specs()) == 5

sdk = looker_resource.get_sdk()

Expand Down

0 comments on commit a78210c

Please sign in to comment.