Skip to content

Commit

Permalink
MINOR - Airflow dag_tag PK (#16314)
Browse files Browse the repository at this point in the history
* MINOR - Airflow dag_tag PK

* MINOR - Airflow dag_tag PK
  • Loading branch information
pmbrull authored May 17, 2024
1 parent d6046c8 commit 62259a2
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,17 +228,17 @@ def build_workflow_config_property(

def clean_name_tag(tag: str) -> Optional[str]:
"""
Clean the tag to be used in Airflow
:param tag: tag to be cleaned
:return: cleaned tag
Clean the tag to be used in Airflow.
Airflow supports 100 characters. We'll keep just 90
since we add prefixes on the tags
"""
if not tag:
return None
try:
return fqn.split(tag)[-1][:100]
return fqn.split(tag)[-1][:90]
except Exception as exc:
logger.warning("Error cleaning tag: %s", exc)
return tag[:100]
return tag[:90]


def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
Expand Down Expand Up @@ -273,8 +273,8 @@ def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
"OpenMetadata",
clean_name_tag(ingestion_pipeline.displayName)
or clean_name_tag(ingestion_pipeline.name.__root__),
ingestion_pipeline.pipelineType.value,
clean_name_tag(ingestion_pipeline.service.name),
f"type:{ingestion_pipeline.pipelineType.value}",
f"service:{clean_name_tag(ingestion_pipeline.service.name)}",
],
}

Expand Down
2 changes: 1 addition & 1 deletion openmetadata-airflow-apis/tests/unit/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ def test_clean_tag():
assert clean_name_tag("hello") == "hello"
assert clean_name_tag("hello(world)") == "hello(world)"
assert clean_name_tag("service.pipeline") == "pipeline"
assert clean_name_tag(f"service.{'a' * 200}") == "a" * 100
assert clean_name_tag(f"service.{'a' * 200}") == "a" * 90

0 comments on commit 62259a2

Please sign in to comment.