From db3bff19af17b54e1ec50c88a22ef7c4c943cb1c Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Mon, 13 May 2024 12:45:20 +0200 Subject: [PATCH] MINOR - Add labels in OM Airflow DAGs (#16233) * MINOR - Add labels in OM Airflow DAGs * update test --- .../workflows/ingestion/common.py | 2 + .../test_workflow_creation.py | 55 ------------------- 2 files changed, 2 insertions(+), 55 deletions(-) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index f33bb027016c..06439f80e52b 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -255,7 +255,9 @@ def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict: "schedule_interval": ingestion_pipeline.airflowConfig.scheduleInterval, "tags": [ "OpenMetadata", + ingestion_pipeline.displayName or ingestion_pipeline.name.__root__, ingestion_pipeline.pipelineType.value, + ingestion_pipeline.service.name, ], } diff --git a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py index d5f95fada27d..a757e653ed68 100644 --- a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py +++ b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py @@ -17,9 +17,6 @@ from unittest import TestCase from unittest.mock import patch -from openmetadata_managed_apis.workflows.ingestion.application import ( - build_application_workflow_config, -) from openmetadata_managed_apis.workflows.ingestion.lineage import ( build_lineage_workflow_config, ) @@ -36,12 +33,6 @@ build_usage_workflow_config, ) -from metadata.generated.schema.entity.applications.configuration.applicationConfig import ( - AppConfig, -) -from metadata.generated.schema.entity.applications.configuration.external.autoTaggerAppConfig import ( - AutoTaggerAppConfig, -) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) @@ -51,10 +42,6 @@ IngestionPipeline, PipelineType, ) -from metadata.generated.schema.metadataIngestion.applicationPipeline import ( - ApplicationConfigType, - ApplicationPipeline, -) from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( DatabaseServiceMetadataPipeline, ) @@ -81,7 +68,6 @@ from metadata.ingestion.api.parser import parse_workflow_config_gracefully from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.workflow.application import ApplicationWorkflow from metadata.workflow.data_quality import TestSuiteWorkflow from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow @@ -358,44 +344,3 @@ def test_test_suite_workflow(self): config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) parse_workflow_config_gracefully(config) - - @patch.object( - ApplicationWorkflow, - "set_ingestion_pipeline_status", - mock_set_ingestion_pipeline_status, - ) - def test_application_workflow(self): - """ - Validate that the ingestionPipeline can be parsed - and properly load an Application Workflow - """ - - ingestion_pipeline = IngestionPipeline( - id=uuid.uuid4(), - name="test_auto_tagger_application", - pipelineType=PipelineType.application, - fullyQualifiedName="OpenMetadata.test_auto_tagger_application", - sourceConfig=SourceConfig( - config=ApplicationPipeline( - type=ApplicationConfigType.Application, - appConfig=AppConfig( - __root__=AutoTaggerAppConfig(confidenceLevel=80) - ), - sourcePythonClass="metadata.applications.auto_tagger.AutoTaggerApp", - ) - ), - openMetadataServerConnection=self.server_config, - airflowConfig=AirflowConfig( - startDate="2022-06-10T15:06:47+00:00", - ), - service=EntityReference( - id=uuid.uuid4(), - type="metadata", - name="OpenMetadata", - ), - ) - - workflow_config = build_application_workflow_config(ingestion_pipeline) - config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) - - ApplicationWorkflow.create(config)