diff --git a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py index 94dc58f7c563..c8d2302dcf6c 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/metadata.py @@ -25,6 +25,7 @@ Task, TaskStatus, ) +from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.pipeline.gluePipelineConnection import ( GluePipelineConnection, ) @@ -40,10 +41,19 @@ SourceUrl, Timestamp, ) +from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import Source as LineageSource +from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.pipeline.gluepipeline.models import ( + AmazonRedshift, + CatalogSource, + JDBCSource, + JobNodeResponse, +) from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource from metadata.utils import fqn from metadata.utils.logger import ingestion_logger @@ -63,6 +73,17 @@ "incomplete": StatusType.Failed, "pending": StatusType.Pending, } +MODEL_MAP = { + "AmazonRedshiftSource": AmazonRedshift, + "AmazonRedshiftTarget": AmazonRedshift, + "AthenaConnectorSource": JDBCSource, + "JDBCConnectorSource": JDBCSource, + "JDBCConnectorTarget": JDBCSource, + "DirectJDBCSource": CatalogSource, + "RedshiftSource": CatalogSource, + "RedshiftTarget": CatalogSource, + "DirectJDBC": CatalogSource, +} class GluepipelineSource(PipelineServiceSource): @@ -145,9 +166,64 @@ def get_downstream_tasks(self, task_unique_id, tasks): 
downstream_tasks.append(self.task_id_mapping[edges["DestinationId"]]) return downstream_tasks + def get_lineage_details(self, job) -> Optional[dict]: + """ + Get the Lineage Details of the pipeline + """ + lineage_details = {"sources": [], "targets": []} + try: + job_details = JobNodeResponse.model_validate( + self.glue.get_job(JobName=job) + ).Job + if job_details and job_details.config_nodes: + nodes = job_details.config_nodes + for _, node in nodes.items(): + for key, entity in node.items(): + table_model = None + if key in MODEL_MAP: + table_model = MODEL_MAP[key].model_validate(entity) + elif "Catalog" in key: + table_model = CatalogSource.model_validate(entity) + if table_model: + for db_service_name in self.get_db_service_names(): + table_entity = self.metadata.get_by_name( + entity=Table, + fqn=fqn.build( + metadata=self.metadata, + entity_type=Table, + table_name=table_model.table_name, + database_name=table_model.database_name, + schema_name=table_model.schema_name, + service_name=db_service_name, + ), + ) + if table_entity: + if key.endswith("Source"): + lineage_details["sources"].append( + (table_entity, "table") + ) + else: + lineage_details["targets"].append( + (table_entity, "table") + ) + break + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Failed to get lineage details for job : {job} due to : {exc}" + ) + return lineage_details + def yield_pipeline_status( self, pipeline_details: Any ) -> Iterable[Either[OMetaPipelineStatus]]: + pipeline_fqn = fqn.build( + metadata=self.metadata, + entity_type=Pipeline, + service_name=self.context.get().pipeline_service, + pipeline_name=self.context.get().pipeline, + ) for job in self.job_name_list: try: runs = self.glue.get_job_runs(JobName=job) @@ -183,12 +259,6 @@ def yield_pipeline_status( attempt["JobRunState"].lower(), StatusType.Pending ).value, ) - pipeline_fqn = fqn.build( - metadata=self.metadata, - entity_type=Pipeline, - 
service_name=self.context.get().pipeline_service, - pipeline_name=self.context.get().pipeline, - ) yield Either( right=OMetaPipelineStatus( pipeline_fqn=pipeline_fqn, @@ -199,7 +269,7 @@ def yield_pipeline_status( yield Either( left=StackTraceError( name=pipeline_fqn, - error=f"Failed to yield pipeline status: {exc}", + error=f"Failed to yield pipeline status for job {job}: {exc}", stackTrace=traceback.format_exc(), ) ) @@ -210,3 +280,49 @@ def yield_pipeline_lineage_details( """ Get lineage between pipeline and data sources """ + try: + for job in self.job_name_list: + lineage_entities = self.get_lineage_details(job) + pipeline_fqn = fqn.build( + metadata=self.metadata, + entity_type=Pipeline, + service_name=self.context.get().pipeline_service, + pipeline_name=self.context.get().pipeline, + ) + + pipeline_entity = self.metadata.get_by_name( + entity=Pipeline, fqn=pipeline_fqn + ) + + lineage_details = LineageDetails( + pipeline=EntityReference( + id=pipeline_entity.id.root, type="pipeline" + ), + source=LineageSource.PipelineLineage, + ) + for source in lineage_entities.get("sources"): + for target in lineage_entities.get("targets"): + yield Either( + right=AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=source[0].id, + type=source[1], + ), + toEntity=EntityReference( + id=target[0].id, + type=target[1], + ), + lineageDetails=lineage_details, + ) + ) + ) + + except Exception as exc: + yield Either( + left=StackTraceError( + name=pipeline_details.get(NAME), + error=f"Wild error ingesting pipeline lineage {pipeline_details} - {exc}", + stackTrace=traceback.format_exc(), + ) + ) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/models.py b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/models.py new file mode 100644 index 000000000000..137a6e16df34 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/gluepipeline/models.py @@ -0,0 +1,67 @@ +# Copyright 2021 Collate +# Licensed under the
Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Glue Pipeline Source Model module +""" + +from typing import Optional + +from pydantic import BaseModel, Field + + +class EntityDetails(BaseModel): + Value: str + + +class SourceDetails(BaseModel): + schema_details: EntityDetails = Field(alias="Schema") + table_details: EntityDetails = Field(alias="Table") + + +class AmazonRedshift(BaseModel): + Name: str + Data: SourceDetails + database_name: Optional[str] = None + + @property + def table_name(self): + if self.Data: + return self.Data.table_details.Value + return None + + @property + def schema_name(self): + if self.Data: + return self.Data.schema_details.Value + return None + + +class CatalogSource(BaseModel): + Name: str + database_name: str = Field(alias="Database") + schema_name: Optional[str] = None + table_name: str = Field(alias="Table") + + +class JDBCSource(BaseModel): + Name: str + schema_name: Optional[str] = Field(default=None, alias="SchemaName") + database_name: Optional[str] = None + table_name: str = Field(alias="ConnectionTable") + + +class JobNodes(BaseModel): + config_nodes: Optional[dict] = Field(default=None, alias="CodeGenConfigurationNodes") + + +class JobNodeResponse(BaseModel): + Job: Optional[JobNodes] = None