Skip to content

Commit

Permalink
Time resolution in constraints.
Browse files Browse the repository at this point in the history
  • Loading branch information
andreaschat-db committed Mar 19, 2024
1 parent 72fad38 commit 9a2a7c5
Showing 1 changed file with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@
package org.apache.spark.sql.delta.constraints

import scala.collection.mutable

import org.apache.spark.sql.delta.{DeltaErrors, DeltaIllegalStateException}
import org.apache.spark.sql.delta.constraints.Constraints.{Check, NotNull}
import org.apache.spark.sql.delta.schema.SchemaUtils

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.optimizer.ReplaceExpressions
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.delta.util.AnalysisHelper
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy, UnaryExecNode}
import org.apache.spark.sql.types.StructType

Expand Down Expand Up @@ -71,9 +71,16 @@ case class DeltaInvariantCheckerExec(
if (constraints.isEmpty) return child.execute()
val invariantChecks =
DeltaInvariantCheckerExec.buildInvariantChecks(child.output, constraints, session)
val boundRefs = invariantChecks.map(_.withBoundReferences(child.output))

// Resolve current_date()/current_time() expressions.
// We resolve currentTime for all invariants together to make sure we use the same timestamp.
val invariantsFakePlan = AnalysisHelper.FakeLogicalPlan(invariantChecks, Nil)
val newInvariantsPlan = optimizer.ComputeCurrentTime(invariantsFakePlan)

child.execute().mapPartitionsInternal { rows =>
val boundRefs = newInvariantsPlan.expressions
.asInstanceOf[Seq[CheckDeltaInvariant]]
.map(_.withBoundReferences(child.output))
val assertions = UnsafeProjection.create(boundRefs)
rows.map { row =>
assertions(row)
Expand Down

0 comments on commit 9a2a7c5

Please sign in to comment.