Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEN-895: Added Glue Pipeline Lineage #18063

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
Task,
TaskStatus,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.pipeline.gluePipelineConnection import (
GluePipelineConnection,
)
Expand All @@ -40,10 +41,19 @@
SourceUrl,
Timestamp,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.gluepipeline.models import (
AmazonRedshift,
CatalogSource,
JDBCSource,
JobNodeResponse,
)
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
Expand All @@ -63,6 +73,17 @@
"incomplete": StatusType.Failed,
"pending": StatusType.Pending,
}
# Maps a Glue CodeGenConfigurationNodes node-type key to the pydantic model
# used to parse that node's payload (see gluepipeline/models.py). Keys ending
# in "Source"/"Target" mirror the AWS Glue visual-editor node names.
MODEL_MAP = {
    "AmazonRedshiftSource": AmazonRedshift,
    "AmazonRedshiftTarget": AmazonRedshift,
    "AthenaConnectorSource": JDBCSource,
    "JDBCConnectorSource": JDBCSource,
    "JDBCConnectorTarget": JDBCSource,
    "DirectJDBCSource": CatalogSource,
    "RedshiftSource": CatalogSource,
    "RedshiftTarget": CatalogSource,
    "DirectJDBC": CatalogSource,
}


class GluepipelineSource(PipelineServiceSource):
Expand Down Expand Up @@ -145,9 +166,64 @@ def get_downstream_tasks(self, task_unique_id, tasks):
downstream_tasks.append(self.task_id_mapping[edges["DestinationId"]])
return downstream_tasks

def _parse_node_entity(self, key: str, entity: dict):
    """Map one config-node payload to its pydantic model, or None if the
    node type is not a recognised source/target."""
    if key in MODEL_MAP:
        return MODEL_MAP[key].model_validate(entity)
    # Catalog-backed node types (e.g. "S3CatalogSource") share one shape
    if "Catalog" in key:
        return CatalogSource.model_validate(entity)
    return None

def _resolve_table_entity(self, table_model) -> Optional[Table]:
    """Search the configured db services for the table referenced by
    *table_model*; return the first match, or None."""
    for db_service_name in self.get_db_service_names():
        table_entity = self.metadata.get_by_name(
            entity=Table,
            fqn=fqn.build(
                metadata=self.metadata,
                entity_type=Table,
                table_name=table_model.table_name,
                database_name=table_model.database_name,
                schema_name=table_model.schema_name,
                service_name=db_service_name,
            ),
        )
        if table_entity:
            return table_entity
    return None

def get_lineage_details(self, job) -> dict:
    """
    Get the Lineage Details of the pipeline.

    Fetches the Glue job, walks its CodeGenConfigurationNodes and resolves
    every recognised source/target node to an OpenMetadata Table entity.

    Returns a dict with "sources" and "targets" lists of
    (table_entity, "table") tuples; both lists are empty when the job has
    no visual config or on any failure (logged as a warning).
    """
    lineage_details = {"sources": [], "targets": []}
    try:
        job_details = JobNodeResponse.model_validate(
            self.glue.get_job(JobName=job)
        ).Job
        if job_details and job_details.config_nodes:
            for node in job_details.config_nodes.values():
                for key, entity in node.items():
                    table_model = self._parse_node_entity(key, entity)
                    if not table_model:
                        continue
                    table_entity = self._resolve_table_entity(table_model)
                    if not table_entity:
                        continue
                    # Node keys end with "Source" for inputs; everything
                    # else (…Target, DirectJDBC) is treated as an output
                    bucket = "sources" if key.endswith("Source") else "targets"
                    lineage_details[bucket].append((table_entity, "table"))
    except Exception as exc:
        logger.debug(traceback.format_exc())
        logger.warning(
            f"Failed to get lineage details for job : {job} due to : {exc}"
        )
    return lineage_details

def yield_pipeline_status(
self, pipeline_details: Any
) -> Iterable[Either[OMetaPipelineStatus]]:
pipeline_fqn = fqn.build(
metadata=self.metadata,
entity_type=Pipeline,
service_name=self.context.get().pipeline_service,
pipeline_name=self.context.get().pipeline,
)
for job in self.job_name_list:
try:
runs = self.glue.get_job_runs(JobName=job)
Expand Down Expand Up @@ -183,12 +259,6 @@ def yield_pipeline_status(
attempt["JobRunState"].lower(), StatusType.Pending
).value,
)
pipeline_fqn = fqn.build(
metadata=self.metadata,
entity_type=Pipeline,
service_name=self.context.get().pipeline_service,
pipeline_name=self.context.get().pipeline,
)
yield Either(
right=OMetaPipelineStatus(
pipeline_fqn=pipeline_fqn,
Expand All @@ -199,7 +269,7 @@ def yield_pipeline_status(
yield Either(
left=StackTraceError(
name=pipeline_fqn,
error=f"Failed to yield pipeline status: {exc}",
error=f"Failed to yield pipeline status for job {job}: {exc}",
stackTrace=traceback.format_exc(),
)
)
def yield_pipeline_lineage_details(
    self, pipeline_details: Any
) -> Iterable[Either[AddLineageRequest]]:
    """
    Get lineage between pipeline and data sources.

    For every Glue job of the pipeline, emits one AddLineageRequest per
    (source table, target table) pair resolved by get_lineage_details,
    annotated with the pipeline as the lineage edge's pipeline reference.
    """
    try:
        # The pipeline context is identical for every job — resolve the
        # FQN, entity and LineageDetails once instead of per iteration.
        pipeline_fqn = fqn.build(
            metadata=self.metadata,
            entity_type=Pipeline,
            service_name=self.context.get().pipeline_service,
            pipeline_name=self.context.get().pipeline,
        )
        pipeline_entity = self.metadata.get_by_name(
            entity=Pipeline, fqn=pipeline_fqn
        )
        lineage_details = LineageDetails(
            pipeline=EntityReference(
                id=pipeline_entity.id.root, type="pipeline"
            ),
            source=LineageSource.PipelineLineage,
        )
        for job in self.job_name_list:
            lineage_entities = self.get_lineage_details(job)
            # Fan out: every source table feeds every target table
            for source in lineage_entities.get("sources"):
                for target in lineage_entities.get("targets"):
                    yield Either(
                        right=AddLineageRequest(
                            edge=EntitiesEdge(
                                fromEntity=EntityReference(
                                    id=source[0].id,
                                    type=source[1],
                                ),
                                toEntity=EntityReference(
                                    id=target[0].id,
                                    type=target[1],
                                ),
                                lineageDetails=lineage_details,
                            )
                        )
                    )

    except Exception as exc:
        yield Either(
            left=StackTraceError(
                name=pipeline_details.get(NAME),
                error=f"Wild error ingesting pipeline lineage {pipeline_details} - {exc}",
                stackTrace=traceback.format_exc(),
            )
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Glue Pipeline Source Model module
"""

from typing import Optional

from pydantic import BaseModel, Field


class EntityDetails(BaseModel):
    """Single-value Glue payload wrapper, e.g. ``{"Value": "my_table"}``."""

    Value: str


class SourceDetails(BaseModel):
    """Schema/table pair carried under a Redshift node's ``Data`` key."""

    schema_details: EntityDetails = Field(alias="Schema")
    table_details: EntityDetails = Field(alias="Table")


class AmazonRedshift(BaseModel):
    """
    Model for AmazonRedshiftSource / AmazonRedshiftTarget Glue nodes.

    Exposes ``table_name`` / ``schema_name`` properties so callers can
    treat it uniformly with the other source models.
    """

    Name: str
    Data: SourceDetails
    # Redshift nodes carry no database in the payload; kept for interface parity
    database_name: Optional[str] = None

    @property
    def table_name(self):
        # Table name lives under Data -> Table -> Value
        return self.Data.table_details.Value if self.Data else None

    @property
    def schema_name(self):
        # Schema name lives under Data -> Schema -> Value
        return self.Data.schema_details.Value if self.Data else None


class CatalogSource(BaseModel):
    """Model for catalog-backed Glue nodes (Redshift catalog, DirectJDBC,
    and any node type containing "Catalog")."""

    Name: str
    database_name: str = Field(alias="Database")
    # Catalog payloads have no separate schema component
    schema_name: Optional[str] = None
    table_name: str = Field(alias="Table")


class JDBCSource(BaseModel):
    """Model for JDBC/Athena connector Glue nodes."""

    Name: str
    schema_name: Optional[str] = Field(default=None, alias="SchemaName")
    # JDBC payloads carry no database; kept for interface parity
    database_name: Optional[str] = None
    table_name: str = Field(alias="ConnectionTable")


class JobNodes(BaseModel):
    """Glue ``Job`` payload holding the visual-editor node configuration."""

    # default=None is required: in pydantic v2 an Optional field without a
    # default is still mandatory, so jobs created without the visual editor
    # (no CodeGenConfigurationNodes key) would fail validation instead of
    # yielding config_nodes=None.
    config_nodes: Optional[dict] = Field(
        default=None, alias="CodeGenConfigurationNodes"
    )


class JobNodeResponse(BaseModel):
    """Top-level wrapper for the ``glue.get_job()`` API response."""

    Job: Optional[JobNodes] = None
Loading