Skip to content

Commit

Permalink
[Spark] [Delta X-Compile] Refactor AnalysisException to DeltaAnalysis…
Browse files Browse the repository at this point in the history
…Exception (batch 3) (delta-io#2815)

#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

We want Delta to cross-compile against both Spark 3.5 and Spark Master
(4.0).

Unfortunately, the constructor `new AnalysisException("msg")` became
protected in Spark 4.0, meaning that all such occurrences do not compile
against Spark 3.5.

Thus, we decided to:
- replace `AnalysisException` with `DeltaAnalysisException`
- use errorClasses
- assign temporary error classes when needed to speed this along

This PR fixes all remaining related compilation errors.

## How was this patch tested?

New UTs in `DeltaErrorsSuite`.

Also, cherry-picked to the oss-cross-compile branch
(delta-io#2780) and cross-compiled:
- (this branch) Spark 3.5: ✅ 
- (this branch) Spark 4.0: no remaining compilation errors.
  • Loading branch information
scottsand-db authored Mar 29, 2024
1 parent acd9c6c commit 9c302b0
Show file tree
Hide file tree
Showing 11 changed files with 287 additions and 71 deletions.
69 changes: 63 additions & 6 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,12 @@
],
"sqlState" : "42703"
},
"DELTA_COLUMN_MISSING_DATA_TYPE" : {
"message" : [
"The data type of the column <colName> was not provided."
],
"sqlState" : "42601"
},
"DELTA_COLUMN_NOT_FOUND" : {
"message" : [
"Unable to find the column `<columnName>` given [<columnList>]"
Expand All @@ -443,8 +449,9 @@
"DELTA_COLUMN_PATH_NOT_NESTED" : {
"message" : [
"Expected <columnPath> to be a nested data type, but found <other>. Was looking for the",
"index of <column> in a nested field",
""
"index of <column> in a nested field.",
"Schema:",
"<schema>"
],
"sqlState" : "42704"
},
Expand Down Expand Up @@ -585,6 +592,18 @@
],
"sqlState" : "42K03"
},
"DELTA_CREATE_TABLE_IDENTIFIER_LOCATION_MISMATCH" : {
"message" : [
"Creating path-based Delta table with a different location isn't supported. Identifier: <identifier>, Location: <location>"
],
"sqlState" : "0AKDC"
},
"DELTA_CREATE_TABLE_MISSING_TABLE_NAME_OR_LOCATION" : {
"message" : [
"Table name or location has to be specified."
],
"sqlState" : "42601"
},
"DELTA_CREATE_TABLE_SCHEME_MISMATCH" : {
"message" : [
"The specified schema does not match the existing schema at <path>.",
Expand Down Expand Up @@ -690,6 +709,13 @@
],
"sqlState" : "42KD8"
},
"DELTA_DROP_COLUMN_ON_SINGLE_FIELD_SCHEMA" : {
"message" : [
"Cannot drop column from a schema with a single column. Schema:",
"<schema>"
],
"sqlState" : "0AKDC"
},
"DELTA_DUPLICATE_COLUMNS_FOUND" : {
"message" : [
"Found duplicate column(s) <coltype>: <duplicateCols>"
Expand Down Expand Up @@ -956,7 +982,9 @@
"<value>",
"followed by the name of the column (only if that column is a struct type).",
"e.g. mymap.key.mykey",
"If the column is a basic type, mymap.key or mymap.value is sufficient."
"If the column is a basic type, mymap.key or mymap.value is sufficient.",
"Schema:",
"<schema>"
],
"sqlState" : "KD003"
},
Expand Down Expand Up @@ -1092,9 +1120,9 @@
"DELTA_INCORRECT_ARRAY_ACCESS_BY_NAME" : {
"message" : [
"An ArrayType was found. In order to access elements of an ArrayType, specify",
"<rightName>",
"Instead of <wrongName>",
""
"<rightName> instead of <wrongName>.",
"Schema:",
"<schema>"
],
"sqlState" : "KD003"
},
Expand Down Expand Up @@ -2791,5 +2819,34 @@
"message" : [
"<message>"
]
},
"_LEGACY_ERROR_TEMP_DELTA_0008" : {
"message" : [
"Error while searching for position of column <column>.",
"Schema:",
"<schema>",
"Error:",
"<message>"
]
},
"_LEGACY_ERROR_TEMP_DELTA_0009" : {
"message" : [
"<optionalPrefixMessage>Updating nested fields is only supported for StructType."
]
},
"_LEGACY_ERROR_TEMP_DELTA_0010" : {
"message" : [
"<optionalPrefixMessage>Found unsupported expression <expression> while parsing target column name parts."
]
},
"_LEGACY_ERROR_TEMP_DELTA_0011" : {
"message" : [
"Failed to resolve plan."
]
},
"_LEGACY_ERROR_TEMP_DELTA_0012" : {
"message" : [
"Could not resolve expression: <expression>"
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class DeltaColumnBuilder private[tables](
}
val fieldMetadata = metadataBuilder.build()
if (dataType == null) {
throw DeltaErrors.analysisException(s"The data type of the column $colName is not provided")
throw DeltaErrors.columnBuilderMissingDataType(colName)
}
StructField(
colName,
Expand Down
11 changes: 9 additions & 2 deletions spark/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package io.delta.tables
import scala.collection.JavaConverters._
import scala.collection.Map

import org.apache.spark.sql.delta.{DeltaErrors, PostHocResolveUpCast, PreprocessTableMerge, ResolveDeltaMergeInto}
import org.apache.spark.sql.delta.{DeltaAnalysisException, PostHocResolveUpCast, PreprocessTableMerge, ResolveDeltaMergeInto}
import org.apache.spark.sql.delta.DeltaTableUtils.withActiveSession
import org.apache.spark.sql.delta.DeltaViewHelper
import org.apache.spark.sql.delta.commands.MergeIntoCommand
Expand All @@ -28,6 +28,7 @@ import org.apache.spark.sql.delta.util.AnalysisHelper
import org.apache.spark.annotation._
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -305,7 +306,13 @@ class DeltaMergeBuilder private(
ResolveDeltaMergeInto.resolveReferencesAndSchema(mergePlan, sparkSession.sessionState.conf)(
tryResolveReferencesForExpressions(sparkSession))
if (!resolvedMergeInto.resolved) {
throw DeltaErrors.analysisException("Failed to resolve\n", plan = Some(resolvedMergeInto))
throw new ExtendedAnalysisException(
new DeltaAnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_DELTA_0011",
messageParameters = Array.empty
),
resolvedMergeInto
)
}
val strippedMergeInto = resolvedMergeInto.copy(
target = DeltaViewHelper.stripTempViewForMerge(resolvedMergeInto.target, SQLConf.get)
Expand Down
6 changes: 2 additions & 4 deletions spark/src/main/scala/io/delta/tables/DeltaTableBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ class DeltaTableBuilder private[tables](
@Evolving
def execute(): DeltaTable = withActiveSession(spark) {
if (identifier == null && location.isEmpty) {
throw DeltaErrors.analysisException("Table name or location has to be specified")
throw DeltaErrors.createTableMissingTableNameOrLocation()
}

if (this.identifier == null) {
Expand All @@ -317,9 +317,7 @@ class DeltaTableBuilder private[tables](

if (DeltaTableUtils.isValidPath(tableId) && location.nonEmpty
&& tableId.table != location.get) {
throw DeltaErrors.analysisException(
s"Creating path-based Delta table with a different location isn't supported. "
+ s"Identifier: $identifier, Location: ${location.get}")
throw DeltaErrors.createTableIdentifierLocationMismatch(identifier, location.get)
}

val table = spark.sessionState.sqlParser.parseMultipartIdentifier(identifier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.delta.DeltaAnalysisException

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, ExtractValue, GetStructField}

Expand Down Expand Up @@ -64,11 +66,6 @@ object DeltaUpdateTable {
*/
def getTargetColNameParts(resolvedTargetCol: Expression, errMsg: String = null): Seq[String] = {

def fail(extraMsg: String): Nothing = {
val msg = Option(errMsg).map(_ + " - ").getOrElse("") + extraMsg
throw new AnalysisException(msg)
}

def extractRecursively(expr: Expression): Seq[String] = expr match {
case attr: Attribute => Seq(attr.name)

Expand All @@ -77,10 +74,16 @@ object DeltaUpdateTable {
case GetStructField(c, _, Some(name)) => extractRecursively(c) :+ name

case _: ExtractValue =>
fail("Updating nested fields is only supported for StructType.")
throw new DeltaAnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_DELTA_0009",
messageParameters = Array(Option(errMsg).map(_ + " - ").getOrElse(""))
)

case other =>
fail(s"Found unsupported expression '$other' while parsing target column name parts")
throw new DeltaAnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_DELTA_0010",
messageParameters = Array(Option(errMsg).map(_ + " - ").getOrElse(""), other.sql)
)
}

extractRecursively(resolvedTargetCol)
Expand Down
63 changes: 46 additions & 17 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.JsonUtils
import io.delta.sql.DeltaSparkSessionExtension
import org.apache.hadoop.fs.{ChecksumException, Path}
import org.json4s.JValue

import org.apache.spark.{SparkConf, SparkEnv, SparkException}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.CatalogTable
Expand Down Expand Up @@ -258,15 +256,6 @@ trait DeltaErrorsBase

def formatSchema(schema: StructType): String = schema.treeString

def analysisException(
msg: String,
line: Option[Int] = None,
startPosition: Option[Int] = None,
plan: Option[LogicalPlan] = None,
cause: Option[Throwable] = None): AnalysisException = {
new ExtendedAnalysisException(msg, line, startPosition, plan, cause)
}

def notNullColumnMissingException(constraint: Constraints.NotNull): Throwable = {
new DeltaInvariantViolationException(
errorClass = "DELTA_MISSING_NOT_NULL_COLUMN_VALUE",
Expand Down Expand Up @@ -1626,10 +1615,10 @@ trait DeltaErrorsBase
messageParameters = Array(option, operation))
}

def foundMapTypeColumnException(key: String, value: String): Throwable = {
def foundMapTypeColumnException(key: String, value: String, schema: StructType): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_FOUND_MAP_TYPE_COLUMN",
messageParameters = Array(key, value)
messageParameters = Array(key, value, schema.treeString)
)
}
def columnNotInSchemaException(column: String, schema: StructType): Throwable = {
Expand Down Expand Up @@ -2601,20 +2590,28 @@ trait DeltaErrorsBase
)
}

def incorrectArrayAccessByName(rightName: String, wrongName: String): Throwable = {
def incorrectArrayAccessByName(
rightName: String,
wrongName: String,
schema: StructType): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_INCORRECT_ARRAY_ACCESS_BY_NAME",
messageParameters = Array(rightName, wrongName)
messageParameters = Array(rightName, wrongName, schema.treeString)
)
}

def columnPathNotNested(columnPath: String, other: DataType, column: Seq[String]): Throwable = {
def columnPathNotNested(
columnPath: String,
other: DataType,
column: Seq[String],
schema: StructType): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_COLUMN_PATH_NOT_NESTED",
messageParameters = Array(
s"$columnPath",
s"$other",
s"${SchemaUtils.prettyFieldName(column)}"
s"${SchemaUtils.prettyFieldName(column)}",
schema.treeString
)
)
}
Expand Down Expand Up @@ -3274,6 +3271,38 @@ trait DeltaErrorsBase
messageParameters = Array(toSQLId(columnName))
)
}

/** Error for a DeltaColumnBuilder that was finalized without a data type for `colName`. */
def columnBuilderMissingDataType(colName: String): Throwable = {
  val quotedColumn = toSQLId(colName)
  new DeltaAnalysisException(
    errorClass = "DELTA_COLUMN_MISSING_DATA_TYPE",
    messageParameters = Array(quotedColumn)
  )
}

/** Error for DeltaTableBuilder.execute() when neither a table name nor a location was given. */
def createTableMissingTableNameOrLocation(): Throwable =
  new DeltaAnalysisException(
    errorClass = "DELTA_CREATE_TABLE_MISSING_TABLE_NAME_OR_LOCATION",
    messageParameters = Array.empty[String]
  )

/**
 * Error for creating a path-based Delta table whose `identifier` does not match the
 * explicitly supplied `location`.
 */
def createTableIdentifierLocationMismatch(identifier: String, location: String): Throwable = {
  val params = Array(identifier, location)
  new DeltaAnalysisException(
    errorClass = "DELTA_CREATE_TABLE_IDENTIFIER_LOCATION_MISMATCH",
    messageParameters = params
  )
}

/** Error for attempting to drop the only column of a single-field `schema`. */
def dropColumnOnSingleFieldSchema(schema: StructType): Throwable = {
  val renderedSchema = schema.treeString
  new DeltaAnalysisException(
    errorClass = "DELTA_DROP_COLUMN_ON_SINGLE_FIELD_SCHEMA",
    messageParameters = Array(renderedSchema)
  )
}

/**
 * Wraps a failure (`extraErrMsg`) that occurred while searching for the position of
 * `columnPath` inside `schema`, rendering the dotted column name and the schema tree.
 */
def errorFindingColumnPosition(
    columnPath: Seq[String], schema: StructType, extraErrMsg: String): Throwable = {
  val dottedName = UnresolvedAttribute(columnPath).name
  val renderedSchema = schema.treeString
  new DeltaAnalysisException(
    errorClass = "_LEGACY_ERROR_TEMP_DELTA_0008",
    messageParameters = Array(dottedName, renderedSchema, extraErrMsg)
  )
}
}

object DeltaErrors extends DeltaErrorsBase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,8 @@ def normalizeColumnNamesInDataType(
case (_: MapType, _) =>
throw DeltaErrors.foundMapTypeColumnException(
prettyFieldName(currentPath :+ "key"),
prettyFieldName(currentPath :+ "value"))
prettyFieldName(currentPath :+ "value"),
schema)

case (array: ArrayType, "element") =>
val childPosition = findRecursively(
Expand All @@ -723,17 +724,19 @@ def normalizeColumnNamesInDataType(
case (_: ArrayType, _) =>
throw DeltaErrors.incorrectArrayAccessByName(
prettyFieldName(currentPath :+ "element"),
prettyFieldName(currentPath))
prettyFieldName(currentPath),
schema)
case _ =>
throw DeltaErrors.columnPathNotNested(currentFieldName, currentType, currentPath)
throw DeltaErrors.columnPathNotNested(currentFieldName, currentType, currentPath, schema)
}
}

try {
findRecursively(column, schema)
} catch {
case e: DeltaAnalysisException => throw e
case e: AnalysisException =>
throw new AnalysisException(e.getMessage + s":\n${schema.treeString}")
throw DeltaErrors.errorFindingColumnPosition(column, schema, e.getMessage)
}
}

Expand Down Expand Up @@ -905,8 +908,7 @@ def normalizeColumnNamesInDataType(
StructType(pre ++ Seq(mid) ++ post.tail) -> droppedColumn
} else {
if (length == 1) {
throw new AnalysisException(
"Cannot drop column from a struct type with a single field: " + schema)
throw DeltaErrors.dropColumnOnSingleFieldSchema(schema)
}
StructType(pre ++ post.tail) -> field
}
Expand Down
Loading

0 comments on commit 9c302b0

Please sign in to comment.