From 4831e5b23a6f28920ecf69cf7ccf2b8ad5e38f06 Mon Sep 17 00:00:00 2001 From: sushi30 Date: Wed, 18 Sep 2024 13:53:54 +0200 Subject: [PATCH 1/2] fix(data-quality): snowflake data diff - fixed schema in snowflake URL for data diff - added e2e for snowflake data quality --- .../table_diff_params_setter.py | 3 +- .../cli_e2e/base/config_builders/builders.py | 33 ++++++++++ ingestion/tests/cli_e2e/base/e2e_types.py | 1 + ingestion/tests/cli_e2e/base/test_cli_db.py | 61 +++++++++++++++++++ ingestion/tests/cli_e2e/test_cli_snowflake.py | 30 ++++++++- .../integration/test_suite/test_workflow.py | 2 +- 6 files changed, 127 insertions(+), 3 deletions(-) diff --git a/ingestion/src/metadata/data_quality/validations/runtime_param_setter/table_diff_params_setter.py b/ingestion/src/metadata/data_quality/validations/runtime_param_setter/table_diff_params_setter.py index 86a09c17dd43..ec373acc7452 100644 --- a/ingestion/src/metadata/data_quality/validations/runtime_param_setter/table_diff_params_setter.py +++ b/ingestion/src/metadata/data_quality/validations/runtime_param_setter/table_diff_params_setter.py @@ -27,6 +27,7 @@ from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.tests.testCase import TestCase from metadata.ingestion.source.connections import get_connection +from metadata.profiler.orm.registry import Dialects from metadata.utils import fqn @@ -168,7 +169,7 @@ def get_data_diff_url(service_url: str, table_fqn) -> str: table_fqn ) # path needs to include the database AND schema in some of the connectors - if kwargs["scheme"] in ["mssql"]: + if kwargs["scheme"] in {Dialects.MSSQL, Dialects.Snowflake}: kwargs["path"] = f"/{database}/{schema}" return url._replace(**kwargs).geturl() diff --git a/ingestion/tests/cli_e2e/base/config_builders/builders.py b/ingestion/tests/cli_e2e/base/config_builders/builders.py index 1da3ce0fefce..024f287214c5 100644 --- a/ingestion/tests/cli_e2e/base/config_builders/builders.py 
+++ b/ingestion/tests/cli_e2e/base/config_builders/builders.py @@ -16,6 +16,10 @@ from copy import deepcopy +from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( + TestSuiteConfigType, +) + from ..e2e_types import E2EType @@ -70,6 +74,34 @@ def build(self) -> dict: return self.config +class DataQualityConfigBuilder(BaseBuilder): + """Builder class for the data quality config""" + + # pylint: disable=invalid-name + def __init__(self, config: dict, config_args: dict) -> None: + super().__init__(config, config_args) + self.test_case_definitions = self.config_args.get("test_case_definitions", []) + self.entity_fqn = self.config_args.get("entity_fqn", []) + + # pylint: enable=invalid-name + + def build(self) -> dict: + """build data quality config""" + del self.config["source"]["sourceConfig"]["config"] + self.config["source"]["sourceConfig"] = { + "config": { + "type": TestSuiteConfigType.TestSuite.value, + "entityFullyQualifiedName": self.entity_fqn, + }, + } + + self.config["processor"] = { + "type": "orm-test-runner", + "config": {"testCases": self.test_case_definitions}, + } + return self.config + + class SchemaConfigBuilder(BaseBuilder): """Builder for schema filter config""" @@ -147,6 +179,7 @@ def builder_factory(builder, config: dict, config_args: dict): """Factory method to return the builder class""" builder_classes = { E2EType.PROFILER.value: ProfilerConfigBuilder, + E2EType.DATA_QUALITY.value: DataQualityConfigBuilder, E2EType.INGEST_DB_FILTER_SCHEMA.value: SchemaConfigBuilder, E2EType.INGEST_DB_FILTER_TABLE.value: TableConfigBuilder, E2EType.INGEST_DB_FILTER_MIX.value: MixConfigBuilder, diff --git a/ingestion/tests/cli_e2e/base/e2e_types.py b/ingestion/tests/cli_e2e/base/e2e_types.py index 81b7eb14890f..442c5c27b884 100644 --- a/ingestion/tests/cli_e2e/base/e2e_types.py +++ b/ingestion/tests/cli_e2e/base/e2e_types.py @@ -24,6 +24,7 @@ class E2EType(Enum): INGEST = "ingest" PROFILER = "profiler" PROFILER_PROCESSOR = "profiler-processor" + 
DATA_QUALITY = "test" INGEST_DB_FILTER_SCHEMA = "ingest-db-filter-schema" INGEST_DB_FILTER_TABLE = "ingest-db-filter-table" INGEST_DB_FILTER_MIX = "ingest-db-filter-mix" diff --git a/ingestion/tests/cli_e2e/base/test_cli_db.py b/ingestion/tests/cli_e2e/base/test_cli_db.py index 02ebfa6d40ed..0bc5eb23f0fd 100644 --- a/ingestion/tests/cli_e2e/base/test_cli_db.py +++ b/ingestion/tests/cli_e2e/base/test_cli_db.py @@ -17,8 +17,13 @@ from unittest import TestCase import pytest +from pydantic import TypeAdapter +from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects +from metadata.data_quality.api.models import TestCaseDefinition from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.tests.basic import TestCaseResult +from metadata.generated.schema.tests.testCase import TestCase as OMTestCase from metadata.ingestion.api.status import Status from .e2e_types import E2EType @@ -208,6 +213,50 @@ def test_profiler_with_time_partition(self) -> None: sink_status, ) + @pytest.mark.order(12) + def test_data_quality(self) -> None: + """12. 
Test data quality for the connector""" + if self.get_data_quality_table() is None: + return + self.delete_table_and_view() + self.create_table_and_view() + table: Table = self.openmetadata.get_by_name( + Table, self.get_data_quality_table(), nullable=False + ) + self.build_config_file() + self.run_command() + test_case_definitions = self.get_test_case_definitions() + self.build_config_file( + E2EType.DATA_QUALITY, + { + "entity_fqn": table.fullyQualifiedName.root, + "test_case_definitions": TypeAdapter( + List[TestCaseDefinition] + ).dump_python(test_case_definitions), + }, + ) + result = self.run_command("test") + sink_status, source_status = self.retrieve_statuses(result) + self.assert_status_for_data_quality(source_status, sink_status) + test_case_entities = [ + self.openmetadata.get_by_name( + OMTestCase, + ".".join([table.fullyQualifiedName.root, tcd.name]), + fields=["*"], + nullable=False, + ) + for tcd in test_case_definitions + ] + expected = self.get_expected_test_case_results() + try: + for test_case, expected in zip(test_case_entities, expected): + assert_equal_pydantic_objects(expected, test_case.testCaseResult) + finally: + for tc in test_case_entities: + self.openmetadata.delete( + OMTestCase, tc.id, recursive=True, hard_delete=True + ) + def retrieve_table(self, table_name_fqn: str) -> Table: return self.openmetadata.get_by_name(entity=Table, fqn=table_name_fqn) @@ -346,3 +395,15 @@ def get_profiler_processor_config(self, config: dict) -> dict: "config": {"tableConfig": [config]}, } } + + def get_data_quality_table(self): + return None + + def get_test_case_definitions(self) -> List[TestCaseDefinition]: + pass + + def get_expected_test_case_results(self) -> List[TestCaseResult]: + pass + + def assert_status_for_data_quality(self, source_status, sink_status): + pass diff --git a/ingestion/tests/cli_e2e/test_cli_snowflake.py b/ingestion/tests/cli_e2e/test_cli_snowflake.py index dc27e2fee9d8..f4467d9d8de6 100644 --- 
a/ingestion/tests/cli_e2e/test_cli_snowflake.py +++ b/ingestion/tests/cli_e2e/test_cli_snowflake.py @@ -20,9 +20,12 @@ from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects +from metadata.data_quality.api.models import TestCaseDefinition from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile +from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus +from metadata.generated.schema.tests.testCase import TestCaseParameterValue from metadata.generated.schema.type.basic import Timestamp from metadata.ingestion.api.status import Status from .base.e2e_types import E2EType from .common.test_cli_db import CliCommonDB from .common_e2e_sqa_mixins import SQACommonMethods @@ -65,7 +68,7 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods): insert_data_queries: List[str] = [ "INSERT INTO E2E_DB.e2e_test.persons (person_id, full_name) VALUES (1,'Peter Parker');", - "INSERT INTO E2E_DB.e2e_test.persons (person_id, full_name) VALUES (1, 'Clark Kent');", + "INSERT INTO E2E_DB.e2e_test.persons (person_id, full_name) VALUES (2, 'Clark Kent');", "INSERT INTO e2e_test.e2e_table (varchar_column, int_column) VALUES ('e2e_test.e2e_table', 1);", "INSERT INTO public.e2e_table (varchar_column, int_column) VALUES ('public.e2e_table', 1);", "INSERT INTO e2e_table (varchar_column, int_column) VALUES ('e2e_table', 1);", @@ -316,3 +319,28 @@ def wait_for_query_log(cls, timeout=600): ) if (datetime.now().timestamp() - start) > timeout: raise TimeoutError(f"Query log not updated for {timeout} seconds") + + def get_data_quality_table(self): + return "e2e_snowflake.E2E_DB.E2E_TEST.PERSONS" + + def get_test_case_definitions(self) -> List[TestCaseDefinition]: + return [ + TestCaseDefinition( + name="snowflake_data_diff", + testDefinitionName="tableDiff", + computePassedFailedRowCount=True, + parameterValues=[ + TestCaseParameterValue( + name="table2", + value=self.get_data_quality_table(), + ), 
+ TestCaseParameterValue( + name="keyColumns", + value='["PERSON_ID"]', + ), + ], + ) + ] + + def get_expected_test_case_results(self): + return [TestCaseResult(testCaseStatus=TestCaseStatus.Success)] diff --git a/ingestion/tests/integration/test_suite/test_workflow.py b/ingestion/tests/integration/test_suite/test_workflow.py index 240493bb104d..dc928340e796 100644 --- a/ingestion/tests/integration/test_suite/test_workflow.py +++ b/ingestion/tests/integration/test_suite/test_workflow.py @@ -264,7 +264,7 @@ def test_create_workflow_config_with_tests(self): workflow.source._process_table_suite(table=table) )[0] - test_cases: List[TestCase] = workflow.steps[0].get_test_cases( + test_cases: List[TestCase] = workflow.steps[0].get_test_case_definitions( test_cases=table_and_tests.right.test_cases, test_suite_fqn=self.table_with_suite.fullyQualifiedName.root + ".testSuite", table_fqn=self.table_with_suite.fullyQualifiedName.root, From a235cb6a6ee715b80f3795714b4677a2c8c90322 Mon Sep 17 00:00:00 2001 From: sushi30 Date: Thu, 19 Sep 2024 12:11:04 +0200 Subject: [PATCH 2/2] reverted unintended change --- ingestion/tests/integration/test_suite/test_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion/tests/integration/test_suite/test_workflow.py b/ingestion/tests/integration/test_suite/test_workflow.py index dc928340e796..240493bb104d 100644 --- a/ingestion/tests/integration/test_suite/test_workflow.py +++ b/ingestion/tests/integration/test_suite/test_workflow.py @@ -264,7 +264,7 @@ def test_create_workflow_config_with_tests(self): workflow.source._process_table_suite(table=table) )[0] - test_cases: List[TestCase] = workflow.steps[0].get_test_case_definitions( + test_cases: List[TestCase] = workflow.steps[0].get_test_cases( test_cases=table_and_tests.right.test_cases, test_suite_fqn=self.table_with_suite.fullyQualifiedName.root + ".testSuite", table_fqn=self.table_with_suite.fullyQualifiedName.root,