From fbb2ef5e8c4db78ad7735a180754c9a8c784fa71 Mon Sep 17 00:00:00 2001 From: Gwynne Raskind Date: Sat, 3 Feb 2024 21:24:46 -0600 Subject: [PATCH] First pass at making Queues properly Sendable. Known to be incomplete. --- Sources/Queues/Application+Queues.swift | 2 +- Sources/Queues/Job.swift | 4 +- Sources/Queues/JobData.swift | 2 +- Sources/Queues/JobIdentifier.swift | 3 +- Sources/Queues/NotificationHook.swift | 4 +- Sources/Queues/Queue.swift | 2 +- Sources/Queues/QueueContext.swift | 2 +- Sources/Queues/QueueName.swift | 2 +- Sources/Queues/QueueWorker.swift | 11 ++- Sources/Queues/QueuesCommand.swift | 2 +- Sources/Queues/QueuesConfiguration.swift | 62 +++++++++--- Sources/Queues/ScheduleBuilder.swift | 3 +- Sources/Queues/ScheduledJob.swift | 6 +- Sources/XCTQueues/TestQueueDriver.swift | 2 +- Tests/QueuesTests/AsyncQueueTests.swift | 4 +- Tests/QueuesTests/QueueTests.swift | 117 ++++++++++++----------- 16 files changed, 133 insertions(+), 95 deletions(-) diff --git a/Sources/Queues/Application+Queues.swift b/Sources/Queues/Application+Queues.swift index f13d1d8..b6490bc 100644 --- a/Sources/Queues/Application+Queues.swift +++ b/Sources/Queues/Application+Queues.swift @@ -21,7 +21,7 @@ extension Application { } } - final class Storage { + final class Storage: @unchecked Sendable { public var configuration: QueuesConfiguration private (set) var commands: [QueuesCommand] var driver: (any QueuesDriver)? diff --git a/Sources/Queues/Job.swift b/Sources/Queues/Job.swift index dbd975d..238b0c4 100644 --- a/Sources/Queues/Job.swift +++ b/Sources/Queues/Job.swift @@ -6,7 +6,7 @@ import Vapor /// A task that can be queued for future execution. public protocol Job: AnyJob { /// The data associated with a job - associatedtype Payload + associatedtype Payload: Sendable /// Called when it's this Job's turn to be dequeued. /// - Parameters: @@ -101,7 +101,7 @@ extension Job { } /// A type-erased version of `Job` -public protocol AnyJob { +public protocol AnyJob: Sendable { /// The name of the `Job` static var name: String { get } func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) -> EventLoopFuture diff --git a/Sources/Queues/JobData.swift b/Sources/Queues/JobData.swift index ba228dd..96d3f31 100644 --- a/Sources/Queues/JobData.swift +++ b/Sources/Queues/JobData.swift @@ -1,7 +1,7 @@ import Foundation /// Holds information about the Job that is to be encoded to the persistence store. -public struct JobData: Codable { +public struct JobData: Codable, Sendable { /// The job data to be encoded. public let payload: [UInt8] diff --git a/Sources/Queues/JobIdentifier.swift b/Sources/Queues/JobIdentifier.swift index c811d14..3739220 100644 --- a/Sources/Queues/JobIdentifier.swift +++ b/Sources/Queues/JobIdentifier.swift @@ -1,8 +1,7 @@ import struct Foundation.UUID /// An identifier for a job -public struct JobIdentifier: Hashable, Equatable { - +public struct JobIdentifier: Hashable, Equatable, Sendable { /// The string value of the ID public let string: String diff --git a/Sources/Queues/NotificationHook.swift b/Sources/Queues/NotificationHook.swift index f4c7223..e7958c0 100644 --- a/Sources/Queues/NotificationHook.swift +++ b/Sources/Queues/NotificationHook.swift @@ -2,7 +2,7 @@ import NIOCore import Foundation /// Represents an object that can receive notifications about job statuses -public protocol JobEventDelegate { +public protocol JobEventDelegate: Sendable { /// Called when the job is first dispatched /// - Parameters: @@ -50,7 +50,7 @@ extension JobEventDelegate { } /// Data on a job sent via a notification -public struct JobEventData { +public struct JobEventData: Sendable { /// The id of the job, assigned at dispatch public var id: String diff --git a/Sources/Queues/Queue.swift b/Sources/Queues/Queue.swift index 9041dc8..9db4b20 100644 --- a/Sources/Queues/Queue.swift +++ b/Sources/Queues/Queue.swift @@ -3,7 +3,7 @@ import Logging import Foundation /// A type that can store and retrieve jobs from a persistence layer -public protocol Queue { +public protocol Queue: Sendable { /// The job context var context: QueueContext { get } diff --git a/Sources/Queues/QueueContext.swift b/Sources/Queues/QueueContext.swift index c5a706d..bad1db9 100644 --- a/Sources/Queues/QueueContext.swift +++ b/Sources/Queues/QueueContext.swift @@ -3,7 +3,7 @@ import NIOCore import Vapor /// The context for a queue. -public struct QueueContext { +public struct QueueContext: Sendable { /// The name of the queue public let queueName: QueueName diff --git a/Sources/Queues/QueueName.swift b/Sources/Queues/QueueName.swift index a062ca0..1941666 100644 --- a/Sources/Queues/QueueName.swift +++ b/Sources/Queues/QueueName.swift @@ -1,5 +1,5 @@ /// A specific queue that jobs are run on. -public struct QueueName { +public struct QueueName: Sendable { /// The default queue that jobs are run on public static let `default` = Self(string: "default") diff --git a/Sources/Queues/QueueWorker.swift b/Sources/Queues/QueueWorker.swift index f2bf7fb..a3d56a7 100644 --- a/Sources/Queues/QueueWorker.swift +++ b/Sources/Queues/QueueWorker.swift @@ -9,8 +9,8 @@ extension Queue { } /// The worker that runs the `Job` -public struct QueueWorker { - let queue: Queue +public struct QueueWorker: Sendable { + let queue: any Queue init(queue: any Queue) { self.queue = queue @@ -30,9 +30,10 @@ public struct QueueWorker { self.queue.logger.trace("Getting data for job \(id)") return self.queue.get(id).flatMap { data in - var logger = self.queue.logger - logger[metadataKey: "job_id"] = .string(id.string) - + var _logger = self.queue.logger + _logger[metadataKey: "job_id"] = .string(id.string) + let logger = _logger + logger.trace("Received job data for \(id): \(data)") // If the job has a delay, we must check to make sure we can execute. // If the delay has not passed yet, requeue the job diff --git a/Sources/Queues/QueuesCommand.swift b/Sources/Queues/QueuesCommand.swift index 613b404..ef6b39f 100644 --- a/Sources/Queues/QueuesCommand.swift +++ b/Sources/Queues/QueuesCommand.swift @@ -11,7 +11,7 @@ import Darwin.C #endif /// The command to start the Queue job -public final class QueuesCommand: Command { +public final class QueuesCommand: Command, @unchecked Sendable { /// See `Command.signature` public let signature = Signature() diff --git a/Sources/Queues/QueuesConfiguration.swift b/Sources/Queues/QueuesConfiguration.swift index d0e2670..fed5ca0 100644 --- a/Sources/Queues/QueuesConfiguration.swift +++ b/Sources/Queues/QueuesConfiguration.swift @@ -1,16 +1,37 @@ +import ConsoleKitTerminal import Logging import NIOCore +import NIOConcurrencyHelpers -/// A `Service` to configure `Queues`s -public struct QueuesConfiguration { +/// Configuration parameters for the Queues module as a whole. +public struct QueuesConfiguration: Sendable { + private struct DataBox: Sendable { + var refreshInterval: TimeAmount = .seconds(1) + var persistenceKey: String = "vapor_queues" + var workerCount: WorkerCount = .default + var userInfo: [AnySendableHashable: any Sendable] = [:] + + var jobs: [String: any AnyJob] = [:] + var scheduledJobs: [AnyScheduledJob] = [] + var notificationHooks: [any JobEventDelegate] = [] + } + + private let dataBox: NIOLockedValueBox = .init(.init()) + /// The number of seconds to wait before checking for the next job. Defaults to `1` - public var refreshInterval: TimeAmount + public var refreshInterval: TimeAmount { + get { self.dataBox.withLockedValue { $0.refreshInterval } } + set { self.dataBox.withLockedValue { $0.refreshInterval = newValue } } + } /// The key that stores the data about a job. Defaults to `vapor_queues` - public var persistenceKey: String + public var persistenceKey: String { + get { self.dataBox.withLockedValue { $0.persistenceKey } } + set { self.dataBox.withLockedValue { $0.persistenceKey = newValue } } + } /// Supported options for number of job handling workers. - public enum WorkerCount: ExpressibleByIntegerLiteral { + public enum WorkerCount: ExpressibleByIntegerLiteral, Sendable { /// One worker per event loop. case `default` @@ -24,17 +45,34 @@ public struct QueuesConfiguration { } /// Sets the number of workers used for handling jobs. - public var workerCount: WorkerCount + public var workerCount: WorkerCount { + get { self.dataBox.withLockedValue { $0.workerCount } } + set { self.dataBox.withLockedValue { $0.workerCount = newValue } } + } /// A logger public let logger: Logger // Arbitrary user info to be stored - public var userInfo: [AnyHashable: Any] + public var userInfo: [AnySendableHashable: any Sendable] { + get { self.dataBox.withLockedValue { $0.userInfo } } + set { self.dataBox.withLockedValue { $0.userInfo = newValue } } + } + + var jobs: [String: any AnyJob] { + get { self.dataBox.withLockedValue { $0.jobs } } + set { self.dataBox.withLockedValue { $0.jobs = newValue } } + } - var jobs: [String: AnyJob] - var scheduledJobs: [AnyScheduledJob] - var notificationHooks: [JobEventDelegate] + var scheduledJobs: [AnyScheduledJob] { + get { self.dataBox.withLockedValue { $0.scheduledJobs } } + set { self.dataBox.withLockedValue { $0.scheduledJobs = newValue } } + } + + var notificationHooks: [any JobEventDelegate] { + get { self.dataBox.withLockedValue { $0.notificationHooks } } + set { self.dataBox.withLockedValue { $0.notificationHooks = newValue } } + } /// Creates an empty `JobsConfig` public init( @@ -43,14 +81,10 @@ public struct QueuesConfiguration { workerCount: WorkerCount = .default, logger: Logger = .init(label: "codes.vapor.queues") ) { - self.jobs = [:] - self.scheduledJobs = [] self.logger = logger self.refreshInterval = refreshInterval self.persistenceKey = persistenceKey self.workerCount = workerCount - self.userInfo = [:] - self.notificationHooks = [] } /// Adds a new `Job` to the queue configuration. diff --git a/Sources/Queues/ScheduleBuilder.swift b/Sources/Queues/ScheduleBuilder.swift index ff7f404..c658fc0 100644 --- a/Sources/Queues/ScheduleBuilder.swift +++ b/Sources/Queues/ScheduleBuilder.swift @@ -1,8 +1,7 @@ import Foundation /// An object that can be used to build a scheduled job -public final class ScheduleBuilder { - +public final class ScheduleBuilder: @unchecked Sendable { /// Months of the year public enum Month: Int { case january = 1 diff --git a/Sources/Queues/ScheduledJob.swift b/Sources/Queues/ScheduledJob.swift index d44ecf0..a7f197d 100644 --- a/Sources/Queues/ScheduledJob.swift +++ b/Sources/Queues/ScheduledJob.swift @@ -3,7 +3,7 @@ import Foundation import Logging /// Describes a job that can be scheduled and repeated -public protocol ScheduledJob { +public protocol ScheduledJob: Sendable { var name: String { get } /// The method called when the job is run /// - Parameter context: A `JobContext` that can be used @@ -14,8 +14,8 @@ extension ScheduledJob { public var name: String { "\(Self.self)" } } -class AnyScheduledJob { - let job: ScheduledJob +final class AnyScheduledJob: Sendable { + let job: any ScheduledJob let scheduler: ScheduleBuilder init(job: any ScheduledJob, scheduler: ScheduleBuilder) { diff --git a/Sources/XCTQueues/TestQueueDriver.swift b/Sources/XCTQueues/TestQueueDriver.swift index 508ee97..ee822c0 100644 --- a/Sources/XCTQueues/TestQueueDriver.swift +++ b/Sources/XCTQueues/TestQueueDriver.swift @@ -29,7 +29,7 @@ struct TestQueuesDriver: QueuesDriver { } extension Application.Queues { - public final class TestQueueStorage { + public final class TestQueueStorage: @unchecked Sendable { public var jobs: [JobIdentifier: JobData] = [:] public var queue: [JobIdentifier] = [] diff --git a/Tests/QueuesTests/AsyncQueueTests.swift b/Tests/QueuesTests/AsyncQueueTests.swift index 61a376a..73ae532 100644 --- a/Tests/QueuesTests/AsyncQueueTests.swift +++ b/Tests/QueuesTests/AsyncQueueTests.swift @@ -12,8 +12,8 @@ final class AsyncQueueTests: XCTestCase { app.queues.add(MyAsyncJob(promise: promise)) app.get("foo") { req in - req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) - .map { _ in "done" } + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) + return "done" } try app.testable().test(.GET, "foo") { res in diff --git a/Tests/QueuesTests/QueueTests.swift b/Tests/QueuesTests/QueueTests.swift index 537fbb9..1eb1856 100644 --- a/Tests/QueuesTests/QueueTests.swift +++ b/Tests/QueuesTests/QueueTests.swift @@ -137,13 +137,14 @@ final class QueueTests: XCTestCase { let app = Application(.testing) defer { app.shutdown() } - XCTAssertEqual(TestingScheduledJob.count.load(ordering: .relaxed), 0) - app.queues.schedule(TestingScheduledJob()).everySecond() + let scheduledJob = TestingScheduledJob() + XCTAssertEqual(scheduledJob.count.load(ordering: .relaxed), 0) + app.queues.schedule(scheduledJob).everySecond() try app.queues.startScheduledJobs() - let promise = app.eventLoopGroup.next().makePromise(of: Void.self) - app.eventLoopGroup.next().scheduleTask(in: .seconds(5)) { () -> Void in - XCTAssert(TestingScheduledJob.count.load(ordering: .relaxed) > 4) + let promise = app.eventLoopGroup.any().makePromise(of: Void.self) + app.eventLoopGroup.any().scheduleTask(in: .seconds(5)) { () -> Void in + XCTAssert(scheduledJob.count.load(ordering: .relaxed) > 4) promise.succeed(()) } @@ -187,42 +188,44 @@ final class QueueTests: XCTestCase { app.queues.use(.test) let promise = app.eventLoopGroup.any().makePromise(of: String.self) + let successHook = SuccessHook() + let errorHook = ErrorHook() + let dispatchHook = DispatchHook() + let dequeuedHook = DequeuedHook() app.queues.add(Foo(promise: promise)) - app.queues.add(SuccessHook()) - app.queues.add(ErrorHook()) - app.queues.add(DispatchHook()) - app.queues.add(DequeuedHook()) - ErrorHook.errorCount = 0 - DequeuedHook.successHit = false + app.queues.add(successHook) + app.queues.add(errorHook) + app.queues.add(dispatchHook) + app.queues.add(dequeuedHook) app.get("foo") { req in req.queue.dispatch(Foo.self, .init(foo: "bar")) .map { _ in "done" } } - XCTAssertEqual(DispatchHook.successHit, false) + XCTAssertFalse(dispatchHook.successHit) try app.testable().test(.GET, "foo") { res in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") - XCTAssertEqual(DispatchHook.successHit, true) + XCTAssertTrue(dispatchHook.successHit) } - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 0) + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 0) XCTAssertEqual(app.queues.test.queue.count, 1) XCTAssertEqual(app.queues.test.jobs.count, 1) let job = app.queues.test.first(Foo.self) XCTAssert(app.queues.test.contains(Foo.self)) XCTAssertNotNil(job) XCTAssertEqual(job!.foo, "bar") - XCTAssertEqual(DequeuedHook.successHit, false) + XCTAssertFalse(dequeuedHook.successHit) try app.queues.queue.worker.run().wait() - XCTAssertEqual(SuccessHook.successHit, true) - XCTAssertEqual(ErrorHook.errorCount, 0) + XCTAssertTrue(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 0) XCTAssertEqual(app.queues.test.queue.count, 0) XCTAssertEqual(app.queues.test.jobs.count, 0) - XCTAssertEqual(DequeuedHook.successHit, true) + XCTAssertTrue(dequeuedHook.successHit) try XCTAssertEqual(promise.futureResult.wait(), "bar") } @@ -232,10 +235,11 @@ final class QueueTests: XCTestCase { defer { app.shutdown() } app.queues.use(.test) app.queues.add(Bar()) - app.queues.add(SuccessHook()) - app.queues.add(ErrorHook()) - ErrorHook.errorCount = 0 - + let successHook = SuccessHook() + let errorHook = ErrorHook() + app.queues.add(successHook) + app.queues.add(errorHook) + app.get("foo") { req in req.queue.dispatch(Bar.self, .init(foo: "bar"), maxRetryCount: 3) .map { _ in "done" } @@ -246,8 +250,8 @@ final class QueueTests: XCTestCase { XCTAssertEqual(res.body.string, "done") } - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 0) + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 0) XCTAssertEqual(app.queues.test.queue.count, 1) XCTAssertEqual(app.queues.test.jobs.count, 1) let job = app.queues.test.first(Bar.self) @@ -255,8 +259,8 @@ final class QueueTests: XCTestCase { XCTAssertNotNil(job) try app.queues.queue.worker.run().wait() - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 1) + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 1) XCTAssertEqual(app.queues.test.queue.count, 0) XCTAssertEqual(app.queues.test.jobs.count, 0) } @@ -266,9 +270,10 @@ final class QueueTests: XCTestCase { defer { app.shutdown() } app.queues.use(.test) app.queues.add(Baz()) - app.queues.add(SuccessHook()) - app.queues.add(ErrorHook()) - ErrorHook.errorCount = 0 + let successHook = SuccessHook() + let errorHook = ErrorHook() + app.queues.add(successHook) + app.queues.add(errorHook) app.get("foo") { req in req.queue.dispatch(Baz.self, .init(foo: "baz"), maxRetryCount: 1) @@ -280,8 +285,8 @@ final class QueueTests: XCTestCase { XCTAssertEqual(res.body.string, "done") } - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 0) + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 0) XCTAssertEqual(app.queues.test.queue.count, 1) XCTAssertEqual(app.queues.test.jobs.count, 1) var job = app.queues.test.first(Baz.self) @@ -289,8 +294,8 @@ final class QueueTests: XCTestCase { XCTAssertNotNil(job) try app.queues.queue.worker.run().wait() - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 0) + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 0) XCTAssertEqual(app.queues.test.queue.count, 1) XCTAssertEqual(app.queues.test.jobs.count, 1) job = app.queues.test.first(Baz.self) @@ -300,50 +305,50 @@ final class QueueTests: XCTestCase { sleep(1) try app.queues.queue.worker.run().wait() - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 1) + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 1) XCTAssertEqual(app.queues.test.queue.count, 0) XCTAssertEqual(app.queues.test.jobs.count, 0) } } -class DispatchHook: JobEventDelegate { - static var successHit = false +final class DispatchHook: JobEventDelegate, @unchecked Sendable { + var successHit = false - func dispatched(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture { - Self.successHit = true + func dispatched(job: JobEventData, eventLoop: any EventLoop) -> EventLoopFuture { + self.successHit = true return eventLoop.future() } } -class SuccessHook: JobEventDelegate { - static var successHit = false +final class SuccessHook: JobEventDelegate, @unchecked Sendable { + var successHit = false - func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { - Self.successHit = true + func success(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { + self.successHit = true return eventLoop.future() } } -class ErrorHook: JobEventDelegate { - static var errorCount = 0 +final class ErrorHook: JobEventDelegate, @unchecked Sendable { + var errorCount = 0 - func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture { - Self.errorCount += 1 + func error(jobId: String, error: any Error, eventLoop: any EventLoop) -> EventLoopFuture { + self.errorCount += 1 return eventLoop.future() } } -class DequeuedHook: JobEventDelegate { - static var successHit = false +final class DequeuedHook: JobEventDelegate, @unchecked Sendable { + var successHit = false - func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { - Self.successHit = true + func didDequeue(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { + self.successHit = true return eventLoop.future() } } -final class WorkerCountDriver: QueuesDriver { +final class WorkerCountDriver: QueuesDriver, @unchecked Sendable { let count: EventLoopPromise let lock: NIOLock var recordedEventLoops: Set @@ -408,17 +413,17 @@ struct FailingScheduledJob: ScheduledJob { } struct TestingScheduledJob: ScheduledJob { - static var count = ManagedAtomic(0) + var count = ManagedAtomic(0) func run(context: QueueContext) -> EventLoopFuture { - TestingScheduledJob.count.wrappingIncrement(ordering: .relaxed) + self.count.wrappingIncrement(ordering: .relaxed) return context.eventLoop.future() } } extension ByteBuffer { var string: String { - return .init(decoding: self.readableBytesView, as: UTF8.self) + .init(decoding: self.readableBytesView, as: UTF8.self) } }