Skip to content

Commit

Permalink
added simple test case
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnro314 committed Jul 13, 2023
1 parent 22e7460 commit d453ff1
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions test/run-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import lakefs_client
from lakefs_client.client import LakeFSClient
from lakefs_client.models import *
from lakefs_client.model.access_key_credentials import AccessKeyCredentials
from lakefs_client.model.comm_prefs_input import CommPrefsInput
from lakefs_client.model.setup import Setup
Expand Down Expand Up @@ -33,17 +34,18 @@ def main():
host='http://localhost:8000'))

# Setup lakeFS
repo_name = args.repository
lfs_client.config.setup_comm_prefs(CommPrefsInput(feature_updates=False, security_updates=False, email=MOCK_EMAIL))
lfs_client.config.setup(Setup(username="lynn",
key=AccessKeyCredentials(access_key_id= lakefs_access_key, secret_access_key= lakefs_secret_key)))

lfs_client.repositories.create_repository(
RepositoryCreation(name=args.repository, storage_namespace=args.storage_namespace))
RepositoryCreation(name=repo_name, storage_namespace=args.storage_namespace))

spark_config = SparkConf()
spark_config.set("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog")
spark_config.set("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog")
spark_config.set("spark.sql.catalog.lakefs.warehouse", f"lakefs://{args.repository}")
spark_config.set("spark.sql.catalog.lakefs.warehouse", f"lakefs://{repo_name}")
spark_config.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
spark_config.set("spark.hadoop.fs.s3a.endpoint", "http://localhost:8000")
spark_config.set("spark.hadoop.fs.s3a.access.key", lakefs_access_key)
Expand All @@ -56,7 +58,14 @@ def main():
df = spark.read.option("inferSchema","true").option("multiline","true").csv("./test/data-sets/film_permits.csv")
df.write.saveAsTable("lakefs.main.nyc.permits")


#Commit, create a new branch, check that the tables are the same
lfs_client.commits.commit(repo_name, "main", CommitCreation(message="Initial data load"))
lfs_client.branches.create_branch(repo_name, BranchCreation(name="dev", source="main"))
df_main = spark.read.table("lakefs.main.nyc.permits")
df_dev = spark.read.table("lakefs.dev.nyc.permits")
df_main.show(10)
df_dev.show(10)
assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect())

if __name__ == '__main__':
main()

0 comments on commit d453ff1

Please sign in to comment.