From 559198e1549beb1d7fe1b6b1ae8c0ec530756045 Mon Sep 17 00:00:00 2001 From: SumanMaharana Date: Tue, 1 Oct 2024 17:15:55 +0530 Subject: [PATCH] Optimize --- .../source/pipeline/gluepipeline/metadata.py | 29 ++++++++++--------- .../source/pipeline/gluepipeline/models.py | 7 +++++ 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py index 8583984bd4ad..c8d2302dcf6c 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py @@ -51,6 +51,7 @@ from metadata.ingestion.source.pipeline.gluepipeline.models import ( AmazonRedshift, CatalogSource, + JDBCSource, JobNodeResponse, ) from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource @@ -72,6 +73,17 @@ "incomplete": StatusType.Failed, "pending": StatusType.Pending, } +MODEL_MAP = { + "AmazonRedshiftSource": AmazonRedshift, + "AmazonRedshiftTarget": AmazonRedshift, + "AthenaConnectorSource": JDBCSource, + "JDBCConnectorSource": JDBCSource, + "JDBCConnectorTarget": JDBCSource, + "DirectJDBCSource": CatalogSource, + "RedshiftSource": CatalogSource, + "RedshiftTarget": CatalogSource, + "DirectJDBC": CatalogSource, +} class GluepipelineSource(PipelineServiceSource): @@ -168,18 +180,9 @@ def get_lineage_details(self, job) -> Optional[dict]: for _, node in nodes.items(): for key, entity in node.items(): table_model = None - if key in ["AmazonRedshiftSource", "AmazonRedshiftTarget"]: - table_model = AmazonRedshift.model_validate(entity) - if ( - key - in [ - "DirectJDBCSource", - "RedshiftSource", - "RedshiftTarget", - "DirectJDBC", - ] - or "Catalog" in key - ): + if key in MODEL_MAP: + table_model = MODEL_MAP[key].model_validate(entity) + elif "Catalog" in key: table_model = CatalogSource.model_validate(entity) if table_model: for db_service_name in self.get_db_service_names(): @@ -199,7 +202,7 @@ def get_lineage_details(self, job) -> Optional[dict]: lineage_details["sources"].append( (table_entity, "table") ) - elif key.endswith("Target"): + else: lineage_details["targets"].append( (table_entity, "table") ) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/models.py b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/models.py index 9338f4a289f9..137a6e16df34 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/models.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/models.py @@ -52,6 +52,13 @@ class CatalogSource(BaseModel): table_name: str = Field(alias="Table") +class JDBCSource(BaseModel): + Name: str + schema_name: Optional[str] = Field(default=None, alias="SchemaName") + database_name: Optional[str] = None + table_name: str = Field(alias="ConnectionTable") + + class JobNodes(BaseModel): config_nodes: Optional[dict] = Field(alias="CodeGenConfigurationNodes")