Skip to content

Commit

Permalink
[Spark] Identity Columns APIs in DeltaColumnBuilder (delta-io#2857)
Browse files Browse the repository at this point in the history
#### Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
This PR is part of delta-io#1959
* We introduce the `generatedAlwaysAsIdentity` and
`generatedByDefaultAsIdentity` APIs into DeltaColumnBuilder so that users
can create Delta tables with identity columns.
* We guard the creation of identity column tables with a feature flag
until development is complete.

## How was this patch tested?
New tests. 

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
Yes, we introduce the `generatedAlwaysAsIdentity` and
`generatedByDefaultAsIdentity` interfaces to DeltaColumnBuilder for
creating identity columns.
**Interfaces**
```
def generatedAlwaysAsIdentity(): DeltaColumnBuilder
def generatedAlwaysAsIdentity(start: Long, step: Long): DeltaColumnBuilder
def generatedByDefaultAsIdentity(): DeltaColumnBuilder
def generatedByDefaultAsIdentity(start: Long, step: Long): DeltaColumnBuilder
```
When the `start` and the `step` parameters are not specified, they
default to `1L`. `generatedByDefaultAsIdentity` allows users to insert
values into the column, while a column specified
with `generatedAlwaysAsIdentity` can only ever have system-generated
values.

**Example Usage**
```
// Creates a Delta identity column.
io.delta.tables.DeltaTable.columnBuilder(spark, "id")
      .dataType(LongType)
      .generatedAlwaysAsIdentity()
// Which is equivalent to the call
io.delta.tables.DeltaTable.columnBuilder(spark, "id")
      .dataType(LongType)
      .generatedAlwaysAsIdentity(start = 1L, step = 1L)
```
  • Loading branch information
c27kwan authored Apr 30, 2024
1 parent e84a874 commit 44b76fa
Show file tree
Hide file tree
Showing 9 changed files with 495 additions and 5 deletions.
18 changes: 18 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,24 @@
},
"sqlState" : "KD00E"
},
"DELTA_IDENTITY_COLUMNS_ILLEGAL_STEP" : {
"message" : [
"IDENTITY column step cannot be 0."
],
"sqlState" : "42611"
},
"DELTA_IDENTITY_COLUMNS_UNSUPPORTED_DATA_TYPE" : {
"message" : [
"DataType <dataType> is not supported for IDENTITY columns."
],
"sqlState" : "428H2"
},
"DELTA_IDENTITY_COLUMNS_WITH_GENERATED_EXPRESSION" : {
"message" : [
"IDENTITY column cannot be specified with a generated column expression."
],
"sqlState" : "42613"
},
"DELTA_ILLEGAL_FILE_FOUND" : {
"message" : [
"Illegal files found in a dataChange = false transaction. Files: <file>"
Expand Down
93 changes: 90 additions & 3 deletions spark/src/main/scala/io/delta/tables/DeltaColumnBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@

package io.delta.tables

import org.apache.spark.sql.delta.DeltaErrors
import org.apache.spark.sql.delta.sources.DeltaSourceUtils.GENERATION_EXPRESSION_METADATA_KEY
import org.apache.spark.sql.delta.{DeltaErrors, IdentityColumn}
import org.apache.spark.sql.delta.sources.DeltaSourceUtils.{GENERATION_EXPRESSION_METADATA_KEY, IDENTITY_INFO_ALLOW_EXPLICIT_INSERT, IDENTITY_INFO_START, IDENTITY_INFO_STEP}
import org.apache.spark.sql.util.ScalaExtensions._

import org.apache.spark.annotation._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StructField}
import org.apache.spark.sql.types.{DataType, LongType, MetadataBuilder, StructField}

/**
* :: Evolving ::
Expand All @@ -39,6 +40,9 @@ class DeltaColumnBuilder private[tables](
private var nullable: Boolean = true
private var generationExpr: Option[String] = None
private var comment: Option[String] = None
private var identityStart: Option[Long] = None
private var identityStep: Option[Long] = None
private var identityAllowExplicitInsert: Option[Boolean] = None

/**
* :: Evolving ::
Expand Down Expand Up @@ -96,6 +100,69 @@ class DeltaColumnBuilder private[tables](
this
}

/**
* :: Evolving ::
*
* Specify a column as an identity column with default values that is always generated
* by the system (i.e. does not allow user-specified values).
*
* @since 3.3.0
*/
@Evolving
def generatedAlwaysAsIdentity(): DeltaColumnBuilder = {
  // Delegate to the (start, step) overload using the defaults exposed by IdentityColumn.
  val defaultStart = IdentityColumn.defaultStart
  val defaultStep = IdentityColumn.defaultStep
  generatedAlwaysAsIdentity(start = defaultStart, step = defaultStep)
}

/**
* :: Evolving ::
*
* Specify a column as an identity column that is always generated by the system (i.e. does not
* allow user-specified values).
*
* @param start the start value of the identity column
* @param step the increment step of the identity column
* @since 3.3.0
*/
@Evolving
def generatedAlwaysAsIdentity(start: Long, step: Long): DeltaColumnBuilder = {
  // Only record the identity settings on the builder; no validation is performed here.
  // `false` marks the column as not accepting user-specified values.
  identityAllowExplicitInsert = Some(false)
  identityStart = Some(start)
  identityStep = Some(step)
  this
}

/**
* :: Evolving ::
*
* Specify a column as an identity column that allows user-specified values such that the
* generated values use default start and step values.
*
* @since 3.3.0
*/
@Evolving
def generatedByDefaultAsIdentity(): DeltaColumnBuilder = {
  // Delegate to the (start, step) overload using the defaults exposed by IdentityColumn.
  val defaultStart = IdentityColumn.defaultStart
  val defaultStep = IdentityColumn.defaultStep
  generatedByDefaultAsIdentity(start = defaultStart, step = defaultStep)
}

/**
* :: Evolving ::
*
* Specify a column as an identity column that allows user-specified values.
*
* @param start the start value of the identity column
* @param step the increment step of the identity column
* @since 3.3.0
*/
@Evolving
def generatedByDefaultAsIdentity(start: Long, step: Long): DeltaColumnBuilder = {
  // Only record the identity settings on the builder; no validation is performed here.
  // `true` marks the column as accepting user-specified values.
  identityAllowExplicitInsert = Some(true)
  identityStart = Some(start)
  identityStep = Some(step)
  this
}

/**
* :: Evolving ::
*
Expand Down Expand Up @@ -123,6 +190,26 @@ class DeltaColumnBuilder private[tables](
if (generationExpr.nonEmpty) {
metadataBuilder.putString(GENERATION_EXPRESSION_METADATA_KEY, generationExpr.get)
}

identityAllowExplicitInsert.ifDefined { allowExplicitInsert =>
if (generationExpr.nonEmpty) {
throw DeltaErrors.identityColumnWithGenerationExpression()
}

if (dataType != null && dataType != LongType) {
throw DeltaErrors.identityColumnDataTypeNotSupported(dataType)
}

metadataBuilder.putBoolean(
IDENTITY_INFO_ALLOW_EXPLICIT_INSERT, allowExplicitInsert)
metadataBuilder.putLong(IDENTITY_INFO_START, identityStart.get)
val step = identityStep.get
if (step == 0L) {
throw DeltaErrors.identityColumnIllegalStep()
}
metadataBuilder.putLong(IDENTITY_INFO_STEP, identityStep.get)
}

if (comment.nonEmpty) {
metadataBuilder.putString("comment", comment.get)
}
Expand Down
16 changes: 16 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2445,6 +2445,22 @@ trait DeltaErrorsBase
new DeltaAnalysisException(errorClass = "DELTA_UNSET_NON_EXISTENT_PROPERTY", Array(key, table))
}

def identityColumnWithGenerationExpression(): Throwable = {
  // Error for a column declared both as IDENTITY and with a generation expression.
  // The message template takes no parameters.
  val identityWithGeneratedExprClass = "DELTA_IDENTITY_COLUMNS_WITH_GENERATED_EXPRESSION"
  new DeltaAnalysisException(errorClass = identityWithGeneratedExprClass, Array.empty)
}

def identityColumnIllegalStep(): Throwable = {
  // Error for an IDENTITY column whose step is 0. The message template takes no parameters.
  val illegalStepClass = "DELTA_IDENTITY_COLUMNS_ILLEGAL_STEP"
  new DeltaAnalysisException(errorClass = illegalStepClass, Array.empty)
}

def identityColumnDataTypeNotSupported(unsupportedType: DataType): Throwable = {
  // The only message parameter is the type's `typeName`.
  val dataTypeName = unsupportedType.typeName
  new DeltaUnsupportedOperationException(
    errorClass = "DELTA_IDENTITY_COLUMNS_UNSUPPORTED_DATA_TYPE",
    messageParameters = Array(dataTypeName))
}

def identityColumnInconsistentMetadata(
colName: String,
hasStart: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,15 @@ trait OptimisticTransactionImpl extends TransactionalWrite
}
isCreatingNewTable = true
}

val identityColumnAllowed =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_IDENTITY_COLUMN_ENABLED)
if (!identityColumnAllowed &&
ColumnWithDefaultExprUtils.hasIdentityColumn(newMetadataTmp.schema)) {
throw DeltaErrors.unsupportedWriterTableFeaturesInTableException(
deltaLog.dataPath.toString, Seq(IdentityColumnsTableFeature.name))
}

val protocolBeforeUpdate = protocol
// The `.schema` cannot be generated correctly unless the column mapping metadata is correctly
// filled for all the fields. Therefore, the column mapping changes need to happen first.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1744,6 +1744,21 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

///////////////////
// IDENTITY COLUMN
///////////////////

// Internal boolean config that acts as the on/off switch for IDENTITY column support.
// It defaults to false, so the feature is off unless the config is explicitly set to true.
val DELTA_IDENTITY_COLUMN_ENABLED =
buildConf("identityColumn.enabled")
.internal()
.doc(
"""
| The umbrella config to turn on/off the IDENTITY column support.
| If true, enable Delta IDENTITY column write support. If a table has an IDENTITY column,
| it is not writable but still readable if this config is set to false.
|""".stripMargin)
.booleanConf
.createWithDefault(false)

///////////
// TESTING
Expand Down
44 changes: 42 additions & 2 deletions spark/src/test/scala/org/apache/spark/sql/delta/DDLTestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.test.DeltaSQLCommandTest

import org.apache.spark.sql.{QueryTest, SparkSession}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{DataType, LongType, StructField}
Expand Down Expand Up @@ -73,8 +75,46 @@ case class TestColumnSpec(
}
}

/** The two flavours of identity column a test column spec can request. */
object GeneratedAsIdentityType extends Enumeration {
  type GeneratedAsIdentityType = Value

  // Declaration order fixes the ids: GeneratedAlways = 0, GeneratedByDefault = 1.
  val GeneratedAlways: Value = Value
  val GeneratedByDefault: Value = Value
}

/**
 * Test column spec describing an identity column.
 *
 * @param generatedAsIdentityType whether the column is GENERATED ALWAYS or GENERATED BY DEFAULT
 * @param startsWith optional start value; falls back to `IdentityColumn.defaultStart`
 * @param incrementBy optional step value; falls back to `IdentityColumn.defaultStep`
 * @param colName name of the column
 * @param dataType data type passed to the column builder
 * @param comment optional column comment
 */
case class IdentityColumnSpec(
    generatedAsIdentityType: GeneratedAsIdentityType.GeneratedAsIdentityType,
    startsWith: Option[Long] = None,
    incrementBy: Option[Long] = None,
    colName: String = "id",
    dataType: DataType = LongType,
    comment: Option[String] = None)
  extends ColumnSpec {

  /** DDL text is not available for identity columns; this always throws. */
  override def ddl: String =
    throw new UnsupportedOperationException(
      "DDL generation is not supported for identity columns yet")

  /** Builds the column's StructField through the `DeltaTable.columnBuilder` API. */
  override def structField(spark: SparkSession): StructField = {
    val typedBuilder = io.delta.tables.DeltaTable
      .columnBuilder(spark, colName)
      .dataType(dataType)
    val start = startsWith.getOrElse(IdentityColumn.defaultStart.toLong)
    val step = incrementBy.getOrElse(IdentityColumn.defaultStep.toLong)

    val identityBuilder = generatedAsIdentityType match {
      case GeneratedAsIdentityType.GeneratedAlways =>
        typedBuilder.generatedAlwaysAsIdentity(start, step)
      case GeneratedAsIdentityType.GeneratedByDefault =>
        typedBuilder.generatedByDefaultAsIdentity(start, step)
    }

    // Attach the comment only when one was supplied.
    val finalBuilder =
      comment.fold(identityBuilder)(text => identityBuilder.comment(text))
    finalBuilder.build()
  }
}

trait DDLTestUtils extends QueryTest with SharedSparkSession {
trait DDLTestUtils extends QueryTest with SharedSparkSession with DeltaSQLCommandTest {
protected object DDLType extends Enumeration {
val CREATE, REPLACE, CREATE_OR_REPLACE = Value
}
Expand Down Expand Up @@ -157,7 +197,7 @@ trait SQLDDLTestUtils extends DDLTestUtils {
}
}

trait ScalaDLLTestUtils extends DDLTestUtils {
trait ScalaDDLTestUtils extends DDLTestUtils {
protected def runDDL(
ddlType: DDLType.Value,
tableName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3205,6 +3205,43 @@ trait DeltaErrorsSuiteBase
Some(s"Could not resolve expression: ${exprs.mkString(",")}")
)
}
{
val unsupportedDataType = IntegerType
val e = intercept[DeltaUnsupportedOperationException] {
throw DeltaErrors.identityColumnDataTypeNotSupported(unsupportedDataType)
}
checkErrorMessage(
e,
Some("DELTA_IDENTITY_COLUMNS_UNSUPPORTED_DATA_TYPE"),
Some("428H2"),
Some(s"DataType ${unsupportedDataType.typeName} is not supported for IDENTITY columns."),
startWith = true
)
}
{
val e = intercept[DeltaAnalysisException] {
throw DeltaErrors.identityColumnIllegalStep()
}
checkErrorMessage(
e,
Some("DELTA_IDENTITY_COLUMNS_ILLEGAL_STEP"),
Some("42611"),
Some("IDENTITY column step cannot be 0."),
startWith = true
)
}
{
val e = intercept[DeltaAnalysisException] {
throw DeltaErrors.identityColumnWithGenerationExpression()
}
checkErrorMessage(
e,
Some("DELTA_IDENTITY_COLUMNS_WITH_GENERATED_EXPRESSION"),
Some("42613"),
Some("IDENTITY column cannot be specified with a generated column expression."),
startWith = true
)
}
}
}

Expand Down
Loading

0 comments on commit 44b76fa

Please sign in to comment.