Skip to content

Commit

Permalink
added pytest
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnro314 committed Jul 30, 2023
1 parent 8b4f37b commit 523b8de
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ jobs:

- name: Use lakeFS with S3 gateway
run: |
python ./test/run-test.py \
pytest ./test/test_iceberg.py \
--storage_namespace s3://iceberg-lakefs-testing/${{ github.run_number }}-s3-gateway/${{ steps.unique.outputs.value }} \
--repository gateway-test \
84 changes: 47 additions & 37 deletions test/run-test.py → test/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import argparse
import sys
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

import pytest

import lakefs_client
from lakefs_client.client import LakeFSClient
Expand All @@ -8,59 +11,66 @@
from lakefs_client.model.comm_prefs_input import CommPrefsInput
from lakefs_client.model.setup import Setup
from lakefs_client.model.repository_creation import RepositoryCreation
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf


LAKEFS_ACCESS_KEY = 'AKIAIOSFODNN7EXAMPLE'
LAKEFS_SECRET_KEY = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
MOCK_EMAIL = "test@acme.co"

def pytest_addoption(parser):
parser.addoption(
'--storage_namespace', action='store', default='local://'
)
parser.addoption(
'--repository', action='store', default='example'
)

def main():
parser = argparse.ArgumentParser()
parser.add_argument("--storage_namespace", default="local://", required=True)
parser.add_argument("--repository", default="example")
lakefs_access_key = 'AKIAIOSFODNN7EXAMPLE'
lakefs_secret_key = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'

args = parser.parse_args()
@pytest.fixture
def lakefs_repo(request):
return request.config.getoption('--repository')

lfs_client = LakeFSClient(
lakefs_client.Configuration(username=lakefs_access_key,
password=lakefs_secret_key,
host='http://localhost:8000'))
# @pytest.fixture
# def lakeFS_args(request):
# args = {}
# args['storage_namespace'] = request.config.getoption('--storage_namespace')
# args['repository'] = request.config.getoption('--repository')
# args['lakefs_access_key'] = 'AKIAIOSFODNN7EXAMPLE'
# args['lakefs_secret_key'] = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
# return args

# Setup lakeFS
repo_name = args.repository
lfs_client.config.setup_comm_prefs(CommPrefsInput(feature_updates=False, security_updates=False, email=MOCK_EMAIL))
lfs_client.config.setup(Setup(username="lynn",
key=AccessKeyCredentials(access_key_id= lakefs_access_key, secret_access_key= lakefs_secret_key)))

lfs_client.repositories.create_repository(
RepositoryCreation(name=repo_name, storage_namespace=args.storage_namespace))

@pytest.fixture(scope="session")
def spark(pytestconfig):
repo_name = pytestconfig.getoption('--repository')
spark_config = SparkConf()
spark_config.set("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog")
spark_config.set("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog")
spark_config.set("spark.sql.catalog.lakefs.warehouse", f"lakefs://{repo_name}")
spark_config.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
spark_config.set("spark.hadoop.fs.s3a.endpoint", "http://localhost:8000")
spark_config.set("spark.hadoop.fs.s3a.access.key", lakefs_access_key)
spark_config.set("spark.hadoop.fs.s3a.secret.key", lakefs_secret_key)
spark_config.set("spark.hadoop.fs.s3a.access.key", LAKEFS_ACCESS_KEY)
spark_config.set("spark.hadoop.fs.s3a.secret.key", LAKEFS_SECRET_KEY)
spark_config.set("spark.hadoop.fs.s3a.path.style.access", "true")
spark_config.set("spark.jars.packages", "io.lakefs:lakefs-iceberg:1.0-SNAPSHOT,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-client-api:3.3.4")

spark = SparkSession.builder.config(conf=spark_config).getOrCreate()
yield spark
spark.stop()

df = spark.read.option("inferSchema","true").option("multiline","true").csv("./test/data-sets/film_permits.csv")
df.write.saveAsTable("lakefs.main.nyc.permits")
@pytest.fixture(scope="session")
def lakefs_client(pytestconfig):
lfs_client = LakeFSClient(
lakefs_client.Configuration(username=LAKEFS_ACCESS_KEY,
password=LAKEFS_SECRET_KEY,
host='http://localhost:8000'))

#Commit, create a new branch, check that the tables are the same
lfs_client.commits.commit(repo_name, "main", CommitCreation(message="Initial data load"))
lfs_client.branches.create_branch(repo_name, BranchCreation(name="dev", source="main"))
df_main = spark.read.table("lakefs.main.nyc.permits")
df_dev = spark.read.table("lakefs.dev.nyc.permits")
assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and dev tables should be equal"
# Setup lakeFS
repo_name = pytestconfig.getoption('--repository')
storage_namespace = pytestconfig.getoption('--storage_namespace')
lfs_client.config.setup_comm_prefs(CommPrefsInput(feature_updates=False, security_updates=False, email=MOCK_EMAIL))
lfs_client.config.setup(Setup(username="lynn",
key=AccessKeyCredentials(access_key_id=LAKEFS_ACCESS_KEY, secret_access_key=LAKEFS_SECRET_KEY)))

if __name__ == '__main__':
main()
lfs_client.repositories.create_repository(
RepositoryCreation(name=repo_name, storage_namespace=storage_namespace))
return lfs_client
3 changes: 2 additions & 1 deletion test/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
lakefs_client==0.104.0
pyspark==3.3.2
pyspark==3.3.2
pytest==7.4.0
26 changes: 26 additions & 0 deletions test/test_iceberg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import argparse
import sys
import pytest

import lakefs_client
from lakefs_client.client import LakeFSClient
from lakefs_client.models import *
from lakefs_client.model.access_key_credentials import AccessKeyCredentials
from lakefs_client.model.comm_prefs_input import CommPrefsInput
from lakefs_client.model.setup import Setup
from lakefs_client.model.repository_creation import RepositoryCreation
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

def test_diff_two_same_branches(spark, lakefs_client, lakefs_repo):
print("repo name ", lakefs_repo)
df = spark.read.option("inferSchema","true").option("multiline","true").csv("./test/data-sets/film_permits.csv")
df.write.saveAsTable("lakefs.main.nyc.permits")

#Commit, create a new branch, check that the tables are the same
lakefs_client.commits.commit(lakefs_repo, "main", CommitCreation(message="Initial data load"))
lakefs_client.branches.create_branch(lakefs_repo, BranchCreation(name="dev", source="main"))
df_main = spark.read.table("lakefs.main.nyc.permits")
df_dev = spark.read.table("lakefs.dev.nyc.permits")
assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and dev tables should be equal"

0 comments on commit 523b8de

Please sign in to comment.