[Spark] Fix Spark-master compile errors (delta-io#3591)
The Spark-master-based build was broken by apache/spark#47785.

---------

Co-authored-by: Thang Long VU <long.vu@databricks.com>
Co-authored-by: Thang Long Vu <107926660+longvu-db@users.noreply.github.com>
3 people authored Sep 11, 2024
1 parent 73b420a commit b843ad6
Showing 44 changed files with 274 additions and 24 deletions.
@@ -25,6 +25,7 @@ import org.apache.spark.annotation.Evolving
 import org.apache.spark.sql.{functions, Column, DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.PrimitiveBooleanEncoder
 import org.apache.spark.sql.connect.delta.ImplicitProtoConversions._
+import org.apache.spark.sql.internal.ColumnNodeToProtoConverter.toExpr

 /**
  * Main class for programmatically interacting with Delta tables.
@@ -137,7 +138,7 @@ class DeltaTable private[tables](
     val delete = proto.DeleteFromTable
       .newBuilder()
       .setTarget(df.plan.getRoot)
-    condition.foreach(c => delete.setCondition(c.expr))
+    condition.foreach(c => delete.setCondition(toExpr(c)))
     val relation = proto.DeltaRelation.newBuilder().setDeleteFromTable(delete).build()
     val extension = com.google.protobuf.Any.pack(relation)
     val sparkRelation = spark_proto.Relation.newBuilder().setExtension(extension).build()
@@ -188,15 +189,15 @@ class DeltaTable private[tables](
     val assignments = set.toSeq.map { case (field, value) =>
       proto.Assignment
         .newBuilder()
-        .setField(functions.expr(field).expr)
-        .setValue(value.expr)
+        .setField(toExpr(functions.expr(field)))
+        .setValue(toExpr(value))
         .build()
     }
     val update = proto.UpdateTable
       .newBuilder()
       .setTarget(df.plan.getRoot)
       .addAllAssignments(assignments.asJava)
-    condition.foreach(c => update.setCondition(c.expr))
+    condition.foreach(c => update.setCondition(toExpr(c)))
     val relation = proto.DeltaRelation.newBuilder().setUpdateTable(update).build()
     val extension = com.google.protobuf.Any.pack(relation)
     val sparkRelation = spark_proto.Relation.newBuilder().setExtension(extension).build()
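For orientation, a minimal sketch (a hypothetical helper, not part of this commit) of the pattern the hunks above switch to: on Spark master the Connect client can no longer call `Column.expr`, so a condition `Column` is converted to a Connect proto expression via `ColumnNodeToProtoConverter.toExpr`. The visibility of `toExpr` may be package-restricted, so real callers sit under `org.apache.spark.sql` the way the patched file does.

import org.apache.spark.sql.Column
import org.apache.spark.sql.internal.ColumnNodeToProtoConverter.toExpr

object ConditionConversionSketch {
  // Replaces the removed `c.expr`: turn a client-side Column into the Connect
  // proto Expression that the Delta relation builders expect.
  def conditionExpr(condition: Option[Column]) = condition.map(c => toExpr(c))
}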
35 changes: 35 additions & 0 deletions spark/src/main/scala-spark-3.5/shims/ColumnExtShim.scala
@@ -0,0 +1,35 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.Expression

/**
* This shim exists due to breaking `Column` API changes in Spark master from
* apache/spark#47785, which removed the following two APIs (both are still
* available in 3.5 and need no shim there):
* - `Column.expr`
* - `Column.apply(Expression)`
*/
object ColumnImplicitsShim {
  /**
   * Convert a [[Column]] to an [[Expression]]. When implicit conversions
   * conflict, this method can be called explicitly.
   */
  def expression(column: Column): Expression = {
    column.expr
  }
}
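A minimal usage sketch (hypothetical caller, not in the commit): with the shim import in scope, shared Delta sources compile unchanged on Spark 3.5, and on master the parallel shim below supplies the same names.

import org.apache.spark.sql.Column
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.catalyst.expressions.Expression

object ShimCallerSketch {
  // On 3.5 this resolves to the native Column.expr; on master it resolves to
  // the ColumnExprExt extension defined in the scala-spark-master shim.
  def toCatalyst(c: Column): Expression = c.expr

  // The explicit helper sidesteps ambiguity when other implicits are in scope.
  def toCatalystExplicit(c: Column): Expression = expression(c)
}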
31 changes: 31 additions & 0 deletions spark/src/main/scala-spark-3.5/shims/QueryCompilationErrorsShim.scala
@@ -0,0 +1,31 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.errors

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.errors.QueryCompilationErrors

/**
* A shim to handle a breaking change (apache/spark@53c1f31) which changed the
* `showColumnsWithConflictDatabasesError` method name and arguments in Spark master.
*/
object QueryCompilationErrorsShim {
  def showColumnsWithConflictDatabasesError(
      db: Seq[String],
      v1TableName: TableIdentifier): Throwable = {
    QueryCompilationErrors.showColumnsWithConflictDatabasesError(db, v1TableName)
  }
}
53 changes: 53 additions & 0 deletions spark/src/main/scala-spark-master/shims/ColumnExtShim.scala
@@ -0,0 +1,53 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.internal.ExpressionUtils

/**
* This shim exists due to breaking `Column` API changes in Spark master from
* apache/spark#47785, which removed the following two APIs:
* - `Column.expr` - Replaced with `ExpressionUtils.expression(column)`
* - `Column.apply(Expression)` - Replaced with `ExpressionUtils.column(e)`
*/
object ColumnImplicitsShim {

  /**
   * Extend [[Column]] to provide an `expr` method returning the [[Expression]].
   * This avoids changing every `Column.expr` call site to
   * `ExpressionUtils.expression(column)`.
   *
   * @param column The column to get the expression from.
   */
  implicit class ColumnExprExt(val column: Column) extends AnyVal {
    def expr: Expression = ExpressionUtils.expression(column)
  }

  /**
   * Provide an implicit constructor to create a [[Column]] from an [[Expression]].
   */
  implicit class ColumnConstructorExt(val c: Column.type) extends AnyVal {
    def apply(e: Expression): Column = ExpressionUtils.column(e)
  }

  /**
   * Convert a [[Column]] to an [[Expression]]. Sometimes the `Column.expr`
   * extension above conflicts with other implicit conversions, so this method
   * can be used explicitly.
   */
  def expression(column: Column): Expression = {
    ExpressionUtils.expression(column)
  }
}
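A short sketch (hypothetical call site) exercising both extensions on a Spark-master build:

import org.apache.spark.sql.Column
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.catalyst.expressions.Expression

object MasterShimSketch {
  // ColumnExprExt restores the removed Column.expr accessor.
  def roundTrip(c: Column): Column = {
    val e: Expression = c.expr
    // ColumnConstructorExt restores the removed Column.apply(Expression).
    Column(e)
  }
}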
32 changes: 32 additions & 0 deletions spark/src/main/scala-spark-master/shims/QueryCompilationErrorsShim.scala
@@ -0,0 +1,32 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.errors

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.errors.QueryCompilationErrors

/**
* A shim to handle a breaking change (apache/spark@53c1f31) which changed the
* `showColumnsWithConflictDatabasesError` method name and arguments in Spark master.
*/
object QueryCompilationErrorsShim {
  def showColumnsWithConflictDatabasesError(
      db: Seq[String],
      v1TableName: TableIdentifier): Throwable = {
    QueryCompilationErrors.showColumnsWithConflictNamespacesError(
      Seq(db.head), Seq(v1TableName.database.get))
  }
}
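A hedged sketch of the shared call site (compare the DeltaAnalysis hunk further down): callers keep the 3.5-era name and signature, and each build's shim maps the call to whichever error method exists there.

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.errors.QueryCompilationErrorsShim._

object ShowColumnsGuardSketch {
  // Hypothetical guard, reduced from DeltaAnalysis: reject SHOW COLUMNS when an
  // explicit namespace disagrees with the resolved table's database.
  def checkNamespace(ns: Seq[String], table: TableIdentifier): Unit = {
    if (table.database.exists(_ != ns.head)) {
      throw showColumnsWithConflictDatabasesError(ns, table)
    }
  }
}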
@@ -28,6 +28,7 @@ import org.apache.spark.sql.delta.util.AnalysisHelper
 import org.apache.spark.annotation._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
+import org.apache.spark.sql.ColumnImplicitsShim._
 import org.apache.spark.sql.catalyst.ExtendedAnalysisException
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
1 change: 1 addition & 0 deletions spark/src/main/scala/io/delta/tables/DeltaTable.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path

 import org.apache.spark.annotation._
 import org.apache.spark.sql._
+import org.apache.spark.sql.ColumnImplicitsShim._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.types.StructType

@@ -26,6 +26,7 @@ import org.apache.spark.sql.delta.commands.{DeltaGenerateCommand, DescribeDeltaD
 import org.apache.spark.sql.delta.util.AnalysisHelper
 import org.apache.hadoop.fs.Path

+import org.apache.spark.sql.ColumnImplicitsShim._
 import org.apache.spark.sql.{functions, Column, DataFrame}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.{Job, TaskType}
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.MDC
 import org.apache.spark.paths.SparkPath
+import org.apache.spark.sql.ColumnImplicitsShim._
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{Cast, ElementAt, Literal}
@@ -26,7 +26,12 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.schema.SchemaUtils
 import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf, DeltaStreamUtils}

-import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.ColumnImplicitsShim._
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.EqualNullSafe
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
@@ -58,6 +58,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.errors.QueryCompilationErrorsShim._
 import org.apache.spark.sql.execution.command.CreateTableLikeCommand
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
@@ -573,7 +574,7 @@ class DeltaAnalysis(session: SparkSession)
       val v1TableName = child.identifier.asTableIdentifier
       namespace.foreach { ns =>
         if (v1TableName.database.exists(!resolver(_, ns.head))) {
-          throw QueryCompilationErrors.showColumnsWithConflictDatabasesError(ns, v1TableName)
+          throw showColumnsWithConflictDatabasesError(ns, v1TableName)
         }
       }
       ShowDeltaTableColumnsCommand(child)
@@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

 import org.apache.spark.sql._
+import org.apache.spark.sql.ColumnImplicitsShim._
 import org.apache.spark.sql.catalyst.{FileSourceOptions, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics, CatalogTable}
@@ -27,7 +27,11 @@ import org.apache.spark.sql.delta.sources.DeltaSourceUtils.GENERATION_EXPRESSION
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.AnalysisHelper

-import org.apache.spark.sql.{AnalysisException, Column, Dataset, SparkSession}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.ColumnImplicitsShim._
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -23,7 +23,11 @@ import org.apache.spark.sql.delta.util.JsonUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path

-import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.ColumnImplicitsShim._
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
@@ -51,7 +51,11 @@ import org.apache.hadoop.fs.{FileStatus, Path}

 import org.apache.spark.SparkException
 import org.apache.spark.internal.{MDC, MessageWithContext}
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.ColumnImplicitsShim._
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}

 import org.apache.spark.internal.Logging
 import org.apache.spark.paths.SparkPath
+import org.apache.spark.sql.ColumnImplicitsShim._
 import org.apache.spark.sql.{Column, Encoder, SparkSession}
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path

 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql._
+import org.apache.spark.sql.ColumnImplicitsShim._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
@@ -28,7 +28,12 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize

 import org.apache.spark.SparkContext
-import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.ColumnImplicitsShim._
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EqualNullSafe, Expression, If, Literal, Not}
@@ -29,7 +29,12 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import org.apache.hadoop.fs.Path

 import org.apache.spark.SparkContext
-import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.ColumnImplicitsShim._
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, If, Literal}
@@ -32,6 +32,7 @@ import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec
 import org.apache.spark.sql.delta.sources.DeltaSQLConf

 import org.apache.spark.sql._
+import org.apache.spark.sql.ColumnImplicitsShim._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, Literal}
@@ -40,7 +40,11 @@ import org.apache.spark.sql.delta.stats.StatisticsCollection
 import org.apache.hadoop.fs.Path

 import org.apache.spark.internal.MDC
-import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.ColumnImplicitsShim._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.catalyst.expressions._
@@ -34,7 +34,13 @@ import org.apache.spark.sql.util.ScalaExtensions.OptionExt

 import org.apache.spark.internal.MDC
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.ColumnImplicitsShim._
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
@@ -26,7 +26,10 @@ import org.apache.spark.sql.delta.commands.merge.MergeOutputGeneration.{SOURCE_R
 import org.apache.spark.sql.delta.files.TahoeBatchFileIndex
 import org.apache.spark.sql.delta.util.SetAccumulator

-import org.apache.spark.sql.{Column, Dataset, SparkSession}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.ColumnImplicitsShim._
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal, Or}
 import org.apache.spark.sql.catalyst.plans.logical.DeltaMergeIntoClause
 import org.apache.spark.sql.functions.{coalesce, col, count, input_file_name, lit, monotonically_increasing_id, sum}