Skip to content

Commit

Permalink
MINOR - Airflow dag_tag PK
Browse files Browse the repository at this point in the history
  • Loading branch information
pmbrull committed May 16, 2024
1 parent 84fdc77 commit 23affea
Showing 1 changed file with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,17 +228,17 @@ def build_workflow_config_property(

def clean_name_tag(tag: str) -> Optional[str]:
"""
Clean the tag to be used in Airflow
:param tag: tag to be cleaned
:return: cleaned tag
Clean the tag to be used in Airflow.
Airflow supports 100 characters. We'll keep just 90
since we add prefixes on the tags
"""
if not tag:
return None
try:
return fqn.split(tag)[-1][:100]
return fqn.split(tag)[-1][:90]
except Exception as exc:
logger.warning("Error cleaning tag: %s", exc)
return tag[:100]
return tag[:90]


def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
Expand Down Expand Up @@ -273,8 +273,8 @@ def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
"OpenMetadata",
clean_name_tag(ingestion_pipeline.displayName)
or clean_name_tag(ingestion_pipeline.name.__root__),
ingestion_pipeline.pipelineType.value,
clean_name_tag(ingestion_pipeline.service.name),
f"type:{ingestion_pipeline.pipelineType.value}",
f"service:{clean_name_tag(ingestion_pipeline.service.name)}",
],
}

Expand Down

0 comments on commit 23affea

Please sign in to comment.