[Spark] Add FileMetadataMaterializationTracker for Row Tracking Backfill (delta-io#2926)


#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
### Background on Row Tracking Backfill 
In a future PR, we will introduce Row Tracking Backfill, a Delta Lake
command that assigns Row IDs and Row Commit Versions to each row when
Row Tracking is enabled on an existing table. It does this by committing
`addFile` actions that assign the `baseRowId` and the
`defaultRowCommitVersion` for every file in the table. On a large table,
a single commit covering every file can be very large and unstable, and
can cause many concurrency conflicts, so we propose doing it in batches
(that is, multiple commits, each handling a subset of the table's
`addFile` actions).
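
As a rough illustration of the batching idea, the sketch below splits a table's files into fixed-size groups, one group per commit. `FileEntry`, `planCommits`, and `maxFilesPerCommit` are hypothetical names used only for illustration; the actual Backfill command is not part of this PR and may batch differently.

```scala
// Hypothetical stand-in for an `addFile` action; only the path is modeled.
final case class FileEntry(path: String)

object BackfillBatchingSketch {
  /** Split the table's files into groups, each of which would become one commit. */
  def planCommits(files: Seq[FileEntry], maxFilesPerCommit: Int): Seq[Seq[FileEntry]] = {
    require(maxFilesPerCommit > 0, "maxFilesPerCommit must be positive")
    files.grouped(maxFilesPerCommit).toVector
  }
}
```

Smaller groups mean smaller commits and fewer conflicts per commit, at the cost of more commits overall.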

### Why do we need this for Row Tracking Backfill?
Even with batching, we could still hit stability issues when a single
batch is large enough to OOM the driver. This would happen when the
individual tasks batched together are huge. Think of tables where each
`AddFile` holds just 1-2 rows, so even a modest amount of data translates
into a very large number of `AddFile` actions.

### What is the solution?
We propose a global file materialization limit that restricts the number
of files that can be materialized at once on the driver. This limit will
be added when Row Tracking Backfill is introduced in a future PR. One
case to consider is a task whose size exceeds the materialization limit.
In that case we allow the task to complete, because we do not want to
break the task boundary (doing so would break the idempotence of Row
Tracking Backfill).

The `FileMetadataMaterializationTracker` is the component used in the
Row Tracking Backfill process to ensure that a single batch is not large
enough to OOM the driver.

### Design

The driver holds a semaphore whose number of permits equals the driver's
materialization limit.

The driver also holds an over-allocation lock that allows only one query
at a time to over-allocate in order to finish materializing a task.

Each `RowTrackingBackfillCommand` instance will maintain a
`FileMetadataMaterializationTracker` that keeps track of how many files
were materialized and also acquires and releases the over-allocation
lock (itself implemented as a single-permit semaphore).
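
The interplay between the permit semaphore and the over-allocation lock can be sketched as follows. This is a simplified model under stated assumptions, not the class added by this PR: `Limits`, `QueryTracker`, `acquire`, and `releaseAll` are hypothetical names, and metrics and partial release are left out.

```scala
import java.util.concurrent.Semaphore

object DesignSketch {
  /** Driver-wide state: `limit` permits plus a single over-allocation lock. */
  final class Limits(limit: Int) {
    val permits = new Semaphore(limit)
    val overAllocationLock = new Semaphore(1)
  }

  /** Hypothetical per-query tracker; a simplification for illustration. */
  final class QueryTracker(limits: Limits) {
    private var fromSemaphore = 0
    private var overAllocated = 0

    def acquire(firstFileOfTask: Boolean): Unit = {
      val holdsLock = synchronized(overAllocated > 0)
      val gotPermit =
        if (firstFileOfTask) { limits.permits.acquire(); true } // a new task waits for a real permit
        else if (holdsLock) false                               // already over-allocating
        else if (limits.permits.tryAcquire()) true              // a permit was available
        else { limits.overAllocationLock.acquire(); false }     // limit reached in the middle of a task
      synchronized {
        if (gotPermit) fromSemaphore += 1 else overAllocated += 1
      }
    }

    def releaseAll(): Unit = synchronized {
      if (overAllocated > 0) limits.overAllocationLock.release()
      limits.permits.release(fromSemaphore)
      fromSemaphore = 0
      overAllocated = 0
    }
  }
}
```

With a limit of 2, a task of 3 files takes both permits and then the over-allocation lock for its third file; a second query that also runs out of permits in the middle of a task would block on that lock until the first query releases it.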

A permit to materialize a file is acquired as the files iterator is
consumed while creating a task. The permits are released upon failure or
when the batch finishes executing.
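
The acquire-while-iterating and release-on-failure-or-completion lifecycle might look roughly like the sketch below. It assumes a plain semaphore of 4 permits and omits over-allocation, so it should only be fed batches of at most 4 files; `LifecycleSketch`, `runBatch`, and `commit` are hypothetical names, not APIs from this PR.

```scala
import java.util.concurrent.Semaphore

object LifecycleSketch {
  // Hypothetical stand-in for the driver-wide materialization semaphore.
  val permits = new Semaphore(4)

  /** Materialize `files` one permit at a time, run `commit`, and always release. */
  def runBatch(files: Iterator[String])(commit: Seq[String] => Unit): Unit = {
    var acquired = 0
    try {
      val materialized = files.map { f =>
        permits.acquire() // one permit per file, taken as the iterator is consumed
        acquired += 1
        f
      }.toVector
      commit(materialized)
    } finally {
      permits.release(acquired) // runs on both failure and successful completion
    }
  }
}
```

Releasing in `finally` is what keeps a failed batch from permanently shrinking the pool of permits available to other queries.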


## How was this patch tested?
Added unit tests.

## Does this PR introduce _any_ user-facing changes?
No.
longvu-db authored Apr 30, 2024
1 parent 7714e81 commit 866a5c1
Showing 2 changed files with 292 additions and 0 deletions.
@@ -0,0 +1,211 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta

import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.sql.delta.FileMetadataMaterializationTracker.TaskLevelPermitAllocator
import org.apache.spark.sql.delta.metering.DeltaLogging

import org.apache.spark.internal.Logging

/**
 * An instance of this class tracks and controls the materialization usage of a single command
 * query (e.g. Backfill) with respect to the driver limits. Each query must use one instance of the
 * FileMetadataMaterializationTracker.
 *
 * tasks - tasks are the basic unit of computation.
 * For example, in Backfill, each task bins multiple files into batches to be executed.
 *
 * A task has to be materialized in its entirety, so in the case where we are unable to acquire
 * permits to materialize a task we acquire an over allocation lock that will allow tasks to
 * complete materializing. Over allocation is only allowed for one thread at once in the driver.
 * This allows us to restrict the amount of file metadata being materialized at once on the driver.
 *
 * Accessed by the thread materializing files and by the thread releasing resources after execution.
 *
 */
class FileMetadataMaterializationTracker extends Logging {

  /** The number of permits allocated from the global file materialization semaphore */
  @volatile private var numPermitsFromSemaphore: Int = 0

  /** The number of permits over allocated by holding the overAllocationLock */
  @volatile private var numOverAllocatedPermits: Int = 0

  private val materializationMetrics = new FileMetadataMaterializationMetrics()

  /**
   * A per task permit allocator which allows materializing a new task.
   * @return - TaskLevelPermitAllocator to be used to materialize a task
   */
  def createTaskLevelPermitAllocator(): TaskLevelPermitAllocator = {
    new TaskLevelPermitAllocator(this)
  }

  /**
   * Acquire a permit from the materialization semaphore, if there is no permit available the thread
   * acquires the overAllocationLock which allows it to freely acquire permits in the future.
   * Only one thread can over allocate at once.
   *
   * @param isNewTask - indicates whether the permit is being acquired for a new task, this will
   *                    allow us to prevent overallocation to spill over to new tasks.
   */
  private def acquirePermit(isNewTask: Boolean = false): Unit = {
    var hasAcquiredPermit = false
    if (isNewTask) {
      FileMetadataMaterializationTracker.materializationSemaphore.acquire(1)
      hasAcquiredPermit = true
    } else if (numOverAllocatedPermits > 0) {
      materializationMetrics.overAllocFilesMaterializedCount += 1
    } else if (!FileMetadataMaterializationTracker.materializationSemaphore.tryAcquire(1)) {
      // we acquire the overAllocationLock for this thread
      logInfo("Acquiring over allocation lock for this query.")
      val startTime = System.currentTimeMillis()
      FileMetadataMaterializationTracker.overAllocationLock.acquire(1)
      val waitTime = System.currentTimeMillis() - startTime
      logInfo(s"Acquired over allocation lock for this query in $waitTime ms")
      materializationMetrics.overAllocWaitTimeMs += waitTime
      materializationMetrics.overAllocWaitCount += 1
      materializationMetrics.overAllocFilesMaterializedCount += 1
    } else {
      // tryAcquire was successful
      hasAcquiredPermit = true
    }
    if (hasAcquiredPermit) {
      this.synchronized {
        numPermitsFromSemaphore += 1
      }
    } else {
      this.synchronized {
        numOverAllocatedPermits += 1
      }
    }
    materializeOneFile()
  }

  /** Increment the number of materialized files in materializationMetrics. */
  def materializeOneFile(): Unit = materializationMetrics.filesMaterializedCount += 1

  /**
   * Release `numPermits` file permits and release the overAllocationLock if it is held by the
   * thread and the number of over allocated files drops to 0.
   */
  def releasePermits(numPermits: Int): Unit = {
    var permitsToRelease = numPermits
    this.synchronized {
      if (numOverAllocatedPermits > 0) {
        val overAllocatedPermitsToRelease = Math.min(numOverAllocatedPermits, numPermits)
        numOverAllocatedPermits -= overAllocatedPermitsToRelease
        permitsToRelease -= overAllocatedPermitsToRelease
        if (numOverAllocatedPermits == 0) {
          FileMetadataMaterializationTracker.overAllocationLock.release(1)
          logInfo("Released over allocation lock.")
        }
      }
      numPermitsFromSemaphore -= permitsToRelease
    }
    FileMetadataMaterializationTracker.materializationSemaphore.release(permitsToRelease)
  }

  /**
   * This will release all file permits acquired by the tracker.
   */
  def releaseAllPermits(): Unit = {
    this.synchronized {
      if (numOverAllocatedPermits > 0) {
        FileMetadataMaterializationTracker.overAllocationLock.release(1)
      }
      if (numPermitsFromSemaphore > 0) {
        FileMetadataMaterializationTracker.materializationSemaphore.release(numPermitsFromSemaphore)
      }
      numPermitsFromSemaphore = 0
      numOverAllocatedPermits = 0
    }
  }
}

object FileMetadataMaterializationTracker extends DeltaLogging {
  // Global limit for number of files that can be materialized at once on the driver
  private val globalFileMaterializationLimit: AtomicInteger = new AtomicInteger(-1)

  // Semaphore to control file materialization
  private var materializationSemaphore: Semaphore = _

  /**
   * Global lock that is held by a thread and allows it to materialize files without
   * acquiring permits from the materializationSemaphore.
   *
   * This lock is released when the thread completes executing the command's job that
   * acquired it, or when all permits are released during bin packing.
   */
  private val overAllocationLock = new Semaphore(1)

  /**
   * Initialize the global materialization semaphore using an existing semaphore. This is used
   * for unit tests.
   */
  private[sql] def initializeSemaphoreForTests(semaphore: Semaphore): Unit = {
    globalFileMaterializationLimit.set(semaphore.availablePermits())
    materializationSemaphore = semaphore
  }

  /**
   * A per task level allocator that controls permit allocation and releasing for the task
   */
  class TaskLevelPermitAllocator(tracker: FileMetadataMaterializationTracker) {

    /** Indicates whether the file materialization is for a new task */
    var isNewTask = true

    /**
     * Acquire a single file materialization permit.
     */
    def acquirePermit(): Unit = {
      if (isNewTask) {
        logInfo("Acquiring file materialization permits for a new task")
      }
      tracker.acquirePermit(isNewTask = isNewTask)
      isNewTask = false
    }
  }
}

/**
 * Instance of this class is used for recording metrics of the FileMetadataMaterializationTracker
 */
case class FileMetadataMaterializationMetrics(
    /** Total number of files materialized */
    var filesMaterializedCount: Long = 0L,

    /** Number of times we wait to acquire the over allocation lock */
    var overAllocWaitCount: Long = 0L,

    /** Total time waited to acquire the over allocation lock in ms */
    var overAllocWaitTimeMs: Long = 0L,

    /** Number of files materialized by using over allocation lock */
    var overAllocFilesMaterializedCount: Long = 0L) {

  override def toString(): String = {
    s"Number of files materialized: $filesMaterializedCount, " +
      s"Number of times over-allocated: $overAllocWaitCount, " +
      s"Total time spent waiting to acquire over-allocation lock: $overAllocWaitTimeMs, " +
      s"Files materialized by over allocation: $overAllocFilesMaterializedCount"
  }
}
@@ -0,0 +1,81 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta

import java.util.concurrent.Semaphore

import org.scalatest.concurrent.TimeLimits
import org.scalatest.time.{Seconds, Span}

import org.apache.spark.SparkFunSuite

class FileMetadataMaterializationTrackerSuite extends SparkFunSuite with TimeLimits {

  test("tracker - unit test") {

    def acquireForTask(tracker: FileMetadataMaterializationTracker, numPermits: Int): Unit = {
      val taskLevelPermitAllocator = tracker.createTaskLevelPermitAllocator()
      for (i <- 1 to numPermits) {
        taskLevelPermitAllocator.acquirePermit()
      }
    }

    // Initialize the semaphore for tests
    val semaphore = new Semaphore(10)
    FileMetadataMaterializationTracker.initializeSemaphoreForTests(semaphore)
    val tracker = new FileMetadataMaterializationTracker()

    // test that acquiring a permit should work and decrement the available permits.
    acquireForTask(tracker, 1)
    assert(semaphore.availablePermits() === 9)

    // releasing the permit should increment the semaphore's count
    tracker.releasePermits(1)
    assert(semaphore.availablePermits() === 10)

    // test overallocation
    acquireForTask(tracker, 11) // allowed to over allocate
    assert(semaphore.availablePermits() === 0)
    tracker.releasePermits(11)
    assert(semaphore.availablePermits() === 10) // make sure we don't overflow

    // test - wait for other task to release overallocation lock
    acquireForTask(tracker, 11)

    val acquireThread = new Thread() {
      override def run(): Unit = {
        val taskLevelPermitAllocator = tracker.createTaskLevelPermitAllocator()
        taskLevelPermitAllocator.acquirePermit()
      }
    }
    // we acquire in a separate thread so that we can make sure the acquiring is blocked
    // until another thread (the main thread here) releases a permit.
    acquireThread.start()
    Thread.sleep(2000) // Sleep for 2 seconds to make sure the acquireThread is blocked
    assert(acquireThread.isAlive) // acquire thread is actually blocked
    tracker.releasePermits(11)
    failAfter(Span(2, Seconds)) {
      acquireThread.join() // acquire thread should get unblocked
    }

    // test releaseAllPermits
    assert(semaphore.availablePermits() === 9)
    tracker.releaseAllPermits()
    assert(semaphore.availablePermits() === 10)
  }
}
