Skip to content

Commit

Permalink
chore: use standard initialization for server creation
Browse files Browse the repository at this point in the history
commit-id:c439ca8b
  • Loading branch information
nadin-Starkware committed Nov 5, 2024
1 parent fcc6b10 commit bbba509
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 68 deletions.
9 changes: 0 additions & 9 deletions crates/batcher/src/communication.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,15 @@
use async_trait::async_trait;
use starknet_batcher_types::communication::{
BatcherRequest,
BatcherRequestAndResponseSender,
BatcherResponse,
};
use starknet_sequencer_infra::component_definitions::ComponentRequestHandler;
use starknet_sequencer_infra::component_server::LocalComponentServer;
use tokio::sync::mpsc::Receiver;

use crate::batcher::Batcher;

pub type LocalBatcherServer = LocalComponentServer<Batcher, BatcherRequest, BatcherResponse>;

pub fn create_local_batcher_server(
batcher: Batcher,
rx_batcher: Receiver<BatcherRequestAndResponseSender>,
) -> LocalBatcherServer {
LocalComponentServer::new(batcher, rx_batcher)
}

#[async_trait]
impl ComponentRequestHandler<BatcherRequest, BatcherResponse> for Batcher {
async fn handle_request(&mut self, request: BatcherRequest) -> BatcherResponse {
Expand Down
6 changes: 0 additions & 6 deletions crates/consensus_manager/src/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,3 @@ use starknet_sequencer_infra::component_server::WrapperServer;
use crate::consensus_manager::ConsensusManager;

pub type ConsensusManagerServer = WrapperServer<ConsensusManager>;

pub fn create_consensus_manager_server(
consensus_manager: ConsensusManager,
) -> ConsensusManagerServer {
WrapperServer::new(consensus_manager)
}
9 changes: 0 additions & 9 deletions crates/gateway/src/communication.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
use async_trait::async_trait;
use starknet_gateway_types::communication::{
GatewayRequest,
GatewayRequestAndResponseSender,
GatewayResponse,
};
use starknet_gateway_types::errors::GatewayError;
use starknet_sequencer_infra::component_definitions::ComponentRequestHandler;
use starknet_sequencer_infra::component_server::LocalComponentServer;
use tokio::sync::mpsc::Receiver;
use tracing::instrument;

use crate::gateway::Gateway;

pub type LocalGatewayServer = LocalComponentServer<Gateway, GatewayRequest, GatewayResponse>;

pub fn create_gateway_server(
gateway: Gateway,
rx_gateway: Receiver<GatewayRequestAndResponseSender>,
) -> LocalGatewayServer {
LocalComponentServer::new(gateway, rx_gateway)
}

#[async_trait]
impl ComponentRequestHandler<GatewayRequest, GatewayResponse> for Gateway {
#[instrument(skip(self))]
Expand Down
6 changes: 1 addition & 5 deletions crates/http_server/src/communication.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
use starknet_sequencer_infra::component_server::{create_empty_server, WrapperServer};
use starknet_sequencer_infra::component_server::WrapperServer;

use crate::http_server::HttpServer as HttpServerComponent;

pub type HttpServer = WrapperServer<HttpServerComponent>;

pub fn create_http_server(http_server: HttpServerComponent) -> HttpServer {
create_empty_server(http_server)
}
9 changes: 0 additions & 9 deletions crates/mempool/src/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,18 @@ use starknet_mempool_p2p_types::communication::SharedMempoolP2pPropagatorClient;
use starknet_mempool_types::communication::{
AddTransactionArgsWrapper,
MempoolRequest,
MempoolRequestAndResponseSender,
MempoolResponse,
};
use starknet_mempool_types::errors::MempoolError;
use starknet_mempool_types::mempool_types::{CommitBlockArgs, MempoolResult};
use starknet_sequencer_infra::component_definitions::{ComponentRequestHandler, ComponentStarter};
use starknet_sequencer_infra::component_server::LocalComponentServer;
use tokio::sync::mpsc::Receiver;

use crate::mempool::Mempool;

pub type LocalMempoolServer =
LocalComponentServer<MempoolCommunicationWrapper, MempoolRequest, MempoolResponse>;

pub fn create_mempool_server(
mempool: MempoolCommunicationWrapper,
rx_mempool: Receiver<MempoolRequestAndResponseSender>,
) -> LocalMempoolServer {
LocalComponentServer::new(mempool, rx_mempool)
}

pub fn create_mempool(
mempool_p2p_propagator_client: SharedMempoolP2pPropagatorClient,
) -> MempoolCommunicationWrapper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ impl<Component: ComponentStarter + Send> ComponentServerStarter for WrapperServe
}
}

pub fn create_empty_server<Component: Send>(component: Component) -> WrapperServer<Component> {
WrapperServer::new(component)
}

impl<Component> ComponentReplacer<Component> for WrapperServer<Component> {
fn replace(&mut self, component: Component) -> Result<(), ReplaceComponentError> {
self.component = component;
Expand Down
8 changes: 1 addition & 7 deletions crates/monitoring_endpoint/src/communication.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
use starknet_sequencer_infra::component_server::{create_empty_server, WrapperServer};
use starknet_sequencer_infra::component_server::WrapperServer;

use crate::monitoring_endpoint::MonitoringEndpoint;

pub type MonitoringEndpointServer = WrapperServer<MonitoringEndpoint>;

pub fn create_monitoring_endpoint_server(
monitoring_endpont: MonitoringEndpoint,
) -> MonitoringEndpointServer {
create_empty_server(monitoring_endpont)
}
32 changes: 13 additions & 19 deletions crates/sequencer_node/src/servers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,18 @@ use std::future::pending;
use std::pin::Pin;

use futures::{Future, FutureExt};
use starknet_batcher::communication::{create_local_batcher_server, LocalBatcherServer};
use starknet_consensus_manager::communication::{
create_consensus_manager_server,
ConsensusManagerServer,
};
use starknet_gateway::communication::{create_gateway_server, LocalGatewayServer};
use starknet_http_server::communication::{create_http_server, HttpServer};
use starknet_mempool::communication::{create_mempool_server, LocalMempoolServer};
use starknet_batcher::communication::LocalBatcherServer;
use starknet_consensus_manager::communication::ConsensusManagerServer;
use starknet_gateway::communication::LocalGatewayServer;
use starknet_http_server::communication::HttpServer;
use starknet_mempool::communication::LocalMempoolServer;
use starknet_mempool_p2p::propagator::{
create_mempool_p2p_propagator_server,
LocalMempoolP2pPropagatorServer,
};
use starknet_mempool_p2p::runner::MempoolP2pRunnerServer;
use starknet_monitoring_endpoint::communication::{
create_monitoring_endpoint_server,
MonitoringEndpointServer,
};
use starknet_sequencer_infra::component_server::ComponentServerStarter;
use starknet_monitoring_endpoint::communication::MonitoringEndpointServer;
use starknet_sequencer_infra::component_server::{ComponentServerStarter, LocalComponentServer, WrapperServer};
use starknet_sequencer_infra::errors::ComponentServerError;
use tracing::error;

Expand Down Expand Up @@ -56,7 +50,7 @@ pub fn create_node_servers(
let batcher_server = match config.components.batcher.execution_mode {
ComponentExecutionMode::LocalExecutionWithRemoteDisabled
| ComponentExecutionMode::LocalExecutionWithRemoteEnabled => {
Some(Box::new(create_local_batcher_server(
Some(Box::new(LocalComponentServer::new(
components.batcher.expect("Batcher is not initialized."),
communication.take_batcher_rx(),
)))
Expand All @@ -66,7 +60,7 @@ pub fn create_node_servers(
let consensus_manager_server = match config.components.consensus_manager.execution_mode {
ComponentExecutionMode::LocalExecutionWithRemoteDisabled
| ComponentExecutionMode::LocalExecutionWithRemoteEnabled => {
Some(Box::new(create_consensus_manager_server(
Some(Box::new(WrapperServer::new(
components.consensus_manager.expect("Consensus Manager is not initialized."),
)))
}
Expand All @@ -75,7 +69,7 @@ pub fn create_node_servers(
let gateway_server = match config.components.gateway.execution_mode {
ComponentExecutionMode::LocalExecutionWithRemoteDisabled
| ComponentExecutionMode::LocalExecutionWithRemoteEnabled => {
Some(Box::new(create_gateway_server(
Some(Box::new(LocalComponentServer::new(
components.gateway.expect("Gateway is not initialized."),
communication.take_gateway_rx(),
)))
Expand All @@ -85,14 +79,14 @@ pub fn create_node_servers(
let http_server = match config.components.http_server.execution_mode {
ComponentExecutionMode::LocalExecutionWithRemoteDisabled
| ComponentExecutionMode::LocalExecutionWithRemoteEnabled => Some(Box::new(
create_http_server(components.http_server.expect("Http Server is not initialized.")),
WrapperServer::new(components.http_server.expect("Http Server is not initialized.")),
)),
ComponentExecutionMode::Disabled => None,
};
let monitoring_endpoint_server = match config.components.monitoring_endpoint.execution_mode {
ComponentExecutionMode::LocalExecutionWithRemoteDisabled
| ComponentExecutionMode::LocalExecutionWithRemoteEnabled => {
Some(Box::new(create_monitoring_endpoint_server(
Some(Box::new(WrapperServer::new(
components.monitoring_endpoint.expect("Monitoring Endpoint is not initialized."),
)))
}
Expand All @@ -101,7 +95,7 @@ pub fn create_node_servers(
let mempool_server = match config.components.mempool.execution_mode {
ComponentExecutionMode::LocalExecutionWithRemoteDisabled
| ComponentExecutionMode::LocalExecutionWithRemoteEnabled => {
Some(Box::new(create_mempool_server(
Some(Box::new(LocalComponentServer::new(
components.mempool.expect("Mempool is not initialized."),
communication.take_mempool_rx(),
)))
Expand Down

0 comments on commit bbba509

Please sign in to comment.