Skip to content

Commit

Permalink
changed to data-set, added test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnro314 committed Oct 29, 2023
1 parent 5d68471 commit 3d96e57
Showing 1 changed file with 36 additions and 4 deletions.
40 changes: 36 additions & 4 deletions test/test_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,47 @@
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


def get_data(spark):
data_set = [(1, "James", "Smith", 32, "M"),
(2, "Michael","Rose", 35 ,"M"),
(3, "Robert", "Williams", 41, "M"),
(4, "Maria", "Jones", 36, "F"),
(5, "Jen","Brown", 44, "F"),
(6, "Monika","Geller", 31, "F")]

schema = StructType([StructField("id", StringType(), True),
StructField("firstname",StringType(),True),
StructField("lastname",StringType(),True),
StructField("age", IntegerType(), True),
StructField("gender", StringType(), True),
])
df = spark.createDataFrame(data=data_set,schema=schema)
return df

def test_diff_two_same_branches(spark, lfs_client, lakefs_repo):
print("repo name ", lakefs_repo)
df = spark.read.option("inferSchema","true").option("multiline","true").csv("./test/data-sets/film_permits.csv")
df.write.saveAsTable("lakefs.main.nyc.permits")
df = get_data(spark)
df.write.saveAsTable("lakefs.main.company.workers")

#Commit, create a new branch, check that the tables are the same
lfs_client.commits.commit(lakefs_repo, "main", CommitCreation(message="Initial data load"))
lfs_client.branches.create_branch(lakefs_repo, BranchCreation(name="dev", source="main"))
df_main = spark.read.table("lakefs.main.nyc.permits")
df_dev = spark.read.table("lakefs.dev.nyc.permits")
df_main = spark.read.table("lakefs.main.company.workers")
df_dev = spark.read.table("lakefs.dev.company.workers")
assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and dev tables should be equal"

def delete_on_dev_and_merge(spark, lfs_client, lakefs_repo):
df = get_data(spark)
df.write.saveAsTable("lakefs.main.company.workers")

lfs_client.commits.commit(lakefs_repo, "main", CommitCreation(message="Initial data load"))
lfs_client.branches.create_branch(lakefs_repo, BranchCreation(name="dev", source="main"))
spark.sql("DELETE FROM lakefs.dev.company.workers WHERE id = 6")
lfs_client.commits.commit(lakefs_repo, "dev", CommitCreation(message="delete one row"))
lfs_client.refs.merge_into_branch(lakefs_repo, "dev", "main")
df_main = spark.read.table("lakefs.main.company.workers")
df_dev = spark.read.table("lakefs.dev.company.workers")
assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and dev tables should be equal"

0 comments on commit 3d96e57

Please sign in to comment.