Commit 93972c6

changed to sdk client, create repo for every test

lynnro314 committed Nov 2, 2023
1 parent d296829 commit 93972c6
Showing 3 changed files with 51 additions and 73 deletions.
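
For orientation, the change below migrates the tests from the legacy lakefs_client package to lakefs_sdk. A minimal sketch of the new client construction, reusing the test-local endpoint and credentials from conftest.py (the *_api attribute names follow the lakefs_sdk 1.x client layout used throughout this diff):

import lakefs_sdk
from lakefs_sdk.client import LakeFSClient

# Same construction as the session-scoped fixture in test/conftest.py.
client = LakeFSClient(
    lakefs_sdk.Configuration(username='AKIAIOSFODNN7EXAMPLE',
                             password='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
                             host='http://localhost:8000'))

# API groups are exposed as *_api attributes, for example:
# client.repositories_api, client.branches_api, client.commits_api, client.refs_api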
36 changes: 11 additions & 25 deletions test/conftest.py
@@ -4,13 +4,9 @@
 
 import pytest
 
-import lakefs_client
-from lakefs_client.client import LakeFSClient
-from lakefs_client.models import *
-from lakefs_client.model.access_key_credentials import AccessKeyCredentials
-from lakefs_client.model.comm_prefs_input import CommPrefsInput
-from lakefs_client.model.setup import Setup
-from lakefs_client.model.repository_creation import RepositoryCreation
+import lakefs_sdk
+from lakefs_sdk.client import LakeFSClient
+from lakefs_sdk.models import *
 
 LAKEFS_ACCESS_KEY = 'AKIAIOSFODNN7EXAMPLE'
 LAKEFS_SECRET_KEY = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
@@ -29,15 +25,9 @@ def pytest_addoption(parser):
 def lakefs_repo(request):
     return request.config.getoption('--repository')
 
-# @pytest.fixture
-# def lakeFS_args(request):
-#     args = {}
-#     args['storage_namespace'] = request.config.getoption('--storage_namespace')
-#     args['repository'] = request.config.getoption('--repository')
-#     args['lakefs_access_key'] = 'AKIAIOSFODNN7EXAMPLE'
-#     args['lakefs_secret_key'] = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
-#     return args
-
+@pytest.fixture
+def storage_namespace(request):
+    return request.config.getoption('--storage_namespace')
 
 @pytest.fixture(scope="session")
 def spark(pytestconfig):
@@ -46,6 +36,7 @@ def spark(pytestconfig):
     spark_config.set("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog")
     spark_config.set("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog")
     spark_config.set("spark.sql.catalog.lakefs.warehouse", f"lakefs://{repo_name}")
+    spark_config.set("spark.sql.catalog.lakefs.cache-enabled", "false")
     spark_config.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
     spark_config.set("spark.hadoop.fs.s3a.endpoint", "http://localhost:8000")
     spark_config.set("spark.hadoop.fs.s3a.access.key", LAKEFS_ACCESS_KEY)
@@ -60,17 +51,12 @@
 
 @pytest.fixture(scope="session")
 def lfs_client(pytestconfig):
     lfs_client = LakeFSClient(
-        lakefs_client.Configuration(username=LAKEFS_ACCESS_KEY,
+        lakefs_sdk.Configuration(username=LAKEFS_ACCESS_KEY,
                                     password=LAKEFS_SECRET_KEY,
                                     host='http://localhost:8000'))
 
-    # Setup lakeFS
-    repo_name = pytestconfig.getoption('--repository')
-    storage_namespace = pytestconfig.getoption('--storage_namespace')
-    lfs_client.config.setup_comm_prefs(CommPrefsInput(feature_updates=False, security_updates=False, email=MOCK_EMAIL))
-    lfs_client.config.setup(Setup(username="lynn",
-                                  key=AccessKeyCredentials(access_key_id=LAKEFS_ACCESS_KEY, secret_access_key=LAKEFS_SECRET_KEY)))
-    lfs_client.repositories.create_repository(
-        RepositoryCreation(name=repo_name, storage_namespace=storage_namespace))
+    lfs_client.internal_api.setup_comm_prefs(CommPrefsInput(feature_updates=False, security_updates=False, email=MOCK_EMAIL))
+    # lfs_client.internal_api.setup(Setup(username="lynn",
+    #     key=AccessKeyCredentials(access_key_id=LAKEFS_ACCESS_KEY, secret_access_key=LAKEFS_SECRET_KEY)))
     return lfs_client
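
Note that the session fixture no longer creates a repository; each test now provisions its own (via initiate_repo_with_data in test/test_iceberg.py below). A hedged sketch of that per-test call, with illustrative repository and namespace values:

from lakefs_sdk.models import RepositoryCreation

# Illustrative values; the tests derive them from the --repository and
# --storage_namespace pytest options via the fixtures above.
lfs_client.repositories_api.create_repository(
    RepositoryCreation(name="iceberg-tests_test1",
                       storage_namespace="s3://example-bucket/test1"))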
4 changes: 2 additions & 2 deletions test/requirements.txt
@@ -1,3 +1,3 @@
-lakefs_client==0.104.0
+lakefs_sdk==1.1.0
 pyspark==3.3.2
-pytest==7.4.0
+pytest==7.4.0
84 changes: 38 additions & 46 deletions test/test_iceberg.py
@@ -1,15 +1,5 @@
 import pytest
 
-import lakefs_client
-from lakefs_client.client import LakeFSClient
-from lakefs_client.models import *
-from lakefs_client.model.access_key_credentials import AccessKeyCredentials
-from lakefs_client.model.comm_prefs_input import CommPrefsInput
-from lakefs_client.model.setup import Setup
-from lakefs_client.model.repository_creation import RepositoryCreation
-import pyspark
-from pyspark.sql import SparkSession
-from pyspark.conf import SparkConf
+import lakefs_sdk.client
+from lakefs_sdk.models import *
 from pyspark.sql.types import StructType, StructField, StringType, IntegerType


@@ -28,52 +18,54 @@ def get_data(spark):
         StructField("gender", StringType(), True),
     ])
     df = spark.createDataFrame(data=data_set,schema=schema)
-    df.printSchema()
-    df.show(truncate=False)
     return df
 
-def test_diff_two_same_branches(spark, lfs_client, lakefs_repo):
-    print("repo name ", lakefs_repo)
+
+def initiate_repo_with_data(spark, lfs_client: lakefs_sdk.client.LakeFSClient, repo_name, storage_namespace, test_name):
+    storage_namespace = f"{storage_namespace}/{test_name}"
+    lfs_client.repositories_api.create_repository(
+        RepositoryCreation(name=repo_name, storage_namespace=storage_namespace))
     df = get_data(spark)
     df.write.saveAsTable("lakefs.main.company.workers")
+    lfs_client.commits_api.commit(repo_name, "main", CommitCreation(message="Initial data load"))
+
+
+def test_diff_two_same_branches(spark, lfs_client: lakefs_sdk.client.LakeFSClient, lakefs_repo, storage_namespace):
+    repo_name = f"{lakefs_repo}_test1"
+    initiate_repo_with_data(spark, lfs_client, repo_name, storage_namespace, "test1")
 
-    #Commit, create a new branch, check that the tables are the same
-    lfs_client.commits.commit(lakefs_repo, "main", CommitCreation(message="Initial data load"))
-    lfs_client.branches.create_branch(lakefs_repo, BranchCreation(name="dev", source="main"))
+    #Create a new branch, check that the tables are the same
+    lfs_client.branches_api.create_branch(repo_name, BranchCreation(name="dev", source="main"))
     df_main = spark.read.table("lakefs.main.company.workers")
     df_dev = spark.read.table("lakefs.dev.company.workers")
     assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and dev tables should be equal"

-def test_delete_on_dev_and_merge(spark, lfs_client, lakefs_repo):
-    lfs_client.branches.create_branch(lakefs_repo, BranchCreation(name="test1", source="main"))
-    print("1")
-    spark.sql("SELECT * FROM lakefs.test1.company.workers").show()
-    spark.sql("DELETE FROM lakefs.test1.company.workers WHERE id = 6")
-    print("2")
-    spark.sql("SELECT * FROM lakefs.test1.company.workers").show()
-    lfs_client.commits.commit(lakefs_repo, "test1", CommitCreation(message="delete one row"))
-    merge_output = lfs_client.refs.merge_into_branch(lakefs_repo, "test1", "main")
-    print(merge_output)
-    print("3")
-    spark.sql("SELECT * FROM lakefs.main.company.workers").show()
+def test_delete_on_dev_and_merge(spark, lfs_client: lakefs_sdk.client.LakeFSClient, lakefs_repo, storage_namespace):
+    repo_name = f"{lakefs_repo}_test2"
+    initiate_repo_with_data(spark, lfs_client, repo_name, storage_namespace, "test2")
+
+    lfs_client.branches_api.create_branch(repo_name, BranchCreation(name="test2", source="main"))
+    spark.sql("DELETE FROM lakefs.test2.company.workers WHERE id = 6")
+    lfs_client.commits_api.commit(repo_name, "test2", CommitCreation(message="delete one row"))
+    lfs_client.refs_api.merge_into_branch(repo_name, "test2", "main")
     df_main = spark.read.table("lakefs.main.company.workers")
-    df_dev = spark.read.table("lakefs.test1.company.workers")
-    assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and test1 tables should be equal"
+    df_dev = spark.read.table("lakefs.test2.company.workers")
+    assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and test2 tables should be equal"


-def test_multiple_changes_and_merge(spark, lfs_client, lakefs_repo):
-    df = get_data(spark)
-    df.write.saveAsTable("lakefs.main.company.workers")
+def test_multiple_changes_and_merge(spark, lfs_client: lakefs_sdk.client.LakeFSClient, lakefs_repo, storage_namespace):
+    repo_name = f"{lakefs_repo}_test3"
+    initiate_repo_with_data(spark, lfs_client, repo_name, storage_namespace, "test3")
 
-    lfs_client.commits.commit(lakefs_repo, "main", CommitCreation(message="Initial data load"))
-    lfs_client.branches.create_branch(lakefs_repo, BranchCreation(name="dev", source="main"))
-    spark.sql("DELETE FROM lakefs.dev.company.workers WHERE id = 6")
-    spark.sql("DELETE FROM lakefs.dev.company.workers WHERE id = 5")
-    spark.sql("INSERT INTO lakefs.dev.company.workers VALUES (7, 'Jhon', 'Smith', 33, 'M')")
-    spark.sql("DELETE FROM lakefs.dev.company.workers WHERE id = 4")
-    spark.sql("INSERT INTO lakefs.dev.company.workers VALUES (8, 'Marta', 'Green', 31, 'F')")
-    lfs_client.commits.commit(lakefs_repo, "dev", CommitCreation(message="Some changes"))
-    lfs_client.refs.merge_into_branch(lakefs_repo, "dev", "main")
+    lfs_client.branches_api.create_branch(repo_name, BranchCreation(name="test3", source="main"))
+    spark.sql("DELETE FROM lakefs.test3.company.workers WHERE id = 6")
+    spark.sql("DELETE FROM lakefs.test3.company.workers WHERE id = 5")
+    spark.sql("INSERT INTO lakefs.test3.company.workers VALUES (7, 'Jhon', 'Smith', 33, 'M')")
+    spark.sql("DELETE FROM lakefs.test3.company.workers WHERE id = 4")
+    spark.sql("INSERT INTO lakefs.test3.company.workers VALUES (8, 'Marta', 'Green', 31, 'F')")
+    lfs_client.commits_api.commit(repo_name, "test3", CommitCreation(message="Some changes"))
+    lfs_client.refs_api.merge_into_branch(repo_name, "test3", "main")
     df_main = spark.read.table("lakefs.main.company.workers")
-    df_dev = spark.read.table("lakefs.dev.company.workers")
-    assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and dev tables should be equal"
+    df_dev = spark.read.table("lakefs.test3.company.workers")
+    assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and test3 tables should be equal"
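
Taken together, each test now works against its own repository, so state cannot leak between tests. A hypothetical further test following the same shape (the test4 suffix and the non-empty check are illustrative only, not part of this commit):

def test_example_isolation(spark, lfs_client: lakefs_sdk.client.LakeFSClient, lakefs_repo, storage_namespace):
    # A unique repo name and storage-namespace suffix keep this test isolated.
    repo_name = f"{lakefs_repo}_test4"
    initiate_repo_with_data(spark, lfs_client, repo_name, storage_namespace, "test4")
    df_main = spark.read.table("lakefs.main.company.workers")
    assert df_main.count() > 0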
