Skip to content

Commit

Permalink
async -> enqueue. await -> enqueueAndWait (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfed authored Feb 14, 2023
1 parent 8b6a898 commit 1ecda35
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 119 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func testFIFOQueueOrdering() async {
actor Counter {
nonisolated
func incrementAndAssertCountEquals(_ expectedCount: Int) {
queue.async {
queue.enqueue {
await self.increment()
let incrementedCount = await self.count
XCTAssertEqual(incrementedCount, expectedCount) // always succeeds
Expand All @@ -62,7 +62,7 @@ func testFIFOQueueOrdering() async {

nonisolated
func flushQueue() async {
await queue.await { }
await queue.enqueueAndWait { }
}

func increment() {
Expand Down Expand Up @@ -104,15 +104,15 @@ func testActorQueueOrdering() async {

nonisolated
func incrementAndAssertCountEquals(_ expectedCount: Int) {
queue.async { myself in
queue.enqueue { myself in
myself.count += 1
XCTAssertEqual(expectedCount, myself.count) // always succeeds
}
}

nonisolated
func flushQueue() async {
await queue.await { _ in }
await queue.enqueueAndWait { _ in }
}

private var count = 0
Expand Down
12 changes: 6 additions & 6 deletions Sources/AsyncQueue/ActorQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@
///
/// nonisolated
/// public func log(_ message: String) {
/// queue.async { myself in
/// queue.enqueue { myself in
/// myself.logs.append(message)
/// }
/// }
///
/// nonisolated
/// public func retrieveLogs() async -> [String] {
/// await queue.await { myself in myself.logs }
/// await queue.enqueueAndWait { myself in myself.logs }
/// }
///
/// private let queue = ActorQueue<LogStore>()
Expand Down Expand Up @@ -78,7 +78,7 @@ public final class ActorQueue<ActorType: Actor> {

// MARK: Public

/// Sets the actor context within which each `async` and `await`ed task will execute.
/// Sets the actor context within which each `enqueue` and `enqueueAndWait` task will execute.
/// It is recommended that this method be called in the adopted actor’s `init` method.
/// **Must be called prior to enqueuing any work on the receiver.**
///
Expand All @@ -92,15 +92,15 @@ public final class ActorQueue<ActorType: Actor> {
/// Schedules an asynchronous task for execution and immediately returns.
/// The scheduled task will not execute until all prior tasks have completed or suspended.
/// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
public func async(_ task: @escaping @Sendable (isolated ActorType) async -> Void) {
public func enqueue(_ task: @escaping @Sendable (isolated ActorType) async -> Void) {
taskStreamContinuation.yield(ActorTask(executionContext: executionContext, task: task))
}

/// Schedules an asynchronous task and returns after the task is complete.
/// The scheduled task will not execute until all prior tasks have completed or suspended.
/// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
/// - Returns: The value returned from the enqueued task.
public func await<T>(_ task: @escaping @Sendable (isolated ActorType) async -> T) async -> T {
public func enqueueAndWait<T>(_ task: @escaping @Sendable (isolated ActorType) async -> T) async -> T {
let executionContext = self.executionContext // Capture/retain the executionContext before suspending.
return await withUnsafeContinuation { continuation in
taskStreamContinuation.yield(ActorTask(executionContext: executionContext) { executionContext in
Expand All @@ -113,7 +113,7 @@ public final class ActorQueue<ActorType: Actor> {
/// The scheduled task will not execute until all prior tasks have completed or suspended.
/// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
/// - Returns: The value returned from the enqueued task.
public func await<T>(_ task: @escaping @Sendable (isolated ActorType) async throws -> T) async throws -> T {
public func enqueueAndWait<T>(_ task: @escaping @Sendable (isolated ActorType) async throws -> T) async throws -> T {
let executionContext = self.executionContext // Capture/retain the executionContext before suspending.
return try await withUnsafeThrowingContinuation { continuation in
taskStreamContinuation.yield(ActorTask(executionContext: executionContext) { executionContext in
Expand Down
12 changes: 6 additions & 6 deletions Sources/AsyncQueue/FIFOQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public final class FIFOQueue: Sendable {
/// Schedules an asynchronous task for execution and immediately returns.
/// The scheduled task will not execute until all prior tasks – including suspended tasks – have completed.
/// - Parameter task: The task to enqueue.
public func async(_ task: @escaping @Sendable () async -> Void) {
public func enqueue(_ task: @escaping @Sendable () async -> Void) {
taskStreamContinuation.yield(task)
}

Expand All @@ -63,15 +63,15 @@ public final class FIFOQueue: Sendable {
/// - Parameters:
/// - isolatedActor: The actor within which the task is isolated.
/// - task: The task to enqueue.
public func async<ActorType: Actor>(on isolatedActor: ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> Void) {
public func enqueue<ActorType: Actor>(on isolatedActor: ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> Void) {
taskStreamContinuation.yield { await task(isolatedActor) }
}

/// Schedules an asynchronous task and returns after the task is complete.
/// The scheduled task will not execute until all prior tasks – including suspended tasks – have completed.
/// - Parameter task: The task to enqueue.
/// - Returns: The value returned from the enqueued task.
public func await<T>(_ task: @escaping @Sendable () async -> T) async -> T {
public func enqueueAndWait<T>(_ task: @escaping @Sendable () async -> T) async -> T {
await withUnsafeContinuation { continuation in
taskStreamContinuation.yield {
continuation.resume(returning: await task())
Expand All @@ -85,7 +85,7 @@ public final class FIFOQueue: Sendable {
/// - isolatedActor: The actor within which the task is isolated.
/// - task: The task to enqueue.
/// - Returns: The value returned from the enqueued task.
public func await<ActorType: Actor, T>(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> T) async -> T {
public func enqueueAndWait<ActorType: Actor, T>(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> T) async -> T {
await withUnsafeContinuation { continuation in
taskStreamContinuation.yield {
continuation.resume(returning: await task(isolatedActor))
Expand All @@ -97,7 +97,7 @@ public final class FIFOQueue: Sendable {
/// The scheduled task will not execute until all prior tasks – including suspended tasks – have completed.
/// - Parameter task: The task to enqueue.
/// - Returns: The value returned from the enqueued task.
public func await<T>(_ task: @escaping @Sendable () async throws -> T) async throws -> T {
public func enqueueAndWait<T>(_ task: @escaping @Sendable () async throws -> T) async throws -> T {
try await withUnsafeThrowingContinuation { continuation in
taskStreamContinuation.yield {
do {
Expand All @@ -115,7 +115,7 @@ public final class FIFOQueue: Sendable {
/// - isolatedActor: The actor within which the task is isolated.
/// - task: The task to enqueue.
/// - Returns: The value returned from the enqueued task.
public func await<ActorType: Actor, T>(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async throws -> T) async throws -> T {
public func enqueueAndWait<ActorType: Actor, T>(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async throws -> T) async throws -> T {
try await withUnsafeThrowingContinuation { continuation in
taskStreamContinuation.yield {
do {
Expand Down
58 changes: 29 additions & 29 deletions Tests/AsyncQueueTests/ActorQueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ final class ActorQueueTests: XCTestCase {
XCTAssertNil(weakCounter)
}

func test_async_retainsAdoptedActorUntilEnqueuedTasksComplete() async {
func test_enqueue_retainsAdoptedActorUntilEnqueuedTasksComplete() async {
let systemUnderTest = ActorQueue<Counter>()
var counter: Counter? = Counter()
weak var weakCounter = counter
systemUnderTest.adoptExecutionContext(of: counter!)

let semaphore = Semaphore()
systemUnderTest.async { counter in
systemUnderTest.enqueue { counter in
await semaphore.wait()
}

Expand All @@ -63,64 +63,64 @@ final class ActorQueueTests: XCTestCase {
await semaphore.signal()
}

func test_async_taskParameterIsAdoptedActor() async {
func test_enqueue_taskParameterIsAdoptedActor() async {
let semaphore = Semaphore()
systemUnderTest.async { counter in
systemUnderTest.enqueue { counter in
XCTAssertTrue(counter === self.counter)
await semaphore.signal()
}

await semaphore.wait()
}

func test_await_taskParameterIsAdoptedActor() async {
await systemUnderTest.await { counter in
func test_enqueueAndWait_taskParameterIsAdoptedActor() async {
await systemUnderTest.enqueueAndWait { counter in
XCTAssertTrue(counter === self.counter)
}
}

func test_async_sendsEventsInOrder() async {
func test_enqueue_sendsEventsInOrder() async {
for iteration in 1...1_000 {
systemUnderTest.async { counter in
systemUnderTest.enqueue { counter in
counter.incrementAndExpectCount(equals: iteration)
}
}
await systemUnderTest.await { _ in /* Drain the queue */ }
await systemUnderTest.enqueueAndWait { _ in /* Drain the queue */ }
}

func test_async_startsExecutionOfNextTaskAfterSuspension() async {
func test_enqueue_startsExecutionOfNextTaskAfterSuspension() async {
let systemUnderTest = ActorQueue<Semaphore>()
let semaphore = Semaphore()
systemUnderTest.adoptExecutionContext(of: semaphore)

systemUnderTest.async { semaphore in
systemUnderTest.enqueue { semaphore in
await semaphore.wait()
}
systemUnderTest.async { semaphore in
systemUnderTest.enqueue { semaphore in
// Signal the semaphore from the actor queue.
// If the actor queue were FIFO, this test would hang since this code would never execute:
// we'd still be waiting for the prior `wait()` tasks to finish.
semaphore.signal()
}
await systemUnderTest.await { _ in /* Drain the queue */ }
await systemUnderTest.enqueueAndWait { _ in /* Drain the queue */ }
}

func test_await_allowsReentrancy() async {
await systemUnderTest.await { [systemUnderTest] counter in
await systemUnderTest.await { counter in
func test_enqueueAndWait_allowsReentrancy() async {
await systemUnderTest.enqueueAndWait { [systemUnderTest] counter in
await systemUnderTest.enqueueAndWait { counter in
counter.incrementAndExpectCount(equals: 1)
}
counter.incrementAndExpectCount(equals: 2)
}
}

func test_async_executesEnqueuedTasksAfterReceiverIsDeallocated() async {
func test_enqueue_executesEnqueuedTasksAfterReceiverIsDeallocated() async {
var systemUnderTest: ActorQueue<Counter>? = ActorQueue()
systemUnderTest?.adoptExecutionContext(of: counter)

let expectation = self.expectation(description: #function)
let semaphore = Semaphore()
systemUnderTest?.async { counter in
systemUnderTest?.enqueue { counter in
// Make the task wait.
await semaphore.wait()
counter.incrementAndExpectCount(equals: 1)
Expand All @@ -136,7 +136,7 @@ final class ActorQueueTests: XCTestCase {
await waitForExpectations(timeout: 1.0)
}

func test_async_doesNotRetainTaskAfterExecution() async {
func test_enqueue_doesNotRetainTaskAfterExecution() async {
final class Reference: Sendable {}
final class ReferenceHolder: @unchecked Sendable {
init() {
Expand All @@ -157,14 +157,14 @@ final class ActorQueueTests: XCTestCase {
systemUnderTest.adoptExecutionContext(of: syncSemaphore)

let expectation = self.expectation(description: #function)
systemUnderTest.async { [reference = referenceHolder.reference] syncSemaphore in
systemUnderTest.enqueue { [reference = referenceHolder.reference] syncSemaphore in
// Now that we've started the task and captured the reference, release the synchronous code.
syncSemaphore.signal()
// Wait for the synchronous setup to complete and the reference to be nil'd out.
await asyncSemaphore.wait()
// Retain the unsafe counter until the task is completed.
_ = reference
systemUnderTest.async { _ in
systemUnderTest.enqueue { _ in
// Signal that this task has cleaned up.
// This closure will not execute until the prior closure completes.
expectation.fulfill()
Expand All @@ -182,9 +182,9 @@ final class ActorQueueTests: XCTestCase {
XCTAssertNil(referenceHolder.weakReference)
}

func test_await_sendsEventsInOrder() async {
func test_enqueueAndWait_sendsEventsInOrder() async {
for iteration in 1...1_000 {
systemUnderTest.async { counter in
systemUnderTest.enqueue { counter in
counter.incrementAndExpectCount(equals: iteration)
}

Expand All @@ -193,26 +193,26 @@ final class ActorQueueTests: XCTestCase {
continue
}

await systemUnderTest.await { counter in
await systemUnderTest.enqueueAndWait { counter in
XCTAssertEqual(counter.count, iteration)
}
}
await systemUnderTest.await { _ in /* Drain the queue */ }
await systemUnderTest.enqueueAndWait { _ in /* Drain the queue */ }
}

func test_await_canReturn() async {
func test_enqueueAndWait_canReturn() async {
let expectedValue = UUID()
let returnedValue = await systemUnderTest.await { _ in expectedValue }
let returnedValue = await systemUnderTest.enqueueAndWait { _ in expectedValue }
XCTAssertEqual(expectedValue, returnedValue)
}

func test_await_canThrow() async {
func test_enqueueAndWait_canThrow() async {
struct TestError: Error, Equatable {
private let identifier = UUID()
}
let expectedError = TestError()
do {
try await systemUnderTest.await { _ in throw expectedError }
try await systemUnderTest.enqueueAndWait { _ in throw expectedError }
} catch {
XCTAssertEqual(error as? TestError, expectedError)
}
Expand Down
Loading

0 comments on commit 1ecda35

Please sign in to comment.