diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index 27a927a188f2..d5dade5a0821 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -30,12 +30,9 @@ from metadata.generated.schema.entity.services.mlmodelService import MlModelService from metadata.generated.schema.entity.services.pipelineService import PipelineService from metadata.generated.schema.entity.services.storageService import StorageService -from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( - TestSuitePipeline, -) from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.utils.fqn import split +from metadata.utils.fqn import quote_name try: from airflow.operators.python import PythonOperator @@ -126,7 +123,7 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource: metadata.health_check() return WorkflowSource( type=service_type, - serviceName=ingestion_pipeline.service.name, + serviceName=quote_name(ingestion_pipeline.service.name), sourceConfig=ingestion_pipeline.sourceConfig, serviceConnection=None, # retrieved from the test suite workflow using the `sourceConfig.config.entityFullyQualifiedName` ) @@ -134,37 +131,37 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource: if service_type == "databaseService": entity_class = DatabaseService service: DatabaseService = metadata.get_by_name( - entity=entity_class, fqn=ingestion_pipeline.service.name + entity=entity_class, fqn=quote_name(ingestion_pipeline.service.name) ) elif service_type == "pipelineService": entity_class = PipelineService service: PipelineService = metadata.get_by_name( - entity=entity_class, fqn=ingestion_pipeline.service.name + entity=entity_class, fqn=quote_name(ingestion_pipeline.service.name) ) elif service_type == "dashboardService": entity_class = DashboardService service: DashboardService = metadata.get_by_name( - entity=entity_class, fqn=ingestion_pipeline.service.name + entity=entity_class, fqn=quote_name(ingestion_pipeline.service.name) ) elif service_type == "messagingService": entity_class = MessagingService service: MessagingService = metadata.get_by_name( - entity=entity_class, fqn=ingestion_pipeline.service.name + entity=entity_class, fqn=quote_name(ingestion_pipeline.service.name) ) elif service_type == "mlmodelService": entity_class = MlModelService service: MlModelService = metadata.get_by_name( - entity=entity_class, fqn=ingestion_pipeline.service.name + entity=entity_class, fqn=quote_name(ingestion_pipeline.service.name) ) elif service_type == "metadataService": entity_class = MetadataService service: MetadataService = metadata.get_by_name( - entity=entity_class, fqn=ingestion_pipeline.service.name + entity=entity_class, fqn=quote_name(ingestion_pipeline.service.name) ) elif service_type == "storageService": entity_class = StorageService service: StorageService = metadata.get_by_name( - entity=entity_class, fqn=ingestion_pipeline.service.name + entity=entity_class, fqn=quote_name(ingestion_pipeline.service.name) ) else: raise InvalidServiceException(f"Invalid Service Type: {service_type}") @@ -193,7 +190,9 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource: ) if not service: - raise GetServiceException(service_type, ingestion_pipeline.service.name) + raise GetServiceException( + service_type, quote_name(ingestion_pipeline.service.name) + ) return WorkflowSource( type=service.serviceType.value.lower(),