Skip to content

Commit

Permalink
Add optional didDequeue, success and error notification hook me…
Browse files Browse the repository at this point in the history
…thods (delegate) and their async versions, to pass the `JobEventData` to the delegate
  • Loading branch information
hsharghi committed Aug 26, 2023
1 parent 0511901 commit a5c4452
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 7 deletions.
37 changes: 37 additions & 0 deletions Sources/Queues/AsyncJobEventDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,42 @@ public protocol AsyncJobEventDelegate: JobEventDelegate {
/// - jobId: The id of the Job
func didDequeue(jobId: String) async throws

/// Called when the job is dequeued
/// - Parameters:
/// - job: The `JobData` associated with the job
func didDequeue(job: JobEventData) async throws

/// Called when the job succeeds
/// - Parameters:
/// - jobId: The id of the Job
func success(jobId: String) async throws

/// Called when the job succeeds
/// - Parameters:
/// - job: The `JobData` associated with the job
func success(job: JobEventData) async throws

/// Called when the job returns an error
/// - Parameters:
/// - jobId: The id of the Job
/// - error: The error that caused the job to fail
func error(jobId: String, error: Error) async throws

/// Called when the job returns an error
/// - Parameters:
/// - job: The `JobData` associated with the job
/// - error: The error that caused the job to fail
func error(job: JobEventData, error: Error) async throws
}

extension AsyncJobEventDelegate {
public func dispatched(job: JobEventData) async throws { }
public func didDequeue(jobId: String) async throws { }
public func didDequeue(job: JobEventData) async throws { }
public func success(jobId: String) async throws { }
public func success(job: JobEventData) async throws { }
public func error(jobId: String, error: Error) async throws { }
public func error(job: JobEventData) async throws { }

public func dispatched(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.makeFutureWithTask {
Expand All @@ -41,16 +60,34 @@ extension AsyncJobEventDelegate {
try await self.didDequeue(jobId: jobId)
}
}

public func didDequeue(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.makeFutureWithTask {
try await self.didDequeue(job: job)
}
}

public func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.makeFutureWithTask {
try await self.success(jobId: jobId)
}
}

public func success(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.makeFutureWithTask {
try await self.success(job: job)
}
}

public func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.makeFutureWithTask {
try await self.error(jobId: jobId, error: error)
}
}

public func error(job: JobEventData, error: Error, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.makeFutureWithTask {
try await self.error(job: job, error: error)
}
}
}
32 changes: 32 additions & 0 deletions Sources/Queues/NotificationHook.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,39 @@ public protocol JobEventDelegate {
/// - eventLoop: The eventLoop
func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture<Void>

/// Called when the job is dequeued
/// - Parameters:
/// - job: The `JobData` associated with the job
/// - eventLoop: The eventLoop
func didDequeue(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture<Void>


/// Called when the job succeeds
/// - Parameters:
/// - jobId: The id of the Job
/// - eventLoop: The eventLoop
func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture<Void>

/// Called when the job succeeds
/// - Parameters:
/// - job: The `JobData` associated with the job
/// - eventLoop: The eventLoop
func success(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture<Void>


/// Called when the job returns an error
/// - Parameters:
/// - jobId: The id of the Job
/// - error: The error that caused the job to fail
/// - eventLoop: The eventLoop
func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture<Void>

/// Called when the job returns an error
/// - Parameters:
/// - job: The `JobData` associated with the job
/// - error: The error that caused the job to fail
/// - eventLoop: The eventLoop
func error(job: JobEventData, error: Error, eventLoop: EventLoop) -> EventLoopFuture<Void>
}

extension JobEventDelegate {
Expand All @@ -40,13 +60,25 @@ extension JobEventDelegate {
eventLoop.future()
}

public func didDequeue(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.future()
}

public func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.future()
}

public func success(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.future()
}

public func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.future()
}

public func error(job: JobEventData, error: Error, eventLoop: EventLoop) -> EventLoopFuture<Void> {
eventLoop.future()
}
}

/// Data on a job sent via a notification
Expand Down
18 changes: 12 additions & 6 deletions Sources/Queues/QueueWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ public struct QueueWorker {

logger.trace("Sending dequeued notification hooks")

return self.queue.configuration.notificationHooks.map {
$0.didDequeue(jobId: id.string, eventLoop: self.queue.eventLoop)
return self.queue.configuration.notificationHooks.map { hook in
hook.didDequeue(jobId: id.string, eventLoop: self.queue.eventLoop).flatMap {
hook.didDequeue(job: .init(id: id.string, queueName: self.queue.queueName.string, jobData: data), eventLoop: self.queue.eventLoop)
}
}.flatten(on: self.queue.eventLoop).flatMapError { error in
logger.error("Could not send didDequeue notification: \(error)")
return self.queue.eventLoop.future()
Expand Down Expand Up @@ -90,8 +92,10 @@ public struct QueueWorker {
return futureJob.flatMap { complete in
logger.trace("Ran job successfully")
logger.trace("Sending success notification hooks")
return self.queue.configuration.notificationHooks.map {
$0.success(jobId: id.string, eventLoop: self.queue.context.eventLoop)
return self.queue.configuration.notificationHooks.map { hook in
hook.success(jobId: id.string, eventLoop: self.queue.context.eventLoop).flatMap {
hook.success(job: .init(id: id.string, queueName: self.queue.queueName.string, jobData: jobData), eventLoop: self.queue.context.eventLoop)
}
}.flatten(on: self.queue.context.eventLoop).flatMapError { error in
self.queue.logger.error("Could not send success notification: \(error)")
return self.queue.context.eventLoop.future()
Expand All @@ -110,8 +114,10 @@ public struct QueueWorker {

logger.trace("Sending failure notification hooks")
return job._error(self.queue.context, id: id.string, error, payload: payload).flatMap { _ in
return self.queue.configuration.notificationHooks.map {
$0.error(jobId: id.string, error: error, eventLoop: self.queue.context.eventLoop)
return self.queue.configuration.notificationHooks.map { hook in
hook.error(jobId: id.string, error: error, eventLoop: self.queue.context.eventLoop).flatMap {
hook.error(job: .init(id: id.string, queueName: self.queue.queueName.string, jobData: jobData), error: error, eventLoop: self.queue.context.eventLoop)
}
}.flatten(on: self.queue.context.eventLoop).flatMapError { error in
self.queue.logger.error("Failed to send error notification: \(error)")
return self.queue.context.eventLoop.future()
Expand Down
41 changes: 40 additions & 1 deletion Tests/QueuesTests/QueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ final class QueueTests: XCTestCase {
app.queues.add(DequeuedHook())
ErrorHook.errorCount = 0
DequeuedHook.successHit = false
DequeuedHook.foo = ""

app.get("foo") { req in
req.queue.dispatch(Foo.self, .init(foo: "bar"))
Expand All @@ -223,10 +224,12 @@ final class QueueTests: XCTestCase {

try app.queues.queue.worker.run().wait()
XCTAssertEqual(SuccessHook.successHit, true)
XCTAssertEqual(SuccessHook.foo, "bar")
XCTAssertEqual(ErrorHook.errorCount, 0)
XCTAssertEqual(app.queues.test.queue.count, 0)
XCTAssertEqual(app.queues.test.jobs.count, 0)
XCTAssertEqual(DequeuedHook.successHit, true)
XCTAssertEqual(DequeuedHook.foo, "bar")

try XCTAssertEqual(promise.futureResult.wait(), "bar")
}
Expand All @@ -239,6 +242,7 @@ final class QueueTests: XCTestCase {
app.queues.add(SuccessHook())
app.queues.add(ErrorHook())
ErrorHook.errorCount = 0
ErrorHook.foo = ""

app.get("foo") { req in
req.queue.dispatch(Bar.self, .init(foo: "bar"), maxRetryCount: 3)
Expand All @@ -261,6 +265,7 @@ final class QueueTests: XCTestCase {
try app.queues.queue.worker.run().wait()
XCTAssertEqual(SuccessHook.successHit, false)
XCTAssertEqual(ErrorHook.errorCount, 1)
XCTAssertEqual(ErrorHook.foo, "bar")
XCTAssertEqual(app.queues.test.queue.count, 0)
XCTAssertEqual(app.queues.test.jobs.count, 0)
}
Expand Down Expand Up @@ -322,29 +327,63 @@ class DispatchHook: JobEventDelegate {

class SuccessHook: JobEventDelegate {
static var successHit = false

static var foo = ""

func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture<Void> {
Self.successHit = true
return eventLoop.future()
}

func success(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture<Void> {
do {
let payload = try JSONDecoder().decode(Foo.Data.self, from: .init(job.payload))
Self.foo = payload.foo
return eventLoop.future()
} catch {
return eventLoop.makeFailedFuture(error)
}
}
}

class ErrorHook: JobEventDelegate {
static var errorCount = 0
static var foo = ""

func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture<Void> {
Self.errorCount += 1
return eventLoop.future()
}

func error(job: JobEventData, error: Error, eventLoop: EventLoop) -> EventLoopFuture<Void> {
do {
let payload = try JSONDecoder().decode(Bar.Data.self, from: .init(job.payload))
Self.foo = payload.foo
return eventLoop.future()
} catch {
return eventLoop.makeFailedFuture(error)
}
}

}

class DequeuedHook: JobEventDelegate {
static var successHit = false
static var foo = ""

func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture<Void> {
Self.successHit = true
return eventLoop.future()
}

func didDequeue(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture<Void> {
do {
let payload = try JSONDecoder().decode(Bar.Data.self, from: .init(job.payload))
Self.foo = payload.foo
return eventLoop.future()
} catch {
return eventLoop.makeFailedFuture(error)
}
}
}

final class WorkerCountDriver: QueuesDriver {
Expand Down

0 comments on commit a5c4452

Please sign in to comment.