Skip to content

Commit

Permalink
FIX - Airflow tagging long names (#16304)
Browse files Browse the repository at this point in the history
* FIX - Airflow tagging long names

* FIX - Airflow tagging long names

* FIX - Airflow tagging long names
  • Loading branch information
pmbrull authored May 16, 2024
1 parent 46e14b7 commit 44181b1
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import uuid
from datetime import datetime, timedelta
from functools import partial
from typing import Callable, Union
from typing import Callable, Optional, Union

import airflow
from airflow import DAG
Expand All @@ -36,6 +36,7 @@
)
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.workflow.workflow_output_handler import print_status

# pylint: disable=ungrouped-imports
Expand Down Expand Up @@ -225,6 +226,21 @@ def build_workflow_config_property(
)


def clean_name_tag(tag: str) -> Optional[str]:
"""
Clean the tag to be used in Airflow
:param tag: tag to be cleaned
:return: cleaned tag
"""
if not tag:
return None
try:
return fqn.split(tag)[-1][:100]
except Exception as exc:
logger.warning("Error cleaning tag: %s", exc)
return tag[:100]


def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
"""
Prepare kwargs to send to DAG
Expand Down Expand Up @@ -255,9 +271,10 @@ def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
"schedule_interval": ingestion_pipeline.airflowConfig.scheduleInterval,
"tags": [
"OpenMetadata",
ingestion_pipeline.displayName or ingestion_pipeline.name.__root__,
clean_name_tag(ingestion_pipeline.displayName)
or clean_name_tag(ingestion_pipeline.name.__root__),
ingestion_pipeline.pipelineType.value,
ingestion_pipeline.service.name,
clean_name_tag(ingestion_pipeline.service.name),
],
}

Expand Down
27 changes: 15 additions & 12 deletions openmetadata-airflow-apis/tests/unit/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,24 @@
"""
Test helper functions
"""
from unittest import TestCase

from openmetadata_managed_apis.api.utils import clean_dag_id
from openmetadata_managed_apis.workflows.ingestion.common import clean_name_tag


class TestHelpers(TestCase):
def test_clean_dag_id():
"""
Methods to validate helpers on REST APIs
To make sure airflow can parse it
"""
assert clean_dag_id("hello") == "hello"
assert clean_dag_id("hello(world)") == "hello_world_"
assert clean_dag_id("hello-world") == "hello-world"
assert clean_dag_id("%%&^++hello__") == "_hello__"


def test_clean_tag():
"""We can properly tag airflow DAGs"""

def test_clean_dag_id(self):
"""
To make sure airflow can parse it
"""
self.assertEqual(clean_dag_id("hello"), "hello")
self.assertEqual(clean_dag_id("hello(world)"), "hello_world_")
self.assertEqual(clean_dag_id("hello-world"), "hello-world")
self.assertEqual(clean_dag_id("%%&^++hello__"), "_hello__")
assert clean_name_tag("hello") == "hello"
assert clean_name_tag("hello(world)") == "hello(world)"
assert clean_name_tag("service.pipeline") == "pipeline"
assert clean_name_tag(f"service.{'a' * 200}") == "a" * 100

0 comments on commit 44181b1

Please sign in to comment.