Skip to content

Commit

Permalink
MINOR - enable dynamic assertion dl (#17008)
Browse files Browse the repository at this point in the history
* fix: refactor runtime param setter + add dynamic assertion support for datalake

* chore: add missing test dependencies

* fix: centralize objecxt constructor in interface

* fix: remove abstract decorator in interface
  • Loading branch information
TeddyCr authored Jul 16, 2024
1 parent 87fea77 commit 3bcfdfe
Show file tree
Hide file tree
Showing 10 changed files with 322 additions and 108 deletions.
7 changes: 7 additions & 0 deletions ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@
VERSIONS["giturlparse"],
VERSIONS["avro"], # Sample Data
VERSIONS["grpc-tools"],
VERSIONS["neo4j"],
"testcontainers==3.7.1;python_version<'3.9'",
"testcontainers==4.4.0;python_version>='3.9'",
"minio==7.2.5",
Expand All @@ -375,6 +376,12 @@
"requests==2.31.0",
f"{DATA_DIFF['mysql']}==0.11.2",
*plugins["deltalake"],
*plugins["datalake-gcs"],
*plugins["pgspider"],
*plugins["clickhouse"],
*plugins["mssql"],
*plugins["dagster"],
*plugins["oracle"],
}

e2e_test = {
Expand Down
100 changes: 100 additions & 0 deletions ingestion/src/metadata/data_quality/builders/i_validator_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Builder interface defining the structure of builders for validators.
Validators are test classes (e.g. columnValuesToBeBetween, etc.)
"""

from abc import ABC, abstractmethod
from datetime import datetime
from typing import TYPE_CHECKING, Optional, Type, Union

from metadata.data_quality.validations.base_test_handler import BaseTestValidator
from metadata.data_quality.validations.runtime_param_setter.param_setter import (
RuntimeParameterSetter,
)
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.profiler.processor.runner import QueryRunner
from metadata.utils.importer import import_test_case_class

if TYPE_CHECKING:
from pandas import DataFrame


class IValidatorBuilder(ABC):
"""Interface for validator builders"""

@property
def test_case(self):
"""Return the test case object"""
return self._test_case

@property
def validator(self):
"""Return the validator object"""
return self._validator

def __init__(
self,
runner: Union[QueryRunner, "DataFrame"],
test_case: TestCase,
entity_type: str,
) -> None:
"""Builder object for SQA validators. This builder is used to create a validator object
Args:
runner (QueryRunner): The runner object
test_case (TestCase): The test case object
entity_type (str): one of COLUMN or TABLE -- fetched from the test definition
"""
self._test_case = test_case
self.runner = runner
self.validator_cls: Type[BaseTestValidator] = import_test_case_class(
entity_type,
self._get_source_type(),
self.test_case.testDefinition.fullyQualifiedName, # type: ignore
)
self.reset()

def set_runtime_params(
self, runtime_params_setter: Optional[RuntimeParameterSetter]
):
"""Set the runtime parameters for the validator object
# TODO: We should support setting n runtime parameters
Args:
runtime_params_setter (Optional[RuntimeParameterSetter]): The runtime parameter setter
"""
if runtime_params_setter:
params = runtime_params_setter.get_parameters(self.test_case)
if not self.test_case.parameterValues:
# If there are no parameters, create a new list
self.test_case.parameterValues = []
self.test_case.parameterValues.append(
TestCaseParameterValue(
name="runtimeParams", value=params.model_dump_json()
)
)

def reset(self):
"""Reset the builder"""
self._validator = self.validator_cls(
self.runner,
test_case=self.test_case,
execution_date=int(datetime.now().timestamp() * 1000),
)

@abstractmethod
def _get_source_type(self):
"""Get the source type"""
raise NotImplementedError
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Builder defining the structure of builders for validators for Pandas sources
"""

from typing import TYPE_CHECKING

from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder

if TYPE_CHECKING:
pass


class PandasValidatorBuilder(IValidatorBuilder):
"""Builder for Pandas validators"""

def _get_source_type(self) -> str:
"""Return the test case"""
return "pandas"
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Builder defining the structure of builders for validators for SQA sources
"""


from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder


class SQAValidatorBuilder(IValidatorBuilder):
"""Builder for SQA validators"""

def _get_source_type(self) -> str:
"""Return the test case"""
return "sqlalchemy"
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,20 @@
Interfaces with database for all database engine
supporting sqlalchemy abstraction layer
"""
from datetime import datetime
from typing import Optional, Type

from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder
from metadata.data_quality.builders.pandas_validator_builder import (
PandasValidatorBuilder,
)
from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface
from metadata.data_quality.validations.base_test_handler import BaseTestValidator
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
DatalakeConnection,
)
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testDefinition import TestDefinition
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.mixins.pandas.pandas_mixin import PandasInterfaceMixin
from metadata.utils.importer import import_test_case_class
from metadata.utils.logger import test_suite_logger

logger = test_suite_logger()
Expand Down Expand Up @@ -69,40 +67,7 @@ def __init__(
if self.dfs and self.table_partition_config:
self.dfs = self.get_partitioned_df(self.dfs)

def run_test_case(
self,
test_case: TestCase,
) -> Optional[TestCaseResult]:
"""Run table tests where platformsTest=OpenMetadata
Args:
test_case: test case object to execute
Returns:
TestCaseResult object
"""

try:
TestHandler: Type[ # pylint: disable=invalid-name
BaseTestValidator
] = import_test_case_class(
self.ometa_client.get_by_id(
TestDefinition, test_case.testDefinition.id
).entityType.value,
"pandas",
test_case.testDefinition.fullyQualifiedName,
)

test_handler = TestHandler(
self.dfs,
test_case=test_case,
execution_date=int(datetime.now().timestamp() * 1000),
)

return test_handler.run_validation()
except Exception as err:
logger.error(
f"Error executing {test_case.testDefinition.fullyQualifiedName} - {err}"
)

raise RuntimeError(err)
def _get_validator_builder(
self, test_case: TestCase, entity_type: str
) -> IValidatorBuilder:
return PandasValidatorBuilder(self.dfs, test_case, entity_type)
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,23 @@
supporting sqlalchemy abstraction layer
"""

from datetime import datetime
from typing import Optional, Union
from typing import Union

from sqlalchemy.orm import DeclarativeMeta
from sqlalchemy.orm.util import AliasedClass

from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder
from metadata.data_quality.builders.sqa_validator_builder import SQAValidatorBuilder
from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.generated.schema.tests.testDefinition import TestDefinition
from metadata.generated.schema.tests.testCase import TestCase
from metadata.ingestion.connections.session import create_and_bind_session
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.mixins.sqalchemy.sqa_mixin import SQAInterfaceMixin
from metadata.profiler.processor.runner import QueryRunner
from metadata.profiler.processor.sampler.sqlalchemy.sampler import SQASampler
from metadata.utils.constants import TEN_MIN
from metadata.utils.importer import import_test_case_class
from metadata.utils.logger import test_suite_logger
from metadata.utils.ssl_manager import get_ssl_connection
from metadata.utils.timeout import cls_timeout
Expand Down Expand Up @@ -143,50 +141,7 @@ def _create_runner(self) -> None:
)
)

def run_test_case(
self,
test_case: TestCase,
) -> Optional[TestCaseResult]:
"""Run table tests where platformsTest=OpenMetadata
Args:
test_case: test case object to execute
Returns:
TestCaseResult object
"""

try:
TestHandler = import_test_case_class( # pylint: disable=invalid-name
self.ometa_client.get_by_id(
TestDefinition, test_case.testDefinition.id
).entityType.value,
"sqlalchemy",
test_case.testDefinition.fullyQualifiedName,
)

if TestHandler.runtime_parameter_setter:
setter = TestHandler.runtime_parameter_setter(
self.ometa_client,
self.service_connection_config,
self.table_entity,
self.sampler,
)
runtime_params = setter.get_parameters(test_case)
test_case.parameterValues.append(
TestCaseParameterValue(
name="runtimeParams", value=runtime_params.model_dump_json()
)
)

test_handler = TestHandler(
self.runner,
test_case=test_case,
execution_date=int(datetime.now().timestamp() * 1000),
)
return test_handler.run_validation()
except Exception as err:
logger.error(
f"Error executing {test_case.testDefinition.fullyQualifiedName} - {err}"
)
raise RuntimeError(err)
def _get_validator_builder(
self, test_case: TestCase, entity_type: str
) -> IValidatorBuilder:
return SQAValidatorBuilder(self.runner, test_case, entity_type)
Loading

0 comments on commit 3bcfdfe

Please sign in to comment.