Skip to content

Commit

Permalink
fix(blockexchange): asyncSpawns block exchange tasks
Browse files Browse the repository at this point in the history
- prevents silently swallow future exceptions
  • Loading branch information
emizzle committed Dec 13, 2024
1 parent 9a4eec4 commit 00b09b2
Showing 1 changed file with 17 additions and 12 deletions.
29 changes: 17 additions & 12 deletions codex/blockexchange/engine/engine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import pkg/questionable
import ../../stores/blockstore
import ../../blocktype
import ../../utils
import ../../utils/exceptions
import ../../utils/trackedfutures
import ../../merkletree
import ../../logutils
import ../../manifest
Expand Down Expand Up @@ -70,7 +72,7 @@ type
peers*: PeerCtxStore # Peers we're currently actively exchanging with
taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # Peers we're currently processing tasks for
concurrentTasks: int # Number of concurrent peers we're serving at any given time
blockexcTasks: seq[Future[void]] # Future to control blockexc task
trackedFutures: TrackedFutures # Tracks futures of blockexc tasks
blockexcRunning: bool # Indicates if the blockexc task is running
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
peersPerRequest: int # Max number of peers to request from
Expand All @@ -88,7 +90,7 @@ type
proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} =
b.taskQueue.pushOrUpdateNoWait(task).isOk()

proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.}
proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).}

proc start*(b: BlockExcEngine) {.async.} =
## Start the blockexc task
Expand All @@ -104,7 +106,8 @@ proc start*(b: BlockExcEngine) {.async.} =

b.blockexcRunning = true
for i in 0..<b.concurrentTasks:
b.blockexcTasks.add(blockexcTaskRunner(b))
let fut = b.blockexcTaskRunner().track(b)
asyncSpawn fut

proc stop*(b: BlockExcEngine) {.async.} =
## Stop the blockexc blockexc
Expand All @@ -119,11 +122,7 @@ proc stop*(b: BlockExcEngine) {.async.} =
return

b.blockexcRunning = false
for task in b.blockexcTasks:
if not task.finished:
trace "Awaiting task to stop"
await task.cancelAndWait()
trace "Task stopped"
await b.trackedFutures.cancelTracked()

trace "NetworkStore stopped"

Expand Down Expand Up @@ -565,16 +564,21 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =

task.peerWants.keepItIf(it.address notin successAddresses)

proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).} =
## process tasks
##

trace "Starting blockexc task runner"
while b.blockexcRunning:
let
peerCtx = await b.taskQueue.pop()
try:
let
peerCtx = await b.taskQueue.pop()

await b.taskHandler(peerCtx)
await b.taskHandler(peerCtx)
except CancelledError:
break # do not propagate as blockexcTaskRunner was asyncSpawned
except CatchableError as e:
error "error running block exchange task", error = e.msgDetail

Check warning on line 581 in codex/blockexchange/engine/engine.nim

View check run for this annotation

Codecov / codecov/patch

codex/blockexchange/engine/engine.nim#L581

Added line #L581 was not covered by tests

info "Exiting blockexc task runner"

Expand Down Expand Up @@ -603,6 +607,7 @@ proc new*(
network: network,
wallet: wallet,
concurrentTasks: concurrentTasks,
trackedFutures: TrackedFutures.new(),
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
discovery: discovery,
advertiser: advertiser,
Expand Down

0 comments on commit 00b09b2

Please sign in to comment.