Skip to content

Commit

Permalink
nits
Browse files (browse the repository at this point in the history)
  • Loading branch information
andreaschat-db committed Jan 5, 2024
1 parent fd31c3c commit e6b2941
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 15 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -367,8 +367,7 @@ object DeletionVectorBitmapGenerator {
candidateFiles: Seq[AddFile],
condition: Expression,
fileNameColumnOpt: Option[Column] = None,
rowIndexColumnOpt: Option[Column] = None)
: Seq[DeletionVectorResult] = {
rowIndexColumnOpt: Option[Column] = None): Seq[DeletionVectorResult] = {
val fileNameColumn = fileNameColumnOpt.getOrElse(col(s"${METADATA_NAME}.${FILE_PATH}"))
val rowIndexColumn = rowIndexColumnOpt.getOrElse(col(ROW_INDEX_COLUMN_NAME))
val matchedRowsDf = targetDf
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -288,14 +288,14 @@ trait ClassicMergeExecutor extends MergeOutputGeneration {
deduplicateCDFDeletes: DeduplicateCDFDeletes,
writeUnmodifiedRows: Boolean): Seq[FileAction] = recordMergeOperation(
extraOpType = if (!writeUnmodifiedRows) {
"writeModifiedRowsOnly"
} else if (shouldOptimizeMatchedOnlyMerge(spark)) {
"writeAllUpdatesAndDeletes"
} else {
"writeAllChanges"
},
status = s"MERGE operation - Rewriting ${filesToRewrite.size} files",
sqlMetricName = "rewriteTimeMs") {
"writeModifiedRowsOnly"
} else if (shouldOptimizeMatchedOnlyMerge(spark)) {
"writeAllUpdatesAndDeletes"
} else {
"writeAllChanges"
},
status = s"MERGE operation - Rewriting ${filesToRewrite.size} files",
sqlMetricName = "rewriteTimeMs") {

val cdcEnabled = isCdcEnabled(deltaTxn)

Expand Down Expand Up @@ -356,11 +356,11 @@ trait ClassicMergeExecutor extends MergeOutputGeneration {
sourceDF = sourceDF.withColumn(SOURCE_ROW_INDEX_COL, monotonically_increasing_id())
}
val left = sourceDF
.withColumn(SOURCE_ROW_PRESENT_COL, Column(incrSourceRowCountExpr))
// In some cases, the optimizer (incorrectly) decides to omit the metrics column.
// This causes issues in the source determinism validation. We work around the issue by
// adding a redundant dummy filter to make sure the column is not pruned.
.filter(SOURCE_ROW_PRESENT_COL)
.withColumn(SOURCE_ROW_PRESENT_COL, Column(incrSourceRowCountExpr))
// In some cases, the optimizer (incorrectly) decides to omit the metrics column.
// This causes issues in the source determinism validation. We work around the issue by
// adding a redundant dummy filter to make sure the column is not pruned.
.filter(SOURCE_ROW_PRESENT_COL)
val targetDF = baseTargetDF
.withColumn(TARGET_ROW_PRESENT_COL, lit(true))
val right = if (deduplicateCDFDeletes.enabled) {
Expand Down

0 comments on commit e6b2941

Please sign in to comment.