MINOR: tests(datalake): use minio (#17805)
* tests(datalake): use minio

  1. Used MinIO instead of moto to mimic S3 behavior.
  2. Removed the moto dependency, since it is not compatible with aiobotocore (getmoto/moto#7070 (comment)).

* Moved test_datalake_profiler_e2e.py to datalake/test_profiler and switched it from moto to MinIO.

* Fixed tests.

* Fixed tests.

* Removed the default name for the MinIO container.
sushi30 committed Sep 12, 2024
1 parent 1c6695b commit a3d6c1d
Showing 10 changed files with 375 additions and 546 deletions.
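For context on the switch: moto fakes S3 by patching botocore calls inside the test process, while s3fs reaches S3 through aiobotocore, which is exactly where the incompatibility tracked in getmoto/moto#7070 bites. A MinIO container exposes a real S3-compatible HTTP endpoint, so boto3 and s3fs alike can talk to it unmodified. A minimal sketch of the idea (not part of the commit; the endpoint and credentials would come from the container fixture shown in the diffs below):

import s3fs

def list_bucket(endpoint_url: str, access_key: str, secret_key: str) -> list:
    # s3fs goes through aiobotocore; against a real endpoint no mocking
    # is needed — MinIO is just another S3 server to the client.
    fs = s3fs.S3FileSystem(
        key=access_key,
        secret=secret_key,
        client_kwargs={"endpoint_url": endpoint_url},
    )
    return fs.ls("my-bucket")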
8 changes: 2 additions & 6 deletions ingestion/setup.py
@@ -207,11 +207,8 @@
    *COMMONS["datalake"],
},
"datalake-s3": {
-    # requires aiobotocore
-    # https://github.com/fsspec/s3fs/blob/9bf99f763edaf7026318e150c4bd3a8d18bb3a00/requirements.txt#L1
-    # however, the latest version of `s3fs` conflicts its `aiobotocore` dep with `boto3`'s dep on `botocore`.
-    # Leaving this marked to the automatic resolution to speed up installation.
-    "s3fs",
+    # vendoring 'boto3' to keep all dependencies aligned (s3fs, boto3, botocore, aiobotocore)
+    "s3fs[boto3]",
    *COMMONS["datalake"],
},
"deltalake": {"delta-spark<=2.3.0", "deltalake~=0.17"},
@@ -343,7 +340,6 @@
"coverage",
# Install GE because it's not in the `all` plugin
VERSIONS["great-expectations"],
"moto~=5.0",
"basedpyright~=1.14",
"pytest==7.0.0",
"pytest-cov",
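With moto gone and boto3 vendored through the s3fs extra, the whole S3 stack is left to pip's resolver. A throwaway snippet (not part of the commit) to eyeball that the four packages actually resolved together after an install:

from importlib.metadata import version

# If the resolver succeeded, these four resolved to mutually compatible
# versions; a hard conflict would have failed the install instead.
for pkg in ("s3fs", "boto3", "botocore", "aiobotocore"):
    print(f"{pkg}=={version(pkg)}")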
3 changes: 2 additions & 1 deletion ingestion/tests/integration/conftest.py
@@ -15,7 +15,8 @@
from metadata.workflow.ingestion import IngestionWorkflow

if not sys.version_info >= (3, 9):
-    collect_ignore = ["trino", "kafka"]
+    # these tests use test-containers which are not supported in python 3.8
+    collect_ignore = ["trino", "kafka", "datalake"]


@pytest.fixture(scope="session", autouse=True)
2 changes: 1 addition & 1 deletion ingestion/tests/integration/containers.py
@@ -53,7 +53,7 @@ class MinioContainerConfigs:
    access_key: str = "minio"
    secret_key: str = "password"
    port: int = 9000
-    container_name: str = "test-minio"
+    container_name: Optional[str] = None
    exposed_port: Optional[int] = None

    def with_exposed_port(self, container):
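Only the config dataclass is touched by this diff; the rest of the helper lives in containers.py and is not shown here. As a rough sketch of how get_minio_container could wrap testcontainers' MinioContainer under these configs (the constructor arguments and the with_name call are assumptions for illustration, not the repository's actual code):

from dataclasses import dataclass
from typing import Optional

from testcontainers.minio import MinioContainer


@dataclass
class MinioContainerConfigs:
    access_key: str = "minio"
    secret_key: str = "password"
    port: int = 9000
    container_name: Optional[str] = None  # None lets Docker pick a unique name
    exposed_port: Optional[int] = None

    def with_exposed_port(self, container):
        # Record the host port Docker mapped onto the in-container MinIO port.
        self.exposed_port = container.get_exposed_port(self.port)


def get_minio_container(config: MinioContainerConfigs) -> MinioContainer:
    container = MinioContainer(
        access_key=config.access_key,
        secret_key=config.secret_key,
        port=config.port,
    )
    # Pin a name only when one is explicitly requested; the removed
    # "test-minio" default meant two concurrent test runs would collide
    # on the container name.
    if config.container_name is not None:
        container.with_name(config.container_name)
    return container

Used as a context manager (with get_minio_container(...) as container:), the container starts on enter and is removed on exit, which is what the session-scoped fixture below relies on.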
93 changes: 46 additions & 47 deletions ingestion/tests/integration/datalake/conftest.py
@@ -14,16 +14,16 @@
import os
from copy import deepcopy

-import boto3
import pytest
-from moto import mock_aws

from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow

-BUCKET_NAME = "MyBucket"
+from ..containers import MinioContainerConfigs, get_minio_container
+
+BUCKET_NAME = "my-bucket"

INGESTION_CONFIG = {
    "source": {
@@ -77,7 +77,7 @@
"sourceConfig": {
"config": {
"type": "TestSuite",
"entityFullyQualifiedName": 'datalake_for_integration_tests.default.MyBucket."users.csv"',
"entityFullyQualifiedName": f'datalake_for_integration_tests.default.{BUCKET_NAME}."users.csv"',
}
},
},
@@ -128,31 +128,19 @@
}


-@pytest.fixture(scope="module", autouse=True)
-def aws():
-    with mock_aws():
-        yield boto3.client("s3", region_name="us-east-1")
+@pytest.fixture(scope="session")
+def minio_container():
+    with get_minio_container(MinioContainerConfigs()) as container:
+        yield container


@pytest.fixture(scope="class", autouse=True)
-def setup_s3(request) -> None:
+def setup_s3(minio_container) -> None:
    # Mock our S3 bucket and ingest a file
-    boto3.DEFAULT_SESSION = None
-    request.cls.s3_client = boto3.client(
-        "s3",
-        region_name="us-west-1",
-    )
-    s3 = boto3.resource(
-        "s3",
-        region_name="us-west-1",
-        aws_access_key_id="fake_access_key",
-        aws_secret_access_key="fake_secret_key",
-    )
-    request.cls.s3_client.create_bucket(
-        Bucket=BUCKET_NAME,
-        CreateBucketConfiguration={"LocationConstraint": "us-west-1"},
-    )
-    s3.meta.client.head_bucket(Bucket=BUCKET_NAME)
+    client = minio_container.get_client()
+    if client.bucket_exists(BUCKET_NAME):
+        return
+    client.make_bucket(BUCKET_NAME)
    current_dir = os.path.dirname(__file__)
    resources_dir = os.path.join(current_dir, "resources")
@@ -161,23 +149,31 @@ def setup_s3(request) -> None:
        for path, _, files in os.walk(resources_dir)
        for filename in files
    ]
-
-    request.cls.s3_keys = []
-
    for path in resources_paths:
        key = os.path.relpath(path, resources_dir)
-        request.cls.s3_keys.append(key)
-        request.cls.s3_client.upload_file(Filename=path, Bucket=BUCKET_NAME, Key=key)
-    yield
-    bucket = s3.Bucket(BUCKET_NAME)
-    for key in bucket.objects.all():
-        key.delete()
-    bucket.delete()
+        client.fput_object(BUCKET_NAME, key, path)
+    return


+@pytest.fixture(scope="class")
+def ingestion_config(minio_container):
+    ingestion_config = deepcopy(INGESTION_CONFIG)
+    ingestion_config["source"]["serviceConnection"]["config"]["configSource"].update(
+        {
+            "securityConfig": {
+                "awsAccessKeyId": minio_container.access_key,
+                "awsSecretAccessKey": minio_container.secret_key,
+                "awsRegion": "us-west-1",
+                "endPointURL": f"http://localhost:{minio_container.get_exposed_port(minio_container.port)}",
+            }
+        }
+    )
+    return ingestion_config
+
+
@pytest.fixture(scope="class")
-def run_ingestion(metadata):
-    ingestion_workflow = MetadataWorkflow.create(INGESTION_CONFIG)
+def run_ingestion(metadata, ingestion_config):
+    ingestion_workflow = MetadataWorkflow.create(ingestion_config)
    ingestion_workflow.execute()
    ingestion_workflow.raise_from_status()
    ingestion_workflow.stop()
Expand All @@ -188,28 +184,31 @@ def run_ingestion(metadata):
    metadata.delete(DatabaseService, db_service.id, recursive=True, hard_delete=True)


-@pytest.fixture
-def run_test_suite_workflow(run_ingestion):
-    ingestion_workflow = TestSuiteWorkflow.create(DATA_QUALITY_CONFIG)
+@pytest.fixture(scope="class")
+def run_test_suite_workflow(run_ingestion, ingestion_config):
+    workflow_config = deepcopy(DATA_QUALITY_CONFIG)
+    workflow_config["source"]["serviceConnection"] = ingestion_config["source"][
+        "serviceConnection"
+    ]
+    ingestion_workflow = TestSuiteWorkflow.create(workflow_config)
    ingestion_workflow.execute()
    ingestion_workflow.raise_from_status()
    ingestion_workflow.stop()


-@pytest.fixture(scope="session")
-def profiler_workflow_config(workflow_config):
-    config = deepcopy(INGESTION_CONFIG)
-    config["source"]["sourceConfig"]["config"].update(
+@pytest.fixture(scope="class")
+def profiler_workflow_config(ingestion_config, workflow_config):
+    ingestion_config["source"]["sourceConfig"]["config"].update(
        {
            "type": "Profiler",
        }
    )
-    config["processor"] = {
+    ingestion_config["processor"] = {
        "type": "orm-profiler",
        "config": {},
    }
-    config["workflowConfig"] = workflow_config
-    return config
+    ingestion_config["workflowConfig"] = workflow_config
+    return ingestion_config


@pytest.fixture()
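With these fixtures in place, a test class in this package needs no moto setup at all. A hypothetical consumer (the test name and assertion are illustrative only, not code from this commit):

import pytest

from metadata.generated.schema.entity.data.table import Table

from .conftest import BUCKET_NAME


@pytest.mark.usefixtures("setup_s3")
class TestDatalakeIngestion:
    def test_users_csv_becomes_a_table(self, run_ingestion, metadata):
        # run_ingestion has executed the metadata workflow against MinIO,
        # so the uploaded CSV should now be queryable as a table entity.
        table = metadata.get_by_name(
            Table,
            f'datalake_for_integration_tests.default.{BUCKET_NAME}."users.csv"',
        )
        assert table is not None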
