Skip to content

Commit

Permalink
Allow delaying retires of failed job (#101)
Browse files Browse the repository at this point in the history
* Add nextRetryIn to job for delaying retries

* Small improvements to docs and test

* Jobs with retry delay 0 are pushed back to the queue

* Clear jobs data before pushing data for retry

* Fix indentation
  • Loading branch information
kacperk authored Aug 23, 2021
1 parent 1762fa0 commit a0b96a5
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 14 deletions.
20 changes: 19 additions & 1 deletion Sources/Queues/Job.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ public protocol Job: AnyJob {
_ error: Error,
_ payload: Payload
) -> EventLoopFuture<Void>

/// Called when there was an error and job will be retired.
///
/// - Parameters:
/// - attempt: Number of job attempts which failed
/// - Returns: Number of seconds for which next retry will be delayed.
/// Return `-1` if you want to retry job immediately without putting it back to the queue.
func nextRetryIn(attempt: Int) -> Int

static func serializePayload(_ payload: Payload) throws -> [UInt8]
static func parsePayload(_ bytes: [UInt8]) throws -> Payload
Expand Down Expand Up @@ -60,7 +68,16 @@ extension Job {
) -> EventLoopFuture<Void> {
context.eventLoop.makeSucceededFuture(())
}


/// See `Job`.`nextRetryIn`
public func nextRetryIn(attempt: Int) -> Int {
return -1
}

public func _nextRetryIn(attempt: Int) -> Int {
return nextRetryIn(attempt: attempt)
}

public func _error(_ context: QueueContext, id: String, _ error: Error, payload: [UInt8]) -> EventLoopFuture<Void> {
var contextCopy = context
contextCopy.logger[metadataKey: "job_id"] = .string(id)
Expand Down Expand Up @@ -88,4 +105,5 @@ public protocol AnyJob {
static var name: String { get }
func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) -> EventLoopFuture<Void>
func _error(_ context: QueueContext, id: String, _ error: Error, payload: [UInt8]) -> EventLoopFuture<Void>
func _nextRetryIn(attempt: Int) -> Int
}
7 changes: 6 additions & 1 deletion Sources/Queues/JobData.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ public struct JobData: Codable {

/// The maxRetryCount for the `Job`.
public let maxRetryCount: Int

/// The number of attempts made to run the `Job`.
public let attempts: Int?

/// A date to execute this job after
public let delayUntil: Date?
Expand All @@ -21,12 +24,14 @@ public struct JobData: Codable {
maxRetryCount: Int,
jobName: String,
delayUntil: Date?,
queuedAt: Date
queuedAt: Date,
attempts: Int = 0
) {
self.payload = payload
self.maxRetryCount = maxRetryCount
self.jobName = jobName
self.delayUntil = delayUntil
self.queuedAt = queuedAt
self.attempts = attempts
}
}
77 changes: 65 additions & 12 deletions Sources/Queues/QueueWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,9 @@ public struct QueueWorker {
payload: data.payload,
logger: logger,
remainingTries: data.maxRetryCount,
attempts: data.attempts,
jobData: data
).flatMap {
logger.trace("Job done being run")
return self.queue.clear(id)
}
)
}
}
}
Expand All @@ -80,6 +78,7 @@ public struct QueueWorker {
payload: [UInt8],
logger: Logger,
remainingTries: Int,
attempts: Int?,
jobData: JobData
) -> EventLoopFuture<Void> {
logger.trace("Running the queue job (remaining tries: \(remainingTries)")
Expand All @@ -92,6 +91,9 @@ public struct QueueWorker {
}.flatten(on: self.queue.context.eventLoop).flatMapError { error in
self.queue.logger.error("Could not send success notification: \(error)")
return self.queue.context.eventLoop.future()
}.flatMap {
logger.trace("Job done being run")
return self.queue.clear(id)
}
}.flatMapError { error in
logger.trace("Job failed (remaining tries: \(remainingTries)")
Expand All @@ -109,24 +111,75 @@ public struct QueueWorker {
}.flatten(on: self.queue.context.eventLoop).flatMapError { error in
self.queue.logger.error("Failed to send error notification: \(error)")
return self.queue.context.eventLoop.future()
}.flatMap {
logger.trace("Job done being run")
return self.queue.clear(id)
}
}
} else {
logger.error("Job failed, retrying... \(error)", metadata: [
"job_id": .string(id.string),
"job_name": .string(name),
"queue": .string(self.queue.queueName.string)
])
return self.run(
return self.retry(
id: id,
name: name,
job: job,
payload: payload,
logger: logger,
remainingTries: remainingTries - 1,
jobData: jobData
remainingTries: remainingTries,
attempts: attempts,
jobData: jobData,
error: error
)
}
}
}

private func retry(
id: JobIdentifier,
name: String,
job: AnyJob,
payload: [UInt8],
logger: Logger,
remainingTries: Int,
attempts: Int?,
jobData: JobData,
error: Error
) -> EventLoopFuture<Void> {
let attempts = attempts ?? 0
let delayInSeconds = job._nextRetryIn(attempt: attempts + 1)
if delayInSeconds == -1 {
logger.error("Job failed, retrying... \(error)", metadata: [
"job_id": .string(id.string),
"job_name": .string(name),
"queue": .string(self.queue.queueName.string)
])
return self.run(
id: id,
name: name,
job: job,
payload: payload,
logger: logger,
remainingTries: remainingTries - 1,
attempts: attempts + 1,
jobData: jobData
)
} else {
logger.error("Job failed, retrying in \(delayInSeconds)s... \(error)", metadata: [
"job_id": .string(id.string),
"job_name": .string(name),
"queue": .string(self.queue.queueName.string)
])
let storage = JobData(
payload: jobData.payload,
maxRetryCount: remainingTries - 1,
jobName: jobData.jobName,
delayUntil: Date(timeIntervalSinceNow: Double(delayInSeconds)),
queuedAt: jobData.queuedAt,
attempts: attempts + 1
)
return self.queue.clear(id).flatMap {
self.queue.set(id, to: storage)
}.flatMap {
self.queue.push(id)
}
}
}
}
65 changes: 65 additions & 0 deletions Tests/QueuesTests/QueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ final class QueueTests: XCTestCase {
app.queues.add(Bar())
app.queues.add(SuccessHook())
app.queues.add(ErrorHook())
ErrorHook.errorCount = 0

app.get("foo") { req in
req.queue.dispatch(Bar.self, .init(foo: "bar"), maxRetryCount: 3)
Expand All @@ -259,6 +260,51 @@ final class QueueTests: XCTestCase {
XCTAssertEqual(app.queues.test.queue.count, 0)
XCTAssertEqual(app.queues.test.jobs.count, 0)
}

func testFailureHooksWithDelay() throws {
let app = Application(.testing)
defer { app.shutdown() }
app.queues.use(.test)
app.queues.add(Baz())
app.queues.add(SuccessHook())
app.queues.add(ErrorHook())
ErrorHook.errorCount = 0

app.get("foo") { req in
req.queue.dispatch(Baz.self, .init(foo: "baz"), maxRetryCount: 1)
.map { _ in "done" }
}

try app.testable().test(.GET, "foo") { res in
XCTAssertEqual(res.status, .ok)
XCTAssertEqual(res.body.string, "done")
}

XCTAssertEqual(SuccessHook.successHit, false)
XCTAssertEqual(ErrorHook.errorCount, 0)
XCTAssertEqual(app.queues.test.queue.count, 1)
XCTAssertEqual(app.queues.test.jobs.count, 1)
var job = app.queues.test.first(Baz.self)
XCTAssert(app.queues.test.contains(Baz.self))
XCTAssertNotNil(job)

try app.queues.queue.worker.run().wait()
XCTAssertEqual(SuccessHook.successHit, false)
XCTAssertEqual(ErrorHook.errorCount, 0)
XCTAssertEqual(app.queues.test.queue.count, 1)
XCTAssertEqual(app.queues.test.jobs.count, 1)
job = app.queues.test.first(Baz.self)
XCTAssert(app.queues.test.contains(Baz.self))
XCTAssertNotNil(job)

sleep(1)

try app.queues.queue.worker.run().wait()
XCTAssertEqual(SuccessHook.successHit, false)
XCTAssertEqual(ErrorHook.errorCount, 1)
XCTAssertEqual(app.queues.test.queue.count, 0)
XCTAssertEqual(app.queues.test.jobs.count, 0)
}
}

class DispatchHook: JobEventDelegate {
Expand Down Expand Up @@ -408,3 +454,22 @@ struct Bar: Job {
return context.eventLoop.makeSucceededFuture(())
}
}

struct Baz: Job {
struct Data: Codable {
var foo: String
}

func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture<Void> {
return context.eventLoop.makeFailedFuture(Abort(.badRequest))
}

func error(_ context: QueueContext, _ error: Error, _ data: Data) -> EventLoopFuture<Void> {
return context.eventLoop.makeSucceededFuture(())
}

func nextRetryIn(attempt: Int) -> Int {
return attempt
}
}

0 comments on commit a0b96a5

Please sign in to comment.