added pytest to integration tests (#36)
* added pytest and test cases to the integration test
lynnro314 authored Nov 7, 2023
1 parent 9a90b39 commit 7f59306
Showing 5 changed files with 127 additions and 70 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
@@ -39,7 +39,7 @@ jobs:
       - name: Set up Python
         uses: actions/setup-python@v4
         with:
-          python-version: 3.11
+          python-version: 3.9
           cache: 'pip'
       - run: pip install -r ./test/requirements.txt

@@ -49,6 +49,6 @@ jobs:

       - name: Use lakeFS with S3 gateway
         run: |
-          python ./test/run-test.py \
+          pytest ./test/test_iceberg.py -s \
             --storage_namespace s3://iceberg-lakefs-testing/${{ github.run_number }}-s3-gateway/${{ steps.unique.outputs.value }} \
             --repository gateway-test \
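
The same suite can also be invoked locally through pytest's Python entry point. A minimal sketch, assuming a lakeFS server is already listening on http://localhost:8000; the option values shown are the defaults registered in test/conftest.py below, not the CI values above:

import pytest

# Local equivalent of the CI step; 'local://' and 'example' are the
# conftest.py defaults, used here as placeholders.
pytest.main([
    "test/test_iceberg.py", "-s",
    "--storage_namespace", "local://",
    "--repository", "example",
])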
63 changes: 63 additions & 0 deletions test/conftest.py
@@ -0,0 +1,63 @@
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import pytest

import lakefs_sdk
from lakefs_sdk.client import LakeFSClient
from lakefs_sdk.models import *

LAKEFS_ACCESS_KEY = 'AKIAIOSFODNN7EXAMPLE'
LAKEFS_SECRET_KEY = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
MOCK_EMAIL = "test@acme.co"

def pytest_addoption(parser):
    parser.addoption(
        '--storage_namespace', action='store', default='local://'
    )
    parser.addoption(
        '--repository', action='store', default='example'
    )


@pytest.fixture
def lakefs_repo(request):
    return request.config.getoption('--repository')


@pytest.fixture(scope="session")
def spark(pytestconfig):
    repo_name = pytestconfig.getoption('--repository')
    spark_config = SparkConf()
    spark_config.set("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog")
    spark_config.set("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog")
    spark_config.set("spark.sql.catalog.lakefs.warehouse", f"lakefs://{repo_name}")
    spark_config.set("spark.sql.catalog.lakefs.cache-enabled", "false")
    spark_config.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
spark_config.set("spark.hadoop.fs.s3a.endpoint", "http://localhost:8000")
spark_config.set("spark.hadoop.fs.s3a.access.key", LAKEFS_ACCESS_KEY)
spark_config.set("spark.hadoop.fs.s3a.secret.key", LAKEFS_SECRET_KEY)
spark_config.set("spark.hadoop.fs.s3a.path.style.access", "true")
spark_config.set("spark.jars.packages", "io.lakefs:lakefs-iceberg:1.0-SNAPSHOT,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-client-api:3.3.4")

spark = SparkSession.builder.config(conf=spark_config).getOrCreate()
yield spark
spark.stop()


@pytest.fixture(scope="session")
def lfs_client(pytestconfig):
    lfs_client = LakeFSClient(
        lakefs_sdk.Configuration(username=LAKEFS_ACCESS_KEY,
                                 password=LAKEFS_SECRET_KEY,
                                 host='http://localhost:8000'))

    # Setup lakeFS
    repo_name = pytestconfig.getoption('--repository')
    storage_namespace = pytestconfig.getoption('--storage_namespace')
    lfs_client.internal_api.setup_comm_prefs(CommPrefsInput(feature_updates=False, security_updates=False, email=MOCK_EMAIL))
    lfs_client.internal_api.setup(Setup(username="admin",
                                        key=AccessKeyCredentials(access_key_id=LAKEFS_ACCESS_KEY, secret_access_key=LAKEFS_SECRET_KEY)))
    lfs_client.repositories_api.create_repository(
        RepositoryCreation(name=repo_name, storage_namespace=storage_namespace))
    return lfs_client
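
A minimal sketch of how these fixtures compose in a test; the test name below is hypothetical and not part of this commit:

def test_fixture_smoke(spark, lfs_client, lakefs_repo):
    # lfs_client is already authenticated, and the repository was created
    # by the session-scoped fixture above
    repo = lfs_client.repositories_api.get_repository(lakefs_repo)
    assert repo.id == lakefs_repo
    # spark is wired to the "lakefs" Iceberg catalog, so a branch name is
    # the first segment of a table identifier, e.g. lakefs.main.company.workers
    assert spark.conf.get("spark.sql.catalog.lakefs") == "org.apache.iceberg.spark.SparkCatalog"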
5 changes: 3 additions & 2 deletions test/requirements.txt
@@ -1,2 +1,3 @@
-lakefs_client==0.104.0
-pyspark==3.3.2
+lakefs_sdk==1.1.0
+pyspark==3.3.2
+pytest==7.4.0
66 changes: 0 additions & 66 deletions test/run-test.py

This file was deleted.

59 changes: 59 additions & 0 deletions test/test_iceberg.py
@@ -0,0 +1,59 @@
import lakefs_sdk.client
from lakefs_sdk.models import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


def get_data(spark):
    data_set = [(1, "James", "Smith", 32, "M"),
                (2, "Michael", "Rose", 35, "M"),
                (3, "Robert", "Williams", 41, "M"),
                (4, "Maria", "Jones", 36, "F"),
                (5, "Jen", "Brown", 44, "F"),
                (6, "Monika", "Geller", 31, "F")]

    schema = StructType([StructField("id", IntegerType(), True),
                         StructField("firstname", StringType(), True),
                         StructField("lastname", StringType(), True),
                         StructField("age", IntegerType(), True),
                         StructField("gender", StringType(), True),
                         ])
    df = spark.createDataFrame(data=data_set, schema=schema)
    return df


def test_diff_two_same_branches(spark, lfs_client: lakefs_sdk.client.LakeFSClient, lakefs_repo):
    df = get_data(spark)
    df.write.saveAsTable("lakefs.main.company.workers")
    lfs_client.commits_api.commit(lakefs_repo, "main", CommitCreation(message="Initial data load"))

    # Create a new branch, check that the tables are the same
    lfs_client.branches_api.create_branch(lakefs_repo, BranchCreation(name="dev", source="main"))
    df_main = spark.read.table("lakefs.main.company.workers")
    df_dev = spark.read.table("lakefs.dev.company.workers")
    assert (df_main.schema == df_dev.schema) and (set(df_main.collect()) == set(df_dev.collect())), "main and dev tables should be equal"


def test_delete_on_dev_and_merge(spark, lfs_client: lakefs_sdk.client.LakeFSClient, lakefs_repo):
    lfs_client.branches_api.create_branch(lakefs_repo, BranchCreation(name="test1", source="main"))
    lfs_client.branches_api.create_branch(lakefs_repo, BranchCreation(name="test2", source="test1"))
    spark.sql("DELETE FROM lakefs.test2.company.workers WHERE id = 6")
    lfs_client.commits_api.commit(lakefs_repo, "test2", CommitCreation(message="delete one row"))
    lfs_client.refs_api.merge_into_branch(lakefs_repo, "test2", "test1")
    df_source = spark.read.table("lakefs.test1.company.workers")
    df_dest = spark.read.table("lakefs.test2.company.workers")
    assert (df_source.schema == df_dest.schema) and (set(df_source.collect()) == set(df_dest.collect())), "test1 and test2 tables should be equal"


def test_multiple_changes_and_merge(spark, lfs_client: lakefs_sdk.client.LakeFSClient, lakefs_repo):
    lfs_client.branches_api.create_branch(lakefs_repo, BranchCreation(name="test3", source="main"))
    lfs_client.branches_api.create_branch(lakefs_repo, BranchCreation(name="test4", source="test3"))
    spark.sql("DELETE FROM lakefs.test4.company.workers WHERE id = 6")
    spark.sql("DELETE FROM lakefs.test4.company.workers WHERE id = 5")
    spark.sql("INSERT INTO lakefs.test4.company.workers VALUES (7, 'John', 'Smith', 33, 'M')")
    spark.sql("DELETE FROM lakefs.test4.company.workers WHERE id = 4")
    spark.sql("INSERT INTO lakefs.test4.company.workers VALUES (8, 'Marta', 'Green', 31, 'F')")
    lfs_client.commits_api.commit(lakefs_repo, "test4", CommitCreation(message="Some changes"))
    lfs_client.refs_api.merge_into_branch(lakefs_repo, "test4", "test3")
    df_source = spark.read.table("lakefs.test3.company.workers")
    df_dest = spark.read.table("lakefs.test4.company.workers")
    assert (df_source.schema == df_dest.schema) and (set(df_source.collect()) == set(df_dest.collect())), "test3 and test4 tables should be equal"
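
The three tests above repeat the same schema-and-rows comparison; a helper of this shape (hypothetical, not part of this commit) would remove the duplication:

def assert_tables_equal(spark, left_table, right_table):
    # Mirrors the inline assertions: equal schemas and equal row sets
    df_left = spark.read.table(left_table)
    df_right = spark.read.table(right_table)
    assert df_left.schema == df_right.schema, f"{left_table} and {right_table} schemas differ"
    assert set(df_left.collect()) == set(df_right.collect()), f"{left_table} and {right_table} rows differ"

For example: assert_tables_equal(spark, "lakefs.test3.company.workers", "lakefs.test4.company.workers").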
