Skip to content

Commit

Permalink
Added quotes for dots
Browse files Browse the repository at this point in the history
  • Loading branch information
OnkarVO7 committed Jun 29, 2023
1 parent decec8a commit ff597ca
Showing 1 changed file with 12 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,9 @@
from metadata.generated.schema.entity.services.mlmodelService import MlModelService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.entity.services.storageService import StorageService
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
TestSuitePipeline,
)
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.fqn import split
from metadata.utils.fqn import quote_name

try:
from airflow.operators.python import PythonOperator
Expand Down Expand Up @@ -126,45 +123,45 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:
metadata.health_check()
return WorkflowSource(
type=service_type,
serviceName=ingestion_pipeline.service.name,
serviceName=quote_name(ingestion_pipeline.service.name),
sourceConfig=ingestion_pipeline.sourceConfig,
serviceConnection=None, # retrieved from the test suite workflow using the `sourceConfig.config.entityFullyQualifiedName`
)

if service_type == "databaseService":
entity_class = DatabaseService
service: DatabaseService = metadata.get_by_name(
entity=entity_class, fqn=ingestion_pipeline.service.name
entity=entity_class, fqn=quote_name(ingestion_pipeline.service.name)
)
elif service_type == "pipelineService":
entity_class = PipelineService
service: PipelineService = metadata.get_by_name(
entity=entity_class, fqn=ingestion_pipeline.service.name
entity=entity_class, fqn=quote_name(ingestion_pipeline.service.name)
)
elif service_type == "dashboardService":
entity_class = DashboardService
service: DashboardService = metadata.get_by_name(
entity=entity_class, fqn=ingestion_pipeline.service.name
entity=entity_class, fqn=quote_name(ingestion_pipeline.service.name)
)
elif service_type == "messagingService":
entity_class = MessagingService
service: MessagingService = metadata.get_by_name(
entity=entity_class, fqn=ingestion_pipeline.service.name
entity=entity_class, fqn=quote_name(ingestion_pipeline.service.name)
)
elif service_type == "mlmodelService":
entity_class = MlModelService
service: MlModelService = metadata.get_by_name(
entity=entity_class, fqn=ingestion_pipeline.service.name
entity=entity_class, fqn=quote_name(ingestion_pipeline.service.name)
)
elif service_type == "metadataService":
entity_class = MetadataService
service: MetadataService = metadata.get_by_name(
entity=entity_class, fqn=ingestion_pipeline.service.name
entity=entity_class, fqn=quote_name(ingestion_pipeline.service.name)
)
elif service_type == "storageService":
entity_class = StorageService
service: StorageService = metadata.get_by_name(
entity=entity_class, fqn=ingestion_pipeline.service.name
entity=entity_class, fqn=quote_name(ingestion_pipeline.service.name)
)
else:
raise InvalidServiceException(f"Invalid Service Type: {service_type}")
Expand Down Expand Up @@ -193,7 +190,9 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:
)

if not service:
raise GetServiceException(service_type, ingestion_pipeline.service.name)
raise GetServiceException(
service_type, quote_name(ingestion_pipeline.service.name)
)

return WorkflowSource(
type=service.serviceType.value.lower(),
Expand Down

0 comments on commit ff597ca

Please sign in to comment.