Skip to content

Commit

Permalink
Use spawn_async in ByRoot handling workers (#5557)
Browse files Browse the repository at this point in the history
* Use spawn_async in ByRoot handling workers

* box large variants
  • Loading branch information
dapplion authored Apr 12, 2024
1 parent 116a55e commit b6a1c86
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 234 deletions.
35 changes: 9 additions & 26 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ pub enum BlockingOrAsync {
/// queuing specifics.
pub enum Work<E: EthSpec> {
GossipAttestation {
attestation: GossipAttestationPackage<E>,
attestation: Box<GossipAttestationPackage<E>>,
process_individual: Box<dyn FnOnce(GossipAttestationPackage<E>) + Send + Sync>,
process_batch: Box<dyn FnOnce(Vec<GossipAttestationPackage<E>>) + Send + Sync>,
},
Expand All @@ -583,7 +583,7 @@ pub enum Work<E: EthSpec> {
process_batch: Box<dyn FnOnce(Vec<GossipAttestationPackage<E>>) + Send + Sync>,
},
GossipAggregate {
aggregate: GossipAggregatePackage<E>,
aggregate: Box<GossipAggregatePackage<E>>,
process_individual: Box<dyn FnOnce(GossipAggregatePackage<E>) + Send + Sync>,
process_batch: Box<dyn FnOnce(Vec<GossipAggregatePackage<E>>) + Send + Sync>,
},
Expand Down Expand Up @@ -624,8 +624,8 @@ pub enum Work<E: EthSpec> {
ChainSegment(AsyncFn),
ChainSegmentBackfill(AsyncFn),
Status(BlockingFn),
BlocksByRangeRequest(BlockingFnWithManualSendOnIdle),
BlocksByRootsRequest(BlockingFnWithManualSendOnIdle),
BlocksByRangeRequest(AsyncFn),
BlocksByRootsRequest(AsyncFn),
BlobsByRangeRequest(BlockingFn),
BlobsByRootsRequest(BlockingFn),
GossipBlsToExecutionChange(BlockingFn),
Expand Down Expand Up @@ -1015,7 +1015,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
process_individual: _,
process_batch,
} => {
aggregates.push(aggregate);
aggregates.push(*aggregate);
if process_batch_opt.is_none() {
process_batch_opt = Some(process_batch);
}
Expand Down Expand Up @@ -1075,7 +1075,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
process_individual: _,
process_batch,
} => {
attestations.push(attestation);
attestations.push(*attestation);
if process_batch_opt.is_none() {
process_batch_opt = Some(process_batch);
}
Expand Down Expand Up @@ -1445,7 +1445,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
process_individual,
process_batch: _,
} => task_spawner.spawn_blocking(move || {
process_individual(attestation);
process_individual(*attestation);
}),
Work::GossipAttestationBatch {
attestations,
Expand All @@ -1458,7 +1458,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
process_individual,
process_batch: _,
} => task_spawner.spawn_blocking(move || {
process_individual(aggregate);
process_individual(*aggregate);
}),
Work::GossipAggregateBatch {
aggregates,
Expand Down Expand Up @@ -1493,7 +1493,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
task_spawner.spawn_blocking(process_fn)
}
Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
task_spawner.spawn_blocking_with_manual_send_idle(work)
task_spawner.spawn_async(work)
}
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
Expand Down Expand Up @@ -1555,23 +1555,6 @@ impl TaskSpawner {
WORKER_TASK_NAME,
)
}

/// Spawn a blocking task, passing the `SendOnDrop` into the task.
///
/// ## Notes
///
/// Users must ensure the `SendOnDrop` is dropped at the appropriate time!
pub fn spawn_blocking_with_manual_send_idle<F>(self, task: F)
where
F: FnOnce(SendOnDrop) + Send + 'static,
{
self.executor.spawn_blocking(
|| {
task(self.send_idle_on_drop);
},
WORKER_TASK_NAME,
)
}
}

/// This struct will send a message on `self.tx` when it is dropped. An error will be logged on
Expand Down
36 changes: 14 additions & 22 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.try_send(BeaconWorkEvent {
drop_during_sync: true,
work: Work::GossipAttestation {
attestation: GossipAttestationPackage {
attestation: Box::new(GossipAttestationPackage {
message_id,
peer_id,
attestation: Box::new(attestation),
subnet_id,
should_import,
seen_timestamp,
},
}),
process_individual: Box::new(process_individual),
process_batch: Box::new(process_batch),
},
Expand Down Expand Up @@ -148,13 +148,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.try_send(BeaconWorkEvent {
drop_during_sync: true,
work: Work::GossipAggregate {
aggregate: GossipAggregatePackage {
aggregate: Box::new(GossipAggregatePackage {
message_id,
peer_id,
aggregate: Box::new(aggregate),
beacon_block_root,
seen_timestamp,
},
}),
process_individual: Box::new(process_individual),
process_batch: Box::new(process_batch),
},
Expand Down Expand Up @@ -508,20 +508,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
request: BlocksByRangeRequest,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = move |send_idle_on_drop| {
let process_fn = async move {
let executor = processor.executor.clone();
processor.handle_blocks_by_range_request(
executor,
send_idle_on_drop,
peer_id,
request_id,
request,
)
processor
.handle_blocks_by_range_request(executor, peer_id, request_id, request)
.await;
};

self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::BlocksByRangeRequest(Box::new(process_fn)),
work: Work::BlocksByRangeRequest(Box::pin(process_fn)),
})
}

Expand All @@ -533,20 +529,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
request: BlocksByRootRequest,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = move |send_idle_on_drop| {
let process_fn = async move {
let executor = processor.executor.clone();
processor.handle_blocks_by_root_request(
executor,
send_idle_on_drop,
peer_id,
request_id,
request,
)
processor
.handle_blocks_by_root_request(executor, peer_id, request_id, request)
.await;
};

self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::BlocksByRootsRequest(Box::new(process_fn)),
work: Work::BlocksByRootsRequest(Box::pin(process_fn)),
})
}

Expand Down
Loading

0 comments on commit b6a1c86

Please sign in to comment.