Skip to content

Commit

Permalink
refactor(starknet_mempool_p2p): change MempoolP2pRunner to hold futur…
Browse files Browse the repository at this point in the history
…e intead of NetworkManager
  • Loading branch information
AlonLStarkWare committed Dec 16, 2024
1 parent 8fbf802 commit b8c21bf
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/starknet_mempool_p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ validator.workspace = true

[dev-dependencies]
futures.workspace = true
libp2p.workspace = true
papyrus_network = { workspace = true, features = ["testing"] }
papyrus_network_types = { workspace = true, features = ["testing"] }
papyrus_protobuf.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion crates/starknet_mempool_p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ pub fn create_p2p_propagator_and_runner(
mempool_p2p_config.network_buffer_size,
)
.expect("Failed to register broadcast topic");
let network_future = network_manager.run();
let mempool_p2p_propagator = MempoolP2pPropagator::new(broadcast_topic_client.clone());
let mempool_p2p_runner = MempoolP2pRunner::new(
Some(network_manager),
Box::pin(network_future),
broadcasted_messages_receiver,
broadcast_topic_client,
gateway_client,
Expand Down
23 changes: 8 additions & 15 deletions crates/starknet_mempool_p2p/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
mod test;

use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{pin_mut, StreamExt, TryFutureExt};
use futures::StreamExt;
use papyrus_network::network_manager::{
BroadcastTopicClient,
BroadcastTopicClientTrait,
BroadcastTopicServer,
NetworkManager,
NetworkError,
};
use papyrus_protobuf::mempool::RpcTransactionWrapper;
use starknet_gateway_types::communication::{GatewayClientError, SharedGatewayClient};
Expand All @@ -20,39 +21,31 @@ use starknet_sequencer_infra::errors::ComponentError;
use tracing::warn;

pub struct MempoolP2pRunner {
network_manager: Option<NetworkManager>,
network_future: BoxFuture<'static, Result<(), NetworkError>>,
broadcasted_topic_server: BroadcastTopicServer<RpcTransactionWrapper>,
broadcast_topic_client: BroadcastTopicClient<RpcTransactionWrapper>,
gateway_client: SharedGatewayClient,
}

impl MempoolP2pRunner {
pub fn new(
network_manager: Option<NetworkManager>,
network_future: BoxFuture<'static, Result<(), NetworkError>>,
broadcasted_topic_server: BroadcastTopicServer<RpcTransactionWrapper>,
broadcast_topic_client: BroadcastTopicClient<RpcTransactionWrapper>,
gateway_client: SharedGatewayClient,
) -> Self {
Self { network_manager, broadcasted_topic_server, broadcast_topic_client, gateway_client }
Self { network_future, broadcasted_topic_server, broadcast_topic_client, gateway_client }
}
}

#[async_trait]
impl ComponentStarter for MempoolP2pRunner {
async fn start(&mut self) -> Result<(), ComponentError> {
let network_future = self
.network_manager
.take()
.expect("Network manager not found")
.run()
.map_err(|_| ComponentError::InternalComponentError);
pin_mut!(network_future);
let mut gateway_futures = FuturesUnordered::new();
loop {
tokio::select! {
result = &mut network_future => {
result?;
panic!("Network stopped unexpectedly");
result = &mut self.network_future => {
return result.map_err(|_| ComponentError::InternalComponentError);
}
Some(result) = gateway_futures.next() => {
match result {
Expand Down

0 comments on commit b8c21bf

Please sign in to comment.