Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(blockexchange): ensures futures are asyncSpawned #1037

Merged
merged 3 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 26 additions & 25 deletions codex/blockexchange/engine/advertiser.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import ../peers

import ../../utils
import ../../utils/exceptions
import ../../utils/trackedfutures
import ../../discovery
import ../../stores/blockstore
import ../../logutils
Expand All @@ -42,7 +44,7 @@

advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
advertiseTasks*: seq[Future[void]] # Advertise tasks
trackedFutures*: TrackedFutures # Advertise tasks futures

advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests
Expand Down Expand Up @@ -70,20 +72,26 @@
await b.addCidToQueue(cid)
await b.addCidToQueue(manifest.treeCid)

proc advertiseLocalStoreLoop(b: Advertiser) {.async.} =
proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} =
while b.advertiserRunning:
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Advertiser iterating blocks finished."
try:
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Advertiser iterating blocks finished."

await sleepAsync(b.advertiseLocalStoreLoopSleep)

await sleepAsync(b.advertiseLocalStoreLoopSleep)
except CancelledError:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Btw, we should probably plan to drop the v3 compat mode from Chronos as it'll then start warning about those all over.

break # do not propagate as advertiseLocalStoreLoop was asyncSpawned
except CatchableError as e:
error "failed to advertise blocks in local store", error = e.msgDetail

Check warning on line 90 in codex/blockexchange/engine/advertiser.nim

View check run for this annotation

Codecov / codecov/patch

codex/blockexchange/engine/advertiser.nim#L90

Added line #L90 was not covered by tests

info "Exiting advertise task loop"

proc processQueueLoop(b: Advertiser) {.async.} =
proc processQueueLoop(b: Advertiser) {.async: (raises: []).} =
while b.advertiserRunning:
try:
let
Expand Down Expand Up @@ -129,9 +137,11 @@

b.advertiserRunning = true
for i in 0..<b.concurrentAdvReqs:
b.advertiseTasks.add(processQueueLoop(b))
let fut = b.processQueueLoop().track(b)
asyncSpawn fut
Copy link
Contributor

@benbierens benbierens Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see how track() works, but I don't recommend this. If I look for the usage of the "trackedFutures" field in the engine.nim, all I find is the creation and cancel-call. I'd much prefer the connection between the field and track() to be explicit at the point of use. So a small change like
let fut = b.processQueueLoop().track(b.trackedFutures)

What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I confess I'm always weirded out when I see this. My simple brain parses that the Future returned by processQueuedLoop is tracking something, when in fact there's this unexpected inversion and it's actually b.trackedFutures that is tracking b.processQueueLoop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, the API could use some work, but that can be handled in another PR and applied across the codebase.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are reasons the API ended up where we are now, but those reasons are being removed from the codebase, so we'll have some freedom to change the API (not in this PR).

Something like this would be better?

let fut = b.processQueueLoop()
b.trackedFutures.track(fut)
asyncSpawn fut

Copy link
Contributor

@marcinczenko marcinczenko Dec 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking just at the code:

let fut = b.processQueueLoop().track(b)

it is not evident what it does. To understand you have to check the return type of processQueueLoop which apparently has ability to “track” objects of the type corresponding to b. The reader really needs to dissect the code to understand.

The proposed alternative is way easier to understand. You could even just say b.trackedFutures.add(fut) or maybe even instead of trackedFutures something like a type FutureTracker and then b.futureTracker.add(fut) or maybe there could be a special type of a “TrackedFuture” which would take care of itself magically (I do not know if that would work here - I did not dive into details)…

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe there could be a special type of a “TrackedFuture” which would take care of itself magically

There are definitely ways to do this, however I'm not a fan of magic things in most cases because they tend to hide what's happening and make things harder to reason, harder to debug, and harder to add new features (even though it's usually intended to make things easier to implement, however it's rarely the case).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When saying "magic" I mean more something in the style of RAII (Resource Acquisition is Initialization) common in C++ and less in the style of Java/C#/Spring like fancy annotations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR here: #1046


b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b)
b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b).track(b)
asyncSpawn b.advertiseLocalStoreLoop

proc stop*(b: Advertiser) {.async.} =
## Stop the advertiser
Expand All @@ -145,19 +155,9 @@
b.advertiserRunning = false
# Stop incoming tasks from callback and localStore loop
b.localStore.onBlockStored = CidCallback.none
if not b.advertiseLocalStoreLoop.isNil and not b.advertiseLocalStoreLoop.finished:
trace "Awaiting advertise loop to stop"
await b.advertiseLocalStoreLoop.cancelAndWait()
trace "Advertise loop stopped"

# Clear up remaining tasks
for task in b.advertiseTasks:
if not task.finished:
trace "Awaiting advertise task to stop"
await task.cancelAndWait()
trace "Advertise task stopped"

trace "Advertiser stopped"
trace "Stopping advertise loop and tasks"
await b.trackedFutures.cancelTracked()
trace "Advertiser loop and tasks stopped"

proc new*(
T: type Advertiser,
Expand All @@ -173,5 +173,6 @@
discovery: discovery,
concurrentAdvReqs: concurrentAdvReqs,
advertiseQueue: newAsyncQueue[Cid](concurrentAdvReqs),
trackedFutures: TrackedFutures.new(),
inFlightAdvReqs: initTable[Cid, Future[void]](),
advertiseLocalStoreLoopSleep: advertiseLocalStoreLoopSleep)
44 changes: 24 additions & 20 deletions codex/blockexchange/engine/discovery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import ../peers

import ../../utils
import ../../utils/trackedfutures
import ../../discovery
import ../../stores/blockstore
import ../../logutils
Expand Down Expand Up @@ -50,12 +51,12 @@
concurrentDiscReqs: int # Concurrent discovery requests
discoveryLoop*: Future[void] # Discovery loop task handle
discoveryQueue*: AsyncQueue[Cid] # Discovery queue
discoveryTasks*: seq[Future[void]] # Discovery tasks
trackedFutures*: TrackedFutures # Tracked Discovery tasks futures
minPeersPerBlock*: int # Max number of peers with block
discoveryLoopSleep: Duration # Discovery loop sleep
inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests

proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} =
while b.discEngineRunning:
for cid in toSeq(b.pendingBlocks.wantListBlockCids):
try:
Expand All @@ -66,13 +67,15 @@
except CatchableError as exc:
warn "Exception in discovery loop", exc = exc.msg

logScope:
sleep = b.discoveryLoopSleep
wanted = b.pendingBlocks.len

await sleepAsync(b.discoveryLoopSleep)
try:
logScope:
sleep = b.discoveryLoopSleep
wanted = b.pendingBlocks.len
await sleepAsync(b.discoveryLoopSleep)
except CancelledError:
discard # do not propagate as discoveryQueueLoop was asyncSpawned

proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
## Run discovery tasks
##

Expand Down Expand Up @@ -116,6 +119,11 @@
return
except CatchableError as exc:
warn "Exception in discovery task runner", exc = exc.msg
except Exception as e:

Check warning on line 122 in codex/blockexchange/engine/discovery.nim

View check run for this annotation

Codecov / codecov/patch

codex/blockexchange/engine/discovery.nim#L122

Added line #L122 was not covered by tests
# Raised by b.discovery.removeProvider somehow...
# This should not be catchable, and we should never get here. Therefore,
# raise a Defect.
raiseAssert "Exception when removing provider"

Check warning on line 126 in codex/blockexchange/engine/discovery.nim

View check run for this annotation

Codecov / codecov/patch

codex/blockexchange/engine/discovery.nim#L126

Added line #L126 was not covered by tests

info "Exiting discovery task runner"

Expand All @@ -139,9 +147,11 @@

b.discEngineRunning = true
for i in 0..<b.concurrentDiscReqs:
b.discoveryTasks.add(discoveryTaskLoop(b))
let fut = b.discoveryTaskLoop().track(b)
asyncSpawn fut

b.discoveryLoop = discoveryQueueLoop(b)
b.discoveryLoop = b.discoveryQueueLoop().track(b)
asyncSpawn b.discoveryLoop

proc stop*(b: DiscoveryEngine) {.async.} =
## Stop the discovery engine
Expand All @@ -153,16 +163,9 @@
return

b.discEngineRunning = false
for task in b.discoveryTasks:
if not task.finished:
trace "Awaiting discovery task to stop"
await task.cancelAndWait()
trace "Discovery task stopped"

if not b.discoveryLoop.isNil and not b.discoveryLoop.finished:
trace "Awaiting discovery loop to stop"
await b.discoveryLoop.cancelAndWait()
trace "Discovery loop stopped"
trace "Stopping discovery loop and tasks"
await b.trackedFutures.cancelTracked()
trace "Discovery loop and tasks stopped"

trace "Discovery engine stopped"

Expand All @@ -187,6 +190,7 @@
pendingBlocks: pendingBlocks,
concurrentDiscReqs: concurrentDiscReqs,
discoveryQueue: newAsyncQueue[Cid](concurrentDiscReqs),
trackedFutures: TrackedFutures.new(),
inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](),
discoveryLoopSleep: discoveryLoopSleep,
minPeersPerBlock: minPeersPerBlock)
29 changes: 17 additions & 12 deletions codex/blockexchange/engine/engine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import ../../stores/blockstore
import ../../blocktype
import ../../utils
import ../../utils/exceptions
import ../../utils/trackedfutures
import ../../merkletree
import ../../logutils
import ../../manifest
Expand Down Expand Up @@ -70,7 +72,7 @@
peers*: PeerCtxStore # Peers we're currently actively exchanging with
taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # Peers we're currently processing tasks for
concurrentTasks: int # Number of concurrent peers we're serving at any given time
blockexcTasks: seq[Future[void]] # Future to control blockexc task
trackedFutures: TrackedFutures # Tracks futures of blockexc tasks
blockexcRunning: bool # Indicates if the blockexc task is running
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
peersPerRequest: int # Max number of peers to request from
Expand All @@ -88,7 +90,7 @@
proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} =
b.taskQueue.pushOrUpdateNoWait(task).isOk()

proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.}
proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).}

proc start*(b: BlockExcEngine) {.async.} =
## Start the blockexc task
Expand All @@ -104,7 +106,8 @@

b.blockexcRunning = true
for i in 0..<b.concurrentTasks:
b.blockexcTasks.add(blockexcTaskRunner(b))
let fut = b.blockexcTaskRunner().track(b)
asyncSpawn fut

proc stop*(b: BlockExcEngine) {.async.} =
## Stop the blockexc blockexc
Expand All @@ -119,11 +122,7 @@
return

b.blockexcRunning = false
for task in b.blockexcTasks:
if not task.finished:
trace "Awaiting task to stop"
await task.cancelAndWait()
trace "Task stopped"
await b.trackedFutures.cancelTracked()

trace "NetworkStore stopped"

Expand Down Expand Up @@ -565,16 +564,21 @@

task.peerWants.keepItIf(it.address notin successAddresses)

proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).} =
## process tasks
##

trace "Starting blockexc task runner"
while b.blockexcRunning:
let
peerCtx = await b.taskQueue.pop()
try:
let
peerCtx = await b.taskQueue.pop()

await b.taskHandler(peerCtx)
await b.taskHandler(peerCtx)
except CancelledError:
break # do not propagate as blockexcTaskRunner was asyncSpawned
except CatchableError as e:
error "error running block exchange task", error = e.msgDetail

Check warning on line 581 in codex/blockexchange/engine/engine.nim

View check run for this annotation

Codecov / codecov/patch

codex/blockexchange/engine/engine.nim#L581

Added line #L581 was not covered by tests

info "Exiting blockexc task runner"

Expand Down Expand Up @@ -603,6 +607,7 @@
network: network,
wallet: wallet,
concurrentTasks: concurrentTasks,
trackedFutures: TrackedFutures.new(),
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
discovery: discovery,
advertiser: advertiser,
Expand Down
Loading