Skip to content

Commit

Permalink
Optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
SumanMaharana committed Oct 1, 2024
1 parent cf39c0d commit 559198e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from metadata.ingestion.source.pipeline.gluepipeline.models import (
AmazonRedshift,
CatalogSource,
JDBCSource,
JobNodeResponse,
)
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
Expand All @@ -72,6 +73,17 @@
"incomplete": StatusType.Failed,
"pending": StatusType.Pending,
}
# Lookup table from a job node's key to the model class whose
# ``model_validate`` is used to parse that node's payload. Keys are grouped
# by the model they share; the resulting dict has one entry per node key,
# in the order the keys are listed below.
MODEL_MAP = {
    **dict.fromkeys(
        ("AmazonRedshiftSource", "AmazonRedshiftTarget"),
        AmazonRedshift,
    ),
    **dict.fromkeys(
        ("AthenaConnectorSource", "JDBCConnectorSource", "JDBCConnectorTarget"),
        JDBCSource,
    ),
    **dict.fromkeys(
        ("DirectJDBCSource", "RedshiftSource", "RedshiftTarget", "DirectJDBC"),
        CatalogSource,
    ),
}


class GluepipelineSource(PipelineServiceSource):
Expand Down Expand Up @@ -168,18 +180,9 @@ def get_lineage_details(self, job) -> Optional[dict]:
for _, node in nodes.items():
for key, entity in node.items():
table_model = None
if key in ["AmazonRedshiftSource", "AmazonRedshiftTarget"]:
table_model = AmazonRedshift.model_validate(entity)
if (
key
in [
"DirectJDBCSource",
"RedshiftSource",
"RedshiftTarget",
"DirectJDBC",
]
or "Catalog" in key
):
if key in MODEL_MAP:
table_model = MODEL_MAP[key].model_validate(entity)
elif "Catalog" in key:
table_model = CatalogSource.model_validate(entity)
if table_model:
for db_service_name in self.get_db_service_names():
Expand All @@ -199,7 +202,7 @@ def get_lineage_details(self, job) -> Optional[dict]:
lineage_details["sources"].append(
(table_entity, "table")
)
elif key.endswith("Target"):
else:
lineage_details["targets"].append(
(table_entity, "table")
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ class CatalogSource(BaseModel):
table_name: str = Field(alias="Table")


class JDBCSource(BaseModel):
    """Model for a job node payload that names its table via ``ConnectionTable``.

    Validation requires ``Name`` and ``ConnectionTable``; ``SchemaName`` is
    optional.
    """

    # Taken from the payload under the same key, ``Name``; required.
    Name: str
    # Read from the ``SchemaName`` alias; None when the payload omits it.
    schema_name: Optional[str] = Field(default=None, alias="SchemaName")
    # No alias is declared for this field and it defaults to None.
    # NOTE(review): presumably filled in by the caller after validation - confirm.
    database_name: Optional[str] = None
    # Read from the ``ConnectionTable`` alias; required, so validation fails
    # for a payload that lacks it.
    table_name: str = Field(alias="ConnectionTable")

class JobNodes(BaseModel):
config_nodes: Optional[dict] = Field(alias="CodeGenConfigurationNodes")

Expand Down

0 comments on commit 559198e

Please sign in to comment.