diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index 4cb1efe0c478..66e53a36fadd 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -326,11 +326,14 @@ def add_dbt_tests( None, ) - def _add_dbt_freshness_test_from_sources( + def add_dbt_sources( self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects - ): - # in dbt manifest sources node name is table/view name (not test name like with test nodes) - # so in order for the test creation to be named precisely I am amending manifest node name within it's deepcopy + ) -> None: + """ + Method to append dbt test cases based on sources file for later processing + In dbt manifest sources node name is table/view name (not test name like with test nodes) + So in order for the test creation to be named precisely I am amending manifest node name within it's deepcopy + """ manifest_node_new = deepcopy(manifest_node) manifest_node_new.name = manifest_node_new.name + "_freshness" @@ -350,16 +353,6 @@ def _add_dbt_freshness_test_from_sources( DbtCommonEnum.RESULTS.value ] = freshness_test_result - def add_dbt_sources( - self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects - ) -> None: - """ - Method to append dbt test cases based on sources file for later processing - """ - self._add_dbt_freshness_test_from_sources( - key, manifest_node, manifest_entities, dbt_objects - ) - # pylint: disable=too-many-locals, too-many-branches, too-many-statements def yield_data_models( self, dbt_objects: DbtObjects @@ -481,7 +474,7 @@ def yield_data_models( table_name=model_name, ) - table_entity: Optional[ + table_entities: Optional[ Union[Table, List[Table]] ] = get_entity_from_es_result( entity_list=self.metadata.es_search_from_fqn( @@ -489,10 +482,20 @@ def yield_data_models( fqn_search_string=table_fqn, fields="sourceHash", ), - fetch_multiple_entities=False, + fetch_multiple_entities=True, + ) + logger.debug( + f"Found table entities from {table_fqn}: {table_entities}" + ) + + table_entity = next( + iter(filter(lambda x: x is not None, table_entities)), None ) if table_entity: + logger.debug( + f"Using Table Entity for datamodel: {table_entity}" + ) data_model_link = DataModelLink( table_entity=table_entity, datamodel=DataModel( @@ -948,6 +951,7 @@ def create_dbt_test_case( self.metadata, entity_link_str ) table_fqn = get_table_fqn(entity_link_str) + logger.debug(f"Table fqn found: {table_fqn}") source_elements = table_fqn.split(fqn.FQN_SEPARATOR) test_case_fqn = fqn.build( self.metadata, @@ -983,6 +987,7 @@ def create_dbt_test_case( owners=None, ) ) + logger.debug(f"Test case Already Exists: {test_case_fqn}") except Exception as err: # pylint: disable=broad-except yield Either( left=StackTraceError( @@ -1072,6 +1077,8 @@ def add_dbt_test_result(self, dbt_test: dict): else None, test_case_name=manifest_node.name, ) + + logger.debug(f"Adding test case results to {test_case_fqn} ") try: self.metadata.add_test_case_results( test_results=test_case_result,