Merge with DVs v1
andreaschat-db committed Jan 3, 2024
1 parent 1d42c86 commit 31a804f
Showing 7 changed files with 318 additions and 27 deletions.
@@ -147,7 +147,7 @@ object DMLWithDeletionVectorsHelper extends DeltaCommand {
/**
* Finds the files in nameToAddFileMap in which rows were deleted by checking the row index set.
*/
private def findFilesWithMatchingRows(
def findFilesWithMatchingRows(
txn: OptimisticTransaction,
nameToAddFileMap: Map[String, AddFile],
matchedFileRowIndexSets: Seq[DeletionVectorResult]): Seq[TouchedFileWithDV] = {
@@ -215,8 +215,10 @@ object DMLWithDeletionVectorsHelper extends DeltaCommand {
addFilesWithNewDvs: Seq[AddFile],
snapshot: Snapshot): Seq[AddFile] = {
import org.apache.spark.sql.delta.implicits._
val statsColName = snapshot.getBaseStatsColumnName
val selectionPathAndStatsCols = Seq(col("path"), col(statsColName))

if (addFilesWithNewDvs.isEmpty) return Seq.empty

val selectionPathAndStatsCols = Seq(col("path"), col("stats"))
val addFilesWithNewDvsDf = addFilesWithNewDvs.toDF(spark)

// These files originate from snapshot.filesForScan which resets column statistics.
@@ -234,6 +236,7 @@ object DMLWithDeletionVectorsHelper extends DeltaCommand {
// null count. We want to set the bounds before the AddFile has DV descriptor attached.
// Attaching the DV descriptor here, causes wrong logical records computation in
// `updateStatsToWideBounds`.
val statsColName = snapshot.getBaseStatsColumnName
val addFilesWithWideBoundsDf = snapshot
.updateStatsToWideBounds(addFileWithStatsDf, statsColName)

@@ -362,14 +365,18 @@ object DeletionVectorBitmapGenerator {
tableHasDVs: Boolean,
targetDf: DataFrame,
candidateFiles: Seq[AddFile],
condition: Expression)
condition: Expression,
fileNameColumnOpt: Option[Column] = None,
rowIndexColumnOpt: Option[Column] = None)
: Seq[DeletionVectorResult] = {
val fileNameColumn = fileNameColumnOpt.getOrElse(col(s"${METADATA_NAME}.${FILE_PATH}"))
val rowIndexColumn = rowIndexColumnOpt.getOrElse(col(ROW_INDEX_COLUMN_NAME))
val matchedRowsDf = targetDf
.withColumn(FILE_NAME_COL, col(s"${METADATA_NAME}.${FILE_PATH}"))
.withColumn(FILE_NAME_COL, fileNameColumn)
// Filter after getting input file name as the filter might introduce a join and we
// cannot get input file name on join's output.
.filter(new Column(condition))
.withColumn(ROW_INDEX_COL, col(ROW_INDEX_COLUMN_NAME))
.withColumn(ROW_INDEX_COL, rowIndexColumn)

val df = if (tableHasDVs) {
// When the table already has DVs, join the `matchedRowDf` above to attach for each matched
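The two new optional parameters above default to the previous behaviour. A minimal sketch of that defaulting, with assumed column names standing in for METADATA_NAME, FILE_PATH and ROW_INDEX_COLUMN_NAME; writeDVs further down passes neither option, so it relies on these defaults:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// Sketch only: when the caller passes no override, the generator keeps reading the file
// name from the file-metadata path column and the row index from the materialised
// row-index column, so existing callers such as DELETE with DVs are unaffected.
def resolveDvInputColumns(
    fileNameColumnOpt: Option[Column] = None,
    rowIndexColumnOpt: Option[Column] = None): (Column, Column) = {
  val fileNameColumn = fileNameColumnOpt.getOrElse(col("_metadata.file_path")) // assumed default
  val rowIndexColumn = rowIndexColumnOpt.getOrElse(col("__row_index"))         // placeholder name
  (fileNameColumn, rowIndexColumn)
}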
@@ -115,10 +115,35 @@ case class MergeIntoCommand(
} else {
val (filesToRewrite, deduplicateCDFDeletes) = findTouchedFiles(spark, deltaTxn)
if (filesToRewrite.nonEmpty) {
val newWrittenFiles = withStatusCode("DELTA", "Writing merged data") {
writeAllChanges(spark, deltaTxn, filesToRewrite, deduplicateCDFDeletes)
val shouldWriteDeletionVectors = shouldWritePersistentDeletionVectors(spark, deltaTxn)
if (shouldWriteDeletionVectors) {
val newWrittenFiles = withStatusCode("DELTA", "Writing modified data") {
writeAllChanges(
spark,
deltaTxn,
filesToRewrite,
deduplicateCDFDeletes,
writeUnmodifiedRows = false)
}

val dvActions = withStatusCode(
"DELTA",
"Writing Deletion Vectors for modified data") {
writeDVs(spark, deltaTxn, filesToRewrite)
}

newWrittenFiles ++ dvActions
} else {
val newWrittenFiles = withStatusCode("DELTA", "Writing merged data") {
writeAllChanges(
spark,
deltaTxn,
filesToRewrite,
deduplicateCDFDeletes,
writeUnmodifiedRows = true)
}
filesToRewrite.map(_.remove) ++ newWrittenFiles
}
filesToRewrite.map(_.remove) ++ newWrittenFiles
} else {
// Run an insert-only job instead of WriteChanges
writeOnlyInserts(
@@ -55,6 +55,13 @@ trait MergeIntoCommandBase extends LeafRunnableCommand
val notMatchedBySourceClauses: Seq[DeltaMergeIntoNotMatchedBySourceClause]
val migratedSchema: Option[StructType]

protected def shouldWritePersistentDeletionVectors(
spark: SparkSession,
txn: OptimisticTransaction): Boolean = {
spark.conf.get(DeltaSQLConf.MERGE_USE_PERSISTENT_DELETION_VECTORS) &&
DeletionVectorUtils.deletionVectorsWritable(txn.snapshot)
}

override val (canMergeSchema, canOverwriteSchema) = {
// Delta options can't be passed to MERGE INTO currently, so they'll always be empty.
// The methods in options check if the auto migration flag is on, in which case schema evolution
@@ -20,13 +20,15 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, FileAction}
import org.apache.spark.sql.delta.commands.MergeIntoCommandBase
import org.apache.spark.sql.delta.commands.{DeletionVectorBitmapGenerator, DMLWithDeletionVectorsHelper, MergeIntoCommandBase}
import org.apache.spark.sql.delta.commands.cdc.CDCReader.{CDC_TYPE_COLUMN_NAME, CDC_TYPE_NOT_CDC}
import org.apache.spark.sql.delta.commands.merge.MergeOutputGeneration.{SOURCE_ROW_INDEX_COL, TARGET_ROW_INDEX_COL}
import org.apache.spark.sql.delta.files.TahoeBatchFileIndex
import org.apache.spark.sql.delta.util.SetAccumulator

import org.apache.spark.sql.{Column, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Literal, Or}
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal, Or}
import org.apache.spark.sql.catalyst.plans.logical.DeltaMergeIntoClause
import org.apache.spark.sql.functions.{coalesce, col, count, input_file_name, lit, monotonically_increasing_id, sum}

/**
@@ -42,10 +44,19 @@ import org.apache.spark.sql.functions.{coalesce, col, count, input_file_name, li
*
* Phase 2: Read the touched files again and write new files with updated and/or inserted rows.
* If there are updates, then use an outer join using the given condition to write the
* updates and inserts (see [[writeAllChanges()]]. If there are no matches for updates,
* only inserts, then write them directly (see [[writeInsertsOnlyWhenNoMatches()]].
* updates and inserts (see [[writeAllChanges()]]). If there are no matches for updates,
* only inserts, then write them directly (see [[writeInsertsOnlyWhenNoMatches()]]).
*
* See [[InsertOnlyMergeExecutor]] for the optimized executor used in case there are only inserts.
* Note, when deletion vectors are enabled, phase 2 is split into two parts:
* 2.a. Read the touched files again and only write modified and new
* rows (see [[writeAllChanges()]]).
* 2.b. Read the touched files and generate deletion vectors for the modified
* rows (see [[writeDVs()]]).
*
* If there are no matches for updates, only inserts, then write them directly
* (see [[writeInsertsOnlyWhenNoMatches()]]). This remains the same when DVs are enabled since there
are no modified rows. Furthermore, see [[InsertOnlyMergeExecutor]] for the optimized executor
* used in case there are only inserts.
*/
trait ClassicMergeExecutor extends MergeOutputGeneration {
self: MergeIntoCommandBase =>
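To make the split described in the updated comment concrete, an assumed example scenario (not taken from this change):

// Illustrative scenario: a matched-only MERGE such as
//   spark.sql("""
//     MERGE INTO target t USING source s ON t.id = s.id
//     WHEN MATCHED THEN UPDATE SET t.value = s.value
//   """)
// that updates 10 rows spread over 2 target files. With DVs enabled:
//   Phase 2.a (writeAllChanges with writeUnmodifiedRows = false) writes only the 10
//     updated row versions (plus any inserts) to new files instead of rewriting both
//     touched files in full.
//   Phase 2.b (writeDVs) computes the row indexes of those 10 rows in the original files
//     and persists deletion vectors marking them deleted, leaving unmodified rows in place.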
@@ -199,6 +210,43 @@ trait ClassicMergeExecutor extends MergeOutputGeneration {
(touchedAddFiles, dedupe)
}

/**
* Helper function that produces an expression by combining a sequence of clauses with OR.
* Requires the sequence to be non-empty.
*/
protected def combineClausesWithOr(clauses: Seq[DeltaMergeIntoClause]): Expression = {
require(clauses.nonEmpty)
clauses
.map(_.condition.getOrElse(Literal.TrueLiteral))
.reduce((a, b) => Or(a, b))
}

protected def generateFilterForModifiedAndNewRows(
includeNotMatchedFilter: Boolean = true): Expression = {
val matchedExpression = if (matchedClauses.nonEmpty) {
Seq(And(Column(condition).expr, combineClausesWithOr(matchedClauses)))
} else {
Seq(Column(condition).expr)
}

val notMatchedExpressionOpt = if (includeNotMatchedFilter && notMatchedClauses.nonEmpty) {
val combinedClauses = combineClausesWithOr(notMatchedClauses)
Some(And(col(TARGET_ROW_PRESENT_COL).isNull.expr, combinedClauses))
} else {
None
}

val notMatchedBySourceExpressionOpt = if (notMatchedBySourceClauses.nonEmpty) {
val combinedClauses = combineClausesWithOr(notMatchedBySourceClauses)
Some(And(col(SOURCE_ROW_PRESENT_COL).isNull.expr, combinedClauses))
} else {
None
}

(matchedExpression ++ notMatchedExpressionOpt ++ notMatchedBySourceExpressionOpt)
.reduce((a, b) => Or(a, b))
}
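A minimal standalone sketch of what these two helpers compute, using plain Catalyst expressions instead of DeltaMergeIntoClause; the filter shape in the trailing comment is a paraphrase, not the exact generated expression:

import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, Or}

// Mirrors combineClausesWithOr: a clause without a condition contributes TrueLiteral,
// so a single unconditional WHEN clause makes that side of the disjunction always true.
// Like the original, this assumes a non-empty sequence.
def combineWithOr(conditions: Seq[Option[Expression]]): Expression =
  conditions.map(_.getOrElse(Literal.TrueLiteral)).reduce((a, b) => Or(a, b))

// generateFilterForModifiedAndNewRows then builds, roughly:
//   (mergeCondition AND <matched clauses OR-ed>)            -- or just mergeCondition if
//                                                              there are no matched clauses
//   OR (target-row-present IS NULL AND <not-matched clauses OR-ed>)
//   OR (source-row-present IS NULL AND <not-matched-by-source clauses OR-ed>)
// keeping only join output rows that some clause can actually modify or insert.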

/**
* Write new files by reading the touched files and updating/inserting data using the source
* query/table. This is implemented using a full-outer-join using the merge condition.
@@ -210,13 +258,14 @@ trait ClassicMergeExecutor extends MergeOutputGeneration {
spark: SparkSession,
deltaTxn: OptimisticTransaction,
filesToRewrite: Seq[AddFile],
deduplicateCDFDeletes: DeduplicateCDFDeletes): Seq[FileAction] =
recordMergeOperation(
extraOpType =
if (shouldOptimizeMatchedOnlyMerge(spark)) "writeAllUpdatesAndDeletes"
deduplicateCDFDeletes: DeduplicateCDFDeletes,
writeUnmodifiedRows: Boolean)
: Seq[FileAction] = recordMergeOperation(
extraOpType = if (!writeUnmodifiedRows) "writeModifiedRowsOnly"
else if (shouldOptimizeMatchedOnlyMerge(spark)) "writeAllUpdatesAndDeletes"
else "writeAllChanges",
status = s"MERGE operation - Rewriting ${filesToRewrite.size} files",
sqlMetricName = "rewriteTimeMs") {
status = s"MERGE operation - Rewriting ${filesToRewrite.size} files",
sqlMetricName = "rewriteTimeMs") {

val cdcEnabled = isCdcEnabled(deltaTxn)

@@ -233,10 +282,23 @@ trait ClassicMergeExecutor extends MergeOutputGeneration {
columnsToDrop = Nil)
val baseTargetDF = Dataset.ofRows(spark, targetPlan)

val joinType = if (shouldOptimizeMatchedOnlyMerge(spark)) {
"rightOuter"
val joinType = if (writeUnmodifiedRows) {
if (shouldOptimizeMatchedOnlyMerge(spark)) {
"rightOuter"
} else {
"fullOuter"
}
} else {
"fullOuter"
// Since we do not need to write unmodified rows, we can perform stricter joins.
if (isMatchedOnly) {
"inner"
} else if (notMatchedBySourceClauses.isEmpty) {
"leftOuter"
} else if (notMatchedClauses.isEmpty) {
"rightOuter"
} else {
"fullOuter"
}
}

logDebug(s"""writeAllChanges using $joinType join:
@@ -256,15 +318,23 @@ trait ClassicMergeExecutor extends MergeOutputGeneration {
// with value `true`, one to each side of the join. Whether this field is null or not after
// the outer join, will allow us to identify whether the joined row was a
// matched inner result or an unmatched result with null on one side.
val joinedDF = {
val joinedBaseDF = {
var sourceDF = getMergeSource.df
if (deduplicateCDFDeletes.enabled && deduplicateCDFDeletes.includesInserts) {
// Add row index for the source rows to identify inserted rows during the cdf deleted rows
// deduplication. See [[deduplicateCDFDeletes()]]
sourceDF = sourceDF.withColumn(SOURCE_ROW_INDEX_COL, monotonically_increasing_id())
}
val left = sourceDF
.withColumn(SOURCE_ROW_PRESENT_COL, Column(incrSourceRowCountExpr))
val left = if (writeUnmodifiedRows) {
sourceDF.withColumn(SOURCE_ROW_PRESENT_COL, Column(incrSourceRowCountExpr))
} else {
sourceDF
.withColumn(SOURCE_ROW_PRESENT_COL, Column(incrSourceRowCountExpr))
// In some cases, the optimiser (incorrectly) decides to omit the metrics column.
// This causes issues in the source determinism validation. We work around the issue by
// adding a redundant dummy filter to make sure the column is not pruned.
.filter(SOURCE_ROW_PRESENT_COL)
}
val targetDF = baseTargetDF
.withColumn(TARGET_ROW_PRESENT_COL, lit(true))
val right = if (deduplicateCDFDeletes.enabled) {
@@ -275,6 +345,13 @@ trait ClassicMergeExecutor extends MergeOutputGeneration {
left.join(right, Column(condition), joinType)
}

val joinedDF =
if (writeUnmodifiedRows) {
joinedBaseDF
} else {
joinedBaseDF.filter(generateFilterForModifiedAndNewRows().sql)
}

// Precompute conditions in matched and not matched clauses and generate
// the joinedDF with precomputed columns and clauses with rewritten conditions.
val (joinedAndPrecomputedConditionsDF, clausesWithPrecompConditions) =
Expand Down Expand Up @@ -356,4 +433,66 @@ trait ClassicMergeExecutor extends MergeOutputGeneration {

newFiles
}

/**
* Writes Deletion Vectors for rows modified by the merge operation.
*/
protected def writeDVs(
spark: SparkSession,
deltaTxn: OptimisticTransaction,
filesToRewrite: Seq[AddFile]): Seq[FileAction] = recordMergeOperation(
extraOpType = "writeDeletionVectors",
status = s"MERGE operation - Rewriting Deletion Vectors to ${filesToRewrite.size} files",
sqlMetricName = "rewriteTimeMs") {

val fileIndex = new TahoeBatchFileIndex(
spark,
"merge",
filesToRewrite,
deltaTxn.deltaLog,
deltaTxn.deltaLog.dataPath,
deltaTxn.snapshot)

val targetDF = DMLWithDeletionVectorsHelper.createTargetDfForScanningForMatches(
spark,
target,
fileIndex)

// For writing DVs we are only interested in the target table. When there are no
// notMatchedBySource clauses, an inner join is sufficient. Otherwise, we need a rightOuter
// join to include target rows that are not matched.
val joinType = if (notMatchedBySourceClauses.isEmpty) {
"inner"
} else {
"rightOuter"
}

val joinedDF = getMergeSource.df
.withColumn(SOURCE_ROW_PRESENT_COL, lit(true))
.join(targetDF, Column(condition), joinType)

val modifiedRowsFilter = generateFilterForModifiedAndNewRows(includeNotMatchedFilter = false)

val matchedDVResult =
DeletionVectorBitmapGenerator.buildRowIndexSetsForFilesMatchingCondition(
spark,
deltaTxn,
tableHasDVs = true,
joinedDF,
filesToRewrite,
modifiedRowsFilter
)

val nameToAddFileMap = generateCandidateFileMap(targetDeltaLog.dataPath, filesToRewrite)

val touchedFilesWithDVs = DMLWithDeletionVectorsHelper
.findFilesWithMatchingRows(deltaTxn, nameToAddFileMap, matchedDVResult)

val (dvActions, _) = DMLWithDeletionVectorsHelper.processUnmodifiedData(
spark,
touchedFilesWithDVs,
deltaTxn.snapshot)

dvActions
}
}
@@ -1276,6 +1276,13 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(false)

val MERGE_USE_PERSISTENT_DELETION_VECTORS =
buildConf("merge.deletionVectors.persistent")
.internal()
.doc("Enable persistent Deletion Vectors in Merge command.")
.booleanConf
.createWithDefault(false)

val DELETION_VECTOR_PACKING_TARGET_SIZE =
buildConf("deletionVectors.packing.targetSize")
.internal()
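A hedged usage sketch for the new flag, assuming the usual spark.databricks.delta. prefix that DeltaSQLConf.buildConf applies to its keys; the flag is internal and defaults to false, and the target table must also support writing deletion vectors (see shouldWritePersistentDeletionVectors above):

import org.apache.spark.sql.SparkSession

// Enable persistent DVs for MERGE in the current session only; the table-level
// protocol/table property check is still applied by the command itself.
val spark = SparkSession.builder()
  .appName("merge-dv-example")
  .master("local[*]")
  .getOrCreate()

spark.conf.set("spark.databricks.delta.merge.deletionVectors.persistent", "true")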