diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index 06439f80e52b..a61c2039c7d2 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -36,6 +36,7 @@ ) from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils import fqn from metadata.workflow.workflow_output_handler import print_status # pylint: disable=ungrouped-imports @@ -225,6 +226,19 @@ def build_workflow_config_property( ) +def clean_name_tag(tag: str) -> str: + """ + Clean the tag to be used in Airflow + :param tag: tag to be cleaned + :return: cleaned tag + """ + try: + return fqn.split(tag)[-1][:100] + except Exception as exc: + logger.warning("Error cleaning tag: %s", exc) + return tag[:100] + + def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict: """ Prepare kwargs to send to DAG @@ -255,7 +269,8 @@ def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict: "schedule_interval": ingestion_pipeline.airflowConfig.scheduleInterval, "tags": [ "OpenMetadata", - ingestion_pipeline.displayName or ingestion_pipeline.name.__root__, + clean_name_tag(ingestion_pipeline.displayName) + or clean_name_tag(ingestion_pipeline.name.__root__), ingestion_pipeline.pipelineType.value, ingestion_pipeline.service.name, ], diff --git a/openmetadata-airflow-apis/tests/unit/test_helpers.py b/openmetadata-airflow-apis/tests/unit/test_helpers.py index f90e27be5899..421e4b362d60 100644 --- a/openmetadata-airflow-apis/tests/unit/test_helpers.py +++ b/openmetadata-airflow-apis/tests/unit/test_helpers.py @@ -11,21 +11,24 @@ """ Test helper functions """ -from unittest import TestCase - from openmetadata_managed_apis.api.utils import clean_dag_id +from openmetadata_managed_apis.workflows.ingestion.common import clean_name_tag -class TestHelpers(TestCase): +def test_clean_dag_id(): """ - Methods to validate helpers on REST APIs + To make sure airflow can parse it """ + assert clean_dag_id("hello") == "hello" + assert clean_dag_id("hello(world)") == "hello_world_" + assert clean_dag_id("hello-world") == "hello-world" + assert clean_dag_id("%%&^++hello__") == "_hello__" + + +def test_clean_tag(): + """We can properly tag airflow DAGs""" - def test_clean_dag_id(self): - """ - To make sure airflow can parse it - """ - self.assertEqual(clean_dag_id("hello"), "hello") - self.assertEqual(clean_dag_id("hello(world)"), "hello_world_") - self.assertEqual(clean_dag_id("hello-world"), "hello-world") - self.assertEqual(clean_dag_id("%%&^++hello__"), "_hello__") + assert clean_name_tag("hello") == "hello" + assert clean_name_tag("hello(world)") == "hello(world)" + assert clean_name_tag("service.pipeline") == "pipeline" + assert clean_name_tag(f"service.{'a' * 200}") == "a" * 100