diff --git a/crates/papyrus_proc_macros/src/lib.rs b/crates/papyrus_proc_macros/src/lib.rs index 9f2f27a652..91a1c0a4e0 100644 --- a/crates/papyrus_proc_macros/src/lib.rs +++ b/crates/papyrus_proc_macros/src/lib.rs @@ -243,7 +243,7 @@ pub fn handle_response_variants(input: TokenStream) -> TokenStream { } = parse_macro_input!(input as HandleResponseVariantsMacroInput); let expanded = quote! { - match response { + match response? { #response_enum::#request_response_enum_var(Ok(response)) => Ok(response), #response_enum::#request_response_enum_var(Err(response)) => { Err(#component_client_error::#component_error(response)) diff --git a/crates/starknet_batcher_types/src/communication.rs b/crates/starknet_batcher_types/src/communication.rs index 838d37df1c..cf1157fa31 100644 --- a/crates/starknet_batcher_types/src/communication.rs +++ b/crates/starknet_batcher_types/src/communication.rs @@ -10,7 +10,10 @@ use starknet_sequencer_infra::component_client::{ LocalComponentClient, RemoteComponentClient, }; -use starknet_sequencer_infra::component_definitions::ComponentRequestAndResponseSender; +use starknet_sequencer_infra::component_definitions::{ + ComponentClient, + ComponentRequestAndResponseSender, +}; use thiserror::Error; use crate::batcher_types::{ @@ -100,7 +103,7 @@ pub enum BatcherClientError { impl BatcherClient for LocalBatcherClient { async fn build_proposal(&self, input: BuildProposalInput) -> BatcherClientResult<()> { let request = BatcherRequest::BuildProposal(input); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!(BatcherResponse, BuildProposal, BatcherClientError, BatcherError) } @@ -109,7 +112,7 @@ impl BatcherClient for LocalBatcherClient { input: GetProposalContentInput, ) -> BatcherClientResult { let request = BatcherRequest::GetProposalContent(input); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!( BatcherResponse, GetProposalContent, @@ -120,7 +123,7 @@ impl BatcherClient for LocalBatcherClient { async fn validate_proposal(&self, input: ValidateProposalInput) -> BatcherClientResult<()> { let request = BatcherRequest::ValidateProposal(input); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!( BatcherResponse, ValidateProposal, @@ -134,7 +137,7 @@ impl BatcherClient for LocalBatcherClient { input: SendProposalContentInput, ) -> BatcherClientResult { let request = BatcherRequest::SendProposalContent(input); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!( BatcherResponse, SendProposalContent, @@ -145,13 +148,13 @@ impl BatcherClient for LocalBatcherClient { async fn start_height(&self, input: StartHeightInput) -> BatcherClientResult<()> { let request = BatcherRequest::StartHeight(input); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!(BatcherResponse, StartHeight, BatcherClientError, BatcherError) } async fn decision_reached(&self, input: DecisionReachedInput) -> BatcherClientResult<()> { let request = BatcherRequest::DecisionReached(input); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!( BatcherResponse, DecisionReached, @@ -165,7 +168,7 @@ impl BatcherClient for LocalBatcherClient { impl BatcherClient for RemoteBatcherClient { async fn build_proposal(&self, input: BuildProposalInput) -> BatcherClientResult<()> { let request = BatcherRequest::BuildProposal(input); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!(BatcherResponse, BuildProposal, BatcherClientError, BatcherError) } @@ -174,7 +177,7 @@ impl BatcherClient for RemoteBatcherClient { input: GetProposalContentInput, ) -> BatcherClientResult { let request = BatcherRequest::GetProposalContent(input); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!( BatcherResponse, GetProposalContent, @@ -185,7 +188,7 @@ impl BatcherClient for RemoteBatcherClient { async fn validate_proposal(&self, input: ValidateProposalInput) -> BatcherClientResult<()> { let request = BatcherRequest::ValidateProposal(input); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!( BatcherResponse, ValidateProposal, @@ -199,7 +202,7 @@ impl BatcherClient for RemoteBatcherClient { input: SendProposalContentInput, ) -> BatcherClientResult { let request = BatcherRequest::SendProposalContent(input); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!( BatcherResponse, SendProposalContent, @@ -210,13 +213,13 @@ impl BatcherClient for RemoteBatcherClient { async fn start_height(&self, input: StartHeightInput) -> BatcherClientResult<()> { let request = BatcherRequest::StartHeight(input); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!(BatcherResponse, StartHeight, BatcherClientError, BatcherError) } async fn decision_reached(&self, input: DecisionReachedInput) -> BatcherClientResult<()> { let request = BatcherRequest::DecisionReached(input); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!( BatcherResponse, DecisionReached, diff --git a/crates/starknet_gateway_types/src/communication.rs b/crates/starknet_gateway_types/src/communication.rs index 72cf01fc36..e7228584ee 100644 --- a/crates/starknet_gateway_types/src/communication.rs +++ b/crates/starknet_gateway_types/src/communication.rs @@ -11,7 +11,10 @@ use starknet_sequencer_infra::component_client::{ LocalComponentClient, RemoteComponentClient, }; -use starknet_sequencer_infra::component_definitions::ComponentRequestAndResponseSender; +use starknet_sequencer_infra::component_definitions::{ + ComponentClient, + ComponentRequestAndResponseSender, +}; use thiserror::Error; use crate::errors::GatewayError; @@ -56,7 +59,7 @@ impl GatewayClient for LocalGatewayClient { #[instrument(skip(self))] async fn add_tx(&self, gateway_input: GatewayInput) -> GatewayClientResult { let request = GatewayRequest::AddTransaction(gateway_input); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!(GatewayResponse, AddTransaction, GatewayClientError, GatewayError) } } @@ -66,7 +69,7 @@ impl GatewayClient for RemoteGatewayClient { #[instrument(skip(self))] async fn add_tx(&self, gateway_input: GatewayInput) -> GatewayClientResult { let request = GatewayRequest::AddTransaction(gateway_input); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!(GatewayResponse, AddTransaction, GatewayClientError, GatewayError) } } diff --git a/crates/starknet_mempool_p2p_types/src/communication.rs b/crates/starknet_mempool_p2p_types/src/communication.rs index b5c58dd4ef..7e05ee3028 100644 --- a/crates/starknet_mempool_p2p_types/src/communication.rs +++ b/crates/starknet_mempool_p2p_types/src/communication.rs @@ -10,7 +10,10 @@ use starknet_sequencer_infra::component_client::{ LocalComponentClient, RemoteComponentClient, }; -use starknet_sequencer_infra::component_definitions::ComponentRequestAndResponseSender; +use starknet_sequencer_infra::component_definitions::{ + ComponentClient, + ComponentRequestAndResponseSender, +}; use thiserror::Error; use crate::errors::MempoolP2pPropagatorError; @@ -69,7 +72,7 @@ impl MempoolP2pPropagatorClient for LocalMempoolP2pPropagatorClient { transaction: RpcTransaction, ) -> MempoolP2pPropagatorClientResult<()> { let request = MempoolP2pPropagatorRequest::AddTransaction(transaction); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!( MempoolP2pPropagatorResponse, AddTransaction, @@ -83,7 +86,7 @@ impl MempoolP2pPropagatorClient for LocalMempoolP2pPropagatorClient { propagation_metadata: BroadcastedMessageMetadata, ) -> MempoolP2pPropagatorClientResult<()> { let request = MempoolP2pPropagatorRequest::ContinuePropagation(propagation_metadata); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!( MempoolP2pPropagatorResponse, ContinuePropagation, @@ -100,7 +103,7 @@ impl MempoolP2pPropagatorClient for RemoteMempoolP2pPropagatorClient { transaction: RpcTransaction, ) -> MempoolP2pPropagatorClientResult<()> { let request = MempoolP2pPropagatorRequest::AddTransaction(transaction); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!( MempoolP2pPropagatorResponse, AddTransaction, @@ -114,7 +117,7 @@ impl MempoolP2pPropagatorClient for RemoteMempoolP2pPropagatorClient { propagation_metadata: BroadcastedMessageMetadata, ) -> MempoolP2pPropagatorClientResult<()> { let request = MempoolP2pPropagatorRequest::ContinuePropagation(propagation_metadata); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!( MempoolP2pPropagatorResponse, ContinuePropagation, diff --git a/crates/starknet_mempool_types/src/communication.rs b/crates/starknet_mempool_types/src/communication.rs index 700933a1d7..8c17e21899 100644 --- a/crates/starknet_mempool_types/src/communication.rs +++ b/crates/starknet_mempool_types/src/communication.rs @@ -12,7 +12,10 @@ use starknet_sequencer_infra::component_client::{ LocalComponentClient, RemoteComponentClient, }; -use starknet_sequencer_infra::component_definitions::ComponentRequestAndResponseSender; +use starknet_sequencer_infra::component_definitions::{ + ComponentClient, + ComponentRequestAndResponseSender, +}; use thiserror::Error; use crate::errors::MempoolError; @@ -70,19 +73,19 @@ pub enum MempoolClientError { impl MempoolClient for LocalMempoolClient { async fn add_tx(&self, args: AddTransactionArgsWrapper) -> MempoolClientResult<()> { let request = MempoolRequest::AddTransaction(args); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!(MempoolResponse, AddTransaction, MempoolClientError, MempoolError) } async fn commit_block(&self, args: CommitBlockArgs) -> MempoolClientResult<()> { let request = MempoolRequest::CommitBlock(args); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!(MempoolResponse, CommitBlock, MempoolClientError, MempoolError) } async fn get_txs(&self, n_txs: usize) -> MempoolClientResult> { let request = MempoolRequest::GetTransactions(n_txs); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!( MempoolResponse, GetTransactions, @@ -96,19 +99,19 @@ impl MempoolClient for LocalMempoolClient { impl MempoolClient for RemoteMempoolClient { async fn add_tx(&self, args: AddTransactionArgsWrapper) -> MempoolClientResult<()> { let request = MempoolRequest::AddTransaction(args); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!(MempoolResponse, AddTransaction, MempoolClientError, MempoolError) } async fn commit_block(&self, args: CommitBlockArgs) -> MempoolClientResult<()> { let request = MempoolRequest::CommitBlock(args); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!(MempoolResponse, CommitBlock, MempoolClientError, MempoolError) } async fn get_txs(&self, n_txs: usize) -> MempoolClientResult> { let request = MempoolRequest::GetTransactions(n_txs); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!( MempoolResponse, GetTransactions, diff --git a/crates/starknet_sequencer_infra/src/component_client/local_component_client.rs b/crates/starknet_sequencer_infra/src/component_client/local_component_client.rs index 0a86518196..4724f85b05 100644 --- a/crates/starknet_sequencer_infra/src/component_client/local_component_client.rs +++ b/crates/starknet_sequencer_infra/src/component_client/local_component_client.rs @@ -1,10 +1,13 @@ use std::any::type_name; +use async_trait::async_trait; +use serde::de::DeserializeOwned; +use serde::Serialize; use tokio::sync::mpsc::{channel, Sender}; use tracing::info; use crate::component_client::ClientResult; -use crate::component_definitions::ComponentRequestAndResponseSender; +use crate::component_definitions::{ComponentClient, ComponentRequestAndResponseSender}; /// The `LocalComponentClient` struct is a generic client for sending component requests and /// receiving responses asynchronously. @@ -20,16 +23,22 @@ use crate::component_definitions::ComponentRequestAndResponseSender; /// # Example /// ```rust /// // Example usage of the LocalComponentClient +/// use serde::{Deserialize, Serialize}; /// use tokio::sync::mpsc::Sender; /// /// use crate::starknet_sequencer_infra::component_client::LocalComponentClient; -/// use crate::starknet_sequencer_infra::component_definitions::ComponentRequestAndResponseSender; +/// use crate::starknet_sequencer_infra::component_definitions::{ +/// ComponentClient, +/// ComponentRequestAndResponseSender, +/// }; /// /// // Define your request and response types +/// #[derive(Deserialize, Serialize)] /// struct MyRequest { /// pub content: String, /// } /// +/// #[derive(Deserialize, Serialize)] /// struct MyResponse { /// content: String, /// } @@ -71,8 +80,16 @@ where pub fn new(tx: Sender>) -> Self { Self { tx } } +} - pub async fn send(&self, request: Request) -> ClientResult { +#[async_trait] +impl ComponentClient + for LocalComponentClient +where + Request: Send + Sync + Serialize + DeserializeOwned, + Response: Send + Sync + Serialize + DeserializeOwned, +{ + async fn send(&self, request: Request) -> ClientResult { let (res_tx, mut res_rx) = channel::(1); let request_and_res_tx = ComponentRequestAndResponseSender { request, tx: res_tx }; self.tx.send(request_and_res_tx).await.expect("Outbound connection should be open."); diff --git a/crates/starknet_sequencer_infra/src/component_definitions.rs b/crates/starknet_sequencer_infra/src/component_definitions.rs index 2432391db3..6fd199b08c 100644 --- a/crates/starknet_sequencer_infra/src/component_definitions.rs +++ b/crates/starknet_sequencer_infra/src/component_definitions.rs @@ -6,12 +6,14 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use async_trait::async_trait; use papyrus_config::dumping::{ser_param, SerializeConfig}; use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam}; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::sync::mpsc::{Receiver, Sender}; use tracing::{error, info}; use validator::Validate; +use crate::component_client::ClientResult; use crate::errors::ComponentError; pub const APPLICATION_OCTET_STREAM: &str = "application/octet-stream"; @@ -25,6 +27,15 @@ pub trait ComponentRequestHandler { async fn handle_request(&mut self, request: Request) -> Response; } +#[async_trait] +pub trait ComponentClient +where + Request: Send + Sync + Serialize + DeserializeOwned, + Response: Send + Sync + Serialize + DeserializeOwned, +{ + async fn send(&self, request: Request) -> ClientResult; +} + #[async_trait] pub trait ComponentStarter { async fn start(&mut self) -> Result<(), ComponentError> { diff --git a/crates/starknet_sequencer_infra/src/component_server/remote_component_server.rs b/crates/starknet_sequencer_infra/src/component_server/remote_component_server.rs index 65e9080ffe..bbca70a5ed 100644 --- a/crates/starknet_sequencer_infra/src/component_server/remote_component_server.rs +++ b/crates/starknet_sequencer_infra/src/component_server/remote_component_server.rs @@ -11,7 +11,12 @@ use serde::de::DeserializeOwned; use serde::Serialize; use crate::component_client::{ClientError, LocalComponentClient}; -use crate::component_definitions::{RemoteServerConfig, ServerError, APPLICATION_OCTET_STREAM}; +use crate::component_definitions::{ + ComponentClient, + RemoteServerConfig, + ServerError, + APPLICATION_OCTET_STREAM, +}; use crate::component_server::ComponentServerStarter; use crate::errors::ComponentServerError; use crate::serde_utils::BincodeSerdeWrapper; diff --git a/crates/starknet_sequencer_infra/src/tests/local_component_client_server_test.rs b/crates/starknet_sequencer_infra/src/tests/local_component_client_server_test.rs index 1eba7b07c3..f369867796 100644 --- a/crates/starknet_sequencer_infra/src/tests/local_component_client_server_test.rs +++ b/crates/starknet_sequencer_infra/src/tests/local_component_client_server_test.rs @@ -4,7 +4,7 @@ use tokio::sync::mpsc::channel; use tokio::task; use crate::component_client::{ClientError, ClientResult, LocalComponentClient}; -use crate::component_definitions::ComponentRequestAndResponseSender; +use crate::component_definitions::{ComponentClient, ComponentRequestAndResponseSender}; use crate::component_server::{ComponentServerStarter, LocalComponentServer}; use crate::tests::{ test_a_b_functionality, diff --git a/crates/starknet_state_sync_types/src/communication.rs b/crates/starknet_state_sync_types/src/communication.rs index 36228a8735..7b4e406c14 100644 --- a/crates/starknet_state_sync_types/src/communication.rs +++ b/crates/starknet_state_sync_types/src/communication.rs @@ -9,7 +9,10 @@ use starknet_sequencer_infra::component_client::{ LocalComponentClient, RemoteComponentClient, }; -use starknet_sequencer_infra::component_definitions::ComponentRequestAndResponseSender; +use starknet_sequencer_infra::component_definitions::{ + ComponentClient, + ComponentRequestAndResponseSender, +}; use thiserror::Error; use crate::errors::StateSyncError; @@ -73,7 +76,7 @@ impl StateSyncClient for RemoteStateSyncClient { block_number: BlockNumber, ) -> StateSyncClientResult> { let request = StateSyncRequest::GetBlock(block_number); - let response = self.send(request).await?; + let response = self.send(request).await; handle_response_variants!(StateSyncResponse, GetBlock, StateSyncClientError, StateSyncError) } }