Skip to content

Commit

Permalink
Fix 13823: Validate and Parse gracefully IngestionPipeline (open-meta…
Browse files Browse the repository at this point in the history
…data#14461)

* fixed ingestion pipeline parsing

* Added validation for automation workflow

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
  • Loading branch information
OnkarVO7 and pmbrull authored Dec 22, 2023
1 parent 0a3b03b commit 79444f4
Show file tree
Hide file tree
Showing 5 changed files with 449 additions and 68 deletions.
105 changes: 79 additions & 26 deletions ingestion/src/metadata/ingestion/api/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
"""
Helper to parse workflow configurations
"""
from typing import Optional, Type, TypeVar, Union
from typing import Type, TypeVar, Union

from pydantic import BaseModel, ValidationError

from metadata.generated.schema.entity.automations.testServiceConnection import (
TestServiceConnectionRequest,
from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.dashboardService import (
DashboardConnection,
Expand All @@ -26,6 +26,9 @@
DatabaseConnection,
DatabaseServiceType,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
)
from metadata.generated.schema.entity.services.messagingService import (
MessagingConnection,
MessagingServiceType,
Expand Down Expand Up @@ -270,13 +273,11 @@ def _parse_inner_connection(config_dict: dict, source_type: str) -> None:
:param config_dict: JSON configuration
:param source_type: source type name, e.g., Airflow.
"""
inner_source_type = config_dict["source"]["serviceConnection"]["config"][
"connection"
]["type"]
inner_source_type = config_dict["type"]
inner_service_type = get_service_type(inner_source_type)
inner_connection_class = get_connection_class(inner_source_type, inner_service_type)
_unsafe_parse_config(
config=config_dict["source"]["serviceConnection"]["config"]["connection"],
config=config_dict,
cls=inner_connection_class,
message=f"Error parsing the inner service connection for {source_type}",
)
Expand All @@ -303,7 +304,12 @@ def parse_service_connection(config_dict: dict) -> None:

if source_type in HAS_INNER_CONNECTION:
# We will first parse the inner `connection` configuration
_parse_inner_connection(config_dict, source_type)
_parse_inner_connection(
config_dict["source"]["serviceConnection"]["config"]["connection"][
"config"
]["connection"],
source_type,
)

# Parse the service connection dictionary with the scoped class
_unsafe_parse_config(
Expand Down Expand Up @@ -400,37 +406,84 @@ def parse_workflow_config_gracefully(
raise ParsingConfigurationError("Uncaught error when parsing the workflow!")


def parse_test_connection_request_gracefully(
def parse_ingestion_pipeline_config_gracefully(
config_dict: dict,
) -> Optional[TestServiceConnectionRequest]:
) -> IngestionPipeline:
"""
This function either correctly parses the pydantic class,
or throws a scoped error while fetching the required source
connection class
This function either correctly parses the pydantic class, or
throws a scoped error while fetching the required source connection
class.
:param config_dict: JSON workflow config
:return: TestServiceConnectionRequest or scoped error
:param config_dict: JSON ingestion pipeline config
:return:Ingestion Pipeline config or scoped error
"""

try:
test_service_connection = TestServiceConnectionRequest.parse_obj(config_dict)
return test_service_connection
ingestion_pipeline = IngestionPipeline.parse_obj(config_dict)
return ingestion_pipeline

except ValidationError as err:
# Unsafe access to the keys. Allow a KeyError if the config is not well formatted
source_type = config_dict["connection"]["config"]["type"]
logger.warning(
f"Error parsing the Workflow Configuration for {source_type} ingestion: {err}"
except ValidationError:
source_config_type = config_dict["sourceConfig"]["config"].get("type")

if source_config_type is None:
raise InvalidWorkflowException("Missing type in the sourceConfig config")

source_config_class = get_source_config_class(source_config_type)

_unsafe_parse_config(
config=config_dict["sourceConfig"]["config"],
cls=source_config_class,
message="Error parsing the source config",
)

raise ParsingConfigurationError(
"Uncaught error when parsing the Ingestion Pipeline!"
)


def parse_automation_workflow_gracefully(
config_dict: dict,
) -> AutomationWorkflow:
"""
This function either correctly parses the pydantic class, or
throws a scoped error while fetching the required source connection
class.
:param config_dict: JSON AutomationWorkflow config
:return: AutomationWorkflow config or scoped error
"""

try:
automation_workflow = AutomationWorkflow.parse_obj(config_dict)
return automation_workflow

except ValidationError:
source_type = config_dict["request"]["connection"]["config"].get("type")

if source_type is None:
raise InvalidWorkflowException("Missing type in the connection config")

logger.debug(
f"Error parsing the Workflow Configuration for {source_type} ingestion"
)

service_type = get_service_type(source_type)
connection_class = get_connection_class(source_type, service_type)

# Parse the dictionary with the scoped class
if source_type in HAS_INNER_CONNECTION:
# We will first parse the inner `connection` configuration
_parse_inner_connection(
config_dict["request"]["connection"]["config"]["connection"],
source_type,
)

# Parse the service connection dictionary with the scoped class
_unsafe_parse_config(
config=config_dict["connection"]["config"],
config=config_dict["request"]["connection"]["config"],
cls=connection_class,
message="Error parsing the connection config",
message="Error parsing the service connection",
)

raise ParsingConfigurationError("Uncaught error when parsing the workflow!")
raise ParsingConfigurationError(
"Uncaught error when parsing the Ingestion Pipeline!"
)
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
IngestionPipeline,
PipelineStatus,
)
from metadata.ingestion.api.parser import parse_ingestion_pipeline_config_gracefully
from metadata.ingestion.ometa.client import REST
from metadata.utils.logger import ometa_logger

Expand Down Expand Up @@ -79,7 +80,7 @@ def run_pipeline(self, ingestion_pipeline_id: str) -> IngestionPipeline:
f"{self.get_suffix(IngestionPipeline)}/trigger/{ingestion_pipeline_id}"
)

return IngestionPipeline.parse_obj(resp)
return parse_ingestion_pipeline_config_gracefully(resp)

def get_pipeline_status_between_ts(
self,
Expand Down Expand Up @@ -125,6 +126,6 @@ def get_ingestion_pipeline_by_name(
)

if hasattr(resp, "sourceConfig"):
return IngestionPipeline.parse_obj(resp)
return parse_ingestion_pipeline_config_gracefully(resp)

return None
Loading

0 comments on commit 79444f4

Please sign in to comment.