Skip to content

Commit

Permalink
Databricks Support Table Constraints (#12138)
Browse files Browse the repository at this point in the history
* Databricks Support Table Constraints

* pylint fix

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
  • Loading branch information
ulixius9 and pmbrull authored Jun 26, 2023
1 parent e13a5de commit 18c8eb3
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,9 @@ class LineageTableStreams(BaseModel):
class LineageColumnStreams(BaseModel):
upstream_cols: Optional[List[DatabricksColumn]] = []
downstream_cols: Optional[List[DatabricksColumn]] = []


class ForeignConstrains(BaseModel):
child_columns: Optional[List[str]] = []
parent_columns: Optional[List[str]] = []
parent_table: str
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
Databricks Unity Catalog Source source methods.
"""
import traceback
from typing import Iterable, List, Optional, Tuple
from typing import Dict, Iterable, List, Optional, Tuple

from databricks.sdk.service.catalog import ColumnInfo
from databricks.sdk.service.catalog import TableConstraint as DBTableConstraint
from databricks.sdk.service.catalog import TableConstraintList

from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
Expand All @@ -24,7 +26,13 @@
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Column, Table, TableType
from metadata.generated.schema.entity.data.table import (
Column,
ConstraintType,
Table,
TableConstraint,
TableType,
)
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
Expand All @@ -38,11 +46,14 @@
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.table_metadata import OMetaTableConstraints
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.databricks.connection import get_connection
from metadata.ingestion.source.database.databricks.models import ForeignConstrains
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.db_utils import get_view_lineage
Expand All @@ -51,6 +62,16 @@

logger = ingestion_logger()

# pylint: disable=invalid-name,not-callable
@classmethod
def from_dict(cls, d: Dict[str, any]) -> "TableConstraintList":
return cls(
table_constraints=[DBTableConstraint.from_dict(constraint) for constraint in d]
)


TableConstraintList.from_dict = from_dict


class DatabricksUnityCatalogSource(DatabaseServiceSource):
"""
Expand All @@ -73,6 +94,7 @@ def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnecti
)
self.client = get_connection(self.service_connection)
self.connection_obj = self.client
self.table_constraints = []
self.test_connection()

@classmethod
Expand Down Expand Up @@ -233,7 +255,7 @@ def yield_table(
Prepare a table request and pass it to the sink
"""
table_name, table_type = table_name_and_type
table = self.context.table_data
table = self.client.tables.get(self.context.table_data.full_name)
schema_name = self.context.database_schema.name.__root__
db_name = self.context.database.name.__root__
table_constraints = None
Expand Down Expand Up @@ -263,13 +285,95 @@ def yield_table(
)
)

self.add_table_constraint_to_context(table.table_constraints)
self.register_record(table_request=table_request)
except Exception as exc:
error = f"Unexpected exception to yield table [{table_name}]: {exc}"
logger.debug(traceback.format_exc())
logger.warning(error)
self.status.failed(table_name, error, traceback.format_exc())

def add_table_constraint_to_context(self, constraints: TableConstraintList) -> None:
"""
Function to handle table constraint for the current table and add it to context
"""
if constraints and constraints.table_constraints:
primary_constraints = []
foreign_constraints = []
for constraint in constraints.table_constraints:
if constraint.primary_key_constraint:
primary_constraints.append(
TableConstraint(
constraintType=ConstraintType.PRIMARY_KEY,
columns=constraint.primary_key_constraint.child_columns,
)
)
if constraint.foreign_key_constraint:
foreign_constraints.append(
ForeignConstrains(
child_columns=constraint.foreign_key_constraint.child_columns,
parent_columns=constraint.foreign_key_constraint.parent_columns,
parent_table=constraint.foreign_key_constraint.parent_table,
)
)
self.table_constraints.append(
OMetaTableConstraints(
table=self.context.table,
foreign_constraints=foreign_constraints,
constraints=primary_constraints,
)
)

def _get_foreign_constraints(
self, table_constraints: OMetaTableConstraints
) -> List[TableConstraint]:
"""
Search the referred table for foreign constraints
and get referred column fqn
"""

foreign_constraints = []
for constraint in table_constraints.foreign_constraints:
referred_column_fqns = []
ref_table_fqn = constraint["parent_table"]
table_fqn_list = fqn.split(ref_table_fqn)

referred_table = fqn.search_table_from_es(
metadata=self.metadata,
table_name=table_fqn_list[2],
schema_name=table_fqn_list[1],
database_name=table_fqn_list[0],
service_name=self.context.database_service.name.__root__,
)
if referred_table:
for column in constraint["parent_columns"]:
col_fqn = get_column_fqn(table_entity=referred_table, column=column)
if col_fqn:
referred_column_fqns.append(col_fqn)
foreign_constraints.append(
TableConstraint(
constraintType=ConstraintType.FOREIGN_KEY,
columns=constraint["child_columns"],
referredColumns=referred_column_fqns,
)
)

return foreign_constraints

def yield_table_constraints(self) -> Optional[Iterable[OMetaTableConstraints]]:
"""
From topology.
process the table constraints of all tables
"""
for table_constraints in self.table_constraints:
foreign_constraints = self._get_foreign_constraints(table_constraints)
if foreign_constraints:
if table_constraints.constraints:
table_constraints.constraints.extend(foreign_constraints)
else:
table_constraints.constraints = foreign_constraints
yield table_constraints

def prepare(self):
pass

Expand Down

0 comments on commit 18c8eb3

Please sign in to comment.