-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(blockexchange): ensures futures are asyncSpawned #1037
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,8 @@ | |
import ../peers | ||
|
||
import ../../utils | ||
import ../../utils/exceptions | ||
import ../../utils/trackedfutures | ||
import ../../discovery | ||
import ../../stores/blockstore | ||
import ../../logutils | ||
|
@@ -42,7 +44,7 @@ | |
|
||
advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle | ||
advertiseQueue*: AsyncQueue[Cid] # Advertise queue | ||
advertiseTasks*: seq[Future[void]] # Advertise tasks | ||
trackedFutures*: TrackedFutures # Advertise tasks futures | ||
|
||
advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep | ||
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests | ||
|
@@ -70,20 +72,26 @@ | |
await b.addCidToQueue(cid) | ||
await b.addCidToQueue(manifest.treeCid) | ||
|
||
proc advertiseLocalStoreLoop(b: Advertiser) {.async.} = | ||
proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} = | ||
while b.advertiserRunning: | ||
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest): | ||
trace "Advertiser begins iterating blocks..." | ||
for c in cids: | ||
if cid =? await c: | ||
await b.advertiseBlock(cid) | ||
trace "Advertiser iterating blocks finished." | ||
try: | ||
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest): | ||
trace "Advertiser begins iterating blocks..." | ||
for c in cids: | ||
if cid =? await c: | ||
await b.advertiseBlock(cid) | ||
trace "Advertiser iterating blocks finished." | ||
|
||
await sleepAsync(b.advertiseLocalStoreLoopSleep) | ||
|
||
await sleepAsync(b.advertiseLocalStoreLoopSleep) | ||
except CancelledError: | ||
break # do not propagate as advertiseLocalStoreLoop was asyncSpawned | ||
except CatchableError as e: | ||
error "failed to advertise blocks in local store", error = e.msgDetail | ||
|
||
info "Exiting advertise task loop" | ||
|
||
proc processQueueLoop(b: Advertiser) {.async.} = | ||
proc processQueueLoop(b: Advertiser) {.async: (raises: []).} = | ||
while b.advertiserRunning: | ||
try: | ||
let | ||
|
@@ -129,9 +137,11 @@ | |
|
||
b.advertiserRunning = true | ||
for i in 0..<b.concurrentAdvReqs: | ||
b.advertiseTasks.add(processQueueLoop(b)) | ||
let fut = b.processQueueLoop().track(b) | ||
asyncSpawn fut | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see how track() works, but I don't recommend this. If I look for the usage of the "trackedFutures" field in the engine.nim, all I find is the creation and cancel-call. I'd much prefer the connection between the field and track() to be explicit at the point of use. So a small change like What do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I confess I'm always weirded out when I see this. My simple brain parses that the Future returned by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, the API could use some work, but that can be handled in another PR and applied across the codebase. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are reasons the API ended up where we are now, but those reasons are being removed from the codebase, so we'll have some freedom to change the API (not in this PR). Something like this would be better? let fut = b.processQueueLoop()
b.trackedFutures.track(fut)
asyncSpawn fut There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looking just at the code: let fut = b.processQueueLoop().track(b) it is not evident what it does. To understand you have to check the return type of The proposed alternative is way easier to understand. You could even just say There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There are definitely ways to do this, however I'm not a fan of magic things in most cases because they tend to hide what's happening and make things harder to reason, harder to debug, and harder to add new features (even though it's usually intended to make things easier to implement, however it's rarely the case). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When saying "magic" I mean more something in the style of RAII (Resource Acquisition is Initialization) common in C++ and less in the style of Java/C#/Spring like fancy annotations. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. PR here: #1046 |
||
|
||
b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b) | ||
b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b).track(b) | ||
asyncSpawn b.advertiseLocalStoreLoop | ||
|
||
proc stop*(b: Advertiser) {.async.} = | ||
## Stop the advertiser | ||
|
@@ -145,19 +155,9 @@ | |
b.advertiserRunning = false | ||
# Stop incoming tasks from callback and localStore loop | ||
b.localStore.onBlockStored = CidCallback.none | ||
if not b.advertiseLocalStoreLoop.isNil and not b.advertiseLocalStoreLoop.finished: | ||
trace "Awaiting advertise loop to stop" | ||
await b.advertiseLocalStoreLoop.cancelAndWait() | ||
trace "Advertise loop stopped" | ||
|
||
# Clear up remaining tasks | ||
for task in b.advertiseTasks: | ||
if not task.finished: | ||
trace "Awaiting advertise task to stop" | ||
await task.cancelAndWait() | ||
trace "Advertise task stopped" | ||
|
||
trace "Advertiser stopped" | ||
trace "Stopping advertise loop and tasks" | ||
await b.trackedFutures.cancelTracked() | ||
trace "Advertiser loop and tasks stopped" | ||
|
||
proc new*( | ||
T: type Advertiser, | ||
|
@@ -173,5 +173,6 @@ | |
discovery: discovery, | ||
concurrentAdvReqs: concurrentAdvReqs, | ||
advertiseQueue: newAsyncQueue[Cid](concurrentAdvReqs), | ||
trackedFutures: TrackedFutures.new(), | ||
inFlightAdvReqs: initTable[Cid, Future[void]](), | ||
advertiseLocalStoreLoopSleep: advertiseLocalStoreLoopSleep) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. Btw, we should probably plan to drop the v3 compat mode from Chronos as it'll then start warning about those all over.