diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/client.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/client.py index 2b23069a0835..39db6a5d43e7 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/client.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/client.py @@ -14,6 +14,8 @@ import json import traceback +from requests import HTTPError + from metadata.ingestion.source.database.databricks.client import ( API_TIMEOUT, DatabricksClient, @@ -27,6 +29,7 @@ logger = ingestion_logger() TABLE_LINEAGE_PATH = "/lineage-tracking/table-lineage/get" COLUMN_LINEAGE_PATH = "/lineage-tracking/column-lineage/get" +TABLES_PATH = "/unity-catalog/tables" class UnityCatalogClient(DatabricksClient): @@ -85,3 +88,21 @@ def get_column_lineage( logger.error(exc) return LineageColumnStreams() + + def get_owner_info(self, full_table_name: str) -> str: + """ + get owner info from tables API + """ + try: + response = self.client.get( + f"{self.base_url}{TABLES_PATH}/{full_table_name}", + headers=self.headers, + timeout=API_TIMEOUT, + ) + if response.status_code != 200: + raise HTTPError(response.text) + return response.json().get("owner") + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error(exc) + return diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py index 8d3992552338..01046c09f992 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py @@ -50,6 +50,7 @@ Source as WorkflowSource, ) from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName +from metadata.generated.schema.type.entityReferenceList import EntityReferenceList from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification @@ -61,6 +62,7 @@ ) from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.ingestion.source.database.stored_procedures_mixin import QueryByProcedure +from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient from metadata.ingestion.source.database.unitycatalog.connection import get_connection from metadata.ingestion.source.database.unitycatalog.models import ( ColumnJson, @@ -99,6 +101,7 @@ def __init__(self, config: WorkflowSource, metadata: OpenMetadata): ) self.external_location_map = {} self.client = get_connection(self.service_connection) + self.api_client = UnityCatalogClient(self.service_connection) self.connection_obj = self.client self.table_constraints = [] self.context.storage_location = None @@ -330,6 +333,7 @@ def yield_table( schema_name=schema_name, ) ), + owners=self.get_owner_ref(table_name), ) yield Either(right=table_request) @@ -537,3 +541,19 @@ def yield_procedure_lineage_and_queries( def close(self): """Nothing to close""" + + def get_owner_ref(self, table_name: str) -> Optional[EntityReferenceList]: + """ + Method to process the table owners + """ + try: + full_table_name = f"{self.context.get().database}.{self.context.get().database_schema}.{table_name}" + owner = self.api_client.get_owner_info(full_table_name) + if not owner: + return + owner_ref = self.metadata.get_reference_by_email(email=owner) + return owner_ref + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Error processing owner for table {table_name}: {exc}") + return