diff --git a/ingestion/setup.py b/ingestion/setup.py index aa5bd5aa27cf..84e2f6025274 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -207,11 +207,8 @@ *COMMONS["datalake"], }, "datalake-s3": { - # requires aiobotocore - # https://github.com/fsspec/s3fs/blob/9bf99f763edaf7026318e150c4bd3a8d18bb3a00/requirements.txt#L1 - # however, the latest version of `s3fs` conflicts its `aiobotocore` dep with `boto3`'s dep on `botocore`. - # Leaving this marked to the automatic resolution to speed up installation. - "s3fs", + # vendoring 'boto3' to keep all dependencies aligned (s3fs, boto3, botocore, aiobotocore) + "s3fs[boto3]", *COMMONS["datalake"], }, "deltalake": {"delta-spark<=2.3.0", "deltalake~=0.17"}, @@ -343,7 +340,6 @@ "coverage", # Install GE because it's not in the `all` plugin VERSIONS["great-expectations"], - "moto~=5.0", "basedpyright~=1.14", "pytest==7.0.0", "pytest-cov", diff --git a/ingestion/tests/integration/conftest.py b/ingestion/tests/integration/conftest.py index 3c987e5aa68f..81f19a2ea878 100644 --- a/ingestion/tests/integration/conftest.py +++ b/ingestion/tests/integration/conftest.py @@ -15,7 +15,8 @@ from metadata.workflow.ingestion import IngestionWorkflow if not sys.version_info >= (3, 9): - collect_ignore = ["trino", "kafka"] + # these tests use test-containers which are not supported in python 3.8 + collect_ignore = ["trino", "kafka", "datalake"] @pytest.fixture(scope="session", autouse=True) diff --git a/ingestion/tests/integration/containers.py b/ingestion/tests/integration/containers.py index 9483f2468a4d..3bf46b799c18 100644 --- a/ingestion/tests/integration/containers.py +++ b/ingestion/tests/integration/containers.py @@ -53,7 +53,7 @@ class MinioContainerConfigs: access_key: str = "minio" secret_key: str = "password" port: int = 9000 - container_name: str = "test-minio" + container_name: Optional[str] = None exposed_port: Optional[int] = None def with_exposed_port(self, container): diff --git a/ingestion/tests/integration/datalake/conftest.py b/ingestion/tests/integration/datalake/conftest.py index 1ed88fa8ffb6..337bea1081af 100644 --- a/ingestion/tests/integration/datalake/conftest.py +++ b/ingestion/tests/integration/datalake/conftest.py @@ -14,16 +14,16 @@ import os from copy import deepcopy -import boto3 import pytest -from moto import mock_aws from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.workflow.data_quality import TestSuiteWorkflow from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow -BUCKET_NAME = "MyBucket" +from ..containers import MinioContainerConfigs, get_minio_container + +BUCKET_NAME = "my-bucket" INGESTION_CONFIG = { "source": { @@ -77,7 +77,7 @@ "sourceConfig": { "config": { "type": "TestSuite", - "entityFullyQualifiedName": 'datalake_for_integration_tests.default.MyBucket."users.csv"', + "entityFullyQualifiedName": f'datalake_for_integration_tests.default.{BUCKET_NAME}."users.csv"', } }, }, @@ -128,31 +128,19 @@ } -@pytest.fixture(scope="module", autouse=True) -def aws(): - with mock_aws(): - yield boto3.client("s3", region_name="us-east-1") +@pytest.fixture(scope="session") +def minio_container(): + with get_minio_container(MinioContainerConfigs()) as container: + yield container @pytest.fixture(scope="class", autouse=True) -def setup_s3(request) -> None: +def setup_s3(minio_container) -> None: # Mock our S3 bucket and ingest a file - boto3.DEFAULT_SESSION = None - request.cls.s3_client = boto3.client( - "s3", - region_name="us-west-1", - ) - s3 = boto3.resource( - "s3", - region_name="us-west-1", - aws_access_key_id="fake_access_key", - aws_secret_access_key="fake_secret_key", - ) - request.cls.s3_client.create_bucket( - Bucket=BUCKET_NAME, - CreateBucketConfiguration={"LocationConstraint": "us-west-1"}, - ) - s3.meta.client.head_bucket(Bucket=BUCKET_NAME) + client = minio_container.get_client() + if client.bucket_exists(BUCKET_NAME): + return + client.make_bucket(BUCKET_NAME) current_dir = os.path.dirname(__file__) resources_dir = os.path.join(current_dir, "resources") @@ -161,23 +149,31 @@ def setup_s3(request) -> None: for path, _, files in os.walk(resources_dir) for filename in files ] - - request.cls.s3_keys = [] - for path in resources_paths: key = os.path.relpath(path, resources_dir) - request.cls.s3_keys.append(key) - request.cls.s3_client.upload_file(Filename=path, Bucket=BUCKET_NAME, Key=key) - yield - bucket = s3.Bucket(BUCKET_NAME) - for key in bucket.objects.all(): - key.delete() - bucket.delete() + client.fput_object(BUCKET_NAME, key, path) + return + + +@pytest.fixture(scope="class") +def ingestion_config(minio_container): + ingestion_config = deepcopy(INGESTION_CONFIG) + ingestion_config["source"]["serviceConnection"]["config"]["configSource"].update( + { + "securityConfig": { + "awsAccessKeyId": minio_container.access_key, + "awsSecretAccessKey": minio_container.secret_key, + "awsRegion": "us-west-1", + "endPointURL": f"http://localhost:{minio_container.get_exposed_port(minio_container.port)}", + } + } + ) + return ingestion_config @pytest.fixture(scope="class") -def run_ingestion(metadata): - ingestion_workflow = MetadataWorkflow.create(INGESTION_CONFIG) +def run_ingestion(metadata, ingestion_config): + ingestion_workflow = MetadataWorkflow.create(ingestion_config) ingestion_workflow.execute() ingestion_workflow.raise_from_status() ingestion_workflow.stop() @@ -188,28 +184,31 @@ def run_ingestion(metadata): metadata.delete(DatabaseService, db_service.id, recursive=True, hard_delete=True) -@pytest.fixture -def run_test_suite_workflow(run_ingestion): - ingestion_workflow = TestSuiteWorkflow.create(DATA_QUALITY_CONFIG) +@pytest.fixture(scope="class") +def run_test_suite_workflow(run_ingestion, ingestion_config): + workflow_config = deepcopy(DATA_QUALITY_CONFIG) + workflow_config["source"]["serviceConnection"] = ingestion_config["source"][ + "serviceConnection" + ] + ingestion_workflow = TestSuiteWorkflow.create(workflow_config) ingestion_workflow.execute() ingestion_workflow.raise_from_status() ingestion_workflow.stop() -@pytest.fixture(scope="session") -def profiler_workflow_config(workflow_config): - config = deepcopy(INGESTION_CONFIG) - config["source"]["sourceConfig"]["config"].update( +@pytest.fixture(scope="class") +def profiler_workflow_config(ingestion_config, workflow_config): + ingestion_config["source"]["sourceConfig"]["config"].update( { "type": "Profiler", } ) - config["processor"] = { + ingestion_config["processor"] = { "type": "orm-profiler", "config": {}, } - config["workflowConfig"] = workflow_config - return config + ingestion_config["workflowConfig"] = workflow_config + return ingestion_config @pytest.fixture() diff --git a/ingestion/tests/integration/orm_profiler/resources/profiler_test_.csv b/ingestion/tests/integration/datalake/resources/profiler_test_.csv similarity index 100% rename from ingestion/tests/integration/orm_profiler/resources/profiler_test_.csv rename to ingestion/tests/integration/datalake/resources/profiler_test_.csv diff --git a/ingestion/tests/integration/datalake/test_datalake_profiler_e2e.py b/ingestion/tests/integration/datalake/test_datalake_profiler_e2e.py new file mode 100644 index 000000000000..1a8244348a43 --- /dev/null +++ b/ingestion/tests/integration/datalake/test_datalake_profiler_e2e.py @@ -0,0 +1,311 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test Datalake Profiler workflow + +To run this we need OpenMetadata server up and running. + +No sample data is required beforehand +""" +import pytest + +from ingestion.tests.integration.datalake.conftest import BUCKET_NAME +from metadata.generated.schema.entity.data.table import ColumnProfile, Table +from metadata.utils.time_utils import ( + get_beginning_of_day_timestamp_mill, + get_end_of_day_timestamp_mill, +) +from metadata.workflow.profiler import ProfilerWorkflow +from metadata.workflow.workflow_output_handler import WorkflowResultStatus + + +@pytest.fixture(scope="class", autouse=True) +def before_each(run_ingestion): + pass + + +class TestDatalakeProfilerTestE2E: + """datalake profiler E2E test""" + + def test_datalake_profiler_workflow(self, ingestion_config, metadata): + ingestion_config["source"]["sourceConfig"]["config"].update( + { + "type": "Profiler", + } + ) + ingestion_config["processor"] = { + "type": "orm-profiler", + "config": {}, + } + + profiler_workflow = ProfilerWorkflow.create(ingestion_config) + profiler_workflow.execute() + status = profiler_workflow.result_status() + profiler_workflow.stop() + + assert status == WorkflowResultStatus.SUCCESS + + table_profile = metadata.get_profile_data( + f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', + get_beginning_of_day_timestamp_mill(), + get_end_of_day_timestamp_mill(), + ) + + column_profile = metadata.get_profile_data( + f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv".first_name', + get_beginning_of_day_timestamp_mill(), + get_end_of_day_timestamp_mill(), + profile_type=ColumnProfile, + ) + + assert table_profile.entities + assert column_profile.entities + + def test_values_partitioned_datalake_profiler_workflow( + self, metadata, ingestion_config + ): + """Test partitioned datalake profiler workflow""" + ingestion_config["source"]["sourceConfig"]["config"].update( + { + "type": "Profiler", + } + ) + ingestion_config["processor"] = { + "type": "orm-profiler", + "config": { + "tableConfig": [ + { + "fullyQualifiedName": f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', + "partitionConfig": { + "enablePartitioning": "true", + "partitionColumnName": "first_name", + "partitionIntervalType": "COLUMN-VALUE", + "partitionValues": ["John"], + }, + } + ] + }, + } + + profiler_workflow = ProfilerWorkflow.create(ingestion_config) + profiler_workflow.execute() + status = profiler_workflow.result_status() + profiler_workflow.stop() + + assert status == WorkflowResultStatus.SUCCESS + + table = metadata.get_by_name( + entity=Table, + fqn=f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', + fields=["tableProfilerConfig"], + nullable=False, + ) + + profile = metadata.get_latest_table_profile(table.fullyQualifiedName).profile + + assert profile.rowCount == 1.0 + + def test_datetime_partitioned_datalake_profiler_workflow( + self, ingestion_config, metadata + ): + """Test partitioned datalake profiler workflow""" + ingestion_config["source"]["sourceConfig"]["config"].update( + { + "type": "Profiler", + } + ) + ingestion_config["processor"] = { + "type": "orm-profiler", + "config": { + "tableConfig": [ + { + "fullyQualifiedName": f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', + "partitionConfig": { + "enablePartitioning": "true", + "partitionColumnName": "birthdate", + "partitionIntervalType": "TIME-UNIT", + "partitionIntervalUnit": "YEAR", + "partitionInterval": 35, + }, + } + ], + }, + } + + profiler_workflow = ProfilerWorkflow.create(ingestion_config) + profiler_workflow.execute() + status = profiler_workflow.result_status() + profiler_workflow.stop() + + assert status == WorkflowResultStatus.SUCCESS + + table = metadata.get_by_name( + entity=Table, + fqn=f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', + fields=["tableProfilerConfig"], + ) + + profile = metadata.get_latest_table_profile(table.fullyQualifiedName).profile + + assert profile.rowCount == 2.0 + + def test_integer_range_partitioned_datalake_profiler_workflow( + self, ingestion_config, metadata + ): + """Test partitioned datalake profiler workflow""" + ingestion_config["source"]["sourceConfig"]["config"].update( + { + "type": "Profiler", + } + ) + ingestion_config["processor"] = { + "type": "orm-profiler", + "config": { + "tableConfig": [ + { + "fullyQualifiedName": f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', + "profileSample": 100, + "partitionConfig": { + "enablePartitioning": "true", + "partitionColumnName": "age", + "partitionIntervalType": "INTEGER-RANGE", + "partitionIntegerRangeStart": 35, + "partitionIntegerRangeEnd": 44, + }, + } + ], + }, + } + + profiler_workflow = ProfilerWorkflow.create(ingestion_config) + profiler_workflow.execute() + status = profiler_workflow.result_status() + profiler_workflow.stop() + + assert status == WorkflowResultStatus.SUCCESS + + table = metadata.get_by_name( + entity=Table, + fqn=f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', + fields=["tableProfilerConfig"], + ) + + profile = metadata.get_latest_table_profile(table.fullyQualifiedName).profile + + assert profile.rowCount == 2.0 + + def test_datalake_profiler_workflow_with_custom_profiler_config( + self, metadata, ingestion_config + ): + """Test custom profiler config return expected sample and metric computation""" + profiler_metrics = [ + "MIN", + "MAX", + "MEAN", + "MEDIAN", + ] + id_metrics = ["MIN", "MAX"] + non_metric_values = ["name", "timestamp"] + + ingestion_config["source"]["sourceConfig"]["config"].update( + { + "type": "Profiler", + } + ) + ingestion_config["processor"] = { + "type": "orm-profiler", + "config": { + "profiler": { + "name": "ingestion_profiler", + "metrics": profiler_metrics, + }, + "tableConfig": [ + { + "fullyQualifiedName": f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', + "columnConfig": { + "includeColumns": [ + {"columnName": "id", "metrics": id_metrics}, + {"columnName": "age"}, + ] + }, + } + ], + }, + } + + profiler_workflow = ProfilerWorkflow.create(ingestion_config) + profiler_workflow.execute() + status = profiler_workflow.result_status() + profiler_workflow.stop() + + assert status == WorkflowResultStatus.SUCCESS + + table = metadata.get_by_name( + entity=Table, + fqn=f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv"', + fields=["tableProfilerConfig"], + ) + + id_profile = metadata.get_profile_data( + f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv".id', + get_beginning_of_day_timestamp_mill(), + get_end_of_day_timestamp_mill(), + profile_type=ColumnProfile, + ).entities + + latest_id_profile = max(id_profile, key=lambda o: o.timestamp.root) + + id_metric_ln = 0 + for metric_name, metric in latest_id_profile: + if metric_name.upper() in id_metrics: + assert metric is not None + id_metric_ln += 1 + else: + assert metric is None if metric_name not in non_metric_values else True + + assert id_metric_ln == len(id_metrics) + + age_profile = metadata.get_profile_data( + f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv".age', + get_beginning_of_day_timestamp_mill(), + get_end_of_day_timestamp_mill(), + profile_type=ColumnProfile, + ).entities + + latest_age_profile = max(age_profile, key=lambda o: o.timestamp.root) + + age_metric_ln = 0 + for metric_name, metric in latest_age_profile: + if metric_name.upper() in profiler_metrics: + assert metric is not None + age_metric_ln += 1 + else: + assert metric is None if metric_name not in non_metric_values else True + + assert age_metric_ln == len(profiler_metrics) + + latest_exc_timestamp = latest_age_profile.timestamp.root + first_name_profile = metadata.get_profile_data( + f'{ingestion_config["source"]["serviceName"]}.default.{BUCKET_NAME}."profiler_test_.csv".first_name_profile', + get_beginning_of_day_timestamp_mill(), + get_end_of_day_timestamp_mill(), + profile_type=ColumnProfile, + ).entities + + assert not [ + p for p in first_name_profile if p.timestamp.root == latest_exc_timestamp + ] + + sample_data = metadata.get_sample_data(table) + assert sorted([c.root for c in sample_data.sampleData.columns]) == sorted( + ["id", "age"] + ) diff --git a/ingestion/tests/integration/datalake/test_ingestion.py b/ingestion/tests/integration/datalake/test_ingestion.py index 58c1847fee07..1bfff44f1c7c 100644 --- a/ingestion/tests/integration/datalake/test_ingestion.py +++ b/ingestion/tests/integration/datalake/test_ingestion.py @@ -13,6 +13,7 @@ import pytest +from ingestion.tests.integration.datalake.conftest import BUCKET_NAME from metadata.generated.schema.entity.data.table import DataType, Table from metadata.ingestion.ometa.models import EntityList from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -37,11 +38,15 @@ def test_ingestion(self, run_ingestion): ) # type: ignore entities = resp.entities - assert len(entities) == 4 + assert len(entities) == 5 names = [entity.name.root for entity in entities] - assert {"names.json", "names.jsonl", "new_users.parquet", "users.csv"} == set( - names - ) + assert { + "names.json", + "names.jsonl", + "new_users.parquet", + "users.csv", + "profiler_test_.csv", + } == set(names) for entity in entities: columns = entity.columns @@ -53,7 +58,7 @@ def test_profiler(self, run_profiler): """Also excluding the test for parquet files until the above is fixed""" csv_ = self.metadata.get_by_name( entity=Table, - fqn='datalake_for_integration_tests.default.MyBucket."users.csv"', + fqn=f'datalake_for_integration_tests.default.{BUCKET_NAME}."users.csv"', fields=["tableProfilerConfig"], ) # parquet_ = self.metadata.get_by_name( @@ -63,13 +68,13 @@ def test_profiler(self, run_profiler): # ) json_ = self.metadata.get_by_name( entity=Table, - fqn='datalake_for_integration_tests.default.MyBucket."names.json"', + fqn=f'datalake_for_integration_tests.default.{BUCKET_NAME}."names.json"', fields=["tableProfilerConfig"], ) jsonl_ = self.metadata.get_by_name( entity=Table, - fqn='datalake_for_integration_tests.default.MyBucket."names.jsonl"', + fqn=f'datalake_for_integration_tests.default.{BUCKET_NAME}."names.jsonl"', fields=["tableProfilerConfig"], ) diff --git a/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py b/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py deleted file mode 100644 index 7106be330ee0..000000000000 --- a/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py +++ /dev/null @@ -1,440 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Test Datalake Profiler workflow - -To run this we need OpenMetadata server up and running. - -No sample data is required beforehand -""" - -import os -from copy import deepcopy -from pathlib import Path -from unittest import TestCase - -import boto3 -import botocore -from moto import mock_aws - -from metadata.generated.schema.entity.data.table import ColumnProfile, Table -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.generated.schema.entity.services.databaseService import DatabaseService -from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( - OpenMetadataJWTClientConfig, -) -from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.utils.time_utils import ( - get_beginning_of_day_timestamp_mill, - get_end_of_day_timestamp_mill, -) -from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.workflow_output_handler import WorkflowResultStatus - -SERVICE_NAME = Path(__file__).stem -REGION = "us-west-1" -BUCKET_NAME = "MyBucket" -INGESTION_CONFIG = { - "source": { - "type": "datalake", - "serviceName": SERVICE_NAME, - "serviceConnection": { - "config": { - "type": "Datalake", - "configSource": { - "securityConfig": { - "awsAccessKeyId": "fake_access_key", - "awsSecretAccessKey": "fake_secret_key", - "awsRegion": REGION, - } - }, - "bucketName": f"{BUCKET_NAME}", - } - }, - "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, - }, - "sink": {"type": "metadata-rest", "config": {}}, - "workflowConfig": { - "openMetadataServerConfig": { - "hostPort": "http://localhost:8585/api", - "authProvider": "openmetadata", - "securityConfig": { - "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" - }, - } - }, -} - - -@mock_aws -class DatalakeProfilerTestE2E(TestCase): - """datalake profiler E2E test""" - - @classmethod - def setUpClass(cls) -> None: - server_config = OpenMetadataConnection( - hostPort="http://localhost:8585/api", - authProvider="openmetadata", - securityConfig=OpenMetadataJWTClientConfig( - jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" - ), - ) # type: ignore - cls.metadata = OpenMetadata(server_config) - - def setUp(self) -> None: - # Mock our S3 bucket and ingest a file - boto3.DEFAULT_SESSION = None - self.client = boto3.client( - "s3", - region_name=REGION, - ) - - # check that we are not running our test against a real bucket - try: - s3 = boto3.resource( - "s3", - region_name=REGION, - aws_access_key_id="fake_access_key", - aws_secret_access_key="fake_secret_key", - ) - s3.meta.client.head_bucket(Bucket=BUCKET_NAME) - except botocore.exceptions.ClientError: - pass - else: - err = f"{BUCKET_NAME} should not exist." - raise EnvironmentError(err) - self.client.create_bucket( - Bucket=BUCKET_NAME, - CreateBucketConfiguration={"LocationConstraint": REGION}, - ) - current_dir = os.path.dirname(__file__) - resources_dir = os.path.join(current_dir, "resources") - - resources_paths = [ - os.path.join(path, filename) - for path, _, files in os.walk(resources_dir) - for filename in files - ] - - self.s3_keys = [] - - for path in resources_paths: - key = os.path.relpath(path, resources_dir) - self.s3_keys.append(key) - self.client.upload_file(Filename=path, Bucket=BUCKET_NAME, Key=key) - - # Ingest our S3 data - ingestion_workflow = MetadataWorkflow.create(INGESTION_CONFIG) - ingestion_workflow.execute() - ingestion_workflow.raise_from_status() - ingestion_workflow.print_status() - ingestion_workflow.stop() - - def test_datalake_profiler_workflow(self): - workflow_config = deepcopy(INGESTION_CONFIG) - workflow_config["source"]["sourceConfig"]["config"].update( - { - "type": "Profiler", - } - ) - workflow_config["processor"] = { - "type": "orm-profiler", - "config": {}, - } - - profiler_workflow = ProfilerWorkflow.create(workflow_config) - profiler_workflow.execute() - status = profiler_workflow.result_status() - profiler_workflow.stop() - - assert status == WorkflowResultStatus.SUCCESS - - table_profile = self.metadata.get_profile_data( - f'{SERVICE_NAME}.default.MyBucket."profiler_test_.csv"', - get_beginning_of_day_timestamp_mill(), - get_end_of_day_timestamp_mill(), - ) - - column_profile = self.metadata.get_profile_data( - f'{SERVICE_NAME}.default.MyBucket."profiler_test_.csv".first_name', - get_beginning_of_day_timestamp_mill(), - get_end_of_day_timestamp_mill(), - profile_type=ColumnProfile, - ) - - assert table_profile.entities - assert column_profile.entities - - def test_values_partitioned_datalake_profiler_workflow(self): - """Test partitioned datalake profiler workflow""" - workflow_config = deepcopy(INGESTION_CONFIG) - workflow_config["source"]["sourceConfig"]["config"].update( - { - "type": "Profiler", - } - ) - workflow_config["processor"] = { - "type": "orm-profiler", - "config": { - "tableConfig": [ - { - "fullyQualifiedName": f'{SERVICE_NAME}.default.MyBucket."profiler_test_.csv"', - "partitionConfig": { - "enablePartitioning": "true", - "partitionColumnName": "first_name", - "partitionIntervalType": "COLUMN-VALUE", - "partitionValues": ["John"], - }, - } - ] - }, - } - - profiler_workflow = ProfilerWorkflow.create(workflow_config) - profiler_workflow.execute() - status = profiler_workflow.result_status() - profiler_workflow.stop() - - assert status == WorkflowResultStatus.SUCCESS - - table = self.metadata.get_by_name( - entity=Table, - fqn=f'{SERVICE_NAME}.default.MyBucket."profiler_test_.csv"', - fields=["tableProfilerConfig"], - ) - - profile = self.metadata.get_latest_table_profile( - table.fullyQualifiedName - ).profile - - assert profile.rowCount == 1.0 - - def test_datetime_partitioned_datalake_profiler_workflow(self): - """Test partitioned datalake profiler workflow""" - workflow_config = deepcopy(INGESTION_CONFIG) - workflow_config["source"]["sourceConfig"]["config"].update( - { - "type": "Profiler", - } - ) - workflow_config["processor"] = { - "type": "orm-profiler", - "config": { - "tableConfig": [ - { - "fullyQualifiedName": f'{SERVICE_NAME}.default.MyBucket."profiler_test_.csv"', - "partitionConfig": { - "enablePartitioning": "true", - "partitionColumnName": "birthdate", - "partitionIntervalType": "TIME-UNIT", - "partitionIntervalUnit": "YEAR", - "partitionInterval": 35, - }, - } - ], - }, - } - - profiler_workflow = ProfilerWorkflow.create(workflow_config) - profiler_workflow.execute() - status = profiler_workflow.result_status() - profiler_workflow.stop() - - assert status == WorkflowResultStatus.SUCCESS - - table = self.metadata.get_by_name( - entity=Table, - fqn=f'{SERVICE_NAME}.default.MyBucket."profiler_test_.csv"', - fields=["tableProfilerConfig"], - ) - - profile = self.metadata.get_latest_table_profile( - table.fullyQualifiedName - ).profile - - assert profile.rowCount == 2.0 - - def test_integer_range_partitioned_datalake_profiler_workflow(self): - """Test partitioned datalake profiler workflow""" - workflow_config = deepcopy(INGESTION_CONFIG) - workflow_config["source"]["sourceConfig"]["config"].update( - { - "type": "Profiler", - } - ) - workflow_config["processor"] = { - "type": "orm-profiler", - "config": { - "tableConfig": [ - { - "fullyQualifiedName": f'{SERVICE_NAME}.default.MyBucket."profiler_test_.csv"', - "profileSample": 100, - "partitionConfig": { - "enablePartitioning": "true", - "partitionColumnName": "age", - "partitionIntervalType": "INTEGER-RANGE", - "partitionIntegerRangeStart": 35, - "partitionIntegerRangeEnd": 44, - }, - } - ], - }, - } - - profiler_workflow = ProfilerWorkflow.create(workflow_config) - profiler_workflow.execute() - status = profiler_workflow.result_status() - profiler_workflow.stop() - - assert status == WorkflowResultStatus.SUCCESS - - table = self.metadata.get_by_name( - entity=Table, - fqn=f'{SERVICE_NAME}.default.MyBucket."profiler_test_.csv"', - fields=["tableProfilerConfig"], - ) - - profile = self.metadata.get_latest_table_profile( - table.fullyQualifiedName - ).profile - - assert profile.rowCount == 2.0 - - def test_datalake_profiler_workflow_with_custom_profiler_config(self): - """Test custom profiler config return expected sample and metric computation""" - profiler_metrics = [ - "MIN", - "MAX", - "MEAN", - "MEDIAN", - ] - id_metrics = ["MIN", "MAX"] - non_metric_values = ["name", "timestamp"] - - workflow_config = deepcopy(INGESTION_CONFIG) - workflow_config["source"]["sourceConfig"]["config"].update( - { - "type": "Profiler", - } - ) - workflow_config["processor"] = { - "type": "orm-profiler", - "config": { - "profiler": { - "name": "ingestion_profiler", - "metrics": profiler_metrics, - }, - "tableConfig": [ - { - "fullyQualifiedName": f'{SERVICE_NAME}.default.MyBucket."profiler_test_.csv"', - "columnConfig": { - "includeColumns": [ - {"columnName": "id", "metrics": id_metrics}, - {"columnName": "age"}, - ] - }, - } - ], - }, - } - - profiler_workflow = ProfilerWorkflow.create(workflow_config) - profiler_workflow.execute() - status = profiler_workflow.result_status() - profiler_workflow.stop() - - assert status == WorkflowResultStatus.SUCCESS - - table = self.metadata.get_by_name( - entity=Table, - fqn=f'{SERVICE_NAME}.default.MyBucket."profiler_test_.csv"', - fields=["tableProfilerConfig"], - ) - - id_profile = self.metadata.get_profile_data( - f'{SERVICE_NAME}.default.MyBucket."profiler_test_.csv".id', - get_beginning_of_day_timestamp_mill(), - get_end_of_day_timestamp_mill(), - profile_type=ColumnProfile, - ).entities - - latest_id_profile = max(id_profile, key=lambda o: o.timestamp.root) - - id_metric_ln = 0 - for metric_name, metric in latest_id_profile: - if metric_name.upper() in id_metrics: - assert metric is not None - id_metric_ln += 1 - else: - assert metric is None if metric_name not in non_metric_values else True - - assert id_metric_ln == len(id_metrics) - - age_profile = self.metadata.get_profile_data( - f'{SERVICE_NAME}.default.MyBucket."profiler_test_.csv".age', - get_beginning_of_day_timestamp_mill(), - get_end_of_day_timestamp_mill(), - profile_type=ColumnProfile, - ).entities - - latest_age_profile = max(age_profile, key=lambda o: o.timestamp.root) - - age_metric_ln = 0 - for metric_name, metric in latest_age_profile: - if metric_name.upper() in profiler_metrics: - assert metric is not None - age_metric_ln += 1 - else: - assert metric is None if metric_name not in non_metric_values else True - - assert age_metric_ln == len(profiler_metrics) - - latest_exc_timestamp = latest_age_profile.timestamp.root - first_name_profile = self.metadata.get_profile_data( - f'{SERVICE_NAME}.default.MyBucket."profiler_test_.csv".first_name_profile', - get_beginning_of_day_timestamp_mill(), - get_end_of_day_timestamp_mill(), - profile_type=ColumnProfile, - ).entities - - assert not [ - p for p in first_name_profile if p.timestamp.root == latest_exc_timestamp - ] - - sample_data = self.metadata.get_sample_data(table) - assert sorted([c.root for c in sample_data.sampleData.columns]) == sorted( - ["id", "age"] - ) - - def tearDown(self): - s3 = boto3.resource( - "s3", - region_name=REGION, - ) - bucket = s3.Bucket(BUCKET_NAME) - for key in bucket.objects.all(): - key.delete() - bucket.delete() - - service_id = str( - self.metadata.get_by_name(entity=DatabaseService, fqn=SERVICE_NAME).id.root - ) - - self.metadata.delete( - entity=DatabaseService, - entity_id=service_id, - recursive=True, - hard_delete=True, - ) diff --git a/ingestion/tests/integration/sources/database/delta_lake/conftest.py b/ingestion/tests/integration/sources/database/delta_lake/conftest.py index 3fc5ac318910..a11a33076c92 100644 --- a/ingestion/tests/integration/sources/database/delta_lake/conftest.py +++ b/ingestion/tests/integration/sources/database/delta_lake/conftest.py @@ -36,7 +36,7 @@ def with_exposed_port(self, minio): ] = f"http://localhost:{self.minio_config.exposed_port}" -@pytest.fixture(scope="session") +@pytest.fixture(scope="module") def deltalake_storage_environment(): config = DeltaLakeStorageTestConfig() minio = get_minio_container(config.minio_config) diff --git a/ingestion/tests/unit/profiler/pandas/test_custom_metrics.py b/ingestion/tests/unit/profiler/pandas/test_custom_metrics.py index 0270750c702f..cac724d17e1a 100644 --- a/ingestion/tests/unit/profiler/pandas/test_custom_metrics.py +++ b/ingestion/tests/unit/profiler/pandas/test_custom_metrics.py @@ -18,10 +18,7 @@ from unittest.mock import patch from uuid import uuid4 -import boto3 -import botocore import pandas as pd -from moto import mock_aws from metadata.generated.schema.entity.data.table import Column as EntityColumn from metadata.generated.schema.entity.data.table import ColumnName, DataType, Table @@ -43,7 +40,6 @@ REGION = "us-west-1" -@mock_aws class MetricsTest(TestCase): """ Run checks on different metrics @@ -103,45 +99,6 @@ class MetricsTest(TestCase): ) def setUp(self): - # Mock our S3 bucket and ingest a file - boto3.DEFAULT_SESSION = None - self.client = boto3.client( - "s3", - region_name=REGION, - ) - - # check that we are not running our test against a real bucket - try: - s3 = boto3.resource( - "s3", - region_name=REGION, - aws_access_key_id="fake_access_key", - aws_secret_access_key="fake_secret_key", - ) - s3.meta.client.head_bucket(Bucket=BUCKET_NAME) - except botocore.exceptions.ClientError: - pass - else: - err = f"{BUCKET_NAME} should not exist." - raise EnvironmentError(err) - self.client.create_bucket( - Bucket=BUCKET_NAME, - CreateBucketConfiguration={"LocationConstraint": REGION}, - ) - - resources_paths = [ - os.path.join(path, filename) - for path, _, files in os.walk(self.resources_dir) - for filename in files - ] - - self.s3_keys = [] - - for path in resources_paths: - key = os.path.relpath(path, self.resources_dir) - self.s3_keys.append(key) - self.client.upload_file(Filename=path, Bucket=BUCKET_NAME, Key=key) - with patch( "metadata.mixins.pandas.pandas_mixin.fetch_dataframe", return_value=self.dfs,