diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index b0962c2..e26e115 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -39,7 +39,7 @@ jobs:
       - name: Set up Python
         uses: actions/setup-python@v4
         with:
-          python-version: 3.11
+          python-version: 3.9
           cache: 'pip'
       - run: pip install -r ./test/requirements.txt
@@ -49,6 +49,6 @@ jobs:
       - name: Use lakeFS with S3 gateway
         run: |
-          python ./test/run-test.py \
+          pytest ./test/test_iceberg.py -s \
             --storage_namespace s3://iceberg-lakefs-testing/${{ github.run_number }}-s3-gateway/${{ steps.unique.outputs.value }} \
             --repository gateway-test \
diff --git a/test/conftest.py b/test/conftest.py
new file mode 100644
index 0000000..2f7ab59
--- /dev/null
+++ b/test/conftest.py
@@ -0,0 +1,63 @@
+import pyspark
+from pyspark.sql import SparkSession
+from pyspark.conf import SparkConf
+import pytest
+
+import lakefs_sdk
+from lakefs_sdk.client import LakeFSClient
+from lakefs_sdk.models import *
+
+LAKEFS_ACCESS_KEY = 'AKIAIOSFODNN7EXAMPLE'
+LAKEFS_SECRET_KEY = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
+MOCK_EMAIL = "test@acme.co"
+
+def pytest_addoption(parser):
+    parser.addoption(
+        '--storage_namespace', action='store', default='local://'
+    )
+    parser.addoption(
+        '--repository', action='store', default='example'
+    )
+
+
+@pytest.fixture
+def lakefs_repo(request):
+    return request.config.getoption('--repository')
+
+
+@pytest.fixture(scope="session")
+def spark(pytestconfig):
+    repo_name = pytestconfig.getoption('--repository')
+    spark_config = SparkConf()
+    spark_config.set("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog")
+    spark_config.set("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog")
+    spark_config.set("spark.sql.catalog.lakefs.warehouse", f"lakefs://{repo_name}")
+    spark_config.set("spark.sql.catalog.lakefs.cache-enabled", "false")
+    spark_config.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+    spark_config.set("spark.hadoop.fs.s3a.endpoint", "http://localhost:8000")
+    spark_config.set("spark.hadoop.fs.s3a.access.key", LAKEFS_ACCESS_KEY)
+    spark_config.set("spark.hadoop.fs.s3a.secret.key", LAKEFS_SECRET_KEY)
+    spark_config.set("spark.hadoop.fs.s3a.path.style.access", "true")
+    spark_config.set("spark.jars.packages", "io.lakefs:lakefs-iceberg:1.0-SNAPSHOT,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-client-api:3.3.4")
+
+    spark = SparkSession.builder.config(conf=spark_config).getOrCreate()
+    yield spark
+    spark.stop()
+
+
+@pytest.fixture(scope="session")
+def lfs_client(pytestconfig):
+    lfs_client = LakeFSClient(
+        lakefs_sdk.Configuration(username=LAKEFS_ACCESS_KEY,
+                                 password=LAKEFS_SECRET_KEY,
+                                 host='http://localhost:8000'))
+
+    # Setup lakeFS
+    repo_name = pytestconfig.getoption('--repository')
+    storage_namespace = pytestconfig.getoption('--storage_namespace')
+    lfs_client.internal_api.setup_comm_prefs(CommPrefsInput(feature_updates=False, security_updates=False, email=MOCK_EMAIL))
+    lfs_client.internal_api.setup(Setup(username="admin",
+                                        key=AccessKeyCredentials(access_key_id=LAKEFS_ACCESS_KEY, secret_access_key=LAKEFS_SECRET_KEY)))
+    lfs_client.repositories_api.create_repository(
+        RepositoryCreation(name=repo_name, storage_namespace=storage_namespace))
+    return lfs_client
diff --git a/test/requirements.txt b/test/requirements.txt
index 2b45441..6dfaec4 100644
--- a/test/requirements.txt
+++ b/test/requirements.txt
@@ -1,2 +1,3 @@
-lakefs_client==0.104.0
-pyspark==3.3.2
\ No newline at end of file
+lakefs_sdk==1.1.0
+pyspark==3.3.2
+pytest==7.4.0
diff --git a/test/run-test.py b/test/run-test.py
deleted file mode 100644
index 8855f7b..0000000
--- a/test/run-test.py
+++ /dev/null
@@ -1,66 +0,0 @@
-import argparse
-import sys
-
-import lakefs_client
-from lakefs_client.client import LakeFSClient
-from lakefs_client.models import *
-from lakefs_client.model.access_key_credentials import AccessKeyCredentials
-from lakefs_client.model.comm_prefs_input import CommPrefsInput
-from lakefs_client.model.setup import Setup
-from lakefs_client.model.repository_creation import RepositoryCreation
-import pyspark
-from pyspark.sql import SparkSession
-from pyspark.conf import SparkConf
-
-
-MOCK_EMAIL = "test@acme.co"
-
-
-def main():
-    parser = argparse.ArgumentParser()
-    parser.add_argument("--storage_namespace", default="local://", required=True)
-    parser.add_argument("--repository", default="example")
-    lakefs_access_key = 'AKIAIOSFODNN7EXAMPLE'
-    lakefs_secret_key = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
-
-    args = parser.parse_args()
-
-    lfs_client = LakeFSClient(
-        lakefs_client.Configuration(username=lakefs_access_key,
-                                    password=lakefs_secret_key,
-                                    host='http://localhost:8000'))
-
-    # Setup lakeFS
-    repo_name = args.repository
-    lfs_client.config.setup_comm_prefs(CommPrefsInput(feature_updates=False, security_updates=False, email=MOCK_EMAIL))
-    lfs_client.config.setup(Setup(username="lynn",
-                                  key=AccessKeyCredentials(access_key_id=lakefs_access_key, secret_access_key=lakefs_secret_key)))
-
-    lfs_client.repositories.create_repository(
-        RepositoryCreation(name=repo_name, storage_namespace=args.storage_namespace))
-
-    spark_config = SparkConf()
-    spark_config.set("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog")
-    spark_config.set("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog")
-    spark_config.set("spark.sql.catalog.lakefs.warehouse", f"lakefs://{repo_name}")
-    spark_config.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
-    spark_config.set("spark.hadoop.fs.s3a.endpoint", "http://localhost:8000")
-    spark_config.set("spark.hadoop.fs.s3a.access.key", lakefs_access_key)
-    spark_config.set("spark.hadoop.fs.s3a.secret.key", lakefs_secret_key)
-    spark_config.set("spark.hadoop.fs.s3a.path.style.access", "true")
-    spark_config.set("spark.jars.packages", "io.lakefs:lakefs-iceberg:1.0-SNAPSHOT,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-client-api:3.3.4")
-
-    spark = SparkSession.builder.config(conf=spark_config).getOrCreate()
-
-    df = spark.read.option("inferSchema", "true").option("multiline", "true").csv("./test/data-sets/film_permits.csv")
-    df.write.saveAsTable("lakefs.main.nyc.permits")
-
-    # Commit, create a new branch, check that the tables are the same
-    lfs_client.commits.commit(repo_name, "main", CommitCreation(message="Initial data load"))
-    lfs_client.branches.create_branch(repo_name, BranchCreation(name="dev", source="main"))
-    df_main = spark.read.table("lakefs.main.nyc.permits")
-    df_dev = spark.read.table("lakefs.dev.nyc.permits")
-    assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and dev tables should be equal"
-
-if __name__ == '__main__':
-    main()
diff --git a/test/test_iceberg.py b/test/test_iceberg.py
new file mode 100644
index 0000000..ad12e5a
--- /dev/null
+++ b/test/test_iceberg.py
@@ -0,0 +1,59 @@
+import lakefs_sdk.client
+from lakefs_sdk.models import *
+from pyspark.sql.types import StructType, StructField, StringType, IntegerType
+
+
+def get_data(spark):
+    data_set = [(1, "James", "Smith", 32, "M"),
+                (2, "Michael", "Rose", 35, "M"),
+                (3, "Robert", "Williams", 41, "M"),
+                (4, "Maria", "Jones", 36, "F"),
+                (5, "Jen", "Brown", 44, "F"),
+                (6, "Monika", "Geller", 31, "F")]
+
+    schema = StructType([StructField("id", StringType(), True),
+                         StructField("firstname", StringType(), True),
+                         StructField("lastname", StringType(), True),
+                         StructField("age", IntegerType(), True),
+                         StructField("gender", StringType(), True),
+                         ])
+    df = spark.createDataFrame(data=data_set, schema=schema)
+    return df
+
+
+def test_diff_two_same_branches(spark, lfs_client: lakefs_sdk.client.LakeFSClient, lakefs_repo):
+    df = get_data(spark)
+    df.write.saveAsTable("lakefs.main.company.workers")
+    lfs_client.commits_api.commit(lakefs_repo, "main", CommitCreation(message="Initial data load"))
+
+    # Create a new branch, check that the tables are the same
+    lfs_client.branches_api.create_branch(lakefs_repo, BranchCreation(name="dev", source="main"))
+    df_main = spark.read.table("lakefs.main.company.workers")
+    df_dev = spark.read.table("lakefs.dev.company.workers")
+    assert (df_main.schema == df_dev.schema) and (set(df_main.collect()) == set(df_dev.collect())), "main and dev tables should be equal"
+
+
+def test_delete_on_dev_and_merge(spark, lfs_client: lakefs_sdk.client.LakeFSClient, lakefs_repo):
+    lfs_client.branches_api.create_branch(lakefs_repo, BranchCreation(name="test1", source="main"))
+    lfs_client.branches_api.create_branch(lakefs_repo, BranchCreation(name="test2", source="test1"))
+    spark.sql("DELETE FROM lakefs.test2.company.workers WHERE id = 6")
+    lfs_client.commits_api.commit(lakefs_repo, "test2", CommitCreation(message="delete one row"))
+    lfs_client.refs_api.merge_into_branch(lakefs_repo, "test2", "test1")
+    df_source = spark.read.table("lakefs.test1.company.workers")
+    df_dest = spark.read.table("lakefs.test2.company.workers")
+    assert (df_source.schema == df_dest.schema) and (set(df_source.collect()) == set(df_dest.collect())), "test1 and test2 tables should be equal"
+
+
+def test_multiple_changes_and_merge(spark, lfs_client: lakefs_sdk.client.LakeFSClient, lakefs_repo):
+    lfs_client.branches_api.create_branch(lakefs_repo, BranchCreation(name="test3", source="main"))
+    lfs_client.branches_api.create_branch(lakefs_repo, BranchCreation(name="test4", source="test3"))
+    spark.sql("DELETE FROM lakefs.test4.company.workers WHERE id = 6")
+    spark.sql("DELETE FROM lakefs.test4.company.workers WHERE id = 5")
+    spark.sql("INSERT INTO lakefs.test4.company.workers VALUES (7, 'Jhon', 'Smith', 33, 'M')")
+    spark.sql("DELETE FROM lakefs.test4.company.workers WHERE id = 4")
+    spark.sql("INSERT INTO lakefs.test4.company.workers VALUES (8, 'Marta', 'Green', 31, 'F')")
+    lfs_client.commits_api.commit(lakefs_repo, "test4", CommitCreation(message="Some changes"))
+    lfs_client.refs_api.merge_into_branch(lakefs_repo, "test4", "test3")
+    df_source = spark.read.table("lakefs.test3.company.workers")
+    df_dest = spark.read.table("lakefs.test4.company.workers")
+    assert (df_source.schema == df_dest.schema) and (set(df_source.collect()) == set(df_dest.collect())), "test3 and test4 tables should be equal"
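
Note on running the new suite outside CI (a sketch, not part of the patch): conftest.py hard-codes the lakeFS endpoint http://localhost:8000 and the example credentials, and pytest_addoption defaults --storage_namespace to local:// and --repository to example. Assuming a local lakeFS instance is already running at that address and accepts the chosen storage namespace, an invocation along these lines mirrors the workflow step:

    # local run sketch; storage namespace and repository name shown are the conftest defaults
    pip install -r ./test/requirements.txt
    pytest ./test/test_iceberg.py -s \
        --storage_namespace local:// \
        --repository example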