Skip to content

Commit

Permalink
Fixes Druid Profiler failures (#13700)
Browse files Browse the repository at this point in the history
* fix: updated playwright test structure

* fix: druid profiler queries

* fix: python linting

* fix: python linting

* fix: do not compute random sample if profile sample is 100

* fix: updated workflow to test on push

* fix: move connector config to category folder

* fix: updated imports

* fix: added pytest-dependency package

* fix: updated readme.md

* fix: python linting

* fix: updated profile doc for Druid sampling

* fix: empty commit for CI

* fix: added workflow constrain back

* fix: sonar code smell

* fix: added secrets to container

* Update openmetadata-docs/content/v1.2.x-SNAPSHOT/connectors/ingestion/workflows/profiler/index.md

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>

* Update openmetadata-docs/content/v1.2.x-SNAPSHOT/connectors/ingestion/workflows/profiler/index.md

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>

* Update ingestion/tests/e2e/entity/database/test_redshift.py

* fix: ran pylint

* fix: updated redshift env var.

* fix: import linting

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
  • Loading branch information
TeddyCr and pmbrull authored Oct 25, 2023
1 parent 2058781 commit 452a33b
Show file tree
Hide file tree
Showing 31 changed files with 911 additions and 282 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/playwright-integration-tests-mysql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ jobs:
E2E_REDSHIFT_HOST_PORT: ${{ secrets.E2E_REDSHIFT_HOST_PORT }}
E2E_REDSHIFT_USERNAME: ${{ secrets.E2E_REDSHIFT_USERNAME }}
E2E_REDSHIFT_PASSWORD: ${{ secrets.E2E_REDSHIFT_PASSWORD }}
E2E_REDSHIFT_DATABASE: ${{ secrets.E2E_REDSHIFT_DATABASE }}
E2E_REDSHIFT_DB: ${{ secrets.E2E_REDSHIFT_DB }}
E2E_DRUID_HOST_PORT: ${{ secrets.E2E_DRUID_HOST_PORT }}
E2E_HIVE_HOST_PORT: ${{ secrets.E2E_HIVE_HOST_PORT }}
run: |
source env/bin/activate
make install_e2e_tests
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/playwright-integration-tests-postgres.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ jobs:
E2E_REDSHIFT_HOST_PORT: ${{ secrets.E2E_REDSHIFT_HOST_PORT }}
E2E_REDSHIFT_USERNAME: ${{ secrets.E2E_REDSHIFT_USERNAME }}
E2E_REDSHIFT_PASSWORD: ${{ secrets.E2E_REDSHIFT_PASSWORD }}
E2E_REDSHIFT_DATABASE: ${{ secrets.E2E_REDSHIFT_DATABASE }}
E2E_REDSHIFT_DB: ${{ secrets.E2E_REDSHIFT_DB }}
E2E_DRUID_HOST_PORT: ${{ secrets.E2E_DRUID_HOST_PORT }}
E2E_HIVE_HOST_PORT: ${{ secrets.E2E_HIVE_HOST_PORT }}
run: |
source env/bin/activate
make install_e2e_tests
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ unit_ingestion: ## Run Python unit tests

.PHONY: run_e2e_tests
run_e2e_tests: ## Run e2e tests
pytest --screenshot=only-on-failure --output="ingestion/tests/e2e/artifacts" $(ARGS) --junitxml=ingestion/junit/test-results-e2e.xml ingestion/tests/e2e
pytest --screenshot=only-on-failure --output="ingestion/tests/e2e/artifacts" $(ARGS) --slowmo 5 --junitxml=ingestion/junit/test-results-e2e.xml ingestion/tests/e2e

.PHONY: run_python_tests
run_python_tests: ## Run all Python tests with coverage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from typing import Dict, List

from sqlalchemy import Column, inspect
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.exc import ProgrammingError, ResourceClosedError
from sqlalchemy.orm import scoped_session

from metadata.generated.schema.entity.data.table import TableData
Expand All @@ -38,6 +38,7 @@
from metadata.profiler.orm.functions.table_metric_construct import (
table_metric_construct_factory,
)
from metadata.profiler.orm.registry import Dialects
from metadata.profiler.processor.runner import QueryRunner
from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_
from metadata.utils.custom_thread_pool import CustomThreadPoolExecutor
Expand Down Expand Up @@ -258,6 +259,15 @@ def _compute_query_metrics(

row = runner.select_first_from_query(metric_query)
return dict(row)
except ResourceClosedError as exc:
# if the query returns no results, we will get a ResourceClosedError from Druid
if (
# pylint: disable=protected-access
runner._session.get_bind().dialect.name
!= Dialects.Druid
):
msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}"
handle_query_exception(msg, exc, session)
except Exception as exc:
msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}"
handle_query_exception(msg, exc, session)
Expand Down
9 changes: 9 additions & 0 deletions ingestion/src/metadata/profiler/metrics/static/stddev.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ def _(element, compiler, **kw):
return "if(isNaN(stddevPop(%s)), null, stddevPop(%s))" % ((proc,) * 2)


@compiles(StdDevFn, Dialects.Druid)
def _(element, compiler, **kw):  # pylint: disable=unused-argument
    """Compile StdDevFn to the SQL literal NULL for Druid.

    Emitting NULL makes the profiler record no stddev value instead of
    failing the whole profile run. Could not validate stddev support with
    our cluster; we might need to look into installing the druid-stats module
    https://druid.apache.org/docs/latest/configuration/extensions/#loading-extensions
    """
    return "NULL"


class StdDev(StaticMetric):
"""
STD Metric
Expand Down
1 change: 1 addition & 0 deletions ingestion/src/metadata/profiler/orm/functions/length.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def _(element, compiler, **kw):
@compiles(LenFn, Dialects.IbmDbSa)
@compiles(LenFn, Dialects.Db2)
@compiles(LenFn, Dialects.Hana)
@compiles(LenFn, Dialects.Druid)
def _(element, compiler, **kw):
return "LENGTH(%s)" % compiler.process(element.clauses, **kw)

Expand Down
8 changes: 8 additions & 0 deletions ingestion/src/metadata/profiler/orm/functions/median.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ def _(elements, compiler, **kwargs):
return f"if({null_check}({quantile_str}), null, {quantile_str})"


@compiles(MedianFn, Dialects.Druid)
def _(elements, compiler, **kwargs):
    """Compile MedianFn to Druid's APPROX_QUANTILE.

    The clause list is expected to carry exactly three elements; only the
    first (the column expression) and the last (the percentile) are used.
    The middle clause is compiled but intentionally discarded for Druid.
    """
    compiled = tuple(
        compiler.process(clause, **kwargs) for clause in elements.clauses
    )
    column_sql, _discard, percentile_sql = compiled
    return f"APPROX_QUANTILE({column_sql}, {percentile_sql})"


# pylint: disable=unused-argument
@compiles(MedianFn, Dialects.Athena)
@compiles(MedianFn, Dialects.Presto)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Helper module to handle data sampling
for the profiler
"""
import traceback
from typing import List, Optional, Union, cast

from sqlalchemy import Column, inspect, text
Expand All @@ -30,6 +31,7 @@
from metadata.profiler.orm.registry import Dialects
from metadata.profiler.processor.handle_partition import partition_filter_handler
from metadata.profiler.processor.sampler.sampler_interface import SamplerInterface
from metadata.utils.logger import profiler_interface_registry_logger
from metadata.utils.sqa_utils import (
build_query_filter,
dispatch_to_date_or_datetime,
Expand All @@ -38,6 +40,8 @@
get_value_filter,
)

logger = profiler_interface_registry_logger()

RANDOM_LABEL = "random"


Expand Down Expand Up @@ -105,7 +109,7 @@ def random_sample(self) -> Union[DeclarativeMeta, AliasedClass]:
if self._profile_sample_query:
return self._rdn_sample_from_user_query()

if not self.profile_sample:
if not self.profile_sample or int(self.profile_sample) == 100:
if self._partition_details:
return self._partitioned_table()

Expand Down Expand Up @@ -143,12 +147,23 @@ def fetch_sample_data(self, columns: Optional[List[Column]] = None) -> TableData
if col.name != RANDOM_LABEL and col.name in names
]

sqa_sample = (
self.client.query(*sqa_columns)
.select_from(rnd)
.limit(self.sample_limit)
.all()
)
try:
sqa_sample = (
self.client.query(*sqa_columns)
.select_from(rnd)
.limit(self.sample_limit)
.all()
)
except Exception:
logger.debug(
"Cannot fetch sample data with random sampling. Falling back to 100 rows."
)
logger.debug(traceback.format_exc())
sqa_columns = list(inspect(self.table).c)
sqa_sample = (
self.client.query(*sqa_columns).select_from(self.table).limit(100).all()
)

return TableData(
columns=[column.name for column in sqa_columns],
rows=[list(row) for row in sqa_sample],
Expand Down
67 changes: 66 additions & 1 deletion ingestion/tests/e2e/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,73 @@ https://playwright.dev/python/docs/intro
In the `e2e` folder you will find 2 folders and 1 file:
- `conftest.py`: defines some module scope fixtures (module here is the `e2e` folder). All tests will use `init_with_redshift` by default -- ingesting metadata from a redshift service. The ingestion will only happen on the first test execution. The `create_data_consumer_user` fixture allows tests to log in as a Data Consumer and perform some actions
- `configs`: holds all the shared configuration. So far we have 2 main classes families (User and Connector) and common functions
- `entity`: holds entity related tests. It contains a subfolder per source.
- `entity`: holds entity related tests. It contains a subfolder per asset category. In the asset category folder you will find the `common_assertions.py`. This file contains all the common assertions to be run for that specific asset.

## Install Dependencies and Run Tests
run `make install_e2e_tests`. Run `make run_e2e_tests`, you can also pass arguments such as `make run_e2e_tests ARGS="--browser webkit"` to run tests against webkit browser or `make run_e2e_tests ARGS="--headed --slowmo 100"` to run the tests in slowmo mode and head full.

## Adding a new test
The first step is to define the connector config for your source. This happens in the `configs/connectors/<asset category>` folder. For a database connector, you must ensure your class inherits from `DataBaseConnectorInterface`. You will then need to implement `get_service()` and `set_connection()`. `get_service` specifies which service to choose from the `<assetCategory>/add-service` page of the website and `set_connection` the different elements to configure on the connector connection config page. If you are unsure how an element can be accessed on the page you can run `playwright codegen http://localhost:8585/` -- more info [here](https://playwright.dev/python/docs/codegen). By default `DataBaseConnectorInterface` sets `self.supports_profiler_ingestion=True`, which causes the profiler ingestion to run when the test class is executed. You can set `self.supports_profiler_ingestion=False` in your specific connector to override this behavior.

e.g.

```python
class DruidConnector(DataBaseConnectorInterface):
"""druid connector"""

def __init__(self, config):
super().__init__(config)
self.supports_profiler_ingestion=False

def set_connection():
...

def get_service():
...
```


Once your connector config has been created you will need to add a new test. Simply create a new file in the asset category of your choice (e.g. `entity/database/test_druid.py`). In this file create a new test class and mark this class with `@pytest.mark.usefixtures("setUpClass")` and `@pytest.mark.parametrize("setUpClass", ...)`. The first mark will make sure the `setUpClass` fixture is run before your tests (this manages the ingestion of metadata and profiler as of Oct-25 2023) and `@pytest.mark.parametrize` will pass the right connector class to the `setUpClass` fixture. The second argument of `@pytest.mark.parametrize` should be as below
```python
[
{
"connector_obj": <connectorClassConfig>(
ConnectorTestConfig(...)
)
}
]
```

`ConnectorTestConfig` defines the configuration to use for the test. It has 2 arguments:
- `ingestion`: This allows you to define the different filtering when performing the ingestion. It expects a `ConnectorIngestionTestConfig` which takes 2 arguments:
- `metadata`: this allows you to define metadata ingestion filters. It takes an `IngestionTestConfig` which takes 3 arguments:
- `database`: it expects an `IngestionFilterConfig` class which takes 2 arguments:
- `includes`: a list of str
- `excludes`: a list of str
- `schema_`: see `database`
- `table`: see `database`
- `profiler`: see `metadata`
- `validation`: this config can be used when we need to validate expectations against specific entities. As of Oct-25 2023 it is only used in the `assert_profile_data`, `assert_sample_data_ingestion` and `assert_pii_column_auto_tagging` test functions of the profiler.

Once you have set up your class you can create your test. There are currently (as of Oct-25 2023) 5 assertions that can be performed:
- assert pipeline status are `success`. You can refer to the implementation in the existing test
- `assert_change_database_owner`: assert the owner of a data can be changed
- `assert_profile_data`: assert table profile data summary are visible
- `assert_sample_data_ingestion`: assert sample data are ingested and visible
- `assert_pii_column_auto_tagging`: assert auto PII tagging from the profiler has been performed

Note that in every test method you define, the following class attributes are accessible:
- `connector_obj`: `<connectorClassConfig>` -- the connector class passed to `setUpClass` in the `@pytest.mark.parametrize`
- `service_name`: `str` -- the name of the service that was created for the test
- `metadata_ingestion_status`: `PipelineState` -- the ingestion status of the metadata pipeline
- `profiler_ingestion_status`: `PipelineState` -- the ingestion status of the profiler pipeline.

## Test Coverage
| **tests** | redshift | druid | hive |
|-----------------------------|:--------:|:-----:|:----:|
| metadata ingestion ||||
| profiler ingestion ||||
| change DB owner ||||
| Table Profiler Summary Data ||||
| Sample data visible ||||
| Profiler PII auto Tag ||||
2 changes: 1 addition & 1 deletion ingestion/tests/e2e/configs/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from playwright.sync_api import Page, expect

from ingestion.tests.e2e.configs.users.user import User
from .users.user import User

BASE_URL = "http://localhost:8585"

Expand Down
37 changes: 37 additions & 0 deletions ingestion/tests/e2e/configs/connectors/database/db2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Redshift connector for e2e tests"""

import os

from playwright.sync_api import Page, expect

from .interface import DataBaseConnectorInterface


class Db2Connector(DataBaseConnectorInterface):
"""db2 connector"""

def get_service(self, page: Page):
"""get service from the service page"""
page.get_by_test_id("Db2").click()

def set_connection(self, page):
"""Set connection for redshift service"""
page.get_by_label("Username*").fill(os.environ["E2E_DB2_USERNAME"])
expect(page.get_by_label("Username*")).to_have_value(
os.environ["E2E_DB2_USERNAME"]
)

page.get_by_label("Password").fill(os.environ["E2E_DB2_PASSWORD"])
expect(page.get_by_label("Password")).to_have_value(
os.environ["E2E_DB2_PASSWORD"]
)

page.get_by_label("Host and Port*").fill(os.environ["E2E_DB2_HOST_PORT"])
expect(page.get_by_label("Host and Port*")).to_have_value(
os.environ["E2E_DB2_HOST_PORT"]
)

page.get_by_label("database*").fill(os.environ["E2E_DB2_DATABASE"])
expect(page.get_by_label("database*")).to_have_value(
os.environ["E2E_DB2_DATABASE"]
)
22 changes: 22 additions & 0 deletions ingestion/tests/e2e/configs/connectors/database/druid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Redshift connector for e2e tests"""

import os

from playwright.sync_api import Page, expect

from .interface import DataBaseConnectorInterface


class DruidConnector(DataBaseConnectorInterface):
"""druid connector"""

def get_service(self, page: Page):
"""get service from the service page"""
page.get_by_test_id("Druid").click()

def set_connection(self, page):
"""Set connection for redshift service"""
page.get_by_label("Host and Port*").fill(os.environ["E2E_DRUID_HOST_PORT"])
expect(page.get_by_label("Host and Port*")).to_have_value(
os.environ["E2E_DRUID_HOST_PORT"]
)
24 changes: 24 additions & 0 deletions ingestion/tests/e2e/configs/connectors/database/hive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""MySQL connector for e2e tests"""

import os

from playwright.sync_api import Page, expect

from .interface import DataBaseConnectorInterface


class HiveConnector(DataBaseConnectorInterface):
def get_service(self, page: Page):
"""get service from the service page"""
page.get_by_test_id("Hive").click()

def set_connection(self, page):
"""Set connection for redshift service"""
page.locator('[id="root\\/hostPort"]').fill(os.environ["E2E_HIVE_HOST_PORT"])
expect(page.locator('[id="root\\/hostPort"]')).to_have_value(
os.environ["E2E_HIVE_HOST_PORT"]
)

page.locator('[id="root\\/metastoreConnection__oneof_select"]').select_option(
"2"
)
Loading

0 comments on commit 452a33b

Please sign in to comment.