From b075778e6acb8605dfc1169168099905cb9e11fa Mon Sep 17 00:00:00 2001
From: camper42
Date: Fri, 15 Apr 2022 12:57:02 +0800
Subject: [PATCH] Support partitioning by Date type (#82)

---
 .../single/ClickHouseSingleSuite.scala        | 88 +++++++++++++++++++
 .../xenon/clickhouse/ClickHouseTable.scala    |  3 +-
 .../clickhouse/read/InputPartitions.scala     |  7 +-
 3 files changed, 96 insertions(+), 2 deletions(-)

diff --git a/spark-3.2/clickhouse-spark-it/src/test/scala/xenon/clickhouse/single/ClickHouseSingleSuite.scala b/spark-3.2/clickhouse-spark-it/src/test/scala/xenon/clickhouse/single/ClickHouseSingleSuite.scala
index 59382ab1..3047b38a 100644
--- a/spark-3.2/clickhouse-spark-it/src/test/scala/xenon/clickhouse/single/ClickHouseSingleSuite.scala
+++ b/spark-3.2/clickhouse-spark-it/src/test/scala/xenon/clickhouse/single/ClickHouseSingleSuite.scala
@@ -14,6 +14,7 @@ package xenon.clickhouse.single
 
+import java.sql.Date
 import java.sql.Timestamp
 
 import org.apache.spark.sql.types._
@@ -117,6 +118,49 @@ class ClickHouseSingleSuite extends BaseSparkSuite
     }
   }
 
+  test("clickhouse partition (date type)") {
+    val db = "db_part_date"
+    val tbl = "tbl_part_date"
+    val schema =
+      StructType(
+        StructField("id", LongType, false) ::
+          StructField("date", DateType, false) :: Nil
+      )
+    withTable(db, tbl, schema, partKeys = Seq("date")) {
+      spark.sql(
+        s"""INSERT INTO `$db`.`$tbl`
+           |VALUES
+           |  (11L, "2022-04-11"),
+           |  (12L, "2022-04-12") AS tab(id, date)
+           |""".stripMargin
+      )
+      spark.createDataFrame(Seq(
+        (21L, Date.valueOf("2022-04-21")),
+        (22L, Date.valueOf("2022-04-22"))
+      ))
+        .toDF("id", "date")
+        .writeTo(s"$db.$tbl").append
+
+      checkAnswer(
+        spark.table(s"$db.$tbl").orderBy($"id"),
+        Row(11L, Date.valueOf("2022-04-11")) ::
+          Row(12L, Date.valueOf("2022-04-12")) ::
+          Row(21L, Date.valueOf("2022-04-21")) ::
+          Row(22L, Date.valueOf("2022-04-22")) :: Nil
+      )
+
+      checkAnswer(
+        spark.sql(s"SHOW PARTITIONS $db.$tbl"),
+        Seq(
+          Row("date=2022-04-11"),
+          Row("date=2022-04-12"),
+          Row("date=2022-04-21"),
+          Row("date=2022-04-22")
+        )
+      )
+    }
+  }
+
   test("clickhouse multi part columns") {
     val db = "db_multi_part_col"
     val tbl = "tbl_multi_part_col"
@@ -163,6 +207,50 @@ class ClickHouseSingleSuite extends BaseSparkSuite
     }
   }
 
+  test("clickhouse multi part columns (date type)") {
+    val db = "db_mul_part_date"
+    val tbl = "tbl_mul_part_date"
+    val schema =
+      StructType(
+        StructField("id", LongType, false) ::
+          StructField("part_1", DateType, false) ::
+          StructField("part_2", IntegerType, false) :: Nil
+      )
+    withTable(db, tbl, schema, partKeys = Seq("part_1", "part_2")) {
+      spark.sql(
+        s"""INSERT INTO `$db`.`$tbl`
+           |VALUES
+           |  (11L, "2022-04-11", 1),
+           |  (12L, "2022-04-12", 2) AS tab(id, part_1, part_2)
+           |""".stripMargin
+      )
+      spark.createDataFrame(Seq(
+        (21L, "2022-04-21", 1),
+        (22L, "2022-04-22", 2)
+      ))
+        .toDF("id", "part_1", "part_2")
+        .writeTo(s"$db.$tbl").append
+
+      checkAnswer(
+        spark.table(s"$db.$tbl").orderBy($"id"),
+        Row(11L, Date.valueOf("2022-04-11"), 1) ::
+          Row(12L, Date.valueOf("2022-04-12"), 2) ::
+          Row(21L, Date.valueOf("2022-04-21"), 1) ::
+          Row(22L, Date.valueOf("2022-04-22"), 2) :: Nil
+      )
+
+      checkAnswer(
+        spark.sql(s"SHOW PARTITIONS $db.$tbl"),
+        Seq(
+          Row("part_1=2022-04-11/part_2=1"),
+          Row("part_1=2022-04-12/part_2=2"),
+          Row("part_1=2022-04-21/part_2=1"),
+          Row("part_1=2022-04-22/part_2=2")
+        )
+      )
+    }
+  }
+
   test("clickhouse multi sort columns") {
     val db = "db_multi_sort_col"
     val tbl = "tbl_multi_sort_col"
diff --git a/spark-3.2/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseTable.scala b/spark-3.2/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseTable.scala
index 66b59eb7..8a56c567 100644
--- a/spark-3.2/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseTable.scala
+++ b/spark-3.2/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseTable.scala
@@ -15,7 +15,7 @@ package xenon.clickhouse
 
 import java.lang.{Integer => JInt, Long => JLong}
-import java.time.ZoneId
+import java.time.{LocalDate, ZoneId}
 import java.util
 
 import scala.collection.JavaConverters._
@@ -223,6 +223,7 @@ case class ClickHouseTable(
       case StringType => UTF8String.fromString(str.stripPrefix("'").stripSuffix("'"))
       case IntegerType => JInt.parseInt(str)
       case LongType => JLong.parseLong(str)
+      case DateType => LocalDate.parse(str.stripPrefix("'").stripSuffix("'"), dateFmt).toEpochDay.toInt
       case unsupported => throw new UnsupportedOperationException(s"$unsupported")
     }
diff --git a/spark-3.2/clickhouse-spark/src/main/scala/xenon/clickhouse/read/InputPartitions.scala b/spark-3.2/clickhouse-spark/src/main/scala/xenon/clickhouse/read/InputPartitions.scala
index 87cf3191..97d9d74e 100644
--- a/spark-3.2/clickhouse-spark/src/main/scala/xenon/clickhouse/read/InputPartitions.scala
+++ b/spark-3.2/clickhouse-spark/src/main/scala/xenon/clickhouse/read/InputPartitions.scala
@@ -40,6 +40,11 @@ case class ClickHouseInputPartition(
 
   def partFilterExpr: String = partition match {
     case NoPartitionSpec => "1=1"
-    case PartitionSpec(part, _, _) => s"${table.partition_key} = $part"
+    case PartitionSpec(part, _, _) => (part.contains("-"), part.contains("(")) match {
+      // quote when partitioned by a single Date type column, to avoid "illegal types of arguments (Date, Int64)"
+      case (true, false) => s"${table.partition_key} = '$part'"
+      // with multiple partition columns, the partition value is a tuple in which Date values are already quoted
+      case _ => s"${table.partition_key} = $part"
+    }
   }
 }
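Note on the ClickHouseTable change: Spark's Catalyst engine represents DateType values internally as an Int counting days since the Unix epoch, which is why the new DateType branch ends in toEpochDay.toInt. Below is a minimal, self-contained sketch of that conversion. It assumes dateFmt is a yyyy-MM-dd formatter; the real dateFmt is defined elsewhere in ClickHouseTable.scala and is not shown in this patch.

    import java.time.LocalDate
    import java.time.format.DateTimeFormatter

    object PartitionDateDemo extends App {
      // Assumption: dateFmt matches the formatter used by ClickHouseTable.
      val dateFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")

      // ClickHouse reports partition values as strings, possibly single-quoted
      // (e.g. "'2022-04-11'"); Catalyst's DateType wants epoch days as an Int.
      def toCatalystDate(str: String): Int =
        LocalDate.parse(str.stripPrefix("'").stripSuffix("'"), dateFmt).toEpochDay.toInt

      println(toCatalystDate("'2022-04-11'")) // 19093 (days since 1970-01-01)
    }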
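Note on the InputPartitions change: the quoting rule keys off the textual shape of the partition value that ClickHouse reports. A single Date partition column yields a bare value such as 2022-04-11 (contains "-", no "("), which must be quoted or ClickHouse rejects the comparison with "illegal types of arguments (Date, Int64)"; a multi-column partition key yields a tuple literal in which Date values are already quoted. A sketch of that decision logic, where partitionKey is a hypothetical stand-in for table.partition_key and the tuple-shaped sample value is illustrative:

    object PartFilterDemo extends App {
      // Sketch of the partFilterExpr quoting rule from this patch.
      def partFilter(partitionKey: String, part: String): String =
        (part.contains("-"), part.contains("(")) match {
          // bare Date value from a single Date partition column: add quotes
          case (true, false) => s"$partitionKey = '$part'"
          // tuple literal (multi-column key) or plain value: use as-is
          case _ => s"$partitionKey = $part"
        }

      println(partFilter("date", "2022-04-11"))      // date = '2022-04-11'
      println(partFilter("(part_1, part_2)", "('2022-04-11',1)"))
                                                     // (part_1, part_2) = ('2022-04-11',1)
      println(partFilter("m", "202204"))             // m = 202204
    }

The check is purely textual, so it hinges on how ClickHouse formats partition values rather than on the Spark schema.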