Skip to content

Commit

Permalink
test(starknet_mempool_p2p): change p2p runner test to use automock fo…
Browse files Browse the repository at this point in the history
…r gateway client
  • Loading branch information
AlonLStarkWare committed Dec 16, 2024
1 parent 9314006 commit b455530
Showing 1 changed file with 28 additions and 37 deletions.
65 changes: 28 additions & 37 deletions crates/starknet_mempool_p2p/src/runner/test.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use futures::channel::mpsc::Sender;
use futures::stream::StreamExt;
use futures::SinkExt;
use papyrus_network::network_manager::test_utils::{
mock_register_broadcast_topic,
Expand All @@ -17,64 +14,58 @@ use papyrus_protobuf::mempool::RpcTransactionWrapper;
use papyrus_test_utils::{get_rng, GetTestInstance};
use starknet_api::rpc_transaction::RpcTransaction;
use starknet_api::transaction::TransactionHash;
use starknet_gateway_types::communication::{GatewayClient, GatewayClientResult};
use starknet_gateway_types::gateway_types::GatewayInput;
use starknet_gateway_types::communication::MockGatewayClient;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use tokio::time::sleep;

use super::MempoolP2pRunner;

// TODO(eitan): Make it an automock
#[derive(Clone)]
struct MockGatewayClient {
add_tx_sender: Sender<RpcTransaction>,
}

#[async_trait]
impl GatewayClient for MockGatewayClient {
async fn add_tx(&self, gateway_input: GatewayInput) -> GatewayClientResult<TransactionHash> {
let _ = self.clone().add_tx_sender.send(gateway_input.rpc_tx).await;
Ok(TransactionHash::default())
}
}

// The p2p runner receives a tx from network, and forwards it to the gateway.
#[tokio::test]
async fn start_component_receive_tx_happy_flow() {
async fn incoming_p2p_tx_reaches_gateway_client() {
// Mock a network for the other node to send tx to our p2p runner using the subscriber channels.
let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels { broadcasted_messages_receiver, broadcast_topic_client } =
subscriber_channels;
subscriber_channels; // used to created our node's p2p runner below, which will listen for incoming txs over broadcasted_messages_receiver.
let BroadcastNetworkMock {
broadcasted_messages_sender: mut mock_broadcasted_messages_sender,
..
} = mock_network;
} = mock_network; // other node sending tx to our p2p runner

// Creating a placeholder network manager with default config for init of a mempool receiver
let placeholder_network_manager = NetworkManager::new(NetworkConfig::default(), None);
let (add_tx_sender, mut add_tx_receiver) = futures::channel::mpsc::channel(1);
let mock_gateway_client = Arc::new(MockGatewayClient { add_tx_sender });

// send an empty message on this channel to indicate that the tx reached the gateway client.
let (add_tx_indicator_sender, add_tx_indicator_receiver) = futures::channel::oneshot::channel();

let mut mock_gateway_client = MockGatewayClient::new();
mock_gateway_client.expect_add_tx().return_once(move |_| {
add_tx_indicator_sender.send(()).unwrap();
Ok(TransactionHash::default())
});
let mut mempool_p2p_runner = MempoolP2pRunner::new(
Some(placeholder_network_manager),
broadcasted_messages_receiver,
broadcast_topic_client,
mock_gateway_client,
broadcasted_messages_receiver, // listen to incoming tx
broadcast_topic_client, // broadcast tx or report peer
Arc::new(mock_gateway_client),
);

let message_metadata = BroadcastedMessageMetadata::get_test_instance(&mut get_rng());
let expected_rpc_transaction =
RpcTransactionWrapper(RpcTransaction::get_test_instance(&mut get_rng()));

// Sending the expected transaction to the mempool receiver
// Sending the expected transaction to the mempool runner
let res =
mock_broadcasted_messages_sender.send((expected_rpc_transaction.clone(), message_metadata));

res.await.expect("Failed to send message");

tokio::select! {
_ = mempool_p2p_runner.start() => {panic!("Mempool receiver failed to start");}
actual_rpc_transaction = add_tx_receiver.next() => {
assert_eq!(actual_rpc_transaction, Some(expected_rpc_transaction.0));
}
_ = sleep(Duration::from_secs(5)) => {
panic!("Test timed out");
}
// if the runner takes longer than 5 seconds to start, we panic.
// if the runner fails, there was a network issue => panic.
// if the runner returns successfully, we panic because the runner should never terminate.
res = tokio::time::timeout(Duration::from_secs(5), mempool_p2p_runner.start()) => {res.expect("Test timed out").expect("Runner failed - network stopped unexpectedly"); panic!("Runner terminated")},
// if a message was received on this oneshot channel, the gateway client received the tx.
res = add_tx_indicator_receiver => {res.unwrap()}
}
}
// TODO(eitan): Add test for when the gateway client fails to add the transaction

0 comments on commit b455530

Please sign in to comment.