Skip to content

Commit

Permalink
chore(mempool_node): separate server struct
Browse files Browse the repository at this point in the history
commit-id:e8a80c55
  • Loading branch information
Itay-Tsabary-Starkware committed Sep 30, 2024
1 parent 56d4306 commit d88b77c
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 28 deletions.
4 changes: 2 additions & 2 deletions crates/gateway/src/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ use tracing::instrument;

use crate::gateway::Gateway;

pub type GatewayServer = LocalComponentServer<Gateway, GatewayRequest, GatewayResponse>;
pub type LocalGatewayServer = LocalComponentServer<Gateway, GatewayRequest, GatewayResponse>;

pub fn create_gateway_server(
gateway: Gateway,
rx_gateway: Receiver<GatewayRequestAndResponseSender>,
) -> GatewayServer {
) -> LocalGatewayServer {
LocalComponentServer::new(gateway, rx_gateway)
}

Expand Down
4 changes: 2 additions & 2 deletions crates/mempool/src/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ use tokio::sync::mpsc::Receiver;

use crate::mempool::Mempool;

pub type MempoolServer =
pub type LocalMempoolServer =
LocalComponentServer<MempoolCommunicationWrapper, MempoolRequest, MempoolResponse>;

pub fn create_mempool_server(
mempool: Mempool,
rx_mempool: Receiver<MempoolRequestAndResponseSender>,
) -> MempoolServer {
) -> LocalMempoolServer {
let communication_wrapper = MempoolCommunicationWrapper::new(mempool);
LocalComponentServer::new(communication_wrapper, rx_mempool)
}
Expand Down
1 change: 1 addition & 0 deletions crates/mempool_node/src/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::config::MempoolNodeConfig;

pub struct MempoolNodeCommunication {
batcher_channel: ComponentCommunication<BatcherRequestAndResponseSender>,
/// TODO(Tsabary): remove the redundant consensus_manager_channel.
consensus_manager_channel: ComponentCommunication<ConsensusManagerRequestAndResponseSender>,
mempool_channel: ComponentCommunication<MempoolRequestAndResponseSender>,
gateway_channel: ComponentCommunication<GatewayRequestAndResponseSender>,
Expand Down
67 changes: 46 additions & 21 deletions crates/mempool_node/src/servers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use starknet_consensus_manager::communication::{
create_consensus_manager_server,
ConsensusManagerServer,
};
use starknet_gateway::communication::{create_gateway_server, GatewayServer};
use starknet_gateway::communication::{create_gateway_server, LocalGatewayServer};
use starknet_http_server::communication::{create_http_server, HttpServer};
use starknet_mempool::communication::{create_mempool_server, MempoolServer};
use starknet_mempool::communication::{create_mempool_server, LocalMempoolServer};
use starknet_mempool_infra::errors::ComponentServerError;
use starknet_mempool_infra::starters::Startable;
use tracing::error;
Expand All @@ -18,12 +18,25 @@ use crate::communication::MempoolNodeCommunication;
use crate::components::Components;
use crate::config::MempoolNodeConfig;

pub struct Servers {
// Component servers that can run locally.
pub struct LocalServers {
pub batcher: Option<Box<LocalBatcherServer>>,
pub gateway: Option<Box<LocalGatewayServer>>,
pub mempool: Option<Box<LocalMempoolServer>>,
}

/// TODO(Tsabary): rename empty server to wrapper server.
// Component servers that wrap a component without a server.
pub struct WrapperServers {
pub consensus_manager: Option<Box<ConsensusManagerServer>>,
pub gateway: Option<Box<GatewayServer>>,
pub http_server: Option<Box<HttpServer>>,
pub mempool: Option<Box<MempoolServer>>,
}

/// TODO(Tsabary): make these fields private, currently public to support the outdated e2e test.
pub struct Servers {
pub local_servers: LocalServers,
pub wrapper_servers: WrapperServers,
}

pub fn create_servers(
Expand Down Expand Up @@ -70,41 +83,53 @@ pub fn create_servers(
None
};

Servers {
batcher: batcher_server,
consensus_manager: consensus_manager_server,
gateway: gateway_server,
http_server,
mempool: mempool_server,
}
let local_servers =
LocalServers { batcher: batcher_server, gateway: gateway_server, mempool: mempool_server };

let wrapper_servers =
WrapperServers { consensus_manager: consensus_manager_server, http_server };

Servers { local_servers, wrapper_servers }
}

pub async fn run_component_servers(
config: &MempoolNodeConfig,
servers: Servers,
) -> anyhow::Result<()> {
// Batcher server.
let batcher_future =
get_server_future("Batcher", config.components.batcher.execute, servers.batcher);
let batcher_future = get_server_future(
"Batcher",
config.components.batcher.execute,
servers.local_servers.batcher,
);

// Consensus Manager server.
let consensus_manager_future = get_server_future(
"Consensus Manager",
config.components.consensus_manager.execute,
servers.consensus_manager,
servers.wrapper_servers.consensus_manager,
);

// Gateway server.
let gateway_future =
get_server_future("Gateway", config.components.gateway.execute, servers.gateway);
let gateway_future = get_server_future(
"Gateway",
config.components.gateway.execute,
servers.local_servers.gateway,
);

// HttpServer server.
let http_server_future =
get_server_future("HttpServer", config.components.http_server.execute, servers.http_server);
let http_server_future = get_server_future(
"HttpServer",
config.components.http_server.execute,
servers.wrapper_servers.http_server,
);

// Mempool server.
let mempool_future =
get_server_future("Mempool", config.components.mempool.execute, servers.mempool);
let mempool_future = get_server_future(
"Mempool",
config.components.mempool.execute,
servers.local_servers.mempool,
);

// Start servers.
let batcher_handle = tokio::spawn(batcher_future);
Expand Down
7 changes: 4 additions & 3 deletions crates/tests-integration/src/integration_test_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ impl IntegrationTestSetup {
let HttpServerConfig { ip, port } = config.http_server_config;
let http_test_client = HttpTestClient::new(SocketAddr::from((ip, port)));

let gateway_future = get_server_future("Gateway", true, servers.gateway);
let gateway_future = get_server_future("Gateway", true, servers.local_servers.gateway);
let gateway_handle = task_executor.spawn_with_handle(gateway_future);

let http_server_future = get_server_future("HttpServer", true, servers.http_server);
let http_server_future =
get_server_future("HttpServer", true, servers.wrapper_servers.http_server);
let http_server_handle = task_executor.spawn_with_handle(http_server_future);

// Wait for server to spin up.
Expand All @@ -61,7 +62,7 @@ impl IntegrationTestSetup {
let batcher = MockBatcher::new(clients.get_mempool_client().unwrap());

// Build and run mempool.
let mempool_future = get_server_future("Mempool", true, servers.mempool);
let mempool_future = get_server_future("Mempool", true, servers.local_servers.mempool);
let mempool_handle = task_executor.spawn_with_handle(mempool_future);

Self {
Expand Down

0 comments on commit d88b77c

Please sign in to comment.