Skip to content

Commit

Permalink
MINOR: raise lineage error when table does not exist (#16756)
Browse files Browse the repository at this point in the history
* raise lineage error when table does not exist

* added test case for partial success

* format

* format

* fixed tests
  • Loading branch information
sushi30 committed Jun 24, 2024
1 parent 38fe061 commit 54ca82f
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 41 deletions.
82 changes: 45 additions & 37 deletions ingestion/src/metadata/ingestion/lineage/sql_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"""
Helper functions to handle SQL lineage operations
"""
import itertools
import traceback
from typing import Any, Iterable, List, Optional, Tuple

Expand Down Expand Up @@ -254,35 +255,34 @@ def _build_table_lineage(
"""
Prepare the lineage request generator
"""
if from_entity and to_entity:
col_lineage = get_column_lineage(
to_entity=to_entity,
to_table_raw_name=str(to_table_raw_name),
from_entity=from_entity,
from_table_raw_name=str(from_table_raw_name),
column_lineage_map=column_lineage_map,
)
lineage_details = LineageDetails(sqlQuery=query, source=lineage_source)
if col_lineage:
lineage_details.columnsLineage = col_lineage
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id.root,
type="table",
),
toEntity=EntityReference(
id=to_entity.id.root,
type="table",
),
)
col_lineage = get_column_lineage(
to_entity=to_entity,
to_table_raw_name=str(to_table_raw_name),
from_entity=from_entity,
from_table_raw_name=str(from_table_raw_name),
column_lineage_map=column_lineage_map,
)
lineage_details = LineageDetails(sqlQuery=query, source=lineage_source)
if col_lineage:
lineage_details.columnsLineage = col_lineage
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id.root,
type="table",
),
toEntity=EntityReference(
id=to_entity.id.root,
type="table",
),
)
if lineage_details:
lineage.edge.lineageDetails = lineage_details
yield Either(right=lineage)
)
if lineage_details:
lineage.edge.lineageDetails = lineage_details
yield Either(right=lineage)


# pylint: disable=too-many-arguments
# pylint: disable=too-many-arguments,too-many-locals
def _create_lineage_by_table_name(
metadata: OpenMetadata,
from_table: str,
Expand Down Expand Up @@ -315,17 +315,25 @@ def _create_lineage_by_table_name(
table_name=to_table,
)

for from_entity in from_table_entities or []:
for to_entity in to_table_entities or []:
yield from _build_table_lineage(
to_entity=to_entity,
from_entity=from_entity,
to_table_raw_name=to_table,
from_table_raw_name=from_table,
query=query,
column_lineage_map=column_lineage_map,
lineage_source=lineage_source,
)
for table_name, entity in (
(from_table, from_table_entities),
(to_table, to_table_entities),
):
if entity is None:
raise RuntimeError(f"Table entity not found: [{table_name}]")

for from_entity, to_entity in itertools.product(
from_table_entities, to_table_entities
):
yield from _build_table_lineage(
to_entity=to_entity,
from_entity=from_entity,
to_table_raw_name=to_table,
from_table_raw_name=from_table,
query=query,
column_lineage_map=column_lineage_map,
lineage_source=lineage_source,
)

except Exception as exc:
yield Either(
Expand Down
6 changes: 6 additions & 0 deletions ingestion/tests/integration/postgres/bad_query_log.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
query_text,database_name,schema_name
"select * from sales",dvdrental,public
"select * from marketing",dvdrental,public
"insert into marketing select * from sales",default,public
"insert into dvdrental.public.staff as (select id from non_existent union select * from dvdrental.public.customer)",dvdrental,public
"insert into dvdrental.public.actor as (select * from dvdrental.public.customer)",dvdrental,public
88 changes: 84 additions & 4 deletions ingestion/tests/integration/postgres/test_postgres.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import sys
import time
from os import path

import pytest

Expand Down Expand Up @@ -81,6 +83,7 @@ def db_service(metadata, postgres_container):

@pytest.fixture(scope="module")
def ingest_metadata(db_service, metadata: OpenMetadata):
search_cache.clear()
workflow_config = OpenMetadataWorkflowConfig(
source=Source(
type=db_service.connection.config.type.value.lower(),
Expand All @@ -100,7 +103,7 @@ def ingest_metadata(db_service, metadata: OpenMetadata):


@pytest.fixture(scope="module")
def ingest_lineage(db_service, ingest_metadata, metadata: OpenMetadata):
def ingest_postgres_lineage(db_service, ingest_metadata, metadata: OpenMetadata):
workflow_config = OpenMetadataWorkflowConfig(
source=Source(
type="postgres-lineage",
Expand All @@ -112,13 +115,67 @@ def ingest_lineage(db_service, ingest_metadata, metadata: OpenMetadata):
type="metadata-rest",
config={},
),
workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config),
workflowConfig=WorkflowConfig(
loggerLevel=LogLevels.DEBUG, openMetadataServerConfig=metadata.config
),
)
metadata_ingestion = MetadataWorkflow.create(workflow_config)
metadata_ingestion.execute()
metadata_ingestion.raise_from_status()
return


def test_ingest_query_log(db_service, ingest_metadata, metadata: OpenMetadata):
    """
    Run the query-log lineage workflow on ``bad_query_log.csv`` and check
    that it succeeds partially.

    Exactly two failures are expected, each mentioning "Table entity not
    found", while the lineage edges customer -> actor and customer -> staff
    must still exist afterwards.
    """
    # The query cache is stored in ES, so reindex first to avoid reading
    # a stale cache.
    reindex_search(metadata)

    service_fqn = db_service.fullyQualifiedName.root
    source_section = {
        "type": "query-log-lineage",
        "serviceName": service_fqn,
        "sourceConfig": {
            "config": {
                "type": "DatabaseLineage",
                "queryLogFilePath": path.dirname(__file__) + "/bad_query_log.csv",
            }
        },
    }
    workflow = MetadataWorkflow.create(
        {
            "source": source_section,
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "loggerLevel": "DEBUG",
                "openMetadataServerConfig": metadata.config.dict(),
            },
        }
    )
    workflow.execute()

    # Partial success: the bad queries are reported as failures instead of
    # aborting the run.
    failures = workflow.source.status.failures
    assert len(failures) == 2
    assert all("Table entity not found" in failure.error for failure in failures)

    # Resolve all three tables up front; nullable=False is passed so a
    # missing table is not silently returned as None.
    tables = {
        short_name: metadata.get_by_name(
            Table,
            f"{service_fqn}.dvdrental.public.{short_name}",
            nullable=False,
        )
        for short_name in ("customer", "actor", "staff")
    }

    upstream_id = str(tables["customer"].id.root)
    for downstream in ("actor", "staff"):
        edge = metadata.get_lineage_edge(
            upstream_id, str(tables[downstream].id.root)
        )
        assert edge is not None


@pytest.fixture(scope="module")
def run_profiler_workflow(ingest_metadata, db_service, metadata):
workflow_config = OpenMetadataWorkflowConfig(
Expand Down Expand Up @@ -209,7 +266,7 @@ def test_profiler(run_profiler_workflow):
pass


def test_lineage(ingest_lineage):
def test_db_lineage(ingest_postgres_lineage):
    """
    Smoke test for the postgres lineage workflow.

    The body is intentionally empty: the test only requests the
    ``ingest_postgres_lineage`` fixture and asserts nothing itself.
    NOTE(review): presumably the fixture's workflow run is the actual
    check - confirm against the fixture definition.
    """
    pass


Expand Down Expand Up @@ -252,7 +309,7 @@ def run_usage_workflow(db_service, metadata):
reason="'metadata.ingestion.lineage.sql_lineage.search_cache' gets corrupted with invalid data."
" See issue https://github.com/open-metadata/OpenMetadata/issues/16408"
)
def test_usage_delete_usage(db_service, ingest_lineage, metadata):
def test_usage_delete_usage(db_service, ingest_postgres_lineage, metadata):
workflow_config = {
"source": {
"type": "postgres-usage",
Expand Down Expand Up @@ -304,3 +361,26 @@ def test_usage_delete_usage(db_service, ingest_lineage, metadata):
metadata_ingestion.execute()
metadata_ingestion.raise_from_status()
run_usage_workflow(db_service, metadata)


def reindex_search(metadata: "OpenMetadata", timeout=60, poll_interval=1):
    """
    Trigger the SearchIndexingApplication and block until it reports success.

    The function works in two phases that share one ``timeout`` budget:

    1. Poll the latest application status while it is reported as
       "running", so a new run is not triggered on top of one in progress.
    2. Trigger a new run and poll until the latest status is "success".

    Args:
        metadata: object exposing ``client.get(path)`` and
            ``client.post(path)``; ``get`` must return a mapping shaped like
            ``{"data": [{"status": ...}]}``.
        timeout: overall time budget in seconds, measured from the call.
        poll_interval: seconds to sleep between status polls and before
            the trigger request. Defaults to the previously hard-coded 1.

    Raises:
        TimeoutError: if the budget is exceeded in either phase.
    """
    status_path = "/apps/name/SearchIndexingApplication/status?offset=0&limit=1"
    start = time.time()

    status = None
    while status is None or status == "running":
        response = metadata.client.get(status_path)
        status = response["data"][0]["status"]
        if time.time() - start > timeout:
            raise TimeoutError("Timed out waiting for reindexing to start")
        time.sleep(poll_interval)

    time.sleep(poll_interval)
    metadata.client.post("/apps/trigger/SearchIndexingApplication")

    # Forget the pre-trigger status. Without this reset, a previous run that
    # already ended in "success" would skip the loop below and the function
    # would return before the reindex it just triggered had been polled.
    # NOTE(review): the first poll after the trigger may still report the
    # previous run if the server has not registered the new one yet -
    # confirm whether the status payload carries a run id/timestamp to key on.
    status = None
    while status != "success":
        response = metadata.client.get(status_path)
        status = response["data"][0]["status"]
        if time.time() - start > timeout:
            raise TimeoutError("Timed out waiting for reindexing to complete")
        time.sleep(poll_interval)

0 comments on commit 54ca82f

Please sign in to comment.