From 23affeaadef7aedf4758e98c54c3ff1bd772cc7d Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Thu, 16 May 2024 19:24:36 +0200 Subject: [PATCH] MINOR - Airflow dag_tag PK --- .../workflows/ingestion/common.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index 73c5d1d78972..1f416bc558a9 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -228,17 +228,17 @@ def build_workflow_config_property( def clean_name_tag(tag: str) -> Optional[str]: """ - Clean the tag to be used in Airflow - :param tag: tag to be cleaned - :return: cleaned tag + Clean the tag to be used in Airflow. + Airflow supports 100 characters. We'll keep just 90 + since we add prefixes on the tags """ if not tag: return None try: - return fqn.split(tag)[-1][:100] + return fqn.split(tag)[-1][:90] except Exception as exc: logger.warning("Error cleaning tag: %s", exc) - return tag[:100] + return tag[:90] def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict: @@ -273,8 +273,8 @@ def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict: "OpenMetadata", clean_name_tag(ingestion_pipeline.displayName) or clean_name_tag(ingestion_pipeline.name.__root__), - ingestion_pipeline.pipelineType.value, - clean_name_tag(ingestion_pipeline.service.name), + f"type:{ingestion_pipeline.pipelineType.value}", + f"service:{clean_name_tag(ingestion_pipeline.service.name)}", ], }