From cc15c6a21d6a665e54b0196127d638e225abe7dc Mon Sep 17 00:00:00 2001 From: Tim <0xtimc@gmail.com> Date: Fri, 31 May 2024 18:01:30 +0100 Subject: [PATCH 1/4] Add async APIs for the drivers --- Package.swift | 4 ++-- Sources/Queues/Application+Queues.swift | 7 +++++++ Sources/Queues/QueuesDriver.swift | 9 +++++++++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/Package.swift b/Package.swift index a52f090..aec0cdf 100644 --- a/Package.swift +++ b/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version:5.6 +// swift-tools-version:5.8 import PackageDescription let package = Package( @@ -14,7 +14,7 @@ let package = Package( .library(name: "XCTQueues", targets: ["XCTQueues"]) ], dependencies: [ - .package(url: "https://github.com/vapor/vapor.git", from: "4.76.2"), + .package(url: "https://github.com/vapor/vapor.git", from: "4.101.1"), .package(url: "https://github.com/apple/swift-nio.git", from: "2.53.0"), ], targets: [ diff --git a/Sources/Queues/Application+Queues.swift b/Sources/Queues/Application+Queues.swift index b217eae..41fc86c 100644 --- a/Sources/Queues/Application+Queues.swift +++ b/Sources/Queues/Application+Queues.swift @@ -49,6 +49,13 @@ extension Application { driver.shutdown() } } + + func shutdownAsync(_ application: Application) async { + application.queues.storage.commands.forEach({$0.shutdown()}) + if let driver = application.queues.storage.driver { + await driver.asyncShutdown() + } + } } /// The `QueuesConfiguration` object diff --git a/Sources/Queues/QueuesDriver.swift b/Sources/Queues/QueuesDriver.swift index 1853a74..02452d7 100644 --- a/Sources/Queues/QueuesDriver.swift +++ b/Sources/Queues/QueuesDriver.swift @@ -6,4 +6,13 @@ public protocol QueuesDriver { /// Shuts down the driver func shutdown() + + /// Shut down the driver asyncrhonously. Helps avoid calling `.wait()` + func asyncShutdown() async +} + +extension QueuesDriver { + public func asyncShutdown() async { + shutdown() + } } From 573fcbb7be8ed6e54e57cbc9910d6071a2de26f9 Mon Sep 17 00:00:00 2001 From: Tim <0xtimc@gmail.com> Date: Fri, 31 May 2024 18:14:13 +0100 Subject: [PATCH 2/4] Use the async lifecycle stuff --- Sources/Queues/Application+Queues.swift | 4 +++- Sources/Queues/QueuesCommand.swift | 26 ++++++++++++++++++++++++ Sources/Queues/RepeatedTask+Cancel.swift | 12 ++++++++++- 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/Sources/Queues/Application+Queues.swift b/Sources/Queues/Application+Queues.swift index 41fc86c..dc7d914 100644 --- a/Sources/Queues/Application+Queues.swift +++ b/Sources/Queues/Application+Queues.swift @@ -51,7 +51,9 @@ extension Application { } func shutdownAsync(_ application: Application) async { - application.queues.storage.commands.forEach({$0.shutdown()}) + for command in application.queues.storage.commands { + await command.asyncShutdown() + } if let driver = application.queues.storage.driver { await driver.asyncShutdown() } diff --git a/Sources/Queues/QueuesCommand.swift b/Sources/Queues/QueuesCommand.swift index e4af895..cd2cada 100644 --- a/Sources/Queues/QueuesCommand.swift +++ b/Sources/Queues/QueuesCommand.swift @@ -208,6 +208,32 @@ public final class QueuesCommand: Command { } } + public func asyncShutdown() async { + self.lock.lock() + + self.isShuttingDown.store(true, ordering: .relaxed) + self.didShutdown = true + + // stop running in case shutting downf rom signal + self.application.running?.stop() + + // clear signal sources + self.signalSources.forEach { $0.cancel() } // clear refs + self.signalSources = [] + + // Release the lock before we start any suspensions + self.lock.unlock() + + // stop all job queue workers + for jobTask in self.jobTasks { + await jobTask.asyncCancel(on: self.eventLoopGroup.any()) + } + // stop all scheduled jobs + for scheduledTask in self.scheduledTasks.values { + await scheduledTask.task.asyncCancel(on: self.eventLoopGroup.any()) + } + } + deinit { assert(self.didShutdown, "JobsCommand did not shutdown before deinit") } diff --git a/Sources/Queues/RepeatedTask+Cancel.swift b/Sources/Queues/RepeatedTask+Cancel.swift index 7f2ac0b..841fa0c 100644 --- a/Sources/Queues/RepeatedTask+Cancel.swift +++ b/Sources/Queues/RepeatedTask+Cancel.swift @@ -10,4 +10,14 @@ extension RepeatedTask { print("failed cancelling repeated task \(error)") } } -} \ No newline at end of file + + func asyncCancel(on eventLoop: EventLoop) async { + do { + let promise = eventLoop.makePromise(of: Void.self) + self.cancel(promise: promise) + try await promise.futureResult.get() + } catch { + print("failed cancelling repeated task \(error)") + } + } +} From d11484846ed64d45834037dfb1bfe2349f9b47fa Mon Sep 17 00:00:00 2001 From: Tim <0xtimc@gmail.com> Date: Fri, 31 May 2024 18:17:46 +0100 Subject: [PATCH 3/4] Proper async test to ensure the new APIs are hit --- Tests/QueuesTests/AsyncQueueTests.swift | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/Tests/QueuesTests/AsyncQueueTests.swift b/Tests/QueuesTests/AsyncQueueTests.swift index 7c6c732..7c0f42e 100644 --- a/Tests/QueuesTests/AsyncQueueTests.swift +++ b/Tests/QueuesTests/AsyncQueueTests.swift @@ -9,9 +9,17 @@ import NIOCore import NIOConcurrencyHelpers final class AsyncQueueTests: XCTestCase { - func testAsyncJob() throws { - let app = Application(.testing) - defer { app.shutdown() } + var app: Application! + + override func setUp() async throws { + app = try await Application.make(.testing) + } + + override func tearDown() async throws { + try await app.asyncShutdown() + } + + func testAsyncJob() async throws { app.queues.use(.test) let promise = app.eventLoopGroup.any().makePromise(of: Void.self) @@ -22,7 +30,7 @@ final class AsyncQueueTests: XCTestCase { .map { _ in "done" } } - try app.testable().test(.GET, "foo") { res in + try await app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } @@ -34,7 +42,7 @@ final class AsyncQueueTests: XCTestCase { XCTAssertNotNil(job) XCTAssertEqual(job!.foo, "bar") - try app.queues.queue.worker.run().wait() + try await app.queues.queue.worker.run().get() XCTAssertEqual(app.queues.test.queue.count, 0) XCTAssertEqual(app.queues.test.jobs.count, 0) From f41cbb1202018d95a985e6875b0735367a5219aa Mon Sep 17 00:00:00 2001 From: Tim <0xtimc@gmail.com> Date: Fri, 31 May 2024 18:32:53 +0100 Subject: [PATCH 4/4] Will let the Sendable PR take care of thread safety --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index acc6159..48b8aa7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,4 +8,4 @@ jobs: uses: vapor/ci/.github/workflows/run-unit-tests.yml@reusable-workflows with: with_coverage: true - with_tsan: true + with_tsan: false