From 4c645a05a8a3ba9763d5b4550af3c31920d0989a Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Wed, 4 Oct 2023 07:45:17 +0200 Subject: [PATCH 01/12] DQ BaseWorkflow --- .../src/metadata/data_quality/api/workflow.py | 21 +---- .../data_quality/sink/metadata_rest.py | 90 ------------------- .../metadata/ingestion/sink/metadata_rest.py | 15 +++- .../src/metadata/workflow/data_quality.py | 23 +++++ 4 files changed, 39 insertions(+), 110 deletions(-) delete mode 100644 ingestion/src/metadata/data_quality/sink/metadata_rest.py create mode 100644 ingestion/src/metadata/workflow/data_quality.py diff --git a/ingestion/src/metadata/data_quality/api/workflow.py b/ingestion/src/metadata/data_quality/api/workflow.py index dadd28ebce1b..80ae0b503c78 100644 --- a/ingestion/src/metadata/data_quality/api/workflow.py +++ b/ingestion/src/metadata/data_quality/api/workflow.py @@ -20,7 +20,7 @@ from logging import Logger from typing import List, Optional, cast -from pydantic import BaseModel, ValidationError +from pydantic import ValidationError from metadata.config.common import WorkflowExecutionError from metadata.data_quality.api.models import ( @@ -67,22 +67,6 @@ logger: Logger = test_suite_logger() -class TestCaseToCreate(BaseModel): - """Test case to create""" - - test_suite_name: str - test_case_name: str - entity_link: str - - def __hash__(self): - """make this base model hashable on unique_name""" - return hash(f"{self.test_suite_name}.{self.test_case_name}") - - def __str__(self) -> str: - """make this base model printable""" - return f"{self.test_suite_name}.{self.test_case_name}" - - class TestSuiteWorkflow(WorkflowStatusMixin): """workflow to run the test suite""" @@ -125,7 +109,6 @@ def __init__(self, config: OpenMetadataWorkflowConfig): sink_type=self.config.sink.type, sink_config=self.config.sink, metadata_config=self.metadata_config, - from_="data_quality", ) @classmethod @@ -425,7 +408,7 @@ def run_test_suite(self): if not test_result: continue if hasattr(self, "sink"): - self.sink.write_record(test_result) + self.sink.run(test_result) logger.debug(f"Successfully ran test case {test_case.name.__root__}") self.status.processed(test_case.fullyQualifiedName.__root__) except Exception as exc: diff --git a/ingestion/src/metadata/data_quality/sink/metadata_rest.py b/ingestion/src/metadata/data_quality/sink/metadata_rest.py deleted file mode 100644 index 7ac7ed6e9f7b..000000000000 --- a/ingestion/src/metadata/data_quality/sink/metadata_rest.py +++ /dev/null @@ -1,90 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -OpenMetadata REST Sink implementation for the ORM Profiler results -""" - -import traceback -from typing import Optional - -from metadata.config.common import ConfigModel -from metadata.data_quality.runner.models import TestCaseResultResponse -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.models import StackTraceError -from metadata.ingestion.api.sink import Sink -from metadata.ingestion.ometa.client import APIError -from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.utils.logger import test_suite_logger - -logger = test_suite_logger() - - -class MetadataRestSinkConfig(ConfigModel): - api_endpoint: Optional[str] = None - - -class MetadataRestSink(Sink[Entity]): - """ - Metadata Sink sending the test suite - to the OM API - """ - - config: MetadataRestSinkConfig - - def __init__( - self, - config: MetadataRestSinkConfig, - metadata_config: OpenMetadataConnection, - ): - super().__init__() - self.config = config - self.metadata_config = metadata_config - self.wrote_something = False - self.metadata = OpenMetadata(self.metadata_config) - - @classmethod - def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection): - config = MetadataRestSinkConfig.parse_obj(config_dict) - return cls(config, metadata_config) - - def close(self) -> None: - self.metadata.close() - - def write_record(self, record: TestCaseResultResponse) -> None: - try: - self.metadata.add_test_case_results( - test_results=record.testCaseResult, - test_case_fqn=record.testCase.fullyQualifiedName.__root__, - ) - logger.info( - f"Successfully ingested test case results for test case {record.testCase.name.__root__}" - ) - - self.status.records_written( - f"Test Case: {record.testCase.fullyQualifiedName.__root__}" - ) - - except APIError as err: - name = record.testCase.fullyQualifiedName.__root__ - error = f"Failed to sink test case results for {name}: {err}" - logger.error(error) - logger.debug(traceback.format_exc()) - self.status.failed( - StackTraceError( - name=name, - error=error, - stack_trace=traceback.format_exc(), - ) - ) diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index ea8877c187a0..6e796d982900 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -22,6 +22,7 @@ from metadata.config.common import ConfigModel from metadata.data_insight.source.metadata import DataInsightRecord +from metadata.data_quality.runner.models import TestCaseResultResponse from metadata.generated.schema.analytics.reportData import ReportData from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.teams.createRole import CreateRoleRequest @@ -375,6 +376,18 @@ def write_test_case_results_sample( ) return Either(right=record.test_case_results) + @_run_dispatch.register + def write_test_case_results(self, record: TestCaseResultResponse): + """Write the test case result""" + res = self.metadata.add_test_case_results( + test_results=record.testCaseResult, + test_case_fqn=record.testCase.fullyQualifiedName.__root__, + ) + logger.debug( + f"Successfully ingested test case results for test case {record.testCase.name.__root__}" + ) + return Either(right=res) + @_run_dispatch.register def write_data_insight_sample( self, record: OMetaDataInsightSample @@ -385,7 +398,7 @@ def write_data_insight_sample( self.metadata.add_data_insight_report_data( record.record, ) - return Either(left=None, right=record.record) + return Either(right=record.record) @_run_dispatch.register def write_data_insight(self, record: DataInsightRecord) -> Either[ReportData]: diff --git a/ingestion/src/metadata/workflow/data_quality.py b/ingestion/src/metadata/workflow/data_quality.py new file mode 100644 index 000000000000..3c4065f9d8bb --- /dev/null +++ b/ingestion/src/metadata/workflow/data_quality.py @@ -0,0 +1,23 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Workflow definition for the Data Quality +""" +from metadata.workflow.base import BaseWorkflow + + +class TestSuiteWorkflow(BaseWorkflow): + """ + DAta Quality ingestion workflow implementation + + We check the source connection test when initializing + this workflow. No need to do anything here if this does not pass + """ From 07da1e746d3c774dbd0fc5f3a1126a2f2ab649cc Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Wed, 4 Oct 2023 09:13:04 +0200 Subject: [PATCH 02/12] Test suite runner --- .../src/metadata/data_quality/api/workflow.py | 2 +- .../{source => runner}/base_test_suite_source.py | 2 +- .../test_suite_source_factory.py | 10 +++++----- .../metadata/data_quality/source/test_suite.py | 16 ++++++++++++++++ ingestion/src/metadata/workflow/data_quality.py | 3 +++ 5 files changed, 26 insertions(+), 7 deletions(-) rename ingestion/src/metadata/data_quality/{source => runner}/base_test_suite_source.py (99%) rename ingestion/src/metadata/data_quality/{source => runner}/test_suite_source_factory.py (83%) create mode 100644 ingestion/src/metadata/data_quality/source/test_suite.py diff --git a/ingestion/src/metadata/data_quality/api/workflow.py b/ingestion/src/metadata/data_quality/api/workflow.py index 80ae0b503c78..d9e1461f770f 100644 --- a/ingestion/src/metadata/data_quality/api/workflow.py +++ b/ingestion/src/metadata/data_quality/api/workflow.py @@ -27,7 +27,7 @@ TestCaseDefinition, TestSuiteProcessorConfig, ) -from metadata.data_quality.source.test_suite_source_factory import ( +from metadata.data_quality.runner.test_suite_source_factory import ( test_suite_source_factory, ) from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest diff --git a/ingestion/src/metadata/data_quality/source/base_test_suite_source.py b/ingestion/src/metadata/data_quality/runner/base_test_suite_source.py similarity index 99% rename from ingestion/src/metadata/data_quality/source/base_test_suite_source.py rename to ingestion/src/metadata/data_quality/runner/base_test_suite_source.py index bd7cd9923d61..b21bbb354ae4 100644 --- a/ingestion/src/metadata/data_quality/source/base_test_suite_source.py +++ b/ingestion/src/metadata/data_quality/runner/base_test_suite_source.py @@ -36,7 +36,7 @@ NON_SQA_DATABASE_CONNECTIONS = (DatalakeConnection,) -class BaseTestSuiteSource: +class BaseTestSuiteRunner: """Base class for the data quality runner""" def __init__( diff --git a/ingestion/src/metadata/data_quality/source/test_suite_source_factory.py b/ingestion/src/metadata/data_quality/runner/test_suite_source_factory.py similarity index 83% rename from ingestion/src/metadata/data_quality/source/test_suite_source_factory.py rename to ingestion/src/metadata/data_quality/runner/test_suite_source_factory.py index e41533552953..2b1a245d8ccd 100644 --- a/ingestion/src/metadata/data_quality/source/test_suite_source_factory.py +++ b/ingestion/src/metadata/data_quality/runner/test_suite_source_factory.py @@ -13,20 +13,20 @@ Factory class for creating test suite source objects """ -from metadata.data_quality.source.base_test_suite_source import BaseTestSuiteSource +from metadata.data_quality.runner.base_test_suite_source import BaseTestSuiteRunner -class TestSuiteSourceFactory: +class TestSuiteRunnerFactory: """Creational factory for test suite source objects""" def __init__(self): - self._source_type = {"base": BaseTestSuiteSource} + self._source_type = {"base": BaseTestSuiteRunner} def register_source(self, source_type: str, source_class): """Register a new source type""" self._source_type[source_type] = source_class - def create(self, source_type: str, *args, **kwargs) -> BaseTestSuiteSource: + def create(self, source_type: str, *args, **kwargs) -> BaseTestSuiteRunner: """Create source object based on source type""" source_class = self._source_type.get(source_type) if not source_class: @@ -35,4 +35,4 @@ def create(self, source_type: str, *args, **kwargs) -> BaseTestSuiteSource: return source_class(*args, **kwargs) -test_suite_source_factory = TestSuiteSourceFactory() +test_suite_source_factory = TestSuiteRunnerFactory() diff --git a/ingestion/src/metadata/data_quality/source/test_suite.py b/ingestion/src/metadata/data_quality/source/test_suite.py new file mode 100644 index 000000000000..00dc07eeda6c --- /dev/null +++ b/ingestion/src/metadata/data_quality/source/test_suite.py @@ -0,0 +1,16 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test Suite Workflow Source + +The main goal is to get the configured table from the API. +""" \ No newline at end of file diff --git a/ingestion/src/metadata/workflow/data_quality.py b/ingestion/src/metadata/workflow/data_quality.py index 3c4065f9d8bb..d84236883d04 100644 --- a/ingestion/src/metadata/workflow/data_quality.py +++ b/ingestion/src/metadata/workflow/data_quality.py @@ -21,3 +21,6 @@ class TestSuiteWorkflow(BaseWorkflow): We check the source connection test when initializing this workflow. No need to do anything here if this does not pass """ + + def set_steps(self): + pass From f6a01782e456c415ea0da092b3177f983fde4ebf Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Wed, 4 Oct 2023 11:59:18 +0200 Subject: [PATCH 03/12] test Suite workflow --- .../data_quality/processor/__init__.py | 0 .../processor/test_case_runner.py | 62 ++++++ .../data_quality/source/test_suite.py | 206 +++++++++++++++++- .../metadata/ingestion/sink/metadata_rest.py | 11 + .../src/metadata/workflow/data_quality.py | 27 ++- 5 files changed, 304 insertions(+), 2 deletions(-) create mode 100644 ingestion/src/metadata/data_quality/processor/__init__.py create mode 100644 ingestion/src/metadata/data_quality/processor/test_case_runner.py diff --git a/ingestion/src/metadata/data_quality/processor/__init__.py b/ingestion/src/metadata/data_quality/processor/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/ingestion/src/metadata/data_quality/processor/test_case_runner.py b/ingestion/src/metadata/data_quality/processor/test_case_runner.py new file mode 100644 index 000000000000..9af57ff5371e --- /dev/null +++ b/ingestion/src/metadata/data_quality/processor/test_case_runner.py @@ -0,0 +1,62 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This Processor is in charge of executing the test cases +""" +from typing import Optional + +from metadata.data_quality.api.models import TestSuiteProcessorConfig +from metadata.data_quality.source.test_suite import TableAndTests +from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.ingestion.api.models import Either +from metadata.ingestion.api.parser import parse_workflow_config_gracefully +from metadata.ingestion.api.step import Step +from metadata.ingestion.api.steps import Processor +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils.logger import test_suite_logger + +logger = test_suite_logger() + + +class TestCaseRunner(Processor): + """Execute the test suite tests and create test cases from the YAML config""" + + def __init__(self, config: OpenMetadataWorkflowConfig): + self.config = config + + self.processor_config: TestSuiteProcessorConfig = ( + TestSuiteProcessorConfig.parse_obj( + self.config.processor.dict().get("config") + ) + ) + + def _run(self, record: TableAndTests) -> Either: + # First, create the executable test suite if it does not exist yet + # This could happen if the process is executed from YAML and not the UI + if record.executable_test_suite: + # We pass the test suite request to the sink + return Either(right=record.executable_test_suite) + + ... + + + @classmethod + def create(cls, config_dict: dict, _: OpenMetadata) -> "Step": + config = parse_workflow_config_gracefully(config_dict) + return cls(config=config) + + def close(self) -> None: + """Nothing to close""" diff --git a/ingestion/src/metadata/data_quality/source/test_suite.py b/ingestion/src/metadata/data_quality/source/test_suite.py index 00dc07eeda6c..861d05006b66 100644 --- a/ingestion/src/metadata/data_quality/source/test_suite.py +++ b/ingestion/src/metadata/data_quality/source/test_suite.py @@ -13,4 +13,208 @@ Test Suite Workflow Source The main goal is to get the configured table from the API. -""" \ No newline at end of file +""" +import traceback +from typing import Iterable, List, Optional, cast + +from pydantic import BaseModel, Field + +from build.lib.metadata.ometa.fqn import split +from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.services.connections.serviceConnection import ( + ServiceConnection, +) +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( + TestSuitePipeline, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.generated.schema.tests.testCase import TestCase +from metadata.generated.schema.tests.testSuite import TestSuite +from metadata.ingestion.api.models import Either, StackTraceError +from metadata.ingestion.api.parser import parse_workflow_config_gracefully +from metadata.ingestion.api.step import Step +from metadata.ingestion.api.steps import Source +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils.logger import test_suite_logger + +logger = test_suite_logger() + + +class TableAndTests(BaseModel): + """Source response bringing together the table and test cases""" + + table: Table = Field(None, description="Table being processed by the DQ workflow") + test_cases: Optional[List[TestCase]] = Field( + None, description="Test Cases already existing in the Test Suite, if any" + ) + executable_test_suite: Optional[CreateTestSuiteRequest] = Field(None, description="If no executable test suite is found, we'll create one") + + +class TestSuiteSource(Source): + """ + Gets the ingredients required to run the tests + """ + + def __init__( + self, + config: OpenMetadataWorkflowConfig, + metadata: OpenMetadata, + ): + super().__init__() + + self.config = config + self.metadata = metadata + + self.source_config: TestSuitePipeline = self.config.source.sourceConfig.config + + self.service: DatabaseService = self._retrieve_service() + self._retrieve_service_connection() + + self.test_connection() + + def _retrieve_service(self) -> DatabaseService: + """Get service object from source config `entityFullyQualifiedName`""" + fully_qualified_name = self.source_config.entityFullyQualifiedName.__root__ + try: + service_name = split(fully_qualified_name)[0] + except IndexError as exc: + logger.debug(traceback.format_exc()) + raise IndexError( + f"Could not retrieve service name from entity fully qualified name {fully_qualified_name}: {exc}" + ) + try: + service = self.metadata.get_by_name(DatabaseService, service_name) + if not service: + raise ConnectionError( + f"Could not retrieve service with name `{service_name}`. " + "Typically caused by the `entityFullyQualifiedName` does not exists in OpenMetadata " + "or the JWT Token is invalid." + ) + + return service + + except ConnectionError as exc: + raise exc + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error( + f"Error getting service connection for service name [{service_name}]" + f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}" + ) + raise exc + + def _retrieve_service_connection(self) -> None: + """ + We override the current `serviceConnection` source config object if source workflow service already exists + in OM. When it is configured, we retrieve the service connection from the secrets' manager. Otherwise, we get it + from the service object itself through the default `SecretsManager`. + """ + if ( + not self.config.source.serviceConnection + and not self.metadata.config.forceEntityOverwriting + ): + self.config.source.serviceConnection = ServiceConnection( + __root__=self.service.connection + ) + + def _get_table_entity(self, entity_fqn: str) -> Optional[Table]: + """given an entity fqn return the table entity + + Args: + entity_fqn: entity fqn for the test case + """ + table: Table = self.metadata.get_by_name( + entity=Table, + fqn=entity_fqn, + fields=["tableProfilerConfig", "testSuite"], + ) + + return table + + def _get_test_cases_from_test_suite( + self, test_suite: Optional[TestSuite] + ) -> Optional[List[TestCase]]: + """Return test cases if the test suite exists and has them""" + if test_suite: + test_cases = self.metadata.list_entities( + entity=TestCase, + fields=["testSuite", "entityLink", "testDefinition"], + params={"testSuiteId": test_suite.id.__root__}, + ).entities + test_cases = cast(List[TestCase], test_cases) # satisfy type checker + + return test_cases + + return None + + def prepare(self): + """Nothing to prepare""" + + def test_connection(self) -> None: + self.metadata.health_check() + + def _iter(self) -> Iterable[Either[TableAndTests]]: + table: Table = self._get_table_entity( + self.source_config.entityFullyQualifiedName.__root__ + ) + + if table: + yield from self._process_table_suite(table) + + else: + yield Either( + left=StackTraceError( + name="Missing Table", + error=f"Could not retrieve table entity for {self.source_config.entityFullyQualifiedName.__root__}. " + "Make sure the table exists in OpenMetadata and/or the JWT Token provided is valid.", + ) + ) + + def _process_table_suite(self, table: Table) -> Iterable[Either[TableAndTests]]: + """ + Check that the table has the proper test suite built in + """ + + # If there is no executable test suite yet for the table, we'll need to create one + if not table.testSuite: + yield Either(right=TableAndTests( + executable_test_suite=CreateTestSuiteRequest( + name=f"{self.source_config.entityFullyQualifiedName.__root__}.testSuite", + displayName=f"{self.source_config.entityFullyQualifiedName.__root__} Test Suite", + description="Test Suite created from YAML processor config file", + owner=None, + executableEntityReference=self.source_config.entityFullyQualifiedName.__root__, + ) + )) + + if table.testSuite and not table.testSuite.executable: + yield Either( + left=StackTraceError( + name="Non-executable Test Suite", + error=f"The table {self.source_config.entityFullyQualifiedName.__root__} " + "has a test suite that is not executable.", + ) + ) + + else: + + test_suite_cases = self._get_test_cases_from_test_suite(table.testSuite) + + yield Either( + right=TableAndTests( + table=table, + test_cases=test_suite_cases, + ) + ) + + @classmethod + def create(cls, config_dict: dict, metadata: OpenMetadata) -> "Step": + config = parse_workflow_config_gracefully(config_dict) + return cls(config=config, metadata=metadata) + + def close(self) -> None: + """Nothing to close""" diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 3cf72d629d73..bce198e02de5 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -31,6 +31,7 @@ from metadata.generated.schema.api.tests.createLogicalTestCases import ( CreateLogicalTestCases, ) +from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest from metadata.generated.schema.dataInsight.kpi.basic import KpiResult from metadata.generated.schema.entity.classification.tag import Tag from metadata.generated.schema.entity.data.dashboard import Dashboard @@ -505,6 +506,16 @@ def write_profiler_response(self, record: ProfilerResponse) -> Either[Table]: return Either(right=table) + @_run_dispatch.register + def write_executable_test_suite( + self, record: CreateTestSuiteRequest + ) -> Either[TestSuite]: + """ + From the test suite workflow we might need to create executable test suites + """ + test_suite = self.metadata.create_or_update_executable_test_suite(record) + return Either(right=test_suite) + def close(self): """ We don't have anything to close since we are using the given metadata client diff --git a/ingestion/src/metadata/workflow/data_quality.py b/ingestion/src/metadata/workflow/data_quality.py index d84236883d04..b0883c9e066f 100644 --- a/ingestion/src/metadata/workflow/data_quality.py +++ b/ingestion/src/metadata/workflow/data_quality.py @@ -11,8 +11,16 @@ """ Workflow definition for the Data Quality """ +from metadata.data_quality.processor.test_case_runner import TestCaseRunner +from metadata.data_quality.source.test_suite import TestSuiteSource +from metadata.ingestion.api.processor import Processor +from metadata.ingestion.api.sink import Sink +from metadata.utils.importer import import_sink_class +from metadata.utils.logger import test_suite_logger from metadata.workflow.base import BaseWorkflow +logger = test_suite_logger() + class TestSuiteWorkflow(BaseWorkflow): """ @@ -23,4 +31,21 @@ class TestSuiteWorkflow(BaseWorkflow): """ def set_steps(self): - pass + self.source = TestSuiteSource.create(self.config.dict(), self.metadata) + + test_runner_processor = self._get_test_runner_processor() + sink = self._get_sink() + + self.steps = (test_runner_processor, sink) + + def _get_sink(self) -> Sink: + sink_type = self.config.sink.type + sink_class = import_sink_class(sink_type=sink_type) + sink_config = self.config.sink.dict().get("config", {}) + sink: Sink = sink_class.create(sink_config, self.metadata) + logger.debug(f"Sink type:{self.config.sink.type}, {sink_class} configured") + + return sink + + def _get_test_runner_processor(self) -> Processor: + return TestCaseRunner.create(self.config.dict(), self.metadata) From 93bb184ce1e9293f2672881ecb308005449c5c9c Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Thu, 5 Oct 2023 16:03:26 +0200 Subject: [PATCH 04/12] Refactor DQ for BaseWorkflow --- ingestion/operators/docker/main.py | 2 +- ingestion/src/metadata/cli/dataquality.py | 6 +- .../src/metadata/data_quality/api/models.py | 31 +- .../src/metadata/data_quality/api/workflow.py | 494 ------------------ .../processor/test_case_runner.py | 260 ++++++++- .../src/metadata/data_quality/runner/core.py | 8 +- .../metadata/data_quality/runner/models.py | 24 - .../data_quality/source/test_suite.py | 49 +- .../src/metadata/ingestion/api/status.py | 8 +- ingestion/src/metadata/ingestion/api/steps.py | 14 - .../ingestion/models/custom_pydantic.py | 14 +- .../metadata/ingestion/sink/metadata_rest.py | 15 +- ingestion/src/metadata/utils/fqn.py | 20 +- ingestion/src/metadata/utils/logger.py | 23 +- .../test_suite/test_e2e_workflow.py | 6 +- .../integration/test_suite/test_workflow.py | 152 +++--- .../workflows/ingestion/test_suite.py | 2 +- .../test_workflow_creation.py | 2 +- 18 files changed, 453 insertions(+), 677 deletions(-) delete mode 100644 ingestion/src/metadata/data_quality/api/workflow.py delete mode 100644 ingestion/src/metadata/data_quality/runner/models.py diff --git a/ingestion/operators/docker/main.py b/ingestion/operators/docker/main.py index d52edfbe9a45..eb494f727ebf 100644 --- a/ingestion/operators/docker/main.py +++ b/ingestion/operators/docker/main.py @@ -15,13 +15,13 @@ import yaml -from metadata.data_quality.api.workflow import TestSuiteWorkflow from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( PipelineType, ) from metadata.generated.schema.metadataIngestion.workflow import LogLevels from metadata.utils.logger import set_loggers_level from metadata.workflow.data_insight import DataInsightWorkflow +from metadata.workflow.data_quality import TestSuiteWorkflow from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow from metadata.workflow.usage import UsageWorkflow diff --git a/ingestion/src/metadata/cli/dataquality.py b/ingestion/src/metadata/cli/dataquality.py index 4899a9fa9bdd..7a97317ab369 100644 --- a/ingestion/src/metadata/cli/dataquality.py +++ b/ingestion/src/metadata/cli/dataquality.py @@ -17,12 +17,12 @@ import traceback from metadata.config.common import load_config_file -from metadata.data_quality.api.workflow import TestSuiteWorkflow from metadata.utils.logger import cli_logger +from metadata.workflow.data_quality import TestSuiteWorkflow from metadata.workflow.workflow_output_handler import ( WorkflowType, print_init_error, - print_test_suite_status, + print_status, ) logger = cli_logger() @@ -48,5 +48,5 @@ def run_test(config_path: str) -> None: workflow.execute() workflow.stop() - print_test_suite_status(workflow) + print_status(workflow) workflow.raise_from_status() diff --git a/ingestion/src/metadata/data_quality/api/models.py b/ingestion/src/metadata/data_quality/api/models.py index 6b0b5541d7c5..5f5800c5a909 100644 --- a/ingestion/src/metadata/data_quality/api/models.py +++ b/ingestion/src/metadata/data_quality/api/models.py @@ -18,8 +18,13 @@ from typing import List, Optional +from pydantic import BaseModel, Field + from metadata.config.common import ConfigModel -from metadata.generated.schema.tests.testCase import TestCaseParameterValue +from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.tests.basic import TestCaseResult +from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue class TestCaseDefinition(ConfigModel): @@ -38,3 +43,27 @@ class TestSuiteProcessorConfig(ConfigModel): testCases: Optional[List[TestCaseDefinition]] = None forceUpdate: Optional[bool] = False + + +class TestCaseResultResponse(BaseModel): + testCaseResult: TestCaseResult + testCase: TestCase + + +class TableAndTests(BaseModel): + """Source response bringing together the table and test cases""" + + table: Table = Field(None, description="Table being processed by the DQ workflow") + service_type: str = Field(..., description="Service type the table belongs to") + test_cases: Optional[List[TestCase]] = Field( + None, description="Test Cases already existing in the Test Suite, if any" + ) + executable_test_suite: Optional[CreateTestSuiteRequest] = Field( + None, description="If no executable test suite is found, we'll create one" + ) + + +class TestCaseResults(BaseModel): + """Processor response with a list of computed Test Case Results""" + + test_results: Optional[List[TestCaseResultResponse]] diff --git a/ingestion/src/metadata/data_quality/api/workflow.py b/ingestion/src/metadata/data_quality/api/workflow.py deleted file mode 100644 index d9e1461f770f..000000000000 --- a/ingestion/src/metadata/data_quality/api/workflow.py +++ /dev/null @@ -1,494 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Workflow definition for the test suite -""" - -from __future__ import annotations - -import traceback -from copy import deepcopy -from logging import Logger -from typing import List, Optional, cast - -from pydantic import ValidationError - -from metadata.config.common import WorkflowExecutionError -from metadata.data_quality.api.models import ( - TestCaseDefinition, - TestSuiteProcessorConfig, -) -from metadata.data_quality.runner.test_suite_source_factory import ( - test_suite_source_factory, -) -from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest -from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest -from metadata.generated.schema.entity.data.table import Table -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.generated.schema.entity.services.connections.serviceConnection import ( - ServiceConnection, -) -from metadata.generated.schema.entity.services.databaseService import DatabaseService -from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( - PipelineState, -) -from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( - TestSuitePipeline, -) -from metadata.generated.schema.metadataIngestion.workflow import ( - OpenMetadataWorkflowConfig, -) -from metadata.generated.schema.tests.testCase import TestCase -from metadata.generated.schema.tests.testDefinition import TestDefinition, TestPlatform -from metadata.generated.schema.tests.testSuite import TestSuite -from metadata.generated.schema.type.basic import EntityLink, FullyQualifiedEntityName -from metadata.ingestion.api.models import StackTraceError -from metadata.ingestion.api.parser import parse_workflow_config_gracefully -from metadata.ingestion.api.processor import ProcessorStatus -from metadata.ingestion.ometa.client_utils import create_ometa_client -from metadata.utils import entity_link -from metadata.utils.fqn import split -from metadata.utils.importer import get_sink -from metadata.utils.logger import test_suite_logger -from metadata.workflow.workflow_output_handler import print_test_suite_status -from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin - -logger: Logger = test_suite_logger() - - -class TestSuiteWorkflow(WorkflowStatusMixin): - """workflow to run the test suite""" - - def __init__(self, config: OpenMetadataWorkflowConfig): - """ - Instantiate test suite workflow class - - Args: - config: OM workflow configuration object - - Attributes: - config: OM workflow configuration object - """ - self.config = config - self.metadata_config: OpenMetadataConnection = ( - self.config.workflowConfig.openMetadataServerConfig - ) - self.metadata = create_ometa_client(self.metadata_config) - - self.source_config: TestSuitePipeline = self.config.source.sourceConfig.config - self.service: DatabaseService = self._retrieve_service() - self._retrieve_service_connection() - - self.processor_config: TestSuiteProcessorConfig = ( - TestSuiteProcessorConfig.parse_obj( - self.config.processor.dict().get("config") - ) - ) - - self.set_ingestion_pipeline_status(state=PipelineState.running) - - self.status = ProcessorStatus() - - self.table_entity: Optional[Table] = self._get_table_entity( - self.source_config.entityFullyQualifiedName.__root__ - ) - - if self.config.sink: - self.sink = get_sink( - sink_type=self.config.sink.type, - sink_config=self.config.sink, - metadata_config=self.metadata_config, - ) - - @classmethod - def create(cls, config_dict) -> TestSuiteWorkflow: - """ - Instantiate a TestSuiteWorkflow object form a yaml or json config file - - Args: - config_dict: json or yaml configuration file - Returns: - a test suite workflow - """ - try: - config = parse_workflow_config_gracefully(config_dict) - return cls(config) - except ValidationError as err: - logger.error( - f"Error trying to parse the Profiler Workflow configuration: {err}" - ) - raise err - - def _retrieve_service(self) -> DatabaseService: - """Get service object from source config `entityFullyQualifiedName`""" - fully_qualified_name = self.source_config.entityFullyQualifiedName.__root__ - try: - service_name = split(fully_qualified_name)[0] - except IndexError as exc: - logger.debug(traceback.format_exc()) - raise IndexError( - f"Could not retrieve service name from entity fully qualified name {fully_qualified_name}: {exc}" - ) - try: - service = self.metadata.get_by_name(DatabaseService, service_name) - if not service: - raise ConnectionError( - f"Could not retrieve service with name `{service_name}`. " - "Typically caused by the `entityFullyQualifiedName` does not exists in OpenMetadata " - "or the JWT Token is invalid." - ) - except ConnectionError as exc: - raise exc - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.error( - f"Error getting service connection for service name [{service_name}]" - f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}" - ) - return service - - def _get_table_entity(self, entity_fqn: str) -> Optional[Table]: - """given an entity fqn return the table entity - - Args: - entity_fqn: entity fqn for the test case - """ - return self.metadata.get_by_name( - entity=Table, - fqn=entity_fqn, - fields=["tableProfilerConfig", "testSuite"], - ) - - def create_or_return_test_suite_entity(self) -> Optional[TestSuite]: - """ - try to get test suite name from source.servicName. - In the UI workflow we'll write the entity name (i.e. the test suite) - to source.serviceName. - """ - self.table_entity = cast(Table, self.table_entity) # satisfy type checker - test_suite = self.table_entity.testSuite - if test_suite and not test_suite.executable: - logger.debug( - f"Test suite {test_suite.fullyQualifiedName.__root__} is not executable." - ) - return None - - if self.processor_config.testCases and not test_suite: - # This should cover scenarios where we are running the tests from the CLI workflow - # and no corresponding tests suite exist in the platform. We, therefore, will need - # to create the test suite first. - logger.debug( - "Test suite name not found in the platform. Creating the test suite from processor config." - ) - test_suite = self.metadata.create_or_update_executable_test_suite( - CreateTestSuiteRequest( - name=f"{self.source_config.entityFullyQualifiedName.__root__}.TestSuite", - displayName=f"{self.source_config.entityFullyQualifiedName.__root__} Test Suite", - description="Test Suite created from YAML processor config file", - owner=None, - executableEntityReference=self.source_config.entityFullyQualifiedName.__root__, - ) - ) - - return test_suite - - def get_test_cases_from_test_suite( - self, test_suite: TestSuite - ) -> Optional[List[TestCase]]: - """ - Get test cases from test suite name - - Args: - test_suite_name: the name of the test suite - """ - test_cases = self.metadata.list_entities( - entity=TestCase, - fields=["testSuite", "entityLink", "testDefinition"], - params={"testSuiteId": test_suite.id.__root__}, - ).entities - test_cases = cast(List[TestCase], test_cases) # satisfy type checker - if self.processor_config.testCases is not None: - cli_test_cases = self.get_test_case_from_cli_config() # type: ignore - cli_test_cases = cast( - List[TestCaseDefinition], cli_test_cases - ) # satisfy type checker - test_cases = self.compare_and_create_test_cases( - cli_test_cases, test_cases, test_suite - ) - - return test_cases - - def filter_for_om_test_cases(self, test_cases: List[TestCase]) -> List[TestCase]: - """ - Filter test cases for OM test cases only. This will prevent us from running non OM test cases - - Args: - test_cases: list of test cases - """ - om_test_cases: List[TestCase] = [] - for test_case in test_cases: - test_definition: TestDefinition = self.metadata.get_by_id( - TestDefinition, test_case.testDefinition.id - ) - if TestPlatform.OpenMetadata not in test_definition.testPlatforms: - logger.debug( - f"Test case {test_case.name.__root__} is not an OpenMetadata test case." - ) - continue - om_test_cases.append(test_case) - - return om_test_cases - - def get_test_case_from_cli_config( - self, - ) -> Optional[List[TestCaseDefinition]]: - """Get all the test cases names defined in the CLI config file""" - if self.processor_config.testCases is not None: - return list(self.processor_config.testCases) - return None - - def _update_test_cases( - self, test_cases_to_update: List[TestCaseDefinition], test_cases: List[TestCase] - ): - """Given a list of CLI test definition patch test cases in the platform - - Args: - test_cases_to_update (List[TestCaseDefinition]): list of test case definitions - """ - test_cases_to_update_names = { - test_case_to_update.name for test_case_to_update in test_cases_to_update - } - for indx, test_case in enumerate(deepcopy(test_cases)): - if test_case.name.__root__ in test_cases_to_update_names: - test_case_definition = next( - test_case_to_update - for test_case_to_update in test_cases_to_update - if test_case_to_update.name == test_case.name.__root__ - ) - updated_test_case = self.metadata.patch_test_case_definition( - source=test_case, - entity_link=entity_link.get_entity_link( - self.source_config.entityFullyQualifiedName.__root__, - test_case_definition.columnName, - ), - test_case_parameter_values=test_case_definition.parameterValues, - ) - if updated_test_case: - test_cases.pop(indx) - test_cases.append(updated_test_case) - - return test_cases - - def compare_and_create_test_cases( - self, - cli_test_cases_definitions: Optional[List[TestCaseDefinition]], - test_cases: List[TestCase], - test_suite: TestSuite, - ) -> Optional[List[TestCase]]: - """ - compare test cases defined in CLI config workflow with test cases - defined on the server - - Args: - cli_test_cases_definitions: test cases defined in CLI workflow associated with its test suite - test_cases: list of test cases entities fetch from the server using test suite names in the config file - """ - if not cli_test_cases_definitions: - return test_cases - test_cases = deepcopy(test_cases) - test_case_names = {test_case.name.__root__ for test_case in test_cases} - - # we'll check the test cases defined in the CLI config file and not present in the platform - test_cases_to_create = [ - cli_test_case_definition - for cli_test_case_definition in cli_test_cases_definitions - if cli_test_case_definition.name not in test_case_names - ] - - if self.processor_config and self.processor_config.forceUpdate: - test_cases_to_update = [ - cli_test_case_definition - for cli_test_case_definition in cli_test_cases_definitions - if cli_test_case_definition.name in test_case_names - ] - test_cases = self._update_test_cases(test_cases_to_update, test_cases) - - if not test_cases_to_create: - return test_cases - - for test_case_to_create in test_cases_to_create: - logger.debug(f"Creating test case with name {test_case_to_create.name}") - try: - test_case = self.metadata.create_or_update( - CreateTestCaseRequest( - name=test_case_to_create.name, - description=test_case_to_create.description, - displayName=test_case_to_create.displayName, - testDefinition=FullyQualifiedEntityName( - __root__=test_case_to_create.testDefinitionName - ), - entityLink=EntityLink( - __root__=entity_link.get_entity_link( - self.source_config.entityFullyQualifiedName.__root__, - test_case_to_create.columnName, - ) - ), - testSuite=test_suite.fullyQualifiedName.__root__, - parameterValues=list(test_case_to_create.parameterValues) - if test_case_to_create.parameterValues - else None, - owner=None, - ) - ) - test_cases.append(test_case) - except Exception as exc: - error = ( - f"Couldn't create test case name {test_case_to_create.name}: {exc}" - ) - logger.error(error) - logger.debug(traceback.format_exc()) - self.status.failed( - StackTraceError( - name=self.source_config.entityFullyQualifiedName.__root__, - error=error, - stack_trace=traceback.format_exc(), - ) - ) - - return test_cases - - def run_test_suite(self): - """Main logic to run the tests""" - if not self.table_entity: - logger.debug(traceback.format_exc()) - raise ValueError( - f"Could not retrieve table entity for {self.source_config.entityFullyQualifiedName.__root__}. " - "Make sure the table exists in OpenMetadata and/or the JWT Token provided is valid." - ) - - test_suite = self.create_or_return_test_suite_entity() - if not test_suite: - logger.debug( - f"No test suite found for table {self.source_config.entityFullyQualifiedName.__root__} " - "or test suite is not executable." - ) - return - - test_cases = self.get_test_cases_from_test_suite(test_suite) - if not test_cases: - logger.debug( - f"No test cases found for table {self.source_config.entityFullyQualifiedName.__root__}" - f"and test suite {test_suite.fullyQualifiedName.__root__}" - ) - return - - openmetadata_test_cases = self.filter_for_om_test_cases(test_cases) - - test_suite_runner = test_suite_source_factory.create( - self.service.serviceType.value.lower(), - self.config, - self.metadata, - self.table_entity, - ).get_data_quality_runner() - - for test_case in openmetadata_test_cases: - try: - test_result = test_suite_runner.run_and_handle(test_case) - if not test_result: - continue - if hasattr(self, "sink"): - self.sink.run(test_result) - logger.debug(f"Successfully ran test case {test_case.name.__root__}") - self.status.processed(test_case.fullyQualifiedName.__root__) - except Exception as exc: - error = f"Could not run test case {test_case.name.__root__}: {exc}" - logger.debug(traceback.format_exc()) - logger.error(error) - self.status.failed( - StackTraceError( - name=test_case.name.__root__, - error=error, - stack_trace=traceback.format_exc(), - ) - ) - - def _retrieve_service_connection(self) -> None: - """ - We override the current `serviceConnection` source config object if source workflow service already exists - in OM. When it is configured, we retrieve the service connection from the secrets' manager. Otherwise, we get it - from the service object itself through the default `SecretsManager`. - """ - if ( - not self.config.source.serviceConnection - and not self.metadata.config.forceEntityOverwriting - ): - self.config.source.serviceConnection = ServiceConnection( - __root__=self.service.connection - ) - - def execute(self): - """Execute test suite workflow""" - try: - self.run_test_suite() - # At the end of the `execute`, update the associated Ingestion Pipeline status as success - self.set_ingestion_pipeline_status(PipelineState.success) - - # Any unhandled exception breaking the workflow should update the status - except Exception as err: - logger.debug(traceback.format_exc()) - self.set_ingestion_pipeline_status(PipelineState.failed) - raise err - - def print_status(self) -> None: - """ - Print the workflow results with click - """ - print_test_suite_status(self) - - def result_status(self) -> int: - """ - Returns 1 if status is failed, 0 otherwise. - """ - if self.status.failures or ( - hasattr(self, "sink") and self.sink.get_status().failures - ): - return 1 - return 0 - - def _raise_from_status_internal(self, raise_warnings=False): - """ - Check source, processor and sink status and raise if needed - - Our profiler source will never log any failure, only filters, - as we are just picking up data from OM. - """ - - if self.status.failures: - raise WorkflowExecutionError("Processor reported errors", self.status) - if hasattr(self, "sink") and self.sink.get_status().failures: - raise WorkflowExecutionError("Sink reported errors", self.sink.get_status()) - - if raise_warnings: - if self.status.warnings: - raise WorkflowExecutionError("Processor reported warnings", self.status) - if hasattr(self, "sink") and self.sink.get_status().warnings: - raise WorkflowExecutionError( - "Sink reported warnings", self.sink.get_status() - ) - - def stop(self): - """ - Close all connections - """ - self.metadata.close() diff --git a/ingestion/src/metadata/data_quality/processor/test_case_runner.py b/ingestion/src/metadata/data_quality/processor/test_case_runner.py index 9af57ff5371e..2bb2bc97e01f 100644 --- a/ingestion/src/metadata/data_quality/processor/test_case_runner.py +++ b/ingestion/src/metadata/data_quality/processor/test_case_runner.py @@ -12,20 +12,35 @@ """ This Processor is in charge of executing the test cases """ -from typing import Optional +import traceback +from copy import deepcopy +from typing import List, Optional, cast -from metadata.data_quality.api.models import TestSuiteProcessorConfig -from metadata.data_quality.source.test_suite import TableAndTests -from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest -from metadata.generated.schema.entity.data.table import Table +from metadata.data_quality.api.models import ( + TableAndTests, + TestCaseDefinition, + TestCaseResultResponse, + TestCaseResults, + TestSuiteProcessorConfig, +) +from metadata.data_quality.runner.core import DataTestsRunner +from metadata.data_quality.runner.test_suite_source_factory import ( + test_suite_source_factory, +) +from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) -from metadata.ingestion.api.models import Either +from metadata.generated.schema.tests.testCase import TestCase +from metadata.generated.schema.tests.testDefinition import TestDefinition, TestPlatform +from metadata.generated.schema.tests.testSuite import TestSuite +from metadata.generated.schema.type.basic import EntityLink, FullyQualifiedEntityName +from metadata.ingestion.api.models import Either, StackTraceError from metadata.ingestion.api.parser import parse_workflow_config_gracefully from metadata.ingestion.api.step import Step from metadata.ingestion.api.steps import Processor from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils import entity_link, fqn from metadata.utils.logger import test_suite_logger logger = test_suite_logger() @@ -34,8 +49,12 @@ class TestCaseRunner(Processor): """Execute the test suite tests and create test cases from the YAML config""" - def __init__(self, config: OpenMetadataWorkflowConfig): + def __init__(self, config: OpenMetadataWorkflowConfig, metadata: OpenMetadata): + + super().__init__() + self.config = config + self.metadata = metadata self.processor_config: TestSuiteProcessorConfig = ( TestSuiteProcessorConfig.parse_obj( @@ -50,13 +69,234 @@ def _run(self, record: TableAndTests) -> Either: # We pass the test suite request to the sink return Either(right=record.executable_test_suite) - ... + # Add the test cases from the YAML file, if any + test_cases = self.get_test_cases( + test_cases=record.test_cases, + test_suite_fqn=fqn.build( + None, + TestSuite, + table_fqn=record.table.fullyQualifiedName.__root__, + ), + table_fqn=record.table.fullyQualifiedName.__root__, + ) + + if not test_cases: + return Either( + left=StackTraceError( + name="No test Cases", + error=f"No tests cases found for table {record.table.fullyQualifiedName.__root__}", + ) + ) + + openmetadata_test_cases = self.filter_for_om_test_cases(test_cases) + + test_suite_runner = test_suite_source_factory.create( + record.service_type.lower(), + self.config, + self.metadata, + record.table, + ).get_data_quality_runner() + + test_results = [ + test_case_result + for test_case in openmetadata_test_cases + if (test_case_result := self._run_test_case(test_case, test_suite_runner)) + ] + + return Either(right=TestCaseResults(test_results=test_results)) + + def get_test_cases( + self, test_cases: Optional[List[TestCase]], test_suite_fqn: str, table_fqn: str + ) -> List[TestCase]: + """ + Based on the test suite test cases that we already know, pick up + the rest from the YAML config, compare and create the new ones + """ + if self.processor_config.testCases is not None: + cli_test_cases = self.get_test_case_from_cli_config() # type: ignore + cli_test_cases = cast( + List[TestCaseDefinition], cli_test_cases + ) # satisfy type checker + return self.compare_and_create_test_cases( + cli_test_cases_definitions=cli_test_cases, + test_cases=test_cases, + test_suite_fqn=test_suite_fqn, + table_fqn=table_fqn, + ) + + return test_cases + + def get_test_case_from_cli_config( + self, + ) -> Optional[List[TestCaseDefinition]]: + """Get all the test cases names defined in the CLI config file""" + if self.processor_config.testCases is not None: + return list(self.processor_config.testCases) + return None + + def compare_and_create_test_cases( + self, + cli_test_cases_definitions: Optional[List[TestCaseDefinition]], + test_cases: List[TestCase], + table_fqn: str, + test_suite_fqn: str, + ) -> List[TestCase]: + """ + compare test cases defined in CLI config workflow with test cases + defined on the server + Args: + cli_test_cases_definitions: test cases defined in CLI workflow associated with its test suite + test_cases: list of test cases entities fetch from the server using test suite names in the config file + table_fqn: table being tested + test_suite_fqn: FQN of the table + .testSuite + """ + if not cli_test_cases_definitions: + return test_cases + test_cases = deepcopy(test_cases) + test_case_names = {test_case.name.__root__ for test_case in test_cases} + + # we'll check the test cases defined in the CLI config file and not present in the platform + test_cases_to_create = [ + cli_test_case_definition + for cli_test_case_definition in cli_test_cases_definitions + if cli_test_case_definition.name not in test_case_names + ] + + if self.processor_config and self.processor_config.forceUpdate: + test_cases_to_update = [ + cli_test_case_definition + for cli_test_case_definition in cli_test_cases_definitions + if cli_test_case_definition.name in test_case_names + ] + test_cases = self._update_test_cases( + test_cases_to_update, test_cases, table_fqn + ) + + if not test_cases_to_create: + return test_cases + + for test_case_to_create in test_cases_to_create: + logger.debug(f"Creating test case with name {test_case_to_create.name}") + try: + test_case = self.metadata.create_or_update( + CreateTestCaseRequest( + name=test_case_to_create.name, + description=test_case_to_create.description, + displayName=test_case_to_create.displayName, + testDefinition=FullyQualifiedEntityName( + __root__=test_case_to_create.testDefinitionName + ), + entityLink=EntityLink( + __root__=entity_link.get_entity_link( + table_fqn, + test_case_to_create.columnName, + ) + ), + testSuite=test_suite_fqn, + parameterValues=list(test_case_to_create.parameterValues) + if test_case_to_create.parameterValues + else None, + owner=None, + ) + ) + test_cases.append(test_case) + except Exception as exc: + error = ( + f"Couldn't create test case name {test_case_to_create.name}: {exc}" + ) + logger.error(error) + logger.debug(traceback.format_exc()) + self.status.failed( + StackTraceError( + name=table_fqn, + error=error, + stack_trace=traceback.format_exc(), + ) + ) + + return test_cases + + def _update_test_cases( + self, + test_cases_to_update: List[TestCaseDefinition], + test_cases: List[TestCase], + table_fqn: str, + ): + """Given a list of CLI test definition patch test cases in the platform + + Args: + test_cases_to_update (List[TestCaseDefinition]): list of test case definitions + """ + test_cases_to_update_names = { + test_case_to_update.name for test_case_to_update in test_cases_to_update + } + for indx, test_case in enumerate(deepcopy(test_cases)): + if test_case.name.__root__ in test_cases_to_update_names: + test_case_definition = next( + test_case_to_update + for test_case_to_update in test_cases_to_update + if test_case_to_update.name == test_case.name.__root__ + ) + updated_test_case = self.metadata.patch_test_case_definition( + source=test_case, + entity_link=entity_link.get_entity_link( + table_fqn, + test_case_definition.columnName, + ), + test_case_parameter_values=test_case_definition.parameterValues, + ) + if updated_test_case: + test_cases.pop(indx) + test_cases.append(updated_test_case) + + return test_cases + + def filter_for_om_test_cases(self, test_cases: List[TestCase]) -> List[TestCase]: + """ + Filter test cases for OM test cases only. This will prevent us from running non OM test cases + + Args: + test_cases: list of test cases + """ + om_test_cases: List[TestCase] = [] + for test_case in test_cases: + test_definition: TestDefinition = self.metadata.get_by_id( + TestDefinition, test_case.testDefinition.id + ) + if TestPlatform.OpenMetadata not in test_definition.testPlatforms: + logger.debug( + f"Test case {test_case.name.__root__} is not an OpenMetadata test case." + ) + continue + om_test_cases.append(test_case) + + return om_test_cases + + def _run_test_case( + self, test_case: TestCase, test_suite_runner: DataTestsRunner + ) -> Optional[TestCaseResultResponse]: + """Execute the test case and return the result, if any""" + try: + test_result = test_suite_runner.run_and_handle(test_case) + self.status.scanned(test_case.fullyQualifiedName.__root__) + return test_result + except Exception as exc: + error = f"Could not run test case {test_case.name.__root__}: {exc}" + logger.debug(traceback.format_exc()) + logger.error(error) + self.status.failed( + StackTraceError( + name=test_case.name.__root__, + error=error, + stack_trace=traceback.format_exc(), + ) + ) @classmethod - def create(cls, config_dict: dict, _: OpenMetadata) -> "Step": + def create(cls, config_dict: dict, metadata: OpenMetadata) -> "Step": config = parse_workflow_config_gracefully(config_dict) - return cls(config=config) + return cls(config=config, metadata=metadata) def close(self) -> None: """Nothing to close""" diff --git a/ingestion/src/metadata/data_quality/runner/core.py b/ingestion/src/metadata/data_quality/runner/core.py index f31f6cb52b69..f53e885017a2 100644 --- a/ingestion/src/metadata/data_quality/runner/core.py +++ b/ingestion/src/metadata/data_quality/runner/core.py @@ -14,8 +14,8 @@ """ +from metadata.data_quality.api.models import TestCaseResultResponse from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface -from metadata.data_quality.runner.models import TestCaseResultResponse from metadata.generated.schema.tests.testCase import TestCase from metadata.utils.logger import test_suite_logger @@ -26,15 +26,15 @@ class DataTestsRunner: """class to execute the test validation""" def __init__(self, test_runner_interface: TestSuiteInterface): - self.test_runner_interace = test_runner_interface + self.test_runner_interface = test_runner_interface def run_and_handle(self, test_case: TestCase): """run and handle test case validation""" logger.info( f"Executing test case {test_case.name.__root__} " - f"for entity {self.test_runner_interace.table_entity.fullyQualifiedName.__root__}" + f"for entity {self.test_runner_interface.table_entity.fullyQualifiedName.__root__}" ) - test_result = self.test_runner_interace.run_test_case( + test_result = self.test_runner_interface.run_test_case( test_case, ) diff --git a/ingestion/src/metadata/data_quality/runner/models.py b/ingestion/src/metadata/data_quality/runner/models.py deleted file mode 100644 index 3448b140155f..000000000000 --- a/ingestion/src/metadata/data_quality/runner/models.py +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -test case result response object -""" - -from pydantic import BaseModel - -from metadata.generated.schema.tests.basic import TestCaseResult -from metadata.generated.schema.tests.testCase import TestCase - - -class TestCaseResultResponse(BaseModel): - testCaseResult: TestCaseResult - testCase: TestCase diff --git a/ingestion/src/metadata/data_quality/source/test_suite.py b/ingestion/src/metadata/data_quality/source/test_suite.py index 861d05006b66..7c9e2bda51af 100644 --- a/ingestion/src/metadata/data_quality/source/test_suite.py +++ b/ingestion/src/metadata/data_quality/source/test_suite.py @@ -17,9 +17,7 @@ import traceback from typing import Iterable, List, Optional, cast -from pydantic import BaseModel, Field - -from build.lib.metadata.ometa.fqn import split +from metadata.data_quality.api.models import TableAndTests from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.serviceConnection import ( @@ -39,21 +37,13 @@ from metadata.ingestion.api.step import Step from metadata.ingestion.api.steps import Source from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils import fqn +from metadata.utils.fqn import split from metadata.utils.logger import test_suite_logger logger = test_suite_logger() -class TableAndTests(BaseModel): - """Source response bringing together the table and test cases""" - - table: Table = Field(None, description="Table being processed by the DQ workflow") - test_cases: Optional[List[TestCase]] = Field( - None, description="Test Cases already existing in the Test Suite, if any" - ) - executable_test_suite: Optional[CreateTestSuiteRequest] = Field(None, description="If no executable test suite is found, we'll create one") - - class TestSuiteSource(Source): """ Gets the ingredients required to run the tests @@ -121,7 +111,7 @@ def _retrieve_service_connection(self) -> None: __root__=self.service.connection ) - def _get_table_entity(self, entity_fqn: str) -> Optional[Table]: + def _get_table_entity(self) -> Optional[Table]: """given an entity fqn return the table entity Args: @@ -129,7 +119,7 @@ def _get_table_entity(self, entity_fqn: str) -> Optional[Table]: """ table: Table = self.metadata.get_by_name( entity=Table, - fqn=entity_fqn, + fqn=self.source_config.entityFullyQualifiedName.__root__, fields=["tableProfilerConfig", "testSuite"], ) @@ -158,9 +148,7 @@ def test_connection(self) -> None: self.metadata.health_check() def _iter(self) -> Iterable[Either[TableAndTests]]: - table: Table = self._get_table_entity( - self.source_config.entityFullyQualifiedName.__root__ - ) + table: Table = self._get_table_entity() if table: yield from self._process_table_suite(table) @@ -180,23 +168,26 @@ def _process_table_suite(self, table: Table) -> Iterable[Either[TableAndTests]]: """ # If there is no executable test suite yet for the table, we'll need to create one + executable_test_suite = None if not table.testSuite: - yield Either(right=TableAndTests( - executable_test_suite=CreateTestSuiteRequest( - name=f"{self.source_config.entityFullyQualifiedName.__root__}.testSuite", - displayName=f"{self.source_config.entityFullyQualifiedName.__root__} Test Suite", - description="Test Suite created from YAML processor config file", - owner=None, - executableEntityReference=self.source_config.entityFullyQualifiedName.__root__, - ) - )) + executable_test_suite = CreateTestSuiteRequest( + name=fqn.build( + None, + TestSuite, + table_fqn=self.source_config.entityFullyQualifiedName.__root__, + ), + displayName=f"{self.source_config.entityFullyQualifiedName.__root__} Test Suite", + description="Test Suite created from YAML processor config file", + owner=None, + executableEntityReference=self.source_config.entityFullyQualifiedName.__root__, + ) if table.testSuite and not table.testSuite.executable: yield Either( left=StackTraceError( name="Non-executable Test Suite", error=f"The table {self.source_config.entityFullyQualifiedName.__root__} " - "has a test suite that is not executable.", + "has a test suite that is not executable.", ) ) @@ -208,6 +199,8 @@ def _process_table_suite(self, table: Table) -> Iterable[Either[TableAndTests]]: right=TableAndTests( table=table, test_cases=test_suite_cases, + executable_test_suite=executable_test_suite, + service_type=self.service.serviceType.value, ) ) diff --git a/ingestion/src/metadata/ingestion/api/status.py b/ingestion/src/metadata/ingestion/api/status.py index 99b5493ec6c3..a5d5dbec47a3 100644 --- a/ingestion/src/metadata/ingestion/api/status.py +++ b/ingestion/src/metadata/ingestion/api/status.py @@ -37,9 +37,13 @@ class Status(BaseModel): def scanned(self, record: Any) -> None: """ - Clean up the status results we want to show + Clean up the status results we want to show. + + We allow to not consider specific records that + are not worth keeping record of. """ - self.records.append(get_log_name(record)) + if log_name := get_log_name(record): + self.records.append(log_name) def warning(self, key: str, reason: str) -> None: self.warnings.append({key: reason}) diff --git a/ingestion/src/metadata/ingestion/api/steps.py b/ingestion/src/metadata/ingestion/api/steps.py index 9c0023f28b6d..bf79e1c8d48a 100644 --- a/ingestion/src/metadata/ingestion/api/steps.py +++ b/ingestion/src/metadata/ingestion/api/steps.py @@ -14,7 +14,6 @@ from abc import ABC, abstractmethod from typing import Any -from metadata.ingestion.api.models import Either, Entity from metadata.ingestion.api.step import BulkStep, IterStep, ReturnStep, StageStep from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.logger import ingestion_logger @@ -51,23 +50,10 @@ def test_connection(self) -> None: class Sink(ReturnStep, ABC): """All Sinks must inherit this base class.""" - @abstractmethod - def _run(self, record: Entity) -> Either: - """ - Send the data somewhere, e.g., the OM API - """ - class Processor(ReturnStep, ABC): """All Processor must inherit this base class""" - @abstractmethod - def _run(self, record: Entity) -> Either: - """ - Post process a given entity and return it - or a new one - """ - class Stage(StageStep, ABC): """All Stages must inherit this base class.""" diff --git a/ingestion/src/metadata/ingestion/models/custom_pydantic.py b/ingestion/src/metadata/ingestion/models/custom_pydantic.py index 728fa89d867f..7c69b286c54b 100644 --- a/ingestion/src/metadata/ingestion/models/custom_pydantic.py +++ b/ingestion/src/metadata/ingestion/models/custom_pydantic.py @@ -9,9 +9,13 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -Pydantic classes overwritten defaults ones of code generation -""" +Pydantic classes overwritten defaults ones of code generation. +This classes are used in the generated module, which should have NO +dependencies against any other metadata package. This class should +be self-sufficient with only pydantic at import time. +""" +import logging import warnings from typing import Any, Dict @@ -19,9 +23,7 @@ from pydantic.utils import update_not_none from pydantic.validators import constr_length_validator, str_validator -from metadata.utils.logger import ingestion_logger - -logger = ingestion_logger() +logger = logging.getLogger("metadata") class CustomSecretStr(SecretStr): @@ -101,6 +103,6 @@ def get_secret_value(self, skip_secret_manager: bool = False) -> str: ) except Exception as exc: logger.error( - f"Secret value [{secret_id}] not present in the configured secrets manages: {exc}" + f"Secret value [{secret_id}] not present in the configured secrets manager: {exc}" ) return self._secret_value diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index bce198e02de5..27237bdc084e 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -22,7 +22,7 @@ from metadata.config.common import ConfigModel from metadata.data_insight.source.metadata import DataInsightRecord -from metadata.data_quality.runner.models import TestCaseResultResponse +from metadata.data_quality.api.models import TestCaseResultResponse, TestCaseResults from metadata.generated.schema.analytics.reportData import ReportData from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.teams.createRole import CreateRoleRequest @@ -516,6 +516,19 @@ def write_executable_test_suite( test_suite = self.metadata.create_or_update_executable_test_suite(record) return Either(right=test_suite) + @_run_dispatch.register + def write_test_case_results(self, record: TestCaseResults): + """Record the list of test case result responses""" + + for result in record.test_results or []: + self.metadata.add_test_case_results( + test_results=result.testCaseResult, + test_case_fqn=result.testCase.fullyQualifiedName.__root__, + ) + self.status.scanned(result) + + return Either(right=record) + def close(self): """ We don't have anything to close since we are using the given metadata client diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index aef4aa55a89c..304e13d1b08c 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -41,6 +41,7 @@ from metadata.generated.schema.entity.teams.team import Team from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.tests.testCase import TestCase +from metadata.generated.schema.tests.testSuite import TestSuite from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.dispatch import class_register from metadata.utils.elasticsearch import get_entity_from_es_result @@ -81,12 +82,15 @@ def split(s: str) -> List[str]: return splitter.split() -def _build(*args) -> str: +def _build(*args, quote: bool = True) -> str: """ Equivalent of Java's FullyQualifiedName#build """ - quoted = [quote_name(name) for name in args] - return FQN_SEPARATOR.join(quoted) + if quote: + quoted = [quote_name(name) for name in args] + return FQN_SEPARATOR.join(quoted) + + return FQN_SEPARATOR.join(args) def unquote_name(name: str) -> str: @@ -255,6 +259,16 @@ def _( return _build(service_name, mlmodel_name) +@fqn_build_registry.add(TestSuite) +def _(_: Optional[OpenMetadata], *, table_fqn: str) -> str: + """ + We don't need to quote since this comes from a table FQN. + We're replicating the backend logic of the FQN generation in the TestSuiteRepository + for executable test suites. + """ + return _build(table_fqn, "testSuite", quote=False) + + @fqn_build_registry.add(Topic) def _( _: Optional[OpenMetadata], # ES Index not necessary for Topic FQN building diff --git a/ingestion/src/metadata/utils/logger.py b/ingestion/src/metadata/utils/logger.py index a32fa92006b4..59d7290ad368 100644 --- a/ingestion/src/metadata/utils/logger.py +++ b/ingestion/src/metadata/utils/logger.py @@ -18,6 +18,11 @@ from types import DynamicClassAttribute from typing import Optional, Union +from metadata.data_quality.api.models import ( + TableAndTests, + TestCaseResultResponse, + TestCaseResults, +) from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.ingestion.api.models import Entity from metadata.ingestion.models.delete_entity import DeleteEntity @@ -171,7 +176,7 @@ def log_ansi_encoded_string( @singledispatch -def get_log_name(record: Entity) -> str: +def get_log_name(record: Entity) -> Optional[str]: try: return f"{type(record).__name__} [{record.name.__root__}]" except Exception: @@ -225,3 +230,19 @@ def _(record: OMetaLifeCycleData) -> str: Capture the lifecycle changes of an Entity """ return f"{type(record.entity).__name__} Lifecycle [{record.entity.name.__root__}]" + + +@get_log_name.register +def _(record: TableAndTests) -> str: + return f"Tests for [{record.table.fullyQualifiedName.__root__}]" + + +@get_log_name.register +def _(_: TestCaseResults) -> Optional[str]: + """We don't want to log this in the status""" + return None + + +@get_log_name.register +def _(record: TestCaseResultResponse) -> str: + return record.testCase.fullyQualifiedName.__root__ diff --git a/ingestion/tests/integration/test_suite/test_e2e_workflow.py b/ingestion/tests/integration/test_suite/test_e2e_workflow.py index c9193dab6286..b252cddfeb6a 100644 --- a/ingestion/tests/integration/test_suite/test_e2e_workflow.py +++ b/ingestion/tests/integration/test_suite/test_e2e_workflow.py @@ -20,7 +20,6 @@ import sqlalchemy as sqa from sqlalchemy.orm import Session, declarative_base -from metadata.data_quality.api.workflow import TestSuiteWorkflow from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, @@ -46,11 +45,12 @@ ) from metadata.generated.schema.tests.testCase import TestCase from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.workflow.data_quality import TestSuiteWorkflow test_suite_config = { "source": { "type": "custom-database", - "serviceName": "MyRabdomWorkflow", + "serviceName": "test_suite_service_test", "sourceConfig": { "config": { "type": "TestSuite", @@ -146,7 +146,7 @@ def setUpClass(cls): ) ) - table = cls.metadata.create_or_update( + cls.metadata.create_or_update( CreateTableRequest( name="users", columns=[ diff --git a/ingestion/tests/integration/test_suite/test_workflow.py b/ingestion/tests/integration/test_suite/test_workflow.py index 57806414f236..7139abcaf587 100644 --- a/ingestion/tests/integration/test_suite/test_workflow.py +++ b/ingestion/tests/integration/test_suite/test_workflow.py @@ -14,17 +14,19 @@ """ import unittest -from collections.abc import MutableSequence from copy import deepcopy +from typing import List -from metadata.data_quality.api.workflow import TestSuiteWorkflow +from metadata.data_quality.api.models import TableAndTests from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) from metadata.generated.schema.tests.testCase import TestCase from metadata.generated.schema.tests.testSuite import TestSuite +from metadata.ingestion.api.models import Either from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.workflow.data_quality import TestSuiteWorkflow sqlite_shared = "file:cachedb?mode=memory&cache=shared&check_same_thread=False" @@ -88,58 +90,51 @@ def tearDown(self) -> None: def test_create_workflow_object(self): """Test workflow object is correctly instantiated""" TestSuiteWorkflow.create(test_suite_config) - assert True - def test_create_workflow_object_from_cli_config(self): + def test_create_workflow_object_with_table_with_test_suite(self): """test workflow object is instantiated correctly from cli config""" - _test_suite_config = deepcopy(test_suite_config) + workflow = TestSuiteWorkflow.create(test_suite_config) - processor = { - "processor": { - "type": "orm-test-runner", - "config": { - "testCases": [ - { - "name": "my_test_case", - "testDefinitionName": "tableColumnCountToBeBetween", - "parameterValues": [ - {"name": "minColValue", "value": 1}, - {"name": "maxColValue", "value": 5}, - ], - } - ] - }, - } - } + table: Table = workflow.source._get_table_entity() - _test_suite_config.update(processor) + table_and_tests: TableAndTests = list( + workflow.source._process_table_suite(table=table) + )[0] - workflow = TestSuiteWorkflow.create(_test_suite_config) - workflow_test_suite = workflow.create_or_return_test_suite_entity() + # If the table already has a test suite, we won't be generating one + self.assertIsNotNone(table.testSuite) + self.assertIsNone(table_and_tests.right.executable_test_suite) - test_suite = self.metadata.get_by_name( - entity=TestSuite, - fqn="sample_data.ecommerce_db.shopify.dim_address.testSuite", - ) + # We will pick up the tests from it + # Note that this number comes from what is defined in the sample data + self.assertEquals(len(table_and_tests.right.test_cases), 5) - assert workflow_test_suite.id == test_suite.id - self.test_suite_ids = [test_suite.id] + def test_create_workflow_config_with_table_without_suite(self): + """We'll prepare the test suite creation payload""" - def test_create_or_return_test_suite_entity(self): - """test we can correctly retrieve a test suite""" _test_suite_config = deepcopy(test_suite_config) + _test_suite_config["source"]["sourceConfig"]["config"][ + "entityFullyQualifiedName" + ] = "sample_data.ecommerce_db.shopify.dim_staff" workflow = TestSuiteWorkflow.create(_test_suite_config) - test_suite = workflow.create_or_return_test_suite_entity() - expected_test_suite = self.metadata.get_by_name( - entity=TestSuite, fqn="critical_metrics_suite" + # If the table does not have a test suite, we'll prepare the request to create one + table: Table = workflow.source._get_table_entity() + + table_and_tests: Either[TableAndTests] = list( + workflow.source._process_table_suite(table=table) + )[0] + + self.assertIsNone(table.testSuite) + self.assertEquals( + table_and_tests.right.executable_test_suite.name.__root__, + "sample_data.ecommerce_db.shopify.dim_staff.testSuite", ) - assert test_suite + def test_create_workflow_config_with_tests(self): + """We'll get the tests from the workflow YAML""" - def test_get_test_cases_from_test_suite(self): - """test test cases are correctly returned for specific test suite""" _test_suite_config = deepcopy(test_suite_config) processor = { @@ -161,19 +156,34 @@ def test_get_test_cases_from_test_suite(self): } _test_suite_config.update(processor) - workflow = TestSuiteWorkflow.create(_test_suite_config) - test_suite = workflow.create_or_return_test_suite_entity() - test_cases = workflow.get_test_cases_from_test_suite(test_suite) - assert isinstance(test_cases, MutableSequence) - assert isinstance(test_cases[0], TestCase) - assert {"my_test_case"}.intersection( - {test_case.name.__root__ for test_case in test_cases} + table: Table = workflow.source._get_table_entity() + table_and_tests: Either[TableAndTests] = list( + workflow.source._process_table_suite(table=table) + )[0] + + test_cases: List[TestCase] = workflow.steps[0].get_test_cases( + test_cases=table_and_tests.right.test_cases, + test_suite_fqn="sample_data.ecommerce_db.shopify.dim_address.testSuite", + table_fqn="sample_data.ecommerce_db.shopify.dim_address", + ) + + # 5 defined test cases + the new one in the YAML + self.assertEquals(len(test_cases), 6) + + new_test_case = next( + (test for test in test_cases if test.name.__root__ == "my_test_case"), None ) + self.assertIsNotNone(new_test_case) - for test_case in test_cases: - self.metadata.delete(entity=TestCase, entity_id=test_case.id) + # cleanup + self.metadata.delete( + entity=TestCase, + entity_id=new_test_case.id, + recursive=True, + hard_delete=True, + ) def test_get_test_case_names_from_cli_config(self): """test we can get all test case names from cli config""" @@ -208,7 +218,7 @@ def test_get_test_case_names_from_cli_config(self): _test_suite_config.update(processor) workflow = TestSuiteWorkflow.create(_test_suite_config) - test_cases_def = workflow.get_test_case_from_cli_config() + test_cases_def = workflow.steps[0].get_test_case_from_cli_config() assert [test_case_def.name for test_case_def in test_cases_def] == [ "my_test_case", @@ -259,15 +269,17 @@ def test_compare_and_create_test_cases(self): fqn="sample_data.ecommerce_db.shopify.dim_address.address_id.my_test_case_two", ) - test_suite = workflow.create_or_return_test_suite_entity() - test_cases = self.metadata.list_entities( - entity=TestCase, - fields=["testSuite", "entityLink", "testDefinition"], - params={"testSuiteId": test_suite.id.__root__}, - ).entities - config_test_cases_def = workflow.get_test_case_from_cli_config() - created_test_case = workflow.compare_and_create_test_cases( - config_test_cases_def, test_cases, test_suite + table: Table = workflow.source._get_table_entity() + table_and_tests: Either[TableAndTests] = list( + workflow.source._process_table_suite(table=table) + )[0] + + config_test_cases_def = workflow.steps[0].get_test_case_from_cli_config() + created_test_case = workflow.steps[0].compare_and_create_test_cases( + cli_test_cases_definitions=config_test_cases_def, + test_cases=table_and_tests.right.test_cases, + test_suite_fqn="sample_data.ecommerce_db.shopify.dim_address.testSuite", + table_fqn="sample_data.ecommerce_db.shopify.dim_address", ) # clean up test @@ -285,28 +297,8 @@ def test_compare_and_create_test_cases(self): assert my_test_case assert my_test_case_two - assert len(created_test_case) == 2 + # We return the 5 sample data tests & the 2 new ones + assert len(created_test_case) == 7 self.metadata.delete(entity=TestCase, entity_id=my_test_case.id) self.metadata.delete(entity=TestCase, entity_id=my_test_case_two.id) - - def test_get_table_entity(self): - """test get service connection returns correct info""" - workflow = TestSuiteWorkflow.create(test_suite_config) - service_connection = workflow._get_table_entity( - "sample_data.ecommerce_db.shopify.dim_address" - ) - - assert isinstance(service_connection, Table) - - # def test_filter_for_om_test_cases(self): - # """test filter for OM test cases method""" - # om_test_case_1 = TestCase( - # name="om_test_case_1", - # testDefinition=self.metadata.get_entity_reference( - # TestDefinition, - # "columnValuesToMatchRegex" - # ), - # entityLink="", - # testSuite=self.metadata.get_entity_reference("sample_data.ecommerce_db.shopify.dim_address.TestSuite"), - # ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py index 0cc0c2d3b217..a1e022111599 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py @@ -17,7 +17,6 @@ from openmetadata_managed_apis.utils.logger import set_operator_logger from openmetadata_managed_apis.workflows.ingestion.common import build_dag, build_source -from metadata.data_quality.api.workflow import TestSuiteWorkflow from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( IngestionPipeline, ) @@ -29,6 +28,7 @@ WorkflowConfig, ) from metadata.ingestion.models.encoders import show_secrets_encoder +from metadata.workflow.data_quality import TestSuiteWorkflow from metadata.workflow.workflow_output_handler import print_test_suite_status diff --git a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py index 0dc337c1bc03..cf9be2d7863e 100644 --- a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py +++ b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py @@ -33,7 +33,6 @@ build_usage_workflow_config, ) -from metadata.data_quality.api.workflow import TestSuiteWorkflow from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) @@ -69,6 +68,7 @@ from metadata.ingestion.api.parser import parse_workflow_config_gracefully from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.workflow.data_quality import TestSuiteWorkflow from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow From e557dc3947f50a6812e6a7f660fcbfab079069d6 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Thu, 5 Oct 2023 16:55:46 +0200 Subject: [PATCH 05/12] Lint --- .../src/metadata/data_quality/source/test_suite.py | 10 ++++++++-- ingestion/src/metadata/ingestion/sink/metadata_rest.py | 2 +- ingestion/src/metadata/utils/logger.py | 5 ++++- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/ingestion/src/metadata/data_quality/source/test_suite.py b/ingestion/src/metadata/data_quality/source/test_suite.py index 7c9e2bda51af..8cf354e5ce52 100644 --- a/ingestion/src/metadata/data_quality/source/test_suite.py +++ b/ingestion/src/metadata/data_quality/source/test_suite.py @@ -157,8 +157,8 @@ def _iter(self) -> Iterable[Either[TableAndTests]]: yield Either( left=StackTraceError( name="Missing Table", - error=f"Could not retrieve table entity for {self.source_config.entityFullyQualifiedName.__root__}. " - "Make sure the table exists in OpenMetadata and/or the JWT Token provided is valid.", + error=f"Could not retrieve table entity for {self.source_config.entityFullyQualifiedName.__root__}." + " Make sure the table exists in OpenMetadata and/or the JWT Token provided is valid.", ) ) @@ -181,6 +181,12 @@ def _process_table_suite(self, table: Table) -> Iterable[Either[TableAndTests]]: owner=None, executableEntityReference=self.source_config.entityFullyQualifiedName.__root__, ) + yield Either( + right=TableAndTests( + executable_test_suite=executable_test_suite, + service_type=self.service.serviceType.value, + ) + ) if table.testSuite and not table.testSuite.executable: yield Either( diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 27237bdc084e..bb1ba734bf66 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -517,7 +517,7 @@ def write_executable_test_suite( return Either(right=test_suite) @_run_dispatch.register - def write_test_case_results(self, record: TestCaseResults): + def write_test_case_result_list(self, record: TestCaseResults): """Record the list of test case result responses""" for result in record.test_results or []: diff --git a/ingestion/src/metadata/utils/logger.py b/ingestion/src/metadata/utils/logger.py index 59d7290ad368..30dac8abe3ba 100644 --- a/ingestion/src/metadata/utils/logger.py +++ b/ingestion/src/metadata/utils/logger.py @@ -234,7 +234,10 @@ def _(record: OMetaLifeCycleData) -> str: @get_log_name.register def _(record: TableAndTests) -> str: - return f"Tests for [{record.table.fullyQualifiedName.__root__}]" + if record.table: + return f"Tests for [{record.table.fullyQualifiedName.__root__}]" + else: + return f"Test Suite [{record.executable_test_suite.name.__root__}]" @get_log_name.register From 9e085cc5eb0e00fec39e07bf8e52ad744b189a33 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Thu, 5 Oct 2023 17:00:29 +0200 Subject: [PATCH 06/12] Fix source --- ingestion/src/metadata/data_quality/source/test_suite.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ingestion/src/metadata/data_quality/source/test_suite.py b/ingestion/src/metadata/data_quality/source/test_suite.py index 8cf354e5ce52..4adba6acc129 100644 --- a/ingestion/src/metadata/data_quality/source/test_suite.py +++ b/ingestion/src/metadata/data_quality/source/test_suite.py @@ -205,7 +205,6 @@ def _process_table_suite(self, table: Table) -> Iterable[Either[TableAndTests]]: right=TableAndTests( table=table, test_cases=test_suite_cases, - executable_test_suite=executable_test_suite, service_type=self.service.serviceType.value, ) ) From 34a5f572ec96eff49e1f71944ece4fc91f0921af Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Thu, 5 Oct 2023 17:18:54 +0200 Subject: [PATCH 07/12] Fix source --- ingestion/src/metadata/utils/logger.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ingestion/src/metadata/utils/logger.py b/ingestion/src/metadata/utils/logger.py index 30dac8abe3ba..6ca065671694 100644 --- a/ingestion/src/metadata/utils/logger.py +++ b/ingestion/src/metadata/utils/logger.py @@ -236,8 +236,8 @@ def _(record: OMetaLifeCycleData) -> str: def _(record: TableAndTests) -> str: if record.table: return f"Tests for [{record.table.fullyQualifiedName.__root__}]" - else: - return f"Test Suite [{record.executable_test_suite.name.__root__}]" + + return f"Test Suite [{record.executable_test_suite.name.__root__}]" @get_log_name.register From f532686b62efe47eb0aab046d223b84b120eb367 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Thu, 5 Oct 2023 17:29:09 +0200 Subject: [PATCH 08/12] Fix source --- .../src/metadata/data_quality/processor/test_case_runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ingestion/src/metadata/data_quality/processor/test_case_runner.py b/ingestion/src/metadata/data_quality/processor/test_case_runner.py index 2bb2bc97e01f..d9fd444434d4 100644 --- a/ingestion/src/metadata/data_quality/processor/test_case_runner.py +++ b/ingestion/src/metadata/data_quality/processor/test_case_runner.py @@ -292,6 +292,7 @@ def _run_test_case( stack_trace=traceback.format_exc(), ) ) + return None @classmethod def create(cls, config_dict: dict, metadata: OpenMetadata) -> "Step": From 2d4ccbe4c2d202e0dcf3c2feb56159c839dec295 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Thu, 5 Oct 2023 18:26:07 +0200 Subject: [PATCH 09/12] Fix source --- .../metadata/data_quality/processor/test_case_runner.py | 8 ++++++-- .../tests/integration/test_suite/test_e2e_workflow.py | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/ingestion/src/metadata/data_quality/processor/test_case_runner.py b/ingestion/src/metadata/data_quality/processor/test_case_runner.py index d9fd444434d4..fd9deeab06b5 100644 --- a/ingestion/src/metadata/data_quality/processor/test_case_runner.py +++ b/ingestion/src/metadata/data_quality/processor/test_case_runner.py @@ -153,8 +153,12 @@ def compare_and_create_test_cases( """ if not cli_test_cases_definitions: return test_cases - test_cases = deepcopy(test_cases) - test_case_names = {test_case.name.__root__ for test_case in test_cases} + test_cases = deepcopy(test_cases) or [] + test_case_names = ( + {test_case.name.__root__ for test_case in test_cases} + if test_cases + else set() + ) # we'll check the test cases defined in the CLI config file and not present in the platform test_cases_to_create = [ diff --git a/ingestion/tests/integration/test_suite/test_e2e_workflow.py b/ingestion/tests/integration/test_suite/test_e2e_workflow.py index b252cddfeb6a..e1967eb730b8 100644 --- a/ingestion/tests/integration/test_suite/test_e2e_workflow.py +++ b/ingestion/tests/integration/test_suite/test_e2e_workflow.py @@ -214,6 +214,7 @@ def test_e2e_cli_workflow(self): """test cli workflow e2e""" workflow = TestSuiteWorkflow.create(test_suite_config) workflow.execute() + workflow.raise_from_status() test_case_1 = self.metadata.get_by_name( entity=TestCase, From 72cbf491f6550af79f8436336ac0d3c110956682 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Fri, 6 Oct 2023 15:26:30 +0200 Subject: [PATCH 10/12] Fix test --- .../integration/test_suite/test_workflow.py | 244 +++++++++++++----- 1 file changed, 176 insertions(+), 68 deletions(-) diff --git a/ingestion/tests/integration/test_suite/test_workflow.py b/ingestion/tests/integration/test_suite/test_workflow.py index 7139abcaf587..74f594878d98 100644 --- a/ingestion/tests/integration/test_suite/test_workflow.py +++ b/ingestion/tests/integration/test_suite/test_workflow.py @@ -14,16 +14,36 @@ """ import unittest -from copy import deepcopy +import uuid from typing import List from metadata.data_quality.api.models import TableAndTests -from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest +from metadata.generated.schema.api.data.createDatabaseSchema import ( + CreateDatabaseSchemaRequest, +) +from metadata.generated.schema.api.data.createTable import CreateTableRequest +from metadata.generated.schema.api.services.createDatabaseService import ( + CreateDatabaseServiceRequest, +) +from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest +from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest +from metadata.generated.schema.entity.data.table import Column, DataType, Table +from metadata.generated.schema.entity.services.connections.database.common.basicAuth import ( + BasicAuth, +) +from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( + MysqlConnection, +) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) -from metadata.generated.schema.tests.testCase import TestCase -from metadata.generated.schema.tests.testSuite import TestSuite +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseConnection, + DatabaseService, + DatabaseServiceType, +) +from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue from metadata.ingestion.api.models import Either from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.workflow.data_quality import TestSuiteWorkflow @@ -31,29 +51,30 @@ sqlite_shared = "file:cachedb?mode=memory&cache=shared&check_same_thread=False" -test_suite_config = { - "source": { - "type": "custom-database", - "serviceName": "sample_data", - "sourceConfig": { - "config": { - "type": "TestSuite", - "entityFullyQualifiedName": "sample_data.ecommerce_db.shopify.dim_address", +def get_test_suite_config(service_name: str, table_name: str) -> dict: + return { + "source": { + "type": "custom-database", + "serviceName": service_name, + "sourceConfig": { + "config": { + "type": "TestSuite", + "entityFullyQualifiedName": table_name, + } + }, + }, + "processor": {"type": "orm-test-runner", "config": {}}, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": { + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + }, } }, - }, - "processor": {"type": "orm-test-runner", "config": {}}, - "sink": {"type": "metadata-rest", "config": {}}, - "workflowConfig": { - "openMetadataServerConfig": { - "hostPort": "http://localhost:8585/api", - "authProvider": "openmetadata", - "securityConfig": { - "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" - }, - } - }, -} + } class TestSuiteWorkflowTests(unittest.TestCase): @@ -61,39 +82,107 @@ class TestSuiteWorkflowTests(unittest.TestCase): metadata = OpenMetadata( OpenMetadataConnection.parse_obj( - test_suite_config["workflowConfig"]["openMetadataServerConfig"] + get_test_suite_config("", "")["workflowConfig"]["openMetadataServerConfig"] ) ) test_case_ids = [] test_suite_ids = [] - def tearDown(self) -> None: - for test_case_id in self.test_case_ids: - self.metadata.delete( - entity=TestCase, - entity_id=test_case_id, - recursive=True, - hard_delete=True, + @classmethod + def setUpClass(cls) -> None: + """Prepare ingredients""" + service = CreateDatabaseServiceRequest( + name=str(uuid.uuid4()), + serviceType=DatabaseServiceType.Mysql, + connection=DatabaseConnection( + config=MysqlConnection( + username="username", + authType=BasicAuth( + password="password", + ), + hostPort="http://localhost:1234", + ) + ), + ) + cls.service_entity: DatabaseService = cls.metadata.create_or_update( + data=service + ) + + create_db = CreateDatabaseRequest( + name=str(uuid.uuid4()), + service=cls.service_entity.fullyQualifiedName, + ) + + create_db_entity = cls.metadata.create_or_update(data=create_db) + + create_schema = CreateDatabaseSchemaRequest( + name=str(uuid.uuid4()), + database=create_db_entity.fullyQualifiedName, + ) + + cls.schema_entity = cls.metadata.create_or_update(data=create_schema) + + create_table = CreateTableRequest( + name=str(uuid.uuid4()), + databaseSchema=cls.schema_entity.fullyQualifiedName, + columns=[Column(name="id", dataType=DataType.BIGINT)], + ) + + cls.table_with_suite: Table = cls.metadata.create_or_update(create_table) + + cls.test_suite = cls.metadata.create_or_update_executable_test_suite( + data=CreateTestSuiteRequest( + name="test-suite", + executableEntityReference=cls.table_with_suite.fullyQualifiedName.__root__, ) - for test_suite_id in self.test_suite_ids: - self.metadata.delete_executable_test_suite( - entity=TestSuite, - entity_id=test_suite_id, - recursive=True, - hard_delete=True, + ) + + cls.metadata.create_or_update( + CreateTestCaseRequest( + name="testCaseForIntegration", + entityLink=f"<#E::table::{cls.table_with_suite.fullyQualifiedName.__root__}>", + testSuite=cls.test_suite.fullyQualifiedName, + testDefinition="tableRowCountToEqual", + parameterValues=[TestCaseParameterValue(name="value", value=10)], ) + ) - self.test_case_ids = [] - self.test_suite_ids = [] + create_table_2 = CreateTableRequest( + name=str(uuid.uuid4()), + databaseSchema=cls.schema_entity.fullyQualifiedName, + columns=[Column(name="id", dataType=DataType.BIGINT)], + ) + + cls.table: Table = cls.metadata.create_or_update(create_table_2) + + @classmethod + def tearDownClass(cls) -> None: + """Clean up""" + cls.metadata.delete( + entity=DatabaseService, + entity_id=cls.service_entity.id, + recursive=True, + hard_delete=True, + ) def test_create_workflow_object(self): """Test workflow object is correctly instantiated""" - TestSuiteWorkflow.create(test_suite_config) + TestSuiteWorkflow.create( + get_test_suite_config( + service_name=self.service_entity.name.__root__, + table_name=self.table_with_suite.fullyQualifiedName.__root__, + ) + ) def test_create_workflow_object_with_table_with_test_suite(self): """test workflow object is instantiated correctly from cli config""" - workflow = TestSuiteWorkflow.create(test_suite_config) + workflow = TestSuiteWorkflow.create( + get_test_suite_config( + service_name=self.service_entity.name.__root__, + table_name=self.table_with_suite.fullyQualifiedName.__root__, + ) + ) table: Table = workflow.source._get_table_entity() @@ -105,19 +194,28 @@ def test_create_workflow_object_with_table_with_test_suite(self): self.assertIsNotNone(table.testSuite) self.assertIsNone(table_and_tests.right.executable_test_suite) + self.assertTrue(len(table_and_tests.right.test_cases) >= 1) # We will pick up the tests from it - # Note that this number comes from what is defined in the sample data - self.assertEquals(len(table_and_tests.right.test_cases), 5) + self.assertTrue( + next( + ( + test + for test in table_and_tests.right.test_cases + if test.name.__root__ == "testCaseForIntegration" + ), + None, + ) + ) def test_create_workflow_config_with_table_without_suite(self): """We'll prepare the test suite creation payload""" - _test_suite_config = deepcopy(test_suite_config) - _test_suite_config["source"]["sourceConfig"]["config"][ - "entityFullyQualifiedName" - ] = "sample_data.ecommerce_db.shopify.dim_staff" - - workflow = TestSuiteWorkflow.create(_test_suite_config) + workflow = TestSuiteWorkflow.create( + get_test_suite_config( + service_name=self.service_entity.name.__root__, + table_name=self.table.fullyQualifiedName.__root__, + ) + ) # If the table does not have a test suite, we'll prepare the request to create one table: Table = workflow.source._get_table_entity() @@ -129,13 +227,16 @@ def test_create_workflow_config_with_table_without_suite(self): self.assertIsNone(table.testSuite) self.assertEquals( table_and_tests.right.executable_test_suite.name.__root__, - "sample_data.ecommerce_db.shopify.dim_staff.testSuite", + self.table.fullyQualifiedName.__root__ + ".testSuite", ) def test_create_workflow_config_with_tests(self): """We'll get the tests from the workflow YAML""" - _test_suite_config = deepcopy(test_suite_config) + _test_suite_config = get_test_suite_config( + service_name=self.service_entity.name.__root__, + table_name=self.table_with_suite.fullyQualifiedName.__root__, + ) processor = { "processor": { @@ -165,12 +266,13 @@ def test_create_workflow_config_with_tests(self): test_cases: List[TestCase] = workflow.steps[0].get_test_cases( test_cases=table_and_tests.right.test_cases, - test_suite_fqn="sample_data.ecommerce_db.shopify.dim_address.testSuite", - table_fqn="sample_data.ecommerce_db.shopify.dim_address", + test_suite_fqn=self.table_with_suite.fullyQualifiedName.__root__ + + ".testSuite", + table_fqn=self.table_with_suite.fullyQualifiedName.__root__, ) - # 5 defined test cases + the new one in the YAML - self.assertEquals(len(test_cases), 6) + # 1 defined test cases + the new one in the YAML + self.assertTrue(len(table_and_tests.right.test_cases) > 1) new_test_case = next( (test for test in test_cases if test.name.__root__ == "my_test_case"), None @@ -187,7 +289,10 @@ def test_create_workflow_config_with_tests(self): def test_get_test_case_names_from_cli_config(self): """test we can get all test case names from cli config""" - _test_suite_config = deepcopy(test_suite_config) + _test_suite_config = get_test_suite_config( + service_name=self.service_entity.name.__root__, + table_name=self.table_with_suite.fullyQualifiedName.__root__, + ) processor = { "processor": { @@ -227,7 +332,10 @@ def test_get_test_case_names_from_cli_config(self): def test_compare_and_create_test_cases(self): """Test function creates the correct test case if they don't exists""" - _test_suite_config = deepcopy(test_suite_config) + _test_suite_config = get_test_suite_config( + service_name=self.service_entity.name.__root__, + table_name=self.table_with_suite.fullyQualifiedName.__root__, + ) processor = { "processor": { @@ -245,7 +353,7 @@ def test_compare_and_create_test_cases(self): { "name": "my_test_case_two", "testDefinitionName": "columnValuesToBeBetween", - "columnName": "address_id", + "columnName": "id", "parameterValues": [ {"name": "minValue", "value": 1}, {"name": "maxValue", "value": 5}, @@ -261,12 +369,12 @@ def test_compare_and_create_test_cases(self): assert not self.metadata.get_by_name( entity=TestCase, - fqn="sample_data.ecommerce_db.shopify.dim_address.my_test_case", + fqn=f"{self.table_with_suite.fullyQualifiedName.__root__}.my_test_case", ) assert not self.metadata.get_by_name( entity=TestCase, - fqn="sample_data.ecommerce_db.shopify.dim_address.address_id.my_test_case_two", + fqn=f"{self.table_with_suite.fullyQualifiedName.__root__}.my_test_case_two", ) table: Table = workflow.source._get_table_entity() @@ -278,27 +386,27 @@ def test_compare_and_create_test_cases(self): created_test_case = workflow.steps[0].compare_and_create_test_cases( cli_test_cases_definitions=config_test_cases_def, test_cases=table_and_tests.right.test_cases, - test_suite_fqn="sample_data.ecommerce_db.shopify.dim_address.testSuite", - table_fqn="sample_data.ecommerce_db.shopify.dim_address", + test_suite_fqn=f"{self.table_with_suite.fullyQualifiedName.__root__}.testSuite", + table_fqn=self.table_with_suite.fullyQualifiedName.__root__, ) # clean up test my_test_case = self.metadata.get_by_name( entity=TestCase, - fqn="sample_data.ecommerce_db.shopify.dim_address.my_test_case", + fqn=f"{self.table_with_suite.fullyQualifiedName.__root__}.my_test_case", fields=["testDefinition", "testSuite"], ) my_test_case_two = self.metadata.get_by_name( entity=TestCase, - fqn="sample_data.ecommerce_db.shopify.dim_address.address_id.my_test_case_two", + fqn=f"{self.table_with_suite.fullyQualifiedName.__root__}.id.my_test_case_two", fields=["testDefinition", "testSuite"], ) assert my_test_case assert my_test_case_two - # We return the 5 sample data tests & the 2 new ones - assert len(created_test_case) == 7 + # We return the existing test & the 2 new ones + assert len(created_test_case) >= 3 self.metadata.delete(entity=TestCase, entity_id=my_test_case.id) self.metadata.delete(entity=TestCase, entity_id=my_test_case_two.id) From 720793f36ecb8cfe395a4bb5f31c45790cca3d4f Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Fri, 6 Oct 2023 15:41:11 +0200 Subject: [PATCH 11/12] Fix test --- ingestion/src/metadata/workflow/data_quality.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ingestion/src/metadata/workflow/data_quality.py b/ingestion/src/metadata/workflow/data_quality.py index b0883c9e066f..099ab305b878 100644 --- a/ingestion/src/metadata/workflow/data_quality.py +++ b/ingestion/src/metadata/workflow/data_quality.py @@ -13,8 +13,7 @@ """ from metadata.data_quality.processor.test_case_runner import TestCaseRunner from metadata.data_quality.source.test_suite import TestSuiteSource -from metadata.ingestion.api.processor import Processor -from metadata.ingestion.api.sink import Sink +from metadata.ingestion.api.steps import Processor, Sink from metadata.utils.importer import import_sink_class from metadata.utils.logger import test_suite_logger from metadata.workflow.base import BaseWorkflow From fa9927f2d300712ccb3088dc9f53cf3868661411 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Fri, 6 Oct 2023 16:16:14 +0200 Subject: [PATCH 12/12] Fix test --- ingestion/tests/integration/test_suite/test_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion/tests/integration/test_suite/test_workflow.py b/ingestion/tests/integration/test_suite/test_workflow.py index 74f594878d98..79cad960e69c 100644 --- a/ingestion/tests/integration/test_suite/test_workflow.py +++ b/ingestion/tests/integration/test_suite/test_workflow.py @@ -272,7 +272,7 @@ def test_create_workflow_config_with_tests(self): ) # 1 defined test cases + the new one in the YAML - self.assertTrue(len(table_and_tests.right.test_cases) > 1) + self.assertTrue(len(table_and_tests.right.test_cases) >= 1) new_test_case = next( (test for test in test_cases if test.name.__root__ == "my_test_case"), None