Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 17903: fix(data-quality): snowflake data diff #17907

Merged
merged 4 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.tests.testCase import TestCase
from metadata.ingestion.source.connections import get_connection
from metadata.profiler.orm.registry import Dialects
from metadata.utils import fqn


Expand Down Expand Up @@ -168,7 +169,7 @@ def get_data_diff_url(service_url: str, table_fqn) -> str:
table_fqn
)
# path needs to include the database AND schema in some of the connectors
if kwargs["scheme"] in ["mssql"]:
if kwargs["scheme"] in {Dialects.MSSQL, Dialects.Snowflake}:
kwargs["path"] = f"/{database}/{schema}"
return url._replace(**kwargs).geturl()

Expand Down
33 changes: 33 additions & 0 deletions ingestion/tests/cli_e2e/base/config_builders/builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

from copy import deepcopy

from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
TestSuiteConfigType,
)

from ..e2e_types import E2EType


Expand Down Expand Up @@ -70,6 +74,34 @@ def build(self) -> dict:
return self.config


class DataQualityConfigBuilder(BaseBuilder):
"""Builder class for the data quality config"""

# pylint: disable=invalid-name
def __init__(self, config: dict, config_args: dict) -> None:
super().__init__(config, config_args)
self.test_case_defintions = self.config_args.get("test_case_definitions", [])
self.entity_fqn = self.config_args.get("entity_fqn", [])

# pylint: enable=invalid-name

def build(self) -> dict:
"""build profiler config"""
del self.config["source"]["sourceConfig"]["config"]
self.config["source"]["sourceConfig"] = {
"config": {
"type": TestSuiteConfigType.TestSuite.value,
"entityFullyQualifiedName": self.entity_fqn,
},
}

self.config["processor"] = {
"type": "orm-test-runner",
"config": {"testCases": self.test_case_defintions},
}
return self.config


class SchemaConfigBuilder(BaseBuilder):
"""Builder for schema filter config"""

Expand Down Expand Up @@ -147,6 +179,7 @@ def builder_factory(builder, config: dict, config_args: dict):
"""Factory method to return the builder class"""
builder_classes = {
E2EType.PROFILER.value: ProfilerConfigBuilder,
E2EType.DATA_QUALITY.value: DataQualityConfigBuilder,
E2EType.INGEST_DB_FILTER_SCHEMA.value: SchemaConfigBuilder,
E2EType.INGEST_DB_FILTER_TABLE.value: TableConfigBuilder,
E2EType.INGEST_DB_FILTER_MIX.value: MixConfigBuilder,
Expand Down
1 change: 1 addition & 0 deletions ingestion/tests/cli_e2e/base/e2e_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class E2EType(Enum):
INGEST = "ingest"
PROFILER = "profiler"
PROFILER_PROCESSOR = "profiler-processor"
DATA_QUALITY = "test"
INGEST_DB_FILTER_SCHEMA = "ingest-db-filter-schema"
INGEST_DB_FILTER_TABLE = "ingest-db-filter-table"
INGEST_DB_FILTER_MIX = "ingest-db-filter-mix"
Expand Down
61 changes: 61 additions & 0 deletions ingestion/tests/cli_e2e/base/test_cli_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@
from unittest import TestCase

import pytest
from pydantic import TypeAdapter

from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
from metadata.data_quality.api.models import TestCaseDefinition
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.generated.schema.tests.testCase import TestCase as OMTestCase
from metadata.ingestion.api.status import Status

from .e2e_types import E2EType
Expand Down Expand Up @@ -208,6 +213,50 @@ def test_profiler_with_time_partition(self) -> None:
sink_status,
)

@pytest.mark.order(12)
def test_data_quality(self) -> None:
"""12. Test data quality for the connector"""
if self.get_data_quality_table() is None:
return
self.delete_table_and_view()
self.create_table_and_view()
table: Table = self.openmetadata.get_by_name(
Table, self.get_data_quality_table(), nullable=False
)
self.build_config_file()
self.run_command()
test_case_definitions = self.get_test_case_definitions()
self.build_config_file(
E2EType.DATA_QUALITY,
{
"entity_fqn": table.fullyQualifiedName.root,
"test_case_definitions": TypeAdapter(
List[TestCaseDefinition]
).dump_python(test_case_definitions),
},
)
result = self.run_command("test")
sink_status, source_status = self.retrieve_statuses(result)
self.assert_status_for_data_quality(source_status, sink_status)
test_case_entities = [
self.openmetadata.get_by_name(
OMTestCase,
".".join([table.fullyQualifiedName.root, tcd.name]),
fields=["*"],
nullable=False,
)
for tcd in test_case_definitions
]
expected = self.get_expected_test_case_results()
try:
for test_case, expected in zip(test_case_entities, expected):
assert_equal_pydantic_objects(expected, test_case.testCaseResult)
finally:
for tc in test_case_entities:
self.openmetadata.delete(
OMTestCase, tc.id, recursive=True, hard_delete=True
)

def retrieve_table(self, table_name_fqn: str) -> Table:
return self.openmetadata.get_by_name(entity=Table, fqn=table_name_fqn)

Expand Down Expand Up @@ -346,3 +395,15 @@ def get_profiler_processor_config(self, config: dict) -> dict:
"config": {"tableConfig": [config]},
}
}

def get_data_quality_table(self):
return None

def get_test_case_definitions(self) -> List[TestCaseDefinition]:
pass

def get_expected_test_case_results(self) -> List[TestCaseResult]:
pass

def assert_status_for_data_quality(self, source_status, sink_status):
pass
30 changes: 29 additions & 1 deletion ingestion/tests/cli_e2e/test_cli_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@

from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.testCase import TestCaseParameterValue
from metadata.generated.schema.type.basic import Timestamp
from metadata.ingestion.api.status import Status

from ...src.metadata.data_quality.api.models import TestCaseDefinition
from .base.e2e_types import E2EType
from .common.test_cli_db import CliCommonDB
from .common_e2e_sqa_mixins import SQACommonMethods
Expand Down Expand Up @@ -65,7 +68,7 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods):

insert_data_queries: List[str] = [
"INSERT INTO E2E_DB.e2e_test.persons (person_id, full_name) VALUES (1,'Peter Parker');",
"INSERT INTO E2E_DB.e2e_test.persons (person_id, full_name) VALUES (1, 'Clark Kent');",
"INSERT INTO E2E_DB.e2e_test.persons (person_id, full_name) VALUES (2, 'Clark Kent');",
"INSERT INTO e2e_test.e2e_table (varchar_column, int_column) VALUES ('e2e_test.e2e_table', 1);",
"INSERT INTO public.e2e_table (varchar_column, int_column) VALUES ('public.e2e_table', 1);",
"INSERT INTO e2e_table (varchar_column, int_column) VALUES ('e2e_table', 1);",
Expand Down Expand Up @@ -316,3 +319,28 @@ def wait_for_query_log(cls, timeout=600):
)
if (datetime.now().timestamp() - start) > timeout:
raise TimeoutError(f"Query log not updated for {timeout} seconds")

def get_data_quality_table(self):
return "e2e_snowflake.E2E_DB.E2E_TEST.PERSONS"

def get_test_case_definitions(self) -> List[TestCaseDefinition]:
return [
TestCaseDefinition(
name="snowflake_data_diff",
testDefinitionName="tableDiff",
computePassedFailedRowCount=True,
parameterValues=[
TestCaseParameterValue(
name="table2",
value=self.get_data_quality_table(),
),
TestCaseParameterValue(
name="keyColumns",
value='["PERSON_ID"]',
),
],
)
]

def get_expected_test_case_results(self):
return [TestCaseResult(testCaseStatus=TestCaseStatus.Success)]
Loading