From 9cfe1f24dbf843a2664ce0259988a195adbc0ea2 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 9 Dec 2024 09:23:18 +0800
Subject: [PATCH 01/16] fixup

---
 .../apache/gluten/backendsapi/clickhouse/CHBackend.scala  | 2 +-
 .../apache/gluten/backendsapi/velox/VeloxBackend.scala    | 2 +-
 .../src/main/scala/org/apache/gluten/GlutenPlugin.scala   | 2 +-
 .../main/scala/org/apache/gluten/backend/Backend.scala    | 2 ++
 .../apache/gluten/{backend => component}/Component.scala  | 4 ++--
 .../apache/gluten/{backend => component}/package.scala    | 8 +++++---
 .../apache/gluten/extension/GlutenSessionExtensions.scala | 2 +-
 .../columnar/enumerated/EnumeratedTransform.scala         | 2 +-
 .../extension/columnar/heuristic/HeuristicTransform.scala | 2 +-
 .../extension/columnar/transition/ConventionFunc.scala    | 2 +-
 .../gluten/{backend => component}/ComponentSuite.scala    | 4 ++--
 .../apache/gluten/backendsapi/BackendsApiManager.scala    | 2 +-
 12 files changed, 19 insertions(+), 15 deletions(-)
 rename gluten-core/src/main/scala/org/apache/gluten/{backend => component}/Component.scala (98%)
 rename gluten-core/src/main/scala/org/apache/gluten/{backend => component}/package.scala (88%)
 rename gluten-core/src/test/scala/org/apache/gluten/{backend => component}/ComponentSuite.scala (97%)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 83a92db51897..3ff2805c036b 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.backendsapi.clickhouse
 
 import org.apache.gluten.GlutenBuildInfo._
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backend.Component.BuildInfo
+import org.apache.gluten.component.Component.BuildInfo
 import org.apache.gluten.backendsapi._
 import org.apache.gluten.columnarbatch.CHBatch
 import org.apache.gluten.execution.WriteFilesExecTransformer
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 6bf4b6a4e256..be497d424b6e 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.backendsapi.velox
 
 import org.apache.gluten.GlutenBuildInfo._
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backend.Component.BuildInfo
+import org.apache.gluten.component.Component.BuildInfo
 import org.apache.gluten.backendsapi._
 import org.apache.gluten.columnarbatch.VeloxBatch
 import org.apache.gluten.exception.GlutenNotSupportException
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 03d16c41c72b..2262c4cc1784 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -18,7 +18,7 @@ package org.apache.gluten
 
 import org.apache.gluten.GlutenBuildInfo._
 import org.apache.gluten.GlutenConfig._
-import org.apache.gluten.backend.Component
+import org.apache.gluten.component.Component
 import org.apache.gluten.events.GlutenBuildInfoEvent
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.extension.GlutenSessionExtensions
diff --git a/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala b/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
index 02a2a44349c9..0434e11c02d3 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.gluten.backend
 
+import org.apache.gluten.component.Component
+
 trait Backend extends Component {
 
   /**
diff --git a/gluten-core/src/main/scala/org/apache/gluten/backend/Component.scala b/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala
similarity index 98%
rename from gluten-core/src/main/scala/org/apache/gluten/backend/Component.scala
rename to gluten-core/src/main/scala/org/apache/gluten/component/Component.scala
index 8670bede87e5..6a3b74699b71 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backend/Component.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.backend
+package org.apache.gluten.component
 
 import org.apache.gluten.extension.columnar.transition.ConventionFunc
 import org.apache.gluten.extension.injector.Injector
@@ -100,7 +100,7 @@ object Component {
     graph.sorted()
   }
 
-  private[backend] def sortedUnsafe(): Seq[Component] = {
+  private[component] def sortedUnsafe(): Seq[Component] = {
     graph.sorted()
   }
 
diff --git a/gluten-core/src/main/scala/org/apache/gluten/backend/package.scala b/gluten-core/src/main/scala/org/apache/gluten/component/package.scala
similarity index 88%
rename from gluten-core/src/main/scala/org/apache/gluten/backend/package.scala
rename to gluten-core/src/main/scala/org/apache/gluten/component/package.scala
index a9981719a333..f74b96729418 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backend/package.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/component/package.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.gluten
 
+import org.apache.gluten.backend.Backend
+
 import org.apache.spark.internal.Logging
 
 import java.util.ServiceLoader
@@ -23,10 +25,10 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 
-package object backend extends Logging {
-  private[backend] val allComponentsLoaded: AtomicBoolean = new AtomicBoolean(false)
+package object component extends Logging {
+  private val allComponentsLoaded: AtomicBoolean = new AtomicBoolean(false)
 
-  private[backend] def ensureAllComponentsRegistered(): Unit = {
+  private[component] def ensureAllComponentsRegistered(): Unit = {
     if (!allComponentsLoaded.compareAndSet(false, true)) {
       return
     }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
index 794f38365a0e..5b3e97f356ca 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenSessionExtensions.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.extension
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backend.Component
+import org.apache.gluten.component.Component
 import org.apache.gluten.extension.injector.Injector
 
 import org.apache.spark.internal.Logging
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
index f1a325bc4379..34b4005a756d 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.extension.columnar.enumerated
 
-import org.apache.gluten.backend.Component
+import org.apache.gluten.component.Component
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import org.apache.gluten.extension.columnar.enumerated.planner.GlutenOptimization
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicTransform.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicTransform.scala
index e53c4cbf80e0..f9def8c94ba2 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicTransform.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicTransform.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.extension.columnar.heuristic
 
-import org.apache.gluten.backend.Component
+import org.apache.gluten.component.Component
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
index bb894c2af0e1..c4405aeb8d0a 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.extension.columnar.transition
 
-import org.apache.gluten.backend.Component
+import org.apache.gluten.component.Component
 import org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildConvention
 import org.apache.gluten.sql.shims.SparkShimLoader
 
diff --git a/gluten-core/src/test/scala/org/apache/gluten/backend/ComponentSuite.scala b/gluten-core/src/test/scala/org/apache/gluten/component/ComponentSuite.scala
similarity index 97%
rename from gluten-core/src/test/scala/org/apache/gluten/backend/ComponentSuite.scala
rename to gluten-core/src/test/scala/org/apache/gluten/component/ComponentSuite.scala
index a6f8bf2a0cbd..60141e1f9177 100644
--- a/gluten-core/src/test/scala/org/apache/gluten/backend/ComponentSuite.scala
+++ b/gluten-core/src/test/scala/org/apache/gluten/component/ComponentSuite.scala
@@ -14,10 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.backend
+package org.apache.gluten.component
 
+import org.apache.gluten.backend.Backend
 import org.apache.gluten.extension.injector.Injector
-
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.funsuite.AnyFunSuite
 
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
index ab8ab3688916..3b4e97afb361 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendsApiManager.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.backendsapi
 
-import org.apache.gluten.backend.Component
+import org.apache.gluten.component.Component
 
 object BackendsApiManager {
 
   private lazy val backend: SubstraitBackend = initializeInternal()

From 826d14de3d16d9f2ed3122004875c9bd3b6cea33 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 9 Dec 2024 10:45:44 +0800
Subject: [PATCH 02/16] fixup

---
 .../backendsapi/clickhouse/CHBackend.scala    |   2 +-
 .../backendsapi/clickhouse/CHRuleApi.scala    |   4 +-
 .../backendsapi/velox/VeloxBackend.scala      |   2 +-
 .../backendsapi/velox/VeloxRuleApi.scala      |   4 +-
 .../columnar/heuristic/LegacyOffload.scala    |   1 -
 .../columnar/validator/Validator.scala        |  18 +-
 .../gluten/component/ComponentSuite.scala     |   1 +
 .../EnsureLocalSortRequirements.scala         |   2 +-
 .../columnar/validator/Validators.scala       | 199 ++----------------
 9 files changed, 41 insertions(+), 192 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 3ff2805c036b..0e79e6ca93f4 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -18,9 +18,9 @@ package org.apache.gluten.backendsapi.clickhouse
 
 import org.apache.gluten.GlutenBuildInfo._
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.component.Component.BuildInfo
 import org.apache.gluten.backendsapi._
 import org.apache.gluten.columnarbatch.CHBatch
+import org.apache.gluten.component.Component.BuildInfo
 import org.apache.gluten.execution.WriteFilesExecTransformer
 import org.apache.gluten.expression.WindowFunctionsBuilder
 import org.apache.gluten.extension.ValidationResult
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 141778688967..9e129f224dcf 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -82,6 +82,7 @@ object CHRuleApi {
     injector.injectPreTransform(_ => WriteFilesWithBucketValue)
 
     // Legacy: The legacy transform rule.
+    val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin())
     val validatorBuilder: GlutenConfig => Validator = conf =>
       Validator
         .builder()
@@ -91,11 +92,10 @@ object CHRuleApi {
         .fallbackByBackendSettings()
         .fallbackByUserOptions()
         .fallbackByTestInjects()
-        .fallbackByNativeValidation()
+        .fallbackByNativeValidation(offloads)
         .build()
     val rewrites =
       Seq(RewriteIn, RewriteMultiChildrenCount, RewriteJoin, PullOutPreProject, PullOutPostProject)
-    val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin())
     injector.injectTransform(
       c => intercept(HeuristicTransform.Single(validatorBuilder(c.glutenConf), rewrites, offloads)))
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index be497d424b6e..b485763e2545 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -18,9 +18,9 @@ package org.apache.gluten.backendsapi.velox
 
 import org.apache.gluten.GlutenBuildInfo._
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.component.Component.BuildInfo
 import org.apache.gluten.backendsapi._
 import org.apache.gluten.columnarbatch.VeloxBatch
+import org.apache.gluten.component.Component.BuildInfo
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.execution.WriteFilesExecTransformer
 import org.apache.gluten.expression.WindowFunctionsBuilder
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 7337be573710..d6887f0463ac 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -74,6 +74,7 @@ object VeloxRuleApi {
     injector.injectPreTransform(c => ArrowScanReplaceRule.apply(c.session))
 
     // Legacy: The legacy transform rule.
+    val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin())
     val validatorBuilder: GlutenConfig => Validator = conf =>
       Validator
         .builder()
@@ -83,11 +84,10 @@ object VeloxRuleApi {
         .fallbackByBackendSettings()
         .fallbackByUserOptions()
         .fallbackByTestInjects()
-        .fallbackByNativeValidation()
+        .fallbackByNativeValidation(offloads)
         .build()
     val rewrites =
       Seq(RewriteIn, RewriteMultiChildrenCount, RewriteJoin, PullOutPreProject, PullOutPostProject)
-    val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin())
     injector.injectTransform(
       c => HeuristicTransform.Single(validatorBuilder(c.glutenConf), rewrites, offloads))
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/LegacyOffload.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/LegacyOffload.scala
index 9249e6ebf742..c0c44f390d29 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/LegacyOffload.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/LegacyOffload.scala
@@ -23,7 +23,6 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkPlan
 
 class LegacyOffload(rules: Seq[OffloadSingleNode]) extends Rule[SparkPlan] with LogLevelUtil {
-
   def apply(plan: SparkPlan): SparkPlan = {
     val out =
       rules.foldLeft(plan)((p, rule) => p.transformUp { case p => rule.offload(p) })
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala
index 105dcb5db4aa..63a3a0af07b2 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala
@@ -50,7 +50,7 @@ object Validator {
 
     /** Add a custom validator to pipeline. */
     def add(validator: Validator): Builder = {
-      buffer += validator
+      buffer ++= flatten(validator)
       this
     }
 
@@ -64,7 +64,15 @@ object Validator {
       new ValidatorPipeline(buffer.toSeq)
     }
 
-  private class ValidatorPipeline(validators: Seq[Validator]) extends Validator {
+  private def flatten(validator: Validator): Seq[Validator] = validator match {
+    case p: ValidatorPipeline =>
+      p.validators.flatMap(flatten)
+    case other => Seq(other)
+  }
+
+  private class ValidatorPipeline(val validators: Seq[Validator]) extends Validator {
+    assert(!validators.exists(_.isInstanceOf[ValidatorPipeline]))
+
     override def validate(plan: SparkPlan): Validator.OutCome = {
       val init: Validator.OutCome = pass()
       val finalOut = validators.foldLeft(init) {
@@ -86,4 +94,10 @@ object Validator {
   private object Builder {
     def apply(): Builder = new Builder()
   }
+
+  implicit class ValidatorImplicits(v: Validator) {
+    def andThen(other: Validator): Validator = {
+      builder().add(v).add(other).build()
+    }
+  }
 }
diff --git a/gluten-core/src/test/scala/org/apache/gluten/component/ComponentSuite.scala b/gluten-core/src/test/scala/org/apache/gluten/component/ComponentSuite.scala
index 60141e1f9177..9e124aaaabd3 100644
--- a/gluten-core/src/test/scala/org/apache/gluten/component/ComponentSuite.scala
+++ b/gluten-core/src/test/scala/org/apache/gluten/component/ComponentSuite.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.component
 
 import org.apache.gluten.backend.Backend
 import org.apache.gluten.extension.injector.Injector
+
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.funsuite.AnyFunSuite
 
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala
index 056315186df1..e17a8e746035 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.{SortExec, SparkPlan}
  * This rule is similar with `EnsureRequirements` but only handle local `SortExec`.
  *
  * The reason is that, during transform SparkPlan to GlutenPlan, some operators do not need local
- * sort any more, e.g., convert SortAggregate to HashAggregateTransformer, and we remove local sort
+ * sort anymore, e.g., convert SortAggregate to HashAggregateTransformer, and we remove local sort
  * eagerly. However, it may break the other operator's requirements, e.g., A SortMergeJoin on top of
  * SortAggregate with the same key. So, this rule adds local sort back if necessary.
  */
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index d246167bd7c8..a46979d24608 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -21,10 +21,10 @@ import org.apache.gluten.backendsapi.{BackendsApiManager, BackendSettingsApi}
 import org.apache.gluten.execution._
 import org.apache.gluten.expression.ExpressionUtils
 import org.apache.gluten.extension.columnar.FallbackTags
-import org.apache.gluten.extension.columnar.offload.OffloadJoin
+import org.apache.gluten.extension.columnar.heuristic.LegacyOffload
+import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
 import org.apache.gluten.sql.shims.SparkShimLoader
 
-import org.apache.spark.api.python.EvalPythonExecTransformer
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
@@ -32,8 +32,7 @@ import org.apache.spark.sql.execution.datasources.WriteFilesExec
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins._
-import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, BatchEvalPythonExec}
-import org.apache.spark.sql.execution.window.{WindowExec, WindowGroupLimitExecShim}
+import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 
 object Validators {
@@ -93,8 +92,8 @@ object Validators {
    * Attempts to offload the input query plan node and check native validation result. Fails when
    * native validation failed.
    */
-  def fallbackByNativeValidation(): Validator.Builder = {
-    builder.add(new FallbackByNativeValidation)
+  def fallbackByNativeValidation(rules: Seq[OffloadSingleNode]): Validator.Builder = {
+    builder.add(new FallbackByNativeValidation(rules))
   }
 }
 
-  private class FallbackByNativeValidation extends Validator with Logging {
-    override def validate(plan: SparkPlan): Validator.OutCome = plan match {
-      case plan: BatchScanExec =>
-        val transformer = ScanTransformerFactory.createBatchScanTransformer(plan)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: FileSourceScanExec =>
-        val transformer = ScanTransformerFactory.createFileSourceScanTransformer(plan)
-        transformer.doValidate().toValidatorOutcome()
-      case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
-        HiveTableScanExecTransformer(plan).doValidate().toValidatorOutcome()
-      case plan: ProjectExec =>
-        val transformer = ProjectExecTransformer(plan.projectList, plan.child)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: FilterExec =>
-        val transformer = BackendsApiManager.getSparkPlanExecApiInstance
-          .genFilterExecTransformer(plan.condition, plan.child)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: HashAggregateExec =>
-        val transformer = HashAggregateExecBaseTransformer.from(plan)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: SortAggregateExec =>
-        val transformer = HashAggregateExecBaseTransformer.from(plan)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: ObjectHashAggregateExec =>
-        val transformer = HashAggregateExecBaseTransformer.from(plan)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: UnionExec =>
-        val transformer = ColumnarUnionExec(plan.children)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: ExpandExec =>
-        val transformer = ExpandExecTransformer(plan.projections, plan.output, plan.child)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: WriteFilesExec =>
-        val transformer = WriteFilesExecTransformer(
-          plan.child,
-          plan.fileFormat,
-          plan.partitionColumns,
-          plan.bucketSpec,
-          plan.options,
-          plan.staticPartitions)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: SortExec =>
-        val transformer =
-          SortExecTransformer(plan.sortOrder, plan.global, plan.child, plan.testSpillFrequency)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: ShuffleExchangeExec =>
-        val transformer = ColumnarShuffleExchangeExec(plan, plan.child, plan.child.output)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: ShuffledHashJoinExec =>
-        val transformer = BackendsApiManager.getSparkPlanExecApiInstance
-          .genShuffledHashJoinExecTransformer(
-            plan.leftKeys,
-            plan.rightKeys,
-            plan.joinType,
-            OffloadJoin.getShjBuildSide(plan),
-            plan.condition,
-            plan.left,
-            plan.right,
-            plan.isSkewJoin)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: BroadcastExchangeExec =>
-        val transformer = ColumnarBroadcastExchangeExec(plan.mode, plan.child)
-        transformer.doValidate().toValidatorOutcome()
-      case bhj: BroadcastHashJoinExec =>
-        val transformer = BackendsApiManager.getSparkPlanExecApiInstance
-          .genBroadcastHashJoinExecTransformer(
-            bhj.leftKeys,
-            bhj.rightKeys,
-            bhj.joinType,
-            bhj.buildSide,
-            bhj.condition,
-            bhj.left,
-            bhj.right,
-            isNullAwareAntiJoin = bhj.isNullAwareAntiJoin)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: SortMergeJoinExec =>
-        val transformer = BackendsApiManager.getSparkPlanExecApiInstance
-          .genSortMergeJoinExecTransformer(
-            plan.leftKeys,
-            plan.rightKeys,
-            plan.joinType,
-            plan.condition,
-            plan.left,
-            plan.right,
-            plan.isSkewJoin)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: CartesianProductExec =>
-        val transformer = BackendsApiManager.getSparkPlanExecApiInstance
-          .genCartesianProductExecTransformer(plan.left, plan.right, plan.condition)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: BroadcastNestedLoopJoinExec =>
-        val transformer = BackendsApiManager.getSparkPlanExecApiInstance
-          .genBroadcastNestedLoopJoinExecTransformer(
-            plan.left,
-            plan.right,
-            plan.buildSide,
-            plan.joinType,
-            plan.condition)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: WindowExec =>
-        val transformer = WindowExecTransformer(
-          plan.windowExpression,
-          plan.partitionSpec,
-          plan.orderSpec,
-          plan.child)
-        transformer.doValidate().toValidatorOutcome()
-      case plan if SparkShimLoader.getSparkShims.isWindowGroupLimitExec(plan) =>
-        val windowGroupLimitPlan = SparkShimLoader.getSparkShims
-          .getWindowGroupLimitExecShim(plan)
-          .asInstanceOf[WindowGroupLimitExecShim]
-        val transformer = WindowGroupLimitExecTransformer(
-          windowGroupLimitPlan.partitionSpec,
-          windowGroupLimitPlan.orderSpec,
-          windowGroupLimitPlan.rankLikeFunction,
-          windowGroupLimitPlan.limit,
-          windowGroupLimitPlan.mode,
-          windowGroupLimitPlan.child
-        )
-        transformer.doValidate().toValidatorOutcome()
-      case plan: CoalesceExec =>
-        ColumnarCoalesceExec(plan.numPartitions, plan.child)
-          .doValidate()
-          .toValidatorOutcome()
-      case plan: GlobalLimitExec =>
-        val (limit, offset) =
-          SparkShimLoader.getSparkShims.getLimitAndOffsetFromGlobalLimit(plan)
-        val transformer = LimitExecTransformer(plan.child, offset, limit)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: LocalLimitExec =>
-        val transformer = LimitExecTransformer(plan.child, 0L, plan.limit)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: GenerateExec =>
-        val transformer = BackendsApiManager.getSparkPlanExecApiInstance.genGenerateTransformer(
-          plan.generator,
-          plan.requiredChildOutput,
-          plan.outer,
-          plan.generatorOutput,
-          plan.child)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: BatchEvalPythonExec =>
-        val transformer = EvalPythonExecTransformer(plan.udfs, plan.resultAttrs, plan.child)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: ArrowEvalPythonExec =>
-        // When backend doesn't support ColumnarArrow or colunmnar arrow configuration not
-        // enabled, we will try offloading through EvalPythonExecTransformer
-        if (
-          !BackendsApiManager.getSettings.supportColumnarArrowUdf() ||
-          !GlutenConfig.getConf.enableColumnarArrowUDF
-        ) {
-          // Both CH and Velox will try using backend's built-in functions for calculate
-          val transformer = EvalPythonExecTransformer(plan.udfs, plan.resultAttrs, plan.child)
-          transformer.doValidate().toValidatorOutcome()
-        }
-        pass()
-      case plan: TakeOrderedAndProjectExec =>
-        val (limit, offset) =
-          SparkShimLoader.getSparkShims.getLimitAndOffsetFromTopK(plan)
-        val transformer = TakeOrderedAndProjectExecTransformer(
-          limit,
-          plan.sortOrder,
-          plan.projectList,
-          plan.child,
-          offset)
-        transformer.doValidate().toValidatorOutcome()
-      case plan: SampleExec =>
-        val transformer =
-          BackendsApiManager.getSparkPlanExecApiInstance.genSampleExecTransformer(
-            plan.lowerBound,
-            plan.upperBound,
-            plan.withReplacement,
-            plan.seed,
-            plan.child)
-        transformer.doValidate().toValidatorOutcome()
-      case _ =>
-        // Currently we assume a plan to be offload-able by default.
-        pass()
+  private class FallbackByNativeValidation(offloadRules: Seq[OffloadSingleNode])
+    extends Validator
+    with Logging {
+    private val offloadAttempt: LegacyOffload = LegacyOffload(offloadRules)
+    override def validate(plan: SparkPlan): Validator.OutCome = {
+      offloadAttempt.apply(plan) match {
+        case v: ValidatablePlan =>
+          v.doValidate().toValidatorOutcome()
+        case _ =>
+          // Currently we assume a plan to be offload-able by default.
+          pass()
+      }
     }
   }
 }

From 4921f18b1c205b03a62e1c55ae8c17544d40bf11 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 9 Dec 2024 13:46:00 +0800
Subject: [PATCH 03/16] fixup

---
 .../extension/columnar/offload/OffloadSingleNodeRules.scala  | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index 7dc40faa4315..7f258d10b2fc 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -312,7 +312,10 @@ object OffloadOthers {
           val child = plan.child
           // For ArrowEvalPythonExec, CH supports it through EvalPythonExecTransformer while
           // Velox backend uses ColumnarArrowEvalPythonExec.
-          if (!BackendsApiManager.getSettings.supportColumnarArrowUdf()) {
+          if (
+            !BackendsApiManager.getSettings.supportColumnarArrowUdf() ||
+            !GlutenConfig.getConf.enableColumnarArrowUDF
+          ) {
             EvalPythonExecTransformer(plan.udfs, plan.resultAttrs, child)
           } else {
             BackendsApiManager.getSparkPlanExecApiInstance.createColumnarArrowEvalPythonExec(

From 9b08060e3653ade052561041af1452a656479e8f Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 9 Dec 2024 14:38:56 +0800
Subject: [PATCH 04/16] fixup

---
 .../columnar/validator/Validators.scala       | 46 ++++++++++++++++---
 1 file changed, 40 insertions(+), 6 deletions(-)

diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index a46979d24608..40d41e3f5494 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -26,6 +26,9 @@ import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.WriteFilesExec
@@ -224,15 +227,46 @@ object Validators {
   private class FallbackByNativeValidation(offloadRules: Seq[OffloadSingleNode])
     extends Validator
     with Logging {
+    import FallbackByNativeValidation._
     private val offloadAttempt: LegacyOffload = LegacyOffload(offloadRules)
     override def validate(plan: SparkPlan): Validator.OutCome = {
-      offloadAttempt.apply(plan) match {
-        case v: ValidatablePlan =>
-          v.doValidate().toValidatorOutcome()
-        case _ =>
-          // Currently we assume a plan to be offload-able by default.
-          pass()
+      applyAsSingleNode(plan) {
+        node =>
+          val offloadedNode = offloadAttempt.apply(node)
+          val outcomes = offloadedNode.collect {
+            case v: ValidatablePlan =>
+              v.doValidate().toValidatorOutcome()
+          }
+          val failures = outcomes
+            .filter(_.isInstanceOf[Validator.Failed])
+            .map(_.asInstanceOf[Validator.Failed])
+          if (failures.nonEmpty) {
+            failures.reduce((f1, f2) => Validator.Failed(Seq(f1.reason, f2.reason).mkString(";")))
+          } else {
+            pass()
+          }
       }
     }
   }
+
+  private object FallbackByNativeValidation {
+    /**
+     * A fake leaf node that hides a subtree from the parent node to make sure the native validation
+     * only called on the interested plan nodes.
+     */
+    private case class FakeLeaf(originalChild: SparkPlan) extends LeafExecNode {
+      override protected def doExecute(): RDD[InternalRow] =
+        throw new UnsupportedOperationException()
+      override def output: Seq[Attribute] = originalChild.output
+      override def supportsRowBased: Boolean = throw new UnsupportedOperationException()
+      override def supportsColumnar: Boolean = throw new UnsupportedOperationException()
+    }
+
+    private def applyAsSingleNode[T](plan: SparkPlan)(body: SparkPlan => T): T = {
+      val newChildren = plan.children.map(child => FakeLeaf(originalChild = child))
+      val newPlan = plan.withNewChildren(newChildren)
+      val applied = body(newPlan)
+      applied
+    }
+  }
 }

From fdb65ebe82ddce180fdfcd8bb391882dcd3f134c Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 9 Dec 2024 14:52:48 +0800
Subject: [PATCH 05/16] fixup

---
 .../apache/gluten/extension/columnar/validator/Validators.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 40d41e3f5494..f6fa24c9be5d 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -250,6 +250,7 @@ object Validators {
   }
 
   private object FallbackByNativeValidation {
+
     /**
      * A fake leaf node that hides a subtree from the parent node to make sure the native validation
      * only called on the interested plan nodes.

From 3e6da36f1e8e918a76352c3801d571232ad661d8 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 9 Dec 2024 14:55:37 +0800
Subject: [PATCH 06/16] fixup

---
 .../apache/gluten/extension/columnar/validator/Validators.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index f6fa24c9be5d..40d41e3f5494 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -250,7 +250,6 @@ object Validators {
   }
 
   private object FallbackByNativeValidation {
-
     /**
     * A fake leaf node that hides a subtree from the parent node to make sure the native validation
     * only called on the interested plan nodes.
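
Note: the FakeLeaf device that patches 04-06 above introduce and polish can be sketched independently of Spark. The following standalone Scala sketch is illustrative only; none of these names (Plan, Node, SingleNodeValidation, Demo) are Gluten or Spark APIs, and it deliberately simplifies what the real SparkPlan/LeafExecNode-based code does:

    // Sketch of the child-hiding idea: a placeholder leaf replaces each direct
    // child so that validation logic can only ever see one "real" node.
    sealed trait Plan { def children: Seq[Plan] }
    case class Node(name: String, children: Seq[Plan]) extends Plan
    // The placeholder keeps a reference to the subtree it hides but exposes none of it.
    case class FakeLeaf(original: Plan) extends Plan { def children: Seq[Plan] = Nil }

    object SingleNodeValidation {
      // Run `body` against a copy of `plan` whose direct children are replaced
      // by FakeLeaf placeholders, bounding the work to a single operator.
      def applyOnSingleNode[T](plan: Plan)(body: Plan => T): T = plan match {
        case Node(name, children) => body(Node(name, children.map(FakeLeaf(_))))
        case leaf => body(leaf)
      }
    }

    object Demo extends App {
      val plan = Node("Project", Seq(Node("Filter", Seq(Node("Scan", Nil)))))
      SingleNodeValidation.applyOnSingleNode(plan) {
        shallow => println(shallow) // Project sees a FakeLeaf child; Scan stays hidden
      }
    }

The payoff of this design is that native validation cost stays bounded to the node of interest instead of re-validating whole subtrees; patches 07-14 below keep iterating on the same mechanism before patch 15 ultimately removes it in favor of validating the offloaded node directly.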
From 8a684a70b32db850a706417f9c1a77be47280a9f Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 9 Dec 2024 14:55:44 +0800
Subject: [PATCH 07/16] fixup

---
 .../apache/gluten/extension/columnar/validator/Validators.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 40d41e3f5494..f6fa24c9be5d 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -250,6 +250,7 @@ object Validators {
   }
 
   private object FallbackByNativeValidation {
+
     /**
      * A fake leaf node that hides a subtree from the parent node to make sure the native validation
      * only called on the interested plan nodes.

From 1f2596daabbe0769850309ff52d854569fa732e5 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 9 Dec 2024 15:08:41 +0800
Subject: [PATCH 08/16] fixup

---
 .../gluten/extension/columnar/validator/Validators.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index f6fa24c9be5d..3b2684426209 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -259,8 +259,7 @@ object Validators {
       override protected def doExecute(): RDD[InternalRow] =
         throw new UnsupportedOperationException()
       override def output: Seq[Attribute] = originalChild.output
-      override def supportsRowBased: Boolean = throw new UnsupportedOperationException()
-      override def supportsColumnar: Boolean = throw new UnsupportedOperationException()
+      override def supportsColumnar: Boolean = originalChild.supportsColumnar
     }

From e8442437dbd9737ae99944aec355b85736a82446 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 9 Dec 2024 15:09:35 +0800
Subject: [PATCH 09/16] fixup

---
 .../gluten/extension/columnar/validator/Validators.scala | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 3b2684426209..224902ce3121 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -28,7 +28,8 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.WriteFilesExec
@@ -38,6 +39,8 @@ import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 
+import scala.collection.Seq
+
 object Validators {
   implicit class ValidatorBuilderImplicits(builder: Validator.Builder) {
     private val conf = GlutenConfig.getConf
@@ -258,8 +261,10 @@ object Validators {
     private case class FakeLeaf(originalChild: SparkPlan) extends LeafExecNode {
       override protected def doExecute(): RDD[InternalRow] =
         throw new UnsupportedOperationException()
-      override def output: Seq[Attribute] = originalChild.output
       override def supportsColumnar: Boolean = originalChild.supportsColumnar
+      override def output: Seq[Attribute] = originalChild.output
+      override def outputOrdering: Seq[SortOrder] = originalChild.outputOrdering
+      override def outputPartitioning: Partitioning = originalChild.outputPartitioning
     }

From 3b3c8b4ae9e018f42c6ac9baf2dcfcffce6cee4b Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 9 Dec 2024 15:29:47 +0800
Subject: [PATCH 10/16] fixup

---
 .../apache/gluten/extension/columnar/validator/Validators.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 224902ce3121..24b1e681e49d 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -39,8 +39,6 @@ import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 
-import scala.collection.Seq
-
 object Validators {
   implicit class ValidatorBuilderImplicits(builder: Validator.Builder) {
     private val conf = GlutenConfig.getConf

From 03c9fc325e785b223650440e9b2d86a6a8134733 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 9 Dec 2024 15:42:09 +0800
Subject: [PATCH 11/16] fixup

---
 .../gluten/extension/columnar/validator/Validators.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 24b1e681e49d..7e7f7b052d98 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -231,7 +231,7 @@ object Validators {
     import FallbackByNativeValidation._
     private val offloadAttempt: LegacyOffload = LegacyOffload(offloadRules)
     override def validate(plan: SparkPlan): Validator.OutCome = {
-      applyAsSingleNode(plan) {
+      applyOnSingleNode(plan) {
         node =>
           val offloadedNode = offloadAttempt.apply(node)
           val outcomes = offloadedNode.collect {
@@ -265,7 +265,7 @@ object Validators {
       override def outputPartitioning: Partitioning = originalChild.outputPartitioning
     }
 
-    private def applyAsSingleNode[T](plan: SparkPlan)(body: SparkPlan => T): T = {
+    private def applyOnSingleNode[T](plan: SparkPlan)(body: SparkPlan => T): T = {
       val newChildren = plan.children.map(child => FakeLeaf(originalChild = child))
       val newPlan = plan.withNewChildren(newChildren)
       val applied = body(newPlan)

From 5a3a203ce8af3d5f9eab317ec134f866ec3bf049 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 9 Dec 2024 16:47:28 +0800
Subject: [PATCH 12/16] fixup

---
 .../planner/plan/GlutenPlanModel.scala        | 13 ++++++++-
 .../execution/WriteFilesExecTransformer.scala | 28 +++++--------------
 .../columnar/validator/Validators.scala       |  7 ++++-
 3 files changed, 25 insertions(+), 23 deletions(-)

diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
index 94059020efca..4b6158165552 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.extension.columnar.enumerated.planner.plan
 
 import org.apache.gluten.execution.GlutenPlan
-import org.apache.gluten.extension.columnar.enumerated.planner.metadata.GlutenMetadata
+import org.apache.gluten.extension.columnar.enumerated.planner.metadata.{GlutenMetadata, LogicalLink}
 import org.apache.gluten.extension.columnar.enumerated.planner.property.{Conv, ConvDef}
 import org.apache.gluten.extension.columnar.transition.{Convention, ConventionReq}
 import org.apache.gluten.ras.{Metadata, PlanModel}
@@ -27,6 +27,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{ColumnarToRowExec, LeafExecNode, SparkPlan}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase
 import org.apache.spark.task.{SparkTaskUtil, TaskResources}
@@ -75,6 +76,16 @@ object GlutenPlanModel {
     final override val supportsRowBased: Boolean = {
       rowType() != Convention.RowType.None
     }
+
+    override def logicalLink: Option[LogicalPlan] = {
+      if (metadata.logicalLink() eq LogicalLink.notFound) {
+        return None
+      }
+      Some(metadata.logicalLink().plan)
+    }
+
+    override def setLogicalLink(logicalPlan: LogicalPlan): Unit =
+      throw new UnsupportedOperationException()
   }
 
   private object PlanModelImpl extends PlanModel[SparkPlan] {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
index c69623b06193..0536808eb310 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
@@ -20,7 +20,6 @@ import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.expression.ConverterUtils
 import org.apache.gluten.extension.ValidationResult
-import org.apache.gluten.extension.columnar.enumerated.planner.plan.GlutenPlanModel.GroupLeafExec
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.substrait.`type`.ColumnTypeNode
 import org.apache.gluten.substrait.SubstraitContext
@@ -33,13 +32,12 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ArrayType, MapType}
-import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.types.{ArrayType, MapType, MetadataBuilder}
 
 import io.substrait.proto.NamedStruct
 import org.apache.parquet.hadoop.ParquetOutputFormat
@@ -139,23 +137,11 @@ case class WriteFilesExecTransformer(
       }
     }
 
-    lazy val hasConstantComplexType = child match {
-      case t: ProjectExecTransformer =>
-        t.projectList.exists(isConstantComplexType)
-      case p: ProjectExec =>
-        p.projectList.exists(isConstantComplexType)
-      case g: GroupLeafExec => // support the ras
-        g.metadata
-          .logicalLink()
-          .plan
-          .collectFirst {
-            case p: Project if p.projectList.exists(isConstantComplexType) => true
-          }
-          .isDefined
-      case _ => false
-    }
-    // TODO: currently the velox don't support parquet write with complex data type
-    // with constant.
+    def hasConstantComplexType = child.logicalLink.collectFirst {
+      case p: Project if p.projectList.exists(isConstantComplexType) => true
+    }.isDefined
+
+    // TODO: Currently Velox doesn't support Parquet write of constant with complex data type.
     if (fileFormat.isInstanceOf[ParquetFileFormat] && hasConstantComplexType) {
       return ValidationResult.failed(
         "Unsupported native parquet write: " +
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 7e7f7b052d98..dbcb612fc1d9 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -266,7 +266,12 @@ object Validators {
     }
 
     private def applyOnSingleNode[T](plan: SparkPlan)(body: SparkPlan => T): T = {
-      val newChildren = plan.children.map(child => FakeLeaf(originalChild = child))
+      val newChildren = plan.children.map(
+        child => {
+          val fl = FakeLeaf(originalChild = child)
+          child.logicalLink.foreach(link => fl.setLogicalLink(link))
+          fl
+        })
       val newPlan = plan.withNewChildren(newChildren)
       val applied = body(newPlan)
       applied

From 64bfd934cb32722010db8a2f1644a5e2bccd1879 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 9 Dec 2024 18:53:23 +0800
Subject: [PATCH 13/16] fixup

---
 .../org/apache/gluten/execution/WholeStageTransformer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index dbfc11c136db..325956167593 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -198,7 +198,6 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
     val transformStageId: Int
 ) extends WholeStageTransformerGenerateTreeStringShim
   with UnaryTransformSupport {
-  assert(child.isInstanceOf[TransformSupport])
 
   def stageId: Int = transformStageId
 
@@ -353,6 +352,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
   }
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    assert(child.isInstanceOf[TransformSupport])
     val pipelineTime: SQLMetric = longMetric("pipelineTime")
     // We should do transform first to make sure all subqueries are materialized
     val wsCtx = GlutenTimeMetric.withMillisTime {

From 2039d5c7658bdb8e5195b2e046be2caf66866fbd Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 9 Dec 2024 19:25:05 +0800
Subject: [PATCH 14/16] fixup

---
 .../columnar/validator/Validators.scala       | 37 +++++++++++++------
 1 file changed, 25 insertions(+), 12 deletions(-)

diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index dbcb612fc1d9..70c177d83fbf 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -39,6 +39,8 @@ import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 
+import scala.collection.mutable
+
 object Validators {
   implicit class ValidatorBuilderImplicits(builder: Validator.Builder) {
     private val conf = GlutenConfig.getConf
@@ -232,9 +234,10 @@ object Validators {
     private val offloadAttempt: LegacyOffload = LegacyOffload(offloadRules)
     override def validate(plan: SparkPlan): Validator.OutCome = {
       applyOnSingleNode(plan) {
-        node =>
+        (node, hideOriginalChildren) =>
           val offloadedNode = offloadAttempt.apply(node)
-          val outcomes = offloadedNode.collect {
+          val hidden = hideOriginalChildren(offloadedNode)
+          val outcomes = hidden.collect {
             case v: ValidatablePlan =>
               v.doValidate().toValidatorOutcome()
           }
@@ -265,16 +268,26 @@ object Validators {
       override def outputPartitioning: Partitioning = originalChild.outputPartitioning
     }
 
-    private def applyOnSingleNode[T](plan: SparkPlan)(body: SparkPlan => T): T = {
-      val newChildren = plan.children.map(
-        child => {
-          val fl = FakeLeaf(originalChild = child)
-          child.logicalLink.foreach(link => fl.setLogicalLink(link))
-          fl
-        })
-      val newPlan = plan.withNewChildren(newChildren)
-      val applied = body(newPlan)
-      applied
+    private def applyOnSingleNode[T](plan: SparkPlan)(
+        body: (SparkPlan, SparkPlan => SparkPlan) => T): T = {
+      val children = plan.children
+
+      val lookup: mutable.Map[Int, FakeLeaf] = mutable.Map()
+      children.foreach(child => lookup += child.id -> FakeLeaf(child))
+
+      /**
+       * Traverse up the input plan and find the original leafs. Replace the leafs with FakeLeaf
+       * nodes then return. So any further operations with the returned query plan will not see the
+       * original leaf nodes.
+       */
+      def insertFakeLeafs(input: SparkPlan): SparkPlan = {
+        input.transformUp {
+          case p if lookup.contains(p.id) =>
+            lookup(p.id)
+        }
+      }
+
+      body(plan, insertFakeLeafs)
     }
   }
 }

From 02f3363cc072423b4ac423e4d7a9a425daa16c65 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 9 Dec 2024 19:47:02 +0800
Subject: [PATCH 15/16] fixup

---
 .../clickhouse/CHSparkPlanExecApi.scala       |  2 +-
 .../execution/CHColumnarWriteFilesExec.scala  |  7 ++
 .../velox/VeloxSparkPlanExecApi.scala         |  3 +-
 .../VeloxColumnarWriteFilesExec.scala         |  9 ++-
 .../gluten/backendsapi/SparkPlanExecApi.scala |  2 +-
 .../execution/WriteFilesExecTransformer.scala |  2 +-
 .../columnar/validator/Validators.scala       | 69 +++----------------
 .../execution/ColumnarWriteFilesExec.scala    |  6 +-
 8 files changed, 31 insertions(+), 69 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index de0680df10ab..6c1905ef3799 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -656,7 +656,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
   }
 
   override def createColumnarWriteFilesExec(
-      child: SparkPlan,
+      child: WriteFilesExecTransformer,
       noop: SparkPlan,
       fileFormat: FileFormat,
      partitionColumns: Seq[Attribute],
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
index 503fd1a90caa..851da331e018 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution
 
+import org.apache.gluten.execution.WriteFilesExecTransformer
+import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.memory.CHThreadGroup
 
 import org.apache.spark.{Partition, SparkException, TaskContext, TaskOutputFileAlreadyExistException}
@@ -149,6 +151,7 @@ class CHColumnarWriteFilesRDD(
 case class CHColumnarWriteFilesExec(
   override val left: SparkPlan,
   override val right: SparkPlan,
+  t: WriteFilesExecTransformer,
   fileFormat: FileFormat,
   partitionColumns: Seq[Attribute],
   bucketSpec: Option[BucketSpec],
@@ -156,6 +159,10 @@ case class CHColumnarWriteFilesExec(
   staticPartitions: TablePartitionSpec
 ) extends ColumnarWriteFilesExec(left, right) {
 
+  override protected def doValidateInternal(): ValidationResult = {
+    t.doValidateInternal()
+  }
+
   override protected def withNewChildrenInternal(
       newLeft: SparkPlan,
       newRight: SparkPlan): SparkPlan =
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index d837ac423407..8984a9551b03 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -560,7 +560,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
     ShuffleUtil.genColumnarShuffleWriter(parameters)
   }
   override def createColumnarWriteFilesExec(
-      child: SparkPlan,
+      child: WriteFilesExecTransformer,
       noop: SparkPlan,
       fileFormat: FileFormat,
       partitionColumns: Seq[Attribute],
@@ -570,6 +570,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
     VeloxColumnarWriteFilesExec(
       child,
       noop,
+      child,
      fileFormat,
       partitionColumns,
       bucketSpec,
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index 249f5169d84c..88937b7d3287 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -18,6 +18,8 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.execution.WriteFilesExecTransformer
+import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 
 import org.apache.spark.{Partition, SparkException, TaskContext, TaskOutputFileAlreadyExistException}
@@ -250,6 +252,7 @@ class VeloxColumnarWriteFilesRDD(
 case class VeloxColumnarWriteFilesExec private (
   override val left: SparkPlan,
   override val right: SparkPlan,
+  t: WriteFilesExecTransformer,
   fileFormat: FileFormat,
   partitionColumns: Seq[Attribute],
   bucketSpec: Option[BucketSpec],
@@ -257,6 +260,10 @@ case class VeloxColumnarWriteFilesExec private (
   staticPartitions: TablePartitionSpec)
   extends ColumnarWriteFilesExec(left, right) {
 
+  override protected def doValidateInternal(): ValidationResult = {
+    t.doValidateInternal()
+  }
+
   override def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
     assert(child.supportsColumnar)
 
@@ -276,5 +283,5 @@ case class VeloxColumnarWriteFilesExec private (
   override protected def withNewChildrenInternal(
       newLeft: SparkPlan,
       newRight: SparkPlan): SparkPlan =
-    copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
+    copy(newLeft, newRight, t, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index ec032af92d96..7432bc0af9e8 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -365,7 +365,7 @@ trait SparkPlanExecApi {
 
   /** Create ColumnarWriteFilesExec */
   def createColumnarWriteFilesExec(
-      child: SparkPlan,
+      child: WriteFilesExecTransformer,
       noop: SparkPlan,
       fileFormat: FileFormat,
       partitionColumns: Seq[Attribute],
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
index 0536808eb310..a9d3a6282ae1 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
@@ -127,7 +127,7 @@ case class WriteFilesExecTransformer(
     child.output.map(attr => WriteFilesExecTransformer.removeMetadata(attr, metadataExclusionList))
   }
 
-  override protected def doValidateInternal(): ValidationResult = {
+  override def doValidateInternal(): ValidationResult = {
     val finalChildOutput = getFinalChildOutput
 
     def isConstantComplexType(e: Expression): Boolean = {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 70c177d83fbf..b0eeccef7a10 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -26,10 +26,6 @@ import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.WriteFilesExec
@@ -39,8 +35,6 @@ import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 
-import scala.collection.mutable
-
 object Validators {
   implicit class ValidatorBuilderImplicits(builder: Validator.Builder) {
     private val conf = GlutenConfig.getConf
@@ -224,64 +224,17 @@ object Validators {
   private class FallbackByNativeValidation(offloadRules: Seq[OffloadSingleNode])
     extends Validator
     with Logging {
-    import FallbackByNativeValidation._
     private val offloadAttempt: LegacyOffload = LegacyOffload(offloadRules)
     override def validate(plan: SparkPlan): Validator.OutCome = {
-      applyOnSingleNode(plan) {
-        (node, hideOriginalChildren) =>
-          val offloadedNode = offloadAttempt.apply(node)
-          val hidden = hideOriginalChildren(offloadedNode)
-          val outcomes = hidden.collect {
-            case v: ValidatablePlan =>
-              v.doValidate().toValidatorOutcome()
-          }
-          val failures = outcomes
-            .filter(_.isInstanceOf[Validator.Failed])
-            .map(_.asInstanceOf[Validator.Failed])
-          if (failures.nonEmpty) {
-            failures.reduce((f1, f2) => Validator.Failed(Seq(f1.reason, f2.reason).mkString(";")))
-          } else {
-            pass()
-          }
+      val offloadedNode = offloadAttempt.apply(plan)
+      val out = offloadedNode match {
+        case v: ValidatablePlan =>
+          v.doValidate().toValidatorOutcome()
+        case other =>
+          // Currently we assume a plan to be offload-able by default.
+          pass()
       }
+      out
     }
   }
-
-  private object FallbackByNativeValidation {
-
-    /**
-     * A fake leaf node that hides a subtree from the parent node to make sure the native validation
-     * only called on the interested plan nodes.
-     */
-    private case class FakeLeaf(originalChild: SparkPlan) extends LeafExecNode {
-      override protected def doExecute(): RDD[InternalRow] =
-        throw new UnsupportedOperationException()
-      override def supportsColumnar: Boolean = originalChild.supportsColumnar
-      override def output: Seq[Attribute] = originalChild.output
-      override def outputOrdering: Seq[SortOrder] = originalChild.outputOrdering
-      override def outputPartitioning: Partitioning = originalChild.outputPartitioning
-    }
-
-    private def applyOnSingleNode[T](plan: SparkPlan)(
-        body: (SparkPlan, SparkPlan => SparkPlan) => T): T = {
-      val children = plan.children
-
-      val lookup: mutable.Map[Int, FakeLeaf] = mutable.Map()
-      children.foreach(child => lookup += child.id -> FakeLeaf(child))
-
-      /**
-       * Traverse up the input plan and find the original leafs. Replace the leafs with FakeLeaf
-       * nodes then return. So any further operations with the returned query plan will not see the
-       * original leaf nodes.
-       */
-      def insertFakeLeafs(input: SparkPlan): SparkPlan = {
-        input.transformUp {
-          case p if lookup.contains(p.id) =>
-            lookup(p.id)
-        }
-      }
-
-      body(plan, insertFakeLeafs)
-    }
-  }
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
index 45f202637020..87a2f6d9c28e 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.execution.GlutenPlan
+import org.apache.gluten.execution.{ValidatablePlan, WriteFilesExecTransformer}
 import org.apache.gluten.extension.columnar.transition.{Convention, ConventionReq}
 import org.apache.gluten.extension.columnar.transition.Convention.RowType
 import org.apache.gluten.sql.shims.SparkShimLoader
@@ -43,7 +43,7 @@ abstract class ColumnarWriteFilesExec protected (
     override val left: SparkPlan,
     override val right: SparkPlan)
   extends BinaryExecNode
-  with GlutenPlan
+  with ValidatablePlan
   with ColumnarWriteFilesExec.ExecuteWriteCompatible {
 
   val child: SparkPlan = left
@@ -118,7 +118,7 @@ abstract class ColumnarWriteFilesExec protected (
 
 object ColumnarWriteFilesExec {
   def apply(
-      child: SparkPlan,
+      child: WriteFilesExecTransformer,
       fileFormat: FileFormat,
       partitionColumns: Seq[Attribute],
       bucketSpec: Option[BucketSpec],

From 4fa421f7f7ee78fe15648e07a733b7f9309a3d0d Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Mon, 9 Dec 2024 21:32:32 +0800
Subject: [PATCH 16/16] fixup

---
 .../gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala      | 1 +
 .../apache/spark/sql/execution/CHColumnarWriteFilesExec.scala   | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 6c1905ef3799..635228881566 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -666,6 +666,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
     CHColumnarWriteFilesExec(
child, noop, + child, fileFormat, partitionColumns, bucketSpec, diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala index 851da331e018..d43d23bfc8d8 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala @@ -166,7 +166,7 @@ case class CHColumnarWriteFilesExec( override protected def withNewChildrenInternal( newLeft: SparkPlan, newRight: SparkPlan): SparkPlan = - copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options, staticPartitions) + copy(newLeft, newRight, t, fileFormat, partitionColumns, bucketSpec, options, staticPartitions) override def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = { assert(child.supportsColumnar)
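
A note on the resulting design: PATCH 15 and PATCH 16 together replace the FakeLeaf-based child-hiding trick with plain delegation. The columnar write node now carries the original WriteFilesExecTransformer as a constructor field `t` and forwards doValidateInternal() to it, so the validator only ever asks the offloaded node itself. The sketch below models that shape outside Gluten's class hierarchy; every name in it (Plan, Validatable, WriteTransformer, ColumnarWrite, NativeValidator, and the parquet check) is an invented stand-in for illustration, not a Gluten or Spark API.

// Minimal, self-contained Scala model of validation by delegation.
// All types below are hypothetical stand-ins, not Gluten code.
sealed trait Outcome
case object Ok extends Outcome
final case class Failed(reason: String) extends Outcome

trait Plan {
  def children: Seq[Plan]
}

trait Validatable extends Plan {
  // Decides whether this node can be offloaded to the native engine.
  def doValidateInternal(): Outcome
}

// Stand-in for WriteFilesExecTransformer: the node that actually knows
// how to validate a native file write.
final case class WriteTransformer(format: String) extends Validatable {
  def children: Seq[Plan] = Nil
  def doValidateInternal(): Outcome =
    if (format == "parquet") Ok else Failed(s"unsupported write format: $format")
}

// Stand-in for the ColumnarWriteFilesExec family: it keeps the original
// transformer `t` as an extra field purely so validation can be forwarded,
// removing any need to hide children behind fake leaf nodes.
final case class ColumnarWrite(left: Plan, right: Plan, t: WriteTransformer)
  extends Validatable {
  def children: Seq[Plan] = Seq(left, right)
  def doValidateInternal(): Outcome = t.doValidateInternal()
}

// In the spirit of the simplified FallbackByNativeValidation: offload one
// node, then ask the offloaded node itself whether it is valid.
object NativeValidator {
  def validate(offloaded: Plan): Outcome = offloaded match {
    case v: Validatable => v.doValidateInternal()
    case _ => Ok // assume the plan is offloadable by default, as in PATCH 15
  }
}

With w = WriteTransformer("parquet"), NativeValidator.validate(ColumnarWrite(w, w, w)) yields Ok, while an unsupported format surfaces the transformer's failure reason unchanged, which is the behavior these fixups converge on.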