Skip to content

Commit

Permalink
unity-catalog get table owners
Browse files Browse the repository at this point in the history
  • Loading branch information
harshsoni2024 committed Aug 6, 2024
1 parent df3312e commit 2c3606b
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import json
import traceback

from requests import HTTPError

from metadata.ingestion.source.database.databricks.client import (
API_TIMEOUT,
DatabricksClient,
Expand All @@ -27,6 +29,7 @@
logger = ingestion_logger()
TABLE_LINEAGE_PATH = "/lineage-tracking/table-lineage/get"
COLUMN_LINEAGE_PATH = "/lineage-tracking/column-lineage/get"
TABLES_PATH = "/unity-catalog/tables"


class UnityCatalogClient(DatabricksClient):
Expand Down Expand Up @@ -85,3 +88,21 @@ def get_column_lineage(
logger.error(exc)

return LineageColumnStreams()

def get_owner_info(self, full_table_name: str) -> str:
"""
get owner info from tables API
"""
try:
response = self.client.get(
f"{self.base_url}{TABLES_PATH}/{full_table_name}",
headers=self.headers,
timeout=API_TIMEOUT,
)
if response.status_code != 200:
raise HTTPError(response.text)
return response.json().get("owner")
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(exc)
return
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
Source as WorkflowSource,
)
from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
Expand All @@ -61,6 +62,7 @@
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.stored_procedures_mixin import QueryByProcedure
from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogClient
from metadata.ingestion.source.database.unitycatalog.connection import get_connection
from metadata.ingestion.source.database.unitycatalog.models import (
ColumnJson,
Expand Down Expand Up @@ -99,6 +101,7 @@ def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
)
self.external_location_map = {}
self.client = get_connection(self.service_connection)
self.api_client = UnityCatalogClient(self.service_connection)
self.connection_obj = self.client
self.table_constraints = []
self.context.storage_location = None
Expand Down Expand Up @@ -330,6 +333,7 @@ def yield_table(
schema_name=schema_name,
)
),
owners=self.get_owner_ref(table_name),
)
yield Either(right=table_request)

Expand Down Expand Up @@ -537,3 +541,19 @@ def yield_procedure_lineage_and_queries(

def close(self):
"""Nothing to close"""

def get_owner_ref(self, table_name: str) -> Optional[EntityReferenceList]:
"""
Method to process the table owners
"""
try:
full_table_name = f"{self.context.get().database}.{self.context.get().database_schema}.{table_name}"
owner = self.api_client.get_owner_info(full_table_name)
if not owner:
return
owner_ref = self.metadata.get_reference_by_email(email=owner)
return owner_ref
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error processing owner for table {table_name}: {exc}")
return

0 comments on commit 2c3606b

Please sign in to comment.