From 995a89a40ae6fa4e008cdde66596199ad1582a76 Mon Sep 17 00:00:00 2001 From: giladchase Date: Tue, 17 Dec 2024 10:52:26 +0200 Subject: [PATCH] chore(starknet_l1_provider): use client (#2649) - in batcher: replace their dummy l1 provider client with the real thing. The only effective change in behavior is that `validate` now returns an enum instead of a bool. - changes in sequencer_node are boilerplate, did nothing different from other similar nodes. - Added a field in the l1 config, currently unused, in order to make the config test pass which seems to require at least one field. Co-Authored-By: Gilad Chase --- Cargo.lock | 4 ++ config/sequencer/default_config.json | 55 +++++++++++++++++++ crates/starknet_batcher/Cargo.toml | 2 + crates/starknet_batcher/src/batcher.rs | 23 ++++---- crates/starknet_batcher/src/batcher_test.rs | 4 ++ .../src/transaction_provider.rs | 52 +++++++----------- .../src/transaction_provider_test.rs | 15 ++--- .../src/config_utils.rs | 1 + crates/starknet_l1_provider/Cargo.toml | 1 + crates/starknet_l1_provider/src/lib.rs | 22 +++++++- crates/starknet_sequencer_node/Cargo.toml | 2 + crates/starknet_sequencer_node/src/clients.rs | 22 ++++++++ .../src/communication.rs | 17 ++++++ .../starknet_sequencer_node/src/components.rs | 16 +++++- .../src/config/component_config.rs | 3 + .../src/config/node_config.rs | 4 ++ crates/starknet_sequencer_node/src/servers.rs | 33 ++++++++++- 17 files changed, 223 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ec72ce93fc..f7b39f1bab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10279,6 +10279,7 @@ dependencies = [ "starknet-types-core", "starknet_api", "starknet_batcher_types", + "starknet_l1_provider_types", "starknet_mempool_types", "starknet_sequencer_infra", "starknet_state_sync_types", @@ -10502,6 +10503,7 @@ dependencies = [ "async-trait", "indexmap 2.6.0", "papyrus_base_layer", + "papyrus_config", "pretty_assertions", "serde", "starknet_api", @@ -10690,6 +10692,8 @@ dependencies = [ "starknet_gateway", "starknet_gateway_types", "starknet_http_server", + "starknet_l1_provider", + "starknet_l1_provider_types", "starknet_mempool", "starknet_mempool_p2p", "starknet_mempool_p2p_types", diff --git a/config/sequencer/default_config.json b/config/sequencer/default_config.json index 7e7fc21d4d..cb80d18bc1 100644 --- a/config/sequencer/default_config.json +++ b/config/sequencer/default_config.json @@ -369,6 +369,56 @@ "privacy": "Public", "value": "0.0.0.0:8080" }, + "components.l1_provider.execution_mode": { + "description": "The component execution mode.", + "privacy": "Public", + "value": "LocalExecutionWithRemoteDisabled" + }, + "components.l1_provider.local_server_config.#is_none": { + "description": "Flag for an optional field.", + "privacy": "TemporaryValue", + "value": false + }, + "components.l1_provider.local_server_config.channel_buffer_size": { + "description": "The communication channel buffer size.", + "privacy": "Public", + "value": 32 + }, + "components.l1_provider.remote_client_config.#is_none": { + "description": "Flag for an optional field.", + "privacy": "TemporaryValue", + "value": true + }, + "components.l1_provider.remote_client_config.idle_connections": { + "description": "The maximum number of idle connections to keep alive.", + "privacy": "Public", + "value": 18446744073709551615 + }, + "components.l1_provider.remote_client_config.idle_timeout": { + "description": "The duration in seconds to keep an idle connection open before closing.", + "privacy": "Public", + "value": 90 + }, + "components.l1_provider.remote_client_config.retries": { + "description": "The max number of retries for sending a message.", + "privacy": "Public", + "value": 3 + }, + "components.l1_provider.remote_client_config.socket": { + "description": "The remote component server socket.", + "privacy": "Public", + "value": "0.0.0.0:8080" + }, + "components.l1_provider.remote_server_config.#is_none": { + "description": "Flag for an optional field.", + "privacy": "TemporaryValue", + "value": true + }, + "components.l1_provider.remote_server_config.socket": { + "description": "The remote component server socket.", + "privacy": "Public", + "value": "0.0.0.0:8080" + }, "components.mempool.execution_mode": { "description": "The component execution mode.", "privacy": "Public", @@ -784,6 +834,11 @@ "privacy": "Public", "value": 8080 }, + "l1_provider_config._poll_interval": { + "description": "Interval in milliseconds between each scraping attempt of L1.", + "privacy": "Public", + "value": 100 + }, "mempool_p2p_config.network_buffer_size": { "description": "Network buffer size.", "privacy": "Public", diff --git a/crates/starknet_batcher/Cargo.toml b/crates/starknet_batcher/Cargo.toml index ad23321bc9..68d0c13c02 100644 --- a/crates/starknet_batcher/Cargo.toml +++ b/crates/starknet_batcher/Cargo.toml @@ -19,6 +19,7 @@ papyrus_storage.workspace = true serde.workspace = true starknet_api.workspace = true starknet_batcher_types.workspace = true +starknet_l1_provider_types.workspace = true starknet_mempool_types.workspace = true starknet_sequencer_infra.workspace = true starknet_state_sync_types.workspace = true @@ -36,4 +37,5 @@ mockall.workspace = true rstest.workspace = true starknet-types-core.workspace = true starknet_api = { workspace = true, features = ["testing"] } +starknet_l1_provider_types = { workspace = true, features = ["testing"] } starknet_mempool_types = { workspace = true, features = ["testing"] } diff --git a/crates/starknet_batcher/src/batcher.rs b/crates/starknet_batcher/src/batcher.rs index 016be04882..66b853ff5f 100644 --- a/crates/starknet_batcher/src/batcher.rs +++ b/crates/starknet_batcher/src/batcher.rs @@ -28,6 +28,7 @@ use starknet_batcher_types::batcher_types::{ ValidateBlockInput, }; use starknet_batcher_types::errors::BatcherError; +use starknet_l1_provider_types::SharedL1ProviderClient; use starknet_mempool_types::communication::SharedMempoolClient; use starknet_mempool_types::mempool_types::CommitBlockArgs; use starknet_sequencer_infra::component_definitions::ComponentStarter; @@ -42,11 +43,7 @@ use crate::block_builder::{ }; use crate::config::BatcherConfig; use crate::proposal_manager::{GenerateProposalError, ProposalManager, ProposalManagerTrait}; -use crate::transaction_provider::{ - DummyL1ProviderClient, - ProposeTransactionProvider, - ValidateTransactionProvider, -}; +use crate::transaction_provider::{ProposeTransactionProvider, ValidateTransactionProvider}; use crate::utils::{ deadline_as_instant, proposal_status_from, @@ -62,6 +59,7 @@ pub struct Batcher { pub config: BatcherConfig, pub storage_reader: Arc, pub storage_writer: Box, + pub l1_provider_client: SharedL1ProviderClient, pub mempool_client: SharedMempoolClient, active_height: Option, @@ -77,6 +75,7 @@ impl Batcher { config: BatcherConfig, storage_reader: Arc, storage_writer: Box, + l1_provider_client: SharedL1ProviderClient, mempool_client: SharedMempoolClient, block_builder_factory: Box, proposal_manager: Box, @@ -85,6 +84,7 @@ impl Batcher { config: config.clone(), storage_reader, storage_writer, + l1_provider_client, mempool_client, active_height: None, block_builder_factory, @@ -136,8 +136,7 @@ impl Batcher { let tx_provider = ProposeTransactionProvider::new( self.mempool_client.clone(), - // TODO: use a real L1 provider client. - Arc::new(DummyL1ProviderClient), + self.l1_provider_client.clone(), self.config.max_l1_handler_txs_per_block_proposal, ); @@ -186,8 +185,7 @@ impl Batcher { let tx_provider = ValidateTransactionProvider { tx_receiver: input_tx_receiver, - // TODO: use a real L1 provider client. - l1_provider_client: Arc::new(DummyL1ProviderClient), + l1_provider_client: self.l1_provider_client.clone(), }; let (block_builder, abort_signal_sender) = self @@ -429,7 +427,11 @@ impl Batcher { } } -pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient) -> Batcher { +pub fn create_batcher( + config: BatcherConfig, + mempool_client: SharedMempoolClient, + l1_provider_client: SharedL1ProviderClient, +) -> Batcher { let (storage_reader, storage_writer) = papyrus_storage::open_storage(config.storage.clone()) .expect("Failed to open batcher's storage"); @@ -445,6 +447,7 @@ pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient config, storage_reader, storage_writer, + l1_provider_client, mempool_client, block_builder_factory, proposal_manager, diff --git a/crates/starknet_batcher/src/batcher_test.rs b/crates/starknet_batcher/src/batcher_test.rs index 751c3cd264..e55d76bf45 100644 --- a/crates/starknet_batcher/src/batcher_test.rs +++ b/crates/starknet_batcher/src/batcher_test.rs @@ -36,6 +36,7 @@ use starknet_batcher_types::batcher_types::{ ValidateBlockInput, }; use starknet_batcher_types::errors::BatcherError; +use starknet_l1_provider_types::MockL1ProviderClient; use starknet_mempool_types::communication::MockMempoolClient; use starknet_mempool_types::mempool_types::CommitBlockArgs; use starknet_state_sync_types::state_sync_types::SyncBlock; @@ -105,6 +106,7 @@ struct MockDependencies { storage_reader: MockBatcherStorageReaderTrait, storage_writer: MockBatcherStorageWriterTrait, mempool_client: MockMempoolClient, + l1_provider_client: MockL1ProviderClient, proposal_manager: MockProposalManagerTraitWrapper, block_builder_factory: MockBlockBuilderFactoryTrait, } @@ -116,6 +118,7 @@ impl Default for MockDependencies { Self { storage_reader, storage_writer: MockBatcherStorageWriterTrait::new(), + l1_provider_client: MockL1ProviderClient::new(), mempool_client: MockMempoolClient::new(), proposal_manager: MockProposalManagerTraitWrapper::new(), block_builder_factory: MockBlockBuilderFactoryTrait::new(), @@ -128,6 +131,7 @@ fn create_batcher(mock_dependencies: MockDependencies) -> Batcher { BatcherConfig { outstream_content_buffer_size: STREAMING_CHUNK_SIZE, ..Default::default() }, Arc::new(mock_dependencies.storage_reader), Box::new(mock_dependencies.storage_writer), + Arc::new(mock_dependencies.l1_provider_client), Arc::new(mock_dependencies.mempool_client), Box::new(mock_dependencies.block_builder_factory), Box::new(mock_dependencies.proposal_manager), diff --git a/crates/starknet_batcher/src/transaction_provider.rs b/crates/starknet_batcher/src/transaction_provider.rs index 402cd51317..009f9d90ea 100644 --- a/crates/starknet_batcher/src/transaction_provider.rs +++ b/crates/starknet_batcher/src/transaction_provider.rs @@ -1,15 +1,15 @@ use std::cmp::min; -use std::sync::Arc; use std::vec; use async_trait::async_trait; #[cfg(test)] use mockall::automock; -use starknet_api::executable_transaction::{L1HandlerTransaction, Transaction}; +use starknet_api::executable_transaction::Transaction; use starknet_api::transaction::TransactionHash; +use starknet_l1_provider_types::errors::L1ProviderClientError; +use starknet_l1_provider_types::{SharedL1ProviderClient, ValidationStatus as L1ValidationStatus}; use starknet_mempool_types::communication::{MempoolClientError, SharedMempoolClient}; use thiserror::Error; -use tracing::warn; type TransactionProviderResult = Result; @@ -19,6 +19,8 @@ pub enum TransactionProviderError { MempoolError(#[from] MempoolClientError), #[error("L1Handler transaction validation failed for tx with hash {0}.")] L1HandlerTransactionValidationFailed(TransactionHash), + #[error(transparent)] + L1ProviderError(#[from] L1ProviderClientError), } #[derive(Debug, PartialEq)] @@ -64,8 +66,17 @@ impl ProposeTransactionProvider { } } - fn get_l1_handler_txs(&mut self, n_txs: usize) -> Vec { - self.l1_provider_client.get_txs(n_txs).into_iter().map(Transaction::L1Handler).collect() + async fn get_l1_handler_txs( + &mut self, + n_txs: usize, + ) -> TransactionProviderResult> { + Ok(self + .l1_provider_client + .get_txs(n_txs) + .await? + .into_iter() + .map(Transaction::L1Handler) + .collect()) } async fn get_mempool_txs( @@ -90,7 +101,7 @@ impl TransactionProvider for ProposeTransactionProvider { if self.phase == TxProviderPhase::L1 { let n_l1handler_txs_to_get = min(self.max_l1_handler_txs_per_block - self.n_l1handler_txs_so_far, n_txs); - let mut l1handler_txs = self.get_l1_handler_txs(n_l1handler_txs_to_get); + let mut l1handler_txs = self.get_l1_handler_txs(n_l1handler_txs_to_get).await?; self.n_l1handler_txs_so_far += l1handler_txs.len(); // Determine whether we need to switch to mempool phase. @@ -131,7 +142,9 @@ impl TransactionProvider for ValidateTransactionProvider { } for tx in &buffer { if let Transaction::L1Handler(tx) = tx { - if !self.l1_provider_client.validate(tx) { + let l1_validation_status = self.l1_provider_client.validate(tx.tx_hash).await?; + if l1_validation_status != L1ValidationStatus::Validated { + // TODO: add the validation status into the error. return Err(TransactionProviderError::L1HandlerTransactionValidationFailed( tx.tx_hash, )); @@ -141,28 +154,3 @@ impl TransactionProvider for ValidateTransactionProvider { Ok(NextTxs::Txs(buffer)) } } - -// TODO: Remove L1Provider code when the communication module of l1_provider is added. -#[cfg_attr(test, automock)] -#[async_trait] -pub trait L1ProviderClient: Send + Sync { - fn get_txs(&self, n_txs: usize) -> Vec; - fn validate(&self, tx: &L1HandlerTransaction) -> bool; -} - -pub type SharedL1ProviderClient = Arc; - -pub struct DummyL1ProviderClient; - -#[async_trait] -impl L1ProviderClient for DummyL1ProviderClient { - fn get_txs(&self, _n_txs: usize) -> Vec { - warn!("Dummy L1 provider client is used, no L1 transactions are provided."); - vec![] - } - - fn validate(&self, _tx: &L1HandlerTransaction) -> bool { - warn!("Dummy L1 provider client is used, tx is not really validated."); - true - } -} diff --git a/crates/starknet_batcher/src/transaction_provider_test.rs b/crates/starknet_batcher/src/transaction_provider_test.rs index fc7a5f341f..27a250f770 100644 --- a/crates/starknet_batcher/src/transaction_provider_test.rs +++ b/crates/starknet_batcher/src/transaction_provider_test.rs @@ -6,10 +6,10 @@ use rstest::{fixture, rstest}; use starknet_api::executable_transaction::{L1HandlerTransaction, Transaction}; use starknet_api::test_utils::invoke::{executable_invoke_tx, InvokeTxArgs}; use starknet_api::tx_hash; +use starknet_l1_provider_types::{MockL1ProviderClient, ValidationStatus as L1ValidationStatus}; use starknet_mempool_types::communication::MockMempoolClient; use crate::transaction_provider::{ - MockL1ProviderClient, NextTxs, ProposeTransactionProvider, TransactionProvider, @@ -33,7 +33,7 @@ impl MockDependencies { self.l1_provider_client .expect_get_txs() .with(eq(n_to_request)) - .returning(move |_| vec![L1HandlerTransaction::default(); n_to_return]); + .returning(move |_| Ok(vec![L1HandlerTransaction::default(); n_to_return])); } fn expect_get_mempool_txs(&mut self, n_to_request: usize) { @@ -42,11 +42,11 @@ impl MockDependencies { }); } - fn expect_validate_l1handler(&mut self, tx: L1HandlerTransaction, result: bool) { + fn expect_validate_l1handler(&mut self, tx: L1HandlerTransaction, result: L1ValidationStatus) { self.l1_provider_client .expect_validate() - .withf(move |tx_arg| tx_arg == &tx) - .returning(move |_| result); + .withf(move |tx_arg| tx_arg == &tx.tx_hash) + .returning(move |_| Ok(result)); } async fn simulate_input_txs(&mut self, txs: Vec) { @@ -163,7 +163,7 @@ async fn no_more_l1_handler(mut mock_dependencies: MockDependencies) { #[tokio::test] async fn validate_flow(mut mock_dependencies: MockDependencies) { let test_tx = test_l1handler_tx(); - mock_dependencies.expect_validate_l1handler(test_tx.clone(), true); + mock_dependencies.expect_validate_l1handler(test_tx.clone(), L1ValidationStatus::Validated); mock_dependencies .simulate_input_txs(vec![ Transaction::L1Handler(test_tx), @@ -183,7 +183,8 @@ async fn validate_flow(mut mock_dependencies: MockDependencies) { #[tokio::test] async fn validate_fails(mut mock_dependencies: MockDependencies) { let test_tx = test_l1handler_tx(); - mock_dependencies.expect_validate_l1handler(test_tx.clone(), false); + mock_dependencies + .expect_validate_l1handler(test_tx.clone(), L1ValidationStatus::AlreadyIncludedOnL2); mock_dependencies .simulate_input_txs(vec![ Transaction::L1Handler(test_tx), diff --git a/crates/starknet_integration_tests/src/config_utils.rs b/crates/starknet_integration_tests/src/config_utils.rs index 09fef5ddfa..de792df733 100644 --- a/crates/starknet_integration_tests/src/config_utils.rs +++ b/crates/starknet_integration_tests/src/config_utils.rs @@ -137,6 +137,7 @@ pub async fn get_http_only_component_config(gateway_socket: SocketAddr) -> Compo mempool: get_disabled_component_config(), mempool_p2p: get_disabled_component_config(), state_sync: get_disabled_component_config(), + l1_provider: get_disabled_component_config(), } } diff --git a/crates/starknet_l1_provider/Cargo.toml b/crates/starknet_l1_provider/Cargo.toml index 60a979bbd1..89e98570a6 100644 --- a/crates/starknet_l1_provider/Cargo.toml +++ b/crates/starknet_l1_provider/Cargo.toml @@ -12,6 +12,7 @@ testing = [] async-trait.workspace = true indexmap.workspace = true papyrus_base_layer.workspace = true +papyrus_config.workspace = true serde.workspace = true starknet_api.workspace = true starknet_l1_provider_types.workspace = true diff --git a/crates/starknet_l1_provider/src/lib.rs b/crates/starknet_l1_provider/src/lib.rs index cd3b7d8f75..217827a184 100644 --- a/crates/starknet_l1_provider/src/lib.rs +++ b/crates/starknet_l1_provider/src/lib.rs @@ -4,7 +4,13 @@ pub mod errors; #[cfg(test)] pub mod test_utils; +use std::collections::BTreeMap; +use std::time::Duration; + use indexmap::{IndexMap, IndexSet}; +use papyrus_config::converters::deserialize_milliseconds_to_duration; +use papyrus_config::dumping::{ser_param, SerializeConfig}; +use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam}; use serde::{Deserialize, Serialize}; use starknet_api::executable_transaction::L1HandlerTransaction; use starknet_api::transaction::TransactionHash; @@ -190,7 +196,21 @@ impl std::fmt::Display for ProviderState { } #[derive(Clone, Debug, Default, Serialize, Deserialize, Validate, PartialEq)] -pub struct L1ProviderConfig; +pub struct L1ProviderConfig { + #[serde(deserialize_with = "deserialize_milliseconds_to_duration")] + pub _poll_interval: Duration, +} + +impl SerializeConfig for L1ProviderConfig { + fn dump(&self) -> BTreeMap { + BTreeMap::from([ser_param( + "_poll_interval", + &Duration::from_millis(100).as_millis(), + "Interval in milliseconds between each scraping attempt of L1.", + ParamPrivacyInput::Public, + )]) + } +} pub fn create_l1_provider(_config: L1ProviderConfig) -> L1Provider { L1Provider { state: ProviderState::Propose, ..Default::default() } diff --git a/crates/starknet_sequencer_node/Cargo.toml b/crates/starknet_sequencer_node/Cargo.toml index 91dbee0d88..c2707768ba 100644 --- a/crates/starknet_sequencer_node/Cargo.toml +++ b/crates/starknet_sequencer_node/Cargo.toml @@ -29,6 +29,8 @@ starknet_consensus_manager.workspace = true starknet_gateway.workspace = true starknet_gateway_types.workspace = true starknet_http_server.workspace = true +starknet_l1_provider.workspace = true +starknet_l1_provider_types.workspace = true starknet_mempool.workspace = true starknet_mempool_p2p.workspace = true starknet_mempool_p2p_types.workspace = true diff --git a/crates/starknet_sequencer_node/src/clients.rs b/crates/starknet_sequencer_node/src/clients.rs index 7f2811bfb3..0473cf3b05 100644 --- a/crates/starknet_sequencer_node/src/clients.rs +++ b/crates/starknet_sequencer_node/src/clients.rs @@ -14,6 +14,8 @@ use starknet_gateway_types::communication::{ RemoteGatewayClient, SharedGatewayClient, }; +use starknet_l1_provider::communication::{LocalL1ProviderClient, RemoteL1ProviderClient}; +use starknet_l1_provider_types::{L1ProviderRequest, L1ProviderResponse, SharedL1ProviderClient}; use starknet_mempool_p2p_types::communication::{ LocalMempoolP2pPropagatorClient, MempoolP2pPropagatorRequest, @@ -48,6 +50,7 @@ pub struct SequencerNodeClients { mempool_p2p_propagator_client: Client, state_sync_client: Client, + l1_provider_client: Client, } /// A macro to retrieve a shared client wrapped in an `Arc`. The returned client is either the local @@ -123,6 +126,16 @@ impl SequencerNodeClients { self.gateway_client.get_local_client() } + pub fn get_l1_provider_local_client( + &self, + ) -> Option> { + self.l1_provider_client.get_local_client() + } + + pub fn get_l1_provider_shared_client(&self) -> Option { + get_shared_client!(self, l1_provider_client) + } + pub fn get_mempool_p2p_propagator_shared_client( &self, ) -> Option { @@ -244,11 +257,20 @@ pub fn create_node_clients( &config.components.state_sync.remote_client_config ); + let l1_provider_client = create_client!( + &config.components.l1_provider.execution_mode, + LocalL1ProviderClient, + RemoteL1ProviderClient, + channels.take_l1_provider_tx(), + &config.components.l1_provider.remote_client_config + ); + SequencerNodeClients { batcher_client, mempool_client, gateway_client, mempool_p2p_propagator_client, state_sync_client, + l1_provider_client, } } diff --git a/crates/starknet_sequencer_node/src/communication.rs b/crates/starknet_sequencer_node/src/communication.rs index 812b72ab99..dccec5ef94 100644 --- a/crates/starknet_sequencer_node/src/communication.rs +++ b/crates/starknet_sequencer_node/src/communication.rs @@ -1,5 +1,6 @@ use starknet_batcher_types::communication::BatcherRequestAndResponseSender; use starknet_gateway_types::communication::GatewayRequestAndResponseSender; +use starknet_l1_provider::communication::L1ProviderRequestAndResponseSender; use starknet_mempool_p2p_types::communication::MempoolP2pPropagatorRequestAndResponseSender; use starknet_mempool_types::communication::MempoolRequestAndResponseSender; use starknet_sequencer_infra::component_definitions::ComponentCommunication; @@ -9,6 +10,7 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; pub struct SequencerNodeCommunication { batcher_channel: ComponentCommunication, gateway_channel: ComponentCommunication, + l1_provider_channel: ComponentCommunication, mempool_channel: ComponentCommunication, mempool_p2p_propagator_channel: ComponentCommunication, @@ -32,6 +34,14 @@ impl SequencerNodeCommunication { self.gateway_channel.take_rx() } + pub fn take_l1_provider_tx(&mut self) -> Sender { + self.l1_provider_channel.take_tx() + } + + pub fn take_l1_provider_rx(&mut self) -> Receiver { + self.l1_provider_channel.take_rx() + } + pub fn take_mempool_p2p_propagator_tx( &mut self, ) -> Sender { @@ -68,6 +78,9 @@ pub fn create_node_channels() -> SequencerNodeCommunication { let (tx_gateway, rx_gateway) = channel::(DEFAULT_INVOCATIONS_QUEUE_SIZE); + let (tx_l1_provider, rx_l1_provider) = + channel::(DEFAULT_INVOCATIONS_QUEUE_SIZE); + let (tx_mempool, rx_mempool) = channel::(DEFAULT_INVOCATIONS_QUEUE_SIZE); @@ -80,6 +93,10 @@ pub fn create_node_channels() -> SequencerNodeCommunication { SequencerNodeCommunication { batcher_channel: ComponentCommunication::new(Some(tx_batcher), Some(rx_batcher)), gateway_channel: ComponentCommunication::new(Some(tx_gateway), Some(rx_gateway)), + l1_provider_channel: ComponentCommunication::new( + Some(tx_l1_provider), + Some(rx_l1_provider), + ), mempool_channel: ComponentCommunication::new(Some(tx_mempool), Some(rx_mempool)), mempool_p2p_propagator_channel: ComponentCommunication::new( Some(tx_mempool_p2p_propagator), diff --git a/crates/starknet_sequencer_node/src/components.rs b/crates/starknet_sequencer_node/src/components.rs index 7e110e1aea..6f8b1e011e 100644 --- a/crates/starknet_sequencer_node/src/components.rs +++ b/crates/starknet_sequencer_node/src/components.rs @@ -4,6 +4,7 @@ use starknet_batcher::batcher::{create_batcher, Batcher}; use starknet_consensus_manager::consensus_manager::ConsensusManager; use starknet_gateway::gateway::{create_gateway, Gateway}; use starknet_http_server::http_server::{create_http_server, HttpServer}; +use starknet_l1_provider::{create_l1_provider, L1Provider}; use starknet_mempool::communication::{create_mempool, MempoolCommunicationWrapper}; use starknet_mempool_p2p::create_p2p_propagator_and_runner; use starknet_mempool_p2p::propagator::MempoolP2pPropagator; @@ -29,6 +30,7 @@ pub struct SequencerNodeComponents { pub consensus_manager: Option, pub gateway: Option, pub http_server: Option, + pub l1_provider: Option, pub mempool: Option, pub monitoring_endpoint: Option, pub mempool_p2p_propagator: Option, @@ -46,7 +48,10 @@ pub fn create_node_components( | ReactiveComponentExecutionMode::LocalExecutionWithRemoteEnabled => { let mempool_client = clients.get_mempool_shared_client().expect("Mempool Client should be available"); - Some(create_batcher(config.batcher_config.clone(), mempool_client)) + let l1_provider_client = clients + .get_l1_provider_shared_client() + .expect("L1 Provider Client should be available"); + Some(create_batcher(config.batcher_config.clone(), mempool_client, l1_provider_client)) } ReactiveComponentExecutionMode::Disabled | ReactiveComponentExecutionMode::Remote => None, }; @@ -139,11 +144,20 @@ pub fn create_node_components( } }; + let l1_provider = match config.components.l1_provider.execution_mode { + ReactiveComponentExecutionMode::LocalExecutionWithRemoteDisabled + | ReactiveComponentExecutionMode::LocalExecutionWithRemoteEnabled => { + Some(create_l1_provider(config.l1_provider_config.clone())) + } + ReactiveComponentExecutionMode::Disabled | ReactiveComponentExecutionMode::Remote => None, + }; + SequencerNodeComponents { batcher, consensus_manager, gateway, http_server, + l1_provider, mempool, monitoring_endpoint, mempool_p2p_propagator, diff --git a/crates/starknet_sequencer_node/src/config/component_config.rs b/crates/starknet_sequencer_node/src/config/component_config.rs index 1cde477042..1b469221e1 100644 --- a/crates/starknet_sequencer_node/src/config/component_config.rs +++ b/crates/starknet_sequencer_node/src/config/component_config.rs @@ -24,6 +24,8 @@ pub struct ComponentConfig { pub mempool_p2p: ReactiveComponentExecutionConfig, #[validate] pub state_sync: ReactiveComponentExecutionConfig, + #[validate] + pub l1_provider: ReactiveComponentExecutionConfig, // Active component configs. #[validate] @@ -42,6 +44,7 @@ impl SerializeConfig for ComponentConfig { append_sub_config_name(self.gateway.dump(), "gateway"), append_sub_config_name(self.http_server.dump(), "http_server"), append_sub_config_name(self.mempool.dump(), "mempool"), + append_sub_config_name(self.l1_provider.dump(), "l1_provider"), append_sub_config_name(self.mempool_p2p.dump(), "mempool_p2p"), append_sub_config_name(self.monitoring_endpoint.dump(), "monitoring_endpoint"), append_sub_config_name(self.state_sync.dump(), "state_sync"), diff --git a/crates/starknet_sequencer_node/src/config/node_config.rs b/crates/starknet_sequencer_node/src/config/node_config.rs index 5bc3500f4c..d8a289ad80 100644 --- a/crates/starknet_sequencer_node/src/config/node_config.rs +++ b/crates/starknet_sequencer_node/src/config/node_config.rs @@ -23,6 +23,7 @@ use starknet_batcher::VersionedConstantsOverrides; use starknet_consensus_manager::config::ConsensusManagerConfig; use starknet_gateway::config::{GatewayConfig, RpcStateReaderConfig}; use starknet_http_server::config::HttpServerConfig; +use starknet_l1_provider::L1ProviderConfig; use starknet_mempool_p2p::config::MempoolP2pConfig; use starknet_monitoring_endpoint::config::MonitoringEndpointConfig; use starknet_sierra_compile::config::SierraToCasmCompilationConfig; @@ -123,6 +124,8 @@ pub struct SequencerNodeConfig { #[validate] pub compiler_config: SierraToCasmCompilationConfig, #[validate] + pub l1_provider_config: L1ProviderConfig, + #[validate] pub mempool_p2p_config: MempoolP2pConfig, #[validate] pub monitoring_endpoint_config: MonitoringEndpointConfig, @@ -149,6 +152,7 @@ impl SerializeConfig for SequencerNodeConfig { "monitoring_endpoint_config", ), append_sub_config_name(self.state_sync_config.dump(), "state_sync_config"), + append_sub_config_name(self.l1_provider_config.dump(), "l1_provider_config"), ]; sub_configs.into_iter().flatten().collect() diff --git a/crates/starknet_sequencer_node/src/servers.rs b/crates/starknet_sequencer_node/src/servers.rs index 3c1fcb8822..538d2bbd68 100644 --- a/crates/starknet_sequencer_node/src/servers.rs +++ b/crates/starknet_sequencer_node/src/servers.rs @@ -6,6 +6,7 @@ use starknet_batcher::communication::{LocalBatcherServer, RemoteBatcherServer}; use starknet_consensus_manager::communication::ConsensusManagerServer; use starknet_gateway::communication::{LocalGatewayServer, RemoteGatewayServer}; use starknet_http_server::communication::HttpServer; +use starknet_l1_provider::communication::{LocalL1ProviderServer, RemoteL1ProviderServer}; use starknet_mempool::communication::{LocalMempoolServer, RemoteMempoolServer}; use starknet_mempool_p2p::propagator::{ LocalMempoolP2pPropagatorServer, @@ -37,6 +38,7 @@ use crate::config::node_config::SequencerNodeConfig; struct LocalServers { pub(crate) batcher: Option>, pub(crate) gateway: Option>, + pub(crate) l1_provider: Option>, pub(crate) mempool: Option>, pub(crate) mempool_p2p_propagator: Option>, pub(crate) state_sync: Option>, @@ -56,6 +58,7 @@ struct WrapperServers { pub struct RemoteServers { pub batcher: Option>, pub gateway: Option>, + pub l1_provider: Option>, pub mempool: Option>, pub mempool_p2p_propagator: Option>, pub state_sync: Option>, @@ -227,6 +230,11 @@ fn create_local_servers( components.gateway, communication.take_gateway_rx() ); + let l1_provider_server = create_local_server!( + &config.components.l1_provider.execution_mode, + components.l1_provider, + communication.take_l1_provider_rx() + ); let mempool_server = create_local_server!( &config.components.mempool.execution_mode, components.mempool, @@ -246,6 +254,7 @@ fn create_local_servers( LocalServers { batcher: batcher_server, gateway: gateway_server, + l1_provider: l1_provider_server, mempool: mempool_server, mempool_p2p_propagator: mempool_p2p_propagator_server, state_sync: state_sync_server, @@ -270,6 +279,13 @@ pub fn create_remote_servers( config.components.gateway.remote_server_config ); + let l1_provider_client = clients.get_l1_provider_local_client(); + let l1_provider_server = create_remote_server!( + &config.components.l1_provider.execution_mode, + l1_provider_client, + config.components.l1_provider.remote_server_config + ); + let mempool_client = clients.get_mempool_local_client(); let mempool_server = create_remote_server!( &config.components.mempool.execution_mode, @@ -283,7 +299,6 @@ pub fn create_remote_servers( mempool_p2p_propagator_client, config.components.mempool_p2p.remote_server_config ); - let state_sync_client = clients.get_state_sync_local_client(); let state_sync_server = create_remote_server!( &config.components.state_sync.execution_mode, @@ -294,6 +309,7 @@ pub fn create_remote_servers( RemoteServers { batcher: batcher_server, gateway: gateway_server, + l1_provider: l1_provider_server, mempool: mempool_server, mempool_p2p_propagator: mempool_p2p_propagator_server, state_sync: state_sync_server, @@ -322,7 +338,6 @@ fn create_wrapper_servers( &config.components.mempool_p2p.execution_mode.clone().into(), components.mempool_p2p_runner ); - let state_sync_runner_server = create_wrapper_server!( &config.components.state_sync.execution_mode.clone().into(), components.state_sync_runner @@ -390,6 +405,10 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res // StateSyncRunner server. let state_sync_runner_future = get_server_future(servers.wrapper_servers.state_sync_runner); + // L1Provider server. + let local_l1_provider_future = get_server_future(servers.local_servers.l1_provider); + let remote_l1_provider_future = get_server_future(servers.remote_servers.l1_provider); + // Start servers. let local_batcher_handle = tokio::spawn(local_batcher_future); let remote_batcher_handle = tokio::spawn(remote_batcher_future); @@ -406,6 +425,8 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res let local_state_sync_handle = tokio::spawn(local_state_sync_future); let remote_state_sync_handle = tokio::spawn(remote_state_sync_future); let state_sync_runner_handle = tokio::spawn(state_sync_runner_future); + let local_l1_provider_handle = tokio::spawn(local_l1_provider_future); + let remote_l1_provider_handle = tokio::spawn(remote_l1_provider_future); let result = tokio::select! { res = local_batcher_handle => { @@ -468,6 +489,14 @@ pub async fn run_component_servers(servers: SequencerNodeServers) -> anyhow::Res error!("State Sync Runner Server stopped."); res? } + res = local_l1_provider_handle => { + error!("Local L1 Provider Server stopped."); + res? + } + res = remote_l1_provider_handle => { + error!("Remote L1 Provider Server stopped."); + res? + } }; error!("Servers ended with unexpected Ok.");