Commit

sabir-akhadov committed May 2, 2024
1 parent fae8f70 commit 5665320

Showing 3 changed files with 116 additions and 6 deletions.
31 changes: 26 additions & 5 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -526,7 +526,20 @@ class DeltaLog private(
// ensure any partitionSchema changes will be captured, and upon restart, the new snapshot will
// be initialized with the correct partition schema again.
val fileIndex = new TahoeBatchFileIndex(spark, actionType, addFiles, this, dataPath, snapshot)
val relation = buildHadoopFsRelationWithFileIndex(snapshot, fileIndex, bucketSpec = None)
// Drop null type columns from the relation's schema if it's not a streaming query until
// null type columns are fully supported.
val dropNullTypeColumnsFromSchema = if (isStreaming) {
// Can force the legacy behavior (dropping nullType columns) with a flag.
SQLConf.get.getConf(DeltaSQLConf.DELTA_STREAMING_CREATE_DATAFRAME_DROP_NULL_COLUMNS)
} else {
// Allow configurable behavior for non-streaming sources. This is used for testing.
SQLConf.get.getConf(DeltaSQLConf.DELTA_CREATE_DATAFRAME_DROP_NULL_COLUMNS)
}
val relation = buildHadoopFsRelationWithFileIndex(
snapshot,
fileIndex,
bucketSpec = None,
dropNullTypeColumnsFromSchema = dropNullTypeColumnsFromSchema)
Dataset.ofRows(spark, LogicalRelation(relation, isStreaming = isStreaming))
}
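
The new code path leans on Delta's SchemaUtils.dropNullTypeColumns to strip NullType columns when the legacy behavior is requested. As a rough, hypothetical sketch of what that pruning means for a top-level schema (the actual helper may also handle nested fields):

import org.apache.spark.sql.types.{NullType, StructType}

// Simplified stand-in for SchemaUtils.dropNullTypeColumns: keep every field
// whose data type is not NullType. The real helper may also recurse into
// nested structs; this sketch only filters the top level.
def dropTopLevelNullTypeColumns(schema: StructType): StructType =
  StructType(schema.fields.filterNot(_.dataType == NullType))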

@@ -578,8 +591,16 @@ class DeltaLog private(
}
}

def buildHadoopFsRelationWithFileIndex(snapshot: SnapshotDescriptor, fileIndex: TahoeFileIndex,
bucketSpec: Option[BucketSpec]): HadoopFsRelation = {
def buildHadoopFsRelationWithFileIndex(
snapshot: SnapshotDescriptor,
fileIndex: TahoeFileIndex,
bucketSpec: Option[BucketSpec],
dropNullTypeColumnsFromSchema: Boolean = true): HadoopFsRelation = {
val dataSchema = if (dropNullTypeColumnsFromSchema) {
SchemaUtils.dropNullTypeColumns(snapshot.metadata.schema)
} else {
snapshot.metadata.schema
}
HadoopFsRelation(
fileIndex,
partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(
@@ -588,8 +609,8 @@ class DeltaLog private(
// column locations. Otherwise, for any partition columns not in `dataSchema`, Spark would
// just append them to the end of `dataSchema`.
dataSchema = DeltaColumnMapping.dropColumnMappingMetadata(
DeltaTableUtils.removeInternalMetadata(spark,
SchemaUtils.dropNullTypeColumns(snapshot.metadata.schema))),
DeltaTableUtils.removeInternalMetadata(spark, dataSchema)
),
bucketSpec = bucketSpec,
fileFormat(snapshot.protocol, snapshot.metadata),
// `metadata.format.options` is not set today. Even if we support it in future, we shouldn't
@@ -1295,6 +1295,20 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(false)

val DELTA_STREAMING_CREATE_DATAFRAME_DROP_NULL_COLUMNS =
buildConf("streaming.createDataFrame.dropNullColumns")
.internal()
.doc("Whether to drop columns with NullType in DeltaLog.createDataFrame.")
.booleanConf
.createWithDefault(false)

val DELTA_CREATE_DATAFRAME_DROP_NULL_COLUMNS =
buildConf("createDataFrame.dropNullColumns")
.internal()
.doc("Whether to drop columns with NullType in DeltaLog.createDataFrame.")
.booleanConf
.createWithDefault(true)

val DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES =
buildConf("changeDataFeed.unsafeBatchReadOnIncompatibleSchemaChanges.enabled")
.doc(
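
Of the two dropNullColumns flags added above, the streaming one defaults to false (NullType columns are kept) while the batch one defaults to true (they are still dropped). Assuming DeltaSQLConf.buildConf registers keys under Delta's usual spark.databricks.delta. prefix, a minimal sketch of toggling them from an active SparkSession named spark:

// Sketch only: the full key names assume DeltaSQLConf.buildConf prepends the
// usual "spark.databricks.delta." prefix, and `spark` is an active SparkSession.
spark.conf.set("spark.databricks.delta.streaming.createDataFrame.dropNullColumns", "true")  // force legacy streaming behavior
spark.conf.set("spark.databricks.delta.createDataFrame.dropNullColumns", "false")           // keep NullType columns in batch createDataFrame
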
@@ -42,7 +42,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.when
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamingQueryException, Trigger}
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.types.{NullType, StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{ManualClock, Utils}

@@ -53,6 +53,81 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase

import testImplicits._

def testNullTypeColumn(shouldDropNullTypeColumns: Boolean): Unit = {
withTempPaths(3) { case Seq(sourcePath, sinkPath, checkpointPath) =>
withSQLConf(
DeltaSQLConf.DELTA_STREAMING_CREATE_DATAFRAME_DROP_NULL_COLUMNS.key ->
shouldDropNullTypeColumns.toString) {

spark.sql("select CAST(null as VOID) as nullTypeCol, id from range(10)")
.write
.format("delta")
.mode("append")
.save(sourcePath.getCanonicalPath)

def runStream() = {
spark.readStream
.format("delta")
.load(sourcePath.getCanonicalPath)
          // Need to drop null type columns because they're not supported by the writer.
.drop("nullTypeCol")
.writeStream
.option("checkpointLocation", checkpointPath.getCanonicalPath)
.format("delta")
.start(sinkPath.getCanonicalPath)
.processAllAvailable()
}
if (shouldDropNullTypeColumns) {
val e = intercept[StreamingQueryException] {
runStream()
}
assert(e.getErrorClass == "STREAM_FAILED")
          // This assertion checks that the schema of the source did not change while processing a batch.
assert(e.getMessage.contains("assertion failed: Invalid batch: nullTypeCol"))
} else {
runStream()
}
}
}
}

test("streaming delta source should not drop null columns") {
testNullTypeColumn(shouldDropNullTypeColumns = false)
}

test("streaming delta source should drop null columns without feature flag") {
testNullTypeColumn(shouldDropNullTypeColumns = true)
}

def testCreateDataFrame(shouldDropNullTypeColumns: Boolean): Unit = {
withSQLConf(DeltaSQLConf.DELTA_CREATE_DATAFRAME_DROP_NULL_COLUMNS.key ->
shouldDropNullTypeColumns.toString) {
withTempPath { tempPath =>
spark.sql("select CAST(null as VOID) as nullTypeCol, id from range(10)")
.write
.format("delta")
.mode("append")
.save(tempPath.getCanonicalPath)
val deltaLog = DeltaLog.forTable(spark, tempPath)
val df = deltaLog.createDataFrame(deltaLog.update(), Seq.empty, isStreaming = false)
val nullTypeFields = df.schema.filter(_.dataType == NullType)
        if (shouldDropNullTypeColumns) {
assert(nullTypeFields.isEmpty)
} else {
assert(nullTypeFields.size == 1)
}
}
}
}

test("DeltaLog.createDataFrame should drop null columns with feature flag") {
testCreateDataFrame(shouldDropNullTypeColumns = true)
}

test("DeltaLog.createDataFrame should not drop null columns without feature flag") {
testCreateDataFrame(shouldDropNullTypeColumns = false)
}

test("no schema should throw an exception") {
withTempDir { inputDir =>
new File(inputDir, "_delta_log").mkdir()
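
The tests above rely on CAST(null AS VOID) producing a column of NullType. A minimal standalone check of that assumption, given an active SparkSession named spark:

import org.apache.spark.sql.types.NullType

// Sanity check: CAST(null AS VOID) yields a column whose dataType is NullType,
// which is the schema shape the new tests exercise.
val df = spark.sql("select CAST(null as VOID) as nullTypeCol, id from range(10)")
assert(df.schema("nullTypeCol").dataType == NullType)
assert(df.schema("id").dataType != NullType)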