Skip to content

Commit

Permalink
#14115: Separate Unity Catalog From Databricks (#14138)
Browse files Browse the repository at this point in the history
  • Loading branch information
ulixius9 authored Dec 4, 2023
1 parent 59d0e82 commit 389ae79
Show file tree
Hide file tree
Showing 49 changed files with 1,517 additions and 598 deletions.
85 changes: 85 additions & 0 deletions bootstrap/sql/migrations/native/1.2.3/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
-- update service type to UnityCatalog - update database entity
UPDATE database_entity de
SET de.json = JSON_INSERT(
JSON_REMOVE(de.json, '$.serviceType'),
'$.serviceType',
'UnityCatalog'
)
where id in (
select toId from entity_relationship er
where
fromEntity = 'databaseService'
and toEntity = 'database'
and fromId in (
select id from dbservice_entity dbe
where
serviceType = 'Databricks'
and JSON_EXTRACT(
dbe.json, '$.connection.config.useUnityCatalog'
) = true
));


-- update service type to UnityCatalog - update database schema entity
UPDATE database_schema_entity dse
SET dse.json = JSON_INSERT(
JSON_REMOVE(dse.json, '$.serviceType'),
'$.serviceType',
'UnityCatalog'
)
where JSON_EXTRACT(dse.json, '$.database.id') in (
select toId from entity_relationship er
where
fromEntity = 'databaseService'
and toEntity = 'database'
and fromId in (
select id from dbservice_entity dbe
where
serviceType = 'Databricks'
and JSON_EXTRACT(
dbe.json, '$.connection.config.useUnityCatalog'
) = true
));


-- update service type to UnityCatalog - update table entity
UPDATE table_entity te
SET te.json = JSON_INSERT(
JSON_REMOVE(te.json, '$.serviceType'),
'$.serviceType',
'UnityCatalog'
)
where JSON_EXTRACT(te.json, '$.database.id') in (
select toId from entity_relationship er
where
fromEntity = 'databaseService'
and toEntity = 'database'
and fromId in (
select id from dbservice_entity dbe
where
serviceType = 'Databricks'
and JSON_EXTRACT(
dbe.json, '$.connection.config.useUnityCatalog'
) = true
));


-- update service type to UnityCatalog - update db service entity
UPDATE dbservice_entity de
SET de.json = JSON_INSERT(
JSON_REMOVE(de.json, '$.connection.config.type'),
'$.connection.config.type',
'UnityCatalog'
),de.json = JSON_INSERT(
JSON_REMOVE(de.json, '$.serviceType'),
'$.serviceType',
'UnityCatalog'
)
WHERE de.serviceType = 'Databricks'
AND JSON_EXTRACT(de.json, '$.connection.config.useUnityCatalog') = True
;

-- remove `useUnityCatalog` flag from service connection details of databricks
UPDATE dbservice_entity de
SET de.json = JSON_REMOVE(de.json, '$.connection.config.useUnityCatalog')
WHERE de.serviceType IN ('Databricks','UnityCatalog');
84 changes: 84 additions & 0 deletions bootstrap/sql/migrations/native/1.2.3/postgres/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,87 @@ SET json = jsonb_set(
WHERE json #>> '{pipelineType}' = 'metadata'
AND json #>> '{sourceConfig,config,type}' = 'DatabaseMetadata'
AND json #>> '{sourceConfig,config,viewParsingTimeoutLimit}' is not null;



-- update service type to UnityCatalog - update database entity
UPDATE database_entity de
SET json = jsonb_set(
json #- '{serviceType}',
'{serviceType}',
'"UnityCatalog"',
true
)
where id in (
select toId from entity_relationship er
where
fromEntity = 'databaseService'
and toEntity = 'database'
and fromId in (
select id from dbservice_entity dbe
where
serviceType = 'Databricks'
and (dbe.json #>> '{connection,config,useUnityCatalog}')::bool = true
));


-- update service type to UnityCatalog - update database schema entity
UPDATE database_schema_entity dse
SET json = jsonb_set(
json #- '{serviceType}',
'{serviceType}',
'"UnityCatalog"',
true
)
where json #>> '{database,id}' in (
select toId from entity_relationship er
where
fromEntity = 'databaseService'
and toEntity = 'database'
and fromId in (
select id from dbservice_entity dbe
where
serviceType = 'Databricks'
and (dbe.json #>> '{connection,config,useUnityCatalog}')::bool = true
));

-- update service type to UnityCatalog - update table entity
UPDATE table_entity te
SET json = jsonb_set(
json #- '{serviceType}',
'{serviceType}',
'"UnityCatalog"',
true
)
where json #>> '{database,id}' in (
select toId from entity_relationship er
where
fromEntity = 'databaseService'
and toEntity = 'database'
and fromId in (
select id from dbservice_entity dbe
where
serviceType = 'Databricks'
and (dbe.json #>> '{connection,config,useUnityCatalog}')::bool = true
));


-- update service type to UnityCatalog - update db service entity
UPDATE dbservice_entity de
SET json = jsonb_set(
jsonb_set(
de.json #- '{serviceType}',
'{serviceType}',
'"UnityCatalog"'
) #- '{connection,config,type}',
'{connection,config,type}',
'"UnityCatalog"'
)
WHERE de.serviceType = 'Databricks'
AND (de.json #>> '{connection,config,useUnityCatalog}')::bool = True
;

-- remove `useUnityCatalog` flag from service connection details of databricks
UPDATE dbservice_entity de
SET json = json #- '{connection,config,useUnityCatalog}'
WHERE de.serviceType IN ('Databricks','UnityCatalog');
1 change: 0 additions & 1 deletion ingestion/src/metadata/examples/workflows/databricks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ source:
token: <databricks token>
hostPort: localhost:443
connectionTimeout: 120
useUnityCatalog: true
connectionArguments:
http_path: <http path of databricks cluster>
sourceConfig:
Expand Down
27 changes: 27 additions & 0 deletions ingestion/src/metadata/examples/workflows/unity_catalog.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
source:
type: unitycatalog
serviceName: local_unitycatalog
serviceConnection:
config:
type: UnityCatalog
catalog: hive_metastore
databaseSchema: default
token: <databricks token>
hostPort: localhost:443
connectionTimeout: 120
connectionArguments:
http_path: <http path of databricks cluster>

sourceConfig:
config:
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}
workflowConfig:
loggerLevel: DEBUG
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
source:
type: unitycatalog-lineage
serviceName: local_unitycatalog
sourceConfig:
config:
type: DatabaseLineage
queryLogDuration: 1
resultLimit: 10000
sink:
type: metadata-rest
config: {}
workflowConfig:
loggerLevel: DEBUG
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
35 changes: 35 additions & 0 deletions ingestion/src/metadata/examples/workflows/unity_catalog_usage.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
source:
type: unitycatalog-usage
serviceName: local_unitycatalog
serviceConnection:
config:
type: UnityCatalog
catalog: hive_metastore
databaseSchema: default
token: <databricks token>
hostPort: localhost:443
connectionTimeout: 120
connectionArguments:
http_path: <http path of databricks cluster>
sourceConfig:
config:
type: DatabaseUsage
queryLogDuration: 10
processor:
type: query-parser
config: {}
stage:
type: table-usage
config:
filename: /tmp/databricks_usage
bulkSink:
type: metadata-usage
config:
filename: /tmp/databricks_usage
workflowConfig:
loggerLevel: DEBUG
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,13 @@
DatabricksConnection,
)
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.source.database.databricks.models import (
LineageColumnStreams,
LineageTableStreams,
)
from metadata.utils.constants import QUERY_WITH_DBT, QUERY_WITH_OM_VERSION
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()
API_TIMEOUT = 10
QUERIES_PATH = "/sql/history/queries"
TABLE_LINEAGE_PATH = "/lineage-tracking/table-lineage/get"
COLUMN_LINEAGE_PATH = "/lineage-tracking/column-lineage/get"


class DatabricksClient:
Expand Down Expand Up @@ -216,55 +210,3 @@ def get_job_runs(self, job_id) -> List[dict]:
logger.error(exc)

return job_runs

def get_table_lineage(self, table_name: str) -> LineageTableStreams:
"""
Method returns table lineage details
"""
try:
data = {
"table_name": table_name,
}

response = self.client.get(
f"{self.base_url}{TABLE_LINEAGE_PATH}",
headers=self.headers,
data=json.dumps(data),
timeout=API_TIMEOUT,
).json()
if response:
return LineageTableStreams(**response)

except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(exc)

return LineageTableStreams()

def get_column_lineage(
self, table_name: str, column_name: str
) -> LineageColumnStreams:
"""
Method returns table lineage details
"""
try:
data = {
"table_name": table_name,
"column_name": column_name,
}

response = self.client.get(
f"{self.base_url}{COLUMN_LINEAGE_PATH}",
headers=self.headers,
data=json.dumps(data),
timeout=API_TIMEOUT,
).json()

if response:
return LineageColumnStreams(**response)

except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(exc)

return LineageColumnStreams()
Loading

0 comments on commit 389ae79

Please sign in to comment.