From a5c4452b405a301c9ab362e0a48d13949fcc3551 Mon Sep 17 00:00:00 2001 From: Hadi Sharghi Date: Sat, 26 Aug 2023 18:38:56 +0330 Subject: [PATCH] Add optional `didDequeue`, `success` and `error` notification hook methods (delegate) and their async versions, to pass the `JobEventData` to the delegate --- Sources/Queues/AsyncJobEventDelegate.swift | 37 +++++++++++++++++++ Sources/Queues/NotificationHook.swift | 32 +++++++++++++++++ Sources/Queues/QueueWorker.swift | 18 ++++++---- Tests/QueuesTests/QueueTests.swift | 41 +++++++++++++++++++++- 4 files changed, 121 insertions(+), 7 deletions(-) diff --git a/Sources/Queues/AsyncJobEventDelegate.swift b/Sources/Queues/AsyncJobEventDelegate.swift index d76fad4..c9589af 100644 --- a/Sources/Queues/AsyncJobEventDelegate.swift +++ b/Sources/Queues/AsyncJobEventDelegate.swift @@ -12,23 +12,42 @@ public protocol AsyncJobEventDelegate: JobEventDelegate { /// - jobId: The id of the Job func didDequeue(jobId: String) async throws + /// Called when the job is dequeued + /// - Parameters: + /// - job: The `JobData` associated with the job + func didDequeue(job: JobEventData) async throws + /// Called when the job succeeds /// - Parameters: /// - jobId: The id of the Job func success(jobId: String) async throws + /// Called when the job succeeds + /// - Parameters: + /// - job: The `JobData` associated with the job + func success(job: JobEventData) async throws + /// Called when the job returns an error /// - Parameters: /// - jobId: The id of the Job /// - error: The error that caused the job to fail func error(jobId: String, error: Error) async throws + + /// Called when the job returns an error + /// - Parameters: + /// - job: The `JobData` associated with the job + /// - error: The error that caused the job to fail + func error(job: JobEventData, error: Error) async throws } extension AsyncJobEventDelegate { public func dispatched(job: JobEventData) async throws { } public func didDequeue(jobId: String) async throws { } + public func didDequeue(job: JobEventData) async throws { } public func success(jobId: String) async throws { } + public func success(job: JobEventData) async throws { } public func error(jobId: String, error: Error) async throws { } + public func error(job: JobEventData) async throws { } public func dispatched(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture { eventLoop.makeFutureWithTask { @@ -41,6 +60,12 @@ extension AsyncJobEventDelegate { try await self.didDequeue(jobId: jobId) } } + + public func didDequeue(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture { + eventLoop.makeFutureWithTask { + try await self.didDequeue(job: job) + } + } public func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { eventLoop.makeFutureWithTask { @@ -48,9 +73,21 @@ extension AsyncJobEventDelegate { } } + public func success(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture { + eventLoop.makeFutureWithTask { + try await self.success(job: job) + } + } + public func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture { eventLoop.makeFutureWithTask { try await self.error(jobId: jobId, error: error) } } + + public func error(job: JobEventData, error: Error, eventLoop: EventLoop) -> EventLoopFuture { + eventLoop.makeFutureWithTask { + try await self.error(job: job, error: error) + } + } } diff --git a/Sources/Queues/NotificationHook.swift b/Sources/Queues/NotificationHook.swift index ea317d9..3b6f864 100644 --- a/Sources/Queues/NotificationHook.swift +++ b/Sources/Queues/NotificationHook.swift @@ -16,6 +16,12 @@ public protocol JobEventDelegate { /// - eventLoop: The eventLoop func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture + /// Called when the job is dequeued + /// - Parameters: + /// - job: The `JobData` associated with the job + /// - eventLoop: The eventLoop + func didDequeue(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture + /// Called when the job succeeds /// - Parameters: @@ -23,12 +29,26 @@ public protocol JobEventDelegate { /// - eventLoop: The eventLoop func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture + /// Called when the job succeeds + /// - Parameters: + /// - job: The `JobData` associated with the job + /// - eventLoop: The eventLoop + func success(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture + + /// Called when the job returns an error /// - Parameters: /// - jobId: The id of the Job /// - error: The error that caused the job to fail /// - eventLoop: The eventLoop func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture + + /// Called when the job returns an error + /// - Parameters: + /// - job: The `JobData` associated with the job + /// - error: The error that caused the job to fail + /// - eventLoop: The eventLoop + func error(job: JobEventData, error: Error, eventLoop: EventLoop) -> EventLoopFuture } extension JobEventDelegate { @@ -40,13 +60,25 @@ extension JobEventDelegate { eventLoop.future() } + public func didDequeue(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture { + eventLoop.future() + } + public func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { eventLoop.future() } + public func success(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture { + eventLoop.future() + } + public func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture { eventLoop.future() } + + public func error(job: JobEventData, error: Error, eventLoop: EventLoop) -> EventLoopFuture { + eventLoop.future() + } } /// Data on a job sent via a notification diff --git a/Sources/Queues/QueueWorker.swift b/Sources/Queues/QueueWorker.swift index b375188..4bf6fdd 100644 --- a/Sources/Queues/QueueWorker.swift +++ b/Sources/Queues/QueueWorker.swift @@ -48,8 +48,10 @@ public struct QueueWorker { logger.trace("Sending dequeued notification hooks") - return self.queue.configuration.notificationHooks.map { - $0.didDequeue(jobId: id.string, eventLoop: self.queue.eventLoop) + return self.queue.configuration.notificationHooks.map { hook in + hook.didDequeue(jobId: id.string, eventLoop: self.queue.eventLoop).flatMap { + hook.didDequeue(job: .init(id: id.string, queueName: self.queue.queueName.string, jobData: data), eventLoop: self.queue.eventLoop) + } }.flatten(on: self.queue.eventLoop).flatMapError { error in logger.error("Could not send didDequeue notification: \(error)") return self.queue.eventLoop.future() @@ -90,8 +92,10 @@ public struct QueueWorker { return futureJob.flatMap { complete in logger.trace("Ran job successfully") logger.trace("Sending success notification hooks") - return self.queue.configuration.notificationHooks.map { - $0.success(jobId: id.string, eventLoop: self.queue.context.eventLoop) + return self.queue.configuration.notificationHooks.map { hook in + hook.success(jobId: id.string, eventLoop: self.queue.context.eventLoop).flatMap { + hook.success(job: .init(id: id.string, queueName: self.queue.queueName.string, jobData: jobData), eventLoop: self.queue.context.eventLoop) + } }.flatten(on: self.queue.context.eventLoop).flatMapError { error in self.queue.logger.error("Could not send success notification: \(error)") return self.queue.context.eventLoop.future() @@ -110,8 +114,10 @@ public struct QueueWorker { logger.trace("Sending failure notification hooks") return job._error(self.queue.context, id: id.string, error, payload: payload).flatMap { _ in - return self.queue.configuration.notificationHooks.map { - $0.error(jobId: id.string, error: error, eventLoop: self.queue.context.eventLoop) + return self.queue.configuration.notificationHooks.map { hook in + hook.error(jobId: id.string, error: error, eventLoop: self.queue.context.eventLoop).flatMap { + hook.error(job: .init(id: id.string, queueName: self.queue.queueName.string, jobData: jobData), error: error, eventLoop: self.queue.context.eventLoop) + } }.flatten(on: self.queue.context.eventLoop).flatMapError { error in self.queue.logger.error("Failed to send error notification: \(error)") return self.queue.context.eventLoop.future() diff --git a/Tests/QueuesTests/QueueTests.swift b/Tests/QueuesTests/QueueTests.swift index 562287b..c50f634 100644 --- a/Tests/QueuesTests/QueueTests.swift +++ b/Tests/QueuesTests/QueueTests.swift @@ -198,6 +198,7 @@ final class QueueTests: XCTestCase { app.queues.add(DequeuedHook()) ErrorHook.errorCount = 0 DequeuedHook.successHit = false + DequeuedHook.foo = "" app.get("foo") { req in req.queue.dispatch(Foo.self, .init(foo: "bar")) @@ -223,10 +224,12 @@ final class QueueTests: XCTestCase { try app.queues.queue.worker.run().wait() XCTAssertEqual(SuccessHook.successHit, true) + XCTAssertEqual(SuccessHook.foo, "bar") XCTAssertEqual(ErrorHook.errorCount, 0) XCTAssertEqual(app.queues.test.queue.count, 0) XCTAssertEqual(app.queues.test.jobs.count, 0) XCTAssertEqual(DequeuedHook.successHit, true) + XCTAssertEqual(DequeuedHook.foo, "bar") try XCTAssertEqual(promise.futureResult.wait(), "bar") } @@ -239,6 +242,7 @@ final class QueueTests: XCTestCase { app.queues.add(SuccessHook()) app.queues.add(ErrorHook()) ErrorHook.errorCount = 0 + ErrorHook.foo = "" app.get("foo") { req in req.queue.dispatch(Bar.self, .init(foo: "bar"), maxRetryCount: 3) @@ -261,6 +265,7 @@ final class QueueTests: XCTestCase { try app.queues.queue.worker.run().wait() XCTAssertEqual(SuccessHook.successHit, false) XCTAssertEqual(ErrorHook.errorCount, 1) + XCTAssertEqual(ErrorHook.foo, "bar") XCTAssertEqual(app.queues.test.queue.count, 0) XCTAssertEqual(app.queues.test.jobs.count, 0) } @@ -322,29 +327,63 @@ class DispatchHook: JobEventDelegate { class SuccessHook: JobEventDelegate { static var successHit = false - + static var foo = "" + func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { Self.successHit = true return eventLoop.future() } + + func success(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture { + do { + let payload = try JSONDecoder().decode(Foo.Data.self, from: .init(job.payload)) + Self.foo = payload.foo + return eventLoop.future() + } catch { + return eventLoop.makeFailedFuture(error) + } + } } class ErrorHook: JobEventDelegate { static var errorCount = 0 + static var foo = "" func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture { Self.errorCount += 1 return eventLoop.future() } + + func error(job: JobEventData, error: Error, eventLoop: EventLoop) -> EventLoopFuture { + do { + let payload = try JSONDecoder().decode(Bar.Data.self, from: .init(job.payload)) + Self.foo = payload.foo + return eventLoop.future() + } catch { + return eventLoop.makeFailedFuture(error) + } + } + } class DequeuedHook: JobEventDelegate { static var successHit = false + static var foo = "" func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { Self.successHit = true return eventLoop.future() } + + func didDequeue(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture { + do { + let payload = try JSONDecoder().decode(Bar.Data.self, from: .init(job.payload)) + Self.foo = payload.foo + return eventLoop.future() + } catch { + return eventLoop.makeFailedFuture(error) + } + } } final class WorkerCountDriver: QueuesDriver {