Skip to content

Commit

Permalink
fix(slotqueue): asyncSpawns futures correctly (#1034)
Browse files Browse the repository at this point in the history
- asyncSpawns `run` and worker `dispatch` in slotqueue.
- removes usage of `then` from slotqueue.
  • Loading branch information
emizzle authored Dec 13, 2024
1 parent 7c804b0 commit 1f49f86
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 36 deletions.
2 changes: 1 addition & 1 deletion codex/sales.nim
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ proc startSlotQueue(sales: Sales) {.async.} =
trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex
sales.processSlot(item, done)

asyncSpawn slotQueue.start()
slotQueue.start()

proc onAvailabilityAdded(availability: Availability) {.async.} =
await sales.onAvailabilityAdded(availability)
Expand Down
51 changes: 25 additions & 26 deletions codex/sales/slotqueue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import ../rng
import ../utils
import ../contracts/requests
import ../utils/asyncheapqueue
import ../utils/then
import ../utils/trackedfutures

logScope:
Expand Down Expand Up @@ -333,7 +332,7 @@ proc addWorker(self: SlotQueue): ?!void =

proc dispatch(self: SlotQueue,
worker: SlotQueueWorker,
item: SlotQueueItem) {.async.} =
item: SlotQueueItem) {.async: (raises: []).} =
logScope:
requestId = item.requestId
slotIndex = item.slotIndex
Expand Down Expand Up @@ -380,22 +379,7 @@ proc clearSeenFlags*(self: SlotQueue) =

trace "all 'seen' flags cleared"

proc start*(self: SlotQueue) {.async.} =
if self.running:
return

trace "starting slot queue"

self.running = true

# must be called in `start` to avoid sideeffects in `new`
self.workers = newAsyncQueue[SlotQueueWorker](self.maxWorkers)

# Add initial workers to the `AsyncHeapQueue`. Once a worker has completed its
# task, a new worker will be pushed to the queue
for i in 0..<self.maxWorkers:
if err =? self.addWorker().errorOption:
error "start: error adding new worker", error = err.msg
proc run(self: SlotQueue) {.async: (raises: []).} =

while self.running:
try:
Expand All @@ -405,8 +389,8 @@ proc start*(self: SlotQueue) {.async.} =
# block until unpaused is true/fired, ie wait for queue to be unpaused
await self.unpaused.wait()

let worker = await self.workers.popFirst().track(self) # if workers saturated, wait here for new workers
let item = await self.queue.pop().track(self) # if queue empty, wait here for new items
let worker = await self.workers.popFirst() # if workers saturated, wait here for new workers
let item = await self.queue.pop() # if queue empty, wait here for new items

logScope:
reqId = item.requestId
Expand Down Expand Up @@ -434,19 +418,34 @@ proc start*(self: SlotQueue) {.async.} =

trace "processing item"

self.dispatch(worker, item)
.track(self)
.catch(proc (e: ref CatchableError) =
error "Unknown error dispatching worker", error = e.msg
)
asyncSpawn self.dispatch(worker, item).track(self)

await sleepAsync(1.millis) # poll
except CancelledError:
trace "slot queue cancelled"
return
break
except CatchableError as e: # raised from self.queue.pop() or self.workers.pop()
warn "slot queue error encountered during processing", error = e.msg

proc start*(self: SlotQueue) =
if self.running:
return

trace "starting slot queue"

self.running = true

# must be called in `start` to avoid sideeffects in `new`
self.workers = newAsyncQueue[SlotQueueWorker](self.maxWorkers)

# Add initial workers to the `AsyncHeapQueue`. Once a worker has completed its
# task, a new worker will be pushed to the queue
for i in 0..<self.maxWorkers:
if err =? self.addWorker().errorOption:
error "start: error adding new worker", error = err.msg

asyncSpawn self.run().track(self)

proc stop*(self: SlotQueue) {.async.} =
if not self.running:
return
Expand Down
16 changes: 7 additions & 9 deletions tests/codex/sales/testslotqueue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ suite "Slot queue start/stop":
check not queue.running

test "can call start multiple times, and when already running":
asyncSpawn queue.start()
asyncSpawn queue.start()
queue.start()
queue.start()
check queue.running

test "can call stop when alrady stopped":
await queue.stop()
check not queue.running

test "can call stop when running":
asyncSpawn queue.start()
queue.start()
await queue.stop()
check not queue.running

test "can call stop multiple times":
asyncSpawn queue.start()
queue.start()
await queue.stop()
await queue.stop()
check not queue.running
Expand All @@ -62,8 +62,6 @@ suite "Slot queue workers":
queue = SlotQueue.new(maxSize = 5, maxWorkers = 3)
queue.onProcessSlot = onProcessSlot

proc startQueue = asyncSpawn queue.start()

teardown:
await queue.stop()

Expand All @@ -79,7 +77,7 @@ suite "Slot queue workers":
discard SlotQueue.new(maxSize = 1, maxWorkers = 2)

test "does not surpass max workers":
startQueue()
queue.start()
let item1 = SlotQueueItem.example
let item2 = SlotQueueItem.example
let item3 = SlotQueueItem.example
Expand All @@ -97,7 +95,7 @@ suite "Slot queue workers":

queue.onProcessSlot = processSlot

startQueue()
queue.start()
let item1 = SlotQueueItem.example
let item2 = SlotQueueItem.example
let item3 = SlotQueueItem.example
Expand All @@ -122,7 +120,7 @@ suite "Slot queue":
onProcessSlotCalled = true
onProcessSlotCalledWith.add (item.requestId, item.slotIndex)
done.complete()
asyncSpawn queue.start()
queue.start()

setup:
onProcessSlotCalled = false
Expand Down

0 comments on commit 1f49f86

Please sign in to comment.