diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index b0962c2..dfaaf59 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -49,6 +49,6 @@ jobs:
       - name: Use lakeFS with S3 gateway
         run: |
-          python ./test/run-test.py \
+          pytest ./test/test_iceberg.py \
             --storage_namespace s3://iceberg-lakefs-testing/${{ github.run_number }}-s3-gateway/${{ steps.unique.outputs.value }} \
             --repository gateway-test \
diff --git a/test/run-test.py b/test/conftest.py
similarity index 50%
rename from test/run-test.py
rename to test/conftest.py
index 8855f7b..f0f86d4 100644
--- a/test/run-test.py
+++ b/test/conftest.py
@@ -1,5 +1,8 @@
-import argparse
-import sys
+import pyspark
+from pyspark.sql import SparkSession
+from pyspark.conf import SparkConf
+
+import pytest
 
 import lakefs_client
 from lakefs_client.client import LakeFSClient
@@ -8,59 +11,66 @@
 from lakefs_client.model.comm_prefs_input import CommPrefsInput
 from lakefs_client.model.setup import Setup
 from lakefs_client.model.repository_creation import RepositoryCreation
 
-import pyspark
-from pyspark.sql import SparkSession
-from pyspark.conf import SparkConf
-
+LAKEFS_ACCESS_KEY = 'AKIAIOSFODNN7EXAMPLE'
+LAKEFS_SECRET_KEY = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
 MOCK_EMAIL = "test@acme.co"
 
+def pytest_addoption(parser):
+    parser.addoption(
+        '--storage_namespace', action='store', default='local://'
+    )
+    parser.addoption(
+        '--repository', action='store', default='example'
+    )
 
-def main():
-    parser = argparse.ArgumentParser()
-    parser.add_argument("--storage_namespace", default="local://", required=True)
-    parser.add_argument("--repository", default="example")
-    lakefs_access_key = 'AKIAIOSFODNN7EXAMPLE'
-    lakefs_secret_key = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
-    args = parser.parse_args()
+@pytest.fixture
+def lakefs_repo(request):
+    return request.config.getoption('--repository')
 
-    lfs_client = LakeFSClient(
-        lakefs_client.Configuration(username=lakefs_access_key,
-                                    password=lakefs_secret_key,
-                                    host='http://localhost:8000'))
+# @pytest.fixture
+# def lakeFS_args(request):
+#     args = {}
+#     args['storage_namespace'] = request.config.getoption('--storage_namespace')
+#     args['repository'] = request.config.getoption('--repository')
+#     args['lakefs_access_key'] = 'AKIAIOSFODNN7EXAMPLE'
+#     args['lakefs_secret_key'] = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
+#     return args
 
-    # Setup lakeFS
-    repo_name = args.repository
-    lfs_client.config.setup_comm_prefs(CommPrefsInput(feature_updates=False, security_updates=False, email=MOCK_EMAIL))
-    lfs_client.config.setup(Setup(username="lynn",
-                                  key=AccessKeyCredentials(access_key_id= lakefs_access_key, secret_access_key= lakefs_secret_key)))
-
-    lfs_client.repositories.create_repository(
-        RepositoryCreation(name=repo_name, storage_namespace=args.storage_namespace))
+@pytest.fixture(scope="session")
+def spark(pytestconfig):
+    repo_name = pytestconfig.getoption('--repository')
     spark_config = SparkConf()
     spark_config.set("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog")
     spark_config.set("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog")
     spark_config.set("spark.sql.catalog.lakefs.warehouse", f"lakefs://{repo_name}")
     spark_config.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
     spark_config.set("spark.hadoop.fs.s3a.endpoint", "http://localhost:8000")
-    spark_config.set("spark.hadoop.fs.s3a.access.key", lakefs_access_key)
-    spark_config.set("spark.hadoop.fs.s3a.secret.key", lakefs_secret_key)
+    spark_config.set("spark.hadoop.fs.s3a.access.key", LAKEFS_ACCESS_KEY)
+    spark_config.set("spark.hadoop.fs.s3a.secret.key", LAKEFS_SECRET_KEY)
     spark_config.set("spark.hadoop.fs.s3a.path.style.access", "true")
     spark_config.set("spark.jars.packages", "io.lakefs:lakefs-iceberg:1.0-SNAPSHOT,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-client-api:3.3.4")
     spark = SparkSession.builder.config(conf=spark_config).getOrCreate()
+    yield spark
+    spark.stop()
 
-    df = spark.read.option("inferSchema","true").option("multiline","true").csv("./test/data-sets/film_permits.csv")
-    df.write.saveAsTable("lakefs.main.nyc.permits")
+@pytest.fixture(scope="session")
+def lfs_client(pytestconfig):
+    lfs_client = LakeFSClient(
+        lakefs_client.Configuration(username=LAKEFS_ACCESS_KEY,
+                                    password=LAKEFS_SECRET_KEY,
+                                    host='http://localhost:8000'))
 
-    #Commit, create a new branch, check that the tables are the same
-    lfs_client.commits.commit(repo_name, "main", CommitCreation(message="Initial data load"))
-    lfs_client.branches.create_branch(repo_name, BranchCreation(name="dev", source="main"))
-    df_main = spark.read.table("lakefs.main.nyc.permits")
-    df_dev = spark.read.table("lakefs.dev.nyc.permits")
-    assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and dev tables should be equal"
+    # Setup lakeFS
+    repo_name = pytestconfig.getoption('--repository')
+    storage_namespace = pytestconfig.getoption('--storage_namespace')
+    lfs_client.config.setup_comm_prefs(CommPrefsInput(feature_updates=False, security_updates=False, email=MOCK_EMAIL))
+    lfs_client.config.setup(Setup(username="lynn",
+                                  key=AccessKeyCredentials(access_key_id=LAKEFS_ACCESS_KEY, secret_access_key=LAKEFS_SECRET_KEY)))
 
-if __name__ == '__main__':
-    main()
+    lfs_client.repositories.create_repository(
+        RepositoryCreation(name=repo_name, storage_namespace=storage_namespace))
+    return lfs_client
\ No newline at end of file
diff --git a/test/requirements.txt b/test/requirements.txt
index 2b45441..be19ac2 100644
--- a/test/requirements.txt
+++ b/test/requirements.txt
@@ -1,2 +1,3 @@
 lakefs_client==0.104.0
-pyspark==3.3.2
\ No newline at end of file
+pyspark==3.3.2
+pytest==7.4.0
\ No newline at end of file
diff --git a/test/test_iceberg.py b/test/test_iceberg.py
new file mode 100644
index 0000000..7e702f4
--- /dev/null
+++ b/test/test_iceberg.py
@@ -0,0 +1,24 @@
+import pytest
+
+import lakefs_client
+from lakefs_client.client import LakeFSClient
+from lakefs_client.models import *
+from lakefs_client.model.access_key_credentials import AccessKeyCredentials
+from lakefs_client.model.comm_prefs_input import CommPrefsInput
+from lakefs_client.model.setup import Setup
+from lakefs_client.model.repository_creation import RepositoryCreation
+import pyspark
+from pyspark.sql import SparkSession
+from pyspark.conf import SparkConf
+
+def test_diff_two_same_branches(spark, lfs_client, lakefs_repo):
+    print("repo name ", lakefs_repo)
+    df = spark.read.option("inferSchema","true").option("multiline","true").csv("./test/data-sets/film_permits.csv")
+    df.write.saveAsTable("lakefs.main.nyc.permits")
+
+    #Commit, create a new branch, check that the tables are the same
+    lfs_client.commits.commit(lakefs_repo, "main", CommitCreation(message="Initial data load"))
+    lfs_client.branches.create_branch(lakefs_repo, BranchCreation(name="dev", source="main"))
+    df_main = spark.read.table("lakefs.main.nyc.permits")
spark.read.table("lakefs.dev.nyc.permits") + assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and dev tables should be equal"