diff --git a/ingestion/operators/docker/main.py b/ingestion/operators/docker/main.py index d52edfbe9a45..eb494f727ebf 100644 --- a/ingestion/operators/docker/main.py +++ b/ingestion/operators/docker/main.py @@ -15,13 +15,13 @@ import yaml -from metadata.data_quality.api.workflow import TestSuiteWorkflow from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( PipelineType, ) from metadata.generated.schema.metadataIngestion.workflow import LogLevels from metadata.utils.logger import set_loggers_level from metadata.workflow.data_insight import DataInsightWorkflow +from metadata.workflow.data_quality import TestSuiteWorkflow from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow from metadata.workflow.usage import UsageWorkflow diff --git a/ingestion/src/metadata/cli/dataquality.py b/ingestion/src/metadata/cli/dataquality.py index 4899a9fa9bdd..7a97317ab369 100644 --- a/ingestion/src/metadata/cli/dataquality.py +++ b/ingestion/src/metadata/cli/dataquality.py @@ -17,12 +17,12 @@ import traceback from metadata.config.common import load_config_file -from metadata.data_quality.api.workflow import TestSuiteWorkflow from metadata.utils.logger import cli_logger +from metadata.workflow.data_quality import TestSuiteWorkflow from metadata.workflow.workflow_output_handler import ( WorkflowType, print_init_error, - print_test_suite_status, + print_status, ) logger = cli_logger() @@ -48,5 +48,5 @@ def run_test(config_path: str) -> None: workflow.execute() workflow.stop() - print_test_suite_status(workflow) + print_status(workflow) workflow.raise_from_status() diff --git a/ingestion/src/metadata/data_quality/api/models.py b/ingestion/src/metadata/data_quality/api/models.py index 6b0b5541d7c5..5f5800c5a909 100644 --- a/ingestion/src/metadata/data_quality/api/models.py +++ b/ingestion/src/metadata/data_quality/api/models.py @@ -18,8 +18,13 @@ from typing import List, Optional +from pydantic import BaseModel, Field + from metadata.config.common import ConfigModel -from metadata.generated.schema.tests.testCase import TestCaseParameterValue +from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.tests.basic import TestCaseResult +from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue class TestCaseDefinition(ConfigModel): @@ -38,3 +43,27 @@ class TestSuiteProcessorConfig(ConfigModel): testCases: Optional[List[TestCaseDefinition]] = None forceUpdate: Optional[bool] = False + + +class TestCaseResultResponse(BaseModel): + testCaseResult: TestCaseResult + testCase: TestCase + + +class TableAndTests(BaseModel): + """Source response bringing together the table and test cases""" + + table: Table = Field(None, description="Table being processed by the DQ workflow") + service_type: str = Field(..., description="Service type the table belongs to") + test_cases: Optional[List[TestCase]] = Field( + None, description="Test Cases already existing in the Test Suite, if any" + ) + executable_test_suite: Optional[CreateTestSuiteRequest] = Field( + None, description="If no executable test suite is found, we'll create one" + ) + + +class TestCaseResults(BaseModel): + """Processor response with a list of computed Test Case Results""" + + test_results: Optional[List[TestCaseResultResponse]] diff --git a/ingestion/src/metadata/data_quality/api/workflow.py b/ingestion/src/metadata/data_quality/api/workflow.py deleted file mode 100644 index d9e1461f770f..000000000000 --- a/ingestion/src/metadata/data_quality/api/workflow.py +++ /dev/null @@ -1,494 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Workflow definition for the test suite -""" - -from __future__ import annotations - -import traceback -from copy import deepcopy -from logging import Logger -from typing import List, Optional, cast - -from pydantic import ValidationError - -from metadata.config.common import WorkflowExecutionError -from metadata.data_quality.api.models import ( - TestCaseDefinition, - TestSuiteProcessorConfig, -) -from metadata.data_quality.runner.test_suite_source_factory import ( - test_suite_source_factory, -) -from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest -from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest -from metadata.generated.schema.entity.data.table import Table -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.generated.schema.entity.services.connections.serviceConnection import ( - ServiceConnection, -) -from metadata.generated.schema.entity.services.databaseService import DatabaseService -from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( - PipelineState, -) -from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( - TestSuitePipeline, -) -from metadata.generated.schema.metadataIngestion.workflow import ( - OpenMetadataWorkflowConfig, -) -from metadata.generated.schema.tests.testCase import TestCase -from metadata.generated.schema.tests.testDefinition import TestDefinition, TestPlatform -from metadata.generated.schema.tests.testSuite import TestSuite -from metadata.generated.schema.type.basic import EntityLink, FullyQualifiedEntityName -from metadata.ingestion.api.models import StackTraceError -from metadata.ingestion.api.parser import parse_workflow_config_gracefully -from metadata.ingestion.api.processor import ProcessorStatus -from metadata.ingestion.ometa.client_utils import create_ometa_client -from metadata.utils import entity_link -from metadata.utils.fqn import split -from metadata.utils.importer import get_sink -from metadata.utils.logger import test_suite_logger -from metadata.workflow.workflow_output_handler import print_test_suite_status -from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin - -logger: Logger = test_suite_logger() - - -class TestSuiteWorkflow(WorkflowStatusMixin): - """workflow to run the test suite""" - - def __init__(self, config: OpenMetadataWorkflowConfig): - """ - Instantiate test suite workflow class - - Args: - config: OM workflow configuration object - - Attributes: - config: OM workflow configuration object - """ - self.config = config - self.metadata_config: OpenMetadataConnection = ( - self.config.workflowConfig.openMetadataServerConfig - ) - self.metadata = create_ometa_client(self.metadata_config) - - self.source_config: TestSuitePipeline = self.config.source.sourceConfig.config - self.service: DatabaseService = self._retrieve_service() - self._retrieve_service_connection() - - self.processor_config: TestSuiteProcessorConfig = ( - TestSuiteProcessorConfig.parse_obj( - self.config.processor.dict().get("config") - ) - ) - - self.set_ingestion_pipeline_status(state=PipelineState.running) - - self.status = ProcessorStatus() - - self.table_entity: Optional[Table] = self._get_table_entity( - self.source_config.entityFullyQualifiedName.__root__ - ) - - if self.config.sink: - self.sink = get_sink( - sink_type=self.config.sink.type, - sink_config=self.config.sink, - metadata_config=self.metadata_config, - ) - - @classmethod - def create(cls, config_dict) -> TestSuiteWorkflow: - """ - Instantiate a TestSuiteWorkflow object form a yaml or json config file - - Args: - config_dict: json or yaml configuration file - Returns: - a test suite workflow - """ - try: - config = parse_workflow_config_gracefully(config_dict) - return cls(config) - except ValidationError as err: - logger.error( - f"Error trying to parse the Profiler Workflow configuration: {err}" - ) - raise err - - def _retrieve_service(self) -> DatabaseService: - """Get service object from source config `entityFullyQualifiedName`""" - fully_qualified_name = self.source_config.entityFullyQualifiedName.__root__ - try: - service_name = split(fully_qualified_name)[0] - except IndexError as exc: - logger.debug(traceback.format_exc()) - raise IndexError( - f"Could not retrieve service name from entity fully qualified name {fully_qualified_name}: {exc}" - ) - try: - service = self.metadata.get_by_name(DatabaseService, service_name) - if not service: - raise ConnectionError( - f"Could not retrieve service with name `{service_name}`. " - "Typically caused by the `entityFullyQualifiedName` does not exists in OpenMetadata " - "or the JWT Token is invalid." - ) - except ConnectionError as exc: - raise exc - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.error( - f"Error getting service connection for service name [{service_name}]" - f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}" - ) - return service - - def _get_table_entity(self, entity_fqn: str) -> Optional[Table]: - """given an entity fqn return the table entity - - Args: - entity_fqn: entity fqn for the test case - """ - return self.metadata.get_by_name( - entity=Table, - fqn=entity_fqn, - fields=["tableProfilerConfig", "testSuite"], - ) - - def create_or_return_test_suite_entity(self) -> Optional[TestSuite]: - """ - try to get test suite name from source.servicName. - In the UI workflow we'll write the entity name (i.e. the test suite) - to source.serviceName. - """ - self.table_entity = cast(Table, self.table_entity) # satisfy type checker - test_suite = self.table_entity.testSuite - if test_suite and not test_suite.executable: - logger.debug( - f"Test suite {test_suite.fullyQualifiedName.__root__} is not executable." - ) - return None - - if self.processor_config.testCases and not test_suite: - # This should cover scenarios where we are running the tests from the CLI workflow - # and no corresponding tests suite exist in the platform. We, therefore, will need - # to create the test suite first. - logger.debug( - "Test suite name not found in the platform. Creating the test suite from processor config." - ) - test_suite = self.metadata.create_or_update_executable_test_suite( - CreateTestSuiteRequest( - name=f"{self.source_config.entityFullyQualifiedName.__root__}.TestSuite", - displayName=f"{self.source_config.entityFullyQualifiedName.__root__} Test Suite", - description="Test Suite created from YAML processor config file", - owner=None, - executableEntityReference=self.source_config.entityFullyQualifiedName.__root__, - ) - ) - - return test_suite - - def get_test_cases_from_test_suite( - self, test_suite: TestSuite - ) -> Optional[List[TestCase]]: - """ - Get test cases from test suite name - - Args: - test_suite_name: the name of the test suite - """ - test_cases = self.metadata.list_entities( - entity=TestCase, - fields=["testSuite", "entityLink", "testDefinition"], - params={"testSuiteId": test_suite.id.__root__}, - ).entities - test_cases = cast(List[TestCase], test_cases) # satisfy type checker - if self.processor_config.testCases is not None: - cli_test_cases = self.get_test_case_from_cli_config() # type: ignore - cli_test_cases = cast( - List[TestCaseDefinition], cli_test_cases - ) # satisfy type checker - test_cases = self.compare_and_create_test_cases( - cli_test_cases, test_cases, test_suite - ) - - return test_cases - - def filter_for_om_test_cases(self, test_cases: List[TestCase]) -> List[TestCase]: - """ - Filter test cases for OM test cases only. This will prevent us from running non OM test cases - - Args: - test_cases: list of test cases - """ - om_test_cases: List[TestCase] = [] - for test_case in test_cases: - test_definition: TestDefinition = self.metadata.get_by_id( - TestDefinition, test_case.testDefinition.id - ) - if TestPlatform.OpenMetadata not in test_definition.testPlatforms: - logger.debug( - f"Test case {test_case.name.__root__} is not an OpenMetadata test case." - ) - continue - om_test_cases.append(test_case) - - return om_test_cases - - def get_test_case_from_cli_config( - self, - ) -> Optional[List[TestCaseDefinition]]: - """Get all the test cases names defined in the CLI config file""" - if self.processor_config.testCases is not None: - return list(self.processor_config.testCases) - return None - - def _update_test_cases( - self, test_cases_to_update: List[TestCaseDefinition], test_cases: List[TestCase] - ): - """Given a list of CLI test definition patch test cases in the platform - - Args: - test_cases_to_update (List[TestCaseDefinition]): list of test case definitions - """ - test_cases_to_update_names = { - test_case_to_update.name for test_case_to_update in test_cases_to_update - } - for indx, test_case in enumerate(deepcopy(test_cases)): - if test_case.name.__root__ in test_cases_to_update_names: - test_case_definition = next( - test_case_to_update - for test_case_to_update in test_cases_to_update - if test_case_to_update.name == test_case.name.__root__ - ) - updated_test_case = self.metadata.patch_test_case_definition( - source=test_case, - entity_link=entity_link.get_entity_link( - self.source_config.entityFullyQualifiedName.__root__, - test_case_definition.columnName, - ), - test_case_parameter_values=test_case_definition.parameterValues, - ) - if updated_test_case: - test_cases.pop(indx) - test_cases.append(updated_test_case) - - return test_cases - - def compare_and_create_test_cases( - self, - cli_test_cases_definitions: Optional[List[TestCaseDefinition]], - test_cases: List[TestCase], - test_suite: TestSuite, - ) -> Optional[List[TestCase]]: - """ - compare test cases defined in CLI config workflow with test cases - defined on the server - - Args: - cli_test_cases_definitions: test cases defined in CLI workflow associated with its test suite - test_cases: list of test cases entities fetch from the server using test suite names in the config file - """ - if not cli_test_cases_definitions: - return test_cases - test_cases = deepcopy(test_cases) - test_case_names = {test_case.name.__root__ for test_case in test_cases} - - # we'll check the test cases defined in the CLI config file and not present in the platform - test_cases_to_create = [ - cli_test_case_definition - for cli_test_case_definition in cli_test_cases_definitions - if cli_test_case_definition.name not in test_case_names - ] - - if self.processor_config and self.processor_config.forceUpdate: - test_cases_to_update = [ - cli_test_case_definition - for cli_test_case_definition in cli_test_cases_definitions - if cli_test_case_definition.name in test_case_names - ] - test_cases = self._update_test_cases(test_cases_to_update, test_cases) - - if not test_cases_to_create: - return test_cases - - for test_case_to_create in test_cases_to_create: - logger.debug(f"Creating test case with name {test_case_to_create.name}") - try: - test_case = self.metadata.create_or_update( - CreateTestCaseRequest( - name=test_case_to_create.name, - description=test_case_to_create.description, - displayName=test_case_to_create.displayName, - testDefinition=FullyQualifiedEntityName( - __root__=test_case_to_create.testDefinitionName - ), - entityLink=EntityLink( - __root__=entity_link.get_entity_link( - self.source_config.entityFullyQualifiedName.__root__, - test_case_to_create.columnName, - ) - ), - testSuite=test_suite.fullyQualifiedName.__root__, - parameterValues=list(test_case_to_create.parameterValues) - if test_case_to_create.parameterValues - else None, - owner=None, - ) - ) - test_cases.append(test_case) - except Exception as exc: - error = ( - f"Couldn't create test case name {test_case_to_create.name}: {exc}" - ) - logger.error(error) - logger.debug(traceback.format_exc()) - self.status.failed( - StackTraceError( - name=self.source_config.entityFullyQualifiedName.__root__, - error=error, - stack_trace=traceback.format_exc(), - ) - ) - - return test_cases - - def run_test_suite(self): - """Main logic to run the tests""" - if not self.table_entity: - logger.debug(traceback.format_exc()) - raise ValueError( - f"Could not retrieve table entity for {self.source_config.entityFullyQualifiedName.__root__}. " - "Make sure the table exists in OpenMetadata and/or the JWT Token provided is valid." - ) - - test_suite = self.create_or_return_test_suite_entity() - if not test_suite: - logger.debug( - f"No test suite found for table {self.source_config.entityFullyQualifiedName.__root__} " - "or test suite is not executable." - ) - return - - test_cases = self.get_test_cases_from_test_suite(test_suite) - if not test_cases: - logger.debug( - f"No test cases found for table {self.source_config.entityFullyQualifiedName.__root__}" - f"and test suite {test_suite.fullyQualifiedName.__root__}" - ) - return - - openmetadata_test_cases = self.filter_for_om_test_cases(test_cases) - - test_suite_runner = test_suite_source_factory.create( - self.service.serviceType.value.lower(), - self.config, - self.metadata, - self.table_entity, - ).get_data_quality_runner() - - for test_case in openmetadata_test_cases: - try: - test_result = test_suite_runner.run_and_handle(test_case) - if not test_result: - continue - if hasattr(self, "sink"): - self.sink.run(test_result) - logger.debug(f"Successfully ran test case {test_case.name.__root__}") - self.status.processed(test_case.fullyQualifiedName.__root__) - except Exception as exc: - error = f"Could not run test case {test_case.name.__root__}: {exc}" - logger.debug(traceback.format_exc()) - logger.error(error) - self.status.failed( - StackTraceError( - name=test_case.name.__root__, - error=error, - stack_trace=traceback.format_exc(), - ) - ) - - def _retrieve_service_connection(self) -> None: - """ - We override the current `serviceConnection` source config object if source workflow service already exists - in OM. When it is configured, we retrieve the service connection from the secrets' manager. Otherwise, we get it - from the service object itself through the default `SecretsManager`. - """ - if ( - not self.config.source.serviceConnection - and not self.metadata.config.forceEntityOverwriting - ): - self.config.source.serviceConnection = ServiceConnection( - __root__=self.service.connection - ) - - def execute(self): - """Execute test suite workflow""" - try: - self.run_test_suite() - # At the end of the `execute`, update the associated Ingestion Pipeline status as success - self.set_ingestion_pipeline_status(PipelineState.success) - - # Any unhandled exception breaking the workflow should update the status - except Exception as err: - logger.debug(traceback.format_exc()) - self.set_ingestion_pipeline_status(PipelineState.failed) - raise err - - def print_status(self) -> None: - """ - Print the workflow results with click - """ - print_test_suite_status(self) - - def result_status(self) -> int: - """ - Returns 1 if status is failed, 0 otherwise. - """ - if self.status.failures or ( - hasattr(self, "sink") and self.sink.get_status().failures - ): - return 1 - return 0 - - def _raise_from_status_internal(self, raise_warnings=False): - """ - Check source, processor and sink status and raise if needed - - Our profiler source will never log any failure, only filters, - as we are just picking up data from OM. - """ - - if self.status.failures: - raise WorkflowExecutionError("Processor reported errors", self.status) - if hasattr(self, "sink") and self.sink.get_status().failures: - raise WorkflowExecutionError("Sink reported errors", self.sink.get_status()) - - if raise_warnings: - if self.status.warnings: - raise WorkflowExecutionError("Processor reported warnings", self.status) - if hasattr(self, "sink") and self.sink.get_status().warnings: - raise WorkflowExecutionError( - "Sink reported warnings", self.sink.get_status() - ) - - def stop(self): - """ - Close all connections - """ - self.metadata.close() diff --git a/ingestion/src/metadata/data_quality/processor/test_case_runner.py b/ingestion/src/metadata/data_quality/processor/test_case_runner.py index 9af57ff5371e..2bb2bc97e01f 100644 --- a/ingestion/src/metadata/data_quality/processor/test_case_runner.py +++ b/ingestion/src/metadata/data_quality/processor/test_case_runner.py @@ -12,20 +12,35 @@ """ This Processor is in charge of executing the test cases """ -from typing import Optional +import traceback +from copy import deepcopy +from typing import List, Optional, cast -from metadata.data_quality.api.models import TestSuiteProcessorConfig -from metadata.data_quality.source.test_suite import TableAndTests -from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest -from metadata.generated.schema.entity.data.table import Table +from metadata.data_quality.api.models import ( + TableAndTests, + TestCaseDefinition, + TestCaseResultResponse, + TestCaseResults, + TestSuiteProcessorConfig, +) +from metadata.data_quality.runner.core import DataTestsRunner +from metadata.data_quality.runner.test_suite_source_factory import ( + test_suite_source_factory, +) +from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) -from metadata.ingestion.api.models import Either +from metadata.generated.schema.tests.testCase import TestCase +from metadata.generated.schema.tests.testDefinition import TestDefinition, TestPlatform +from metadata.generated.schema.tests.testSuite import TestSuite +from metadata.generated.schema.type.basic import EntityLink, FullyQualifiedEntityName +from metadata.ingestion.api.models import Either, StackTraceError from metadata.ingestion.api.parser import parse_workflow_config_gracefully from metadata.ingestion.api.step import Step from metadata.ingestion.api.steps import Processor from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils import entity_link, fqn from metadata.utils.logger import test_suite_logger logger = test_suite_logger() @@ -34,8 +49,12 @@ class TestCaseRunner(Processor): """Execute the test suite tests and create test cases from the YAML config""" - def __init__(self, config: OpenMetadataWorkflowConfig): + def __init__(self, config: OpenMetadataWorkflowConfig, metadata: OpenMetadata): + + super().__init__() + self.config = config + self.metadata = metadata self.processor_config: TestSuiteProcessorConfig = ( TestSuiteProcessorConfig.parse_obj( @@ -50,13 +69,234 @@ def _run(self, record: TableAndTests) -> Either: # We pass the test suite request to the sink return Either(right=record.executable_test_suite) - ... + # Add the test cases from the YAML file, if any + test_cases = self.get_test_cases( + test_cases=record.test_cases, + test_suite_fqn=fqn.build( + None, + TestSuite, + table_fqn=record.table.fullyQualifiedName.__root__, + ), + table_fqn=record.table.fullyQualifiedName.__root__, + ) + + if not test_cases: + return Either( + left=StackTraceError( + name="No test Cases", + error=f"No tests cases found for table {record.table.fullyQualifiedName.__root__}", + ) + ) + + openmetadata_test_cases = self.filter_for_om_test_cases(test_cases) + + test_suite_runner = test_suite_source_factory.create( + record.service_type.lower(), + self.config, + self.metadata, + record.table, + ).get_data_quality_runner() + + test_results = [ + test_case_result + for test_case in openmetadata_test_cases + if (test_case_result := self._run_test_case(test_case, test_suite_runner)) + ] + + return Either(right=TestCaseResults(test_results=test_results)) + + def get_test_cases( + self, test_cases: Optional[List[TestCase]], test_suite_fqn: str, table_fqn: str + ) -> List[TestCase]: + """ + Based on the test suite test cases that we already know, pick up + the rest from the YAML config, compare and create the new ones + """ + if self.processor_config.testCases is not None: + cli_test_cases = self.get_test_case_from_cli_config() # type: ignore + cli_test_cases = cast( + List[TestCaseDefinition], cli_test_cases + ) # satisfy type checker + return self.compare_and_create_test_cases( + cli_test_cases_definitions=cli_test_cases, + test_cases=test_cases, + test_suite_fqn=test_suite_fqn, + table_fqn=table_fqn, + ) + + return test_cases + + def get_test_case_from_cli_config( + self, + ) -> Optional[List[TestCaseDefinition]]: + """Get all the test cases names defined in the CLI config file""" + if self.processor_config.testCases is not None: + return list(self.processor_config.testCases) + return None + + def compare_and_create_test_cases( + self, + cli_test_cases_definitions: Optional[List[TestCaseDefinition]], + test_cases: List[TestCase], + table_fqn: str, + test_suite_fqn: str, + ) -> List[TestCase]: + """ + compare test cases defined in CLI config workflow with test cases + defined on the server + Args: + cli_test_cases_definitions: test cases defined in CLI workflow associated with its test suite + test_cases: list of test cases entities fetch from the server using test suite names in the config file + table_fqn: table being tested + test_suite_fqn: FQN of the table + .testSuite + """ + if not cli_test_cases_definitions: + return test_cases + test_cases = deepcopy(test_cases) + test_case_names = {test_case.name.__root__ for test_case in test_cases} + + # we'll check the test cases defined in the CLI config file and not present in the platform + test_cases_to_create = [ + cli_test_case_definition + for cli_test_case_definition in cli_test_cases_definitions + if cli_test_case_definition.name not in test_case_names + ] + + if self.processor_config and self.processor_config.forceUpdate: + test_cases_to_update = [ + cli_test_case_definition + for cli_test_case_definition in cli_test_cases_definitions + if cli_test_case_definition.name in test_case_names + ] + test_cases = self._update_test_cases( + test_cases_to_update, test_cases, table_fqn + ) + + if not test_cases_to_create: + return test_cases + + for test_case_to_create in test_cases_to_create: + logger.debug(f"Creating test case with name {test_case_to_create.name}") + try: + test_case = self.metadata.create_or_update( + CreateTestCaseRequest( + name=test_case_to_create.name, + description=test_case_to_create.description, + displayName=test_case_to_create.displayName, + testDefinition=FullyQualifiedEntityName( + __root__=test_case_to_create.testDefinitionName + ), + entityLink=EntityLink( + __root__=entity_link.get_entity_link( + table_fqn, + test_case_to_create.columnName, + ) + ), + testSuite=test_suite_fqn, + parameterValues=list(test_case_to_create.parameterValues) + if test_case_to_create.parameterValues + else None, + owner=None, + ) + ) + test_cases.append(test_case) + except Exception as exc: + error = ( + f"Couldn't create test case name {test_case_to_create.name}: {exc}" + ) + logger.error(error) + logger.debug(traceback.format_exc()) + self.status.failed( + StackTraceError( + name=table_fqn, + error=error, + stack_trace=traceback.format_exc(), + ) + ) + + return test_cases + + def _update_test_cases( + self, + test_cases_to_update: List[TestCaseDefinition], + test_cases: List[TestCase], + table_fqn: str, + ): + """Given a list of CLI test definition patch test cases in the platform + + Args: + test_cases_to_update (List[TestCaseDefinition]): list of test case definitions + """ + test_cases_to_update_names = { + test_case_to_update.name for test_case_to_update in test_cases_to_update + } + for indx, test_case in enumerate(deepcopy(test_cases)): + if test_case.name.__root__ in test_cases_to_update_names: + test_case_definition = next( + test_case_to_update + for test_case_to_update in test_cases_to_update + if test_case_to_update.name == test_case.name.__root__ + ) + updated_test_case = self.metadata.patch_test_case_definition( + source=test_case, + entity_link=entity_link.get_entity_link( + table_fqn, + test_case_definition.columnName, + ), + test_case_parameter_values=test_case_definition.parameterValues, + ) + if updated_test_case: + test_cases.pop(indx) + test_cases.append(updated_test_case) + + return test_cases + + def filter_for_om_test_cases(self, test_cases: List[TestCase]) -> List[TestCase]: + """ + Filter test cases for OM test cases only. This will prevent us from running non OM test cases + + Args: + test_cases: list of test cases + """ + om_test_cases: List[TestCase] = [] + for test_case in test_cases: + test_definition: TestDefinition = self.metadata.get_by_id( + TestDefinition, test_case.testDefinition.id + ) + if TestPlatform.OpenMetadata not in test_definition.testPlatforms: + logger.debug( + f"Test case {test_case.name.__root__} is not an OpenMetadata test case." + ) + continue + om_test_cases.append(test_case) + + return om_test_cases + + def _run_test_case( + self, test_case: TestCase, test_suite_runner: DataTestsRunner + ) -> Optional[TestCaseResultResponse]: + """Execute the test case and return the result, if any""" + try: + test_result = test_suite_runner.run_and_handle(test_case) + self.status.scanned(test_case.fullyQualifiedName.__root__) + return test_result + except Exception as exc: + error = f"Could not run test case {test_case.name.__root__}: {exc}" + logger.debug(traceback.format_exc()) + logger.error(error) + self.status.failed( + StackTraceError( + name=test_case.name.__root__, + error=error, + stack_trace=traceback.format_exc(), + ) + ) @classmethod - def create(cls, config_dict: dict, _: OpenMetadata) -> "Step": + def create(cls, config_dict: dict, metadata: OpenMetadata) -> "Step": config = parse_workflow_config_gracefully(config_dict) - return cls(config=config) + return cls(config=config, metadata=metadata) def close(self) -> None: """Nothing to close""" diff --git a/ingestion/src/metadata/data_quality/runner/core.py b/ingestion/src/metadata/data_quality/runner/core.py index f31f6cb52b69..f53e885017a2 100644 --- a/ingestion/src/metadata/data_quality/runner/core.py +++ b/ingestion/src/metadata/data_quality/runner/core.py @@ -14,8 +14,8 @@ """ +from metadata.data_quality.api.models import TestCaseResultResponse from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface -from metadata.data_quality.runner.models import TestCaseResultResponse from metadata.generated.schema.tests.testCase import TestCase from metadata.utils.logger import test_suite_logger @@ -26,15 +26,15 @@ class DataTestsRunner: """class to execute the test validation""" def __init__(self, test_runner_interface: TestSuiteInterface): - self.test_runner_interace = test_runner_interface + self.test_runner_interface = test_runner_interface def run_and_handle(self, test_case: TestCase): """run and handle test case validation""" logger.info( f"Executing test case {test_case.name.__root__} " - f"for entity {self.test_runner_interace.table_entity.fullyQualifiedName.__root__}" + f"for entity {self.test_runner_interface.table_entity.fullyQualifiedName.__root__}" ) - test_result = self.test_runner_interace.run_test_case( + test_result = self.test_runner_interface.run_test_case( test_case, ) diff --git a/ingestion/src/metadata/data_quality/runner/models.py b/ingestion/src/metadata/data_quality/runner/models.py deleted file mode 100644 index 3448b140155f..000000000000 --- a/ingestion/src/metadata/data_quality/runner/models.py +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -test case result response object -""" - -from pydantic import BaseModel - -from metadata.generated.schema.tests.basic import TestCaseResult -from metadata.generated.schema.tests.testCase import TestCase - - -class TestCaseResultResponse(BaseModel): - testCaseResult: TestCaseResult - testCase: TestCase diff --git a/ingestion/src/metadata/data_quality/source/test_suite.py b/ingestion/src/metadata/data_quality/source/test_suite.py index 861d05006b66..7c9e2bda51af 100644 --- a/ingestion/src/metadata/data_quality/source/test_suite.py +++ b/ingestion/src/metadata/data_quality/source/test_suite.py @@ -17,9 +17,7 @@ import traceback from typing import Iterable, List, Optional, cast -from pydantic import BaseModel, Field - -from build.lib.metadata.ometa.fqn import split +from metadata.data_quality.api.models import TableAndTests from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.serviceConnection import ( @@ -39,21 +37,13 @@ from metadata.ingestion.api.step import Step from metadata.ingestion.api.steps import Source from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils import fqn +from metadata.utils.fqn import split from metadata.utils.logger import test_suite_logger logger = test_suite_logger() -class TableAndTests(BaseModel): - """Source response bringing together the table and test cases""" - - table: Table = Field(None, description="Table being processed by the DQ workflow") - test_cases: Optional[List[TestCase]] = Field( - None, description="Test Cases already existing in the Test Suite, if any" - ) - executable_test_suite: Optional[CreateTestSuiteRequest] = Field(None, description="If no executable test suite is found, we'll create one") - - class TestSuiteSource(Source): """ Gets the ingredients required to run the tests @@ -121,7 +111,7 @@ def _retrieve_service_connection(self) -> None: __root__=self.service.connection ) - def _get_table_entity(self, entity_fqn: str) -> Optional[Table]: + def _get_table_entity(self) -> Optional[Table]: """given an entity fqn return the table entity Args: @@ -129,7 +119,7 @@ def _get_table_entity(self, entity_fqn: str) -> Optional[Table]: """ table: Table = self.metadata.get_by_name( entity=Table, - fqn=entity_fqn, + fqn=self.source_config.entityFullyQualifiedName.__root__, fields=["tableProfilerConfig", "testSuite"], ) @@ -158,9 +148,7 @@ def test_connection(self) -> None: self.metadata.health_check() def _iter(self) -> Iterable[Either[TableAndTests]]: - table: Table = self._get_table_entity( - self.source_config.entityFullyQualifiedName.__root__ - ) + table: Table = self._get_table_entity() if table: yield from self._process_table_suite(table) @@ -180,23 +168,26 @@ def _process_table_suite(self, table: Table) -> Iterable[Either[TableAndTests]]: """ # If there is no executable test suite yet for the table, we'll need to create one + executable_test_suite = None if not table.testSuite: - yield Either(right=TableAndTests( - executable_test_suite=CreateTestSuiteRequest( - name=f"{self.source_config.entityFullyQualifiedName.__root__}.testSuite", - displayName=f"{self.source_config.entityFullyQualifiedName.__root__} Test Suite", - description="Test Suite created from YAML processor config file", - owner=None, - executableEntityReference=self.source_config.entityFullyQualifiedName.__root__, - ) - )) + executable_test_suite = CreateTestSuiteRequest( + name=fqn.build( + None, + TestSuite, + table_fqn=self.source_config.entityFullyQualifiedName.__root__, + ), + displayName=f"{self.source_config.entityFullyQualifiedName.__root__} Test Suite", + description="Test Suite created from YAML processor config file", + owner=None, + executableEntityReference=self.source_config.entityFullyQualifiedName.__root__, + ) if table.testSuite and not table.testSuite.executable: yield Either( left=StackTraceError( name="Non-executable Test Suite", error=f"The table {self.source_config.entityFullyQualifiedName.__root__} " - "has a test suite that is not executable.", + "has a test suite that is not executable.", ) ) @@ -208,6 +199,8 @@ def _process_table_suite(self, table: Table) -> Iterable[Either[TableAndTests]]: right=TableAndTests( table=table, test_cases=test_suite_cases, + executable_test_suite=executable_test_suite, + service_type=self.service.serviceType.value, ) ) diff --git a/ingestion/src/metadata/ingestion/api/status.py b/ingestion/src/metadata/ingestion/api/status.py index 99b5493ec6c3..a5d5dbec47a3 100644 --- a/ingestion/src/metadata/ingestion/api/status.py +++ b/ingestion/src/metadata/ingestion/api/status.py @@ -37,9 +37,13 @@ class Status(BaseModel): def scanned(self, record: Any) -> None: """ - Clean up the status results we want to show + Clean up the status results we want to show. + + We allow to not consider specific records that + are not worth keeping record of. """ - self.records.append(get_log_name(record)) + if log_name := get_log_name(record): + self.records.append(log_name) def warning(self, key: str, reason: str) -> None: self.warnings.append({key: reason}) diff --git a/ingestion/src/metadata/ingestion/api/steps.py b/ingestion/src/metadata/ingestion/api/steps.py index 9c0023f28b6d..bf79e1c8d48a 100644 --- a/ingestion/src/metadata/ingestion/api/steps.py +++ b/ingestion/src/metadata/ingestion/api/steps.py @@ -14,7 +14,6 @@ from abc import ABC, abstractmethod from typing import Any -from metadata.ingestion.api.models import Either, Entity from metadata.ingestion.api.step import BulkStep, IterStep, ReturnStep, StageStep from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.logger import ingestion_logger @@ -51,23 +50,10 @@ def test_connection(self) -> None: class Sink(ReturnStep, ABC): """All Sinks must inherit this base class.""" - @abstractmethod - def _run(self, record: Entity) -> Either: - """ - Send the data somewhere, e.g., the OM API - """ - class Processor(ReturnStep, ABC): """All Processor must inherit this base class""" - @abstractmethod - def _run(self, record: Entity) -> Either: - """ - Post process a given entity and return it - or a new one - """ - class Stage(StageStep, ABC): """All Stages must inherit this base class.""" diff --git a/ingestion/src/metadata/ingestion/models/custom_pydantic.py b/ingestion/src/metadata/ingestion/models/custom_pydantic.py index 728fa89d867f..7c69b286c54b 100644 --- a/ingestion/src/metadata/ingestion/models/custom_pydantic.py +++ b/ingestion/src/metadata/ingestion/models/custom_pydantic.py @@ -9,9 +9,13 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -Pydantic classes overwritten defaults ones of code generation -""" +Pydantic classes overwritten defaults ones of code generation. +This classes are used in the generated module, which should have NO +dependencies against any other metadata package. This class should +be self-sufficient with only pydantic at import time. +""" +import logging import warnings from typing import Any, Dict @@ -19,9 +23,7 @@ from pydantic.utils import update_not_none from pydantic.validators import constr_length_validator, str_validator -from metadata.utils.logger import ingestion_logger - -logger = ingestion_logger() +logger = logging.getLogger("metadata") class CustomSecretStr(SecretStr): @@ -101,6 +103,6 @@ def get_secret_value(self, skip_secret_manager: bool = False) -> str: ) except Exception as exc: logger.error( - f"Secret value [{secret_id}] not present in the configured secrets manages: {exc}" + f"Secret value [{secret_id}] not present in the configured secrets manager: {exc}" ) return self._secret_value diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index bce198e02de5..27237bdc084e 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -22,7 +22,7 @@ from metadata.config.common import ConfigModel from metadata.data_insight.source.metadata import DataInsightRecord -from metadata.data_quality.runner.models import TestCaseResultResponse +from metadata.data_quality.api.models import TestCaseResultResponse, TestCaseResults from metadata.generated.schema.analytics.reportData import ReportData from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.teams.createRole import CreateRoleRequest @@ -516,6 +516,19 @@ def write_executable_test_suite( test_suite = self.metadata.create_or_update_executable_test_suite(record) return Either(right=test_suite) + @_run_dispatch.register + def write_test_case_results(self, record: TestCaseResults): + """Record the list of test case result responses""" + + for result in record.test_results or []: + self.metadata.add_test_case_results( + test_results=result.testCaseResult, + test_case_fqn=result.testCase.fullyQualifiedName.__root__, + ) + self.status.scanned(result) + + return Either(right=record) + def close(self): """ We don't have anything to close since we are using the given metadata client diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index aef4aa55a89c..304e13d1b08c 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -41,6 +41,7 @@ from metadata.generated.schema.entity.teams.team import Team from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.tests.testCase import TestCase +from metadata.generated.schema.tests.testSuite import TestSuite from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.dispatch import class_register from metadata.utils.elasticsearch import get_entity_from_es_result @@ -81,12 +82,15 @@ def split(s: str) -> List[str]: return splitter.split() -def _build(*args) -> str: +def _build(*args, quote: bool = True) -> str: """ Equivalent of Java's FullyQualifiedName#build """ - quoted = [quote_name(name) for name in args] - return FQN_SEPARATOR.join(quoted) + if quote: + quoted = [quote_name(name) for name in args] + return FQN_SEPARATOR.join(quoted) + + return FQN_SEPARATOR.join(args) def unquote_name(name: str) -> str: @@ -255,6 +259,16 @@ def _( return _build(service_name, mlmodel_name) +@fqn_build_registry.add(TestSuite) +def _(_: Optional[OpenMetadata], *, table_fqn: str) -> str: + """ + We don't need to quote since this comes from a table FQN. + We're replicating the backend logic of the FQN generation in the TestSuiteRepository + for executable test suites. + """ + return _build(table_fqn, "testSuite", quote=False) + + @fqn_build_registry.add(Topic) def _( _: Optional[OpenMetadata], # ES Index not necessary for Topic FQN building diff --git a/ingestion/src/metadata/utils/logger.py b/ingestion/src/metadata/utils/logger.py index a32fa92006b4..59d7290ad368 100644 --- a/ingestion/src/metadata/utils/logger.py +++ b/ingestion/src/metadata/utils/logger.py @@ -18,6 +18,11 @@ from types import DynamicClassAttribute from typing import Optional, Union +from metadata.data_quality.api.models import ( + TableAndTests, + TestCaseResultResponse, + TestCaseResults, +) from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.ingestion.api.models import Entity from metadata.ingestion.models.delete_entity import DeleteEntity @@ -171,7 +176,7 @@ def log_ansi_encoded_string( @singledispatch -def get_log_name(record: Entity) -> str: +def get_log_name(record: Entity) -> Optional[str]: try: return f"{type(record).__name__} [{record.name.__root__}]" except Exception: @@ -225,3 +230,19 @@ def _(record: OMetaLifeCycleData) -> str: Capture the lifecycle changes of an Entity """ return f"{type(record.entity).__name__} Lifecycle [{record.entity.name.__root__}]" + + +@get_log_name.register +def _(record: TableAndTests) -> str: + return f"Tests for [{record.table.fullyQualifiedName.__root__}]" + + +@get_log_name.register +def _(_: TestCaseResults) -> Optional[str]: + """We don't want to log this in the status""" + return None + + +@get_log_name.register +def _(record: TestCaseResultResponse) -> str: + return record.testCase.fullyQualifiedName.__root__ diff --git a/ingestion/tests/integration/test_suite/test_e2e_workflow.py b/ingestion/tests/integration/test_suite/test_e2e_workflow.py index c9193dab6286..b252cddfeb6a 100644 --- a/ingestion/tests/integration/test_suite/test_e2e_workflow.py +++ b/ingestion/tests/integration/test_suite/test_e2e_workflow.py @@ -20,7 +20,6 @@ import sqlalchemy as sqa from sqlalchemy.orm import Session, declarative_base -from metadata.data_quality.api.workflow import TestSuiteWorkflow from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, @@ -46,11 +45,12 @@ ) from metadata.generated.schema.tests.testCase import TestCase from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.workflow.data_quality import TestSuiteWorkflow test_suite_config = { "source": { "type": "custom-database", - "serviceName": "MyRabdomWorkflow", + "serviceName": "test_suite_service_test", "sourceConfig": { "config": { "type": "TestSuite", @@ -146,7 +146,7 @@ def setUpClass(cls): ) ) - table = cls.metadata.create_or_update( + cls.metadata.create_or_update( CreateTableRequest( name="users", columns=[ diff --git a/ingestion/tests/integration/test_suite/test_workflow.py b/ingestion/tests/integration/test_suite/test_workflow.py index 57806414f236..7139abcaf587 100644 --- a/ingestion/tests/integration/test_suite/test_workflow.py +++ b/ingestion/tests/integration/test_suite/test_workflow.py @@ -14,17 +14,19 @@ """ import unittest -from collections.abc import MutableSequence from copy import deepcopy +from typing import List -from metadata.data_quality.api.workflow import TestSuiteWorkflow +from metadata.data_quality.api.models import TableAndTests from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) from metadata.generated.schema.tests.testCase import TestCase from metadata.generated.schema.tests.testSuite import TestSuite +from metadata.ingestion.api.models import Either from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.workflow.data_quality import TestSuiteWorkflow sqlite_shared = "file:cachedb?mode=memory&cache=shared&check_same_thread=False" @@ -88,58 +90,51 @@ def tearDown(self) -> None: def test_create_workflow_object(self): """Test workflow object is correctly instantiated""" TestSuiteWorkflow.create(test_suite_config) - assert True - def test_create_workflow_object_from_cli_config(self): + def test_create_workflow_object_with_table_with_test_suite(self): """test workflow object is instantiated correctly from cli config""" - _test_suite_config = deepcopy(test_suite_config) + workflow = TestSuiteWorkflow.create(test_suite_config) - processor = { - "processor": { - "type": "orm-test-runner", - "config": { - "testCases": [ - { - "name": "my_test_case", - "testDefinitionName": "tableColumnCountToBeBetween", - "parameterValues": [ - {"name": "minColValue", "value": 1}, - {"name": "maxColValue", "value": 5}, - ], - } - ] - }, - } - } + table: Table = workflow.source._get_table_entity() - _test_suite_config.update(processor) + table_and_tests: TableAndTests = list( + workflow.source._process_table_suite(table=table) + )[0] - workflow = TestSuiteWorkflow.create(_test_suite_config) - workflow_test_suite = workflow.create_or_return_test_suite_entity() + # If the table already has a test suite, we won't be generating one + self.assertIsNotNone(table.testSuite) + self.assertIsNone(table_and_tests.right.executable_test_suite) - test_suite = self.metadata.get_by_name( - entity=TestSuite, - fqn="sample_data.ecommerce_db.shopify.dim_address.testSuite", - ) + # We will pick up the tests from it + # Note that this number comes from what is defined in the sample data + self.assertEquals(len(table_and_tests.right.test_cases), 5) - assert workflow_test_suite.id == test_suite.id - self.test_suite_ids = [test_suite.id] + def test_create_workflow_config_with_table_without_suite(self): + """We'll prepare the test suite creation payload""" - def test_create_or_return_test_suite_entity(self): - """test we can correctly retrieve a test suite""" _test_suite_config = deepcopy(test_suite_config) + _test_suite_config["source"]["sourceConfig"]["config"][ + "entityFullyQualifiedName" + ] = "sample_data.ecommerce_db.shopify.dim_staff" workflow = TestSuiteWorkflow.create(_test_suite_config) - test_suite = workflow.create_or_return_test_suite_entity() - expected_test_suite = self.metadata.get_by_name( - entity=TestSuite, fqn="critical_metrics_suite" + # If the table does not have a test suite, we'll prepare the request to create one + table: Table = workflow.source._get_table_entity() + + table_and_tests: Either[TableAndTests] = list( + workflow.source._process_table_suite(table=table) + )[0] + + self.assertIsNone(table.testSuite) + self.assertEquals( + table_and_tests.right.executable_test_suite.name.__root__, + "sample_data.ecommerce_db.shopify.dim_staff.testSuite", ) - assert test_suite + def test_create_workflow_config_with_tests(self): + """We'll get the tests from the workflow YAML""" - def test_get_test_cases_from_test_suite(self): - """test test cases are correctly returned for specific test suite""" _test_suite_config = deepcopy(test_suite_config) processor = { @@ -161,19 +156,34 @@ def test_get_test_cases_from_test_suite(self): } _test_suite_config.update(processor) - workflow = TestSuiteWorkflow.create(_test_suite_config) - test_suite = workflow.create_or_return_test_suite_entity() - test_cases = workflow.get_test_cases_from_test_suite(test_suite) - assert isinstance(test_cases, MutableSequence) - assert isinstance(test_cases[0], TestCase) - assert {"my_test_case"}.intersection( - {test_case.name.__root__ for test_case in test_cases} + table: Table = workflow.source._get_table_entity() + table_and_tests: Either[TableAndTests] = list( + workflow.source._process_table_suite(table=table) + )[0] + + test_cases: List[TestCase] = workflow.steps[0].get_test_cases( + test_cases=table_and_tests.right.test_cases, + test_suite_fqn="sample_data.ecommerce_db.shopify.dim_address.testSuite", + table_fqn="sample_data.ecommerce_db.shopify.dim_address", + ) + + # 5 defined test cases + the new one in the YAML + self.assertEquals(len(test_cases), 6) + + new_test_case = next( + (test for test in test_cases if test.name.__root__ == "my_test_case"), None ) + self.assertIsNotNone(new_test_case) - for test_case in test_cases: - self.metadata.delete(entity=TestCase, entity_id=test_case.id) + # cleanup + self.metadata.delete( + entity=TestCase, + entity_id=new_test_case.id, + recursive=True, + hard_delete=True, + ) def test_get_test_case_names_from_cli_config(self): """test we can get all test case names from cli config""" @@ -208,7 +218,7 @@ def test_get_test_case_names_from_cli_config(self): _test_suite_config.update(processor) workflow = TestSuiteWorkflow.create(_test_suite_config) - test_cases_def = workflow.get_test_case_from_cli_config() + test_cases_def = workflow.steps[0].get_test_case_from_cli_config() assert [test_case_def.name for test_case_def in test_cases_def] == [ "my_test_case", @@ -259,15 +269,17 @@ def test_compare_and_create_test_cases(self): fqn="sample_data.ecommerce_db.shopify.dim_address.address_id.my_test_case_two", ) - test_suite = workflow.create_or_return_test_suite_entity() - test_cases = self.metadata.list_entities( - entity=TestCase, - fields=["testSuite", "entityLink", "testDefinition"], - params={"testSuiteId": test_suite.id.__root__}, - ).entities - config_test_cases_def = workflow.get_test_case_from_cli_config() - created_test_case = workflow.compare_and_create_test_cases( - config_test_cases_def, test_cases, test_suite + table: Table = workflow.source._get_table_entity() + table_and_tests: Either[TableAndTests] = list( + workflow.source._process_table_suite(table=table) + )[0] + + config_test_cases_def = workflow.steps[0].get_test_case_from_cli_config() + created_test_case = workflow.steps[0].compare_and_create_test_cases( + cli_test_cases_definitions=config_test_cases_def, + test_cases=table_and_tests.right.test_cases, + test_suite_fqn="sample_data.ecommerce_db.shopify.dim_address.testSuite", + table_fqn="sample_data.ecommerce_db.shopify.dim_address", ) # clean up test @@ -285,28 +297,8 @@ def test_compare_and_create_test_cases(self): assert my_test_case assert my_test_case_two - assert len(created_test_case) == 2 + # We return the 5 sample data tests & the 2 new ones + assert len(created_test_case) == 7 self.metadata.delete(entity=TestCase, entity_id=my_test_case.id) self.metadata.delete(entity=TestCase, entity_id=my_test_case_two.id) - - def test_get_table_entity(self): - """test get service connection returns correct info""" - workflow = TestSuiteWorkflow.create(test_suite_config) - service_connection = workflow._get_table_entity( - "sample_data.ecommerce_db.shopify.dim_address" - ) - - assert isinstance(service_connection, Table) - - # def test_filter_for_om_test_cases(self): - # """test filter for OM test cases method""" - # om_test_case_1 = TestCase( - # name="om_test_case_1", - # testDefinition=self.metadata.get_entity_reference( - # TestDefinition, - # "columnValuesToMatchRegex" - # ), - # entityLink="", - # testSuite=self.metadata.get_entity_reference("sample_data.ecommerce_db.shopify.dim_address.TestSuite"), - # ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py index 0cc0c2d3b217..a1e022111599 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py @@ -17,7 +17,6 @@ from openmetadata_managed_apis.utils.logger import set_operator_logger from openmetadata_managed_apis.workflows.ingestion.common import build_dag, build_source -from metadata.data_quality.api.workflow import TestSuiteWorkflow from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( IngestionPipeline, ) @@ -29,6 +28,7 @@ WorkflowConfig, ) from metadata.ingestion.models.encoders import show_secrets_encoder +from metadata.workflow.data_quality import TestSuiteWorkflow from metadata.workflow.workflow_output_handler import print_test_suite_status diff --git a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py index 0dc337c1bc03..cf9be2d7863e 100644 --- a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py +++ b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py @@ -33,7 +33,6 @@ build_usage_workflow_config, ) -from metadata.data_quality.api.workflow import TestSuiteWorkflow from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) @@ -69,6 +68,7 @@ from metadata.ingestion.api.parser import parse_workflow_config_gracefully from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.workflow.data_quality import TestSuiteWorkflow from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow