diff --git a/crates/sequencer_node/src/servers.rs b/crates/sequencer_node/src/servers.rs index af12788a67..bd69ed9129 100644 --- a/crates/sequencer_node/src/servers.rs +++ b/crates/sequencer_node/src/servers.rs @@ -46,75 +46,92 @@ pub struct SequencerNodeServers { wrapper_servers: WrapperServers, } -pub fn create_node_servers( +fn create_local_servers( config: &SequencerNodeConfig, communication: &mut SequencerNodeCommunication, - components: SequencerNodeComponents, -) -> SequencerNodeServers { + components: &mut SequencerNodeComponents, +) -> LocalServers { let batcher_server = match config.components.batcher.execution_mode { ComponentExecutionMode::LocalExecutionWithRemoteDisabled | ComponentExecutionMode::LocalExecutionWithRemoteEnabled => { Some(Box::new(LocalComponentServer::new( - components.batcher.expect("Batcher is not initialized."), + components.batcher.take().expect("Batcher is not initialized."), communication.take_batcher_rx(), ))) } ComponentExecutionMode::Disabled => None, }; - let consensus_manager_server = match config.components.consensus_manager.execution_mode { + let gateway_server = match config.components.gateway.execution_mode { ComponentExecutionMode::LocalExecutionWithRemoteDisabled | ComponentExecutionMode::LocalExecutionWithRemoteEnabled => { - Some(Box::new(WrapperServer::new( - components.consensus_manager.expect("Consensus Manager is not initialized."), + Some(Box::new(LocalComponentServer::new( + components.gateway.take().expect("Gateway is not initialized."), + communication.take_gateway_rx(), ))) } ComponentExecutionMode::Disabled => None, }; - let gateway_server = match config.components.gateway.execution_mode { + let mempool_server = match config.components.mempool.execution_mode { ComponentExecutionMode::LocalExecutionWithRemoteDisabled | ComponentExecutionMode::LocalExecutionWithRemoteEnabled => { Some(Box::new(LocalComponentServer::new( - components.gateway.expect("Gateway is not initialized."), - communication.take_gateway_rx(), + components.mempool.take().expect("Mempool is not initialized."), + communication.take_mempool_rx(), ))) } ComponentExecutionMode::Disabled => None, }; - let http_server = match config.components.http_server.execution_mode { + let mempool_p2p_propagator_server = match config.components.mempool_p2p.execution_mode { ComponentExecutionMode::LocalExecutionWithRemoteDisabled - | ComponentExecutionMode::LocalExecutionWithRemoteEnabled => Some(Box::new( - WrapperServer::new(components.http_server.expect("Http Server is not initialized.")), - )), + | ComponentExecutionMode::LocalExecutionWithRemoteEnabled => { + Some(Box::new(create_mempool_p2p_propagator_server( + components + .mempool_p2p_propagator + .take() + .expect("Mempool P2P Propagator is not initialized."), + communication.take_mempool_p2p_propagator_rx(), + ))) + } ComponentExecutionMode::Disabled => None, }; - let monitoring_endpoint_server = match config.components.monitoring_endpoint.execution_mode { + LocalServers { + batcher: batcher_server, + gateway: gateway_server, + mempool: mempool_server, + mempool_p2p_propagator: mempool_p2p_propagator_server, + } +} + +fn create_wrapper_servers( + config: &SequencerNodeConfig, + components: &mut SequencerNodeComponents, +) -> WrapperServers { + let consensus_manager_server = match config.components.consensus_manager.execution_mode { ComponentExecutionMode::LocalExecutionWithRemoteDisabled | ComponentExecutionMode::LocalExecutionWithRemoteEnabled => { Some(Box::new(WrapperServer::new( - components.monitoring_endpoint.expect("Monitoring Endpoint is not initialized."), + components.consensus_manager.take().expect("Consensus Manager is not initialized."), ))) } ComponentExecutionMode::Disabled => None, }; - let mempool_server = match config.components.mempool.execution_mode { + let http_server = match config.components.http_server.execution_mode { ComponentExecutionMode::LocalExecutionWithRemoteDisabled | ComponentExecutionMode::LocalExecutionWithRemoteEnabled => { - Some(Box::new(LocalComponentServer::new( - components.mempool.expect("Mempool is not initialized."), - communication.take_mempool_rx(), + Some(Box::new(WrapperServer::new( + components.http_server.take().expect("Http Server is not initialized."), ))) } ComponentExecutionMode::Disabled => None, }; - - let mempool_p2p_propagator_server = match config.components.mempool_p2p.execution_mode { + let monitoring_endpoint_server = match config.components.monitoring_endpoint.execution_mode { ComponentExecutionMode::LocalExecutionWithRemoteDisabled | ComponentExecutionMode::LocalExecutionWithRemoteEnabled => { - Some(Box::new(create_mempool_p2p_propagator_server( + Some(Box::new(WrapperServer::new( components - .mempool_p2p_propagator - .expect("Mempool P2P Propagator is not initialized."), - communication.take_mempool_p2p_propagator_rx(), + .monitoring_endpoint + .take() + .expect("Monitoring Endpoint is not initialized."), ))) } ComponentExecutionMode::Disabled => None, @@ -124,25 +141,30 @@ pub fn create_node_servers( ComponentExecutionMode::LocalExecutionWithRemoteDisabled | ComponentExecutionMode::LocalExecutionWithRemoteEnabled => { Some(Box::new(MempoolP2pRunnerServer::new( - components.mempool_p2p_runner.expect("Mempool P2P Runner is not initialized."), + components + .mempool_p2p_runner + .take() + .expect("Mempool P2P Runner is not initialized."), ))) } ComponentExecutionMode::Disabled => None, }; - - let local_servers = LocalServers { - batcher: batcher_server, - gateway: gateway_server, - mempool: mempool_server, - mempool_p2p_propagator: mempool_p2p_propagator_server, - }; - - let wrapper_servers = WrapperServers { + WrapperServers { consensus_manager: consensus_manager_server, http_server, monitoring_endpoint: monitoring_endpoint_server, mempool_p2p_runner: mempool_p2p_runner_server, - }; + } +} + +pub fn create_node_servers( + config: &SequencerNodeConfig, + communication: &mut SequencerNodeCommunication, + components: SequencerNodeComponents, +) -> SequencerNodeServers { + let mut components = components; + let local_servers = create_local_servers(config, communication, &mut components); + let wrapper_servers = create_wrapper_servers(config, &mut components); SequencerNodeServers { local_servers, wrapper_servers } }