Skip to content

Commit

Permalink
First pass at making Queues properly Sendable. Known to be incomplete.
Browse files Browse the repository at this point in the history
  • Loading branch information
gwynne committed Feb 4, 2024
1 parent 1a66209 commit fbb2ef5
Show file tree
Hide file tree
Showing 16 changed files with 133 additions and 95 deletions.
2 changes: 1 addition & 1 deletion Sources/Queues/Application+Queues.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ extension Application {
}
}

final class Storage {
final class Storage: @unchecked Sendable {
public var configuration: QueuesConfiguration
private (set) var commands: [QueuesCommand]
var driver: (any QueuesDriver)?
Expand Down
4 changes: 2 additions & 2 deletions Sources/Queues/Job.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import Vapor
/// A task that can be queued for future execution.
public protocol Job: AnyJob {
/// The data associated with a job
associatedtype Payload
associatedtype Payload: Sendable

/// Called when it's this Job's turn to be dequeued.
/// - Parameters:
Expand Down Expand Up @@ -101,7 +101,7 @@ extension Job {
}

/// A type-erased version of `Job`
public protocol AnyJob {
public protocol AnyJob: Sendable {
/// The name of the `Job`
static var name: String { get }
func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) -> EventLoopFuture<Void>
Expand Down
2 changes: 1 addition & 1 deletion Sources/Queues/JobData.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Foundation

/// Holds information about the Job that is to be encoded to the persistence store.
public struct JobData: Codable {
public struct JobData: Codable, Sendable {
/// The job data to be encoded.
public let payload: [UInt8]

Expand Down
3 changes: 1 addition & 2 deletions Sources/Queues/JobIdentifier.swift
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import struct Foundation.UUID

/// An identifier for a job
public struct JobIdentifier: Hashable, Equatable {

public struct JobIdentifier: Hashable, Equatable, Sendable {
/// The string value of the ID
public let string: String

Expand Down
4 changes: 2 additions & 2 deletions Sources/Queues/NotificationHook.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import NIOCore
import Foundation

/// Represents an object that can receive notifications about job statuses
public protocol JobEventDelegate {
public protocol JobEventDelegate: Sendable {

/// Called when the job is first dispatched
/// - Parameters:
Expand Down Expand Up @@ -50,7 +50,7 @@ extension JobEventDelegate {
}

/// Data on a job sent via a notification
public struct JobEventData {
public struct JobEventData: Sendable {
/// The id of the job, assigned at dispatch
public var id: String

Expand Down
2 changes: 1 addition & 1 deletion Sources/Queues/Queue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Logging
import Foundation

/// A type that can store and retrieve jobs from a persistence layer
public protocol Queue {
public protocol Queue: Sendable {
/// The job context
var context: QueueContext { get }

Expand Down
2 changes: 1 addition & 1 deletion Sources/Queues/QueueContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import NIOCore
import Vapor

/// The context for a queue.
public struct QueueContext {
public struct QueueContext: Sendable {
/// The name of the queue
public let queueName: QueueName

Expand Down
2 changes: 1 addition & 1 deletion Sources/Queues/QueueName.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/// A specific queue that jobs are run on.
public struct QueueName {
public struct QueueName: Sendable {
/// The default queue that jobs are run on
public static let `default` = Self(string: "default")

Expand Down
11 changes: 6 additions & 5 deletions Sources/Queues/QueueWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ extension Queue {
}

/// The worker that runs the `Job`
public struct QueueWorker {
let queue: Queue
public struct QueueWorker: Sendable {
let queue: any Queue

init(queue: any Queue) {
self.queue = queue
Expand All @@ -30,9 +30,10 @@ public struct QueueWorker {
self.queue.logger.trace("Getting data for job \(id)")

return self.queue.get(id).flatMap { data in
var logger = self.queue.logger
logger[metadataKey: "job_id"] = .string(id.string)

var _logger = self.queue.logger
_logger[metadataKey: "job_id"] = .string(id.string)
let logger = _logger

logger.trace("Received job data for \(id): \(data)")
// If the job has a delay, we must check to make sure we can execute.
// If the delay has not passed yet, requeue the job
Expand Down
2 changes: 1 addition & 1 deletion Sources/Queues/QueuesCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import Darwin.C
#endif

/// The command to start the Queue job
public final class QueuesCommand: Command {
public final class QueuesCommand: Command, @unchecked Sendable {
/// See `Command.signature`
public let signature = Signature()

Expand Down
62 changes: 48 additions & 14 deletions Sources/Queues/QueuesConfiguration.swift
Original file line number Diff line number Diff line change
@@ -1,16 +1,37 @@
import ConsoleKitTerminal
import Logging
import NIOCore
import NIOConcurrencyHelpers

/// A `Service` to configure `Queues`s
public struct QueuesConfiguration {
/// Configuration parameters for the Queues module as a whole.
public struct QueuesConfiguration: Sendable {
private struct DataBox: Sendable {
var refreshInterval: TimeAmount = .seconds(1)
var persistenceKey: String = "vapor_queues"
var workerCount: WorkerCount = .default
var userInfo: [AnySendableHashable: any Sendable] = [:]

var jobs: [String: any AnyJob] = [:]
var scheduledJobs: [AnyScheduledJob] = []
var notificationHooks: [any JobEventDelegate] = []
}

private let dataBox: NIOLockedValueBox<DataBox> = .init(.init())

/// The number of seconds to wait before checking for the next job. Defaults to `1`
public var refreshInterval: TimeAmount
public var refreshInterval: TimeAmount {
get { self.dataBox.withLockedValue { $0.refreshInterval } }
set { self.dataBox.withLockedValue { $0.refreshInterval = newValue } }
}

/// The key that stores the data about a job. Defaults to `vapor_queues`
public var persistenceKey: String
public var persistenceKey: String {
get { self.dataBox.withLockedValue { $0.persistenceKey } }
set { self.dataBox.withLockedValue { $0.persistenceKey = newValue } }
}

/// Supported options for number of job handling workers.
public enum WorkerCount: ExpressibleByIntegerLiteral {
public enum WorkerCount: ExpressibleByIntegerLiteral, Sendable {
/// One worker per event loop.
case `default`

Expand All @@ -24,17 +45,34 @@ public struct QueuesConfiguration {
}

/// Sets the number of workers used for handling jobs.
public var workerCount: WorkerCount
public var workerCount: WorkerCount {
get { self.dataBox.withLockedValue { $0.workerCount } }
set { self.dataBox.withLockedValue { $0.workerCount = newValue } }
}

/// A logger
public let logger: Logger

// Arbitrary user info to be stored
public var userInfo: [AnyHashable: Any]
public var userInfo: [AnySendableHashable: any Sendable] {
get { self.dataBox.withLockedValue { $0.userInfo } }
set { self.dataBox.withLockedValue { $0.userInfo = newValue } }
}

var jobs: [String: any AnyJob] {
get { self.dataBox.withLockedValue { $0.jobs } }
set { self.dataBox.withLockedValue { $0.jobs = newValue } }
}

var jobs: [String: AnyJob]
var scheduledJobs: [AnyScheduledJob]
var notificationHooks: [JobEventDelegate]
var scheduledJobs: [AnyScheduledJob] {
get { self.dataBox.withLockedValue { $0.scheduledJobs } }
set { self.dataBox.withLockedValue { $0.scheduledJobs = newValue } }
}

var notificationHooks: [any JobEventDelegate] {
get { self.dataBox.withLockedValue { $0.notificationHooks } }
set { self.dataBox.withLockedValue { $0.notificationHooks = newValue } }
}

/// Creates an empty `JobsConfig`
public init(
Expand All @@ -43,14 +81,10 @@ public struct QueuesConfiguration {
workerCount: WorkerCount = .default,
logger: Logger = .init(label: "codes.vapor.queues")
) {
self.jobs = [:]
self.scheduledJobs = []
self.logger = logger
self.refreshInterval = refreshInterval
self.persistenceKey = persistenceKey
self.workerCount = workerCount
self.userInfo = [:]
self.notificationHooks = []
}

/// Adds a new `Job` to the queue configuration.
Expand Down
3 changes: 1 addition & 2 deletions Sources/Queues/ScheduleBuilder.swift
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import Foundation

/// An object that can be used to build a scheduled job
public final class ScheduleBuilder {

public final class ScheduleBuilder: @unchecked Sendable {
/// Months of the year
public enum Month: Int {
case january = 1
Expand Down
6 changes: 3 additions & 3 deletions Sources/Queues/ScheduledJob.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Foundation
import Logging

/// Describes a job that can be scheduled and repeated
public protocol ScheduledJob {
public protocol ScheduledJob: Sendable {
var name: String { get }
/// The method called when the job is run
/// - Parameter context: A `JobContext` that can be used
Expand All @@ -14,8 +14,8 @@ extension ScheduledJob {
public var name: String { "\(Self.self)" }
}

class AnyScheduledJob {
let job: ScheduledJob
final class AnyScheduledJob: Sendable {
let job: any ScheduledJob
let scheduler: ScheduleBuilder

init(job: any ScheduledJob, scheduler: ScheduleBuilder) {
Expand Down
2 changes: 1 addition & 1 deletion Sources/XCTQueues/TestQueueDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct TestQueuesDriver: QueuesDriver {
}

extension Application.Queues {
public final class TestQueueStorage {
public final class TestQueueStorage: @unchecked Sendable {
public var jobs: [JobIdentifier: JobData] = [:]
public var queue: [JobIdentifier] = []

Expand Down
4 changes: 2 additions & 2 deletions Tests/QueuesTests/AsyncQueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ final class AsyncQueueTests: XCTestCase {
app.queues.add(MyAsyncJob(promise: promise))

app.get("foo") { req in
req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar"))
.map { _ in "done" }
try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar"))
return "done"
}

try app.testable().test(.GET, "foo") { res in
Expand Down
Loading

0 comments on commit fbb2ef5

Please sign in to comment.