diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json
index d3615832816..a319b610b6c 100644
--- a/spark/src/main/resources/error/delta-error-classes.json
+++ b/spark/src/main/resources/error/delta-error-classes.json
@@ -1121,6 +1121,24 @@
     },
     "sqlState" : "KD00E"
   },
+  "DELTA_IDENTITY_COLUMNS_ILLEGAL_STEP" : {
+    "message" : [
+      "IDENTITY column step cannot be 0."
+    ],
+    "sqlState" : "42611"
+  },
+  "DELTA_IDENTITY_COLUMNS_UNSUPPORTED_DATA_TYPE" : {
+    "message" : [
+      "DataType <dataType> is not supported for IDENTITY columns."
+    ],
+    "sqlState" : "428H2"
+  },
+  "DELTA_IDENTITY_COLUMNS_WITH_GENERATED_EXPRESSION" : {
+    "message" : [
+      "IDENTITY column cannot be specified with a generated column expression."
+    ],
+    "sqlState" : "42613"
+  },
   "DELTA_ILLEGAL_FILE_FOUND" : {
     "message" : [
       "Illegal files found in a dataChange = false transaction. Files: <file_list>"
diff --git a/spark/src/main/scala/io/delta/tables/DeltaColumnBuilder.scala b/spark/src/main/scala/io/delta/tables/DeltaColumnBuilder.scala
index b5d3adaa065..e5dcb57acb0 100644
--- a/spark/src/main/scala/io/delta/tables/DeltaColumnBuilder.scala
+++ b/spark/src/main/scala/io/delta/tables/DeltaColumnBuilder.scala
@@ -16,12 +16,13 @@
 
 package io.delta.tables
 
-import org.apache.spark.sql.delta.DeltaErrors
-import org.apache.spark.sql.delta.sources.DeltaSourceUtils.GENERATION_EXPRESSION_METADATA_KEY
+import org.apache.spark.sql.delta.{DeltaErrors, IdentityColumn}
+import org.apache.spark.sql.delta.sources.DeltaSourceUtils.{GENERATION_EXPRESSION_METADATA_KEY, IDENTITY_INFO_ALLOW_EXPLICIT_INSERT, IDENTITY_INFO_START, IDENTITY_INFO_STEP}
+import org.apache.spark.sql.util.ScalaExtensions._
 
 import org.apache.spark.annotation._
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.types.{DataType, MetadataBuilder, StructField}
+import org.apache.spark.sql.types.{DataType, LongType, MetadataBuilder, StructField}
 
 /**
  * :: Evolving ::
@@ -39,6 +40,9 @@ class DeltaColumnBuilder private[tables](
   private var nullable: Boolean = true
   private var generationExpr: Option[String] = None
   private var comment: Option[String] = None
+  private var identityStart: Option[Long] = None
+  private var identityStep: Option[Long] = None
+  private var identityAllowExplicitInsert: Option[Boolean] = None
 
   /**
    * :: Evolving ::
@@ -96,6 +100,69 @@
     this
   }
 
+  /**
+   * :: Evolving ::
+   *
+   * Specify a column as an identity column that is always generated by the system (i.e. does
+   * not allow user-specified values), using the default start and step values.
+   *
+   * @since 3.3.0
+   */
+  @Evolving
+  def generatedAlwaysAsIdentity(): DeltaColumnBuilder = {
+    generatedAlwaysAsIdentity(IdentityColumn.defaultStart, IdentityColumn.defaultStep)
+  }
+
+  /**
+   * :: Evolving ::
+   *
+   * Specify a column as an identity column that is always generated by the system (i.e. does not
+   * allow user-specified values).
+   *
+   * @param start the start value of the identity column
+   * @param step the increment step of the identity column
+   * @since 3.3.0
+   */
+  @Evolving
+  def generatedAlwaysAsIdentity(start: Long, step: Long): DeltaColumnBuilder = {
+    this.identityStart = Some(start)
+    this.identityStep = Some(step)
+    this.identityAllowExplicitInsert = Some(false)
+
+    this
+  }
+
+  /**
+   * :: Evolving ::
+   *
+   * Specify a column as an identity column that allows user-specified values, where the
+   * system-generated values use the default start and step values.
+   *
+   * @since 3.3.0
+   */
+  @Evolving
+  def generatedByDefaultAsIdentity(): DeltaColumnBuilder = {
+    generatedByDefaultAsIdentity(IdentityColumn.defaultStart, IdentityColumn.defaultStep)
+  }
+
+  /**
+   * :: Evolving ::
+   *
+   * Specify a column as an identity column that allows user-specified values.
+   *
+   * @param start the start value of the identity column
+   * @param step the increment step of the identity column
+   * @since 3.3.0
+   */
+  @Evolving
+  def generatedByDefaultAsIdentity(start: Long, step: Long): DeltaColumnBuilder = {
+    this.identityStart = Some(start)
+    this.identityStep = Some(step)
+    this.identityAllowExplicitInsert = Some(true)
+
+    this
+  }
+
   /**
    * :: Evolving ::
    *
@@ -123,6 +190,26 @@
     if (generationExpr.nonEmpty) {
       metadataBuilder.putString(GENERATION_EXPRESSION_METADATA_KEY, generationExpr.get)
     }
+
+    identityAllowExplicitInsert.ifDefined { allowExplicitInsert =>
+      if (generationExpr.nonEmpty) {
+        throw DeltaErrors.identityColumnWithGenerationExpression()
+      }
+
+      if (dataType != null && dataType != LongType) {
+        throw DeltaErrors.identityColumnDataTypeNotSupported(dataType)
+      }
+
+      val step = identityStep.get
+      if (step == 0L) {
+        throw DeltaErrors.identityColumnIllegalStep()
+      }
+      metadataBuilder.putBoolean(
+        IDENTITY_INFO_ALLOW_EXPLICIT_INSERT, allowExplicitInsert)
+      metadataBuilder.putLong(IDENTITY_INFO_START, identityStart.get)
+      metadataBuilder.putLong(IDENTITY_INFO_STEP, step)
+    }
+
     if (comment.nonEmpty) {
       metadataBuilder.putString("comment", comment.get)
     }
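A quick usage sketch of the new builder API, for reviewers trying the patch locally. It assumes an active SparkSession `spark`, the internal `identityColumn.enabled` flag turned on (see the DeltaSQLConf change below), and an illustrative table name:

    import io.delta.tables.DeltaTable

    // GENERATED ALWAYS: the column is marked allowExplicitInsert = false,
    // so values are always system-generated.
    val idCol = DeltaTable.columnBuilder(spark, "id")
      .dataType("BIGINT")
      .generatedAlwaysAsIdentity(start = 1L, step = 1L)
      .build()

    // GENERATED BY DEFAULT: user-specified values are accepted; generated
    // values use the default start and step (both 1).
    val keyCol = DeltaTable.columnBuilder(spark, "key")
      .dataType("BIGINT")
      .generatedByDefaultAsIdentity()
      .build()

    DeltaTable.create(spark)
      .tableName("events")
      .addColumn(idCol)
      .addColumn(keyCol)
      .execute()

Note that build() validates eagerly: a data type other than LongType, a step of 0, or combining identity with generatedAlwaysAs(expr) each throw one of the new error classes defined above, before any column metadata is produced.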
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
index 6024382c530..aa0a755d2b0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -2445,6 +2445,22 @@ trait DeltaErrorsBase
     new DeltaAnalysisException(errorClass = "DELTA_UNSET_NON_EXISTENT_PROPERTY", Array(key, table))
   }
 
+  def identityColumnWithGenerationExpression(): Throwable = {
+    new DeltaAnalysisException(
+      errorClass = "DELTA_IDENTITY_COLUMNS_WITH_GENERATED_EXPRESSION", Array.empty)
+  }
+
+  def identityColumnIllegalStep(): Throwable = {
+    new DeltaAnalysisException(errorClass = "DELTA_IDENTITY_COLUMNS_ILLEGAL_STEP", Array.empty)
+  }
+
+  def identityColumnDataTypeNotSupported(unsupportedType: DataType): Throwable = {
+    new DeltaUnsupportedOperationException(
+      errorClass = "DELTA_IDENTITY_COLUMNS_UNSUPPORTED_DATA_TYPE",
+      messageParameters = Array(unsupportedType.typeName)
+    )
+  }
+
   def identityColumnInconsistentMetadata(
       colName: String,
       hasStart: Boolean,
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index 510d1c9f4eb..8096f487550 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -462,6 +462,15 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       }
       isCreatingNewTable = true
     }
+
+    val identityColumnAllowed =
+      spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_IDENTITY_COLUMN_ENABLED)
+    if (!identityColumnAllowed &&
+        ColumnWithDefaultExprUtils.hasIdentityColumn(newMetadataTmp.schema)) {
+      throw DeltaErrors.unsupportedWriterTableFeaturesInTableException(
+        deltaLog.dataPath.toString, Seq(IdentityColumnsTableFeature.name))
+    }
+
     val protocolBeforeUpdate = protocol
     // The `.schema` cannot be generated correctly unless the column mapping metadata is correctly
     // filled for all the fields. Therefore, the column mapping changes need to happen first.
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 40449af6207..85cf62737c4 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1744,6 +1744,21 @@
       .booleanConf
       .createWithDefault(true)
 
+  ///////////////////
+  // IDENTITY COLUMN
+  ///////////////////
+
+  val DELTA_IDENTITY_COLUMN_ENABLED =
+    buildConf("identityColumn.enabled")
+      .internal()
+      .doc(
+        """
+          | The umbrella config to turn IDENTITY column support on or off. If true, Delta
+          | IDENTITY column write support is enabled. If false, a table that has an IDENTITY
+          | column remains readable but is no longer writable.
+          |""".stripMargin)
+      .booleanConf
+      .createWithDefault(false)
 
   ///////////
   // TESTING
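Worth calling out for anyone enabling this by hand: `buildConf` in DeltaSQLConfBase prefixes keys with `spark.databricks.delta.`, so the full key should be the one in this sketch:

    // Opt in to IDENTITY column writes for the current session
    // (the flag is internal and defaults to false).
    spark.conf.set("spark.databricks.delta.identityColumn.enabled", "true")

With the flag off, the OptimisticTransaction check above rejects any metadata update that carries an IDENTITY column, so an existing identity table stays readable but fails on write with the unsupported-writer-feature error that the new suite asserts below.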
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DDLTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DDLTestUtils.scala
index 905f17dc3fd..a35b288ed9d 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DDLTestUtils.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DDLTestUtils.scala
@@ -16,6 +16,8 @@
 
 package org.apache.spark.sql.delta
 
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+
 import org.apache.spark.sql.{QueryTest, SparkSession}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{DataType, LongType, StructField}
@@ -73,8 +75,46 @@
   }
 }
 
+object GeneratedAsIdentityType extends Enumeration {
+  type GeneratedAsIdentityType = Value
+  val GeneratedAlways, GeneratedByDefault = Value
+}
+
+case class IdentityColumnSpec(
+    generatedAsIdentityType: GeneratedAsIdentityType.GeneratedAsIdentityType,
+    startsWith: Option[Long] = None,
+    incrementBy: Option[Long] = None,
+    colName: String = "id",
+    dataType: DataType = LongType,
+    comment: Option[String] = None)
+  extends ColumnSpec {
+
+  override def ddl: String = {
+    throw new UnsupportedOperationException(
+      "DDL generation is not supported for identity columns yet")
+  }
+
+  override def structField(spark: SparkSession): StructField = {
+    var col = io.delta.tables.DeltaTable.columnBuilder(spark, colName)
+      .dataType(dataType)
+    val start = startsWith.getOrElse(IdentityColumn.defaultStart.toLong)
+    val step = incrementBy.getOrElse(IdentityColumn.defaultStep.toLong)
+    col = generatedAsIdentityType match {
+      case GeneratedAsIdentityType.GeneratedAlways =>
+        col.generatedAlwaysAsIdentity(start, step)
+      case GeneratedAsIdentityType.GeneratedByDefault =>
+        col.generatedByDefaultAsIdentity(start, step)
+    }
+
+    comment.foreach { c =>
+      col = col.comment(c)
+    }
+
+    col.build()
+  }
+}
 
-trait DDLTestUtils extends QueryTest with SharedSparkSession {
+trait DDLTestUtils extends QueryTest with SharedSparkSession with DeltaSQLCommandTest {
   protected object DDLType extends Enumeration {
     val CREATE, REPLACE, CREATE_OR_REPLACE = Value
   }
@@ -157,7 +197,7 @@
   }
 }
 
-trait ScalaDLLTestUtils extends DDLTestUtils {
+trait ScalaDDLTestUtils extends DDLTestUtils {
   protected def runDDL(
       ddlType: DDLType.Value,
       tableName: String,
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
index 31b4f4fe271..23235dbd2ca 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
@@ -3205,6 +3205,43 @@ trait DeltaErrorsSuiteBase
         Some(s"Could not resolve expression: ${exprs.mkString(",")}")
       )
     }
+    {
+      val unsupportedDataType = IntegerType
+      val e = intercept[DeltaUnsupportedOperationException] {
+        throw DeltaErrors.identityColumnDataTypeNotSupported(unsupportedDataType)
+      }
+      checkErrorMessage(
+        e,
+        Some("DELTA_IDENTITY_COLUMNS_UNSUPPORTED_DATA_TYPE"),
+        Some("428H2"),
+        Some(s"DataType ${unsupportedDataType.typeName} is not supported for IDENTITY columns."),
+        startWith = true
+      )
+    }
+    {
+      val e = intercept[DeltaAnalysisException] {
+        throw DeltaErrors.identityColumnIllegalStep()
+      }
+      checkErrorMessage(
+        e,
+        Some("DELTA_IDENTITY_COLUMNS_ILLEGAL_STEP"),
+        Some("42611"),
+        Some("IDENTITY column step cannot be 0."),
+        startWith = true
+      )
+    }
+    {
+      val e = intercept[DeltaAnalysisException] {
+        throw DeltaErrors.identityColumnWithGenerationExpression()
+      }
+      checkErrorMessage(
+        e,
+        Some("DELTA_IDENTITY_COLUMNS_WITH_GENERATED_EXPRESSION"),
+        Some("42613"),
+        Some("IDENTITY column cannot be specified with a generated column expression."),
+        startWith = true
+      )
+    }
   }
 }
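With the DDLTestUtils plumbing in place, a Scala-API suite can declare an identity table in one createTable call. A sketch (the "orders" table and "amount" column are hypothetical):

    createTable(
      "orders",
      Seq(
        IdentityColumnSpec(
          GeneratedAsIdentityType.GeneratedAlways,
          startsWith = Some(0L),
          incrementBy = Some(2L)),
        TestColumnSpec(colName = "amount", dataType = IntegerType)
      )
    )

This is exactly the shape that createTableWithIdColAndIntValueCol in the new IdentityColumnTestUtils (at the end of this patch) wraps for the common id/value case.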
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala
new file mode 100644
index 00000000000..95df388f2c6
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala
@@ -0,0 +1,200 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.delta.GeneratedAsIdentityType.{GeneratedAlways, GeneratedAsIdentityType, GeneratedByDefault}
+import org.apache.spark.sql.delta.actions.Protocol
+import org.apache.spark.sql.delta.schema.SchemaUtils
+import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * General test suite for identity columns.
+ */
+trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {
+
+  import testImplicits._
+
+  protected val tblName = "identity_test"
+
+  test("Don't allow IDENTITY column in the schema if the feature is disabled") {
+    withSQLConf(DeltaSQLConf.DELTA_IDENTITY_COLUMN_ENABLED.key -> "false") {
+      withTable(tblName) {
+        val e = intercept[DeltaUnsupportedTableFeatureException] {
+          createTableWithIdColAndIntValueCol(
+            tblName, GeneratedByDefault, startsWith = None, incrementBy = None)
+        }
+        val errorMsg = e.getMessage
+        assert(errorMsg.contains("requires writer table feature(s) that are unsupported"))
+        assert(errorMsg.contains(IdentityColumnsTableFeature.name))
+      }
+    }
+  }
+
+  // Build the expected schema of the following table definition for verification:
+  // CREATE TABLE tableName (
+  //   id BIGINT IDENTITY (START WITH <start> INCREMENT BY <step>),
+  //   value INT
+  // );
+  private def expectedSchema(
+      generatedAsIdentityType: GeneratedAsIdentityType,
+      start: Long = IdentityColumn.defaultStart,
+      step: Long = IdentityColumn.defaultStep): StructType = {
+    val colFields = new ListBuffer[StructField]
+
+    val allowExplicitInsert = generatedAsIdentityType == GeneratedByDefault
+    val builder = new MetadataBuilder()
+    builder.putBoolean(DeltaSourceUtils.IDENTITY_INFO_ALLOW_EXPLICIT_INSERT,
+      allowExplicitInsert)
+    builder.putLong(DeltaSourceUtils.IDENTITY_INFO_START, start)
+    builder.putLong(DeltaSourceUtils.IDENTITY_INFO_STEP, step)
+    colFields += StructField("id", LongType, true, builder.build())
+    colFields += StructField("value", IntegerType)
+
+    StructType(colFields.toSeq)
+  }
+
+  test("various configurations") {
+    val starts = Seq(
+      Long.MinValue,
+      Integer.MIN_VALUE.toLong,
+      -100L,
+      0L,
+      1000L,
+      Integer.MAX_VALUE.toLong,
+      Long.MaxValue
+    )
+    val steps = Seq(
+      Long.MinValue,
+      Integer.MIN_VALUE.toLong,
+      -100L,
+      1000L,
+      Integer.MAX_VALUE.toLong,
+      Long.MaxValue
+    )
+    for {
+      generatedAsIdentityType <- GeneratedAsIdentityType.values
+      startsWith <- starts
+      incrementBy <- steps
+    } {
+      withTable(tblName) {
+        createTableWithIdColAndIntValueCol(
+          tblName, generatedAsIdentityType, Some(startsWith), Some(incrementBy))
+        val table = DeltaLog.forTable(spark, TableIdentifier(tblName))
+        val actualSchema =
+          DeltaColumnMapping.dropColumnMappingMetadata(table.snapshot.metadata.schema)
+        assert(actualSchema === expectedSchema(generatedAsIdentityType, startsWith, incrementBy))
+      }
+    }
+  }
+
+  test("default configuration") {
+    for {
+      generatedAsIdentityType <- GeneratedAsIdentityType.values
+      startsWith <- Seq(Some(1L), None)
+      incrementBy <- Seq(Some(1L), None)
+    } {
+      withTable(tblName) {
+        createTableWithIdColAndIntValueCol(
+          tblName, generatedAsIdentityType, startsWith, incrementBy)
+        val table = DeltaLog.forTable(spark, TableIdentifier(tblName))
+        val actualSchema =
+          DeltaColumnMapping.dropColumnMappingMetadata(table.snapshot.metadata.schema)
+        assert(actualSchema === expectedSchema(generatedAsIdentityType))
+      }
+    }
+  }
+}
+
+class IdentityColumnScalaSuite
+  extends IdentityColumnSuiteBase
+  with ScalaDDLTestUtils {
+
+  test("unsupported column type") {
+    val tblName = "identity_test"
+    for (unsupportedType <- unsupportedDataTypes) {
+      withTable(tblName) {
+        val ex = intercept[DeltaUnsupportedOperationException] {
+          createTable(
+            tblName,
+            Seq(
+              IdentityColumnSpec(GeneratedAlways, dataType = unsupportedType),
+              TestColumnSpec(colName = "value", dataType = StringType)
+            )
+          )
+        }
+        assert(ex.getErrorClass === "DELTA_IDENTITY_COLUMNS_UNSUPPORTED_DATA_TYPE")
+        assert(ex.getMessage.contains("is not supported for IDENTITY columns"))
+      }
+    }
+  }
+
+  test("unsupported step") {
+    val tblName = "identity_test"
+    for {
+      generatedAsIdentityType <- GeneratedAsIdentityType.values
+      startsWith <- Seq(Some(1L), None)
+    } {
+      withTable(tblName) {
+        val ex = intercept[DeltaAnalysisException] {
+          createTableWithIdColAndIntValueCol(
+            tblName, generatedAsIdentityType, startsWith, incrementBy = Some(0))
+        }
+        assert(ex.getErrorClass === "DELTA_IDENTITY_COLUMNS_ILLEGAL_STEP")
+        assert(ex.getMessage.contains("step cannot be 0."))
+      }
+    }
+  }
+
+  test("cannot specify generatedAlwaysAs with identity columns") {
+    def expectColumnBuilderError(f: => StructField): Unit = {
+      val ex = intercept[DeltaAnalysisException] {
+        f
+      }
+      assert(ex.getErrorClass === "DELTA_IDENTITY_COLUMNS_WITH_GENERATED_EXPRESSION")
+      assert(ex.getMessage.contains(
+        "IDENTITY column cannot be specified with a generated column expression."))
+    }
+
+    val generatedColumn = io.delta.tables.DeltaTable.columnBuilder(spark, "id")
+      .dataType(LongType)
+      .generatedAlwaysAs("id + 1")
+
+    expectColumnBuilderError {
+      generatedColumn.generatedAlwaysAsIdentity().build()
+    }
+
+    expectColumnBuilderError {
+      generatedColumn.generatedByDefaultAsIdentity().build()
+    }
+  }
+}
+
+class IdentityColumnScalaIdColumnMappingSuite
+  extends IdentityColumnSuiteBase
+  with ScalaDDLTestUtils
+  with DeltaColumnMappingEnableIdMode
+
+class IdentityColumnScalaNameColumnMappingSuite
+  extends IdentityColumnSuiteBase
+  with ScalaDDLTestUtils
+  with DeltaColumnMappingEnableNameMode
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnTestUtils.scala
new file mode 100644
index 00000000000..a382ebfa12b
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnTestUtils.scala
@@ -0,0 +1,68 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta
+
+import org.apache.spark.sql.delta.GeneratedAsIdentityType.{GeneratedAlways, GeneratedAsIdentityType}
+import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+
+trait IdentityColumnTestUtils
+  extends DDLTestUtils {
+
+  protected override def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(DeltaSQLConf.DELTA_IDENTITY_COLUMN_ENABLED.key, "true")
+  }
+
+  protected val unsupportedDataTypes: Seq[DataType] = Seq(
+    BooleanType,
+    ByteType,
+    ShortType,
+    IntegerType,
+    DoubleType,
+    DateType,
+    TimestampType,
+    StringType,
+    BinaryType,
+    DecimalType(precision = 5, scale = 2),
+    YearMonthIntervalType(startField = 0, endField = 0) // Interval Year
+  )
+
+  def createTableWithIdColAndIntValueCol(
+      tableName: String,
+      generatedAsIdentityType: GeneratedAsIdentityType,
+      startsWith: Option[Long],
+      incrementBy: Option[Long],
+      tblProperties: Map[String, String] = Map.empty): Unit = {
+    createTable(
+      tableName,
+      Seq(
+        IdentityColumnSpec(
+          generatedAsIdentityType,
+          startsWith,
+          incrementBy
+        ),
+        TestColumnSpec(colName = "value", dataType = IntegerType)
+      ),
+      tblProperties = tblProperties
+    )
+  }
+}
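To close the loop on what build() actually emits: the identity definition lands in the column's StructField metadata, which is what expectedSchema() in the suite asserts against. A final sketch; the literal string keys assume the delta.identity.* values behind IDENTITY_INFO_START, IDENTITY_INFO_STEP, and IDENTITY_INFO_ALLOW_EXPLICIT_INSERT in DeltaSourceUtils:

    val field = io.delta.tables.DeltaTable.columnBuilder(spark, "id")
      .dataType("BIGINT")
      .generatedByDefaultAsIdentity(start = 100L, step = 10L)
      .build()

    // The field metadata now carries the identity definition.
    assert(field.metadata.getLong("delta.identity.start") == 100L)
    assert(field.metadata.getLong("delta.identity.step") == 10L)
    assert(field.metadata.getBoolean("delta.identity.allowExplicitInsert"))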