From b8c21bf2a4de109ddec117a7be1f3fcf82a8e821 Mon Sep 17 00:00:00 2001 From: Alon Lukatch Date: Mon, 16 Dec 2024 14:52:54 +0200 Subject: [PATCH] refactor(starknet_mempool_p2p): change MempoolP2pRunner to hold future intead of NetworkManager --- Cargo.lock | 1 + crates/starknet_mempool_p2p/Cargo.toml | 1 + crates/starknet_mempool_p2p/src/lib.rs | 3 ++- crates/starknet_mempool_p2p/src/runner/mod.rs | 23 +++++++------------ 4 files changed, 12 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ec72ce93fc..ec83c6e868 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10557,6 +10557,7 @@ version = "0.0.0" dependencies = [ "async-trait", "futures", + "libp2p", "papyrus_config", "papyrus_network", "papyrus_network_types", diff --git a/crates/starknet_mempool_p2p/Cargo.toml b/crates/starknet_mempool_p2p/Cargo.toml index 5e5b278e49..d934a4241f 100644 --- a/crates/starknet_mempool_p2p/Cargo.toml +++ b/crates/starknet_mempool_p2p/Cargo.toml @@ -26,6 +26,7 @@ validator.workspace = true [dev-dependencies] futures.workspace = true +libp2p.workspace = true papyrus_network = { workspace = true, features = ["testing"] } papyrus_network_types = { workspace = true, features = ["testing"] } papyrus_protobuf.workspace = true diff --git a/crates/starknet_mempool_p2p/src/lib.rs b/crates/starknet_mempool_p2p/src/lib.rs index 08150ddebb..1d6895f980 100644 --- a/crates/starknet_mempool_p2p/src/lib.rs +++ b/crates/starknet_mempool_p2p/src/lib.rs @@ -28,9 +28,10 @@ pub fn create_p2p_propagator_and_runner( mempool_p2p_config.network_buffer_size, ) .expect("Failed to register broadcast topic"); + let network_future = network_manager.run(); let mempool_p2p_propagator = MempoolP2pPropagator::new(broadcast_topic_client.clone()); let mempool_p2p_runner = MempoolP2pRunner::new( - Some(network_manager), + Box::pin(network_future), broadcasted_messages_receiver, broadcast_topic_client, gateway_client, diff --git a/crates/starknet_mempool_p2p/src/runner/mod.rs b/crates/starknet_mempool_p2p/src/runner/mod.rs index 8f2e6563cb..20ccd8359a 100644 --- a/crates/starknet_mempool_p2p/src/runner/mod.rs +++ b/crates/starknet_mempool_p2p/src/runner/mod.rs @@ -2,13 +2,14 @@ mod test; use async_trait::async_trait; +use futures::future::BoxFuture; use futures::stream::FuturesUnordered; -use futures::{pin_mut, StreamExt, TryFutureExt}; +use futures::StreamExt; use papyrus_network::network_manager::{ BroadcastTopicClient, BroadcastTopicClientTrait, BroadcastTopicServer, - NetworkManager, + NetworkError, }; use papyrus_protobuf::mempool::RpcTransactionWrapper; use starknet_gateway_types::communication::{GatewayClientError, SharedGatewayClient}; @@ -20,7 +21,7 @@ use starknet_sequencer_infra::errors::ComponentError; use tracing::warn; pub struct MempoolP2pRunner { - network_manager: Option, + network_future: BoxFuture<'static, Result<(), NetworkError>>, broadcasted_topic_server: BroadcastTopicServer, broadcast_topic_client: BroadcastTopicClient, gateway_client: SharedGatewayClient, @@ -28,31 +29,23 @@ pub struct MempoolP2pRunner { impl MempoolP2pRunner { pub fn new( - network_manager: Option, + network_future: BoxFuture<'static, Result<(), NetworkError>>, broadcasted_topic_server: BroadcastTopicServer, broadcast_topic_client: BroadcastTopicClient, gateway_client: SharedGatewayClient, ) -> Self { - Self { network_manager, broadcasted_topic_server, broadcast_topic_client, gateway_client } + Self { network_future, broadcasted_topic_server, broadcast_topic_client, gateway_client } } } #[async_trait] impl ComponentStarter for MempoolP2pRunner { async fn start(&mut self) -> Result<(), ComponentError> { - let network_future = self - .network_manager - .take() - .expect("Network manager not found") - .run() - .map_err(|_| ComponentError::InternalComponentError); - pin_mut!(network_future); let mut gateway_futures = FuturesUnordered::new(); loop { tokio::select! { - result = &mut network_future => { - result?; - panic!("Network stopped unexpectedly"); + result = &mut self.network_future => { + return result.map_err(|_| ComponentError::InternalComponentError); } Some(result) = gateway_futures.next() => { match result {