diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index 73c5d1d78972..1f416bc558a9 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -228,17 +228,17 @@ def build_workflow_config_property( def clean_name_tag(tag: str) -> Optional[str]: """ - Clean the tag to be used in Airflow - :param tag: tag to be cleaned - :return: cleaned tag + Clean the tag to be used in Airflow. + Airflow supports 100 characters. We'll keep just 90 + since we add prefixes on the tags """ if not tag: return None try: - return fqn.split(tag)[-1][:100] + return fqn.split(tag)[-1][:90] except Exception as exc: logger.warning("Error cleaning tag: %s", exc) - return tag[:100] + return tag[:90] def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict: @@ -273,8 +273,8 @@ def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict: "OpenMetadata", clean_name_tag(ingestion_pipeline.displayName) or clean_name_tag(ingestion_pipeline.name.__root__), - ingestion_pipeline.pipelineType.value, - clean_name_tag(ingestion_pipeline.service.name), + f"type:{ingestion_pipeline.pipelineType.value}", + f"service:{clean_name_tag(ingestion_pipeline.service.name)}", ], } diff --git a/openmetadata-airflow-apis/tests/unit/test_helpers.py b/openmetadata-airflow-apis/tests/unit/test_helpers.py index 421e4b362d60..49a3cab2d913 100644 --- a/openmetadata-airflow-apis/tests/unit/test_helpers.py +++ b/openmetadata-airflow-apis/tests/unit/test_helpers.py @@ -31,4 +31,4 @@ def test_clean_tag(): assert clean_name_tag("hello") == "hello" assert clean_name_tag("hello(world)") == "hello(world)" assert clean_name_tag("service.pipeline") == "pipeline" - assert clean_name_tag(f"service.{'a' * 200}") == "a" * 100 + assert clean_name_tag(f"service.{'a' * 200}") == "a" * 90