Skip to content

Commit

Permalink
refactor(trackedfutures): remove return of future from tracked future…
Browse files Browse the repository at this point in the history
…s api

- cleans up all instances of `.track` to use the `module.trackedfutures.track(future)` procedure, for better readability
- removes the `track` override that is no longer used in the codebase
  • Loading branch information
emizzle committed Dec 18, 2024
1 parent 20bb5e5 commit ba3ef7f
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 33 deletions.
6 changes: 4 additions & 2 deletions codex/blockexchange/engine/advertiser.nim
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,12 @@ proc start*(b: Advertiser) {.async.} =

b.advertiserRunning = true
for i in 0..<b.concurrentAdvReqs:
let fut = b.processQueueLoop().track(b)
let fut = b.processQueueLoop()
b.trackedFutures.track(fut)
asyncSpawn fut

b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b).track(b)
b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b)
b.trackedFutures.track(b.advertiseLocalStoreLoop)
asyncSpawn b.advertiseLocalStoreLoop

proc stop*(b: Advertiser) {.async.} =
Expand Down
6 changes: 4 additions & 2 deletions codex/blockexchange/engine/discovery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,12 @@ proc start*(b: DiscoveryEngine) {.async.} =

b.discEngineRunning = true
for i in 0..<b.concurrentDiscReqs:
let fut = b.discoveryTaskLoop().track(b)
let fut = b.discoveryTaskLoop()
b.trackedFutures.track(fut)
asyncSpawn fut

b.discoveryLoop = b.discoveryQueueLoop().track(b)
b.discoveryLoop = b.discoveryQueueLoop()
b.trackedFutures.track(b.discoveryLoop)
asyncSpawn b.discoveryLoop

proc stop*(b: DiscoveryEngine) {.async.} =
Expand Down
3 changes: 2 additions & 1 deletion codex/blockexchange/engine/engine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ proc start*(b: BlockExcEngine) {.async.} =

b.blockexcRunning = true
for i in 0..<b.concurrentTasks:
let fut = b.blockexcTaskRunner().track(b)
let fut = b.blockexcTaskRunner()
b.trackedFutures.track(fut)
asyncSpawn fut

proc stop*(b: BlockExcEngine) {.async.} =
Expand Down
4 changes: 3 additions & 1 deletion codex/sales.nim
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,9 @@ proc onSlotFreed(sales: Sales,
if err =? queue.push(found).errorOption:
error "failed to push slot items to queue", error = err.msgDetail

asyncSpawn addSlotToQueue().track(sales)
let fut = addSlotToQueue()
sales.trackedFutures.track(fut)
asyncSpawn fut

proc subscribeRequested(sales: Sales) {.async.} =
let context = sales.context
Expand Down
12 changes: 8 additions & 4 deletions codex/sales/slotqueue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ proc addWorker(self: SlotQueue): ?!void =

let worker = SlotQueueWorker.init()
try:
discard worker.doneProcessing.track(self)
self.trackedFutures.track(worker.doneProcessing)
self.workers.addLastNoWait(worker)
except AsyncQueueFullError:
return failure("failed to add worker, worker queue full")
Expand All @@ -343,7 +343,7 @@ proc dispatch(self: SlotQueue,

if onProcessSlot =? self.onProcessSlot:
try:
discard worker.doneProcessing.track(self)
self.trackedFutures.track(worker.doneProcessing)
await onProcessSlot(item, worker.doneProcessing)
await worker.doneProcessing

Expand Down Expand Up @@ -418,7 +418,9 @@ proc run(self: SlotQueue) {.async: (raises: []).} =

trace "processing item"

asyncSpawn self.dispatch(worker, item).track(self)
let fut = self.dispatch(worker, item)
self.trackedFutures.track(fut)
asyncSpawn fut

await sleepAsync(1.millis) # poll
except CancelledError:
Expand All @@ -444,7 +446,9 @@ proc start*(self: SlotQueue) =
if err =? self.addWorker().errorOption:
error "start: error adding new worker", error = err.msg

asyncSpawn self.run().track(self)
let fut = self.run()
self.trackedFutures.track(fut)
asyncSpawn fut

proc stop*(self: SlotQueue) {.async.} =
if not self.running:
Expand Down
7 changes: 5 additions & 2 deletions codex/utils/asyncstatemachine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ proc scheduler(machine: Machine) {.async: (raises: []).} =
machine.state = next
debug "enter state", state = fromState & " => " & $machine.state
running = machine.run(machine.state)
asyncSpawn running.track(machine)
machine.trackedFutures.track(running)
asyncSpawn running
except CancelledError:
break # do not propagate bc it is asyncSpawned

Expand All @@ -90,7 +91,9 @@ proc start*(machine: Machine, initialState: State) =
machine.scheduled = newAsyncQueue[Event]()

machine.started = true
asyncSpawn machine.scheduler().track(machine)
let fut = machine.scheduler()
machine.trackedFutures.track(fut)
asyncSpawn fut
machine.schedule(Event.transition(machine.state, initialState))

proc stop*(machine: Machine) {.async.} =
Expand Down
12 changes: 2 additions & 10 deletions codex/utils/trackedfutures.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ proc removeFuture(self: TrackedFutures, future: FutureBase) =
if not self.cancelling and not future.isNil:
self.futures.del(future.id)

proc track*[T](self: TrackedFutures, fut: Future[T]): Future[T] =
proc track*[T](self: TrackedFutures, fut: Future[T]) =
if self.cancelling:
return fut
return

self.futures[fut.id] = FutureBase(fut)

Expand All @@ -30,14 +30,6 @@ proc track*[T](self: TrackedFutures, fut: Future[T]): Future[T] =

fut.addCallback(cb)

return fut

proc track*[T, U](future: Future[T], self: U): Future[T] =
## Convenience method that allows chaining future, eg:
## `await someFut().track(sales)`, where `sales` has declared a
## `trackedFutures` property.
self.trackedFutures.track(future)

proc cancelTracked*(self: TrackedFutures) {.async: (raises: []).} =
self.cancelling = true

Expand Down
18 changes: 9 additions & 9 deletions tests/codex/utils/testtrackedfutures.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,46 @@ asyncchecksuite "tracked futures":

test "tracks unfinished futures":
let fut = newFuture[void]("test")
discard fut.track(module)
module.trackedFutures.track(fut)
check module.trackedFutures.len == 1

test "does not track completed futures":
let fut = newFuture[void]("test")
fut.complete()
discard fut.track(module)
module.trackedFutures.track(fut)
check eventually module.trackedFutures.len == 0

test "does not track failed futures":
let fut = newFuture[void]("test")
fut.fail((ref CatchableError)(msg: "some error"))
discard fut.track(module)
module.trackedFutures.track(fut)
check eventually module.trackedFutures.len == 0

test "does not track cancelled futures":
let fut = newFuture[void]("test")
await fut.cancelAndWait()
discard fut.track(module)
module.trackedFutures.track(fut)
check eventually module.trackedFutures.len == 0

test "removes tracked future when finished":
let fut = newFuture[void]("test")
discard fut.track(module)
module.trackedFutures.track(fut)
fut.complete()
check eventually module.trackedFutures.len == 0

test "removes tracked future when cancelled":
let fut = newFuture[void]("test")
discard fut.track(module)
module.trackedFutures.track(fut)
await fut.cancelAndWait()
check eventually module.trackedFutures.len == 0

test "cancels and removes all tracked futures":
let fut1 = newFuture[void]("test1")
let fut2 = newFuture[void]("test2")
let fut3 = newFuture[void]("test3")
discard fut1.track(module)
discard fut2.track(module)
discard fut3.track(module)
module.trackedFutures.track(fut1)
module.trackedFutures.track(fut2)
module.trackedFutures.track(fut3)
await module.trackedFutures.cancelTracked()
check eventually fut1.cancelled
check eventually fut2.cancelled
Expand Down
5 changes: 3 additions & 2 deletions tests/integration/nodeprocess.nim
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,11 @@ proc waitUntilOutput*(node: NodeProcess, output: string) {.async.} =
trace "waiting until", output

let started = newFuture[void]()
let fut = node.captureOutput(output, started).track(node)
let fut = node.captureOutput(output, started)
node.trackedFutures.track(fut)
asyncSpawn fut
await started.wait(60.seconds) # allow enough time for proof generation

proc waitUntilStarted*(node: NodeProcess) {.async.} =
try:
await node.waitUntilOutput(node.startedOutput)
Expand Down

0 comments on commit ba3ef7f

Please sign in to comment.