diff --git a/ingestion/setup.py b/ingestion/setup.py index 1e5eb8d3a11e..88ee7e3e94b3 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -364,6 +364,7 @@ VERSIONS["giturlparse"], VERSIONS["avro"], # Sample Data VERSIONS["grpc-tools"], + VERSIONS["neo4j"], "testcontainers==3.7.1;python_version<'3.9'", "testcontainers==4.4.0;python_version>='3.9'", "minio==7.2.5", @@ -375,6 +376,12 @@ "requests==2.31.0", f"{DATA_DIFF['mysql']}==0.11.2", *plugins["deltalake"], + *plugins["datalake-gcs"], + *plugins["pgspider"], + *plugins["clickhouse"], + *plugins["mssql"], + *plugins["dagster"], + *plugins["oracle"], } e2e_test = { diff --git a/ingestion/src/metadata/data_quality/builders/i_validator_builder.py b/ingestion/src/metadata/data_quality/builders/i_validator_builder.py new file mode 100644 index 000000000000..22466873ce03 --- /dev/null +++ b/ingestion/src/metadata/data_quality/builders/i_validator_builder.py @@ -0,0 +1,100 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Builder interface defining the structure of builders for validators. +Validators are test classes (e.g. columnValuesToBeBetween, etc.) +""" + +from abc import ABC, abstractmethod +from datetime import datetime +from typing import TYPE_CHECKING, Optional, Type, Union + +from metadata.data_quality.validations.base_test_handler import BaseTestValidator +from metadata.data_quality.validations.runtime_param_setter.param_setter import ( + RuntimeParameterSetter, +) +from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue +from metadata.profiler.processor.runner import QueryRunner +from metadata.utils.importer import import_test_case_class + +if TYPE_CHECKING: + from pandas import DataFrame + + +class IValidatorBuilder(ABC): + """Interface for validator builders""" + + @property + def test_case(self): + """Return the test case object""" + return self._test_case + + @property + def validator(self): + """Return the validator object""" + return self._validator + + def __init__( + self, + runner: Union[QueryRunner, "DataFrame"], + test_case: TestCase, + entity_type: str, + ) -> None: + """Builder object for SQA validators. This builder is used to create a validator object + + Args: + runner (QueryRunner): The runner object + test_case (TestCase): The test case object + entity_type (str): one of COLUMN or TABLE -- fetched from the test definition + """ + self._test_case = test_case + self.runner = runner + self.validator_cls: Type[BaseTestValidator] = import_test_case_class( + entity_type, + self._get_source_type(), + self.test_case.testDefinition.fullyQualifiedName, # type: ignore + ) + self.reset() + + def set_runtime_params( + self, runtime_params_setter: Optional[RuntimeParameterSetter] + ): + """Set the runtime parameters for the validator object + + # TODO: We should support setting n runtime parameters + + Args: + runtime_params_setter (Optional[RuntimeParameterSetter]): The runtime parameter setter + """ + if runtime_params_setter: + params = runtime_params_setter.get_parameters(self.test_case) + if not self.test_case.parameterValues: + # If there are no parameters, create a new list + self.test_case.parameterValues = [] + self.test_case.parameterValues.append( + TestCaseParameterValue( + name="runtimeParams", value=params.model_dump_json() + ) + ) + + def reset(self): + """Reset the builder""" + self._validator = self.validator_cls( + self.runner, + test_case=self.test_case, + execution_date=int(datetime.now().timestamp() * 1000), + ) + + @abstractmethod + def _get_source_type(self): + """Get the source type""" + raise NotImplementedError diff --git a/ingestion/src/metadata/data_quality/builders/pandas_validator_builder.py b/ingestion/src/metadata/data_quality/builders/pandas_validator_builder.py new file mode 100644 index 000000000000..299ec894b368 --- /dev/null +++ b/ingestion/src/metadata/data_quality/builders/pandas_validator_builder.py @@ -0,0 +1,29 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Builder defining the structure of builders for validators for Pandas sources +""" + +from typing import TYPE_CHECKING + +from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder + +if TYPE_CHECKING: + pass + + +class PandasValidatorBuilder(IValidatorBuilder): + """Builder for Pandas validators""" + + def _get_source_type(self) -> str: + """Return the test case""" + return "pandas" diff --git a/ingestion/src/metadata/data_quality/builders/sqa_validator_builder.py b/ingestion/src/metadata/data_quality/builders/sqa_validator_builder.py new file mode 100644 index 000000000000..bf629517ee57 --- /dev/null +++ b/ingestion/src/metadata/data_quality/builders/sqa_validator_builder.py @@ -0,0 +1,25 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Builder defining the structure of builders for validators for SQA sources +""" + + +from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder + + +class SQAValidatorBuilder(IValidatorBuilder): + """Builder for SQA validators""" + + def _get_source_type(self) -> str: + """Return the test case""" + return "sqlalchemy" diff --git a/ingestion/src/metadata/data_quality/interface/pandas/pandas_test_suite_interface.py b/ingestion/src/metadata/data_quality/interface/pandas/pandas_test_suite_interface.py index f0dadcd20c79..2546242e6bd5 100644 --- a/ingestion/src/metadata/data_quality/interface/pandas/pandas_test_suite_interface.py +++ b/ingestion/src/metadata/data_quality/interface/pandas/pandas_test_suite_interface.py @@ -13,22 +13,20 @@ Interfaces with database for all database engine supporting sqlalchemy abstraction layer """ -from datetime import datetime -from typing import Optional, Type +from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder +from metadata.data_quality.builders.pandas_validator_builder import ( + PandasValidatorBuilder, +) from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface -from metadata.data_quality.validations.base_test_handler import BaseTestValidator from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.database.datalakeConnection import ( DatalakeConnection, ) -from metadata.generated.schema.tests.basic import TestCaseResult from metadata.generated.schema.tests.testCase import TestCase -from metadata.generated.schema.tests.testDefinition import TestDefinition from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection from metadata.mixins.pandas.pandas_mixin import PandasInterfaceMixin -from metadata.utils.importer import import_test_case_class from metadata.utils.logger import test_suite_logger logger = test_suite_logger() @@ -69,40 +67,7 @@ def __init__( if self.dfs and self.table_partition_config: self.dfs = self.get_partitioned_df(self.dfs) - def run_test_case( - self, - test_case: TestCase, - ) -> Optional[TestCaseResult]: - """Run table tests where platformsTest=OpenMetadata - - Args: - test_case: test case object to execute - - Returns: - TestCaseResult object - """ - - try: - TestHandler: Type[ # pylint: disable=invalid-name - BaseTestValidator - ] = import_test_case_class( - self.ometa_client.get_by_id( - TestDefinition, test_case.testDefinition.id - ).entityType.value, - "pandas", - test_case.testDefinition.fullyQualifiedName, - ) - - test_handler = TestHandler( - self.dfs, - test_case=test_case, - execution_date=int(datetime.now().timestamp() * 1000), - ) - - return test_handler.run_validation() - except Exception as err: - logger.error( - f"Error executing {test_case.testDefinition.fullyQualifiedName} - {err}" - ) - - raise RuntimeError(err) + def _get_validator_builder( + self, test_case: TestCase, entity_type: str + ) -> IValidatorBuilder: + return PandasValidatorBuilder(self.dfs, test_case, entity_type) diff --git a/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py b/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py index 3e4111dfdba4..cd2d79b48e84 100644 --- a/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py +++ b/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py @@ -14,25 +14,23 @@ supporting sqlalchemy abstraction layer """ -from datetime import datetime -from typing import Optional, Union +from typing import Union from sqlalchemy.orm import DeclarativeMeta from sqlalchemy.orm.util import AliasedClass +from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder +from metadata.data_quality.builders.sqa_validator_builder import SQAValidatorBuilder from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.databaseService import DatabaseConnection -from metadata.generated.schema.tests.basic import TestCaseResult -from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue -from metadata.generated.schema.tests.testDefinition import TestDefinition +from metadata.generated.schema.tests.testCase import TestCase from metadata.ingestion.connections.session import create_and_bind_session from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.mixins.sqalchemy.sqa_mixin import SQAInterfaceMixin from metadata.profiler.processor.runner import QueryRunner from metadata.profiler.processor.sampler.sqlalchemy.sampler import SQASampler from metadata.utils.constants import TEN_MIN -from metadata.utils.importer import import_test_case_class from metadata.utils.logger import test_suite_logger from metadata.utils.ssl_manager import get_ssl_connection from metadata.utils.timeout import cls_timeout @@ -143,50 +141,7 @@ def _create_runner(self) -> None: ) ) - def run_test_case( - self, - test_case: TestCase, - ) -> Optional[TestCaseResult]: - """Run table tests where platformsTest=OpenMetadata - - Args: - test_case: test case object to execute - - Returns: - TestCaseResult object - """ - - try: - TestHandler = import_test_case_class( # pylint: disable=invalid-name - self.ometa_client.get_by_id( - TestDefinition, test_case.testDefinition.id - ).entityType.value, - "sqlalchemy", - test_case.testDefinition.fullyQualifiedName, - ) - - if TestHandler.runtime_parameter_setter: - setter = TestHandler.runtime_parameter_setter( - self.ometa_client, - self.service_connection_config, - self.table_entity, - self.sampler, - ) - runtime_params = setter.get_parameters(test_case) - test_case.parameterValues.append( - TestCaseParameterValue( - name="runtimeParams", value=runtime_params.model_dump_json() - ) - ) - - test_handler = TestHandler( - self.runner, - test_case=test_case, - execution_date=int(datetime.now().timestamp() * 1000), - ) - return test_handler.run_validation() - except Exception as err: - logger.error( - f"Error executing {test_case.testDefinition.fullyQualifiedName} - {err}" - ) - raise RuntimeError(err) + def _get_validator_builder( + self, test_case: TestCase, entity_type: str + ) -> IValidatorBuilder: + return SQAValidatorBuilder(self.runner, test_case, entity_type) diff --git a/ingestion/src/metadata/data_quality/interface/test_suite_interface.py b/ingestion/src/metadata/data_quality/interface/test_suite_interface.py index 84ca6e5ca23b..f5bfc9f1df67 100644 --- a/ingestion/src/metadata/data_quality/interface/test_suite_interface.py +++ b/ingestion/src/metadata/data_quality/interface/test_suite_interface.py @@ -15,20 +15,34 @@ """ from abc import ABC, abstractmethod -from typing import Optional +from typing import Optional, Type +from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder +from metadata.data_quality.validations.base_test_handler import BaseTestValidator +from metadata.data_quality.validations.runtime_param_setter.param_setter import ( + RuntimeParameterSetter, +) +from metadata.data_quality.validations.runtime_param_setter.param_setter_factory import ( + RuntimeParameterSetterFactory, +) from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.databaseService import DatabaseConnection from metadata.generated.schema.tests.basic import TestCaseResult from metadata.generated.schema.tests.testCase import TestCase +from metadata.generated.schema.tests.testDefinition import TestDefinition from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.profiler.api.models import ProfileSampleConfig +from metadata.utils.logger import test_suite_logger from metadata.utils.partition import get_partition_details +logger = test_suite_logger() + class TestSuiteInterface(ABC): """Abstract interface for the processor""" + runtime_params_setter_fact = RuntimeParameterSetterFactory + @abstractmethod def __init__( self, @@ -41,10 +55,78 @@ def __init__( self.service_connection_config = service_connection_config self.table_entity = table_entity + @property + def sampler(self): + """Get the sampler object + + Note: Overriden in the implementation class. This should be removed from the interface. It has been + implemented as the RuntimeParameterSetter takes the sampler as an argument, though we may want to + remove that dependency. + """ + return None + @abstractmethod + def _get_validator_builder( + self, test_case: TestCase, entity_type: str + ) -> IValidatorBuilder: + """get the builder class for the validator. Define this in the implementation class + + Args: + test_case (TestCase): test case object + entity_type (str): type of the entity + + Returns: + IValidatorBuilder: a validator builder + """ + raise NotImplementedError + + @classmethod + def _get_runtime_params_setter_fact(cls) -> RuntimeParameterSetterFactory: + """Get the runtime parameter setter factory.""" + return cls.runtime_params_setter_fact() + + @classmethod + def _set_runtime_params_setter_fact( + cls, class_fact: Type[RuntimeParameterSetterFactory] + ): + """Set the runtime parameter setter factory. + Use this method to set the runtime parameter setter factory and override the default. + + Args: + class_fact (Type[RuntimeParameterSetterFactory]): the runtime parameter setter factory class + """ + cls.runtime_params_setter_fact = class_fact + def run_test_case(self, test_case: TestCase) -> Optional[TestCaseResult]: """run column data quality tests""" - raise NotImplementedError + runtime_params_setter_fact: RuntimeParameterSetterFactory = ( + self._get_runtime_params_setter_fact() + ) # type: ignore + runtime_params_setter: Optional[ + RuntimeParameterSetter + ] = runtime_params_setter_fact.get_runtime_param_setter( + test_case.testDefinition.fullyQualifiedName, # type: ignore + self.ometa_client, + self.service_connection_config, + self.table_entity, + self.sampler, + ) + + # get `column` or `table` type for validator import + entity_type: str = self.ometa_client.get_by_id( + TestDefinition, test_case.testDefinition.id + ).entityType.value + + validator_builder = self._get_validator_builder(test_case, entity_type) + validator_builder.set_runtime_params(runtime_params_setter) + validator: BaseTestValidator = validator_builder.validator + try: + return validator.run_validation() + except Exception as err: + logger.error( + f"Error executing {test_case.testDefinition.fullyQualifiedName} - {err}" + ) + raise RuntimeError(err) def _get_sample_query(self) -> Optional[str]: """Get the sampling query for the data quality tests diff --git a/ingestion/src/metadata/data_quality/validations/base_test_handler.py b/ingestion/src/metadata/data_quality/validations/base_test_handler.py index acffa7226383..be93fdc3832b 100644 --- a/ingestion/src/metadata/data_quality/validations/base_test_handler.py +++ b/ingestion/src/metadata/data_quality/validations/base_test_handler.py @@ -18,7 +18,7 @@ import reprlib from abc import ABC, abstractmethod from datetime import datetime -from typing import Callable, List, Optional, Type, TypeVar, Union +from typing import TYPE_CHECKING, Callable, List, Optional, Type, TypeVar, Union from metadata.data_quality.validations.runtime_param_setter.param_setter import ( RuntimeParameterSetter, @@ -31,6 +31,9 @@ from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue from metadata.profiler.processor.runner import QueryRunner +if TYPE_CHECKING: + from pandas import DataFrame + T = TypeVar("T", bound=Callable) R = TypeVar("R") @@ -45,7 +48,7 @@ class BaseTestValidator(ABC): def __init__( self, - runner: QueryRunner, + runner: Union[QueryRunner, List["DataFrame"]], test_case: TestCase, execution_date: Union[datetime, float], ) -> None: diff --git a/ingestion/src/metadata/data_quality/validations/runtime_param_setter/param_setter_factory.py b/ingestion/src/metadata/data_quality/validations/runtime_param_setter/param_setter_factory.py new file mode 100644 index 000000000000..15653fee4427 --- /dev/null +++ b/ingestion/src/metadata/data_quality/validations/runtime_param_setter/param_setter_factory.py @@ -0,0 +1,54 @@ +# Copyright 2024 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Module that defines the RuntimeParameterFactory class. +This class is responsible for creating instances of the RuntimeParameterSetter +based on the test case. +""" + +from typing import Optional + +from metadata.data_quality.validations.runtime_param_setter.param_setter import ( + RuntimeParameterSetter, +) +from metadata.data_quality.validations.runtime_param_setter.table_diff_params_setter import ( + TableDiffParamsSetter, +) +from metadata.ingestion.ometa.ometa_api import OpenMetadata + + +class RuntimeParameterSetterFactory: + """runtime parameter setter factory class""" + + def __init__(self) -> None: + """Set""" + self._setter_map = { + TableDiffParamsSetter: {"tableDiff"}, + } + + def get_runtime_param_setter( + self, + name: str, + ometa: OpenMetadata, + service_connection_config, + table_entity, + sampler, + ) -> Optional[RuntimeParameterSetter]: + """Get the runtime parameter setter""" + for setter_cls, validator_names in self._setter_map.items(): + if name in validator_names: + return setter_cls( + ometa, + service_connection_config, + table_entity, + sampler, + ) + return None diff --git a/ingestion/src/metadata/data_quality/validations/table/sqlalchemy/tableDiff.py b/ingestion/src/metadata/data_quality/validations/table/sqlalchemy/tableDiff.py index e157a1487544..53ca1b28f705 100644 --- a/ingestion/src/metadata/data_quality/validations/table/sqlalchemy/tableDiff.py +++ b/ingestion/src/metadata/data_quality/validations/table/sqlalchemy/tableDiff.py @@ -26,9 +26,6 @@ SQAValidatorMixin, ) from metadata.data_quality.validations.models import TableDiffRuntimeParameters -from metadata.data_quality.validations.runtime_param_setter.table_diff_params_setter import ( - TableDiffParamsSetter, -) from metadata.generated.schema.entity.data.table import Column from metadata.generated.schema.entity.services.connections.database.sapHanaConnection import ( SapHanaScheme, @@ -67,13 +64,10 @@ class TableDiffValidator(BaseTestValidator, SQAValidatorMixin): Compare two tables and fail if the number of differences exceeds a threshold """ - runtime_parameter_setter = TableDiffParamsSetter - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.runtime_params: TableDiffRuntimeParameters = self.get_runtime_params() + runtime_params: TableDiffRuntimeParameters def run_validation(self) -> TestCaseResult: + self.runtime_params = self.get_runtime_params() try: self._validate_dialects() return self._run()