Skip to content

Commit

Permalink
test(starknet_mempool_p2p): test gateway client error on adding tx fr…
Browse files Browse the repository at this point in the history
…om p2p
  • Loading branch information
AlonLStarkWare committed Dec 16, 2024
1 parent b455530 commit fdaf396
Showing 1 changed file with 65 additions and 2 deletions.
67 changes: 65 additions & 2 deletions crates/starknet_mempool_p2p/src/runner/test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;
use std::time::Duration;

use futures::SinkExt;
use futures::{SinkExt, StreamExt};
use papyrus_network::network_manager::test_utils::{
mock_register_broadcast_topic,
BroadcastNetworkMock,
Expand All @@ -14,7 +14,8 @@ use papyrus_protobuf::mempool::RpcTransactionWrapper;
use papyrus_test_utils::{get_rng, GetTestInstance};
use starknet_api::rpc_transaction::RpcTransaction;
use starknet_api::transaction::TransactionHash;
use starknet_gateway_types::communication::MockGatewayClient;
use starknet_gateway_types::communication::{GatewayClientError, MockGatewayClient};
use starknet_gateway_types::errors::{GatewayError, GatewaySpecError};
use starknet_sequencer_infra::component_definitions::ComponentStarter;

use super::MempoolP2pRunner;
Expand Down Expand Up @@ -69,3 +70,65 @@ async fn incoming_p2p_tx_reaches_gateway_client() {
res = add_tx_indicator_receiver => {res.unwrap()}
}
}

// The p2p runner receives a tx from network, and fails to forwards it to the gateway.
#[tokio::test]
async fn incoming_p2p_tx_fails_on_gateway_client() {
// Mock a network for the other node to send tx to our p2p runner using the subscriber channels.
let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels { broadcasted_messages_receiver, broadcast_topic_client } =
subscriber_channels; // used to created our node's p2p runner below, which will listen for incoming txs over broadcasted_messages_receiver.
let BroadcastNetworkMock {
broadcasted_messages_sender: mut mock_broadcasted_messages_sender,
reported_messages_receiver: mut mock_reported_messages_receiver,
..
} = mock_network; // other node sending tx to our p2p runner

// Creating a placeholder network manager with default config for init of a mempool receiver
let placeholder_network_manager = NetworkManager::new(NetworkConfig::default(), None);

// send an empty message on this channel to indicate that the tx reached the gateway client.
let (add_tx_indicator_sender, add_tx_indicator_receiver) = futures::channel::oneshot::channel();

let message_metadata = BroadcastedMessageMetadata::get_test_instance(&mut get_rng());
let message_metadata_clone = message_metadata.clone();
let expected_rpc_transaction =
RpcTransactionWrapper(RpcTransaction::get_test_instance(&mut get_rng()));

let mut mock_gateway_client = MockGatewayClient::new();
mock_gateway_client.expect_add_tx().return_once(move |_| {
add_tx_indicator_sender.send(()).unwrap();
Err(GatewayClientError::GatewayError(GatewayError::GatewaySpecError {
source: GatewaySpecError::DuplicateTx,
p2p_message_metadata: Some(message_metadata_clone),
}))
});
let mut mempool_p2p_runner = MempoolP2pRunner::new(
Some(placeholder_network_manager),
broadcasted_messages_receiver,
broadcast_topic_client,
Arc::new(mock_gateway_client),
);

// Sending the expected transaction to the mempool runner
let res = mock_broadcasted_messages_sender
.send((expected_rpc_transaction.clone(), message_metadata.clone()));

res.await.expect("Failed to send message");

tokio::select! {
// if the runner takes longer than 5 seconds to start, we panic.
// if the runner fails, there was a network issue => panic.
// if the runner returns successfully, we panic because the runner should never terminate.
res = tokio::time::timeout(Duration::from_secs(5), mempool_p2p_runner.start()) => {res.expect("Test timed out").expect("Runner failed - network stopped unexpectedly"); panic!("Runner terminated")},
// if a message was received on this oneshot channel, the gateway client received the tx.
res = add_tx_indicator_receiver => {
// if unwrap fails, the tx wasnt forwarded to the gateway client.
res.unwrap();
// After gateway client fails to add the tx, the p2p runner should have reported the peer.
let peer_reported = mock_reported_messages_receiver.next().await.expect("Failed to receive message");
assert_eq!(peer_reported, message_metadata.originator_id.private_get_peer_id())
}
}
}

0 comments on commit fdaf396

Please sign in to comment.