Skip to content

Commit

Permalink
MINOR - Add labels in OM Airflow DAGs (#16233)
Browse files Browse the repository at this point in the history
* MINOR - Add labels in OM Airflow DAGs

* update test
  • Loading branch information
pmbrull committed May 13, 2024
1 parent 4f9415f commit db3bff1
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ def build_dag_configs(ingestion_pipeline: IngestionPipeline) -> dict:
"schedule_interval": ingestion_pipeline.airflowConfig.scheduleInterval,
"tags": [
"OpenMetadata",
ingestion_pipeline.displayName or ingestion_pipeline.name.__root__,
ingestion_pipeline.pipelineType.value,
ingestion_pipeline.service.name,
],
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
from unittest import TestCase
from unittest.mock import patch

from openmetadata_managed_apis.workflows.ingestion.application import (
build_application_workflow_config,
)
from openmetadata_managed_apis.workflows.ingestion.lineage import (
build_lineage_workflow_config,
)
Expand All @@ -36,12 +33,6 @@
build_usage_workflow_config,
)

from metadata.generated.schema.entity.applications.configuration.applicationConfig import (
AppConfig,
)
from metadata.generated.schema.entity.applications.configuration.external.autoTaggerAppConfig import (
AutoTaggerAppConfig,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
Expand All @@ -51,10 +42,6 @@
IngestionPipeline,
PipelineType,
)
from metadata.generated.schema.metadataIngestion.applicationPipeline import (
ApplicationConfigType,
ApplicationPipeline,
)
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
)
Expand All @@ -81,7 +68,6 @@
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.application import ApplicationWorkflow
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
Expand Down Expand Up @@ -358,44 +344,3 @@ def test_test_suite_workflow(self):
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))

parse_workflow_config_gracefully(config)

@patch.object(
ApplicationWorkflow,
"set_ingestion_pipeline_status",
mock_set_ingestion_pipeline_status,
)
def test_application_workflow(self):
"""
Validate that the ingestionPipeline can be parsed
and properly load an Application Workflow
"""

ingestion_pipeline = IngestionPipeline(
id=uuid.uuid4(),
name="test_auto_tagger_application",
pipelineType=PipelineType.application,
fullyQualifiedName="OpenMetadata.test_auto_tagger_application",
sourceConfig=SourceConfig(
config=ApplicationPipeline(
type=ApplicationConfigType.Application,
appConfig=AppConfig(
__root__=AutoTaggerAppConfig(confidenceLevel=80)
),
sourcePythonClass="metadata.applications.auto_tagger.AutoTaggerApp",
)
),
openMetadataServerConnection=self.server_config,
airflowConfig=AirflowConfig(
startDate="2022-06-10T15:06:47+00:00",
),
service=EntityReference(
id=uuid.uuid4(),
type="metadata",
name="OpenMetadata",
),
)

workflow_config = build_application_workflow_config(ingestion_pipeline)
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))

ApplicationWorkflow.create(config)

0 comments on commit db3bff1

Please sign in to comment.