Skip to content

Commit

Permalink
Adopt Async Lifecycle Handler (#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
0xTim authored May 31, 2024
1 parent 54763e7 commit 9117f54
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ jobs:
uses: vapor/ci/.github/workflows/run-unit-tests.yml@reusable-workflows
with:
with_coverage: true
with_tsan: true
with_tsan: false
4 changes: 2 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version:5.6
// swift-tools-version:5.8
import PackageDescription

let package = Package(
Expand All @@ -14,7 +14,7 @@ let package = Package(
.library(name: "XCTQueues", targets: ["XCTQueues"])
],
dependencies: [
.package(url: "https://github.com/vapor/vapor.git", from: "4.76.2"),
.package(url: "https://github.com/vapor/vapor.git", from: "4.101.1"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.53.0"),
],
targets: [
Expand Down
9 changes: 9 additions & 0 deletions Sources/Queues/Application+Queues.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ extension Application {
driver.shutdown()
}
}

func shutdownAsync(_ application: Application) async {
for command in application.queues.storage.commands {
await command.asyncShutdown()
}
if let driver = application.queues.storage.driver {
await driver.asyncShutdown()
}
}
}

/// The `QueuesConfiguration` object
Expand Down
26 changes: 26 additions & 0 deletions Sources/Queues/QueuesCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,32 @@ public final class QueuesCommand: Command {
}
}

public func asyncShutdown() async {
self.lock.lock()

self.isShuttingDown.store(true, ordering: .relaxed)
self.didShutdown = true

// stop running in case shutting downf rom signal
self.application.running?.stop()

// clear signal sources
self.signalSources.forEach { $0.cancel() } // clear refs
self.signalSources = []

// Release the lock before we start any suspensions
self.lock.unlock()

// stop all job queue workers
for jobTask in self.jobTasks {
await jobTask.asyncCancel(on: self.eventLoopGroup.any())
}
// stop all scheduled jobs
for scheduledTask in self.scheduledTasks.values {
await scheduledTask.task.asyncCancel(on: self.eventLoopGroup.any())
}
}

deinit {
assert(self.didShutdown, "JobsCommand did not shutdown before deinit")
}
Expand Down
9 changes: 9 additions & 0 deletions Sources/Queues/QueuesDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,13 @@ public protocol QueuesDriver {

/// Shuts down the driver
func shutdown()

/// Shut down the driver asyncrhonously. Helps avoid calling `.wait()`
func asyncShutdown() async
}

extension QueuesDriver {
public func asyncShutdown() async {
shutdown()
}
}
12 changes: 11 additions & 1 deletion Sources/Queues/RepeatedTask+Cancel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,14 @@ extension RepeatedTask {
print("failed cancelling repeated task \(error)")
}
}
}

func asyncCancel(on eventLoop: EventLoop) async {
do {
let promise = eventLoop.makePromise(of: Void.self)
self.cancel(promise: promise)
try await promise.futureResult.get()
} catch {
print("failed cancelling repeated task \(error)")
}
}
}
18 changes: 13 additions & 5 deletions Tests/QueuesTests/AsyncQueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,17 @@ import NIOCore
import NIOConcurrencyHelpers

final class AsyncQueueTests: XCTestCase {
func testAsyncJob() throws {
let app = Application(.testing)
defer { app.shutdown() }
var app: Application!

override func setUp() async throws {
app = try await Application.make(.testing)
}

override func tearDown() async throws {
try await app.asyncShutdown()
}

func testAsyncJob() async throws {
app.queues.use(.test)

let promise = app.eventLoopGroup.any().makePromise(of: Void.self)
Expand All @@ -22,7 +30,7 @@ final class AsyncQueueTests: XCTestCase {
.map { _ in "done" }
}

try app.testable().test(.GET, "foo") { res in
try await app.testable().test(.GET, "foo") { res async in
XCTAssertEqual(res.status, .ok)
XCTAssertEqual(res.body.string, "done")
}
Expand All @@ -34,7 +42,7 @@ final class AsyncQueueTests: XCTestCase {
XCTAssertNotNil(job)
XCTAssertEqual(job!.foo, "bar")

try app.queues.queue.worker.run().wait()
try await app.queues.queue.worker.run().get()
XCTAssertEqual(app.queues.test.queue.count, 0)
XCTAssertEqual(app.queues.test.jobs.count, 0)

Expand Down

0 comments on commit 9117f54

Please sign in to comment.