diff --git a/test/test_iceberg.py b/test/test_iceberg.py
index 7e702f4..22f6c07 100644
--- a/test/test_iceberg.py
+++ b/test/test_iceberg.py
@@ -10,15 +10,69 @@
 import pyspark
 from pyspark.sql import SparkSession
 from pyspark.conf import SparkConf
+from pyspark.sql.types import StructType, StructField, StringType, IntegerType
+
+
+def get_data(spark):
+    """Build the small in-memory workers DataFrame used by the tests.
+
+    The frame has six rows with the columns id, firstname, lastname, age and
+    gender, created via ``spark.createDataFrame`` with an explicit schema.
+    """
+    data_set = [
+        (1, "James", "Smith", 32, "M"),
+        (2, "Michael", "Rose", 35, "M"),
+        (3, "Robert", "Williams", 41, "M"),
+        (4, "Maria", "Jones", 36, "F"),
+        (5, "Jen", "Brown", 44, "F"),
+        (6, "Monika", "Geller", 31, "F"),
+    ]
+
+    # id is declared as an integer so the column type matches the int ids in
+    # data_set and the integer predicate ``id = 6`` used by the delete scenario.
+    schema = StructType([
+        StructField("id", IntegerType(), True),
+        StructField("firstname", StringType(), True),
+        StructField("lastname", StringType(), True),
+        StructField("age", IntegerType(), True),
+        StructField("gender", StringType(), True),
+    ])
+    return spark.createDataFrame(data=data_set, schema=schema)
+
 
 def test_diff_two_same_branches(spark, lfs_client, lakefs_repo):
     print("repo name ", lakefs_repo)
-    df = spark.read.option("inferSchema","true").option("multiline","true").csv("./test/data-sets/film_permits.csv")
-    df.write.saveAsTable("lakefs.main.nyc.permits")
+    df = get_data(spark)
+    df.write.saveAsTable("lakefs.main.company.workers")
 
     #Commit, create a new branch, check that the tables are the same
     lfs_client.commits.commit(lakefs_repo, "main", CommitCreation(message="Initial data load"))
     lfs_client.branches.create_branch(lakefs_repo, BranchCreation(name="dev", source="main"))
-    df_main = spark.read.table("lakefs.main.nyc.permits")
-    df_dev = spark.read.table("lakefs.dev.nyc.permits")
+    df_main = spark.read.table("lakefs.main.company.workers")
+    df_dev = spark.read.table("lakefs.dev.company.workers")
     assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and dev tables should be equal"
+
+
+# NOTE(review): pytest only collects functions matching ``test_*`` by default,
+# so this scenario is not run as written. Rename it to
+# ``test_delete_on_dev_and_merge`` once it should be part of the suite.
+def delete_on_dev_and_merge(spark, lfs_client, lakefs_repo):
+    """Delete a row on dev, merge dev into main and compare both branches.
+
+    Writes the workers table on main and commits it, branches dev off main,
+    deletes the row with id 6 on dev and commits, merges dev into main, then
+    checks that the row is gone from main and that main and dev match.
+    """
+    df = get_data(spark)
+    df.write.saveAsTable("lakefs.main.company.workers")
+
+    lfs_client.commits.commit(lakefs_repo, "main", CommitCreation(message="Initial data load"))
+    lfs_client.branches.create_branch(lakefs_repo, BranchCreation(name="dev", source="main"))
+    spark.sql("DELETE FROM lakefs.dev.company.workers WHERE id = 6")
+    lfs_client.commits.commit(lakefs_repo, "dev", CommitCreation(message="delete one row"))
+    lfs_client.refs.merge_into_branch(lakefs_repo, "dev", "main")
+    df_main = spark.read.table("lakefs.main.company.workers")
+    df_dev = spark.read.table("lakefs.dev.company.workers")
+    # Equality alone would not show that the delete reached main, so check it explicitly.
+    assert df_main.filter("id = 6").count() == 0, "deleted row should not be present on main after merge"
+    assert (df_main.schema == df_dev.schema) and (df_main.collect() == df_dev.collect()), "main and dev tables should be equal"