Commit e93cf23

Fixes: Parse postgres json column fields (#17645)
SumanMaharana authored Sep 3, 2024
1 parent 46e98e1 commit e93cf23
Showing 6 changed files with 200 additions and 1 deletion.
ingestion/src/metadata/ingestion/source/database/postgres/metadata.py
@@ -59,6 +59,7 @@
    get_columns,
    get_etable_owner,
    get_foreign_keys,
    get_json_fields_and_type,
    get_table_comment,
    get_table_owner,
    get_view_definition,
@@ -138,6 +139,7 @@
Inspector.get_all_table_ddls = get_all_table_ddls
Inspector.get_table_ddl = get_table_ddl
Inspector.get_table_owner = get_etable_owner
Inspector.get_json_fields_and_type = get_json_fields_and_type

PGDialect.get_foreign_keys = get_foreign_keys
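
For context, a minimal usage sketch (not part of this commit): once the Inspector class is patched as above, any inspector created for a Postgres engine exposes the new helper. The connection string, table, and column names below are hypothetical.

# Sketch only; assumes a reachable Postgres instance.
from sqlalchemy import create_engine, inspect

engine = create_engine("postgresql+psycopg2://user:pass@localhost:5432/demo")
inspector = inspect(engine)

# Available thanks to the monkey-patch above; returns the column's
# parsed children, or None if parsing fails.
children = inspector.get_json_fields_and_type("reviews", "review_details")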

ingestion/src/metadata/ingestion/source/database/postgres/queries.py
@@ -211,3 +211,88 @@
n.oid = c.relnamespace
ORDER BY 1
"""

POSTGRES_GET_JSON_FIELDS = """
    WITH RECURSIVE json_hierarchy AS (
        SELECT
            key AS path,
            json_typeof(value) AS type,
            value,
            json_build_object() AS properties,
            key AS title
        FROM
            {table_name} tbd,
            LATERAL json_each({column_name}::json)
    ),
    build_hierarchy AS (
        SELECT
            path,
            type,
            title,
            CASE
                WHEN type = 'object' THEN
                    json_build_object(
                        'title', title,
                        'type', 'object',
                        'properties', (
                            SELECT json_object_agg(
                                key,
                                json_build_object(
                                    'title', key,
                                    'type', json_typeof(value),
                                    'properties', (
                                        CASE
                                            WHEN json_typeof(value) = 'object' THEN (
                                                SELECT json_object_agg(
                                                    key,
                                                    json_build_object(
                                                        'title', key,
                                                        'type', json_typeof(value),
                                                        'properties', json_build_object()
                                                    )
                                                )
                                                FROM json_each(value::json) AS sub_key_value
                                            )
                                            ELSE json_build_object()
                                        END
                                    )
                                )
                            )
                            FROM json_each(value::json) AS key_value
                        )
                    )
                WHEN type = 'array' THEN
                    json_build_object(
                        'title', title,
                        'type', 'array',
                        'properties', json_build_object()
                    )
                ELSE
                    json_build_object(
                        'title', title,
                        'type', type
                    )
            END AS hierarchy
        FROM
            json_hierarchy
    ),
    aggregate_hierarchy AS (
        SELECT
            json_build_object(
                'title', '{column_name}',
                'type', 'object',
                'properties', json_object_agg(
                    path,
                    hierarchy
                )
            ) AS result
        FROM
            build_hierarchy
    )
    SELECT
        result
    FROM
        aggregate_hierarchy;
"""
26 changes: 26 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/postgres/utils.py
@@ -13,6 +13,7 @@
"""
Postgres SQLAlchemy util methods
"""
import json
import re
import traceback
from typing import Dict, Optional, Tuple
@@ -23,15 +24,18 @@
from sqlalchemy.engine import reflection
from sqlalchemy.sql import sqltypes

from metadata.generated.schema.entity.data.table import Column
from metadata.ingestion.source.database.postgres.queries import (
    POSTGRES_COL_IDENTITY,
    POSTGRES_FETCH_FK,
    POSTGRES_GET_JSON_FIELDS,
    POSTGRES_GET_SERVER_VERSION,
    POSTGRES_SQL_COLUMNS,
    POSTGRES_TABLE_COMMENTS,
    POSTGRES_TABLE_OWNERS,
    POSTGRES_VIEW_DEFINITIONS,
)
from metadata.parsers.json_schema_parser import parse_json_schema
from metadata.utils.logger import utils_logger
from metadata.utils.sqlalchemy_utils import (
    get_table_comment_wrapper,
@@ -186,6 +190,28 @@ def get_table_comment(
    )


@reflection.cache
def get_json_fields_and_type(
    self, table_name, column_name, schema=None, **kw
):  # pylint: disable=unused-argument
    """
    Fetch the JSON structure of a column and parse it into child Columns
    """
    try:
        query = POSTGRES_GET_JSON_FIELDS.format(
            table_name=table_name, column_name=column_name
        )
        cursor = self.engine.execute(query)
        result = cursor.fetchone()
        if result:
            parsed_column = parse_json_schema(json.dumps(result[0]), Column)
            if parsed_column:
                return parsed_column[0].children
    except Exception as err:
        logger.warning(
            f"Unable to parse the json fields for {table_name}.{column_name} - {err}"
        )
        logger.debug(traceback.format_exc())
    return None
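
Taken in isolation, the parsing step above hands the query result to the JSON schema parser and keeps only the children. A small sketch with a hand-written payload (the payload is made up, shaped like the query output):

import json

from metadata.generated.schema.entity.data.table import Column
from metadata.parsers.json_schema_parser import parse_json_schema

payload = {
    "title": "review_details",
    "type": "object",
    "properties": {"staff": {"title": "staff", "type": "array", "properties": {}}},
}
parsed = parse_json_schema(json.dumps(payload), Column)
children = parsed[0].children if parsed else None  # -> [Column(name="staff", ...)]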


@reflection.cache
def get_columns( # pylint: disable=too-many-locals
    self, connection, table_name, schema=None, **kw
ingestion/src/metadata/ingestion/source/database/sql_column_handler.py
@@ -196,6 +196,26 @@ def _process_complex_col_type(self, parsed_string: dict, column: dict) -> Column
        ]
    return Column(**parsed_string)

@calculate_execution_time()
def process_json_type_column_fields(  # pylint: disable=too-many-locals
    self, schema_name: str, table_name: str, column_name: str, inspector: Inspector
) -> Optional[List[Column]]:
    """
    Parse fields for columns with json data types
    """
    try:
        if hasattr(inspector, "get_json_fields_and_type"):
            result = inspector.get_json_fields_and_type(
                table_name, column_name, schema_name
            )
            return result

    except NotImplementedError:
        logger.debug(
            f"Cannot parse json fields for table column [{schema_name}.{table_name}.{column_name}]: NotImplementedError"
        )
    return None
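
The hasattr guard means dialects that never patched their Inspector simply skip JSON field parsing. A sketch of that dispatch behavior (the inspector classes here are stand-ins, not real dialect code):

class _PlainInspector:  # stand-in for a dialect without the patch
    pass

class _PatchedInspector:  # stand-in for the patched Postgres Inspector
    def get_json_fields_and_type(self, table, column, schema=None):
        return ["<children>"]

# Mirrors the guard above: only patched inspectors are queried.
for insp in (_PlainInspector(), _PatchedInspector()):
    if hasattr(insp, "get_json_fields_and_type"):
        print(insp.get_json_fields_and_type("reviews", "review_details"))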

@calculate_execution_time()
def get_columns_and_constraints( # pylint: disable=too-many-locals
    self, schema_name: str, table_name: str, db_name: str, inspector: Inspector
@@ -271,6 +291,12 @@ def get_columns_and_constraints(  # pylint: disable=too-many-locals
        precision,
    )
    col_data_length = 1 if col_data_length is None else col_data_length

    if col_type == "JSON":
        children = self.process_json_type_column_fields(
            schema_name, table_name, column.get("name"), inspector
        )

    om_column = Column(
        name=ColumnName(
            root=column["name"]
2 changes: 1 addition & 1 deletion ingestion/src/metadata/parsers/json_schema_parser.py
@@ -81,7 +81,7 @@ def get_json_schema_fields(
            displayName=value.get("title"),
            dataType=JsonSchemaDataTypes(value.get("type", "unknown")).name,
            description=value.get("description"),
            children=get_json_schema_fields(value.get("properties"), cls=cls)
            if value.get("type") == "object"
            else None,
        )
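
This one-line change threads the requested model class through the recursion. Previously the nested call fell back to the parser's default field model, so a call like parse_json_schema(schema, Column) produced children of the wrong type for nested objects. A quick sketch of the behavior the fix enables (the sample schema is hypothetical):

from metadata.generated.schema.entity.data.table import Column
from metadata.parsers.json_schema_parser import parse_json_schema

schema = """{
    "title": "services", "type": "object",
    "properties": {"lunch": {"title": "lunch", "type": "string"}}
}"""
parsed = parse_json_schema(schema, Column)
assert isinstance(parsed[0].children[0], Column)  # nested field is a Column too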
60 changes: 60 additions & 0 deletions ingestion/tests/unit/test_json_schema_parser.py
@@ -14,6 +14,7 @@
"""
from unittest import TestCase

from metadata.generated.schema.entity.data.table import Column
from metadata.parsers.json_schema_parser import parse_json_schema


@@ -47,7 +48,51 @@ class JsonSchemaParserTests(TestCase):
        }
    }"""

    sample_postgres_json_schema = """{
        "title": "review_details",
        "type": "object",
        "properties": {
            "staff": {
                "title": "staff",
                "type": "array",
                "properties": {}
            },
            "services": {
                "title": "services",
                "type": "object",
                "properties": {
                    "lunch": {
                        "title": "lunch",
                        "type": "string",
                        "properties": {}
                    },
                    "check_in": {
                        "title": "check_in",
                        "type": "string",
                        "properties": {}
                    },
                    "check_out": {
                        "title": "check_out",
                        "type": "string",
                        "properties": {}
                    },
                    "additional_services": {
                        "title": "additional_services",
                        "type": "array",
                        "properties": {}
                    }
                }
            },
            "overall_experience": {
                "title": "overall_experience",
                "type": "string"
            }
        }
    }"""

    parsed_schema = parse_json_schema(sample_json_schema)
    parsed_postgres_schema = parse_json_schema(sample_postgres_json_schema, Column)

    def test_schema_name(self):
        self.assertEqual(self.parsed_schema[0].name.root, "Person")
@@ -83,3 +128,18 @@ def test_field_descriptions(self):
"Age in years which must be equal to or greater than zero.",
},
)

    def test_parse_postgres_json_fields(self):
        self.assertEqual(self.parsed_postgres_schema[0].name.root, "review_details")
        self.assertEqual(self.parsed_postgres_schema[0].children[0].name.root, "staff")
        self.assertEqual(
            self.parsed_postgres_schema[0].children[1].name.root, "services"
        )
        self.assertEqual(
            self.parsed_postgres_schema[0].children[1].children[0].name.root, "lunch"
        )
        self.assertEqual(
            self.parsed_postgres_schema[0].children[1].dataType.name, "RECORD"
        )
        self.assertEqual(len(self.parsed_postgres_schema[0].children), 3)
        self.assertEqual(len(self.parsed_postgres_schema[0].children[1].children), 4)
