Skip to content

Commit

Permalink
Added Glue Pipeline Lineage
Browse files Browse the repository at this point in the history
  • Loading branch information
SumanMaharana committed Oct 1, 2024
1 parent 58ed12c commit cf39c0d
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
Task,
TaskStatus,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.pipeline.gluePipelineConnection import (
GluePipelineConnection,
)
Expand All @@ -40,10 +41,18 @@
SourceUrl,
Timestamp,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.gluepipeline.models import (
AmazonRedshift,
CatalogSource,
JobNodeResponse,
)
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
Expand Down Expand Up @@ -145,9 +154,73 @@ def get_downstream_tasks(self, task_unique_id, tasks):
downstream_tasks.append(self.task_id_mapping[edges["DestinationId"]])
return downstream_tasks

def get_lineage_details(self, job) -> Optional[dict]:
    """
    Collect the tables a Glue job reads from and writes to.

    The job definition is fetched through ``self.glue.get_job`` and each
    entry of its configuration nodes is inspected. Entries whose key names
    a supported Redshift, JDBC or Catalog node are parsed into a table
    model, and the table is then looked up in OpenMetadata under every
    configured database service until one of them knows it.

    Args:
        job: name of the Glue job, passed as ``JobName`` to ``get_job``.

    Returns:
        A dict with the keys ``"sources"`` and ``"targets"``; each value is
        a list of ``(table_entity, "table")`` tuples. Any failure is logged
        and whatever was collected up to that point is returned.
    """
    redshift_keys = ["AmazonRedshiftSource", "AmazonRedshiftTarget"]
    catalog_keys = [
        "DirectJDBCSource",
        "RedshiftSource",
        "RedshiftTarget",
        "DirectJDBC",
    ]
    collected = {"sources": [], "targets": []}
    try:
        job_details = JobNodeResponse.model_validate(
            self.glue.get_job(JobName=job)
        ).Job
        # Nothing to inspect when the job carries no configuration nodes.
        if not (job_details and job_details.config_nodes):
            return collected

        for node in job_details.config_nodes.values():
            for key, entity in node.items():
                if key in redshift_keys:
                    parsed = AmazonRedshift.model_validate(entity)
                elif key in catalog_keys or "Catalog" in key:
                    parsed = CatalogSource.model_validate(entity)
                else:
                    # Node type that does not describe a table.
                    continue

                for db_service_name in self.get_db_service_names():
                    table_fqn = fqn.build(
                        metadata=self.metadata,
                        entity_type=Table,
                        table_name=parsed.table_name,
                        database_name=parsed.database_name,
                        schema_name=parsed.schema_name,
                        service_name=db_service_name,
                    )
                    matched_table = self.metadata.get_by_name(
                        entity=Table, fqn=table_fqn
                    )
                    if not matched_table:
                        continue
                    # The key suffix tells which side of the lineage the
                    # table is on; keys with neither suffix are not recorded.
                    if key.endswith("Source"):
                        collected["sources"].append((matched_table, "table"))
                    elif key.endswith("Target"):
                        collected["targets"].append((matched_table, "table"))
                    # First service that resolves the table wins.
                    break

    except Exception as exc:
        logger.debug(traceback.format_exc())
        logger.warning(
            f"Failed to get lineage details for job : {job} due to : {exc}"
        )
    return collected

def yield_pipeline_status(
self, pipeline_details: Any
) -> Iterable[Either[OMetaPipelineStatus]]:
pipeline_fqn = fqn.build(
metadata=self.metadata,
entity_type=Pipeline,
service_name=self.context.get().pipeline_service,
pipeline_name=self.context.get().pipeline,
)
for job in self.job_name_list:
try:
runs = self.glue.get_job_runs(JobName=job)
Expand Down Expand Up @@ -183,12 +256,6 @@ def yield_pipeline_status(
attempt["JobRunState"].lower(), StatusType.Pending
).value,
)
pipeline_fqn = fqn.build(
metadata=self.metadata,
entity_type=Pipeline,
service_name=self.context.get().pipeline_service,
pipeline_name=self.context.get().pipeline,
)
yield Either(
right=OMetaPipelineStatus(
pipeline_fqn=pipeline_fqn,
Expand All @@ -199,7 +266,7 @@ def yield_pipeline_status(
yield Either(
left=StackTraceError(
name=pipeline_fqn,
error=f"Failed to yield pipeline status: {exc}",
error=f"Failed to yield pipeline status for job {job}: {exc}",
stackTrace=traceback.format_exc(),
)
)
Expand All @@ -210,3 +277,49 @@ def yield_pipeline_lineage_details(
"""
Get lineage between pipeline and data sources
"""
try:
for job in self.job_name_list:
lineage_enities = self.get_lineage_details(job)
pipeline_fqn = fqn.build(
metadata=self.metadata,
entity_type=Pipeline,
service_name=self.context.get().pipeline_service,
pipeline_name=self.context.get().pipeline,
)

pipeline_entity = self.metadata.get_by_name(
entity=Pipeline, fqn=pipeline_fqn
)

lineage_details = LineageDetails(
pipeline=EntityReference(
id=pipeline_entity.id.root, type="pipeline"
),
source=LineageSource.PipelineLineage,
)
for source in lineage_enities.get("sources"):
for target in lineage_enities.get("targets"):
yield Either(
right=AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=source[0].id,
type=source[1],
),
toEntity=EntityReference(
id=target[0].id,
type=target[1],
),
lineageDetails=lineage_details,
)
)
)

except Exception as exc:
yield Either(
left=StackTraceError(
name=pipeline_details.get(NAME),
error=f"Wild error ingesting pipeline lineage {pipeline_details} - {exc}",
stackTrace=traceback.format_exc(),
)
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Glue Pipeline Source Model module
"""

from typing import Optional

from pydantic import BaseModel, Field


class EntityDetails(BaseModel):
    """
    Holder for a single named value inside a Glue job node.
    """

    # Required string read from the "Value" key of the input.
    Value: str


class SourceDetails(BaseModel):
    """
    Schema and table information of a Glue job node, each wrapped in an
    ``EntityDetails`` object.
    """

    # Both fields are required and are populated from the aliased input keys
    # "Schema" and "Table" respectively.
    schema_details: EntityDetails = Field(alias="Schema")
    table_details: EntityDetails = Field(alias="Table")


class AmazonRedshift(BaseModel):
    """
    Model of an Amazon Redshift node of a Glue job.

    ``Name`` and ``Data`` are required. ``database_name`` defaults to
    ``None``. Table and schema names are not stored directly; they are
    derived from ``Data`` through the properties below.
    """

    Name: str
    Data: SourceDetails
    database_name: Optional[str] = None

    @property
    def table_name(self):
        """Table name held in ``Data``, or ``None`` when ``Data`` is falsy."""
        return self.Data.table_details.Value if self.Data else None

    @property
    def schema_name(self):
        """Schema name held in ``Data``, or ``None`` when ``Data`` is falsy."""
        return self.Data.schema_details.Value if self.Data else None


class CatalogSource(BaseModel):
    """
    Model of a Glue job node that points at a table through ``Database`` and
    ``Table`` keys.
    """

    Name: str
    # Required; populated from the aliased input key "Database".
    database_name: str = Field(alias="Database")
    # No alias is declared for the schema; it stays None unless supplied
    # under its own field name.
    schema_name: Optional[str] = None
    # Required; populated from the aliased input key "Table".
    table_name: str = Field(alias="Table")


class JobNodes(BaseModel):
    """
    Configuration nodes of a Glue job.

    ``config_nodes`` is read from the ``CodeGenConfigurationNodes`` key and
    is ``None`` when that key is missing from the job definition.
    """

    # An explicit default is needed: under pydantic v2 an ``Optional`` field
    # without a default is still required, so a job definition that lacks
    # "CodeGenConfigurationNodes" would fail validation instead of yielding
    # None.
    config_nodes: Optional[dict] = Field(
        default=None, alias="CodeGenConfigurationNodes"
    )


class JobNodeResponse(BaseModel):
    """
    Top-level wrapper around a job definition; only the ``Job`` key is
    modelled.
    """

    # None when the input carries no "Job" entry.
    Job: Optional[JobNodes] = None

0 comments on commit cf39c0d

Please sign in to comment.