diff --git a/ingestion/operators/docker/main.py b/ingestion/operators/docker/main.py index d52edfbe9a45..eb494f727ebf 100644 --- a/ingestion/operators/docker/main.py +++ b/ingestion/operators/docker/main.py @@ -15,13 +15,13 @@ import yaml -from metadata.data_quality.api.workflow import TestSuiteWorkflow from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( PipelineType, ) from metadata.generated.schema.metadataIngestion.workflow import LogLevels from metadata.utils.logger import set_loggers_level from metadata.workflow.data_insight import DataInsightWorkflow +from metadata.workflow.data_quality import TestSuiteWorkflow from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow from metadata.workflow.usage import UsageWorkflow diff --git a/ingestion/src/metadata/cli/dataquality.py b/ingestion/src/metadata/cli/dataquality.py index 4899a9fa9bdd..7a97317ab369 100644 --- a/ingestion/src/metadata/cli/dataquality.py +++ b/ingestion/src/metadata/cli/dataquality.py @@ -17,12 +17,12 @@ import traceback from metadata.config.common import load_config_file -from metadata.data_quality.api.workflow import TestSuiteWorkflow from metadata.utils.logger import cli_logger +from metadata.workflow.data_quality import TestSuiteWorkflow from metadata.workflow.workflow_output_handler import ( WorkflowType, print_init_error, - print_test_suite_status, + print_status, ) logger = cli_logger() @@ -48,5 +48,5 @@ def run_test(config_path: str) -> None: workflow.execute() workflow.stop() - print_test_suite_status(workflow) + print_status(workflow) workflow.raise_from_status() diff --git a/ingestion/src/metadata/data_quality/api/models.py b/ingestion/src/metadata/data_quality/api/models.py index 6b0b5541d7c5..5f5800c5a909 100644 --- a/ingestion/src/metadata/data_quality/api/models.py +++ b/ingestion/src/metadata/data_quality/api/models.py @@ -18,8 +18,13 @@ from typing import List, Optional +from pydantic import BaseModel, Field + from metadata.config.common import ConfigModel -from metadata.generated.schema.tests.testCase import TestCaseParameterValue +from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.tests.basic import TestCaseResult +from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue class TestCaseDefinition(ConfigModel): @@ -38,3 +43,27 @@ class TestSuiteProcessorConfig(ConfigModel): testCases: Optional[List[TestCaseDefinition]] = None forceUpdate: Optional[bool] = False + + +class TestCaseResultResponse(BaseModel): + testCaseResult: TestCaseResult + testCase: TestCase + + +class TableAndTests(BaseModel): + """Source response bringing together the table and test cases""" + + table: Table = Field(None, description="Table being processed by the DQ workflow") + service_type: str = Field(..., description="Service type the table belongs to") + test_cases: Optional[List[TestCase]] = Field( + None, description="Test Cases already existing in the Test Suite, if any" + ) + executable_test_suite: Optional[CreateTestSuiteRequest] = Field( + None, description="If no executable test suite is found, we'll create one" + ) + + +class TestCaseResults(BaseModel): + """Processor response with a list of computed Test Case Results""" + + test_results: Optional[List[TestCaseResultResponse]] diff --git a/ingestion/src/metadata/data_quality/api/workflow.py 
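The models added to `data_quality/api/models.py` above become the contract between the new source, processor and sink steps. A minimal sketch of how they compose, using simplified stand-ins for the generated `Table`/`TestCase`/`TestCaseResult` schema classes (the real ones come from `metadata.generated.schema`, not from this snippet):

```python
# Sketch only: simplified stand-ins to show how the new DQ workflow models fit
# together. The real TableAndTests / TestCaseResults live in
# metadata.data_quality.api.models and use the generated schema entities.
from typing import List, Optional

from pydantic import BaseModel, Field


class Table(BaseModel):           # stand-in for the generated Table entity
    fullyQualifiedName: str


class TestCase(BaseModel):        # stand-in for the generated TestCase entity
    name: str


class TestCaseResult(BaseModel):  # stand-in for the generated TestCaseResult
    testCaseStatus: str


class TestCaseResultResponse(BaseModel):
    testCaseResult: TestCaseResult
    testCase: TestCase


class TableAndTests(BaseModel):
    """What the source step hands to the processor."""

    table: Optional[Table] = Field(None, description="Table being processed")
    service_type: str
    test_cases: Optional[List[TestCase]] = None


class TestCaseResults(BaseModel):
    """What the processor hands to the sink."""

    test_results: Optional[List[TestCaseResultResponse]] = None


# The source emits one TableAndTests per table; the processor turns it into a
# TestCaseResults batch that the REST sink writes back to OpenMetadata.
record = TableAndTests(
    table=Table(fullyQualifiedName="mysql.default.shop.users"),
    service_type="Mysql",
    test_cases=[TestCase(name="table_row_count_to_equal")],
)
results = TestCaseResults(
    test_results=[
        TestCaseResultResponse(
            testCase=record.test_cases[0],
            testCaseResult=TestCaseResult(testCaseStatus="Success"),
        )
    ]
)
print(results.test_results[0].testCase.name)
```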
b/ingestion/src/metadata/data_quality/api/workflow.py deleted file mode 100644 index dadd28ebce1b..000000000000 --- a/ingestion/src/metadata/data_quality/api/workflow.py +++ /dev/null @@ -1,511 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Workflow definition for the test suite -""" - -from __future__ import annotations - -import traceback -from copy import deepcopy -from logging import Logger -from typing import List, Optional, cast - -from pydantic import BaseModel, ValidationError - -from metadata.config.common import WorkflowExecutionError -from metadata.data_quality.api.models import ( - TestCaseDefinition, - TestSuiteProcessorConfig, -) -from metadata.data_quality.source.test_suite_source_factory import ( - test_suite_source_factory, -) -from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest -from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest -from metadata.generated.schema.entity.data.table import Table -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.generated.schema.entity.services.connections.serviceConnection import ( - ServiceConnection, -) -from metadata.generated.schema.entity.services.databaseService import DatabaseService -from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( - PipelineState, -) -from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( - TestSuitePipeline, -) -from metadata.generated.schema.metadataIngestion.workflow import ( - OpenMetadataWorkflowConfig, -) -from metadata.generated.schema.tests.testCase import TestCase -from metadata.generated.schema.tests.testDefinition import TestDefinition, TestPlatform -from metadata.generated.schema.tests.testSuite import TestSuite -from metadata.generated.schema.type.basic import EntityLink, FullyQualifiedEntityName -from metadata.ingestion.api.models import StackTraceError -from metadata.ingestion.api.parser import parse_workflow_config_gracefully -from metadata.ingestion.api.processor import ProcessorStatus -from metadata.ingestion.ometa.client_utils import create_ometa_client -from metadata.utils import entity_link -from metadata.utils.fqn import split -from metadata.utils.importer import get_sink -from metadata.utils.logger import test_suite_logger -from metadata.workflow.workflow_output_handler import print_test_suite_status -from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin - -logger: Logger = test_suite_logger() - - -class TestCaseToCreate(BaseModel): - """Test case to create""" - - test_suite_name: str - test_case_name: str - entity_link: str - - def __hash__(self): - """make this base model hashable on unique_name""" - return hash(f"{self.test_suite_name}.{self.test_case_name}") - - def __str__(self) -> str: - """make this base model printable""" - return f"{self.test_suite_name}.{self.test_case_name}" - - -class 
TestSuiteWorkflow(WorkflowStatusMixin): - """workflow to run the test suite""" - - def __init__(self, config: OpenMetadataWorkflowConfig): - """ - Instantiate test suite workflow class - - Args: - config: OM workflow configuration object - - Attributes: - config: OM workflow configuration object - """ - self.config = config - self.metadata_config: OpenMetadataConnection = ( - self.config.workflowConfig.openMetadataServerConfig - ) - self.metadata = create_ometa_client(self.metadata_config) - - self.source_config: TestSuitePipeline = self.config.source.sourceConfig.config - self.service: DatabaseService = self._retrieve_service() - self._retrieve_service_connection() - - self.processor_config: TestSuiteProcessorConfig = ( - TestSuiteProcessorConfig.parse_obj( - self.config.processor.dict().get("config") - ) - ) - - self.set_ingestion_pipeline_status(state=PipelineState.running) - - self.status = ProcessorStatus() - - self.table_entity: Optional[Table] = self._get_table_entity( - self.source_config.entityFullyQualifiedName.__root__ - ) - - if self.config.sink: - self.sink = get_sink( - sink_type=self.config.sink.type, - sink_config=self.config.sink, - metadata_config=self.metadata_config, - from_="data_quality", - ) - - @classmethod - def create(cls, config_dict) -> TestSuiteWorkflow: - """ - Instantiate a TestSuiteWorkflow object form a yaml or json config file - - Args: - config_dict: json or yaml configuration file - Returns: - a test suite workflow - """ - try: - config = parse_workflow_config_gracefully(config_dict) - return cls(config) - except ValidationError as err: - logger.error( - f"Error trying to parse the Profiler Workflow configuration: {err}" - ) - raise err - - def _retrieve_service(self) -> DatabaseService: - """Get service object from source config `entityFullyQualifiedName`""" - fully_qualified_name = self.source_config.entityFullyQualifiedName.__root__ - try: - service_name = split(fully_qualified_name)[0] - except IndexError as exc: - logger.debug(traceback.format_exc()) - raise IndexError( - f"Could not retrieve service name from entity fully qualified name {fully_qualified_name}: {exc}" - ) - try: - service = self.metadata.get_by_name(DatabaseService, service_name) - if not service: - raise ConnectionError( - f"Could not retrieve service with name `{service_name}`. " - "Typically caused by the `entityFullyQualifiedName` does not exists in OpenMetadata " - "or the JWT Token is invalid." - ) - except ConnectionError as exc: - raise exc - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.error( - f"Error getting service connection for service name [{service_name}]" - f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}" - ) - return service - - def _get_table_entity(self, entity_fqn: str) -> Optional[Table]: - """given an entity fqn return the table entity - - Args: - entity_fqn: entity fqn for the test case - """ - return self.metadata.get_by_name( - entity=Table, - fqn=entity_fqn, - fields=["tableProfilerConfig", "testSuite"], - ) - - def create_or_return_test_suite_entity(self) -> Optional[TestSuite]: - """ - try to get test suite name from source.servicName. - In the UI workflow we'll write the entity name (i.e. the test suite) - to source.serviceName. 
- """ - self.table_entity = cast(Table, self.table_entity) # satisfy type checker - test_suite = self.table_entity.testSuite - if test_suite and not test_suite.executable: - logger.debug( - f"Test suite {test_suite.fullyQualifiedName.__root__} is not executable." - ) - return None - - if self.processor_config.testCases and not test_suite: - # This should cover scenarios where we are running the tests from the CLI workflow - # and no corresponding tests suite exist in the platform. We, therefore, will need - # to create the test suite first. - logger.debug( - "Test suite name not found in the platform. Creating the test suite from processor config." - ) - test_suite = self.metadata.create_or_update_executable_test_suite( - CreateTestSuiteRequest( - name=f"{self.source_config.entityFullyQualifiedName.__root__}.TestSuite", - displayName=f"{self.source_config.entityFullyQualifiedName.__root__} Test Suite", - description="Test Suite created from YAML processor config file", - owner=None, - executableEntityReference=self.source_config.entityFullyQualifiedName.__root__, - ) - ) - - return test_suite - - def get_test_cases_from_test_suite( - self, test_suite: TestSuite - ) -> Optional[List[TestCase]]: - """ - Get test cases from test suite name - - Args: - test_suite_name: the name of the test suite - """ - test_cases = self.metadata.list_entities( - entity=TestCase, - fields=["testSuite", "entityLink", "testDefinition"], - params={"testSuiteId": test_suite.id.__root__}, - ).entities - test_cases = cast(List[TestCase], test_cases) # satisfy type checker - if self.processor_config.testCases is not None: - cli_test_cases = self.get_test_case_from_cli_config() # type: ignore - cli_test_cases = cast( - List[TestCaseDefinition], cli_test_cases - ) # satisfy type checker - test_cases = self.compare_and_create_test_cases( - cli_test_cases, test_cases, test_suite - ) - - return test_cases - - def filter_for_om_test_cases(self, test_cases: List[TestCase]) -> List[TestCase]: - """ - Filter test cases for OM test cases only. This will prevent us from running non OM test cases - - Args: - test_cases: list of test cases - """ - om_test_cases: List[TestCase] = [] - for test_case in test_cases: - test_definition: TestDefinition = self.metadata.get_by_id( - TestDefinition, test_case.testDefinition.id - ) - if TestPlatform.OpenMetadata not in test_definition.testPlatforms: - logger.debug( - f"Test case {test_case.name.__root__} is not an OpenMetadata test case." 
- ) - continue - om_test_cases.append(test_case) - - return om_test_cases - - def get_test_case_from_cli_config( - self, - ) -> Optional[List[TestCaseDefinition]]: - """Get all the test cases names defined in the CLI config file""" - if self.processor_config.testCases is not None: - return list(self.processor_config.testCases) - return None - - def _update_test_cases( - self, test_cases_to_update: List[TestCaseDefinition], test_cases: List[TestCase] - ): - """Given a list of CLI test definition patch test cases in the platform - - Args: - test_cases_to_update (List[TestCaseDefinition]): list of test case definitions - """ - test_cases_to_update_names = { - test_case_to_update.name for test_case_to_update in test_cases_to_update - } - for indx, test_case in enumerate(deepcopy(test_cases)): - if test_case.name.__root__ in test_cases_to_update_names: - test_case_definition = next( - test_case_to_update - for test_case_to_update in test_cases_to_update - if test_case_to_update.name == test_case.name.__root__ - ) - updated_test_case = self.metadata.patch_test_case_definition( - source=test_case, - entity_link=entity_link.get_entity_link( - self.source_config.entityFullyQualifiedName.__root__, - test_case_definition.columnName, - ), - test_case_parameter_values=test_case_definition.parameterValues, - ) - if updated_test_case: - test_cases.pop(indx) - test_cases.append(updated_test_case) - - return test_cases - - def compare_and_create_test_cases( - self, - cli_test_cases_definitions: Optional[List[TestCaseDefinition]], - test_cases: List[TestCase], - test_suite: TestSuite, - ) -> Optional[List[TestCase]]: - """ - compare test cases defined in CLI config workflow with test cases - defined on the server - - Args: - cli_test_cases_definitions: test cases defined in CLI workflow associated with its test suite - test_cases: list of test cases entities fetch from the server using test suite names in the config file - """ - if not cli_test_cases_definitions: - return test_cases - test_cases = deepcopy(test_cases) - test_case_names = {test_case.name.__root__ for test_case in test_cases} - - # we'll check the test cases defined in the CLI config file and not present in the platform - test_cases_to_create = [ - cli_test_case_definition - for cli_test_case_definition in cli_test_cases_definitions - if cli_test_case_definition.name not in test_case_names - ] - - if self.processor_config and self.processor_config.forceUpdate: - test_cases_to_update = [ - cli_test_case_definition - for cli_test_case_definition in cli_test_cases_definitions - if cli_test_case_definition.name in test_case_names - ] - test_cases = self._update_test_cases(test_cases_to_update, test_cases) - - if not test_cases_to_create: - return test_cases - - for test_case_to_create in test_cases_to_create: - logger.debug(f"Creating test case with name {test_case_to_create.name}") - try: - test_case = self.metadata.create_or_update( - CreateTestCaseRequest( - name=test_case_to_create.name, - description=test_case_to_create.description, - displayName=test_case_to_create.displayName, - testDefinition=FullyQualifiedEntityName( - __root__=test_case_to_create.testDefinitionName - ), - entityLink=EntityLink( - __root__=entity_link.get_entity_link( - self.source_config.entityFullyQualifiedName.__root__, - test_case_to_create.columnName, - ) - ), - testSuite=test_suite.fullyQualifiedName.__root__, - parameterValues=list(test_case_to_create.parameterValues) - if test_case_to_create.parameterValues - else None, - owner=None, - ) - ) - 
test_cases.append(test_case) - except Exception as exc: - error = ( - f"Couldn't create test case name {test_case_to_create.name}: {exc}" - ) - logger.error(error) - logger.debug(traceback.format_exc()) - self.status.failed( - StackTraceError( - name=self.source_config.entityFullyQualifiedName.__root__, - error=error, - stack_trace=traceback.format_exc(), - ) - ) - - return test_cases - - def run_test_suite(self): - """Main logic to run the tests""" - if not self.table_entity: - logger.debug(traceback.format_exc()) - raise ValueError( - f"Could not retrieve table entity for {self.source_config.entityFullyQualifiedName.__root__}. " - "Make sure the table exists in OpenMetadata and/or the JWT Token provided is valid." - ) - - test_suite = self.create_or_return_test_suite_entity() - if not test_suite: - logger.debug( - f"No test suite found for table {self.source_config.entityFullyQualifiedName.__root__} " - "or test suite is not executable." - ) - return - - test_cases = self.get_test_cases_from_test_suite(test_suite) - if not test_cases: - logger.debug( - f"No test cases found for table {self.source_config.entityFullyQualifiedName.__root__}" - f"and test suite {test_suite.fullyQualifiedName.__root__}" - ) - return - - openmetadata_test_cases = self.filter_for_om_test_cases(test_cases) - - test_suite_runner = test_suite_source_factory.create( - self.service.serviceType.value.lower(), - self.config, - self.metadata, - self.table_entity, - ).get_data_quality_runner() - - for test_case in openmetadata_test_cases: - try: - test_result = test_suite_runner.run_and_handle(test_case) - if not test_result: - continue - if hasattr(self, "sink"): - self.sink.write_record(test_result) - logger.debug(f"Successfully ran test case {test_case.name.__root__}") - self.status.processed(test_case.fullyQualifiedName.__root__) - except Exception as exc: - error = f"Could not run test case {test_case.name.__root__}: {exc}" - logger.debug(traceback.format_exc()) - logger.error(error) - self.status.failed( - StackTraceError( - name=test_case.name.__root__, - error=error, - stack_trace=traceback.format_exc(), - ) - ) - - def _retrieve_service_connection(self) -> None: - """ - We override the current `serviceConnection` source config object if source workflow service already exists - in OM. When it is configured, we retrieve the service connection from the secrets' manager. Otherwise, we get it - from the service object itself through the default `SecretsManager`. - """ - if ( - not self.config.source.serviceConnection - and not self.metadata.config.forceEntityOverwriting - ): - self.config.source.serviceConnection = ServiceConnection( - __root__=self.service.connection - ) - - def execute(self): - """Execute test suite workflow""" - try: - self.run_test_suite() - # At the end of the `execute`, update the associated Ingestion Pipeline status as success - self.set_ingestion_pipeline_status(PipelineState.success) - - # Any unhandled exception breaking the workflow should update the status - except Exception as err: - logger.debug(traceback.format_exc()) - self.set_ingestion_pipeline_status(PipelineState.failed) - raise err - - def print_status(self) -> None: - """ - Print the workflow results with click - """ - print_test_suite_status(self) - - def result_status(self) -> int: - """ - Returns 1 if status is failed, 0 otherwise. 
- """ - if self.status.failures or ( - hasattr(self, "sink") and self.sink.get_status().failures - ): - return 1 - return 0 - - def _raise_from_status_internal(self, raise_warnings=False): - """ - Check source, processor and sink status and raise if needed - - Our profiler source will never log any failure, only filters, - as we are just picking up data from OM. - """ - - if self.status.failures: - raise WorkflowExecutionError("Processor reported errors", self.status) - if hasattr(self, "sink") and self.sink.get_status().failures: - raise WorkflowExecutionError("Sink reported errors", self.sink.get_status()) - - if raise_warnings: - if self.status.warnings: - raise WorkflowExecutionError("Processor reported warnings", self.status) - if hasattr(self, "sink") and self.sink.get_status().warnings: - raise WorkflowExecutionError( - "Sink reported warnings", self.sink.get_status() - ) - - def stop(self): - """ - Close all connections - """ - self.metadata.close() diff --git a/ingestion/src/metadata/data_quality/processor/__init__.py b/ingestion/src/metadata/data_quality/processor/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/ingestion/src/metadata/data_quality/processor/test_case_runner.py b/ingestion/src/metadata/data_quality/processor/test_case_runner.py new file mode 100644 index 000000000000..fd9deeab06b5 --- /dev/null +++ b/ingestion/src/metadata/data_quality/processor/test_case_runner.py @@ -0,0 +1,307 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +This Processor is in charge of executing the test cases +""" +import traceback +from copy import deepcopy +from typing import List, Optional, cast + +from metadata.data_quality.api.models import ( + TableAndTests, + TestCaseDefinition, + TestCaseResultResponse, + TestCaseResults, + TestSuiteProcessorConfig, +) +from metadata.data_quality.runner.core import DataTestsRunner +from metadata.data_quality.runner.test_suite_source_factory import ( + test_suite_source_factory, +) +from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.generated.schema.tests.testCase import TestCase +from metadata.generated.schema.tests.testDefinition import TestDefinition, TestPlatform +from metadata.generated.schema.tests.testSuite import TestSuite +from metadata.generated.schema.type.basic import EntityLink, FullyQualifiedEntityName +from metadata.ingestion.api.models import Either, StackTraceError +from metadata.ingestion.api.parser import parse_workflow_config_gracefully +from metadata.ingestion.api.step import Step +from metadata.ingestion.api.steps import Processor +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils import entity_link, fqn +from metadata.utils.logger import test_suite_logger + +logger = test_suite_logger() + + +class TestCaseRunner(Processor): + """Execute the test suite tests and create test cases from the YAML config""" + + def __init__(self, config: OpenMetadataWorkflowConfig, metadata: OpenMetadata): + + super().__init__() + + self.config = config + self.metadata = metadata + + self.processor_config: TestSuiteProcessorConfig = ( + TestSuiteProcessorConfig.parse_obj( + self.config.processor.dict().get("config") + ) + ) + + def _run(self, record: TableAndTests) -> Either: + # First, create the executable test suite if it does not exist yet + # This could happen if the process is executed from YAML and not the UI + if record.executable_test_suite: + # We pass the test suite request to the sink + return Either(right=record.executable_test_suite) + + # Add the test cases from the YAML file, if any + test_cases = self.get_test_cases( + test_cases=record.test_cases, + test_suite_fqn=fqn.build( + None, + TestSuite, + table_fqn=record.table.fullyQualifiedName.__root__, + ), + table_fqn=record.table.fullyQualifiedName.__root__, + ) + + if not test_cases: + return Either( + left=StackTraceError( + name="No test Cases", + error=f"No tests cases found for table {record.table.fullyQualifiedName.__root__}", + ) + ) + + openmetadata_test_cases = self.filter_for_om_test_cases(test_cases) + + test_suite_runner = test_suite_source_factory.create( + record.service_type.lower(), + self.config, + self.metadata, + record.table, + ).get_data_quality_runner() + + test_results = [ + test_case_result + for test_case in openmetadata_test_cases + if (test_case_result := self._run_test_case(test_case, test_suite_runner)) + ] + + return Either(right=TestCaseResults(test_results=test_results)) + + def get_test_cases( + self, test_cases: Optional[List[TestCase]], test_suite_fqn: str, table_fqn: str + ) -> List[TestCase]: + """ + Based on the test suite test cases that we already know, pick up + the rest from the YAML config, compare and create the new ones + """ + if self.processor_config.testCases is not None: + cli_test_cases = self.get_test_case_from_cli_config() # type: ignore + cli_test_cases = cast( + List[TestCaseDefinition], 
cli_test_cases + ) # satisfy type checker + return self.compare_and_create_test_cases( + cli_test_cases_definitions=cli_test_cases, + test_cases=test_cases, + test_suite_fqn=test_suite_fqn, + table_fqn=table_fqn, + ) + + return test_cases + + def get_test_case_from_cli_config( + self, + ) -> Optional[List[TestCaseDefinition]]: + """Get all the test cases names defined in the CLI config file""" + if self.processor_config.testCases is not None: + return list(self.processor_config.testCases) + return None + + def compare_and_create_test_cases( + self, + cli_test_cases_definitions: Optional[List[TestCaseDefinition]], + test_cases: List[TestCase], + table_fqn: str, + test_suite_fqn: str, + ) -> List[TestCase]: + """ + compare test cases defined in CLI config workflow with test cases + defined on the server + + Args: + cli_test_cases_definitions: test cases defined in CLI workflow associated with its test suite + test_cases: list of test cases entities fetch from the server using test suite names in the config file + table_fqn: table being tested + test_suite_fqn: FQN of the table + .testSuite + """ + if not cli_test_cases_definitions: + return test_cases + test_cases = deepcopy(test_cases) or [] + test_case_names = ( + {test_case.name.__root__ for test_case in test_cases} + if test_cases + else set() + ) + + # we'll check the test cases defined in the CLI config file and not present in the platform + test_cases_to_create = [ + cli_test_case_definition + for cli_test_case_definition in cli_test_cases_definitions + if cli_test_case_definition.name not in test_case_names + ] + + if self.processor_config and self.processor_config.forceUpdate: + test_cases_to_update = [ + cli_test_case_definition + for cli_test_case_definition in cli_test_cases_definitions + if cli_test_case_definition.name in test_case_names + ] + test_cases = self._update_test_cases( + test_cases_to_update, test_cases, table_fqn + ) + + if not test_cases_to_create: + return test_cases + + for test_case_to_create in test_cases_to_create: + logger.debug(f"Creating test case with name {test_case_to_create.name}") + try: + test_case = self.metadata.create_or_update( + CreateTestCaseRequest( + name=test_case_to_create.name, + description=test_case_to_create.description, + displayName=test_case_to_create.displayName, + testDefinition=FullyQualifiedEntityName( + __root__=test_case_to_create.testDefinitionName + ), + entityLink=EntityLink( + __root__=entity_link.get_entity_link( + table_fqn, + test_case_to_create.columnName, + ) + ), + testSuite=test_suite_fqn, + parameterValues=list(test_case_to_create.parameterValues) + if test_case_to_create.parameterValues + else None, + owner=None, + ) + ) + test_cases.append(test_case) + except Exception as exc: + error = ( + f"Couldn't create test case name {test_case_to_create.name}: {exc}" + ) + logger.error(error) + logger.debug(traceback.format_exc()) + self.status.failed( + StackTraceError( + name=table_fqn, + error=error, + stack_trace=traceback.format_exc(), + ) + ) + + return test_cases + + def _update_test_cases( + self, + test_cases_to_update: List[TestCaseDefinition], + test_cases: List[TestCase], + table_fqn: str, + ): + """Given a list of CLI test definition patch test cases in the platform + + Args: + test_cases_to_update (List[TestCaseDefinition]): list of test case definitions + """ + test_cases_to_update_names = { + test_case_to_update.name for test_case_to_update in test_cases_to_update + } + for indx, test_case in enumerate(deepcopy(test_cases)): + if test_case.name.__root__ in 
test_cases_to_update_names: + test_case_definition = next( + test_case_to_update + for test_case_to_update in test_cases_to_update + if test_case_to_update.name == test_case.name.__root__ + ) + updated_test_case = self.metadata.patch_test_case_definition( + source=test_case, + entity_link=entity_link.get_entity_link( + table_fqn, + test_case_definition.columnName, + ), + test_case_parameter_values=test_case_definition.parameterValues, + ) + if updated_test_case: + test_cases.pop(indx) + test_cases.append(updated_test_case) + + return test_cases + + def filter_for_om_test_cases(self, test_cases: List[TestCase]) -> List[TestCase]: + """ + Filter test cases for OM test cases only. This will prevent us from running non OM test cases + + Args: + test_cases: list of test cases + """ + om_test_cases: List[TestCase] = [] + for test_case in test_cases: + test_definition: TestDefinition = self.metadata.get_by_id( + TestDefinition, test_case.testDefinition.id + ) + if TestPlatform.OpenMetadata not in test_definition.testPlatforms: + logger.debug( + f"Test case {test_case.name.__root__} is not an OpenMetadata test case." + ) + continue + om_test_cases.append(test_case) + + return om_test_cases + + def _run_test_case( + self, test_case: TestCase, test_suite_runner: DataTestsRunner + ) -> Optional[TestCaseResultResponse]: + """Execute the test case and return the result, if any""" + try: + test_result = test_suite_runner.run_and_handle(test_case) + self.status.scanned(test_case.fullyQualifiedName.__root__) + return test_result + except Exception as exc: + error = f"Could not run test case {test_case.name.__root__}: {exc}" + logger.debug(traceback.format_exc()) + logger.error(error) + self.status.failed( + StackTraceError( + name=test_case.name.__root__, + error=error, + stack_trace=traceback.format_exc(), + ) + ) + return None + + @classmethod + def create(cls, config_dict: dict, metadata: OpenMetadata) -> "Step": + config = parse_workflow_config_gracefully(config_dict) + return cls(config=config, metadata=metadata) + + def close(self) -> None: + """Nothing to close""" diff --git a/ingestion/src/metadata/data_quality/source/base_test_suite_source.py b/ingestion/src/metadata/data_quality/runner/base_test_suite_source.py similarity index 99% rename from ingestion/src/metadata/data_quality/source/base_test_suite_source.py rename to ingestion/src/metadata/data_quality/runner/base_test_suite_source.py index bd7cd9923d61..b21bbb354ae4 100644 --- a/ingestion/src/metadata/data_quality/source/base_test_suite_source.py +++ b/ingestion/src/metadata/data_quality/runner/base_test_suite_source.py @@ -36,7 +36,7 @@ NON_SQA_DATABASE_CONNECTIONS = (DatalakeConnection,) -class BaseTestSuiteSource: +class BaseTestSuiteRunner: """Base class for the data quality runner""" def __init__( diff --git a/ingestion/src/metadata/data_quality/runner/core.py b/ingestion/src/metadata/data_quality/runner/core.py index f31f6cb52b69..f53e885017a2 100644 --- a/ingestion/src/metadata/data_quality/runner/core.py +++ b/ingestion/src/metadata/data_quality/runner/core.py @@ -14,8 +14,8 @@ """ +from metadata.data_quality.api.models import TestCaseResultResponse from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface -from metadata.data_quality.runner.models import TestCaseResultResponse from metadata.generated.schema.tests.testCase import TestCase from metadata.utils.logger import test_suite_logger @@ -26,15 +26,15 @@ class DataTestsRunner: """class to execute the test validation""" def __init__(self, 
test_runner_interface: TestSuiteInterface): - self.test_runner_interace = test_runner_interface + self.test_runner_interface = test_runner_interface def run_and_handle(self, test_case: TestCase): """run and handle test case validation""" logger.info( f"Executing test case {test_case.name.__root__} " - f"for entity {self.test_runner_interace.table_entity.fullyQualifiedName.__root__}" + f"for entity {self.test_runner_interface.table_entity.fullyQualifiedName.__root__}" ) - test_result = self.test_runner_interace.run_test_case( + test_result = self.test_runner_interface.run_test_case( test_case, ) diff --git a/ingestion/src/metadata/data_quality/runner/models.py b/ingestion/src/metadata/data_quality/runner/models.py deleted file mode 100644 index 3448b140155f..000000000000 --- a/ingestion/src/metadata/data_quality/runner/models.py +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -test case result response object -""" - -from pydantic import BaseModel - -from metadata.generated.schema.tests.basic import TestCaseResult -from metadata.generated.schema.tests.testCase import TestCase - - -class TestCaseResultResponse(BaseModel): - testCaseResult: TestCaseResult - testCase: TestCase diff --git a/ingestion/src/metadata/data_quality/source/test_suite_source_factory.py b/ingestion/src/metadata/data_quality/runner/test_suite_source_factory.py similarity index 83% rename from ingestion/src/metadata/data_quality/source/test_suite_source_factory.py rename to ingestion/src/metadata/data_quality/runner/test_suite_source_factory.py index e41533552953..2b1a245d8ccd 100644 --- a/ingestion/src/metadata/data_quality/source/test_suite_source_factory.py +++ b/ingestion/src/metadata/data_quality/runner/test_suite_source_factory.py @@ -13,20 +13,20 @@ Factory class for creating test suite source objects """ -from metadata.data_quality.source.base_test_suite_source import BaseTestSuiteSource +from metadata.data_quality.runner.base_test_suite_source import BaseTestSuiteRunner -class TestSuiteSourceFactory: +class TestSuiteRunnerFactory: """Creational factory for test suite source objects""" def __init__(self): - self._source_type = {"base": BaseTestSuiteSource} + self._source_type = {"base": BaseTestSuiteRunner} def register_source(self, source_type: str, source_class): """Register a new source type""" self._source_type[source_type] = source_class - def create(self, source_type: str, *args, **kwargs) -> BaseTestSuiteSource: + def create(self, source_type: str, *args, **kwargs) -> BaseTestSuiteRunner: """Create source object based on source type""" source_class = self._source_type.get(source_type) if not source_class: @@ -35,4 +35,4 @@ def create(self, source_type: str, *args, **kwargs) -> BaseTestSuiteSource: return source_class(*args, **kwargs) -test_suite_source_factory = TestSuiteSourceFactory() +test_suite_source_factory = TestSuiteRunnerFactory() diff --git a/ingestion/src/metadata/data_quality/sink/metadata_rest.py 
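The factory above is unchanged apart from the `TestSuiteSourceFactory` → `TestSuiteRunnerFactory` rename. A hedged sketch of the registration flow, re-implemented in isolation so it runs standalone; `DatalakeTestSuiteRunner` and the `"datalake"` key are purely illustrative, not classes shipped by this PR:

```python
# Minimal re-implementation of the runner factory pattern shown above, so the
# registration flow can be read in isolation.


class BaseTestSuiteRunner:
    """Stand-in for metadata.data_quality.runner.base_test_suite_source.BaseTestSuiteRunner"""

    def __init__(self, *args, **kwargs):
        self.args = args

    def get_data_quality_runner(self):
        return f"runner built from {self.args}"


class TestSuiteRunnerFactory:
    """Creational factory for test suite runner objects"""

    def __init__(self):
        self._source_type = {"base": BaseTestSuiteRunner}

    def register_source(self, source_type: str, source_class):
        """Register a new source type"""
        self._source_type[source_type] = source_class

    def create(self, source_type: str, *args, **kwargs) -> BaseTestSuiteRunner:
        source_class = self._source_type.get(source_type)
        if not source_class:
            raise ValueError(f"Unknown source type {source_type}")
        return source_class(*args, **kwargs)


test_suite_source_factory = TestSuiteRunnerFactory()


# Hypothetical service-specific runner: the TestCaseRunner processor calls
# factory.create(record.service_type.lower(), ...), so registering under the
# lowercase service type is what routes a table to a custom runner.
class DatalakeTestSuiteRunner(BaseTestSuiteRunner):
    pass


test_suite_source_factory.register_source("datalake", DatalakeTestSuiteRunner)
runner = test_suite_source_factory.create("datalake", "config", "metadata", "table")
print(runner.get_data_quality_runner())
```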
b/ingestion/src/metadata/data_quality/sink/metadata_rest.py deleted file mode 100644 index 7ac7ed6e9f7b..000000000000 --- a/ingestion/src/metadata/data_quality/sink/metadata_rest.py +++ /dev/null @@ -1,90 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -OpenMetadata REST Sink implementation for the ORM Profiler results -""" - -import traceback -from typing import Optional - -from metadata.config.common import ConfigModel -from metadata.data_quality.runner.models import TestCaseResultResponse -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.models import StackTraceError -from metadata.ingestion.api.sink import Sink -from metadata.ingestion.ometa.client import APIError -from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.utils.logger import test_suite_logger - -logger = test_suite_logger() - - -class MetadataRestSinkConfig(ConfigModel): - api_endpoint: Optional[str] = None - - -class MetadataRestSink(Sink[Entity]): - """ - Metadata Sink sending the test suite - to the OM API - """ - - config: MetadataRestSinkConfig - - def __init__( - self, - config: MetadataRestSinkConfig, - metadata_config: OpenMetadataConnection, - ): - super().__init__() - self.config = config - self.metadata_config = metadata_config - self.wrote_something = False - self.metadata = OpenMetadata(self.metadata_config) - - @classmethod - def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection): - config = MetadataRestSinkConfig.parse_obj(config_dict) - return cls(config, metadata_config) - - def close(self) -> None: - self.metadata.close() - - def write_record(self, record: TestCaseResultResponse) -> None: - try: - self.metadata.add_test_case_results( - test_results=record.testCaseResult, - test_case_fqn=record.testCase.fullyQualifiedName.__root__, - ) - logger.info( - f"Successfully ingested test case results for test case {record.testCase.name.__root__}" - ) - - self.status.records_written( - f"Test Case: {record.testCase.fullyQualifiedName.__root__}" - ) - - except APIError as err: - name = record.testCase.fullyQualifiedName.__root__ - error = f"Failed to sink test case results for {name}: {err}" - logger.error(error) - logger.debug(traceback.format_exc()) - self.status.failed( - StackTraceError( - name=name, - error=error, - stack_trace=traceback.format_exc(), - ) - ) diff --git a/ingestion/src/metadata/data_quality/source/test_suite.py b/ingestion/src/metadata/data_quality/source/test_suite.py new file mode 100644 index 000000000000..4adba6acc129 --- /dev/null +++ b/ingestion/src/metadata/data_quality/source/test_suite.py @@ -0,0 +1,218 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test Suite Workflow Source + +The main goal is to get the configured table from the API. +""" +import traceback +from typing import Iterable, List, Optional, cast + +from metadata.data_quality.api.models import TableAndTests +from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.services.connections.serviceConnection import ( + ServiceConnection, +) +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( + TestSuitePipeline, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.generated.schema.tests.testCase import TestCase +from metadata.generated.schema.tests.testSuite import TestSuite +from metadata.ingestion.api.models import Either, StackTraceError +from metadata.ingestion.api.parser import parse_workflow_config_gracefully +from metadata.ingestion.api.step import Step +from metadata.ingestion.api.steps import Source +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils import fqn +from metadata.utils.fqn import split +from metadata.utils.logger import test_suite_logger + +logger = test_suite_logger() + + +class TestSuiteSource(Source): + """ + Gets the ingredients required to run the tests + """ + + def __init__( + self, + config: OpenMetadataWorkflowConfig, + metadata: OpenMetadata, + ): + super().__init__() + + self.config = config + self.metadata = metadata + + self.source_config: TestSuitePipeline = self.config.source.sourceConfig.config + + self.service: DatabaseService = self._retrieve_service() + self._retrieve_service_connection() + + self.test_connection() + + def _retrieve_service(self) -> DatabaseService: + """Get service object from source config `entityFullyQualifiedName`""" + fully_qualified_name = self.source_config.entityFullyQualifiedName.__root__ + try: + service_name = split(fully_qualified_name)[0] + except IndexError as exc: + logger.debug(traceback.format_exc()) + raise IndexError( + f"Could not retrieve service name from entity fully qualified name {fully_qualified_name}: {exc}" + ) + try: + service = self.metadata.get_by_name(DatabaseService, service_name) + if not service: + raise ConnectionError( + f"Could not retrieve service with name `{service_name}`. " + "Typically caused by the `entityFullyQualifiedName` does not exists in OpenMetadata " + "or the JWT Token is invalid." + ) + + return service + + except ConnectionError as exc: + raise exc + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error( + f"Error getting service connection for service name [{service_name}]" + f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}" + ) + raise exc + + def _retrieve_service_connection(self) -> None: + """ + We override the current `serviceConnection` source config object if source workflow service already exists + in OM. 
When it is configured, we retrieve the service connection from the secrets' manager. Otherwise, we get it + from the service object itself through the default `SecretsManager`. + """ + if ( + not self.config.source.serviceConnection + and not self.metadata.config.forceEntityOverwriting + ): + self.config.source.serviceConnection = ServiceConnection( + __root__=self.service.connection + ) + + def _get_table_entity(self) -> Optional[Table]: + """given an entity fqn return the table entity + + Args: + entity_fqn: entity fqn for the test case + """ + table: Table = self.metadata.get_by_name( + entity=Table, + fqn=self.source_config.entityFullyQualifiedName.__root__, + fields=["tableProfilerConfig", "testSuite"], + ) + + return table + + def _get_test_cases_from_test_suite( + self, test_suite: Optional[TestSuite] + ) -> Optional[List[TestCase]]: + """Return test cases if the test suite exists and has them""" + if test_suite: + test_cases = self.metadata.list_entities( + entity=TestCase, + fields=["testSuite", "entityLink", "testDefinition"], + params={"testSuiteId": test_suite.id.__root__}, + ).entities + test_cases = cast(List[TestCase], test_cases) # satisfy type checker + + return test_cases + + return None + + def prepare(self): + """Nothing to prepare""" + + def test_connection(self) -> None: + self.metadata.health_check() + + def _iter(self) -> Iterable[Either[TableAndTests]]: + table: Table = self._get_table_entity() + + if table: + yield from self._process_table_suite(table) + + else: + yield Either( + left=StackTraceError( + name="Missing Table", + error=f"Could not retrieve table entity for {self.source_config.entityFullyQualifiedName.__root__}." + " Make sure the table exists in OpenMetadata and/or the JWT Token provided is valid.", + ) + ) + + def _process_table_suite(self, table: Table) -> Iterable[Either[TableAndTests]]: + """ + Check that the table has the proper test suite built in + """ + + # If there is no executable test suite yet for the table, we'll need to create one + executable_test_suite = None + if not table.testSuite: + executable_test_suite = CreateTestSuiteRequest( + name=fqn.build( + None, + TestSuite, + table_fqn=self.source_config.entityFullyQualifiedName.__root__, + ), + displayName=f"{self.source_config.entityFullyQualifiedName.__root__} Test Suite", + description="Test Suite created from YAML processor config file", + owner=None, + executableEntityReference=self.source_config.entityFullyQualifiedName.__root__, + ) + yield Either( + right=TableAndTests( + executable_test_suite=executable_test_suite, + service_type=self.service.serviceType.value, + ) + ) + + if table.testSuite and not table.testSuite.executable: + yield Either( + left=StackTraceError( + name="Non-executable Test Suite", + error=f"The table {self.source_config.entityFullyQualifiedName.__root__} " + "has a test suite that is not executable.", + ) + ) + + else: + + test_suite_cases = self._get_test_cases_from_test_suite(table.testSuite) + + yield Either( + right=TableAndTests( + table=table, + test_cases=test_suite_cases, + service_type=self.service.serviceType.value, + ) + ) + + @classmethod + def create(cls, config_dict: dict, metadata: OpenMetadata) -> "Step": + config = parse_workflow_config_gracefully(config_dict) + return cls(config=config, metadata=metadata) + + def close(self) -> None: + """Nothing to close""" diff --git a/ingestion/src/metadata/ingestion/api/status.py b/ingestion/src/metadata/ingestion/api/status.py index 99b5493ec6c3..a5d5dbec47a3 100644 --- 
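The new `TestSuiteSource` talks to the rest of the workflow only through `Either` values: a `right` carries the payload for the next step, a `left` carries a `StackTraceError` that ends up in the workflow status. A small sketch of that convention, using simplified stand-ins rather than the real classes from `metadata.ingestion.api.models`:

```python
# Sketch of the Either convention used by the refactored Source/Processor steps.
# Stand-in classes only; the real Either/StackTraceError come from
# metadata.ingestion.api.models.
from dataclasses import dataclass
from typing import Generic, Iterable, Optional, TypeVar

T = TypeVar("T")


@dataclass
class StackTraceError:
    name: str
    error: str
    stack_trace: Optional[str] = None


@dataclass
class Either(Generic[T]):
    left: Optional[StackTraceError] = None   # failure reported to the status
    right: Optional[T] = None                # payload handed to the next step


def iter_tables(table_fqns: Iterable[str]) -> Iterable[Either[str]]:
    """Mimics TestSuiteSource._iter: yield a right per table, a left on errors."""
    for table_fqn in table_fqns:
        if not table_fqn:
            yield Either(left=StackTraceError(name="Missing Table", error="empty FQN"))
            continue
        yield Either(right=table_fqn)


for either in iter_tables(["service.db.schema.users", ""]):
    if either.right:
        print(f"process {either.right}")
    else:
        print(f"failed: {either.left.error}")
```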
a/ingestion/src/metadata/ingestion/api/status.py +++ b/ingestion/src/metadata/ingestion/api/status.py @@ -37,9 +37,13 @@ class Status(BaseModel): def scanned(self, record: Any) -> None: """ - Clean up the status results we want to show + Clean up the status results we want to show. + + We allow to not consider specific records that + are not worth keeping record of. """ - self.records.append(get_log_name(record)) + if log_name := get_log_name(record): + self.records.append(log_name) def warning(self, key: str, reason: str) -> None: self.warnings.append({key: reason}) diff --git a/ingestion/src/metadata/ingestion/api/steps.py b/ingestion/src/metadata/ingestion/api/steps.py index 9c0023f28b6d..bf79e1c8d48a 100644 --- a/ingestion/src/metadata/ingestion/api/steps.py +++ b/ingestion/src/metadata/ingestion/api/steps.py @@ -14,7 +14,6 @@ from abc import ABC, abstractmethod from typing import Any -from metadata.ingestion.api.models import Either, Entity from metadata.ingestion.api.step import BulkStep, IterStep, ReturnStep, StageStep from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.logger import ingestion_logger @@ -51,23 +50,10 @@ def test_connection(self) -> None: class Sink(ReturnStep, ABC): """All Sinks must inherit this base class.""" - @abstractmethod - def _run(self, record: Entity) -> Either: - """ - Send the data somewhere, e.g., the OM API - """ - class Processor(ReturnStep, ABC): """All Processor must inherit this base class""" - @abstractmethod - def _run(self, record: Entity) -> Either: - """ - Post process a given entity and return it - or a new one - """ - class Stage(StageStep, ABC): """All Stages must inherit this base class.""" diff --git a/ingestion/src/metadata/ingestion/models/custom_pydantic.py b/ingestion/src/metadata/ingestion/models/custom_pydantic.py index 728fa89d867f..7c69b286c54b 100644 --- a/ingestion/src/metadata/ingestion/models/custom_pydantic.py +++ b/ingestion/src/metadata/ingestion/models/custom_pydantic.py @@ -9,9 +9,13 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -Pydantic classes overwritten defaults ones of code generation -""" +Pydantic classes overwritten defaults ones of code generation. +This classes are used in the generated module, which should have NO +dependencies against any other metadata package. This class should +be self-sufficient with only pydantic at import time. 
+""" +import logging import warnings from typing import Any, Dict @@ -19,9 +23,7 @@ from pydantic.utils import update_not_none from pydantic.validators import constr_length_validator, str_validator -from metadata.utils.logger import ingestion_logger - -logger = ingestion_logger() +logger = logging.getLogger("metadata") class CustomSecretStr(SecretStr): @@ -101,6 +103,6 @@ def get_secret_value(self, skip_secret_manager: bool = False) -> str: ) except Exception as exc: logger.error( - f"Secret value [{secret_id}] not present in the configured secrets manages: {exc}" + f"Secret value [{secret_id}] not present in the configured secrets manager: {exc}" ) return self._secret_value diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 48c1e38d9524..bb1ba734bf66 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -22,6 +22,7 @@ from metadata.config.common import ConfigModel from metadata.data_insight.source.metadata import DataInsightRecord +from metadata.data_quality.api.models import TestCaseResultResponse, TestCaseResults from metadata.generated.schema.analytics.reportData import ReportData from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.teams.createRole import CreateRoleRequest @@ -30,6 +31,7 @@ from metadata.generated.schema.api.tests.createLogicalTestCases import ( CreateLogicalTestCases, ) +from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest from metadata.generated.schema.dataInsight.kpi.basic import KpiResult from metadata.generated.schema.entity.classification.tag import Tag from metadata.generated.schema.entity.data.dashboard import Dashboard @@ -369,6 +371,18 @@ def write_test_case_results_sample( ) return Either(right=record.test_case_results) + @_run_dispatch.register + def write_test_case_results(self, record: TestCaseResultResponse): + """Write the test case result""" + res = self.metadata.add_test_case_results( + test_results=record.testCaseResult, + test_case_fqn=record.testCase.fullyQualifiedName.__root__, + ) + logger.debug( + f"Successfully ingested test case results for test case {record.testCase.name.__root__}" + ) + return Either(right=res) + @_run_dispatch.register def write_data_insight_sample( self, record: OMetaDataInsightSample @@ -379,7 +393,7 @@ def write_data_insight_sample( self.metadata.add_data_insight_report_data( record.record, ) - return Either(left=None, right=record.record) + return Either(right=record.record) @_run_dispatch.register def write_data_insight(self, record: DataInsightRecord) -> Either[ReportData]: @@ -492,6 +506,29 @@ def write_profiler_response(self, record: ProfilerResponse) -> Either[Table]: return Either(right=table) + @_run_dispatch.register + def write_executable_test_suite( + self, record: CreateTestSuiteRequest + ) -> Either[TestSuite]: + """ + From the test suite workflow we might need to create executable test suites + """ + test_suite = self.metadata.create_or_update_executable_test_suite(record) + return Either(right=test_suite) + + @_run_dispatch.register + def write_test_case_result_list(self, record: TestCaseResults): + """Record the list of test case result responses""" + + for result in record.test_results or []: + self.metadata.add_test_case_results( + test_results=result.testCaseResult, + test_case_fqn=result.testCase.fullyQualifiedName.__root__, + ) + 
self.status.scanned(result) + + return Either(right=record) + def close(self): """ We don't have anything to close since we are using the given metadata client diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index aef4aa55a89c..304e13d1b08c 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -41,6 +41,7 @@ from metadata.generated.schema.entity.teams.team import Team from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.tests.testCase import TestCase +from metadata.generated.schema.tests.testSuite import TestSuite from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.dispatch import class_register from metadata.utils.elasticsearch import get_entity_from_es_result @@ -81,12 +82,15 @@ def split(s: str) -> List[str]: return splitter.split() -def _build(*args) -> str: +def _build(*args, quote: bool = True) -> str: """ Equivalent of Java's FullyQualifiedName#build """ - quoted = [quote_name(name) for name in args] - return FQN_SEPARATOR.join(quoted) + if quote: + quoted = [quote_name(name) for name in args] + return FQN_SEPARATOR.join(quoted) + + return FQN_SEPARATOR.join(args) def unquote_name(name: str) -> str: @@ -255,6 +259,16 @@ def _( return _build(service_name, mlmodel_name) +@fqn_build_registry.add(TestSuite) +def _(_: Optional[OpenMetadata], *, table_fqn: str) -> str: + """ + We don't need to quote since this comes from a table FQN. + We're replicating the backend logic of the FQN generation in the TestSuiteRepository + for executable test suites. + """ + return _build(table_fqn, "testSuite", quote=False) + + @fqn_build_registry.add(Topic) def _( _: Optional[OpenMetadata], # ES Index not necessary for Topic FQN building diff --git a/ingestion/src/metadata/utils/logger.py b/ingestion/src/metadata/utils/logger.py index a32fa92006b4..6ca065671694 100644 --- a/ingestion/src/metadata/utils/logger.py +++ b/ingestion/src/metadata/utils/logger.py @@ -18,6 +18,11 @@ from types import DynamicClassAttribute from typing import Optional, Union +from metadata.data_quality.api.models import ( + TableAndTests, + TestCaseResultResponse, + TestCaseResults, +) from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.ingestion.api.models import Entity from metadata.ingestion.models.delete_entity import DeleteEntity @@ -171,7 +176,7 @@ def log_ansi_encoded_string( @singledispatch -def get_log_name(record: Entity) -> str: +def get_log_name(record: Entity) -> Optional[str]: try: return f"{type(record).__name__} [{record.name.__root__}]" except Exception: @@ -225,3 +230,22 @@ def _(record: OMetaLifeCycleData) -> str: Capture the lifecycle changes of an Entity """ return f"{type(record.entity).__name__} Lifecycle [{record.entity.name.__root__}]" + + +@get_log_name.register +def _(record: TableAndTests) -> str: + if record.table: + return f"Tests for [{record.table.fullyQualifiedName.__root__}]" + + return f"Test Suite [{record.executable_test_suite.name.__root__}]" + + +@get_log_name.register +def _(_: TestCaseResults) -> Optional[str]: + """We don't want to log this in the status""" + return None + + +@get_log_name.register +def _(record: TestCaseResultResponse) -> str: + return record.testCase.fullyQualifiedName.__root__ diff --git a/ingestion/src/metadata/workflow/data_quality.py b/ingestion/src/metadata/workflow/data_quality.py new file mode 100644 index 000000000000..099ab305b878 --- /dev/null +++ 
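The `fqn` registration above mirrors the backend rule for executable test suites: append a fixed `testSuite` suffix to the table FQN without re-quoting, since the table FQN is already escaped. Roughly, assuming the dot separator used by the real util:

```python
# Approximation of the TestSuite FQN rule added above (quote=False), shown in
# isolation. The real call is fqn.build(None, TestSuite, table_fqn=...).
FQN_SEPARATOR = "."


def build_test_suite_fqn(table_fqn: str) -> str:
    """Replicates _build(table_fqn, "testSuite", quote=False)."""
    return FQN_SEPARATOR.join((table_fqn, "testSuite"))


# Quoted parts of the table FQN are preserved as-is:
print(build_test_suite_fqn('mysql.default."my.schema".users'))
# -> mysql.default."my.schema".users.testSuite
```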
b/ingestion/src/metadata/workflow/data_quality.py @@ -0,0 +1,50 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Workflow definition for the Data Quality +""" +from metadata.data_quality.processor.test_case_runner import TestCaseRunner +from metadata.data_quality.source.test_suite import TestSuiteSource +from metadata.ingestion.api.steps import Processor, Sink +from metadata.utils.importer import import_sink_class +from metadata.utils.logger import test_suite_logger +from metadata.workflow.base import BaseWorkflow + +logger = test_suite_logger() + + +class TestSuiteWorkflow(BaseWorkflow): + """ + DAta Quality ingestion workflow implementation + + We check the source connection test when initializing + this workflow. No need to do anything here if this does not pass + """ + + def set_steps(self): + self.source = TestSuiteSource.create(self.config.dict(), self.metadata) + + test_runner_processor = self._get_test_runner_processor() + sink = self._get_sink() + + self.steps = (test_runner_processor, sink) + + def _get_sink(self) -> Sink: + sink_type = self.config.sink.type + sink_class = import_sink_class(sink_type=sink_type) + sink_config = self.config.sink.dict().get("config", {}) + sink: Sink = sink_class.create(sink_config, self.metadata) + logger.debug(f"Sink type:{self.config.sink.type}, {sink_class} configured") + + return sink + + def _get_test_runner_processor(self) -> Processor: + return TestCaseRunner.create(self.config.dict(), self.metadata) diff --git a/ingestion/tests/integration/test_suite/test_e2e_workflow.py b/ingestion/tests/integration/test_suite/test_e2e_workflow.py index c9193dab6286..e1967eb730b8 100644 --- a/ingestion/tests/integration/test_suite/test_e2e_workflow.py +++ b/ingestion/tests/integration/test_suite/test_e2e_workflow.py @@ -20,7 +20,6 @@ import sqlalchemy as sqa from sqlalchemy.orm import Session, declarative_base -from metadata.data_quality.api.workflow import TestSuiteWorkflow from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, @@ -46,11 +45,12 @@ ) from metadata.generated.schema.tests.testCase import TestCase from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.workflow.data_quality import TestSuiteWorkflow test_suite_config = { "source": { "type": "custom-database", - "serviceName": "MyRabdomWorkflow", + "serviceName": "test_suite_service_test", "sourceConfig": { "config": { "type": "TestSuite", @@ -146,7 +146,7 @@ def setUpClass(cls): ) ) - table = cls.metadata.create_or_update( + cls.metadata.create_or_update( CreateTableRequest( name="users", columns=[ @@ -214,6 +214,7 @@ def test_e2e_cli_workflow(self): """test cli workflow e2e""" workflow = TestSuiteWorkflow.create(test_suite_config) workflow.execute() + workflow.raise_from_status() test_case_1 = self.metadata.get_by_name( entity=TestCase, diff --git a/ingestion/tests/integration/test_suite/test_workflow.py 
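Putting it together, the refactored `TestSuiteWorkflow` is driven like the other `BaseWorkflow` implementations, which is what the CLI change above (swapping `print_test_suite_status` for `print_status`) relies on. A hedged sketch of the YAML-driven entry point; the host, token, service name and table FQN are placeholders:

```python
# Sketch of running the refactored workflow, mirroring the CLI and the e2e test
# in this PR. Connection details below are placeholders.
import yaml

from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.workflow_output_handler import print_status

CONFIG = """
source:
  type: custom-database
  serviceName: my_service                                   # placeholder
  sourceConfig:
    config:
      type: TestSuite
      entityFullyQualifiedName: my_service.db.schema.users  # placeholder
processor:
  type: orm-test-runner
  config: {}
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: "<token>"                                    # placeholder
"""

if __name__ == "__main__":
    workflow = TestSuiteWorkflow.create(yaml.safe_load(CONFIG))
    workflow.execute()
    workflow.stop()
    print_status(workflow)        # replaces the old print_test_suite_status
    workflow.raise_from_status()
```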
b/ingestion/tests/integration/test_suite/test_workflow.py index 57806414f236..79cad960e69c 100644 --- a/ingestion/tests/integration/test_suite/test_workflow.py +++ b/ingestion/tests/integration/test_suite/test_workflow.py @@ -14,44 +14,67 @@ """ import unittest -from collections.abc import MutableSequence -from copy import deepcopy +import uuid +from typing import List -from metadata.data_quality.api.workflow import TestSuiteWorkflow -from metadata.generated.schema.entity.data.table import Table +from metadata.data_quality.api.models import TableAndTests +from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest +from metadata.generated.schema.api.data.createDatabaseSchema import ( + CreateDatabaseSchemaRequest, +) +from metadata.generated.schema.api.data.createTable import CreateTableRequest +from metadata.generated.schema.api.services.createDatabaseService import ( + CreateDatabaseServiceRequest, +) +from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest +from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest +from metadata.generated.schema.entity.data.table import Column, DataType, Table +from metadata.generated.schema.entity.services.connections.database.common.basicAuth import ( + BasicAuth, +) +from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( + MysqlConnection, +) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) -from metadata.generated.schema.tests.testCase import TestCase -from metadata.generated.schema.tests.testSuite import TestSuite +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseConnection, + DatabaseService, + DatabaseServiceType, +) +from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue +from metadata.ingestion.api.models import Either from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.workflow.data_quality import TestSuiteWorkflow sqlite_shared = "file:cachedb?mode=memory&cache=shared&check_same_thread=False" -test_suite_config = { - "source": { - "type": "custom-database", - "serviceName": "sample_data", - "sourceConfig": { - "config": { - "type": "TestSuite", - "entityFullyQualifiedName": "sample_data.ecommerce_db.shopify.dim_address", +def get_test_suite_config(service_name: str, table_name: str) -> dict: + return { + "source": { + "type": "custom-database", + "serviceName": service_name, + "sourceConfig": { + "config": { + "type": "TestSuite", + "entityFullyQualifiedName": table_name, + } + }, + }, + "processor": {"type": "orm-test-runner", "config": {}}, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": { + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + }, } }, - }, - "processor": {"type": 
"orm-test-runner", "config": {}}, - "sink": {"type": "metadata-rest", "config": {}}, - "workflowConfig": { - "openMetadataServerConfig": { - "hostPort": "http://localhost:8585/api", - "authProvider": "openmetadata", - "securityConfig": { - "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" - }, - } - }, -} + } class TestSuiteWorkflowTests(unittest.TestCase): @@ -59,88 +82,161 @@ class TestSuiteWorkflowTests(unittest.TestCase): metadata = OpenMetadata( OpenMetadataConnection.parse_obj( - test_suite_config["workflowConfig"]["openMetadataServerConfig"] + get_test_suite_config("", "")["workflowConfig"]["openMetadataServerConfig"] ) ) test_case_ids = [] test_suite_ids = [] - def tearDown(self) -> None: - for test_case_id in self.test_case_ids: - self.metadata.delete( - entity=TestCase, - entity_id=test_case_id, - recursive=True, - hard_delete=True, + @classmethod + def setUpClass(cls) -> None: + """Prepare ingredients""" + service = CreateDatabaseServiceRequest( + name=str(uuid.uuid4()), + serviceType=DatabaseServiceType.Mysql, + connection=DatabaseConnection( + config=MysqlConnection( + username="username", + authType=BasicAuth( + password="password", + ), + hostPort="http://localhost:1234", + ) + ), + ) + cls.service_entity: DatabaseService = cls.metadata.create_or_update( + data=service + ) + + create_db = CreateDatabaseRequest( + name=str(uuid.uuid4()), + service=cls.service_entity.fullyQualifiedName, + ) + + create_db_entity = cls.metadata.create_or_update(data=create_db) + + create_schema = CreateDatabaseSchemaRequest( + name=str(uuid.uuid4()), + database=create_db_entity.fullyQualifiedName, + ) + + cls.schema_entity = cls.metadata.create_or_update(data=create_schema) + + create_table = CreateTableRequest( + name=str(uuid.uuid4()), + databaseSchema=cls.schema_entity.fullyQualifiedName, + columns=[Column(name="id", dataType=DataType.BIGINT)], + ) + + cls.table_with_suite: Table = cls.metadata.create_or_update(create_table) + + cls.test_suite = cls.metadata.create_or_update_executable_test_suite( + data=CreateTestSuiteRequest( + name="test-suite", + executableEntityReference=cls.table_with_suite.fullyQualifiedName.__root__, ) - for test_suite_id in self.test_suite_ids: - self.metadata.delete_executable_test_suite( - entity=TestSuite, - entity_id=test_suite_id, - recursive=True, - hard_delete=True, + ) + + cls.metadata.create_or_update( + CreateTestCaseRequest( + name="testCaseForIntegration", + entityLink=f"<#E::table::{cls.table_with_suite.fullyQualifiedName.__root__}>", + testSuite=cls.test_suite.fullyQualifiedName, + testDefinition="tableRowCountToEqual", + parameterValues=[TestCaseParameterValue(name="value", value=10)], ) + ) + + create_table_2 = CreateTableRequest( + name=str(uuid.uuid4()), + databaseSchema=cls.schema_entity.fullyQualifiedName, + columns=[Column(name="id", dataType=DataType.BIGINT)], + ) - self.test_case_ids = [] - self.test_suite_ids = [] + cls.table: Table = cls.metadata.create_or_update(create_table_2) + + @classmethod + def 
tearDownClass(cls) -> None: + """Clean up""" + cls.metadata.delete( + entity=DatabaseService, + entity_id=cls.service_entity.id, + recursive=True, + hard_delete=True, + ) def test_create_workflow_object(self): """Test workflow object is correctly instantiated""" - TestSuiteWorkflow.create(test_suite_config) - assert True + TestSuiteWorkflow.create( + get_test_suite_config( + service_name=self.service_entity.name.__root__, + table_name=self.table_with_suite.fullyQualifiedName.__root__, + ) + ) - def test_create_workflow_object_from_cli_config(self): + def test_create_workflow_object_with_table_with_test_suite(self): """test workflow object is instantiated correctly from cli config""" - _test_suite_config = deepcopy(test_suite_config) - - processor = { - "processor": { - "type": "orm-test-runner", - "config": { - "testCases": [ - { - "name": "my_test_case", - "testDefinitionName": "tableColumnCountToBeBetween", - "parameterValues": [ - {"name": "minColValue", "value": 1}, - {"name": "maxColValue", "value": 5}, - ], - } - ] - }, - } - } + workflow = TestSuiteWorkflow.create( + get_test_suite_config( + service_name=self.service_entity.name.__root__, + table_name=self.table_with_suite.fullyQualifiedName.__root__, + ) + ) - _test_suite_config.update(processor) + table: Table = workflow.source._get_table_entity() + + table_and_tests: TableAndTests = list( + workflow.source._process_table_suite(table=table) + )[0] + + # If the table already has a test suite, we won't be generating one + self.assertIsNotNone(table.testSuite) + self.assertIsNone(table_and_tests.right.executable_test_suite) + + self.assertTrue(len(table_and_tests.right.test_cases) >= 1) + # We will pick up the tests from it + self.assertTrue( + next( + ( + test + for test in table_and_tests.right.test_cases + if test.name.__root__ == "testCaseForIntegration" + ), + None, + ) + ) - workflow = TestSuiteWorkflow.create(_test_suite_config) - workflow_test_suite = workflow.create_or_return_test_suite_entity() + def test_create_workflow_config_with_table_without_suite(self): + """We'll prepare the test suite creation payload""" - test_suite = self.metadata.get_by_name( - entity=TestSuite, - fqn="sample_data.ecommerce_db.shopify.dim_address.testSuite", + workflow = TestSuiteWorkflow.create( + get_test_suite_config( + service_name=self.service_entity.name.__root__, + table_name=self.table.fullyQualifiedName.__root__, + ) ) - assert workflow_test_suite.id == test_suite.id - self.test_suite_ids = [test_suite.id] + # If the table does not have a test suite, we'll prepare the request to create one + table: Table = workflow.source._get_table_entity() - def test_create_or_return_test_suite_entity(self): - """test we can correctly retrieve a test suite""" - _test_suite_config = deepcopy(test_suite_config) + table_and_tests: Either[TableAndTests] = list( + workflow.source._process_table_suite(table=table) + )[0] - workflow = TestSuiteWorkflow.create(_test_suite_config) - test_suite = workflow.create_or_return_test_suite_entity() - - expected_test_suite = self.metadata.get_by_name( - entity=TestSuite, fqn="critical_metrics_suite" + self.assertIsNone(table.testSuite) + self.assertEquals( + table_and_tests.right.executable_test_suite.name.__root__, + self.table.fullyQualifiedName.__root__ + ".testSuite", ) - assert test_suite + def test_create_workflow_config_with_tests(self): + """We'll get the tests from the workflow YAML""" - def test_get_test_cases_from_test_suite(self): - """test test cases are correctly returned for specific test suite""" - 
_test_suite_config = deepcopy(test_suite_config) + _test_suite_config = get_test_suite_config( + service_name=self.service_entity.name.__root__, + table_name=self.table_with_suite.fullyQualifiedName.__root__, + ) processor = { "processor": { @@ -161,23 +257,42 @@ def test_get_test_cases_from_test_suite(self): } _test_suite_config.update(processor) - workflow = TestSuiteWorkflow.create(_test_suite_config) - test_suite = workflow.create_or_return_test_suite_entity() - test_cases = workflow.get_test_cases_from_test_suite(test_suite) - assert isinstance(test_cases, MutableSequence) - assert isinstance(test_cases[0], TestCase) - assert {"my_test_case"}.intersection( - {test_case.name.__root__ for test_case in test_cases} + table: Table = workflow.source._get_table_entity() + table_and_tests: Either[TableAndTests] = list( + workflow.source._process_table_suite(table=table) + )[0] + + test_cases: List[TestCase] = workflow.steps[0].get_test_cases( + test_cases=table_and_tests.right.test_cases, + test_suite_fqn=self.table_with_suite.fullyQualifiedName.__root__ + + ".testSuite", + table_fqn=self.table_with_suite.fullyQualifiedName.__root__, + ) + + # 1 defined test cases + the new one in the YAML + self.assertTrue(len(table_and_tests.right.test_cases) >= 1) + + new_test_case = next( + (test for test in test_cases if test.name.__root__ == "my_test_case"), None ) + self.assertIsNotNone(new_test_case) - for test_case in test_cases: - self.metadata.delete(entity=TestCase, entity_id=test_case.id) + # cleanup + self.metadata.delete( + entity=TestCase, + entity_id=new_test_case.id, + recursive=True, + hard_delete=True, + ) def test_get_test_case_names_from_cli_config(self): """test we can get all test case names from cli config""" - _test_suite_config = deepcopy(test_suite_config) + _test_suite_config = get_test_suite_config( + service_name=self.service_entity.name.__root__, + table_name=self.table_with_suite.fullyQualifiedName.__root__, + ) processor = { "processor": { @@ -208,7 +323,7 @@ def test_get_test_case_names_from_cli_config(self): _test_suite_config.update(processor) workflow = TestSuiteWorkflow.create(_test_suite_config) - test_cases_def = workflow.get_test_case_from_cli_config() + test_cases_def = workflow.steps[0].get_test_case_from_cli_config() assert [test_case_def.name for test_case_def in test_cases_def] == [ "my_test_case", @@ -217,7 +332,10 @@ def test_get_test_case_names_from_cli_config(self): def test_compare_and_create_test_cases(self): """Test function creates the correct test case if they don't exists""" - _test_suite_config = deepcopy(test_suite_config) + _test_suite_config = get_test_suite_config( + service_name=self.service_entity.name.__root__, + table_name=self.table_with_suite.fullyQualifiedName.__root__, + ) processor = { "processor": { @@ -235,7 +353,7 @@ def test_compare_and_create_test_cases(self): { "name": "my_test_case_two", "testDefinitionName": "columnValuesToBeBetween", - "columnName": "address_id", + "columnName": "id", "parameterValues": [ {"name": "minValue", "value": 1}, {"name": "maxValue", "value": 5}, @@ -251,62 +369,44 @@ def test_compare_and_create_test_cases(self): assert not self.metadata.get_by_name( entity=TestCase, - fqn="sample_data.ecommerce_db.shopify.dim_address.my_test_case", + fqn=f"{self.table_with_suite.fullyQualifiedName.__root__}.my_test_case", ) assert not self.metadata.get_by_name( entity=TestCase, - fqn="sample_data.ecommerce_db.shopify.dim_address.address_id.my_test_case_two", + 
fqn=f"{self.table_with_suite.fullyQualifiedName.__root__}.my_test_case_two", ) - test_suite = workflow.create_or_return_test_suite_entity() - test_cases = self.metadata.list_entities( - entity=TestCase, - fields=["testSuite", "entityLink", "testDefinition"], - params={"testSuiteId": test_suite.id.__root__}, - ).entities - config_test_cases_def = workflow.get_test_case_from_cli_config() - created_test_case = workflow.compare_and_create_test_cases( - config_test_cases_def, test_cases, test_suite + table: Table = workflow.source._get_table_entity() + table_and_tests: Either[TableAndTests] = list( + workflow.source._process_table_suite(table=table) + )[0] + + config_test_cases_def = workflow.steps[0].get_test_case_from_cli_config() + created_test_case = workflow.steps[0].compare_and_create_test_cases( + cli_test_cases_definitions=config_test_cases_def, + test_cases=table_and_tests.right.test_cases, + test_suite_fqn=f"{self.table_with_suite.fullyQualifiedName.__root__}.testSuite", + table_fqn=self.table_with_suite.fullyQualifiedName.__root__, ) # clean up test my_test_case = self.metadata.get_by_name( entity=TestCase, - fqn="sample_data.ecommerce_db.shopify.dim_address.my_test_case", + fqn=f"{self.table_with_suite.fullyQualifiedName.__root__}.my_test_case", fields=["testDefinition", "testSuite"], ) my_test_case_two = self.metadata.get_by_name( entity=TestCase, - fqn="sample_data.ecommerce_db.shopify.dim_address.address_id.my_test_case_two", + fqn=f"{self.table_with_suite.fullyQualifiedName.__root__}.id.my_test_case_two", fields=["testDefinition", "testSuite"], ) assert my_test_case assert my_test_case_two - assert len(created_test_case) == 2 + # We return the existing test & the 2 new ones + assert len(created_test_case) >= 3 self.metadata.delete(entity=TestCase, entity_id=my_test_case.id) self.metadata.delete(entity=TestCase, entity_id=my_test_case_two.id) - - def test_get_table_entity(self): - """test get service connection returns correct info""" - workflow = TestSuiteWorkflow.create(test_suite_config) - service_connection = workflow._get_table_entity( - "sample_data.ecommerce_db.shopify.dim_address" - ) - - assert isinstance(service_connection, Table) - - # def test_filter_for_om_test_cases(self): - # """test filter for OM test cases method""" - # om_test_case_1 = TestCase( - # name="om_test_case_1", - # testDefinition=self.metadata.get_entity_reference( - # TestDefinition, - # "columnValuesToMatchRegex" - # ), - # entityLink="", - # testSuite=self.metadata.get_entity_reference("sample_data.ecommerce_db.shopify.dim_address.TestSuite"), - # ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py index 0cc0c2d3b217..a1e022111599 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py @@ -17,7 +17,6 @@ from openmetadata_managed_apis.utils.logger import set_operator_logger from openmetadata_managed_apis.workflows.ingestion.common import build_dag, build_source -from metadata.data_quality.api.workflow import TestSuiteWorkflow from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( IngestionPipeline, ) @@ -29,6 +28,7 @@ WorkflowConfig, ) from metadata.ingestion.models.encoders import show_secrets_encoder +from metadata.workflow.data_quality import TestSuiteWorkflow from 
metadata.workflow.workflow_output_handler import print_test_suite_status diff --git a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py index 0dc337c1bc03..cf9be2d7863e 100644 --- a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py +++ b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py @@ -33,7 +33,6 @@ build_usage_workflow_config, ) -from metadata.data_quality.api.workflow import TestSuiteWorkflow from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) @@ -69,6 +68,7 @@ from metadata.ingestion.api.parser import parse_workflow_config_gracefully from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.workflow.data_quality import TestSuiteWorkflow from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow
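
Note on usage: a minimal sketch of driving the relocated TestSuiteWorkflow end to end, following the create() / execute() / raise_from_status() calls exercised in the integration tests above. The service name, table FQN, host and token below are placeholders, not values from this change.

from metadata.workflow.data_quality import TestSuiteWorkflow

# Placeholder configuration mirroring the shape used in the tests above.
config = {
    "source": {
        "type": "custom-database",
        "serviceName": "my_service",
        "sourceConfig": {
            "config": {
                "type": "TestSuite",
                "entityFullyQualifiedName": "my_service.my_db.my_schema.my_table",
            }
        },
    },
    "processor": {"type": "orm-test-runner", "config": {}},
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {"jwtToken": "<jwt-token>"},
        }
    },
}

workflow = TestSuiteWorkflow.create(config)  # set_steps wires TestSuiteSource, TestCaseRunner and the sink
workflow.execute()                           # run the test cases and push results through the sink
workflow.raise_from_status()                 # surface any step failures as an exception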
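
The TestSuite entry added to the FQN build registry replicates the backend naming of executable test suites. A small sketch of what it yields, assuming the module's generic fqn.build() dispatcher is the entry point (as for the other registered entity types); no client is needed because the builder only joins strings:

from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.utils import fqn

# The table FQN is assumed to be already quoted, so the builder skips quoting.
suite_fqn = fqn.build(
    None,
    entity_type=TestSuite,
    table_fqn="my_service.my_db.my_schema.my_table",
)

# Matches the backend convention "<table_fqn>.testSuite"
assert suite_fqn == "my_service.my_db.my_schema.my_table.testSuite"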
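
The get_log_name additions in logger.py rely on functools.singledispatch, so the status output can label new record types without touching the generic handler; returning None (as for TestCaseResults) is presumably how a record opts out of the status log. A hedged sketch of registering a handler for a hypothetical record type:

from typing import Optional

from metadata.utils.logger import get_log_name


class MyDQRecord:
    """Hypothetical record type, only for illustration."""

    def __init__(self, name: str):
        self.name = name


@get_log_name.register
def _(record: MyDQRecord) -> Optional[str]:
    # Short label shown in the workflow status; return None to keep the record out of it.
    return f"MyDQRecord [{record.name}]"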