From 895858aaca0c8adcfa8e4248f0dceea4472117ef Mon Sep 17 00:00:00 2001 From: Gwynne Raskind Date: Thu, 13 Jun 2024 17:56:58 -0500 Subject: [PATCH] Add some missing tests, add AsyncTestQueueDriver to XCTQueues, respect LOG_LEVEL env var in tests --- Sources/XCTQueues/TestQueueDriver.swift | 57 +++- Tests/QueuesTests/AsyncQueueTests.swift | 36 ++- Tests/QueuesTests/QueueTests.swift | 283 +++++++++++++++++-- Tests/QueuesTests/ScheduleBuilderTests.swift | 5 +- Tests/QueuesTests/Utilities.swift | 15 + 5 files changed, 370 insertions(+), 26 deletions(-) create mode 100644 Tests/QueuesTests/Utilities.swift diff --git a/Sources/XCTQueues/TestQueueDriver.swift b/Sources/XCTQueues/TestQueueDriver.swift index fcc2d0f..ace6885 100644 --- a/Sources/XCTQueues/TestQueueDriver.swift +++ b/Sources/XCTQueues/TestQueueDriver.swift @@ -10,6 +10,13 @@ extension Application.Queues.Provider { return $0.queues.use(custom: TestQueuesDriver()) } } + + public static var asyncTest: Self { + .init { + $0.queues.initializeAsyncTestStorage() + return $0.queues.use(custom: AsyncTestQueuesDriver()) + } + } } struct TestQueuesDriver: QueuesDriver { @@ -24,10 +31,29 @@ struct TestQueuesDriver: QueuesDriver { } } +struct AsyncTestQueuesDriver: QueuesDriver { + init() {} + func makeQueue(with context: QueueContext) -> any Queue { AsyncTestQueue(_context: .init(context)) } + func shutdown() {} +} + extension Application.Queues { - public final class TestQueueStorage: @unchecked Sendable { - public var jobs: [JobIdentifier: JobData] = [:] - public var queue: [JobIdentifier] = [] + public final class TestQueueStorage: Sendable { + private struct Box: Sendable { + var jobs: [JobIdentifier: JobData] = [:] + var queue: [JobIdentifier] = [] + } + private let box = NIOLockedValueBox(.init()) + + public var jobs: [JobIdentifier: JobData] { + get { self.box.withLockedValue { $0.jobs } } + set { self.box.withLockedValue { $0.jobs = newValue } } + } + + public var queue: [JobIdentifier] { + get { self.box.withLockedValue { $0.queue } } + set { self.box.withLockedValue { $0.queue = newValue } } + } /// Returns the payloads of all jobs in the queue having type `J`. public func all(_ job: J.Type) -> [J.Payload] { @@ -58,7 +84,7 @@ extension Application.Queues { struct TestQueueKey: StorageKey, LockKey { typealias Value = TestQueueStorage } - + public var test: TestQueueStorage { self.application.storage[TestQueueKey.self]! } @@ -66,6 +92,18 @@ extension Application.Queues { func initializeTestStorage() { self.application.storage[TestQueueKey.self] = .init() } + + struct AsyncTestQueueKey: StorageKey, LockKey { + typealias Value = TestQueueStorage + } + + public var asyncTest: TestQueueStorage { + self.application.storage[AsyncTestQueueKey.self]! + } + + func initializeAsyncTestStorage() { + self.application.storage[AsyncTestQueueKey.self] = .init() + } } struct TestQueue: Queue { @@ -106,3 +144,14 @@ struct TestQueue: Queue { } } } + +struct AsyncTestQueue: AsyncQueue { + let _context: NIOLockedValueBox + var context: QueueContext { self._context.withLockedValue { $0 } } + + func get(_ id: JobIdentifier) async throws -> JobData { self._context.withLockedValue { $0.application.queues.asyncTest.jobs[id]! } } + func set(_ id: JobIdentifier, to data: JobData) async throws { self._context.withLockedValue { $0.application.queues.asyncTest.jobs[id] = data } } + func clear(_ id: JobIdentifier) async throws { self._context.withLockedValue { $0.application.queues.asyncTest.jobs[id] = nil } } + func pop() async throws -> JobIdentifier? { self._context.withLockedValue { $0.application.queues.asyncTest.queue.popLast() } } + func push(_ id: JobIdentifier) async throws { self._context.withLockedValue { $0.application.queues.asyncTest.queue.append(id) } } +} diff --git a/Tests/QueuesTests/AsyncQueueTests.swift b/Tests/QueuesTests/AsyncQueueTests.swift index 114a111..d77cd01 100644 --- a/Tests/QueuesTests/AsyncQueueTests.swift +++ b/Tests/QueuesTests/AsyncQueueTests.swift @@ -17,6 +17,10 @@ func XCTAssertNoThrowAsync( final class AsyncQueueTests: XCTestCase { var app: Application! + override class func setUp() { + XCTAssert(isLoggingConfigured) + } + override func setUp() async throws { app = try await Application.make(.testing) } @@ -25,7 +29,7 @@ final class AsyncQueueTests: XCTestCase { try await app.asyncShutdown() } - func testAsyncJob() async throws { + func testAsyncJobWithSyncQueue() async throws { app.queues.use(.test) let promise = app.eventLoopGroup.any().makePromise(of: Void.self) @@ -54,6 +58,36 @@ final class AsyncQueueTests: XCTestCase { await XCTAssertNoThrowAsync(try await promise.futureResult.get()) } + + func testAsyncJobWithAsyncQueue() async throws { + app.queues.use(.asyncTest) + + let promise = app.eventLoopGroup.any().makePromise(of: Void.self) + app.queues.add(MyAsyncJob(promise: promise)) + + app.get("foo") { req in + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) + return "done" + } + + try await app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + XCTAssertEqual(app.queues.asyncTest.queue.count, 1) + XCTAssertEqual(app.queues.asyncTest.jobs.count, 1) + let job = app.queues.asyncTest.first(MyAsyncJob.self) + XCTAssert(app.queues.asyncTest.contains(MyAsyncJob.self)) + XCTAssertNotNil(job) + XCTAssertEqual(job!.foo, "bar") + + try await app.queues.queue.worker.run().get() + XCTAssertEqual(app.queues.asyncTest.queue.count, 0) + XCTAssertEqual(app.queues.asyncTest.jobs.count, 0) + + await XCTAssertNoThrowAsync(try await promise.futureResult.get()) + } } struct MyAsyncJob: AsyncJob { diff --git a/Tests/QueuesTests/QueueTests.swift b/Tests/QueuesTests/QueueTests.swift index c21eb5e..cd30736 100644 --- a/Tests/QueuesTests/QueueTests.swift +++ b/Tests/QueuesTests/QueueTests.swift @@ -19,9 +19,39 @@ func XCTAssertEqualAsync( } } +func XCTAssertTrueAsync( + _ predicate: @autoclosure () async throws -> Bool, + _ message: @autoclosure () -> String = "", + file: StaticString = #filePath, line: UInt = #line +) async { + do { + let result = try await predicate() + XCTAssertTrue(result, message(), file: file, line: line) + } catch { + return XCTAssertTrue(try { throw error }(), message(), file: file, line: line) + } +} + +func XCTAssertFalseAsync( + _ predicate: @autoclosure () async throws -> Bool, + _ message: @autoclosure () -> String = "", + file: StaticString = #filePath, line: UInt = #line +) async { + do { + let result = try await predicate() + XCTAssertFalse(result, message(), file: file, line: line) + } catch { + return XCTAssertFalse(try { throw error }(), message(), file: file, line: line) + } +} + final class QueueTests: XCTestCase { var app: Application! - + + override class func setUp() { + XCTAssert(isLoggingConfigured) + } + override func setUp() async throws { self.app = try await Application.make(.testing) self.app.queues.use(.test) @@ -33,29 +63,40 @@ final class QueueTests: XCTestCase { } func testVaporIntegrationWithInProcessJob() async throws { - let jobSignal = self.app.eventLoopGroup.any().makePromise(of: String.self) - self.app.queues.add(Foo(promise: jobSignal)) + let jobSignal1 = self.app.eventLoopGroup.any().makePromise(of: String.self) + self.app.queues.add(Foo1(promise: jobSignal1)) + let jobSignal2 = self.app.eventLoopGroup.any().makePromise(of: String.self) + self.app.queues.add(Foo2(promise: jobSignal2)) try self.app.queues.startInProcessJobs(on: .default) - self.app.get("bar") { req in - try await req.queue.dispatch(Foo.self, .init(foo: "Bar payload")) + self.app.get("bar1") { req in + try await req.queue.dispatch(Foo1.self, .init(foo: "Bar payload")).get() + return "job bar dispatched" + } + + self.app.get("bar2") { req in + try await req.queue.dispatch(Foo2.self, .init(foo: "Bar payload")) return "job bar dispatched" } - try await self.app.testable().test(.GET, "bar") { res async in + try await self.app.testable().test(.GET, "bar1") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "job bar dispatched") + }.test(.GET, "bar2") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "job bar dispatched") } - await XCTAssertEqualAsync(try await jobSignal.futureResult.get(), "Bar payload") + await XCTAssertEqualAsync(try await jobSignal1.futureResult.get(), "Bar payload") + await XCTAssertEqualAsync(try await jobSignal2.futureResult.get(), "Bar payload") } func testVaporIntegration() async throws { let promise = self.app.eventLoopGroup.any().makePromise(of: String.self) - self.app.queues.add(Foo(promise: promise)) - + self.app.queues.add(Foo1(promise: promise)) + self.app.get("foo") { req in - try await req.queue.dispatch(Foo.self, .init(foo: "bar")) + try await req.queue.dispatch(Foo1.self, .init(foo: "bar")) return "done" } @@ -66,8 +107,8 @@ final class QueueTests: XCTestCase { XCTAssertEqual(self.app.queues.test.queue.count, 1) XCTAssertEqual(self.app.queues.test.jobs.count, 1) - let job = self.app.queues.test.first(Foo.self) - XCTAssert(self.app.queues.test.contains(Foo.self)) + let job = self.app.queues.test.first(Foo1.self) + XCTAssert(self.app.queues.test.contains(Foo1.self)) XCTAssertNotNil(job) XCTAssertEqual(job!.foo, "bar") @@ -80,10 +121,10 @@ final class QueueTests: XCTestCase { func testSettingCustomId() async throws { let promise = self.app.eventLoopGroup.any().makePromise(of: String.self) - self.app.queues.add(Foo(promise: promise)) - + self.app.queues.add(Foo1(promise: promise)) + self.app.get("foo") { req in - try await req.queue.dispatch(Foo.self, .init(foo: "bar"), id: JobIdentifier(string: "my-custom-id")) + try await req.queue.dispatch(Foo1.self, .init(foo: "bar"), id: JobIdentifier(string: "my-custom-id")) return "done" } @@ -141,6 +182,21 @@ final class QueueTests: XCTestCase { try await promise.futureResult.get() } + func testAsyncRepeatingScheduledJob() async throws { + let scheduledJob = AsyncTestingScheduledJob() + XCTAssertEqual(scheduledJob.count.load(ordering: .relaxed), 0) + self.app.queues.schedule(scheduledJob).everySecond() + try self.app.queues.startScheduledJobs() + + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) + self.app.eventLoopGroup.any().scheduleTask(in: .seconds(5)) { + XCTAssert(scheduledJob.count.load(ordering: .relaxed) > 4) + promise.succeed() + } + + try await promise.futureResult.get() + } + func testFailingScheduledJob() async throws { self.app.queues.schedule(FailingScheduledJob()).everySecond() try self.app.queues.startScheduledJobs() @@ -152,6 +208,17 @@ final class QueueTests: XCTestCase { try await promise.futureResult.get() } + func testAsyncFailingScheduledJob() async throws { + self.app.queues.schedule(AsyncFailingScheduledJob()).everySecond() + try self.app.queues.startScheduledJobs() + + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) + self.app.eventLoopGroup.any().scheduleTask(in: .seconds(1)) { + promise.succeed() + } + try await promise.futureResult.get() + } + func testCustomWorkerCount() async throws { // Setup custom ELG with 4 threads let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 4) @@ -177,14 +244,14 @@ final class QueueTests: XCTestCase { let errorHook = ErrorHook() let dispatchHook = DispatchHook() let dequeuedHook = DequeuedHook() - self.app.queues.add(Foo(promise: promise)) + self.app.queues.add(Foo1(promise: promise)) self.app.queues.add(successHook) self.app.queues.add(errorHook) self.app.queues.add(dispatchHook) self.app.queues.add(dequeuedHook) self.app.get("foo") { req in - try await req.queue.dispatch(Foo.self, .init(foo: "bar")) + try await req.queue.dispatch(Foo1.self, .init(foo: "bar")) return "done" } @@ -199,8 +266,8 @@ final class QueueTests: XCTestCase { XCTAssertEqual(errorHook.errorCount, 0) XCTAssertEqual(self.app.queues.test.queue.count, 1) XCTAssertEqual(self.app.queues.test.jobs.count, 1) - let job = self.app.queues.test.first(Foo.self) - XCTAssert(self.app.queues.test.contains(Foo.self)) + let job = self.app.queues.test.first(Foo1.self) + XCTAssert(self.app.queues.test.contains(Foo1.self)) XCTAssertNotNil(job) XCTAssertEqual(job!.foo, "bar") XCTAssertFalse(dequeuedHook.successHit) @@ -215,6 +282,50 @@ final class QueueTests: XCTestCase { await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar") } + func testAsyncSuccessHooks() async throws { + let promise = self.app.eventLoopGroup.any().makePromise(of: String.self) + let successHook = AsyncSuccessHook() + let errorHook = AsyncErrorHook() + let dispatchHook = AsyncDispatchHook() + let dequeuedHook = AsyncDequeuedHook() + self.app.queues.add(Foo1(promise: promise)) + self.app.queues.add(successHook) + self.app.queues.add(errorHook) + self.app.queues.add(dispatchHook) + self.app.queues.add(dequeuedHook) + + self.app.get("foo") { req in + try await req.queue.dispatch(Foo1.self, .init(foo: "bar")) + return "done" + } + + await XCTAssertFalseAsync(await dispatchHook.successHit) + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + await XCTAssertTrueAsync(await dispatchHook.successHit) + } + + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + let job = self.app.queues.test.first(Foo1.self) + XCTAssert(self.app.queues.test.contains(Foo1.self)) + XCTAssertNotNil(job) + XCTAssertEqual(job!.foo, "bar") + await XCTAssertFalseAsync(await dequeuedHook.successHit) + + try await self.app.queues.queue.worker.run() + await XCTAssertTrueAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) + await XCTAssertTrueAsync(await dequeuedHook.successHit) + + await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar") + } + func testFailureHooks() async throws { self.app.queues.use(.test) self.app.queues.add(Bar()) @@ -251,6 +362,42 @@ final class QueueTests: XCTestCase { XCTAssertEqual(self.app.queues.test.jobs.count, 0) } + func testAsyncFailureHooks() async throws { + self.app.queues.use(.test) + self.app.queues.add(Bar()) + let successHook = AsyncSuccessHook() + let errorHook = AsyncErrorHook() + self.app.queues.add(successHook) + self.app.queues.add(errorHook) + + self.app.get("foo") { req in + try await req.queue.dispatch(Bar.self, .init(foo: "bar"), maxRetryCount: 3) + return "done" + } + + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + let job = self.app.queues.test.first(Bar.self) + XCTAssert(self.app.queues.test.contains(Bar.self)) + XCTAssertNotNil(job) + + try await self.app.queues.queue.worker.run() + try await self.app.queues.queue.worker.run() + try await self.app.queues.queue.worker.run() + try await self.app.queues.queue.worker.run() + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 1) + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) + } + func testFailureHooksWithDelay() async throws { self.app.queues.add(Baz()) let successHook = SuccessHook() @@ -293,6 +440,55 @@ final class QueueTests: XCTestCase { XCTAssertEqual(self.app.queues.test.queue.count, 0) XCTAssertEqual(self.app.queues.test.jobs.count, 0) } + + func testAsyncFailureHooksWithDelay() async throws { + self.app.queues.add(Baz()) + let successHook = AsyncSuccessHook() + let errorHook = AsyncErrorHook() + self.app.queues.add(successHook) + self.app.queues.add(errorHook) + + self.app.get("foo") { req in + try await req.queue.dispatch(Baz.self, .init(foo: "baz"), maxRetryCount: 1) + return "done" + } + + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + var job = self.app.queues.test.first(Baz.self) + XCTAssert(self.app.queues.test.contains(Baz.self)) + XCTAssertNotNil(job) + + try await self.app.queues.queue.worker.run() + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + job = self.app.queues.test.first(Baz.self) + XCTAssert(self.app.queues.test.contains(Baz.self)) + XCTAssertNotNil(job) + + sleep(1) + + try await self.app.queues.queue.worker.run() + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 1) + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) + } + + func testStuffThatIsntActuallyUsedAnywhere() { + XCTAssertEqual(self.app.queues.queue(.default).key, "vapor_queues[default]") + XCTAssertNotNil(QueuesEventLoopPreference.indifferent.delegate(for: self.app.eventLoopGroup)) + XCTAssertNotNil(QueuesEventLoopPreference.delegate(on: self.app.eventLoopGroup.any()).delegate(for: self.app.eventLoopGroup)) + } } final class DispatchHook: JobEventDelegate, @unchecked Sendable { @@ -331,6 +527,26 @@ final class DequeuedHook: JobEventDelegate, @unchecked Sendable { } } +actor AsyncDispatchHook: AsyncJobEventDelegate { + var successHit = false + func dispatched(job: JobEventData) async throws { self.successHit = true } +} + +actor AsyncSuccessHook: AsyncJobEventDelegate { + var successHit = false + func success(jobId: String) async throws { self.successHit = true } +} + +actor AsyncErrorHook: AsyncJobEventDelegate { + var errorCount = 0 + func error(jobId: String, error: any Error) async throws { self.errorCount += 1 } +} + +actor AsyncDequeuedHook: AsyncJobEventDelegate { + var successHit = false + func didDequeue(jobId: String) async throws { self.successHit = true } +} + final class WorkerCountDriver: QueuesDriver, @unchecked Sendable { let count: EventLoopPromise let lock: NIOLock @@ -377,10 +593,15 @@ final class WorkerCountDriver: QueuesDriver, @unchecked Sendable { } struct Failure: Error {} + struct FailingScheduledJob: ScheduledJob { func run(context: QueueContext) -> EventLoopFuture { context.eventLoop.makeFailedFuture(Failure()) } } +struct AsyncFailingScheduledJob: AsyncScheduledJob { + func run(context: QueueContext) async throws { throw Failure() } +} + struct TestingScheduledJob: ScheduledJob { var count = ManagedAtomic(0) @@ -390,8 +611,30 @@ struct TestingScheduledJob: ScheduledJob { } } +struct AsyncTestingScheduledJob: AsyncScheduledJob { + var count = ManagedAtomic(0) + func run(context: QueueContext) async throws { self.count.wrappingIncrement(ordering: .relaxed) } +} + +struct Foo1: Job { + let promise: EventLoopPromise + + struct Data: Codable { + var foo: String + } + + func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture { + self.promise.succeed(data.foo) + return context.eventLoop.makeSucceededVoidFuture() + } + + func error(_ context: QueueContext, _ error: any Error, _ data: Data) -> EventLoopFuture { + self.promise.fail(error) + return context.eventLoop.makeSucceededVoidFuture() + } +} -struct Foo: Job { +struct Foo2: Job { let promise: EventLoopPromise struct Data: Codable { diff --git a/Tests/QueuesTests/ScheduleBuilderTests.swift b/Tests/QueuesTests/ScheduleBuilderTests.swift index f95d04b..4eb4dd9 100644 --- a/Tests/QueuesTests/ScheduleBuilderTests.swift +++ b/Tests/QueuesTests/ScheduleBuilderTests.swift @@ -3,6 +3,10 @@ import Queues import XCTest final class ScheduleBuilderTests: XCTestCase { + override class func setUp() { + XCTAssert(isLoggingConfigured) + } + func testHourlyBuilder() throws { let builder = ScheduleBuilder() builder.hourly().at(30) @@ -151,7 +155,6 @@ final class ScheduleBuilderTests: XCTestCase { // one hour later Date(calendar: est, hour: 21, minute: 00) ) - } } diff --git a/Tests/QueuesTests/Utilities.swift b/Tests/QueuesTests/Utilities.swift new file mode 100644 index 0000000..90cb197 --- /dev/null +++ b/Tests/QueuesTests/Utilities.swift @@ -0,0 +1,15 @@ +import Logging +import class Foundation.ProcessInfo + +func env(_ name: String) -> String? { + return ProcessInfo.processInfo.environment[name] +} + +let isLoggingConfigured: Bool = { + LoggingSystem.bootstrap { label in + var handler = StreamLogHandler.standardOutput(label: label) + handler.logLevel = env("LOG_LEVEL").flatMap { .init(rawValue: $0) } ?? .info + return handler + } + return true +}()