diff --git a/python_modules/libraries/dagster-looker/dagster_looker/__init__.py b/python_modules/libraries/dagster-looker/dagster_looker/__init__.py index e78935924fa36..73845344bad1d 100644 --- a/python_modules/libraries/dagster-looker/dagster_looker/__init__.py +++ b/python_modules/libraries/dagster-looker/dagster_looker/__init__.py @@ -10,6 +10,7 @@ RequestStartPdtBuild as RequestStartPdtBuild, ) from dagster_looker.api.resource import ( + LookerFilter as LookerFilter, LookerResource as LookerResource, load_looker_asset_specs as load_looker_asset_specs, ) diff --git a/python_modules/libraries/dagster-looker/dagster_looker/api/dagster_looker_api_translator.py b/python_modules/libraries/dagster-looker/dagster_looker/api/dagster_looker_api_translator.py index d85dd9b68f3c2..f3603fa1183e4 100644 --- a/python_modules/libraries/dagster-looker/dagster_looker/api/dagster_looker_api_translator.py +++ b/python_modules/libraries/dagster-looker/dagster_looker/api/dagster_looker_api_translator.py @@ -115,6 +115,7 @@ def get_explore_asset_key(self, looker_structure: LookerStructureData) -> AssetK def get_explore_asset_spec(self, looker_structure: LookerStructureData) -> AssetSpec: lookml_explore = check.inst(looker_structure.data, (LookmlModelExplore, DashboardFilter)) + if isinstance(lookml_explore, LookmlModelExplore): explore_base_view = LookmlView( view_name=check.not_none(lookml_explore.view_name), diff --git a/python_modules/libraries/dagster-looker/dagster_looker/api/resource.py b/python_modules/libraries/dagster-looker/dagster_looker/api/resource.py index 6be23dc68c44f..44ec733b1732e 100644 --- a/python_modules/libraries/dagster-looker/dagster_looker/api/resource.py +++ b/python_modules/libraries/dagster-looker/dagster_looker/api/resource.py @@ -1,6 +1,6 @@ from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, List, Mapping, Optional, Sequence, Tuple, Type, cast +from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Tuple, Type, cast from dagster import ( AssetSpec, @@ -10,6 +10,7 @@ ) from dagster._annotations import deprecated, experimental, public from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader +from dagster._record import record from dagster._utils.cached_method import cached_method from dagster._utils.log import get_dagster_logger from looker_sdk import init40 @@ -26,7 +27,7 @@ ) if TYPE_CHECKING: - from looker_sdk.sdk.api40.models import LookmlModelExplore + from looker_sdk.sdk.api40.models import Folder, LookmlModelExplore logger = get_dagster_logger("dagster_looker") @@ -35,6 +36,22 @@ LOOKER_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-looker/reconstruction_metadata" +@record +class LookerFilter: + """Filters the set of Looker objects to fetch. + + Args: + dashboard_folders (Optional[List[List[str]]]): A list of folder paths to fetch dashboards from. + Each folder path is a list of folder names, starting from the root folder. All dashboards + contained in the specified folders will be fetched. If not provided, all dashboards will be fetched. + only_fetch_explores_used_in_dashboards (bool): If True, only explores used in the fetched dashboards + will be fetched. If False, all explores will be fetched. Defaults to False. + """ + + dashboard_folders: Optional[List[List[str]]] = None + only_fetch_explores_used_in_dashboards: bool = False + + @experimental class LookerResource(ConfigurableResource): """Represents a connection to a Looker instance and provides methods @@ -71,6 +88,7 @@ def build_defs( *, request_start_pdt_builds: Optional[Sequence[RequestStartPdtBuild]] = None, dagster_looker_translator: Optional[DagsterLookerApiTranslator] = None, + looker_filter: Optional[LookerFilter] = None, ) -> Definitions: """Returns a Definitions object which will load structures from the Looker instance and translate it into assets, using the provided translator. @@ -101,7 +119,7 @@ def build_defs( ) return Definitions( - assets=[*pdts, *load_looker_asset_specs(self, translator_cls)], + assets=[*pdts, *load_looker_asset_specs(self, translator_cls, looker_filter)], resources={resource_key: self}, ) @@ -109,6 +127,7 @@ def build_defs( def load_looker_asset_specs( looker_resource: LookerResource, dagster_looker_translator: Type[DagsterLookerApiTranslator] = DagsterLookerApiTranslator, + looker_filter: Optional[LookerFilter] = None, ) -> Sequence[AssetSpec]: """Returns a list of AssetSpecs representing the Looker structures. @@ -122,7 +141,9 @@ def load_looker_asset_specs( """ return check.is_list( LookerApiDefsLoader( - looker_resource=looker_resource, translator_cls=dagster_looker_translator + looker_resource=looker_resource, + translator_cls=dagster_looker_translator, + looker_filter=looker_filter or LookerFilter(), ) .build_defs() .assets, @@ -130,10 +151,20 @@ def load_looker_asset_specs( ) +def build_folder_path(folder_id_to_folder: Dict[str, "Folder"], folder_id: str) -> List[str]: + curr = folder_id + result = [] + while curr in folder_id_to_folder: + result = [folder_id_to_folder[curr].name] + result + curr = folder_id_to_folder[curr].parent_id + return result + + @dataclass(frozen=True) class LookerApiDefsLoader(StateBackedDefinitionsLoader[Mapping[str, Any]]): looker_resource: LookerResource translator_cls: Type[DagsterLookerApiTranslator] + looker_filter: LookerFilter @property def defs_key(self) -> str: @@ -177,26 +208,59 @@ def fetch_looker_instance_data(self) -> LookerInstanceData: """ sdk = self.looker_resource.get_sdk() + folders = sdk.all_folders() + folder_by_id = {folder.id: folder for folder in folders if folder.id is not None} + # Get dashboards dashboards = sdk.all_dashboards( fields=",".join( [ "id", "hidden", + "folder", ] ) ) + folder_filter_strings = ( + [ + "/".join(folder_filter).lower() + for folder_filter in self.looker_filter.dashboard_folders + ] + if self.looker_filter.dashboard_folders + else [] + ) + + dashboard_ids_to_fetch = [] + if len(folder_filter_strings) == 0: + dashboard_ids_to_fetch = [ + dashboard.id for dashboard in dashboards if not dashboard.hidden + ] + else: + for dashboard in dashboards: + if ( + not dashboard.hidden + and dashboard.folder is not None + and dashboard.folder.id is not None + ): + folder_string = "/".join( + build_folder_path(folder_by_id, dashboard.folder.id) + ).lower() + if any( + folder_string.startswith(folder_filter_string) + for folder_filter_string in folder_filter_strings + ): + dashboard_ids_to_fetch.append(dashboard.id) + with ThreadPoolExecutor(max_workers=None) as executor: dashboards_by_id = dict( list( executor.map( - lambda dashboard: (dashboard.id, sdk.dashboard(dashboard_id=dashboard.id)), - ( - dashboard - for dashboard in dashboards - if dashboard.id and not dashboard.hidden + lambda dashboard_id: ( + dashboard_id, + sdk.dashboard(dashboard_id=dashboard_id), ), + (dashboard_id for dashboard_id in dashboard_ids_to_fetch), ) ) ) @@ -215,6 +279,21 @@ def fetch_looker_instance_data(self) -> LookerInstanceData: if model.name } + if self.looker_filter.only_fetch_explores_used_in_dashboards: + used_explores = set() + for dashboard in dashboards_by_id.values(): + for dash_filter in dashboard.dashboard_filters or []: + used_explores.add((dash_filter.model, dash_filter.explore)) + + explores_for_model = { + model_name: [ + explore_name + for explore_name in explore_names + if (model_name, explore_name) in used_explores + ] + for model_name, explore_names in explores_for_model.items() + } + def fetch_explore(model_name, explore_name) -> Optional[Tuple[str, "LookmlModelExplore"]]: try: lookml_explore = sdk.lookml_model_explore( diff --git a/python_modules/libraries/dagster-looker/dagster_looker_tests/api/mock_looker_data.py b/python_modules/libraries/dagster-looker/dagster_looker_tests/api/mock_looker_data.py index fd3778ea38afb..1e6effeb46c3e 100644 --- a/python_modules/libraries/dagster-looker/dagster_looker_tests/api/mock_looker_data.py +++ b/python_modules/libraries/dagster-looker/dagster_looker_tests/api/mock_looker_data.py @@ -4,6 +4,7 @@ Dashboard, DashboardBase, DashboardFilter, + FolderBase, LookmlModel, LookmlModelExplore, LookmlModelNavExplore, @@ -14,6 +15,7 @@ LookmlModel( explores=[ LookmlModelNavExplore(name="my_explore"), + LookmlModelNavExplore(name="my_other_explore"), ], name="my_model", ) @@ -22,10 +24,24 @@ mock_lookml_explore = LookmlModelExplore( id="my_model::my_explore", view_name="my_view", sql_table_name="my_table" ) +mock_lookml_other_explore = LookmlModelExplore( + id="my_model::my_other_explore", view_name="my_view", sql_table_name="my_table" +) + +mock_folders = [ + FolderBase(parent_id=None, name="my_folder", id="1"), + FolderBase(parent_id="1", name="my_subfolder", id="2"), + FolderBase(parent_id="1", name="my_other_subfolder", id="3"), +] mock_looker_dashboard_bases = [ - DashboardBase(id="1", hidden=False), - DashboardBase(id="2", hidden=True), + DashboardBase( + id="1", hidden=False, folder=FolderBase(name="my_subfolder", id="2", parent_id="1") + ), + DashboardBase( + id="2", hidden=False, folder=FolderBase(name="my_other_subfolder", id="3", parent_id="1") + ), + DashboardBase(id="3", hidden=True), ] mock_looker_dashboard = Dashboard( @@ -36,6 +52,14 @@ ], ) +mock_other_looker_dashboard = Dashboard( + title="my_dashboard_2", + id="2", + dashboard_filters=[ + DashboardFilter(model="my_model", explore="my_other_explore"), + ], +) + mock_start_pdt_build = MaterializePDT( materialization_id="100", ) diff --git a/python_modules/libraries/dagster-looker/dagster_looker_tests/api/test_build_defs.py b/python_modules/libraries/dagster-looker/dagster_looker_tests/api/test_build_defs.py index 54d65a27b9faa..b2b68de7ef441 100644 --- a/python_modules/libraries/dagster-looker/dagster_looker_tests/api/test_build_defs.py +++ b/python_modules/libraries/dagster-looker/dagster_looker_tests/api/test_build_defs.py @@ -3,6 +3,7 @@ import pytest import responses from dagster import AssetKey, AssetSpec, Definitions, materialize +from dagster_looker import LookerFilter from dagster_looker.api.assets import build_looker_pdt_assets_definitions from dagster_looker.api.dagster_looker_api_translator import ( DagsterLookerApiTranslator, @@ -13,10 +14,13 @@ from dagster_looker_tests.api.mock_looker_data import ( mock_check_pdt_build, + mock_folders, mock_looker_dashboard, mock_looker_dashboard_bases, mock_lookml_explore, mock_lookml_models, + mock_lookml_other_explore, + mock_other_looker_dashboard, mock_start_pdt_build, ) @@ -53,6 +57,18 @@ def looker_instance_data_mocks_fixture( url=f"{TEST_BASE_URL}/api/4.0/lookml_models/my_model/explores/my_explore", body=sdk.serialize(api_model=mock_lookml_explore), # type: ignore ) + responses.add( + method=responses.GET, + url=f"{TEST_BASE_URL}/api/4.0/lookml_models/my_model/explores/my_other_explore", + body=sdk.serialize(api_model=mock_lookml_other_explore), # type: ignore + ) + + # Mock the request for all looker dashboards + responses.add( + method=responses.GET, + url=f"{TEST_BASE_URL}/api/4.0/folders", + body=sdk.serialize(api_model=mock_folders), # type: ignore + ) # Mock the request for all looker dashboards responses.add( @@ -68,16 +84,42 @@ def looker_instance_data_mocks_fixture( body=sdk.serialize(api_model=mock_looker_dashboard), # type: ignore ) + responses.add( + method=responses.GET, + url=f"{TEST_BASE_URL}/api/4.0/dashboards/2", + body=sdk.serialize(api_model=mock_other_looker_dashboard), # type: ignore + ) + yield response +@responses.activate +def test_load_asset_specs_filter( + looker_resource: LookerResource, looker_instance_data_mocks: responses.RequestsMock +) -> None: + asset_specs_by_key = { + spec.key: spec + for spec in load_looker_asset_specs( + looker_resource, + looker_filter=LookerFilter( + dashboard_folders=[["my_folder", "my_subfolder"]], + only_fetch_explores_used_in_dashboards=True, + ), + ) + } + + assert len(asset_specs_by_key) == 2 + assert AssetKey(["my_dashboard_2"]) not in asset_specs_by_key + assert AssetKey(["my_model::my_other_explore"]) not in asset_specs_by_key + + @responses.activate def test_load_asset_specs( looker_resource: LookerResource, looker_instance_data_mocks: responses.RequestsMock ) -> None: asset_specs_by_key = {spec.key: spec for spec in load_looker_asset_specs(looker_resource)} - assert len(asset_specs_by_key) == 2 + assert len(asset_specs_by_key) == 4 expected_lookml_view_asset_dep_key = AssetKey(["view", "my_view"]) expected_lookml_explore_asset_key = AssetKey(["my_model::my_explore"]) @@ -112,7 +154,7 @@ def test_build_defs_with_pdts( resources={resource_key: looker_resource}, ) - assert len(defs.get_all_asset_specs()) == 3 + assert len(defs.get_all_asset_specs()) == 5 sdk = looker_resource.get_sdk()