Write INT64 by default for Timestamps
Fokko committed Jul 14, 2024
1 parent 573a57f commit 86fdff7
Showing 2 changed files with 62 additions and 4 deletions.
@@ -265,8 +265,8 @@ case class DeltaParquetFileFormat(
       dataSchema: StructType): OutputWriterFactory = {
     val factory = super.prepareWrite(sparkSession, job, options, dataSchema)
     val conf = ContextUtil.getConfiguration(job)
-    // Always write timestamp as TIMESTAMP_MICROS for Iceberg compat based on Iceberg spec
-    if (IcebergCompatV1.isEnabled(metadata) || IcebergCompatV2.isEnabled(metadata)) {
+    // If the key is not set explicitly, default to INT64 for Iceberg compatibility
+    if (!sparkSession.conf.contains(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)) {
       conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
         SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS.toString)
     }
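Note: because the new guard only fires when the key is absent from the session
conf, an explicit user setting still wins. A minimal sketch of keeping the old
INT96 behaviour (session setup assumed, not part of this commit):

    // Opting back into INT96: the key is now present in the session conf,
    // so DeltaParquetFileFormat.prepareWrite leaves it untouched.
    spark.conf.set(
      SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, // "spark.sql.parquet.outputTimestampType"
      SQLConf.ParquetOutputTimestampType.INT96.toString)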
@@ -26,10 +26,16 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.format.converter.ParquetMetadataConverter
 import org.apache.parquet.hadoop.ParquetFileReader

-import org.apache.spark.sql.{DataFrame, Dataset, QueryTest}
+import org.apache.parquet.hadoop.util.HadoopInputFile
+import org.apache.parquet.schema.MessageType
+import org.apache.spark.sql.{DataFrame, Dataset, QueryTest, Row}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+
+import java.io.File
+import java.sql.Timestamp

 trait DeltaParquetFileFormatSuiteBase
   extends QueryTest
@@ -294,3 +300,55 @@ class DeltaParquetFileFormatWithPredicatePushdownSuite extends DeltaParquetFileFormatSuiteBase
     }
   }
 }

+class DeltaParquetTimestampFormatSuite extends DeltaParquetFileFormatSuiteBase {
+  import testImplicits._
+
+  private def getSchemaFromFirstFileInDirectory(tempDir: File): MessageType = {
+    val files = tempDir.list().filter(_.contains(".parquet")).filterNot(_.contains(".crc"))
+    val file = s"${tempDir}/${files(0)}"
+
+    val reader = ParquetFileReader
+      .open(HadoopInputFile.fromPath(new Path(file), new Configuration()))
+
+    val schema = reader.getFooter.getFileMetaData.getSchema
+
+    reader.close()
+
+    schema
+  }
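Note: the Parquet footer carries the full file schema, so this helper never reads
any rows. For the single-column table written below, the returned MessageType
prints roughly as follows (a sketch matching the assertions in the tests):

    // message spark_schema {
    //   optional int64 eventTimeString (TIMESTAMP(MICROS,true));
    // }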

+  private def writeDatesDataframe(tempDir: File): Unit = {
+    val schema = StructType(List(
+      StructField("eventTimeString", TimestampType, nullable = false)
+    ))
+    val sc = spark.sparkContext
+    val dates = spark.createDataFrame(sc.parallelize(Seq(
+      Row(Timestamp.valueOf("2024-01-01 23:00:01")),
+      Row(Timestamp.valueOf("2024-11-30 19:25:32")),
+      Row(Timestamp.valueOf("2026-12-29 09:22:00")),
+      Row(Timestamp.valueOf("2026-05-09 10:12:43"))
+    )), schema).repartition(1)
+    dates.write.format("delta").mode("append").save(tempDir.toString)
+  }
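Note: repartition(1) forces a single data file, so getSchemaFromFirstFileInDirectory
reads the only file that exists. Timestamp.valueOf parses the wall-clock string in
the JVM's default time zone, and Spark's TimestampType stores instants, which is why
the logical type in the assertions carries isAdjustedToUTC = true. A sketch:

    // Local wall-clock string -> instant; written as INT64 microseconds since
    // the Unix epoch in UTC, hence "TIMESTAMP(MICROS,true)" in the footer.
    val ts = Timestamp.valueOf("2024-01-01 23:00:01")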

test("Write Parquet timestamp without any config") {
withTempDir { tempDir =>
writeDatesDataframe(tempDir)
val parquetSchema = getSchemaFromFirstFileInDirectory(tempDir)
assert(
parquetSchema.toString.contains("optional int64 eventTimeString (TIMESTAMP(MICROS,true))"))
}
}

test("Write Parquet timestamp with explicit INT96") {
withTempDir { tempDir =>
spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
SQLConf.ParquetOutputTimestampType.INT96.toString)
writeDatesDataframe(tempDir)
val parquetSchema = getSchemaFromFirstFileInDirectory(tempDir)
assert(
parquetSchema.toString.contains("optional int96 eventTimeString"))
}
}
}
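Note: the conf has a third supported value, TIMESTAMP_MILLIS, which this commit does
not exercise. A sketch of a possible extra test, not part of this commit (assumes
the withSQLConf helper from Spark's SQLTestUtils, mixed in via SharedSparkSession):

    test("Write Parquet timestamp with explicit INT64 millis (sketch)") {
      withTempDir { tempDir =>
        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
            SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
          writeDatesDataframe(tempDir)
          val parquetSchema = getSchemaFromFirstFileInDirectory(tempDir)
          assert(parquetSchema.toString.contains(
            "optional int64 eventTimeString (TIMESTAMP(MILLIS,true))"))
        }
      }
    }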
