Skip to content

Commit

Permalink
Fix thread leak in FileDownloadDelegate (#614)
Browse files Browse the repository at this point in the history
* Fix thread leak in `FileDownloadDelegate`

* `SwiftFormat`

* Add a shared file IO thread pool per HTTPClient

* User bigger thread pool and initlize lazily during first file write

* thread pool is actually not used in tests

* Update documentation

* fix review comments

* make `fileIOThreadPool` internal

* Add test to verify that we actually share the same thread pool across all delegates for a given HTTPClient

Co-authored-by: Cory Benfield <lukasa@apple.com>
  • Loading branch information
dnadoba and Lukasa authored Aug 18, 2022
1 parent 0469acb commit fc510a3
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 18 deletions.
57 changes: 49 additions & 8 deletions Sources/AsyncHTTPClient/FileDownloadDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate {
public typealias Response = Progress

private let filePath: String
private let io: NonBlockingFileIO
private(set) var fileIOThreadPool: NIOThreadPool?
private let reportHead: ((HTTPResponseHead) -> Void)?
private let reportProgress: ((Progress) -> Void)?

Expand All @@ -47,14 +47,46 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate {
/// the total byte count and download byte count passed to it as arguments. The callbacks
/// will be invoked in the same threading context that the delegate itself is invoked,
/// as controlled by `EventLoopPreference`.
public init(
public convenience init(
path: String,
pool: NIOThreadPool = NIOThreadPool(numberOfThreads: 1),
pool: NIOThreadPool,
reportHead: ((HTTPResponseHead) -> Void)? = nil,
reportProgress: ((Progress) -> Void)? = nil
) throws {
pool.start()
self.io = NonBlockingFileIO(threadPool: pool)
try self.init(path: path, pool: .some(pool), reportHead: reportHead, reportProgress: reportProgress)
}

/// Initializes a new file download delegate and uses the shared thread pool of the ``HTTPClient`` for file I/O.
///
/// - parameters:
/// - path: Path to a file you'd like to write the download to.
/// - reportHead: A closure called when the response head is available.
/// - reportProgress: A closure called when a body chunk has been downloaded, with
/// the total byte count and download byte count passed to it as arguments. The callbacks
/// will be invoked in the same threading context that the delegate itself is invoked,
/// as controlled by `EventLoopPreference`.
public convenience init(
path: String,
reportHead: ((HTTPResponseHead) -> Void)? = nil,
reportProgress: ((Progress) -> Void)? = nil
) throws {
try self.init(path: path, pool: nil, reportHead: reportHead, reportProgress: reportProgress)
}

private init(
path: String,
pool: NIOThreadPool?,
reportHead: ((HTTPResponseHead) -> Void)? = nil,
reportProgress: ((Progress) -> Void)? = nil
) throws {
if let pool = pool {
self.fileIOThreadPool = pool
} else {
// we should use the shared thread pool from the HTTPClient which
// we will get from the `HTTPClient.Task`
self.fileIOThreadPool = nil
}

self.filePath = path

self.reportHead = reportHead
Expand All @@ -79,24 +111,33 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate {
task: HTTPClient.Task<Response>,
_ buffer: ByteBuffer
) -> EventLoopFuture<Void> {
let threadPool: NIOThreadPool = {
guard let pool = self.fileIOThreadPool else {
let pool = task.fileIOThreadPool
self.fileIOThreadPool = pool
return pool
}
return pool
}()
let io = NonBlockingFileIO(threadPool: threadPool)
self.progress.receivedBytes += buffer.readableBytes
self.reportProgress?(self.progress)

let writeFuture: EventLoopFuture<Void>
if let fileHandleFuture = self.fileHandleFuture {
writeFuture = fileHandleFuture.flatMap {
self.io.write(fileHandle: $0, buffer: buffer, eventLoop: task.eventLoop)
io.write(fileHandle: $0, buffer: buffer, eventLoop: task.eventLoop)
}
} else {
let fileHandleFuture = self.io.openFile(
let fileHandleFuture = io.openFile(
path: self.filePath,
mode: .write,
flags: .allowFileCreation(),
eventLoop: task.eventLoop
)
self.fileHandleFuture = fileHandleFuture
writeFuture = fileHandleFuture.flatMap {
self.io.write(fileHandle: $0, buffer: buffer, eventLoop: task.eventLoop)
io.write(fileHandle: $0, buffer: buffer, eventLoop: task.eventLoop)
}
}

Expand Down
42 changes: 36 additions & 6 deletions Sources/AsyncHTTPClient/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public class HTTPClient {
let eventLoopGroupProvider: EventLoopGroupProvider
let configuration: Configuration
let poolManager: HTTPConnectionPool.Manager

/// Shared thread pool used for file IO. It is lazily created on first access of ``Task/fileIOThreadPool``.
private var fileIOThreadPool: NIOThreadPool?
private let fileIOThreadPoolLock = Lock()

private var state: State
private let stateLock = Lock()

Expand Down Expand Up @@ -213,6 +218,16 @@ public class HTTPClient {
}
}

private func shutdownFileIOThreadPool(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
self.fileIOThreadPoolLock.withLockVoid {
guard let fileIOThreadPool = fileIOThreadPool else {
callback(nil)
return
}
fileIOThreadPool.shutdownGracefully(queue: queue, callback)
}
}

private func shutdown(requiresCleanClose: Bool, queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
do {
try self.stateLock.withLock {
Expand Down Expand Up @@ -241,15 +256,28 @@ public class HTTPClient {
let error: Error? = (requiresClean && unclean) ? HTTPClientError.uncleanShutdown : nil
return (callback, error)
}

self.shutdownEventLoop(queue: queue) { error in
let reportedError = error ?? uncleanError
callback(reportedError)
self.shutdownFileIOThreadPool(queue: queue) { ioThreadPoolError in
self.shutdownEventLoop(queue: queue) { error in
let reportedError = error ?? ioThreadPoolError ?? uncleanError
callback(reportedError)
}
}
}
}
}

private func makeOrGetFileIOThreadPool() -> NIOThreadPool {
self.fileIOThreadPoolLock.withLock {
guard let fileIOThreadPool = fileIOThreadPool else {
let fileIOThreadPool = NIOThreadPool(numberOfThreads: ProcessInfo.processInfo.processorCount)
fileIOThreadPool.start()
self.fileIOThreadPool = fileIOThreadPool
return fileIOThreadPool
}
return fileIOThreadPool
}
}

/// Execute `GET` request using specified URL.
///
/// - parameters:
Expand Down Expand Up @@ -562,6 +590,7 @@ public class HTTPClient {
case .testOnly_exact(_, delegateOn: let delegateEL):
taskEL = delegateEL
}

logger.trace("selected EventLoop for task given the preference",
metadata: ["ahc-eventloop": "\(taskEL)",
"ahc-el-preference": "\(eventLoopPreference)"])
Expand All @@ -574,7 +603,8 @@ public class HTTPClient {
logger.debug("client is shutting down, failing request")
return Task<Delegate.Response>.failedTask(eventLoop: taskEL,
error: HTTPClientError.alreadyShutdown,
logger: logger)
logger: logger,
makeOrGetFileIOThreadPool: self.makeOrGetFileIOThreadPool)
}
}

Expand All @@ -597,7 +627,7 @@ public class HTTPClient {
}
}()

let task = Task<Delegate.Response>(eventLoop: taskEL, logger: logger)
let task = Task<Delegate.Response>(eventLoop: taskEL, logger: logger, makeOrGetFileIOThreadPool: self.makeOrGetFileIOThreadPool)
do {
let requestBag = try RequestBag(
request: request,
Expand Down
21 changes: 17 additions & 4 deletions Sources/AsyncHTTPClient/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import Logging
import NIOConcurrencyHelpers
import NIOCore
import NIOHTTP1
import NIOPosix
import NIOSSL

extension HTTPClient {
Expand Down Expand Up @@ -502,7 +503,7 @@ public protocol HTTPClientResponseDelegate: AnyObject {
}

extension HTTPClientResponseDelegate {
/// Default implementation of ``HTTPClientResponseDelegate/didSendRequestHead(task:_:)-6khai``.
/// Default implementation of ``HTTPClientResponseDelegate/didSendRequest(task:)-9od5p``.
///
/// By default, this does nothing.
public func didSendRequestHead(task: HTTPClient.Task<Response>, _ head: HTTPRequestHead) {}
Expand Down Expand Up @@ -622,15 +623,27 @@ extension HTTPClient {
private var _isCancelled: Bool = false
private var _taskDelegate: HTTPClientTaskDelegate?
private let lock = Lock()
private let makeOrGetFileIOThreadPool: () -> NIOThreadPool

init(eventLoop: EventLoop, logger: Logger) {
/// The shared thread pool of a ``HTTPClient`` used for file IO. It is lazily created on first access.
internal var fileIOThreadPool: NIOThreadPool {
self.makeOrGetFileIOThreadPool()
}

init(eventLoop: EventLoop, logger: Logger, makeOrGetFileIOThreadPool: @escaping () -> NIOThreadPool) {
self.eventLoop = eventLoop
self.promise = eventLoop.makePromise()
self.logger = logger
self.makeOrGetFileIOThreadPool = makeOrGetFileIOThreadPool
}

static func failedTask(eventLoop: EventLoop, error: Error, logger: Logger) -> Task<Response> {
let task = self.init(eventLoop: eventLoop, logger: logger)
static func failedTask(
eventLoop: EventLoop,
error: Error,
logger: Logger,
makeOrGetFileIOThreadPool: @escaping () -> NIOThreadPool
) -> Task<Response> {
let task = self.init(eventLoop: eventLoop, logger: logger, makeOrGetFileIOThreadPool: makeOrGetFileIOThreadPool)
task.promise.fail(error)
return task
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ extension HTTPClientInternalTests {
("testConnectErrorCalloutOnCorrectEL", testConnectErrorCalloutOnCorrectEL),
("testInternalRequestURI", testInternalRequestURI),
("testHasSuffix", testHasSuffix),
("testSharedThreadPoolIsIdenticalForAllDelegates", testSharedThreadPoolIsIdenticalForAllDelegates),
]
}
}
35 changes: 35 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -554,4 +554,39 @@ class HTTPClientInternalTests: XCTestCase {
XCTAssertFalse(elements.hasSuffix([0, 0, 0]))
}
}

/// test to verify that we actually share the same thread pool across all ``FileDownloadDelegate``s for a given ``HTTPClient``
func testSharedThreadPoolIsIdenticalForAllDelegates() throws {
let httpBin = HTTPBin()
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup))
defer {
XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true))
XCTAssertNoThrow(try httpBin.shutdown())
}
var request = try Request(url: "http://localhost:\(httpBin.port)/events/10/content-length")
request.headers.add(name: "Accept", value: "text/event-stream")

let filePaths = (0..<10).map { _ in
TemporaryFileHelpers.makeTemporaryFilePath()
}
defer {
for filePath in filePaths {
TemporaryFileHelpers.removeTemporaryFile(at: filePath)
}
}
let delegates = try filePaths.map {
try FileDownloadDelegate(path: $0)
}

let resultFutures = delegates.map { delegate in
httpClient.execute(
request: request,
delegate: delegate
).futureResult
}
_ = try EventLoopFuture.whenAllSucceed(resultFutures, on: self.clientGroup.next()).wait()
let threadPools = delegates.map { $0.fileIOThreadPool }
let firstThreadPool = threadPools.first ?? nil
XCTAssert(threadPools.dropFirst().allSatisfy { $0 === firstThreadPool })
}
}
17 changes: 17 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,23 @@ enum TemporaryFileHelpers {
return try body(path)
}

internal static func makeTemporaryFilePath(
directory: String = temporaryDirectory
) -> String {
let (fd, path) = self.openTemporaryFile()
close(fd)
try! FileManager.default.removeItem(atPath: path)
return path
}

internal static func removeTemporaryFile(
at path: String
) {
if FileManager.default.fileExists(atPath: path) {
try? FileManager.default.removeItem(atPath: path)
}
}

internal static func fileSize(path: String) throws -> Int? {
return try FileManager.default.attributesOfItem(atPath: path)[.size] as? Int
}
Expand Down
11 changes: 11 additions & 0 deletions Tests/AsyncHTTPClientTests/RequestBagTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,17 @@ final class RequestBagTests: XCTestCase {
}
}

extension HTTPClient.Task {
convenience init(
eventLoop: EventLoop,
logger: Logger
) {
self.init(eventLoop: eventLoop, logger: logger) {
preconditionFailure("thread pool not needed in tests")
}
}
}

class UploadCountingDelegate: HTTPClientResponseDelegate {
typealias Response = Void

Expand Down

0 comments on commit fc510a3

Please sign in to comment.