Skip to content

Commit

Permalink
Merge branch 'main' into ISSUE-13473
Browse files Browse the repository at this point in the history
  • Loading branch information
kchenery committed Mar 7, 2024
2 parents 93cee1a + 68e6bcf commit 45e70bd
Show file tree
Hide file tree
Showing 122 changed files with 1,453 additions and 1,706 deletions.
1 change: 1 addition & 0 deletions ingestion/src/metadata/data_quality/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class TestCaseDefinition(ConfigModel):
testDefinitionName: str
columnName: Optional[str] = None
parameterValues: Optional[List[TestCaseParameterValue]]
computePassedFailedRowCount: Optional[bool] = False


class TestSuiteProcessorConfig(ConfigModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,14 @@ def _update_test_cases(
if test_case_to_update.name == test_case.name.__root__
)
updated_test_case = self.metadata.patch_test_case_definition(
source=test_case,
test_case=test_case,
entity_link=entity_link.get_entity_link(
Table,
fqn=table_fqn,
column_name=test_case_definition.columnName,
),
test_case_parameter_values=test_case_definition.parameterValues,
compute_passed_failed_row_count=test_case_definition.computePassedFailedRowCount,
)
if updated_test_case:
test_cases.pop(indx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def get_test_case_result_object( # pylint: disable=too-many-arguments
sampleData=None,
)

if (row_count is not None) and (
if (row_count is not None and row_count != 0) and (
# we'll need at least one of these to be not None to compute the other
(failed_rows is not None)
or (passed_rows is not None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ def run_query_results(

if res is None:
raise ValueError(
f"Query on table/column {column.name if column is not None else ''} returned None"
f"\nQuery on table/column {column.name if column is not None else ''} returned None. Your table might be empty. "
"If you confirmed your table is not empty and are still seeing this message you can:\n"
"\t1. check the documentation: https://docs.open-metadata.org/v1.3.x/connectors/ingestion/workflows/data-quality/tests\n"
"\t2. reach out to the Collate team for support"
)

return res
Expand Down
28 changes: 12 additions & 16 deletions ingestion/src/metadata/examples/workflows/test_suite.yaml
Original file line number Diff line number Diff line change
@@ -1,33 +1,29 @@
source:
type: TestSuite
serviceName: service_name
serviceConnection: {}
sourceConfig:
config:
type: TestSuite
entityFullyQualifiedName: db.schema.columns
entityFullyQualifiedName: my.service.db.schema.columns
processor:
type: orm-test-runner
config:
testSuites:
- name: test suite name
description: this is a description
testCases:
- name: test case name
description: test case description
testDefinitionName: name of the test definition for this test case
entityLink: "<#E::table::fqn> or <#E::table::fqn::columns::column_name>"
parameterValues:
- name: parameter name
value: value
- name: parameter name
value: value
testCases:
- name: test case name
description: test case description
testDefinitionName: name of the test definition for this test case
entityLink: "<#E::table::fqn> or <#E::table::fqn::columns::column_name>"
parameterValues:
- name: parameter name
value: value
- name: parameter name
value: value
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: http://localhost:8585/api
hostPort: http://host:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
Expand Down
7 changes: 5 additions & 2 deletions ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,10 @@ def patch_table_constraints(

def patch_test_case_definition(
self,
source: TestCase,
test_case: TestCase,
entity_link: str,
test_case_parameter_values: Optional[List[TestCaseParameterValue]] = None,
compute_passed_failed_row_count: Optional[bool] = False,
) -> Optional[TestCase]:
"""Given a test case and a test case definition JSON PATCH the test case
Expand All @@ -245,7 +246,7 @@ def patch_test_case_definition(
test_case_definition: test case definition to add
"""
source: TestCase = self._fetch_entity_if_exists(
entity=TestCase, entity_id=source.id, fields=["testDefinition", "testSuite"]
entity=TestCase, entity_id=test_case.id, fields=["testDefinition", "testSuite"] # type: ignore
) # type: ignore

if not source:
Expand All @@ -256,6 +257,8 @@ def patch_test_case_definition(
destination.entityLink = EntityLink(__root__=entity_link)
if test_case_parameter_values:
destination.parameterValues = test_case_parameter_values
if compute_passed_failed_row_count != source.computePassedFailedRowCount:
destination.computePassedFailedRowCount = compute_passed_failed_row_count

return self.patch(entity=TestCase, source=source, destination=destination)

Expand Down
5 changes: 4 additions & 1 deletion ingestion/src/metadata/profiler/orm/converter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ def ometa_to_sqa_orm(
{
"__tablename__": str(table.name.__root__),
"__table_args__": {
"schema": orm_schema_name,
# SQLite does not support schemas
"schema": orm_schema_name
if table.serviceType != databaseService.DatabaseServiceType.SQLite
else None,
"extend_existing": True, # Recreates the table ORM object if it already exists. Useful for testing
"quote": check_snowflake_case_sensitive(
table.serviceType, table.name.__root__
Expand Down
122 changes: 86 additions & 36 deletions ingestion/tests/integration/test_suite/test_e2e_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,10 @@
],
},
{
"name": "table_column_name_to_exists",
"testDefinitionName": "tableColumnNameToExist",
"parameterValues": [{"name": "columnName", "value": "id"}],
"name": "table_column_to_be_not_null",
"testDefinitionName": "columnValuesToBeNotNull",
"columnName": "id",
"computePassedFailedRowCount": True,
},
],
},
Expand All @@ -94,7 +95,16 @@


class User(Base):
__tablename__ = ("users",)
__tablename__ = "users"
id = sqa.Column(sqa.Integer, primary_key=True)
name = sqa.Column(sqa.String(256))
fullname = sqa.Column(sqa.String(256))
nickname = sqa.Column(sqa.String(256))
age = sqa.Column(sqa.Integer)


class EmptyUser(Base):
__tablename__ = "empty_users"
id = sqa.Column(sqa.Integer, primary_key=True)
name = sqa.Column(sqa.String(256))
fullname = sqa.Column(sqa.String(256))
Expand Down Expand Up @@ -159,11 +169,25 @@ def setUpClass(cls):
databaseSchema=database_schema.fullyQualifiedName,
)
)
cls.metadata.create_or_update(
CreateTableRequest(
name="empty_users",
columns=[
Column(name="id", dataType=DataType.INT),
Column(name="name", dataType=DataType.STRING),
Column(name="fullname", dataType=DataType.STRING),
Column(name="nickname", dataType=DataType.STRING),
Column(name="age", dataType=DataType.INT),
],
databaseSchema=database_schema.fullyQualifiedName,
)
)

engine = sqa.create_engine(f"sqlite:///{cls.sqlite_conn.config.databaseMode}")
session = Session(bind=engine)

User.__table__.create(bind=engine)
EmptyUser.__table__.create(bind=engine)

for _ in range(10):
data = [
Expand Down Expand Up @@ -212,38 +236,64 @@ def tearDownClass(cls) -> None:

def test_e2e_cli_workflow(self):
"""test cli workflow e2e"""
workflow = TestSuiteWorkflow.create(test_suite_config)
workflow.execute()
workflow.raise_from_status()

test_case_1 = self.metadata.get_by_name(
entity=TestCase,
fqn="test_suite_service_test.test_suite_database.test_suite_database_schema.users.my_test_case",
fields=["testDefinition", "testSuite"],
)
test_case_2 = self.metadata.get_by_name(
entity=TestCase,
fqn="test_suite_service_test.test_suite_database.test_suite_database_schema.users.table_column_name_to_exists",
fields=["testDefinition", "testSuite"],
)
parameters = [
{"table_name": "users", "status": "Success"},
{"table_name": "empty_users", "status": "Aborted"},
]

assert test_case_1
assert test_case_2
for param in parameters:
with self.subTest(param=param):
table_name = param["table_name"]
status = param["status"]
test_suite_config["source"]["sourceConfig"]["config"].update(
{
"entityFullyQualifiedName": f"test_suite_service_test.test_suite_database.test_suite_database_schema.{table_name}"
}
)

test_case_result_1 = self.metadata.client.get(
"/dataQuality/testCases/test_suite_service_test.test_suite_database.test_suite_database_schema.users.my_test_case/testCaseResult",
data={
"startTs": int((datetime.now() - timedelta(days=3)).timestamp()),
"endTs": int((datetime.now() + timedelta(days=3)).timestamp()),
},
)
test_case_result_2 = self.metadata.client.get(
"/dataQuality/testCases/test_suite_service_test.test_suite_database.test_suite_database_schema.users.table_column_name_to_exists/testCaseResult",
data={
"startTs": int((datetime.now() - timedelta(days=3)).timestamp()),
"endTs": int((datetime.now() + timedelta(days=3)).timestamp()),
},
)
workflow = TestSuiteWorkflow.create(test_suite_config)
workflow.execute()
workflow.raise_from_status()

test_case_1 = self.metadata.get_by_name(
entity=TestCase,
fqn=f"test_suite_service_test.test_suite_database.test_suite_database_schema.{table_name}.my_test_case",
fields=["testDefinition", "testSuite"],
)
test_case_2 = self.metadata.get_by_name(
entity=TestCase,
fqn=f"test_suite_service_test.test_suite_database.test_suite_database_schema.{table_name}.id.table_column_to_be_not_null",
fields=["testDefinition", "testSuite"],
)

assert test_case_1
assert test_case_2

test_case_result_1 = self.metadata.client.get(
f"/dataQuality/testCases/test_suite_service_test.test_suite_database.test_suite_database_schema.{table_name}"
".my_test_case/testCaseResult",
data={
"startTs": int((datetime.now() - timedelta(days=3)).timestamp())
* 1000,
"endTs": int((datetime.now() + timedelta(days=3)).timestamp())
* 1000,
},
)
test_case_result_2 = self.metadata.client.get(
f"/dataQuality/testCases/test_suite_service_test.test_suite_database.test_suite_database_schema.{table_name}"
".id.table_column_to_be_not_null/testCaseResult",
data={
"startTs": int((datetime.now() - timedelta(days=3)).timestamp())
* 1000,
"endTs": int((datetime.now() + timedelta(days=3)).timestamp())
* 1000,
},
)

data_test_case_result_1: dict = test_case_result_1.get("data") # type: ignore
data_test_case_result_2: dict = test_case_result_2.get("data") # type: ignore

assert test_case_result_1
assert test_case_result_2
assert data_test_case_result_1
assert data_test_case_result_1[0]["testCaseStatus"] == "Success"
assert data_test_case_result_2
assert data_test_case_result_2[0]["testCaseStatus"] == status
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,11 @@ Validate a list of table column name matches an expected set of columns

### Table Custom SQL Test
Write your own SQL test. When writing your query you can use 2 strategies:
- `ROWS` (default): expects the query to be written as `SELECT <field>, <field> FROM <foo> WHERE <condition>`. **Note** if your query returns a large amount of rows it might cause an "Out Of Memeory" error. In this case we recoomend you to use the `COUNT` startegy.
- `ROWS` (default): expects the query to be written as `SELECT <field>, <field> FROM <foo> WHERE <condition>`. **Note** if your query returns a large number of rows it might cause an "Out Of Memory" error. In this case we recommend using the `COUNT` strategy.
- `COUNT`: expects the query to be written as `SELECT COUNT(<field>) FROM <foo> WHERE <condition>`.

**How to use the Threshold Parameter?**
The threshold allows you to define a limit for which you test should pass or fail - by defaut this number is 0. For example if my custom SQL query test returns 10 rows and my threshold is 5 the test will fail. If I update my threshold to 11 on my next run my test will pass.
The threshold allows you to define a limit for which your test should pass or fail - by default this number is 0. For example, if my custom SQL query test returns 10 rows (or a COUNT value of 10) and my threshold is 5, the test will fail. If I update my threshold to 11, on my next run my test will pass.

**Properties**

Expand Down Expand Up @@ -473,6 +473,7 @@ Makes sure that there are no duplicate values in a given column.
description: test description
columnName: columnName
testDefinitionName: columnValuesToBeUnique
computePassedFailedRowCount: <true or false>
parameterValues:
- name: columnNames
value: true
Expand Down Expand Up @@ -516,6 +517,7 @@ Validates that there are no null values in the column.
description: test description
columnName: columnName
testDefinitionName: columnValuesToBeNotNull
computePassedFailedRowCount: <true or false>
parameterValues:
- name: columnValuesToBeNotNull
value: true
Expand Down Expand Up @@ -569,6 +571,7 @@ The other databases will fall back to the `LIKE` expression
description: test description
columnName: columnName
testDefinitionName: columnValuesToMatchRegex
computePassedFailedRowCount: <true or false>
parameterValues:
- name: regex
value: "%something%"
Expand Down Expand Up @@ -622,6 +625,7 @@ The other databases will fall back to the `LIKE` expression
description: test description
columnName: columnName
testDefinitionName: columnValuesToMatchRegex
computePassedFailedRowCount: <true or false>
parameterValues:
- name: forbiddenRegex
value: "%something%"
Expand Down Expand Up @@ -661,10 +665,13 @@ Validate values form a set are present in a column.
**YAML Config**

```yaml
testDefinitionName: columnValuesToBeInSet
parameterValues:
- name: allowedValues
value: ["forbidden1", "forbidden2"]
- name: myTestName
testDefinitionName: columnValuesToBeInSet
columnName: columnName
computePassedFailedRowCount: <true or false>
parameterValues:
- name: allowedValues
value: ["forbidden1", "forbidden2"]
```
**JSON Config**
Expand Down Expand Up @@ -708,6 +715,7 @@ Validate that there are no values in a column in a set of forbidden values.
description: test description
columnName: columnName
testDefinitionName: columnValuesToBeNotInSet
computePassedFailedRowCount: <true or false>
parameterValues:
- name: forbiddenValues
value: ["forbidden1", "forbidden2"]
Expand Down Expand Up @@ -762,6 +770,7 @@ Any of those two need to be informed.
description: test description
columnName: columnName
testDefinitionName: columnValuesToBeBetween
computePassedFailedRowCount: <true or false>
parameterValues:
- name: minValue
value: ["forbidden1", "forbidden2"]
Expand Down Expand Up @@ -893,6 +902,7 @@ Any of those two need to be informed.
description: test description
columnName: columnName
testDefinitionName: columnValueLengthsToBeBetween
computePassedFailedRowCount: <true or false>
parameterValues:
- name: minLength
value: 50
Expand Down
Loading

0 comments on commit 45e70bd

Please sign in to comment.