Skip to content

Commit

Permalink
Support partitioned by Date Type (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
camper42 authored and pan3793 committed Apr 15, 2022
1 parent 2bfeee2 commit b075778
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package xenon.clickhouse.single

import java.sql.Date
import java.sql.Timestamp

import org.apache.spark.sql.types._
Expand Down Expand Up @@ -117,6 +118,49 @@ class ClickHouseSingleSuite extends BaseSparkSuite
}
}

// Round-trips rows through a table partitioned by a single Date column and
// checks both the stored rows and the partitions reported by SHOW PARTITIONS.
test("clickhouse partition (date type)") {
  val db = "db_part_date"
  val tbl = "tbl_part_date"
  val target = s"$db.$tbl"
  val schema = StructType(Seq(
    StructField("id", LongType, nullable = false),
    StructField("date", DateType, nullable = false)
  ))

  withTable(db, tbl, schema, partKeys = Seq("date")) {
    // First batch: SQL INSERT, with the dates written as string literals.
    val insertSql =
      s"""INSERT INTO `$db`.`$tbl`
         |VALUES
         | (11L, "2022-04-11"),
         | (12L, "2022-04-12") AS tab(id, date)
         |""".stripMargin
    spark.sql(insertSql)

    // Second batch: DataFrame append, with the dates as java.sql.Date values.
    val appended = spark
      .createDataFrame(Seq(
        (21L, Date.valueOf("2022-04-21")),
        (22L, Date.valueOf("2022-04-22"))
      ))
      .toDF("id", "date")
    appended.writeTo(target).append()

    // Rows from both batches must read back as dates, ordered by id.
    val expectedRows = Seq(
      Row(11L, Date.valueOf("2022-04-11")),
      Row(12L, Date.valueOf("2022-04-12")),
      Row(21L, Date.valueOf("2022-04-21")),
      Row(22L, Date.valueOf("2022-04-22"))
    )
    checkAnswer(spark.table(target).orderBy($"id"), expectedRows)

    // One partition per distinct date, rendered as "date=<yyyy-MM-dd>".
    val expectedPartitions =
      Seq("2022-04-11", "2022-04-12", "2022-04-21", "2022-04-22")
        .map(day => Row(s"date=$day"))
    checkAnswer(spark.sql(s"SHOW PARTITIONS $target"), expectedPartitions)
  }
}

test("clickhouse multi part columns") {
val db = "db_multi_part_col"
val tbl = "tbl_multi_part_col"
Expand Down Expand Up @@ -163,6 +207,50 @@ class ClickHouseSingleSuite extends BaseSparkSuite
}
}

// Round-trips rows through a table partitioned by (Date, Int) and checks both
// the stored rows and the partitions reported by SHOW PARTITIONS.
test("clickhouse multi part columns (date type)") {
  val db = "db_mul_part_date"
  val tbl = "tbl_mul_part_date"
  val target = s"$db.$tbl"
  val schema = StructType(Seq(
    StructField("id", LongType, nullable = false),
    StructField("part_1", DateType, nullable = false),
    StructField("part_2", IntegerType, nullable = false)
  ))

  withTable(db, tbl, schema, partKeys = Seq("part_1", "part_2")) {
    // First batch: SQL INSERT, with the dates written as string literals.
    val insertSql =
      s"""INSERT INTO `$db`.`$tbl`
         |VALUES
         | (11L, "2022-04-11", 1),
         | (12L, "2022-04-12", 2) AS tab(id, part_1, part_2)
         |""".stripMargin
    spark.sql(insertSql)

    // Second batch: DataFrame append. part_1 is supplied as a String here,
    // not a java.sql.Date as in the single-column date test.
    // NOTE(review): presumably relies on a string-to-date cast on write — confirm.
    val appended = spark
      .createDataFrame(Seq(
        (21L, "2022-04-21", 1),
        (22L, "2022-04-22", 2)
      ))
      .toDF("id", "part_1", "part_2")
    appended.writeTo(target).append()

    // Rows from both batches must read back with part_1 as a date, ordered by id.
    val expectedRows = Seq(
      Row(11L, Date.valueOf("2022-04-11"), 1),
      Row(12L, Date.valueOf("2022-04-12"), 2),
      Row(21L, Date.valueOf("2022-04-21"), 1),
      Row(22L, Date.valueOf("2022-04-22"), 2)
    )
    checkAnswer(spark.table(target).orderBy($"id"), expectedRows)

    // One partition per (part_1, part_2) pair, rendered as "part_1=<date>/part_2=<n>".
    val expectedPartitions = Seq(
      "2022-04-11" -> 1,
      "2022-04-12" -> 2,
      "2022-04-21" -> 1,
      "2022-04-22" -> 2
    ).map { case (day, bucket) => Row(s"part_1=$day/part_2=$bucket") }
    checkAnswer(spark.sql(s"SHOW PARTITIONS $target"), expectedPartitions)
  }
}

test("clickhouse multi sort columns") {
val db = "db_multi_sort_col"
val tbl = "tbl_multi_sort_col"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package xenon.clickhouse

import java.lang.{Integer => JInt, Long => JLong}
import java.time.ZoneId
import java.time.{LocalDate, ZoneId}
import java.util

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -223,6 +223,7 @@ case class ClickHouseTable(
case StringType => UTF8String.fromString(str.stripPrefix("'").stripSuffix("'"))
case IntegerType => JInt.parseInt(str)
case LongType => JLong.parseLong(str)
case DateType => LocalDate.parse(str.stripPrefix("'").stripSuffix("'"), dateFmt).toEpochDay.toInt
case unsupported => throw new UnsupportedOperationException(s"$unsupported")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ case class ClickHouseInputPartition(

/**
 * Builds the SQL predicate that restricts a query to this input partition.
 *
 * @return `"1=1"` when the partition is `NoPartitionSpec`; otherwise an equality
 *         between the table's `partition_key` and the partition value, with the
 *         value single-quoted when it looks like a bare date.
 */
def partFilterExpr: String = partition match {
  case NoPartitionSpec => "1=1"
  // Exactly one PartitionSpec arm: an earlier unconditional arm with the same
  // pattern would shadow this one and make the quoting logic unreachable.
  case PartitionSpec(part, _, _) =>
    // Heuristic: a value containing '-' but no '(' is treated as a single
    // Date-typed partition value.
    val looksLikeBareDate = part.contains("-") && !part.contains("(")
    if (looksLikeBareDate) {
      // Quote when partitioned by a single Date column, to avoid an
      // "illegal types of arguments (Date, Int64)" comparison error.
      s"${table.partition_key} = '$part'"
    } else {
      // With multiple partition columns the Date value already arrives quoted,
      // and non-date values are used verbatim.
      s"${table.partition_key} = $part"
    }
}
}

0 comments on commit b075778

Please sign in to comment.