From e09c72e6cfd8d873beb0e4fbec8b73ff9bbb2eec Mon Sep 17 00:00:00 2001 From: SumanMaharana Date: Wed, 18 Dec 2024 00:36:34 +0530 Subject: [PATCH] Fix dbt Table not found Issue --- .../ingestion/source/database/dbt/metadata.py | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index 5c1b2cf81e31..05a574fc95ff 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -325,11 +325,14 @@ def add_dbt_tests( None, ) - def _add_dbt_freshness_test_from_sources( + def add_dbt_sources( self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects - ): - # in dbt manifest sources node name is table/view name (not test name like with test nodes) - # so in order for the test creation to be named precisely I am amending manifest node name within it's deepcopy + ) -> None: + """ + Method to append dbt test cases based on sources file for later processing + In dbt manifest sources node name is table/view name (not test name like with test nodes) + So in order for the test creation to be named precisely I am amending manifest node name within it's deepcopy + """ manifest_node_new = deepcopy(manifest_node) manifest_node_new.name = manifest_node_new.name + "_freshness" @@ -349,16 +352,6 @@ def _add_dbt_freshness_test_from_sources( DbtCommonEnum.RESULTS.value ] = freshness_test_result - def add_dbt_sources( - self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects - ) -> None: - """ - Method to append dbt test cases based on sources file for later processing - """ - self._add_dbt_freshness_test_from_sources( - key, manifest_node, manifest_entities, dbt_objects - ) - # pylint: disable=too-many-locals, too-many-branches, too-many-statements def yield_data_models( self, dbt_objects: DbtObjects @@ -480,7 +473,7 @@ def yield_data_models( table_name=model_name, ) - table_entity: Optional[ + table_entities: Optional[ Union[Table, List[Table]] ] = get_entity_from_es_result( entity_list=self.metadata.es_search_from_fqn( @@ -488,10 +481,20 @@ def yield_data_models( fqn_search_string=table_fqn, fields="sourceHash", ), - fetch_multiple_entities=False, + fetch_multiple_entities=True, + ) + logger.debug( + f"Found table entities from {table_fqn}: {table_entities}" + ) + + table_entity = next( + iter(filter(lambda x: x is not None, table_entities)), None ) if table_entity: + logger.debug( + f"Using Table Entity for datamodel: {table_entity}" + ) data_model_link = DataModelLink( table_entity=table_entity, datamodel=DataModel( @@ -947,6 +950,7 @@ def create_dbt_test_case( self.metadata, entity_link_str ) table_fqn = get_table_fqn(entity_link_str) + logger.debug(f"Table fqn found: {table_fqn}") source_elements = table_fqn.split(fqn.FQN_SEPARATOR) test_case_fqn = fqn.build( self.metadata, @@ -982,6 +986,7 @@ def create_dbt_test_case( owners=None, ) ) + logger.debug(f"Test case Already Exists: {test_case_fqn}") except Exception as err: # pylint: disable=broad-except yield Either( left=StackTraceError( @@ -1071,6 +1076,7 @@ def add_dbt_test_result(self, dbt_test: dict): else None, test_case_name=manifest_node.name, ) + logger.debug(f"Adding test case results to {test_case_fqn} ") self.metadata.add_test_case_results( test_results=test_case_result, test_case_fqn=test_case_fqn,