diff --git a/Cargo.lock b/Cargo.lock index 22f0ae4317..acb844e32a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9067,6 +9067,7 @@ dependencies = [ "async-trait", "bincode 1.3.3", "hyper", + "papyrus_config", "pretty_assertions", "rstest", "serde", @@ -9074,6 +9075,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "validator", ] [[package]] @@ -9120,6 +9122,7 @@ dependencies = [ "mempool_test_utils", "papyrus_config", "pretty_assertions", + "rstest", "serde", "serde_json", "starknet_gateway", diff --git a/config/mempool/default_config.json b/config/mempool/default_config.json index 2671161c30..e6888a1d7d 100644 --- a/config/mempool/default_config.json +++ b/config/mempool/default_config.json @@ -1,14 +1,94 @@ { + "components.gateway.component_type": { + "description": "The component type.", + "privacy": "Public", + "value": "IndependentComponent" + }, "components.gateway.execute": { "description": "The component execution flag.", "privacy": "Public", "value": true }, + "components.gateway.local_config.#is_none": { + "description": "Flag for an optional field.", + "privacy": "TemporaryValue", + "value": false + }, + "components.gateway.local_config.channel_buffer_size": { + "description": "The communication channel buffer size.", + "privacy": "Public", + "value": 32 + }, + "components.gateway.location": { + "description": "The component location.", + "privacy": "Public", + "value": "Local" + }, + "components.gateway.remote_config.#is_none": { + "description": "Flag for an optional field.", + "privacy": "TemporaryValue", + "value": true + }, + "components.gateway.remote_config.ip": { + "description": "The remote component server ip.", + "privacy": "Public", + "value": "0.0.0.0" + }, + "components.gateway.remote_config.port": { + "description": "The remote component server port.", + "privacy": "Public", + "value": 8080 + }, + "components.gateway.remote_config.retries": { + "description": "The max number of retries for sending a message.", + "privacy": "Public", + "value": 3 + }, + "components.mempool.component_type": { + "description": "The component type.", + "privacy": "Public", + "value": "SynchronousComponent" + }, "components.mempool.execute": { "description": "The component execution flag.", "privacy": "Public", "value": true }, + "components.mempool.local_config.#is_none": { + "description": "Flag for an optional field.", + "privacy": "TemporaryValue", + "value": false + }, + "components.mempool.local_config.channel_buffer_size": { + "description": "The communication channel buffer size.", + "privacy": "Public", + "value": 32 + }, + "components.mempool.location": { + "description": "The component location.", + "privacy": "Public", + "value": "Local" + }, + "components.mempool.remote_config.#is_none": { + "description": "Flag for an optional field.", + "privacy": "TemporaryValue", + "value": true + }, + "components.mempool.remote_config.ip": { + "description": "The remote component server ip.", + "privacy": "Public", + "value": "0.0.0.0" + }, + "components.mempool.remote_config.port": { + "description": "The remote component server port.", + "privacy": "Public", + "value": 8080 + }, + "components.mempool.remote_config.retries": { + "description": "The max number of retries for sending a message.", + "privacy": "Public", + "value": 3 + }, "gateway_config.compiler_config.sierra_to_casm_compiler_config.max_bytecode_size": { "description": "Limitation of contract bytecode size.", "privacy": "Public", diff --git a/crates/mempool_infra/Cargo.toml b/crates/mempool_infra/Cargo.toml index 808ec2bfba..ba91146412 100644 --- a/crates/mempool_infra/Cargo.toml +++ b/crates/mempool_infra/Cargo.toml @@ -15,12 +15,14 @@ workspace = true async-trait.workspace = true bincode.workspace = true hyper = { workspace = true, features = ["client", "http2", "server", "tcp"] } +papyrus_config = { path = "../papyrus_config", version = "0.4.0-rc.0" } rstest.workspace = true serde = { workspace = true, features = ["derive"] } thiserror.workspace = true tokio.workspace = true tracing.workspace = true tracing-subscriber = { workspace = true, features = ["env-filter"] } +validator.workspace = true [dev-dependencies] assert_matches.workspace = true diff --git a/crates/mempool_infra/src/component_definitions.rs b/crates/mempool_infra/src/component_definitions.rs index 4585fdc8e1..5cd742a85f 100644 --- a/crates/mempool_infra/src/component_definitions.rs +++ b/crates/mempool_infra/src/component_definitions.rs @@ -1,7 +1,16 @@ +use std::collections::BTreeMap; +use std::net::IpAddr; + use async_trait::async_trait; +use papyrus_config::dumping::{ser_param, SerializeConfig}; +use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam}; use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::sync::mpsc::{Receiver, Sender}; +use validator::Validate; + +const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 32; +const DEFAULT_RETRIES: usize = 3; #[async_trait] pub trait ComponentRequestHandler { @@ -43,3 +52,65 @@ pub enum ServerError { #[error("Could not deserialize client request: {0}")] RequestDeserializationFailure(String), } + +// The communication configuration of the local component. +#[derive(Clone, Debug, Serialize, Deserialize, Validate, PartialEq)] +pub struct LocalComponentCommunicationConfig { + pub channel_buffer_size: usize, +} + +impl SerializeConfig for LocalComponentCommunicationConfig { + fn dump(&self) -> BTreeMap { + BTreeMap::from_iter([ser_param( + "channel_buffer_size", + &self.channel_buffer_size, + "The communication channel buffer size.", + ParamPrivacyInput::Public, + )]) + } +} + +impl Default for LocalComponentCommunicationConfig { + fn default() -> Self { + Self { channel_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE } + } +} + +// The communication configuration of the remote component. +#[derive(Clone, Debug, Serialize, Deserialize, Validate, PartialEq)] +pub struct RemoteComponentCommunicationConfig { + pub ip: IpAddr, + pub port: u16, + pub retries: usize, +} + +impl SerializeConfig for RemoteComponentCommunicationConfig { + fn dump(&self) -> BTreeMap { + BTreeMap::from_iter([ + ser_param( + "ip", + &self.ip.to_string(), + "The remote component server ip.", + ParamPrivacyInput::Public, + ), + ser_param( + "port", + &self.port, + "The remote component server port.", + ParamPrivacyInput::Public, + ), + ser_param( + "retries", + &self.retries, + "The max number of retries for sending a message.", + ParamPrivacyInput::Public, + ), + ]) + } +} + +impl Default for RemoteComponentCommunicationConfig { + fn default() -> Self { + Self { ip: "0.0.0.0".parse().unwrap(), port: 8080, retries: DEFAULT_RETRIES } + } +} diff --git a/crates/mempool_node/Cargo.toml b/crates/mempool_node/Cargo.toml index aa73ca92ff..309f601394 100644 --- a/crates/mempool_node/Cargo.toml +++ b/crates/mempool_node/Cargo.toml @@ -14,6 +14,7 @@ clap.workspace = true const_format.workspace = true futures.workspace = true papyrus_config.workspace = true +rstest.workspace = true serde.workspace = true starknet_gateway.workspace = true starknet_mempool.workspace = true diff --git a/crates/mempool_node/src/config/config_test.rs b/crates/mempool_node/src/config/config_test.rs index 34c2f44d46..1cf63a6e39 100644 --- a/crates/mempool_node/src/config/config_test.rs +++ b/crates/mempool_node/src/config/config_test.rs @@ -7,26 +7,27 @@ use colored::Colorize; use mempool_test_utils::get_absolute_path; use papyrus_config::dumping::SerializeConfig; use papyrus_config::validators::{ParsedValidationError, ParsedValidationErrors}; -use validator::Validate; +use rstest::rstest; +use starknet_mempool_infra::component_definitions::{ + LocalComponentCommunicationConfig, + RemoteComponentCommunicationConfig, +}; +use validator::{Validate, ValidationErrors}; use crate::config::{ ComponentConfig, ComponentExecutionConfig, + LocationType, MempoolNodeConfig, DEFAULT_CONFIG_PATH, }; -/// Test the validation of the struct ComponentConfig. -/// The validation validates at least one of the components is set with execute: true. -#[test] -fn test_components_config_validation() { - // Initialize an invalid config and check that the validator finds an error. - let mut component_config = ComponentConfig { - gateway: ComponentExecutionConfig { execute: false }, - mempool: ComponentExecutionConfig { execute: false }, - }; - - assert_matches!(component_config.validate().unwrap_err(), validation_errors => { +fn check_validation_error( + validation_result: Result<(), ValidationErrors>, + code_str: &str, + message_str: &str, +) { + assert_matches!(validation_result.unwrap_err(), validation_errors => { let parsed_errors = ParsedValidationErrors::from(validation_errors); assert_eq!(parsed_errors.0.len(), 1); let parsed_validation_error = &parsed_errors.0[0]; @@ -35,22 +36,127 @@ fn test_components_config_validation() { ParsedValidationError { param_path, code, message, params} if ( param_path == "__all__" && - code == "Invalid components configuration." && + code == code_str && params.is_empty() && - *message == Some("At least one component should be allowed to execute.".to_string()) + *message == Some(message_str.to_string()) ) ) }); +} - // Update the config to be valid and check that the validator finds no errors. - for (gateway_component_execute, mempool_component_execute) in - [(true, false), (false, true), (true, true)] - { - component_config.gateway.execute = gateway_component_execute; - component_config.mempool.execute = mempool_component_execute; +/// Test the validation of the struct ComponentExecutionConfig. +/// The validation validates that location of the component and the local/remote config are at sync. +#[rstest] +#[case( + LocationType::Local, + Some(LocalComponentCommunicationConfig::default()), + Some(RemoteComponentCommunicationConfig::default()), + "Local config and Remote config are mutually exclusive, can't be both active." +)] +#[case( + LocationType::Local, + None, + Some(RemoteComponentCommunicationConfig::default()), + "Local communication config is missing." +)] +#[case(LocationType::Local, None, None, "Local communication config is missing.")] +#[case( + LocationType::Remote, + Some(LocalComponentCommunicationConfig::default()), + Some(RemoteComponentCommunicationConfig::default()), + "Local config and Remote config are mutually exclusive, can't be both active." +)] +#[case( + LocationType::Remote, + Some(LocalComponentCommunicationConfig::default()), + None, + "Remote communication config is missing." +)] +#[case(LocationType::Remote, None, None, "Remote communication config is missing.")] +fn test_invalid_component_execution_config( + #[case] location: LocationType, + #[case] local_config: Option, + #[case] remote_config: Option, + #[case] expected_error_message: &str, +) { + // Initialize an invalid config and check that the validator finds an error. + let component_exe_config = ComponentExecutionConfig { + location, + local_config, + remote_config, + ..ComponentExecutionConfig::default() + }; + check_validation_error( + component_exe_config.validate(), + "Invalid component configuration.", + expected_error_message, + ); +} + +/// Test the validation of the struct ComponentExecutionConfig. +/// The validation validates that location of the component and the local/remote config are at sync. +#[rstest] +#[case::local(LocationType::Local)] +#[case::remote(LocationType::Remote)] +fn test_valid_component_execution_config(#[case] location: LocationType) { + // Initialize a valid config and check that the validator returns Ok. + let local_config = if location == LocationType::Local { + Some(LocalComponentCommunicationConfig::default()) + } else { + None + }; + let remote_config = if location == LocationType::Remote { + Some(RemoteComponentCommunicationConfig::default()) + } else { + None + }; + let component_exe_config = ComponentExecutionConfig { + location, + local_config, + remote_config, + ..ComponentExecutionConfig::default() + }; + assert!(component_exe_config.validate().is_ok()); +} + +#[test] +fn test_invalid_components_config() { + // Initialize an invalid config and check that the validator finds an error. + let component_config = ComponentConfig { + gateway: ComponentExecutionConfig { execute: false, ..ComponentExecutionConfig::default() }, + mempool: ComponentExecutionConfig { execute: false, ..ComponentExecutionConfig::default() }, + }; + + check_validation_error( + component_config.validate(), + "Invalid components configuration.", + "At least one component should be allowed to execute.", + ); +} + +/// Test the validation of the struct ComponentConfig. +/// The validation validates at least one of the components is set with execute: true. +#[rstest] +#[case(true, false)] +#[case(false, true)] +#[case(true, true)] +fn test_valid_components_config( + #[case] gateway_component_execute: bool, + #[case] mempool_component_execute: bool, +) { + // Initialize an invalid config and check that the validator finds an error. + let component_config = ComponentConfig { + gateway: ComponentExecutionConfig { + execute: gateway_component_execute, + ..ComponentExecutionConfig::default() + }, + mempool: ComponentExecutionConfig { + execute: mempool_component_execute, + ..ComponentExecutionConfig::default() + }, + }; - assert_matches!(component_config.validate(), Ok(())); - } + assert_matches!(component_config.validate(), Ok(())); } /// Test the validation of the struct MempoolNodeConfig and that the default config file is up to diff --git a/crates/mempool_node/src/config/mod.rs b/crates/mempool_node/src/config/mod.rs index 3fa5c715c7..3d502aeee6 100644 --- a/crates/mempool_node/src/config/mod.rs +++ b/crates/mempool_node/src/config/mod.rs @@ -6,11 +6,20 @@ use std::fs::File; use std::path::Path; use clap::Command; -use papyrus_config::dumping::{append_sub_config_name, ser_param, SerializeConfig}; +use papyrus_config::dumping::{ + append_sub_config_name, + ser_optional_sub_config, + ser_param, + SerializeConfig, +}; use papyrus_config::loading::load_and_process_config; use papyrus_config::{ConfigError, ParamPath, ParamPrivacyInput, SerializedParam}; use serde::{Deserialize, Serialize}; use starknet_gateway::config::{GatewayConfig, RpcStateReaderConfig}; +use starknet_mempool_infra::component_definitions::{ + LocalComponentCommunicationConfig, + RemoteComponentCommunicationConfig, +}; use validator::{Validate, ValidationError}; use crate::version::VERSION_FULL; @@ -18,37 +27,148 @@ use crate::version::VERSION_FULL; // The path of the default configuration file, provided as part of the crate. pub const DEFAULT_CONFIG_PATH: &str = "config/mempool/default_config.json"; -/// The single crate configuration. +// The configuration of the components. + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub enum LocationType { + Local, + Remote, +} +// TODO(Lev/Tsabary): When papyrus_config will support it, change to include communication config in +// the enum. + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub enum ComponentType { + // A component that does not require an infra server, and has an internal mutating task. + IndependentComponent, + // A component that requires an infra server, and has no internal mutating task. + SynchronousComponent, + // A component that requires an infra server, and has an internal mutating task. + AsynchronousComponent, +} +// TODO(Lev/Tsabary): Change the enum values to more discriptive. + +/// The single component configuration. #[derive(Clone, Debug, Serialize, Deserialize, Validate, PartialEq)] +#[validate(schema(function = "validate_single_component_config"))] pub struct ComponentExecutionConfig { pub execute: bool, + pub component_type: ComponentType, + pub location: LocationType, + pub local_config: Option, + pub remote_config: Option, } impl SerializeConfig for ComponentExecutionConfig { fn dump(&self) -> BTreeMap { - BTreeMap::from_iter([ser_param( - "execute", - &self.execute, - "The component execution flag.", - ParamPrivacyInput::Public, - )]) + let config = BTreeMap::from_iter([ + ser_param( + "execute", + &self.execute, + "The component execution flag.", + ParamPrivacyInput::Public, + ), + ser_param( + "location", + &self.location, + "The component location.", + ParamPrivacyInput::Public, + ), + ser_param( + "component_type", + &self.component_type, + "The component type.", + ParamPrivacyInput::Public, + ), + ]); + vec![ + config, + ser_optional_sub_config(&self.local_config, "local_config"), + ser_optional_sub_config(&self.remote_config, "remote_config"), + ] + .into_iter() + .flatten() + .collect() } } impl Default for ComponentExecutionConfig { fn default() -> Self { - Self { execute: true } + Self { + execute: true, + location: LocationType::Local, + component_type: ComponentType::SynchronousComponent, + local_config: Some(LocalComponentCommunicationConfig::default()), + remote_config: None, + } } } +/// Specific components default configurations. +impl ComponentExecutionConfig { + pub fn gateway_default_config() -> Self { + Self { + execute: true, + location: LocationType::Local, + component_type: ComponentType::IndependentComponent, + local_config: Some(LocalComponentCommunicationConfig::default()), + remote_config: None, + } + } + + pub fn mempool_default_config() -> Self { + Self { + execute: true, + location: LocationType::Local, + component_type: ComponentType::SynchronousComponent, + local_config: Some(LocalComponentCommunicationConfig::default()), + remote_config: None, + } + } +} + +pub fn validate_single_component_config( + component_config: &ComponentExecutionConfig, +) -> Result<(), ValidationError> { + let error_message = + if component_config.local_config.is_some() && component_config.remote_config.is_some() { + "Local config and Remote config are mutually exclusive, can't be both active." + } else if component_config.location == LocationType::Local + && component_config.local_config.is_none() + { + "Local communication config is missing." + } else if component_config.location == LocationType::Remote + && component_config.remote_config.is_none() + { + "Remote communication config is missing." + } else { + return Ok(()); + }; + + let mut error = ValidationError::new("Invalid component configuration."); + error.message = Some(error_message.into()); + Err(error) +} + /// The components configuration. -#[derive(Clone, Debug, Default, Serialize, Deserialize, Validate, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize, Validate, PartialEq)] #[validate(schema(function = "validate_components_config"))] pub struct ComponentConfig { + #[validate] pub gateway: ComponentExecutionConfig, + #[validate] pub mempool: ComponentExecutionConfig, } +impl Default for ComponentConfig { + fn default() -> Self { + Self { + gateway: ComponentExecutionConfig::gateway_default_config(), + mempool: ComponentExecutionConfig::mempool_default_config(), + } + } +} + impl SerializeConfig for ComponentConfig { fn dump(&self) -> BTreeMap { #[allow(unused_mut)]