From 328b42ca5d26f7ca74b979df38137e001b5a86ed Mon Sep 17 00:00:00 2001 From: AurelienFT <32803821+AurelienFT@users.noreply.github.com> Date: Tue, 8 Oct 2024 04:56:58 +0200 Subject: [PATCH] Linking TxPoolV2 in place of V1 (#2263) Closes https://github.com/FuelLabs/fuel-core/issues/2020 Closes https://github.com/FuelLabs/fuel-core/issues/1961 Closes https://github.com/FuelLabs/fuel-core/issues/868 Closes https://github.com/FuelLabs/fuel-core/issues/2185 Closes https://github.com/FuelLabs/fuel-core/issues/2186 Closes https://github.com/FuelLabs/fuel-core/issues/2255 Closes https://github.com/FuelLabs/fuel-core/issues/2160 Closes https://github.com/FuelLabs/fuel-core/issues/1967 Closes https://github.com/FuelLabs/fuel-core/issues/2187 ## Description Remove code of the previous txpool and connect all modules to this one TODO: Improve description @AurelienFT ## Checklist - [x] Breaking changes are clearly marked as such in the PR description and changelog - [x] New behavior is reflected in tests - [x] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [x] I have reviewed the code myself - [x] I have created follow-up issues caused by this PR and linked them here --------- Co-authored-by: Green Baneling --- CHANGELOG.md | 3 +- Cargo.lock | 29 +- Cargo.toml | 4 +- benches/benches/block_target_gas.rs | 7 +- benches/benches/transaction_throughput.rs | 13 +- .../tests/integration_tests.rs | 2 +- bin/fuel-core/src/cli/run.rs | 57 +- bin/fuel-core/src/cli/run/p2p.rs | 10 + bin/fuel-core/src/cli/run/tx_pool.rs | 48 +- crates/fuel-core/src/database/coin.rs | 2 +- crates/fuel-core/src/graphql_api.rs | 2 +- crates/fuel-core/src/graphql_api/database.rs | 6 +- crates/fuel-core/src/graphql_api/ports.rs | 16 +- .../src/graphql_api/storage/coins.rs | 2 +- .../src/graphql_api/storage/contracts.rs | 6 +- .../fuel-core/src/graphql_api/storage/old.rs | 6 +- .../src/graphql_api/worker_service.rs | 2 +- crates/fuel-core/src/p2p_test_helpers.rs | 16 +- crates/fuel-core/src/query/message/test.rs | 8 +- crates/fuel-core/src/query/subscriptions.rs | 16 +- .../fuel-core/src/query/subscriptions/test.rs | 8 +- crates/fuel-core/src/query/tx.rs | 2 +- crates/fuel-core/src/schema/node_info.rs | 2 +- crates/fuel-core/src/schema/tx.rs | 67 +- crates/fuel-core/src/schema/tx/types.rs | 24 +- crates/fuel-core/src/service/adapters.rs | 13 +- .../service/adapters/consensus_module/poa.rs | 32 +- .../src/service/adapters/executor.rs | 23 +- .../adapters/fuel_gas_price_provider.rs | 18 +- .../tests/producer_gas_price_tests.rs | 2 +- .../tests/tx_pool_gas_price_tests.rs | 2 +- .../src/service/adapters/graphql_api.rs | 71 +- .../service/adapters/graphql_api/off_chain.rs | 6 +- .../service/adapters/graphql_api/on_chain.rs | 6 +- crates/fuel-core/src/service/adapters/p2p.rs | 21 +- .../src/service/adapters/producer.rs | 15 +- .../fuel-core/src/service/adapters/txpool.rs | 147 +- crates/fuel-core/src/service/config.rs | 9 +- crates/fuel-core/src/service/query.rs | 74 +- crates/fuel-core/src/service/sub_services.rs | 23 +- .../services/consensus_module/poa/Cargo.toml | 1 + .../services/consensus_module/poa/src/lib.rs | 3 + .../consensus_module/poa/src/ports.rs | 20 +- .../consensus_module/poa/src/service.rs | 60 +- .../consensus_module/poa/src/service_test.rs | 169 +- .../service_test/manually_produce_tests.rs | 8 +- .../poa/src/service_test/trigger_tests.rs | 60 +- crates/services/gas_price_service/Cargo.toml | 1 + .../src/common/gas_price_algorithm.rs | 13 +- .../gas_price_service/src/v0/service.rs | 4 +- .../gas_price_service/src/v0/tests.rs | 8 +- .../src/v1/da_source_service.rs | 31 +- crates/services/p2p/Cargo.toml | 2 +- crates/services/p2p/src/config.rs | 10 + .../services/p2p/src/heavy_task_processor.rs | 192 -- crates/services/p2p/src/lib.rs | 1 - crates/services/p2p/src/ports.rs | 12 +- crates/services/p2p/src/service.rs | 80 +- .../services/producer/src/block_producer.rs | 26 +- crates/services/producer/src/mocks.rs | 5 +- crates/services/producer/src/ports.rs | 8 +- crates/services/src/async_processor.rs | 35 +- crates/services/src/sync_processor.rs | 27 +- crates/services/txpool/Cargo.toml | 46 - crates/services/txpool/README.md | 78 - crates/services/txpool/src/config.rs | 121 -- crates/services/txpool/src/containers.rs | 4 - .../txpool/src/containers/dependency.rs | 792 --------- crates/services/txpool/src/containers/sort.rs | 51 - .../txpool/src/containers/time_sort.rs | 68 - .../txpool/src/containers/tip_per_gas_sort.rs | 71 - crates/services/txpool/src/error.rs | 162 -- .../txpool/src/heavy_async_processing.rs | 195 --- crates/services/txpool/src/lib.rs | 98 -- crates/services/txpool/src/mock_db.rs | 185 -- crates/services/txpool/src/ports.rs | 141 -- crates/services/txpool/src/service.rs | 693 -------- .../txpool/src/service/test_helpers.rs | 310 ---- crates/services/txpool/src/service/tests.rs | 281 --- .../services/txpool/src/service/tests_p2p.rs | 410 ----- .../txpool/src/service/update_sender.rs | 339 ---- .../txpool/src/service/update_sender/tests.rs | 86 - .../service/update_sender/tests/test_e2e.rs | 244 --- .../update_sender/tests/test_permits.rs | 113 -- .../update_sender/tests/test_sending.rs | 145 -- .../update_sender/tests/test_subscribe.rs | 33 - .../src/service/update_sender/tests/utils.rs | 224 --- .../service/update_sender/tx_status_stream.rs | 103 -- crates/services/txpool/src/test_helpers.rs | 242 --- .../txpool/src/transaction_selector.rs | 337 ---- crates/services/txpool/src/txpool.rs | 609 ------- .../txpool/src/txpool/test_helpers.rs | 68 - crates/services/txpool/src/txpool/tests.rs | 1557 ----------------- crates/services/txpool/src/types.rs | 10 - crates/services/txpool_v2/Cargo.toml | 8 +- crates/services/txpool_v2/src/config.rs | 17 +- crates/services/txpool_v2/src/lib.rs | 15 +- crates/services/txpool_v2/src/pool.rs | 14 +- crates/services/txpool_v2/src/ports.rs | 9 +- .../txpool_v2/src/selection_algorithms/mod.rs | 6 + .../src/selection_algorithms/ratio_tip_gas.rs | 37 +- crates/services/txpool_v2/src/service.rs | 154 +- .../txpool_v2/src/service/subscriptions.rs | 4 +- .../txpool_v2/src/service/verifications.rs | 19 +- crates/services/txpool_v2/src/shared_state.rs | 48 +- .../services/txpool_v2/src/storage/graph.rs | 6 +- crates/services/txpool_v2/src/storage/mod.rs | 8 +- crates/services/txpool_v2/src/tests/mocks.rs | 10 +- .../txpool_v2/src/tests/stability_test.rs | 4 +- .../services/txpool_v2/src/tests/tests_p2p.rs | 6 +- .../txpool_v2/src/tests/tests_pool.rs | 8 + .../txpool_v2/src/tests/tests_service.rs | 2 +- .../services/txpool_v2/src/tests/universe.rs | 5 + .../txpool_v2/src/tx_status_stream.rs | 9 + crates/types/src/services/txpool.rs | 2 +- tests/Cargo.toml | 2 +- tests/test-helpers/Cargo.toml | 2 +- tests/test-helpers/src/builder.rs | 2 +- tests/tests/blocks.rs | 18 +- tests/tests/node_info.rs | 4 +- tests/tests/state_rewind.rs | 8 +- tests/tests/trigger_integration/never.rs | 2 +- tests/tests/tx/txn_status_subscription.rs | 6 +- tests/tests/tx/txpool.rs | 10 +- tests/tests/tx/upgrade.rs | 6 +- tests/tests/tx/utxo_validation.rs | 5 +- tests/tests/tx_gossip.rs | 33 +- tests/tests/vm_storage.rs | 2 +- 128 files changed, 1011 insertions(+), 8892 deletions(-) delete mode 100644 crates/services/p2p/src/heavy_task_processor.rs delete mode 100644 crates/services/txpool/Cargo.toml delete mode 100644 crates/services/txpool/README.md delete mode 100644 crates/services/txpool/src/config.rs delete mode 100644 crates/services/txpool/src/containers.rs delete mode 100644 crates/services/txpool/src/containers/dependency.rs delete mode 100644 crates/services/txpool/src/containers/sort.rs delete mode 100644 crates/services/txpool/src/containers/time_sort.rs delete mode 100644 crates/services/txpool/src/containers/tip_per_gas_sort.rs delete mode 100644 crates/services/txpool/src/error.rs delete mode 100644 crates/services/txpool/src/heavy_async_processing.rs delete mode 100644 crates/services/txpool/src/lib.rs delete mode 100644 crates/services/txpool/src/mock_db.rs delete mode 100644 crates/services/txpool/src/ports.rs delete mode 100644 crates/services/txpool/src/service.rs delete mode 100644 crates/services/txpool/src/service/test_helpers.rs delete mode 100644 crates/services/txpool/src/service/tests.rs delete mode 100644 crates/services/txpool/src/service/tests_p2p.rs delete mode 100644 crates/services/txpool/src/service/update_sender.rs delete mode 100644 crates/services/txpool/src/service/update_sender/tests.rs delete mode 100644 crates/services/txpool/src/service/update_sender/tests/test_e2e.rs delete mode 100644 crates/services/txpool/src/service/update_sender/tests/test_permits.rs delete mode 100644 crates/services/txpool/src/service/update_sender/tests/test_sending.rs delete mode 100644 crates/services/txpool/src/service/update_sender/tests/test_subscribe.rs delete mode 100644 crates/services/txpool/src/service/update_sender/tests/utils.rs delete mode 100644 crates/services/txpool/src/service/update_sender/tx_status_stream.rs delete mode 100644 crates/services/txpool/src/test_helpers.rs delete mode 100644 crates/services/txpool/src/transaction_selector.rs delete mode 100644 crates/services/txpool/src/txpool.rs delete mode 100644 crates/services/txpool/src/txpool/test_helpers.rs delete mode 100644 crates/services/txpool/src/txpool/tests.rs delete mode 100644 crates/services/txpool/src/types.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index c61c4fce7ba..08150ae5a0b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Added - [2309](https://github.com/FuelLabs/fuel-core/pull/2309): Limit number of concurrent queries to the graphql service. - [2216](https://github.com/FuelLabs/fuel-core/pull/2216): Add more function to the state and task of TxPoolV2 to handle the future interactions with others modules (PoA, BlockProducer, BlockImporter and P2P) +- [2263](https://github.com/FuelLabs/fuel-core/pull/2263): Use the Txpool v2 in the whole codebase ### Removed - [2306](https://github.com/FuelLabs/fuel-core/pull/2306): Removed hack for genesis asset contract from the code. @@ -27,8 +28,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2131](https://github.com/FuelLabs/fuel-core/pull/2131): Add flow in TxPool in order to ask to newly connected peers to share their transaction pool - [2182](https://github.com/FuelLabs/fuel-core/pull/2151): Limit number of transactions that can be fetched via TxSource::next - [2189](https://github.com/FuelLabs/fuel-core/pull/2151): Select next DA height to never include more than u16::MAX -1 transactions from L1. -- [2162](https://github.com/FuelLabs/fuel-core/pull/2162): Pool structure with dependencies, etc.. for the next transaction pool module. Also adds insertion/verification process in PoolV2 and tests refactoring - [2265](https://github.com/FuelLabs/fuel-core/pull/2265): Integrate Block Committer API for DA Block Costs. +- [2162](https://github.com/FuelLabs/fuel-core/pull/2162): Pool structure with dependencies, etc.. for the next transaction pool module. Also adds insertion/verification process in PoolV2 and tests refactoring - [2280](https://github.com/FuelLabs/fuel-core/pull/2280): Allow comma separated relayer addresses in cli - [2299](https://github.com/FuelLabs/fuel-core/pull/2299): Support blobs in the predicates. - [2300](https://github.com/FuelLabs/fuel-core/pull/2300): Added new function to `fuel-core-client` for checking whether a blob exists. diff --git a/Cargo.lock b/Cargo.lock index f5effb9bdc2..27d252200a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3457,6 +3457,7 @@ dependencies = [ "fuel-gas-price-algorithm", "futures", "num_enum", + "parking_lot", "reqwest", "serde", "strum 0.25.0", @@ -3571,6 +3572,7 @@ dependencies = [ "fuel-core-poa", "fuel-core-services", "fuel-core-storage", + "fuel-core-trace", "fuel-core-types 0.37.1", "k256", "mockall", @@ -3759,39 +3761,12 @@ dependencies = [ "anyhow", "async-trait", "derive_more", - "fuel-core-metrics", "fuel-core-services", "fuel-core-storage", "fuel-core-trace", "fuel-core-txpool", "fuel-core-types 0.37.1", "futures", - "itertools 0.12.1", - "mockall", - "num-rational", - "parking_lot", - "proptest", - "rayon", - "rstest", - "test-strategy", - "tokio", - "tokio-rayon", - "tokio-stream", - "tracing", -] - -[[package]] -name = "fuel-core-txpool-v2" -version = "0.37.1" -dependencies = [ - "anyhow", - "async-trait", - "derive_more", - "fuel-core-services", - "fuel-core-storage", - "fuel-core-trace", - "fuel-core-types 0.37.1", - "futures", "mockall", "num-rational", "parking_lot", diff --git a/Cargo.toml b/Cargo.toml index 859a82d3fec..a626103fb61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,6 @@ members = [ "crates/services/producer", "crates/services/relayer", "crates/services/sync", - "crates/services/txpool", "crates/services/txpool_v2", "crates/services/upgradable-executor", "crates/services/upgradable-executor/wasm-executor", @@ -77,8 +76,7 @@ fuel-core-p2p = { version = "0.37.1", path = "./crates/services/p2p" } fuel-core-producer = { version = "0.37.1", path = "./crates/services/producer" } fuel-core-relayer = { version = "0.37.1", path = "./crates/services/relayer" } fuel-core-sync = { version = "0.37.1", path = "./crates/services/sync" } -fuel-core-txpool = { version = "0.37.1", path = "./crates/services/txpool" } -fuel-core-txpool-v2 = { version = "0.37.1", path = "./crates/services/txpool_v2" } +fuel-core-txpool = { version = "0.37.1", path = "./crates/services/txpool_v2" } fuel-core-storage = { version = "0.37.1", path = "./crates/storage", default-features = false } fuel-core-trace = { version = "0.37.1", path = "./crates/trace" } fuel-core-types = { version = "0.37.1", path = "./crates/types", default-features = false } diff --git a/benches/benches/block_target_gas.rs b/benches/benches/block_target_gas.rs index 0518d4f1ae5..9574720a3df 100644 --- a/benches/benches/block_target_gas.rs +++ b/benches/benches/block_target_gas.rs @@ -27,7 +27,6 @@ use fuel_core::{ Config, FuelService, }, - txpool::types::Word, }; use fuel_core_benches::{ default_gas_costs::default_gas_costs, @@ -57,6 +56,7 @@ use fuel_core_types::{ GTFArgs, Instruction, RegId, + Word, }, fuel_crypto::{ secp256r1, @@ -428,11 +428,8 @@ fn run_with_service_with_extra_inputs( let mut sub = shared.block_importer.block_importer.subscribe(); shared .txpool_shared_state - .insert(vec![std::sync::Arc::new(tx)]) + .insert(tx) .await - .into_iter() - .next() - .expect("Should be at least 1 element") .expect("Should include transaction successfully"); let res = sub.recv().await.expect("Should produce a block"); assert_eq!(res.tx_status.len(), 2, "res.tx_status: {:?}", res.tx_status); diff --git a/benches/benches/transaction_throughput.rs b/benches/benches/transaction_throughput.rs index c49055d50db..23454dda359 100644 --- a/benches/benches/transaction_throughput.rs +++ b/benches/benches/transaction_throughput.rs @@ -104,20 +104,15 @@ where for _ in 0..iters { let mut test_builder = test_builder.clone(); let sealed_block = { - let transactions = transactions - .iter() - .map(|tx| Arc::new(tx.clone())) - .collect(); + let transactions: Vec = + transactions.iter().cloned().collect(); // start the producer node let TestContext { srv, client, .. } = test_builder.finalize().await; // insert all transactions - let results = - srv.shared.txpool_shared_state.insert(transactions).await; - for result in results { - let result = result.expect("Should insert transaction"); - assert_eq!(result.removed.len(), 0); + for tx in transactions { + srv.shared.txpool_shared_state.insert(tx).await.unwrap(); } let _ = client.produce_blocks(1, None).await; diff --git a/bin/e2e-test-client/tests/integration_tests.rs b/bin/e2e-test-client/tests/integration_tests.rs index 794d54d2e95..a6a13f75f2a 100644 --- a/bin/e2e-test-client/tests/integration_tests.rs +++ b/bin/e2e-test-client/tests/integration_tests.rs @@ -4,12 +4,12 @@ use fuel_core::service::{ }; // Add methods on commands -use fuel_core::txpool::types::ContractId; use fuel_core_chain_config::{ SnapshotMetadata, SnapshotReader, }; use fuel_core_e2e_client::config::SuiteConfig; +use fuel_core_types::fuel_tx::ContractId; use std::{ fs, str::FromStr, diff --git a/bin/fuel-core/src/cli/run.rs b/bin/fuel-core/src/cli/run.rs index 54fdb8c1354..c88e77d1132 100644 --- a/bin/fuel-core/src/cli/run.rs +++ b/bin/fuel-core/src/cli/run.rs @@ -33,9 +33,12 @@ use fuel_core::{ RelayerConsensusConfig, VMConfig, }, - txpool::{ - config::BlackList, + txpool::config::{ + BlackList, Config as TxPoolConfig, + HeavyWorkConfig, + PoolLimits, + ServiceChannelLimits, }, types::{ fuel_tx::ContractId, @@ -438,22 +441,50 @@ impl Command { let TxPoolArgs { tx_pool_ttl, + tx_ttl_check_interval, tx_max_number, - tx_max_depth, + tx_max_total_bytes, + tx_max_total_gas, + tx_max_chain_count, tx_number_active_subscriptions, tx_blacklist_addresses, tx_blacklist_coins, tx_blacklist_messages, tx_blacklist_contracts, + tx_number_threads_to_verify_transactions, + tx_size_of_verification_queue, + tx_number_threads_p2p_sync, + tx_size_of_p2p_sync_queue, + tx_max_pending_read_requests, + tx_max_pending_write_requests, } = tx_pool; - let blacklist = BlackList::new( + let black_list = BlackList::new( tx_blacklist_addresses, tx_blacklist_coins, tx_blacklist_messages, tx_blacklist_contracts, ); + let pool_limits = PoolLimits { + max_txs: tx_max_number, + max_gas: tx_max_total_gas, + max_bytes_size: tx_max_total_bytes, + }; + + let pool_heavy_work_config = HeavyWorkConfig { + number_threads_to_verify_transactions: + tx_number_threads_to_verify_transactions, + size_of_verification_queue: tx_size_of_verification_queue, + number_threads_p2p_sync: tx_number_threads_p2p_sync, + size_of_p2p_sync_queue: tx_size_of_p2p_sync_queue, + }; + + let service_channel_limits = ServiceChannelLimits { + max_pending_read_pool_requests: tx_max_pending_read_requests, + max_pending_write_pool_requests: tx_max_pending_write_requests, + }; + let config = Config { graphql_config: GraphQLConfig { addr, @@ -477,15 +508,17 @@ impl Command { vm: VMConfig { backtrace: vm_backtrace, }, - txpool: TxPoolConfig::new( - tx_max_number, - tx_max_depth, + txpool: TxPoolConfig { + max_txs_chain_count: tx_max_chain_count, + max_txs_ttl: tx_pool_ttl.into(), + ttl_check_interval: tx_ttl_check_interval.into(), utxo_validation, - metrics, - tx_pool_ttl.into(), - tx_number_active_subscriptions, - blacklist, - ), + max_tx_update_subscriptions: tx_number_active_subscriptions, + black_list, + pool_limits, + heavy_work: pool_heavy_work_config, + service_channel_limits, + }, block_producer: ProducerConfig { coinbase_recipient, metrics, diff --git a/bin/fuel-core/src/cli/run/p2p.rs b/bin/fuel-core/src/cli/run/p2p.rs index 1e91f80589a..98a9a33759c 100644 --- a/bin/fuel-core/src/cli/run/p2p.rs +++ b/bin/fuel-core/src/cli/run/p2p.rs @@ -189,6 +189,14 @@ pub struct P2PArgs { /// For peer reputations, the maximum time since last heartbeat before penalty #[clap(long = "heartbeat-max-time-since-last", default_value = "40", env)] pub heartbeat_max_time_since_last: u64, + + /// Number of threads to read from the database. + #[clap(long = "p2p-database-read-threads", default_value = "2", env)] + pub database_read_threads: usize, + + /// Number of threads to read from the tx pool. + #[clap(long = "p2p-txpool-threads", default_value = "0", env)] + pub tx_pool_threads: usize, } #[derive(Debug, Clone, Args)] @@ -335,6 +343,8 @@ impl P2PArgs { info_interval: Some(Duration::from_secs(self.info_interval)), identify_interval: Some(Duration::from_secs(self.identify_interval)), metrics, + database_read_threads: self.database_read_threads, + tx_pool_threads: self.tx_pool_threads, state: NotInitialized, }; Ok(Some(config)) diff --git a/bin/fuel-core/src/cli/run/tx_pool.rs b/bin/fuel-core/src/cli/run/tx_pool.rs index 844b10c333a..34f33df44c1 100644 --- a/bin/fuel-core/src/cli/run/tx_pool.rs +++ b/bin/fuel-core/src/cli/run/tx_pool.rs @@ -1,9 +1,9 @@ //! Clap configuration related to TxPool service. -use fuel_core::txpool::types::ContractId; use fuel_core_types::{ fuel_tx::{ Address, + ContractId, UtxoId, }, fuel_types::Nonce, @@ -15,13 +15,25 @@ pub struct TxPoolArgs { #[clap(long = "tx-pool-ttl", default_value = "5m", env)] pub tx_pool_ttl: humantime::Duration, + /// The interval for checking the time to live of transactions. + #[clap(long = "tx-ttl-check-interval", default_value = "1m", env)] + pub tx_ttl_check_interval: humantime::Duration, + /// The max number of transactions that the `TxPool` can simultaneously store. #[clap(long = "tx-max-number", default_value = "4064", env)] pub tx_max_number: usize, - /// The max depth of the dependent transactions that supported by the `TxPool`. - #[clap(long = "tx-max-depth", default_value = "10", env)] - pub tx_max_depth: usize, + /// The max number of gas the `TxPool` can simultaneously store. + #[clap(long = "tx-max-total-gas", default_value = "30000000000", env)] + pub tx_max_total_gas: u64, + + /// The max number of bytes that the `TxPool` can simultaneously store. + #[clap(long = "tx-max-total-bytes", default_value = "131072000", env)] + pub tx_max_total_bytes: usize, + + /// The max number of tx in a chain of dependent transactions that supported by the `TxPool`. + #[clap(long = "tx-max-depth", default_value = "32", env)] + pub tx_max_chain_count: usize, /// The maximum number of active subscriptions that supported by the `TxPool`. #[clap(long = "tx-number-active-subscriptions", default_value = "4064", env)] @@ -42,6 +54,34 @@ pub struct TxPoolArgs { /// The list of banned contracts ignored by the `TxPool`. #[clap(long = "tx-blacklist-contracts", value_delimiter = ',', env)] pub tx_blacklist_contracts: Vec, + + /// Number of threads for managing verifications/insertions. + #[clap( + long = "tx-number-threads-to-verify-transactions", + default_value = "4", + env + )] + pub tx_number_threads_to_verify_transactions: usize, + + /// Maximum number of tasks in the verifications/insertions queue. + #[clap(long = "tx-size-of-verification-queue", default_value = "2000", env)] + pub tx_size_of_verification_queue: usize, + + /// Number of threads for managing the p2p synchronisation. + #[clap(long = "tx-number-threads-p2p-sync", default_value = "2", env)] + pub tx_number_threads_p2p_sync: usize, + + /// Maximum number of tasks in the p2p synchronisation queue. + #[clap(long = "tx-size-of-p2p-sync-queue", default_value = "20", env)] + pub tx_size_of_p2p_sync_queue: usize, + + /// Maximum number of pending write requests in the service. + #[clap(long = "tx-max-pending-write-requests", default_value = "500", env)] + pub tx_max_pending_write_requests: usize, + + /// Maximum number of pending read requests in the service. + #[clap(long = "tx-max-pending-read-requests", default_value = "1000", env)] + pub tx_max_pending_read_requests: usize, } #[cfg(test)] diff --git a/crates/fuel-core/src/database/coin.rs b/crates/fuel-core/src/database/coin.rs index 4c94d202bd7..488e0a90456 100644 --- a/crates/fuel-core/src/database/coin.rs +++ b/crates/fuel-core/src/database/coin.rs @@ -18,11 +18,11 @@ use fuel_core_storage::{ Result as StorageResult, StorageAsRef, }; -use fuel_core_txpool::types::TxId; use fuel_core_types::{ entities::coins::coin::CompressedCoin, fuel_tx::{ Address, + TxId, UtxoId, }, }; diff --git a/crates/fuel-core/src/graphql_api.rs b/crates/fuel-core/src/graphql_api.rs index bbe9843986e..46e2dc0d676 100644 --- a/crates/fuel-core/src/graphql_api.rs +++ b/crates/fuel-core/src/graphql_api.rs @@ -85,7 +85,7 @@ pub struct Config { pub debug: bool, pub vm_backtrace: bool, pub max_tx: usize, - pub max_txpool_depth: usize, + pub max_txpool_dependency_chain_length: usize, pub chain_name: String, } diff --git a/crates/fuel-core/src/graphql_api/database.rs b/crates/fuel-core/src/graphql_api/database.rs index 752b324529d..418e14c1318 100644 --- a/crates/fuel-core/src/graphql_api/database.rs +++ b/crates/fuel-core/src/graphql_api/database.rs @@ -29,10 +29,6 @@ use fuel_core_storage::{ StorageRead, StorageSize, }; -use fuel_core_txpool::types::{ - ContractId, - TxId, -}; use fuel_core_types::{ blockchain::{ block::CompressedBlock, @@ -53,8 +49,10 @@ use fuel_core_types::{ Address, AssetId, Bytes32, + ContractId, Salt, Transaction, + TxId, TxPointer, UtxoId, }, diff --git a/crates/fuel-core/src/graphql_api/ports.rs b/crates/fuel-core/src/graphql_api/ports.rs index e30715e92da..077a48d1637 100644 --- a/crates/fuel-core/src/graphql_api/ports.rs +++ b/crates/fuel-core/src/graphql_api/ports.rs @@ -19,7 +19,7 @@ use fuel_core_storage::{ StorageInspect, StorageRead, }; -use fuel_core_txpool::service::TxStatusMessage; +use fuel_core_txpool::TxStatusMessage; use fuel_core_types::{ blockchain::{ block::CompressedBlock, @@ -58,10 +58,7 @@ use fuel_core_types::{ executor::TransactionExecutionStatus, graphql_api::ContractBalance, p2p::PeerInfo, - txpool::{ - InsertionResult, - TransactionStatus, - }, + txpool::TransactionStatus, }, tai64::Tai64, }; @@ -199,14 +196,11 @@ pub trait DatabaseChain { #[async_trait] pub trait TxPoolPort: Send + Sync { - fn transaction(&self, id: TxId) -> Option; + async fn transaction(&self, id: TxId) -> anyhow::Result>; - fn submission_time(&self, id: TxId) -> Option; + async fn submission_time(&self, id: TxId) -> anyhow::Result>; - async fn insert( - &self, - txs: Vec>, - ) -> Vec>; + async fn insert(&self, txs: Transaction) -> anyhow::Result<()>; fn tx_update_subscribe( &self, diff --git a/crates/fuel-core/src/graphql_api/storage/coins.rs b/crates/fuel-core/src/graphql_api/storage/coins.rs index 58c8a23958d..42d22ba94ec 100644 --- a/crates/fuel-core/src/graphql_api/storage/coins.rs +++ b/crates/fuel-core/src/graphql_api/storage/coins.rs @@ -8,9 +8,9 @@ use fuel_core_storage::{ structured_storage::TableWithBlueprint, Mappable, }; -use fuel_core_txpool::types::TxId; use fuel_core_types::fuel_tx::{ Address, + TxId, UtxoId, }; diff --git a/crates/fuel-core/src/graphql_api/storage/contracts.rs b/crates/fuel-core/src/graphql_api/storage/contracts.rs index 27056502b61..51b4168ba14 100644 --- a/crates/fuel-core/src/graphql_api/storage/contracts.rs +++ b/crates/fuel-core/src/graphql_api/storage/contracts.rs @@ -7,8 +7,10 @@ use fuel_core_storage::{ structured_storage::TableWithBlueprint, Mappable, }; -use fuel_core_txpool::types::ContractId; -use fuel_core_types::entities::contract::ContractsInfoType; +use fuel_core_types::{ + entities::contract::ContractsInfoType, + fuel_tx::ContractId, +}; /// Contract info pub struct ContractsInfo; diff --git a/crates/fuel-core/src/graphql_api/storage/old.rs b/crates/fuel-core/src/graphql_api/storage/old.rs index a083b55c5f3..adb9c3b7f3d 100644 --- a/crates/fuel-core/src/graphql_api/storage/old.rs +++ b/crates/fuel-core/src/graphql_api/storage/old.rs @@ -15,13 +15,15 @@ use fuel_core_storage::{ structured_storage::TableWithBlueprint, Mappable, }; -use fuel_core_txpool::types::TxId; use fuel_core_types::{ blockchain::{ block::CompressedBlock, consensus::Consensus, }, - fuel_tx::Transaction, + fuel_tx::{ + Transaction, + TxId, + }, fuel_types::BlockHeight, }; diff --git a/crates/fuel-core/src/graphql_api/worker_service.rs b/crates/fuel-core/src/graphql_api/worker_service.rs index a1736aeefc7..959733d4919 100644 --- a/crates/fuel-core/src/graphql_api/worker_service.rs +++ b/crates/fuel-core/src/graphql_api/worker_service.rs @@ -42,7 +42,6 @@ use fuel_core_storage::{ Result as StorageResult, StorageAsMut, }; -use fuel_core_txpool::types::TxId; use fuel_core_types::{ blockchain::{ block::{ @@ -67,6 +66,7 @@ use fuel_core_types::{ Input, Output, Transaction, + TxId, UniqueIdentifier, }, fuel_types::{ diff --git a/crates/fuel-core/src/p2p_test_helpers.rs b/crates/fuel-core/src/p2p_test_helpers.rs index 8303122eb45..fbc9f32afb4 100644 --- a/crates/fuel-core/src/p2p_test_helpers.rs +++ b/crates/fuel-core/src/p2p_test_helpers.rs @@ -73,7 +73,6 @@ use std::{ Index, IndexMut, }, - sync::Arc, time::Duration, }; use tokio::sync::broadcast; @@ -504,7 +503,7 @@ impl Node { let tx_id = not_found[0]; let mut wait_transaction = - self.node.transaction_status_change(tx_id).unwrap(); + self.node.transaction_status_change(tx_id).await.unwrap(); loop { tokio::select! { @@ -546,20 +545,15 @@ impl Node { pub async fn insert_txs(&self) -> HashMap { let mut expected = HashMap::new(); for tx in &self.test_txs { - let tx_result = self - .node + let tx_id = tx.id(&ChainId::default()); + self.node .shared .txpool_shared_state - .insert(vec![Arc::new(tx.clone())]) + .insert(tx.clone()) .await - .pop() - .unwrap() .unwrap(); - let tx = Transaction::from(tx_result.inserted.as_ref()); - expected.insert(tx.id(&ChainId::default()), tx); - - assert!(tx_result.removed.is_empty()); + expected.insert(tx_id, tx.clone()); } expected } diff --git a/crates/fuel-core/src/query/message/test.rs b/crates/fuel-core/src/query/message/test.rs index 350004ba31d..3078f3a7de9 100644 --- a/crates/fuel-core/src/query/message/test.rs +++ b/crates/fuel-core/src/query/message/test.rs @@ -8,14 +8,12 @@ use fuel_core_types::{ }, entities::relayer::message::MerkleProof, fuel_tx::{ - Script, - Transaction, - }, - fuel_types::{ AssetId, - BlockHeight, ContractId, + Script, + Transaction, }, + fuel_types::BlockHeight, tai64::Tai64, }; diff --git a/crates/fuel-core/src/query/subscriptions.rs b/crates/fuel-core/src/query/subscriptions.rs index 701632fd16c..120bd73692c 100644 --- a/crates/fuel-core/src/query/subscriptions.rs +++ b/crates/fuel-core/src/query/subscriptions.rs @@ -1,6 +1,6 @@ use crate::schema::tx::types::TransactionStatus as ApiTxStatus; use fuel_core_storage::Result as StorageResult; -use fuel_core_txpool::service::TxStatusMessage; +use fuel_core_txpool::TxStatusMessage; use fuel_core_types::{ fuel_types::Bytes32, services::txpool::TransactionStatus as TxPoolTxStatus, @@ -17,20 +17,11 @@ mod test; #[cfg_attr(test, mockall::automock)] pub(crate) trait TxnStatusChangeState { /// Return the transaction status from the tx pool and database. - fn get_tx_status(&self, id: Bytes32) -> StorageResult>; -} - -impl TxnStatusChangeState for F -where - F: Fn(Bytes32) -> StorageResult> + Send + Sync, -{ - fn get_tx_status(&self, id: Bytes32) -> StorageResult> { - self(id) - } + async fn get_tx_status(&self, id: Bytes32) -> StorageResult>; } #[tracing::instrument(skip(state, stream), fields(transaction_id = %transaction_id))] -pub(crate) fn transaction_status_change<'a, State>( +pub(crate) async fn transaction_status_change<'a, State>( state: State, stream: BoxStream<'a, TxStatusMessage>, transaction_id: Bytes32, @@ -42,6 +33,7 @@ where // has a status. let check_db_first = state .get_tx_status(transaction_id) + .await .transpose() .map(TxStatusMessage::from); diff --git a/crates/fuel-core/src/query/subscriptions/test.rs b/crates/fuel-core/src/query/subscriptions/test.rs index c20b668a459..67e01848b7c 100644 --- a/crates/fuel-core/src/query/subscriptions/test.rs +++ b/crates/fuel-core/src/query/subscriptions/test.rs @@ -19,7 +19,10 @@ //! - `tx_status_message()`: Generates a TxStatusMessage //! - `transaction_status()`: Generates a TransactionStatus //! - `input_stream()`: Generates a Vec of length 0 to 5 -use fuel_core_txpool::service::TxStatusMessage; +use fuel_core_txpool::{ + error::RemovedReason, + TxStatusMessage, +}; use fuel_core_types::{ fuel_types::Bytes32, services::txpool::TransactionStatus, @@ -76,7 +79,7 @@ fn failed() -> TransactionStatus { /// Returns a TransactionStatus with SqueezedOut status and an empty error message fn squeezed() -> TransactionStatus { TransactionStatus::SqueezedOut { - reason: fuel_core_txpool::Error::SqueezedOut(String::new()).to_string(), + reason: fuel_core_txpool::error::Error::Removed(RemovedReason::Ttl).to_string(), } } @@ -251,6 +254,7 @@ fn test_tsc_inner( let stream = futures::stream::iter(stream).boxed(); super::transaction_status_change(mock_state, stream, txn_id(0)) + .await .collect::>() .await }) diff --git a/crates/fuel-core/src/query/tx.rs b/crates/fuel-core/src/query/tx.rs index 956701ccd99..1d2f1531363 100644 --- a/crates/fuel-core/src/query/tx.rs +++ b/crates/fuel-core/src/query/tx.rs @@ -14,11 +14,11 @@ use fuel_core_storage::{ tables::Transactions, Result as StorageResult, }; -use fuel_core_txpool::types::TxId; use fuel_core_types::{ fuel_tx::{ Receipt, Transaction, + TxId, TxPointer, }, fuel_types::Address, diff --git a/crates/fuel-core/src/schema/node_info.rs b/crates/fuel-core/src/schema/node_info.rs index ef4a5965f72..f58e28a1d3b 100644 --- a/crates/fuel-core/src/schema/node_info.rs +++ b/crates/fuel-core/src/schema/node_info.rs @@ -76,7 +76,7 @@ impl NodeQuery { utxo_validation: config.utxo_validation, vm_backtrace: config.vm_backtrace, max_tx: (config.max_tx as u64).into(), - max_depth: (config.max_txpool_depth as u64).into(), + max_depth: (config.max_txpool_dependency_chain_length as u64).into(), node_version: VERSION.to_owned(), }) } diff --git a/crates/fuel-core/src/schema/tx.rs b/crates/fuel-core/src/schema/tx.rs index 00a2aface70..d0a1474340a 100644 --- a/crates/fuel-core/src/schema/tx.rs +++ b/crates/fuel-core/src/schema/tx.rs @@ -10,11 +10,16 @@ use crate::{ IntoApiResult, QUERY_COSTS, }, + graphql_api::{ + database::ReadView, + ports::MemoryPool, + }, query::{ transaction_status_change, BlockQueryData, SimpleTransactionData, TransactionQueryData, + TxnStatusChangeState, }, schema::{ scalars::{ @@ -44,12 +49,10 @@ use fuel_core_storage::{ Error as StorageError, Result as StorageResult, }; -use fuel_core_txpool::{ - ports::MemoryPool, - service::TxStatusMessage, -}; +use fuel_core_txpool::TxStatusMessage; use fuel_core_types::{ fuel_tx::{ + Bytes32, Cacheable, Transaction as FuelTx, UniqueIdentifier, @@ -70,8 +73,8 @@ use futures::{ }; use itertools::Itertools; use std::{ + borrow::Cow, iter, - sync::Arc, }; use tokio_stream::StreamExt; use types::{ @@ -100,7 +103,7 @@ impl TxQuery { let id = id.0; let txpool = ctx.data_unchecked::(); - if let Some(transaction) = txpool.transaction(id) { + if let Some(transaction) = txpool.transaction(id).await? { Ok(Some(Transaction(transaction, id))) } else { query @@ -329,11 +332,10 @@ impl TxMutation { .latest_consensus_params(); let tx = FuelTx::from_bytes(&tx.0)?; - let _: Vec<_> = txpool - .insert(vec![Arc::new(tx.clone())]) + txpool + .insert(tx.clone()) .await - .into_iter() - .try_collect()?; + .map_err(|e| anyhow::anyhow!(e))?; let id = tx.id(¶ms.chain_id()); let tx = Transaction(tx, id); @@ -369,18 +371,12 @@ impl TxStatusSubscription { let rx = txpool.tx_update_subscribe(id.into())?; let query = ctx.read_view()?; - Ok(transaction_status_change( - move |id| match query.tx_status(&id) { - Ok(status) => Ok(Some(status)), - Err(StorageError::NotFound(_, _)) => Ok(txpool - .submission_time(id) - .map(|time| txpool::TransactionStatus::Submitted { time })), - Err(err) => Err(err), - }, - rx, - id.into(), + let status_change_state = StatusChangeState { txpool, query }; + Ok( + transaction_status_change(status_change_state, rx, id.into()) + .await + .map_err(async_graphql::Error::from), ) - .map_err(async_graphql::Error::from)) } /// Submits transaction to the `TxPool` and await either confirmation or failure. @@ -428,11 +424,7 @@ async fn submit_and_await_status<'a>( let tx_id = tx.id(¶ms.chain_id()); let subscription = txpool.tx_update_subscribe(tx_id)?; - let _: Vec<_> = txpool - .insert(vec![Arc::new(tx)]) - .await - .into_iter() - .try_collect()?; + txpool.insert(tx).await?; Ok(subscription .map(move |event| match event { @@ -446,3 +438,26 @@ async fn submit_and_await_status<'a>( }) .take(2)) } + +struct StatusChangeState<'a> { + query: Cow<'a, ReadView>, + txpool: &'a TxPool, +} + +impl<'a> TxnStatusChangeState for StatusChangeState<'a> { + async fn get_tx_status( + &self, + id: Bytes32, + ) -> StorageResult> { + match self.query.tx_status(&id) { + Ok(status) => Ok(Some(status)), + Err(StorageError::NotFound(_, _)) => Ok(self + .txpool + .submission_time(id) + .await + .map_err(|e| anyhow::anyhow!(e))? + .map(|time| txpool::TransactionStatus::Submitted { time })), + Err(err) => Err(err), + } + } +} diff --git a/crates/fuel-core/src/schema/tx/types.rs b/crates/fuel-core/src/schema/tx/types.rs index 060d18ae5dc..820271430e2 100644 --- a/crates/fuel-core/src/schema/tx/types.rs +++ b/crates/fuel-core/src/schema/tx/types.rs @@ -701,7 +701,9 @@ impl Transaction { let id = self.1; let query = ctx.read_view()?; let txpool = ctx.data_unchecked::(); - get_tx_status(id, query.as_ref(), txpool).map_err(Into::into) + get_tx_status(id, query.as_ref(), txpool) + .await + .map_err(Into::into) } async fn script(&self) -> Option { @@ -984,7 +986,7 @@ impl DryRunTransactionExecutionStatus { } #[tracing::instrument(level = "debug", skip(query, txpool), ret, err)] -pub(crate) fn get_tx_status( +pub(crate) async fn get_tx_status( id: fuel_core_types::fuel_types::Bytes32, query: &ReadView, txpool: &TxPool, @@ -997,12 +999,18 @@ pub(crate) fn get_tx_status( let status = TransactionStatus::new(id, status); Ok(Some(status)) } - None => match txpool.submission_time(id) { - Some(submitted_time) => Ok(Some(TransactionStatus::Submitted( - SubmittedStatus(submitted_time), - ))), - _ => Ok(None), - }, + None => { + let submitted_time = txpool + .submission_time(id) + .await + .map_err(|e| StorageError::Other(anyhow::anyhow!(e)))?; + match submitted_time { + Some(submitted_time) => Ok(Some(TransactionStatus::Submitted( + SubmittedStatus(submitted_time), + ))), + _ => Ok(None), + } + } } } diff --git a/crates/fuel-core/src/service/adapters.rs b/crates/fuel-core/src/service/adapters.rs index 7c356611967..f55d4572335 100644 --- a/crates/fuel-core/src/service/adapters.rs +++ b/crates/fuel-core/src/service/adapters.rs @@ -10,6 +10,7 @@ use fuel_core_poa::{ }; use fuel_core_services::stream::BoxStream; use fuel_core_storage::transactional::Changes; +use fuel_core_txpool::BorrowedTxPool; #[cfg(feature = "p2p")] use fuel_core_types::services::p2p::peer_reputation::AppScore; use fuel_core_types::{ @@ -18,7 +19,6 @@ use fuel_core_types::{ consensus::Consensus, }, fuel_tx::Transaction, - fuel_types::BlockHeight, services::{ block_importer::SharedImportResult, block_producer::Components, @@ -101,17 +101,16 @@ impl TxPoolAdapter { } } -#[derive(Clone)] pub struct TransactionsSource { - txpool: TxPoolSharedState, - _block_height: BlockHeight, + tx_pool: BorrowedTxPool, + minimum_gas_price: u64, } impl TransactionsSource { - pub fn new(txpool: TxPoolSharedState, block_height: BlockHeight) -> Self { + pub fn new(minimum_gas_price: u64, tx_pool: BorrowedTxPool) -> Self { Self { - txpool, - _block_height: block_height, + tx_pool, + minimum_gas_price, } } } diff --git a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs index adcd72d2d61..f3ebf270f91 100644 --- a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs +++ b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs @@ -26,18 +26,14 @@ use fuel_core_services::stream::BoxStream; use fuel_core_storage::transactional::Changes; use fuel_core_types::{ blockchain::block::Block, - fuel_tx::TxId, + fuel_tx::Bytes32, fuel_types::BlockHeight, services::{ block_importer::{ BlockImportInfo, UncommittedResult as UncommittedImporterResult, }, - executor::{ - Error as ExecutorError, - UncommittedResult, - }, - txpool::ArcPoolTx, + executor::UncommittedResult, }, tai64::Tai64, }; @@ -45,6 +41,7 @@ use std::path::{ Path, PathBuf, }; +use tokio::sync::watch; use tokio_stream::{ wrappers::BroadcastStream, StreamExt, @@ -81,27 +78,12 @@ impl ConsensusModulePort for PoAAdapter { } impl TransactionPool for TxPoolAdapter { - fn pending_number(&self) -> usize { - self.service.pending_number() - } - - fn total_consumable_gas(&self) -> u64 { - self.service.total_consumable_gas() - } - - fn remove_txs(&self, ids: Vec<(TxId, ExecutorError)>) -> Vec { - self.service.remove_txs( - ids.into_iter() - .map(|(tx_id, err)| (tx_id, err.to_string())) - .collect(), - ) + fn new_txs_watcher(&self) -> watch::Receiver<()> { + self.service.get_new_txs_notifier() } - fn transaction_status_events(&self) -> BoxStream { - Box::pin( - BroadcastStream::new(self.service.new_tx_notification_subscribe()) - .filter_map(|result| result.ok()), - ) + fn notify_skipped_txs(&self, tx_ids_and_reasons: Vec<(Bytes32, String)>) { + self.service.notify_skipped_txs(tx_ids_and_reasons) } } diff --git a/crates/fuel-core/src/service/adapters/executor.rs b/crates/fuel-core/src/service/adapters/executor.rs index 05d4400a1f4..663c219b001 100644 --- a/crates/fuel-core/src/service/adapters/executor.rs +++ b/crates/fuel-core/src/service/adapters/executor.rs @@ -3,10 +3,12 @@ use crate::{ service::adapters::TransactionsSource, }; use fuel_core_executor::ports::MaybeCheckedTransaction; +use fuel_core_txpool::Constraints; use fuel_core_types::{ blockchain::primitives::DaBlockHeight, services::relayer::Event, }; +use std::sync::Arc; impl fuel_core_executor::ports::TransactionsSource for TransactionsSource { fn next( @@ -15,18 +17,19 @@ impl fuel_core_executor::ports::TransactionsSource for TransactionsSource { transactions_limit: u16, block_transaction_size_limit: u32, ) -> Vec { - self.txpool - .select_transactions( - gas_limit, - transactions_limit, - block_transaction_size_limit, - ) + self.tx_pool + .exclusive_lock() + .extract_transactions_for_block(Constraints { + minimal_gas_price: self.minimum_gas_price, + max_gas: gas_limit, + maximum_txs: transactions_limit, + maximum_block_size: block_transaction_size_limit, + }) .into_iter() .map(|tx| { - MaybeCheckedTransaction::CheckedTransaction( - tx.as_ref().into(), - tx.used_consensus_parameters_version(), - ) + let transaction = Arc::unwrap_or_clone(tx); + let version = transaction.used_consensus_parameters_version(); + MaybeCheckedTransaction::CheckedTransaction(transaction.into(), version) }) .collect() } diff --git a/crates/fuel-core/src/service/adapters/fuel_gas_price_provider.rs b/crates/fuel-core/src/service/adapters/fuel_gas_price_provider.rs index c1d79e3d1f3..aeb17627ccf 100644 --- a/crates/fuel-core/src/service/adapters/fuel_gas_price_provider.rs +++ b/crates/fuel-core/src/service/adapters/fuel_gas_price_provider.rs @@ -5,10 +5,7 @@ use fuel_core_gas_price_service::common::gas_price_algorithm::{ }; use fuel_core_producer::block_producer::gas_price::GasPriceProvider as ProducerGasPriceProvider; -use fuel_core_txpool::{ - ports::GasPriceProvider as TxPoolGasPriceProvider, - Result as TxPoolResult, -}; +use fuel_core_txpool::ports::GasPriceProvider as TxPoolGasPriceProvider; use fuel_core_types::fuel_types::BlockHeight; pub type Result = std::result::Result; @@ -54,8 +51,8 @@ impl FuelGasPriceProvider where A: GasPriceAlgorithm + Send + Sync, { - async fn next_gas_price(&self) -> u64 { - self.algorithm.next_gas_price().await + fn next_gas_price(&self) -> u64 { + self.algorithm.next_gas_price() } } @@ -65,17 +62,16 @@ where A: GasPriceAlgorithm + Send + Sync, { async fn next_gas_price(&self) -> anyhow::Result { - Ok(self.next_gas_price().await) + Ok(self.next_gas_price()) } } -#[async_trait::async_trait] impl TxPoolGasPriceProvider for FuelGasPriceProvider where - A: GasPriceAlgorithm + Send + Sync, + A: GasPriceAlgorithm + Send + Sync + 'static, { - async fn next_gas_price(&self) -> TxPoolResult { - Ok(self.next_gas_price().await) + fn next_gas_price(&self) -> u64 { + self.next_gas_price() } } diff --git a/crates/fuel-core/src/service/adapters/fuel_gas_price_provider/tests/producer_gas_price_tests.rs b/crates/fuel-core/src/service/adapters/fuel_gas_price_provider/tests/producer_gas_price_tests.rs index 81429785248..d8929cb4fdf 100644 --- a/crates/fuel-core/src/service/adapters/fuel_gas_price_provider/tests/producer_gas_price_tests.rs +++ b/crates/fuel-core/src/service/adapters/fuel_gas_price_provider/tests/producer_gas_price_tests.rs @@ -13,7 +13,7 @@ async fn gas_price__if_requested_block_height_is_latest_return_gas_price() { // when let expected_price = algo.next_gas_price(); - let actual_price = gas_price_provider.next_gas_price().await; + let actual_price = gas_price_provider.next_gas_price(); // then assert_eq!(expected_price, actual_price); diff --git a/crates/fuel-core/src/service/adapters/fuel_gas_price_provider/tests/tx_pool_gas_price_tests.rs b/crates/fuel-core/src/service/adapters/fuel_gas_price_provider/tests/tx_pool_gas_price_tests.rs index 81429785248..d8929cb4fdf 100644 --- a/crates/fuel-core/src/service/adapters/fuel_gas_price_provider/tests/tx_pool_gas_price_tests.rs +++ b/crates/fuel-core/src/service/adapters/fuel_gas_price_provider/tests/tx_pool_gas_price_tests.rs @@ -13,7 +13,7 @@ async fn gas_price__if_requested_block_height_is_latest_return_gas_price() { // when let expected_price = algo.next_gas_price(); - let actual_price = gas_price_provider.next_gas_price().await; + let actual_price = gas_price_provider.next_gas_price(); // then assert_eq!(expected_price, actual_price); diff --git a/crates/fuel-core/src/service/adapters/graphql_api.rs b/crates/fuel-core/src/service/adapters/graphql_api.rs index 5c1ecd9c80b..ff96e484ed9 100644 --- a/crates/fuel-core/src/service/adapters/graphql_api.rs +++ b/crates/fuel-core/src/service/adapters/graphql_api.rs @@ -2,6 +2,7 @@ use super::{ BlockImporterAdapter, BlockProducerAdapter, ConsensusParametersProvider, + SharedMemoryPool, StaticGasPrice, }; use crate::{ @@ -15,19 +16,20 @@ use crate::{ P2pPort, TxPoolPort, }, - service::adapters::{ - import_result_provider::ImportResultProvider, - P2PAdapter, - TxPoolAdapter, + graphql_api::ports::MemoryPool, + service::{ + adapters::{ + import_result_provider::ImportResultProvider, + P2PAdapter, + TxPoolAdapter, + }, + vm_pool::MemoryFromPool, }, }; use async_trait::async_trait; use fuel_core_services::stream::BoxStream; use fuel_core_storage::Result as StorageResult; -use fuel_core_txpool::{ - service::TxStatusMessage, - types::TxId, -}; +use fuel_core_txpool::TxStatusMessage; use fuel_core_types::{ blockchain::header::ConsensusParametersVersion, entities::relayer::message::MerkleProof, @@ -35,16 +37,14 @@ use fuel_core_types::{ Bytes32, ConsensusParameters, Transaction, + TxId, }, fuel_types::BlockHeight, services::{ block_importer::SharedImportResult, executor::TransactionExecutionStatus, p2p::PeerInfo, - txpool::{ - InsertionResult, - TransactionStatus, - }, + txpool::TransactionStatus, }, tai64::Tai64, }; @@ -58,28 +58,36 @@ mod on_chain; #[async_trait] impl TxPoolPort for TxPoolAdapter { - fn transaction(&self, id: TxId) -> Option { - self.service + async fn transaction(&self, id: TxId) -> anyhow::Result> { + Ok(self + .service .find_one(id) - .map(|info| info.tx().clone().deref().into()) + .await + .map_err(|e| anyhow::anyhow!(e))? + .map(|info| info.tx().clone().deref().into())) } - fn submission_time(&self, id: TxId) -> Option { - self.service + async fn submission_time(&self, id: TxId) -> anyhow::Result> { + Ok(self + .service .find_one(id) - .map(|info| Tai64::from_unix(info.submitted_time().as_secs() as i64)) + .await + .map_err(|e| anyhow::anyhow!(e))? + .map(|info| { + Tai64::from_unix( + info.creation_instant() + .duration_since(std::time::UNIX_EPOCH) + .expect("Time can't be lower than 0") + .as_secs() as i64, + ) + })) } - async fn insert( - &self, - txs: Vec>, - ) -> Vec> { + async fn insert(&self, tx: Transaction) -> anyhow::Result<()> { self.service - .insert(txs) + .insert(tx) .await - .into_iter() - .map(|res| res.map_err(|e| anyhow::anyhow!(e))) - .collect() + .map_err(|e| anyhow::anyhow!(e)) } fn tx_update_subscribe( @@ -161,7 +169,7 @@ impl worker::TxPool for TxPoolAdapter { block_height: &BlockHeight, status: TransactionStatus, ) { - self.service.send_complete(id, block_height, status) + self.service.notify_complete_tx(id, block_height, status) } } @@ -215,3 +223,12 @@ impl worker::BlockImporter for GraphQLBlockImporter { self.import_result_provider_adapter.result_at_height(height) } } + +#[async_trait::async_trait] +impl MemoryPool for SharedMemoryPool { + type Memory = MemoryFromPool; + + async fn get_memory(&self) -> Self::Memory { + self.memory_pool.take_raw().await + } +} diff --git a/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs b/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs index 092c83da438..d554c7ddc45 100644 --- a/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs +++ b/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs @@ -42,10 +42,6 @@ use fuel_core_storage::{ Result as StorageResult, StorageAsRef, }; -use fuel_core_txpool::types::{ - ContractId, - TxId, -}; use fuel_core_types::{ blockchain::{ block::CompressedBlock, @@ -56,8 +52,10 @@ use fuel_core_types::{ fuel_tx::{ Address, Bytes32, + ContractId, Salt, Transaction, + TxId, TxPointer, UtxoId, }, diff --git a/crates/fuel-core/src/service/adapters/graphql_api/on_chain.rs b/crates/fuel-core/src/service/adapters/graphql_api/on_chain.rs index 76532a8d718..b3a6d860e76 100644 --- a/crates/fuel-core/src/service/adapters/graphql_api/on_chain.rs +++ b/crates/fuel-core/src/service/adapters/graphql_api/on_chain.rs @@ -30,10 +30,6 @@ use fuel_core_storage::{ Result as StorageResult, StorageAsRef, }; -use fuel_core_txpool::types::{ - ContractId, - TxId, -}; use fuel_core_types::{ blockchain::{ block::CompressedBlock, @@ -43,7 +39,9 @@ use fuel_core_types::{ entities::relayer::message::Message, fuel_tx::{ AssetId, + ContractId, Transaction, + TxId, }, fuel_types::{ BlockHeight, diff --git a/crates/fuel-core/src/service/adapters/p2p.rs b/crates/fuel-core/src/service/adapters/p2p.rs index db3545c095b..a01fde29430 100644 --- a/crates/fuel-core/src/service/adapters/p2p.rs +++ b/crates/fuel-core/src/service/adapters/p2p.rs @@ -10,12 +10,12 @@ use fuel_core_p2p::ports::{ }; use fuel_core_services::stream::BoxStream; use fuel_core_storage::Result as StorageResult; -use fuel_core_txpool::types::TxId; use fuel_core_types::{ blockchain::{ consensus::Genesis, SealedBlockHeader, }, + fuel_tx::TxId, fuel_types::BlockHeight, services::p2p::{ NetworkableTransactionPool, @@ -59,19 +59,28 @@ impl BlockHeightImporter for BlockImporterAdapter { } impl TxPool for TxPoolAdapter { - fn get_tx_ids(&self, max_txs: usize) -> Vec { - self.service.get_tx_ids(max_txs) + async fn get_tx_ids(&self, max_txs: usize) -> anyhow::Result> { + self.service + .get_tx_ids(max_txs) + .await + .map_err(|e| anyhow::anyhow!(e)) } - fn get_full_txs(&self, tx_ids: Vec) -> Vec> { - self.service + async fn get_full_txs( + &self, + tx_ids: Vec, + ) -> anyhow::Result>> { + Ok(self + .service .find(tx_ids) + .await + .map_err(|e| anyhow::anyhow!(e))? .into_iter() .map(|tx_info| { tx_info.map(|tx| { NetworkableTransactionPool::PoolTransaction(tx.tx().clone()) }) }) - .collect() + .collect()) } } diff --git a/crates/fuel-core/src/service/adapters/producer.rs b/crates/fuel-core/src/service/adapters/producer.rs index d7ac7590106..7a977f69228 100644 --- a/crates/fuel-core/src/service/adapters/producer.rs +++ b/crates/fuel-core/src/service/adapters/producer.rs @@ -78,12 +78,21 @@ impl BlockProducerAdapter { } } -#[async_trait::async_trait] impl TxPool for TxPoolAdapter { type TxSource = TransactionsSource; - fn get_source(&self, block_height: BlockHeight) -> Self::TxSource { - TransactionsSource::new(self.service.clone(), block_height) + async fn get_source( + &self, + gas_price: u64, + _: BlockHeight, + ) -> anyhow::Result { + let tx_pool = self + .service + .borrow_txpool() + .await + .map_err(|e| anyhow::anyhow!(e))?; + + Ok(TransactionsSource::new(gas_price, tx_pool)) } } diff --git a/crates/fuel-core/src/service/adapters/txpool.rs b/crates/fuel-core/src/service/adapters/txpool.rs index 97ff57fb200..f88d5ca44d4 100644 --- a/crates/fuel-core/src/service/adapters/txpool.rs +++ b/crates/fuel-core/src/service/adapters/txpool.rs @@ -1,14 +1,10 @@ use crate::{ database::OnChainIterableKeyValueView, - service::{ - adapters::{ - BlockImporterAdapter, - ConsensusParametersProvider, - P2PAdapter, - SharedMemoryPool, - StaticGasPrice, - }, - vm_pool::MemoryFromPool, + service::adapters::{ + BlockImporterAdapter, + ConsensusParametersProvider, + P2PAdapter, + StaticGasPrice, }, }; use fuel_core_services::stream::BoxStream; @@ -21,15 +17,10 @@ use fuel_core_storage::{ Result as StorageResult, StorageAsRef, }; -use fuel_core_txpool::{ - ports::{ - BlockImporter, - ConsensusParametersProvider as ConsensusParametersProviderTrait, - GasPriceProvider, - MemoryPool, - }, - types::TxId, - Result as TxPoolResult, +use fuel_core_txpool::ports::{ + BlockImporter, + ConsensusParametersProvider as ConsensusParametersProviderTrait, + GasPriceProvider, }; use fuel_core_types::{ blockchain::header::ConsensusParametersVersion, @@ -41,6 +32,7 @@ use fuel_core_types::{ BlobId, ConsensusParameters, Transaction, + TxId, UtxoId, }, fuel_types::{ @@ -68,9 +60,7 @@ impl BlockImporter for BlockImporterAdapter { #[cfg(feature = "p2p")] #[async_trait::async_trait] -impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { - type GossipedTransaction = TransactionGossipData; - +impl fuel_core_txpool::ports::NotifyP2P for P2PAdapter { fn broadcast_transaction(&self, transaction: Arc) -> anyhow::Result<()> { if let Some(service) = &self.service { service.broadcast_transaction(transaction) @@ -79,6 +69,23 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { } } + fn notify_gossip_transaction_validity( + &self, + message_info: GossipsubMessageInfo, + validity: GossipsubMessageAcceptance, + ) -> anyhow::Result<()> { + if let Some(service) = &self.service { + service.notify_gossip_transaction_validity(message_info, validity) + } else { + Ok(()) + } + } +} + +#[cfg(feature = "p2p")] +impl fuel_core_txpool::ports::P2PSubscriptions for P2PAdapter { + type GossipedTransaction = TransactionGossipData; + fn gossiped_transaction_events(&self) -> BoxStream { use tokio_stream::{ wrappers::BroadcastStream, @@ -108,19 +115,11 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { Box::pin(fuel_core_services::stream::pending()) } } +} - fn notify_gossip_transaction_validity( - &self, - message_info: GossipsubMessageInfo, - validity: GossipsubMessageAcceptance, - ) -> anyhow::Result<()> { - if let Some(service) = &self.service { - service.notify_gossip_transaction_validity(message_info, validity) - } else { - Ok(()) - } - } - +#[cfg(feature = "p2p")] +#[async_trait::async_trait] +impl fuel_core_txpool::ports::P2PRequests for P2PAdapter { async fn request_tx_ids(&self, peer_id: PeerId) -> anyhow::Result> { if let Some(service) = &self.service { service.get_all_transactions_ids_from_peer(peer_id).await @@ -145,47 +144,54 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { } #[cfg(not(feature = "p2p"))] -#[async_trait::async_trait] -impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { - type GossipedTransaction = TransactionGossipData; +const _: () = { + #[async_trait::async_trait] + impl fuel_core_txpool::ports::NotifyP2P for P2PAdapter { + fn broadcast_transaction( + &self, + _transaction: Arc, + ) -> anyhow::Result<()> { + Ok(()) + } - fn broadcast_transaction( - &self, - _transaction: Arc, - ) -> anyhow::Result<()> { - Ok(()) + fn notify_gossip_transaction_validity( + &self, + _message_info: GossipsubMessageInfo, + _validity: GossipsubMessageAcceptance, + ) -> anyhow::Result<()> { + Ok(()) + } } - fn gossiped_transaction_events(&self) -> BoxStream { - Box::pin(fuel_core_services::stream::pending()) - } + impl fuel_core_txpool::ports::P2PSubscriptions for P2PAdapter { + type GossipedTransaction = TransactionGossipData; - fn notify_gossip_transaction_validity( - &self, - _message_info: GossipsubMessageInfo, - _validity: GossipsubMessageAcceptance, - ) -> anyhow::Result<()> { - Ok(()) - } + fn gossiped_transaction_events(&self) -> BoxStream { + Box::pin(fuel_core_services::stream::pending()) + } - fn subscribe_new_peers(&self) -> BoxStream { - Box::pin(fuel_core_services::stream::pending()) + fn subscribe_new_peers(&self) -> BoxStream { + Box::pin(fuel_core_services::stream::pending()) + } } - async fn request_tx_ids(&self, _peer_id: PeerId) -> anyhow::Result> { - Ok(vec![]) - } + #[async_trait::async_trait] + impl fuel_core_txpool::ports::P2PRequests for P2PAdapter { + async fn request_tx_ids(&self, _peer_id: PeerId) -> anyhow::Result> { + Ok(vec![]) + } - async fn request_txs( - &self, - _peer_id: PeerId, - _tx_ids: Vec, - ) -> anyhow::Result>> { - Ok(vec![]) + async fn request_txs( + &self, + _peer_id: PeerId, + _tx_ids: Vec, + ) -> anyhow::Result>> { + Ok(vec![]) + } } -} +}; -impl fuel_core_txpool::ports::TxPoolDb for OnChainIterableKeyValueView { +impl fuel_core_txpool::ports::TxPoolPersistentStorage for OnChainIterableKeyValueView { fn utxo(&self, utxo_id: &UtxoId) -> StorageResult> { self.storage::() .get(utxo_id) @@ -209,8 +215,8 @@ impl fuel_core_txpool::ports::TxPoolDb for OnChainIterableKeyValueView { #[async_trait::async_trait] impl GasPriceProvider for StaticGasPrice { - async fn next_gas_price(&self) -> TxPoolResult { - Ok(self.gas_price) + fn next_gas_price(&self) -> u64 { + self.gas_price } } @@ -221,12 +227,3 @@ impl ConsensusParametersProviderTrait for ConsensusParametersProvider { self.shared_state.latest_consensus_parameters_with_version() } } - -#[async_trait::async_trait] -impl MemoryPool for SharedMemoryPool { - type Memory = MemoryFromPool; - - async fn get_memory(&self) -> Self::Memory { - self.memory_pool.take_raw().await - } -} diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs index 773f845b347..8e17d417e85 100644 --- a/crates/fuel-core/src/service/config.rs +++ b/crates/fuel-core/src/service/config.rs @@ -27,6 +27,7 @@ use fuel_core_p2p::config::{ pub use fuel_core_poa::Trigger; #[cfg(feature = "relayer")] use fuel_core_relayer::Config as RelayerConfig; +use fuel_core_txpool::config::Config as TxPoolConfig; use fuel_core_types::blockchain::header::StateTransitionBytecodeVersion; use crate::{ @@ -54,7 +55,7 @@ pub struct Config { pub block_production: Trigger, pub predefined_blocks_path: Option, pub vm: VMConfig, - pub txpool: fuel_core_txpool::Config, + pub txpool: TxPoolConfig, pub block_producer: fuel_core_producer::Config, pub starting_gas_price: u64, pub gas_price_change_percent: u64, @@ -155,10 +156,10 @@ impl Config { block_production: Trigger::Instant, predefined_blocks_path: None, vm: Default::default(), - txpool: fuel_core_txpool::Config { + txpool: TxPoolConfig { utxo_validation, - transaction_ttl: Duration::from_secs(60 * 100000000), - ..fuel_core_txpool::Config::default() + max_txs_ttl: Duration::from_secs(60 * 100000000), + ..Default::default() }, block_producer: fuel_core_producer::Config { ..Default::default() diff --git a/crates/fuel-core/src/service/query.rs b/crates/fuel-core/src/service/query.rs index 9bf0c3823ea..f82f0f9679a 100644 --- a/crates/fuel-core/src/service/query.rs +++ b/crates/fuel-core/src/service/query.rs @@ -1,21 +1,25 @@ //! Queries we can run directly on `FuelService`. -use std::sync::Arc; - +use fuel_core_storage::Result as StorageResult; use fuel_core_types::{ fuel_tx::{ Transaction, UniqueIdentifier, }, fuel_types::Bytes32, - services::txpool::InsertionResult, + services::txpool::TransactionStatus as TxPoolTxStatus, }; use futures::{ Stream, StreamExt, }; +use std::time::SystemTimeError; use crate::{ - query::transaction_status_change, + database::OffChainIterableKeyValueView, + query::{ + transaction_status_change, + TxnStatusChangeState, + }, schema::tx::types::TransactionStatus, }; @@ -23,26 +27,19 @@ use super::*; impl FuelService { /// Submit a transaction to the txpool. - pub async fn submit(&self, tx: Transaction) -> anyhow::Result { - let results: Vec<_> = self - .shared + pub async fn submit(&self, tx: Transaction) -> anyhow::Result<()> { + self.shared .txpool_shared_state - .insert(vec![Arc::new(tx)]) + .insert(tx) .await - .into_iter() - .collect::>() - .map_err(|e| anyhow::anyhow!(e))?; - results - .into_iter() - .next() - .ok_or_else(|| anyhow::anyhow!("Nothing was inserted")) + .map_err(|e| anyhow::anyhow!(e)) } /// Submit a transaction to the txpool and return a stream of status changes. pub async fn submit_and_status_change( &self, tx: Transaction, - ) -> anyhow::Result>> { + ) -> anyhow::Result> + '_> { let id = tx.id(&self .shared .config @@ -50,7 +47,7 @@ impl FuelService { .chain_config() .consensus_parameters .chain_id()); - let stream = self.transaction_status_change(id)?; + let stream = self.transaction_status_change(id).await?; self.submit(tx).await?; Ok(stream) } @@ -67,7 +64,7 @@ impl FuelService { .chain_config() .consensus_parameters .chain_id()); - let stream = self.transaction_status_change(id)?.filter(|status| { + let stream = self.transaction_status_change(id).await?.filter(|status| { futures::future::ready(!matches!(status, Ok(TransactionStatus::Submitted(_)))) }); futures::pin_mut!(stream); @@ -79,20 +76,39 @@ impl FuelService { } /// Return a stream of status changes for a transaction. - pub fn transaction_status_change( + pub async fn transaction_status_change( &self, id: Bytes32, - ) -> anyhow::Result>> { - let txpool = self.shared.txpool_shared_state.clone(); + ) -> anyhow::Result> + '_> { + let txpool = &self.shared.txpool_shared_state; let db = self.shared.database.off_chain().latest_view()?; let rx = txpool.tx_update_subscribe(id)?; - Ok(transaction_status_change( - move |id| match db.get_tx_status(&id)? { - Some(status) => Ok(Some(status)), - None => Ok(txpool.find_one(id).map(Into::into)), - }, - rx, - id, - )) + let state = StatusChangeState { db, txpool }; + Ok(transaction_status_change(state, rx, id).await) + } +} + +struct StatusChangeState<'a> { + db: OffChainIterableKeyValueView, + txpool: &'a TxPoolSharedState, +} + +impl<'a> TxnStatusChangeState for StatusChangeState<'a> { + async fn get_tx_status(&self, id: Bytes32) -> StorageResult> { + match self.db.get_tx_status(&id)? { + Some(status) => Ok(Some(status)), + None => { + let result = self + .txpool + .find_one(id) + .await + .map_err(|e| anyhow::anyhow!(e))?; + let status = result + .map(|status| status.try_into()) + .transpose() + .map_err(|e: SystemTimeError| anyhow::anyhow!(e))?; + Ok(status) + } + } } } diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index 82864d73fd9..13c9f6a9d01 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -65,14 +65,7 @@ pub type PoAService = fuel_core_poa::Service< >; #[cfg(feature = "p2p")] pub type P2PService = fuel_core_p2p::service::Service; -pub type TxPoolSharedState = fuel_core_txpool::service::SharedState< - P2PAdapter, - Database, - ExecutorAdapter, - FuelGasPriceProvider, - ConsensusParametersProvider, - SharedMemoryPool, ->; +pub type TxPoolSharedState = fuel_core_txpool::SharedState; pub type BlockProducerService = fuel_core_producer::block_producer::Producer< Database, TxPoolAdapter, @@ -201,15 +194,15 @@ pub fn init_sub_services( let gas_price_provider = FuelGasPriceProvider::new(gas_price_service_v0.shared.clone()); let txpool = fuel_core_txpool::new_service( + chain_id, config.txpool.clone(), - database.on_chain().clone(), - importer_adapter.clone(), p2p_adapter.clone(), - executor.clone(), + importer_adapter.clone(), + database.on_chain().clone(), + consensus_parameters_provider.clone(), last_height, gas_price_provider.clone(), - consensus_parameters_provider.clone(), - SharedMemoryPool::new(config.memory_pool_size), + executor.clone(), ); let tx_pool_adapter = TxPoolAdapter::new(txpool.shared.clone()); @@ -298,8 +291,8 @@ pub fn init_sub_services( utxo_validation: config.utxo_validation, debug: config.debug, vm_backtrace: config.vm.backtrace, - max_tx: config.txpool.max_tx, - max_txpool_depth: config.txpool.max_depth, + max_tx: config.txpool.pool_limits.max_txs, + max_txpool_dependency_chain_length: config.txpool.max_txs_chain_count, chain_name, }; diff --git a/crates/services/consensus_module/poa/Cargo.toml b/crates/services/consensus_module/poa/Cargo.toml index eb97756e4eb..5d71379853d 100644 --- a/crates/services/consensus_module/poa/Cargo.toml +++ b/crates/services/consensus_module/poa/Cargo.toml @@ -29,6 +29,7 @@ aws-config = { version = "1.1.7", features = ["behavior-version-latest"] } fuel-core-poa = { path = ".", features = ["test-helpers"] } fuel-core-services = { workspace = true, features = ["test-helpers"] } fuel-core-storage = { path = "./../../../storage", features = ["test-helpers"] } +fuel-core-trace = { path = "./../../../trace" } fuel-core-types = { path = "./../../../types", features = ["test-helpers"] } mockall = { workspace = true } rand = { workspace = true } diff --git a/crates/services/consensus_module/poa/src/lib.rs b/crates/services/consensus_module/poa/src/lib.rs index 272c569ed61..19dfeb826cf 100644 --- a/crates/services/consensus_module/poa/src/lib.rs +++ b/crates/services/consensus_module/poa/src/lib.rs @@ -24,3 +24,6 @@ pub use service::{ new_service, Service, }; + +#[cfg(test)] +fuel_core_trace::enable_tracing!(); diff --git a/crates/services/consensus_module/poa/src/ports.rs b/crates/services/consensus_module/poa/src/ports.rs index 2148bcfb44b..af3e889eff9 100644 --- a/crates/services/consensus_module/poa/src/ports.rs +++ b/crates/services/consensus_module/poa/src/ports.rs @@ -10,10 +10,7 @@ use fuel_core_types::{ header::BlockHeader, primitives::DaBlockHeight, }, - fuel_tx::{ - Transaction, - TxId, - }, + fuel_tx::Transaction, fuel_types::{ BlockHeight, Bytes32, @@ -23,11 +20,7 @@ use fuel_core_types::{ BlockImportInfo, UncommittedResult as UncommittedImportResult, }, - executor::{ - Error as ExecutorError, - UncommittedResult as UncommittedExecutionResult, - }, - txpool::ArcPoolTx, + executor::UncommittedResult as UncommittedExecutionResult, }, tai64::Tai64, }; @@ -35,14 +28,9 @@ use std::collections::HashMap; #[cfg_attr(test, mockall::automock)] pub trait TransactionPool: Send + Sync { - /// Returns the number of pending transactions in the `TxPool`. - fn pending_number(&self) -> usize; - - fn total_consumable_gas(&self) -> u64; - - fn remove_txs(&self, tx_ids: Vec<(TxId, ExecutorError)>) -> Vec; + fn new_txs_watcher(&self) -> tokio::sync::watch::Receiver<()>; - fn transaction_status_events(&self) -> BoxStream; + fn notify_skipped_txs(&self, tx_ids_and_reasons: Vec<(Bytes32, String)>); } /// The source of transactions for the block. diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index 2d2f07a21d7..04255ac108a 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -16,7 +16,6 @@ use tokio::{ Instant, }, }; -use tokio_stream::StreamExt; use crate::{ ports::{ @@ -37,10 +36,7 @@ use crate::{ Trigger, }; use fuel_core_services::{ - stream::{ - BoxFuture, - BoxStream, - }, + stream::BoxFuture, RunnableService, RunnableTask, Service as OtherService, @@ -132,7 +128,7 @@ pub struct MainTask { block_producer: B, block_importer: I, txpool: T, - tx_status_update_stream: BoxStream, + new_txs_watcher: tokio::sync::watch::Receiver<()>, request_receiver: mpsc::Receiver, shared_state: SharedState, last_height: BlockHeight, @@ -164,7 +160,7 @@ where predefined_blocks: PB, clock: C, ) -> Self { - let tx_status_update_stream = txpool.transaction_status_events(); + let new_txs_watcher = txpool.new_txs_watcher(); let (request_sender, request_receiver) = mpsc::channel(1024); let (last_height, last_timestamp, last_block_created) = Self::extract_block_info(clock.now(), last_block); @@ -194,7 +190,7 @@ where txpool, block_producer, block_importer, - tx_status_update_stream, + new_txs_watcher, request_receiver, shared_state: SharedState { request_sender }, last_height, @@ -343,16 +339,18 @@ where .await? .into(); - let mut tx_ids_to_remove = Vec::with_capacity(skipped_transactions.len()); - for (tx_id, err) in skipped_transactions { - tracing::error!( + if !skipped_transactions.is_empty() { + let mut tx_ids_to_remove = Vec::with_capacity(skipped_transactions.len()); + for (tx_id, err) in skipped_transactions { + tracing::error!( "During block production got invalid transaction {:?} with error {:?}", tx_id, err ); - tx_ids_to_remove.push((tx_id, err)); + tx_ids_to_remove.push((tx_id, err.to_string())); + } + self.txpool.notify_skipped_txs(tx_ids_to_remove); } - self.txpool.remove_txs(tx_ids_to_remove); // Sign the block and seal it let seal = self.signer.seal_block(&block).await?; @@ -438,16 +436,9 @@ where Ok(()) } - pub(crate) async fn on_txpool_event(&mut self) -> anyhow::Result<()> { + async fn on_txpool_event(&mut self) -> anyhow::Result<()> { match self.trigger { - Trigger::Instant => { - let pending_number = self.txpool.pending_number(); - // skip production if there are no pending transactions - if pending_number > 0 { - self.produce_next_block().await?; - } - Ok(()) - } + Trigger::Instant => self.produce_next_block().await, Trigger::Never | Trigger::Interval { .. } => Ok(()), } } @@ -531,19 +522,14 @@ where let should_continue; let mut sync_state = self.sync_task_handle.shared.clone(); // make sure we're synced first - while *sync_state.borrow_and_update() == SyncState::NotSynced { + if *sync_state.borrow_and_update() == SyncState::NotSynced { tokio::select! { biased; result = watcher.while_started() => { should_continue = result?.started(); return Ok(should_continue); } - _ = sync_state.changed() => { - break; - } - _ = self.tx_status_update_stream.next() => { - // ignore txpool events while syncing - } + _ = sync_state.changed() => {} } } @@ -587,19 +573,9 @@ where should_continue = false; } } - // TODO: This should likely be refactored to use something like tokio::sync::Notify. - // Otherwise, if a bunch of txs are submitted at once and all the txs are included - // into the first block production trigger, we'll still call the event handler - // for each tx after they've already been included into a block. - // The poa service also doesn't care about events unrelated to new tx submissions, - // and shouldn't be awoken when txs are completed or squeezed out of the pool. - txpool_event = self.tx_status_update_stream.next() => { - if txpool_event.is_some() { - self.on_txpool_event().await.context("While processing txpool event")?; - should_continue = true; - } else { - should_continue = false; - } + _ = self.new_txs_watcher.changed() => { + self.on_txpool_event().await.context("While processing txpool event")?; + should_continue = true; } _ = next_block_production => { match self.on_timer().await.context("While processing timer event") { diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index 6b04e339a2b..5230e03f42b 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -23,7 +23,6 @@ use crate::{ use async_trait::async_trait; use fuel_core_chain_config::default_consensus_dev_key; use fuel_core_services::{ - stream::pending, Service as StorageTrait, ServiceRunner, State, @@ -41,10 +40,7 @@ use fuel_core_types::{ SealedBlock, }, fuel_crypto::SecretKey, - fuel_tx::{ - field::ScriptGasLimit, - *, - }, + fuel_tx::*, fuel_types::{ BlockHeight, ChainId, @@ -140,7 +136,7 @@ impl TestContextBuilder { self } - fn build(self) -> TestContext { + async fn build(self) -> TestContext { let config = self.config.unwrap_or_default(); let producer = self.producer.unwrap_or_else(|| { let mut producer = MockBlockProducer::default(); @@ -192,7 +188,7 @@ impl TestContextBuilder { predefined_blocks, watch, ); - service.start().unwrap(); + service.start_and_await().await.unwrap(); TestContext { service, time } } } @@ -239,74 +235,35 @@ impl TestContext { pub struct TxPoolContext { pub txpool: MockTransactionPool, pub txs: Arc>>, - pub status_sender: Arc>>, + pub new_txs_notifier: watch::Sender<()>, } impl MockTransactionPool { fn no_tx_updates() -> Self { let mut txpool = MockTransactionPool::default(); - txpool - .expect_transaction_status_events() - .returning(|| Box::pin(pending())); + txpool.expect_new_txs_watcher().returning({ + let (sender, _) = watch::channel(()); + move || sender.subscribe() + }); + txpool.expect_notify_skipped_txs().returning(|_| {}); txpool } pub fn new_with_txs(txs: Vec