From ec31a2c9276ba471ce8fafd67b5fcfd74ff610c5 Mon Sep 17 00:00:00 2001 From: MO NAN <651932351@qq.com> Date: Mon, 13 Nov 2023 11:26:12 +0800 Subject: [PATCH 1/2] Add READ_FRONT for multilayer storage to bypass backend storage (#4032) --- .../bcos-framework/storage2/MemoryStorage.h | 7 +- .../bcos-framework/storage2/Storage.h | 36 ++- .../bcos-framework/storage2/StorageMethods.h | 18 +- .../TransactionExecutor.h | 1 + bcos-storage/bcos-storage/RocksDBStorage2.h | 2 +- .../RollbackableStorage.h | 85 +++--- .../TransactionExecutorImpl.h | 2 +- .../benchmark/benchmakrExecutor.cpp | 3 +- .../tests/TestHostContext.cpp | 3 +- .../tests/TestMemoryStorage.h | 17 ++ .../tests/TestRollbackableStorage.cpp | 104 +++---- .../tests/TestTransactionExecutor.cpp | 3 +- .../MultiLayerStorage.h | 264 ++++++++++-------- .../ReadWriteSetStorage.h | 91 +++--- .../SchedulerParallelImpl.h | 1 + .../benchmark/benchmarkMultiLayerStorage.cpp | 2 +- .../tests/testMultiLayerStorage.cpp | 43 ++- 17 files changed, 374 insertions(+), 308 deletions(-) create mode 100644 transaction-executor/tests/TestMemoryStorage.h diff --git a/bcos-framework/bcos-framework/storage2/MemoryStorage.h b/bcos-framework/bcos-framework/storage2/MemoryStorage.h index 61e32acf3e..f9c8f39383 100644 --- a/bcos-framework/bcos-framework/storage2/MemoryStorage.h +++ b/bcos-framework/bcos-framework/storage2/MemoryStorage.h @@ -147,7 +147,7 @@ class MemoryStorage } friend auto tag_invoke( - bcos::storage2::tag_t /*unused*/, MemoryStorage& storage) + bcos::storage2::tag_t /*unused*/, MemoryStorage const& storage) requires(!withConcurrent) { auto range = RANGES::views::transform(storage.m_buckets[0].container, @@ -327,7 +327,7 @@ class MemoryStorage { task::AwaitableValue outputAwaitable(ReadIterator{}); ReadIterator& output = outputAwaitable.value(); - if constexpr (RANGES::sized_range>) + if constexpr (RANGES::sized_range) { output.m_iterators.reserve(RANGES::size(keys)); } @@ -440,7 +440,8 @@ class MemoryStorage else { it = bucket.get().container.emplace_hint( - it, Data{.key = KeyType(key), .value = std::forward(value)}); + it, Data{.key = KeyType(std::forward(key)), + .value = std::forward(value)}); } if constexpr (withMRU) diff --git a/bcos-framework/bcos-framework/storage2/Storage.h b/bcos-framework/bcos-framework/storage2/Storage.h index a302881670..037f8b5e13 100644 --- a/bcos-framework/bcos-framework/storage2/Storage.h +++ b/bcos-framework/bcos-framework/storage2/Storage.h @@ -13,16 +13,24 @@ struct STORAGE_BEGIN_TYPE }; inline constexpr STORAGE_BEGIN_TYPE STORAGE_BEGIN{}; +struct READ_FRONT_TYPE +{ +}; +inline constexpr READ_FRONT_TYPE READ_FRONT{}; + template using ReturnType = typename task::AwaitableReturnType; struct ReadSome { - auto operator()(auto& storage, RANGES::input_range auto const& keys) const - -> task::Task> - requires RANGES::range> + auto operator()(auto& storage, RANGES::input_range auto&& keys, auto&&... args) const + -> task::Task(keys), std::forward(args)...))>> + requires RANGES::range(keys), std::forward(args)...))>> { - co_return co_await tag_invoke(*this, storage, keys); + co_return co_await tag_invoke(*this, storage, std::forward(keys), + std::forward(args)...); } }; inline constexpr ReadSome readSome{}; @@ -53,10 +61,11 @@ struct RemoveSome inline constexpr RemoveSome removeSome{}; struct ReadOne { - auto operator()(auto& storage, auto const& key) const - -> task::Task> + auto operator()(auto& storage, auto const& key, auto&&... args) const + -> task::Task(args)...))>> { - co_return co_await tag_invoke(*this, storage, key); + co_return co_await tag_invoke(*this, storage, key, std::forward(args)...); } }; inline constexpr ReadOne readOne{}; @@ -152,4 +161,17 @@ auto tag_invoke(bcos::storage2::tag_t /*unused*/, auto& storage, auto co_return result.has_value(); } +task::Task tag_invoke( + bcos::storage2::tag_t /*unused*/, auto const& fromStorage, auto& toStorage) +{ + auto range = co_await storage2::range(fromStorage); + for (auto [key, value] : range) + { + if (value) + { + co_await storage2::writeOne(toStorage, *key, *value); + } + } +} + } // namespace bcos::storage2 \ No newline at end of file diff --git a/bcos-framework/bcos-framework/storage2/StorageMethods.h b/bcos-framework/bcos-framework/storage2/StorageMethods.h index f89abd1a15..11055c1c7e 100644 --- a/bcos-framework/bcos-framework/storage2/StorageMethods.h +++ b/bcos-framework/bcos-framework/storage2/StorageMethods.h @@ -114,24 +114,22 @@ task::Task tag_invoke(bcos::storage2::tag_t /*unused*/, } } -template -auto tag_invoke(bcos::storage2::tag_t /*unused*/, Storage& storage, +auto tag_invoke(bcos::storage2::tag_t /*unused*/, ErasableStorage auto& storage, RANGES::input_range auto const& keys) -> task::Task { co_await storage.remove(keys); co_return; } -template -auto tag_invoke(bcos::storage2::tag_t /*unused*/, Storage& storage, - RANGES::input_range auto const& keys) +auto tag_invoke(bcos::storage2::tag_t /*unused*/, ReadableStorage auto& storage, + RANGES::input_range auto&& keys) -> task::Task::Value>>, + std::optional(keys)))>::Value>>, 1>> { - using ValueType = std::remove_cvref_t< - typename task::AwaitableReturnType::Value>; + using ValueType = std::remove_cvref_t(keys)))>::Value>; static_assert(std::is_copy_assignable_v); boost::container::small_vector, 1> values; @@ -139,7 +137,7 @@ auto tag_invoke(bcos::storage2::tag_t /*unused*/, Storage& storage, { values.reserve(RANGES::size(keys)); } - auto it = co_await storage.read(keys); + auto it = co_await storage.read(std::forward(keys)); while (co_await it.next()) { if (co_await it.hasValue()) diff --git a/bcos-framework/bcos-framework/transaction-executor/TransactionExecutor.h b/bcos-framework/bcos-framework/transaction-executor/TransactionExecutor.h index 9c0b1b5852..9f483a0d10 100644 --- a/bcos-framework/bcos-framework/transaction-executor/TransactionExecutor.h +++ b/bcos-framework/bcos-framework/transaction-executor/TransactionExecutor.h @@ -83,6 +83,7 @@ inline constexpr ExecuteTransaction executeTransaction{}; template using tag_t = std::decay_t; + } // namespace bcos::transaction_executor template <> diff --git a/bcos-storage/bcos-storage/RocksDBStorage2.h b/bcos-storage/bcos-storage/RocksDBStorage2.h index 56375a4490..bd5063393a 100644 --- a/bcos-storage/bcos-storage/RocksDBStorage2.h +++ b/bcos-storage/bcos-storage/RocksDBStorage2.h @@ -92,7 +92,7 @@ class RocksDBStorage2 } }; - auto read(RANGES::input_range auto const& keys) & -> task::AwaitableValue + auto read(RANGES::input_range auto&& keys) & -> task::AwaitableValue { task::AwaitableValue readIteratorAwaitable(ReadIterator{m_valueResolver}); auto& readIterator = readIteratorAwaitable.value(); diff --git a/transaction-executor/bcos-transaction-executor/RollbackableStorage.h b/transaction-executor/bcos-transaction-executor/RollbackableStorage.h index 2d3d7a6c53..1d4ae8c063 100644 --- a/transaction-executor/bcos-transaction-executor/RollbackableStorage.h +++ b/transaction-executor/bcos-transaction-executor/RollbackableStorage.h @@ -5,19 +5,20 @@ #include "bcos-task/Trait.h" #include #include +#include namespace bcos::transaction_executor { template -concept HasReadDirectly = - requires(Storage&& storage) { - requires storage2::ReadableStorage; - requires storage2::ReadIterator>()))>>; +concept HasReadOneDirect = + requires(Storage& storage) { + requires RANGES::range>(), storage2::READ_FRONT))>>; }; template + requires HasReadOneDirect class Rollbackable { private: @@ -39,6 +40,7 @@ class Rollbackable Storage& storage() { return m_storage; } Savepoint current() const { return static_cast(m_records.size()); } + task::Task rollback(Savepoint savepoint) { for (auto index = static_cast(m_records.size()); index > savepoint; --index) @@ -47,77 +49,66 @@ class Rollbackable auto& record = m_records[index - 1]; if (record.oldValue) { - co_await m_storage.write(RANGES::single_view(record.key), - RANGES::single_view(std::move(*record.oldValue))); + co_await storage2::writeOne( + m_storage, std::move(record.key), std::move(*record.oldValue)); } else { - co_await m_storage.remove(RANGES::single_view(record.key)); + co_await storage2::removeOne(m_storage, record.key); } m_records.pop_back(); } co_return; } - auto read(RANGES::input_range auto const& keys) - -> task::Task> + friend auto tag_invoke(storage2::tag_t /*unused*/, Rollbackable& storage, + RANGES::input_range auto&& keys) + -> task::Task(), std::forward(keys)))>> { - co_return co_await m_storage.read(keys); + co_return co_await storage2::readSome( + storage.m_storage, std::forward(keys)); } - auto write(RANGES::input_range auto&& keys, RANGES::input_range auto&& values) - -> task::Task(keys), std::forward(values)))>> + friend auto tag_invoke(storage2::tag_t /*unused*/, Rollbackable& storage, + RANGES::input_range auto&& keys, RANGES::input_range auto&& values) + -> task::Task(), std::forward(keys), + std::forward(values)))>> { // Store values to history + auto oldValues = co_await storage2::readSome(storage.m_storage, keys, storage2::READ_FRONT); + for (auto&& [key, value] : RANGES::views::zip(keys, oldValues)) { - std::optional> storageIt; - if constexpr (HasReadDirectly) - { - storageIt = co_await m_storage.readDirect(keys); - } - else - { - storageIt = co_await m_storage.read(keys); - } - - auto keyIt = RANGES::begin(keys); - while (co_await storageIt->next()) + auto& record = + storage.m_records.emplace_back(Record{.key = StateKey{key}, .oldValue = {}}); + if (value) { - auto& record = - m_records.emplace_back(Record{.key = StateKey{*(keyIt++)}, .oldValue = {}}); - if (co_await storageIt->hasValue()) - { - // Update exists value, store the old value - record.oldValue.emplace(co_await storageIt->value()); - } + record.oldValue.emplace(std::move(*value)); } } - co_return co_await m_storage.write( + co_return co_await storage2::writeSome(storage.m_storage, std::forward(keys), std::forward(values)); } - auto remove(RANGES::input_range auto const& keys) - -> task::Task> + friend auto tag_invoke(storage2::tag_t /*unused*/, Rollbackable& storage, + RANGES::input_range auto const& keys) + -> task::Task(), keys))>> { // Store values to history + auto oldValues = co_await storage2::readSome(storage.m_storage, keys, storage2::READ_FRONT); + for (auto&& [key, value] : RANGES::views::zip(keys, oldValues)) { - auto storageIt = co_await m_storage.read(keys); - auto keyIt = RANGES::begin(keys); - while (co_await storageIt.next()) + if (value) { - auto& record = m_records.emplace_back(); - record.key = *(keyIt++); - if (co_await storageIt.hasValue()) - { - // Update exists value, store the old value - record.oldValue.emplace(co_await storageIt.value()); - } + auto& record = storage.m_records.emplace_back( + Record{.key = StateKey{key}, .oldValue = std::move(*value)}); } } - co_return co_await m_storage.remove(keys); + co_return co_await storage2::removeSome(storage.m_storage, keys); } }; diff --git a/transaction-executor/bcos-transaction-executor/TransactionExecutorImpl.h b/transaction-executor/bcos-transaction-executor/TransactionExecutorImpl.h index 4b42f51dc1..0b458c2e0a 100644 --- a/transaction-executor/bcos-transaction-executor/TransactionExecutorImpl.h +++ b/transaction-executor/bcos-transaction-executor/TransactionExecutorImpl.h @@ -52,7 +52,7 @@ class TransactionExecutorImpl << "Execte transaction: " << transaction.hash().hex(); } - Rollbackable> rollbackableStorage(storage); + Rollbackable> rollbackableStorage(storage); auto toAddress = unhexAddress(transaction.to()); evmc_message evmcMessage = {.kind = transaction.to().empty() ? EVMC_CREATE : EVMC_CALL, diff --git a/transaction-executor/benchmark/benchmakrExecutor.cpp b/transaction-executor/benchmark/benchmakrExecutor.cpp index 86f19661f0..e583f2c8a2 100644 --- a/transaction-executor/benchmark/benchmakrExecutor.cpp +++ b/transaction-executor/benchmark/benchmakrExecutor.cpp @@ -1,5 +1,6 @@ #include "../bcos-transaction-executor/TransactionExecutorImpl.h" #include "../tests/TestBytecode.h" +#include "../tests/TestMemoryStorage.h" #include "bcos-codec/bcos-codec/abi/ContractABICodec.h" #include "bcos-crypto/interfaces/crypto/CryptoSuite.h" #include "bcos-framework/ledger/LedgerConfig.h" @@ -18,8 +19,8 @@ using namespace bcos; using namespace bcos::storage2::memory_storage; using namespace bcos::transaction_executor; -using MutableStorage = MemoryStorage; using ReceiptFactory = bcostars::protocol::TransactionReceiptFactoryImpl; +static_assert(HasReadOneDirect); struct Fixture { diff --git a/transaction-executor/tests/TestHostContext.cpp b/transaction-executor/tests/TestHostContext.cpp index 4d68edbc47..88d3157e32 100644 --- a/transaction-executor/tests/TestHostContext.cpp +++ b/transaction-executor/tests/TestHostContext.cpp @@ -2,6 +2,7 @@ #include "../bcos-transaction-executor/vm/HostContext.h" #include "../bcos-transaction-executor/vm/VMInstance.h" #include "TestBytecode.h" +#include "TestMemoryStorage.h" #include "bcos-codec/bcos-codec/abi/ContractABICodec.h" #include "bcos-crypto/interfaces/crypto/CryptoSuite.h" #include "bcos-crypto/interfaces/crypto/Hash.h" @@ -36,7 +37,7 @@ class TestHostContextFixture { public: bcos::crypto::Hash::Ptr hashImpl = std::make_shared(); - memory_storage::MemoryStorage storage; + MutableStorage storage; Rollbackable rollbackableStorage; evmc_address helloworldAddress; VMFactory vmFactory; diff --git a/transaction-executor/tests/TestMemoryStorage.h b/transaction-executor/tests/TestMemoryStorage.h new file mode 100644 index 0000000000..1fea55ef2c --- /dev/null +++ b/transaction-executor/tests/TestMemoryStorage.h @@ -0,0 +1,17 @@ +#pragma once +#include "bcos-framework/storage2/MemoryStorage.h" +#include "bcos-framework/storage2/Storage.h" +#include "bcos-framework/transaction-executor/TransactionExecutor.h" + +namespace bcos::transaction_executor +{ +using MutableStorage = storage2::memory_storage::MemoryStorage; + +auto tag_invoke(storage2::tag_t /*unused*/, MutableStorage& storage, + RANGES::input_range auto&& keys, storage2::READ_FRONT_TYPE const& /*unused*/) + -> task::Task> +{ + co_return co_await storage2::readSome(storage, std::forward(keys)); +} +} // namespace bcos::transaction_executor \ No newline at end of file diff --git a/transaction-executor/tests/TestRollbackableStorage.cpp b/transaction-executor/tests/TestRollbackableStorage.cpp index 47b0f2dcf2..376885e116 100644 --- a/transaction-executor/tests/TestRollbackableStorage.cpp +++ b/transaction-executor/tests/TestRollbackableStorage.cpp @@ -1,8 +1,13 @@ + #include "../bcos-transaction-executor/RollbackableStorage.h" +#include "TestMemoryStorage.h" +#include "bcos-framework/storage2/MemoryStorage.h" +#include "bcos-framework/storage2/Storage.h" +#include "bcos-framework/storage2/StorageMethods.h" #include "bcos-framework/transaction-executor/TransactionExecutor.h" -#include -#include -#include +#include "bcos-task/Task.h" +#include "bcos-task/Trait.h" +#include "bcos-task/Wait.h" #include #include @@ -17,34 +22,36 @@ class TestRollbackableStorageFixture BOOST_FIXTURE_TEST_SUITE(TestRollbackableStorage, TestRollbackableStorageFixture) +using BackendStorag2 = + memory_storage::MemoryStorage; + BOOST_AUTO_TEST_CASE(addRollback) { task::syncWait([]() -> task::Task { - memory_storage::MemoryStorage memoryStorage; - + MutableStorage memoryStorage; Rollbackable rollbackableStorage(memoryStorage); - static_assert(storage2::ReadableStorage, "No match type!"); + + auto view = RANGES::single_view(StateKey{"table1"sv, "Key1"sv}); + co_await storage2::readSome(memoryStorage, view, storage2::READ_FRONT); std::string_view tableID = "table1"; auto point = rollbackableStorage.current(); storage::Entry entry; entry.set("OK!"); - co_await rollbackableStorage.write( - RANGES::views::single(StateKeyView{tableID, "Key1"}), RANGES::views::single(entry)); + co_await storage2::writeOne(rollbackableStorage, StateKey{tableID, "Key1"sv}, entry); storage::Entry entry2; entry2.set("OK2!"); - co_await rollbackableStorage.write( - RANGES::views::single(StateKeyView{tableID, "Key2"}), RANGES::views::single(entry2)); + co_await storage2::writeOne(rollbackableStorage, StateKey{tableID, "Key2"sv}, entry2); // Check the entry exists std::vector keys{StateKey{tableID, "Key1"sv}, StateKey{"table1"sv, "Key2"sv}}; - auto it = co_await rollbackableStorage.read(keys); + auto values = co_await storage2::readSome(rollbackableStorage, keys); auto count = 0; - while (co_await it.next()) + // while (co_await it.next()) + for (auto&& [key, value] : RANGES::views::zip(keys, values)) { - BOOST_REQUIRE(co_await it.hasValue()); - auto&& key = co_await it.key(); + BOOST_REQUIRE(value); BOOST_CHECK_EQUAL( static_cast(static_cast(std::get<0>(key))), tableID); BOOST_CHECK_EQUAL( @@ -56,13 +63,11 @@ BOOST_AUTO_TEST_CASE(addRollback) // Query again std::vector keys2{StateKey{tableID, "Key1"sv}, StateKey{"table1"sv, "Key2"sv}}; - auto it2 = co_await rollbackableStorage.read(keys2); - auto count2 = 0; - while (co_await it2.next()) + auto values2 = co_await storage2::readSome(rollbackableStorage, keys2); + for (auto&& [key, value] : RANGES::views::zip(keys2, values2)) { - BOOST_REQUIRE(!co_await it2.hasValue()); + BOOST_REQUIRE(!value); } - BOOST_CHECK_EQUAL(count2, 0); co_return; }()); @@ -71,45 +76,38 @@ BOOST_AUTO_TEST_CASE(addRollback) BOOST_AUTO_TEST_CASE(removeRollback) { task::syncWait([]() -> task::Task { - memory_storage::MemoryStorage memoryStorage; + MutableStorage memoryStorage; Rollbackable rollbackableStorage(memoryStorage); std::string_view tableID = "table1"; auto point = rollbackableStorage.current(); storage::Entry entry; entry.set("OK!"); - co_await rollbackableStorage.write( - RANGES::views::single(StateKey{tableID, "Key1"sv}), RANGES::views::single(entry)); + co_await storage2::writeOne(rollbackableStorage, StateKey{tableID, "Key1"sv}, entry); storage::Entry entry2; entry2.set("OK2!"); - co_await rollbackableStorage.write( - RANGES::views::single(StateKey{tableID, "Key2"sv}), RANGES::views::single(entry2)); + co_await storage2::writeOne(rollbackableStorage, StateKey{tableID, "Key2"sv}, entry2); // Check the entry exists std::vector keys{StateKey{tableID, "Key1"sv}, StateKey{"table1"sv, "Key2"sv}}; - auto it = co_await rollbackableStorage.read(keys); - auto count = 0; - while (co_await it.next()) + auto values = co_await storage2::readSome(rollbackableStorage, keys); + + for (auto&& [key, value] : RANGES::views::zip(keys, values)) { - BOOST_REQUIRE(co_await it.hasValue()); - auto&& key = co_await it.key(); + BOOST_REQUIRE(value); BOOST_CHECK_EQUAL(static_cast(std::get<0>(key)), tableID); BOOST_CHECK_EQUAL(static_cast(std::get<0>(key)), "table1"sv); - ++count; } - BOOST_CHECK_EQUAL(count, 2); co_await rollbackableStorage.rollback(point); // Query again std::vector keys2{StateKey{tableID, "Key1"sv}, StateKey{"table1"sv, "Key2"sv}}; - auto it2 = co_await rollbackableStorage.read(keys2); - auto count2 = 0; - while (co_await it2.next()) + auto values2 = co_await storage2::readSome(rollbackableStorage, keys2); + for (auto&& [key, value] : RANGES::views::zip(keys2, values2)) { - BOOST_REQUIRE(!co_await it2.hasValue()); + BOOST_REQUIRE(!value); } - BOOST_CHECK_EQUAL(count2, 0); co_return; }()); @@ -118,33 +116,23 @@ BOOST_AUTO_TEST_CASE(removeRollback) BOOST_AUTO_TEST_CASE(equal) { task::syncWait([]() -> task::Task { - storage2::memory_storage::MemoryStorage - storage; - - co_await storage.write( - RANGES::single_view(RANGES::in_place, "table"sv, "0"sv), - RANGES::single_view(0)); - co_await storage.write( - RANGES::single_view(RANGES::in_place, "table"sv, "1"sv), - RANGES::single_view(1)); - co_await storage.write( - RANGES::single_view(RANGES::in_place, "table"sv, "2"sv), - RANGES::single_view(2)); - - auto it = co_await storage.read( - RANGES::iota_view(0, 3) | RANGES::views::transform([](int num) { - return StateKey("table"sv, boost::lexical_cast(num)); - })); + BackendStorag2 storage; + + co_await storage2::writeOne(storage, transaction_executor::StateKey("table"sv, "0"sv), 0); + co_await storage2::writeOne(storage, transaction_executor::StateKey("table"sv, "1"sv), 1); + co_await storage2::writeOne(storage, transaction_executor::StateKey("table"sv, "2"sv), 2); + + auto keys = RANGES::iota_view(0, 3) | RANGES::views::transform([](int num) { + return StateKey("table"sv, boost::lexical_cast(num)); + }); + auto values = co_await storage2::readSome(storage, keys); int i = 0; - while (co_await it.next()) + for (auto&& [key, value] : RANGES::views::zip(keys, values)) { - auto& key = co_await it.key(); - auto& value = co_await it.value(); auto view = std::get<1>(key); auto str = boost::lexical_cast(i); BOOST_CHECK_EQUAL(static_cast(view), std::string_view(str)); - BOOST_CHECK_EQUAL(value, i); + BOOST_CHECK_EQUAL(*value, i); ++i; } BOOST_CHECK_EQUAL(i, 3); diff --git a/transaction-executor/tests/TestTransactionExecutor.cpp b/transaction-executor/tests/TestTransactionExecutor.cpp index 8bfcc14ed8..18c009daa4 100644 --- a/transaction-executor/tests/TestTransactionExecutor.cpp +++ b/transaction-executor/tests/TestTransactionExecutor.cpp @@ -1,5 +1,6 @@ #include "../bcos-transaction-executor/TransactionExecutorImpl.h" #include "TestBytecode.h" +#include "TestMemoryStorage.h" #include "bcos-codec/bcos-codec/abi/ContractABICodec.h" #include #include @@ -26,7 +27,7 @@ BOOST_FIXTURE_TEST_SUITE(TransactionExecutorImpl, TestTransactionExecutorImplFix BOOST_AUTO_TEST_CASE(execute) { task::syncWait([this]() mutable -> task::Task { - memory_storage::MemoryStorage storage; + MutableStorage storage; auto cryptoSuite = std::make_shared( bcos::executor::GlobalHashImpl::g_hashImpl, nullptr, nullptr); diff --git a/transaction-scheduler/bcos-transaction-scheduler/MultiLayerStorage.h b/transaction-scheduler/bcos-transaction-scheduler/MultiLayerStorage.h index bc9c60dce5..3541e77984 100644 --- a/transaction-scheduler/bcos-transaction-scheduler/MultiLayerStorage.h +++ b/transaction-scheduler/bcos-transaction-scheduler/MultiLayerStorage.h @@ -1,14 +1,14 @@ #pragma once #include "bcos-framework/storage2/Storage.h" +#include "bcos-framework/transaction-executor/TransactionExecutor.h" +#include "bcos-task/AwaitableValue.h" #include "bcos-task/Trait.h" #include "bcos-task/Wait.h" -#include -#include -#include +#include "transaction-executor/bcos-transaction-executor/RollbackableStorage.h" #include -#include #include #include +#include #include #include #include @@ -48,6 +48,8 @@ class MultiLayerStorage std::add_lvalue_reference_t, std::monostate> m_cacheStorage; + // 同一时间只允许一个可以修改的view + // Only one view that can be modified is allowed at a time std::mutex m_mutableMutex; public: @@ -78,175 +80,186 @@ class MultiLayerStorage : m_backendStorage(backendStorage), m_cacheStorage(cacheStorage) {} - static auto readStorage(auto& storage, - boost::container::small_vector>, 1>& - keyValues) -> task::Task + static task::Task fillMissingValues( + auto& storage, RANGES::input_range auto&& keys, RANGES::input_range auto& values) { - auto keyIndexes = - RANGES::views::enumerate(keyValues) | RANGES::views::filter([](auto&& tuple) { - return !std::get<1>(std::get<1>(tuple)); - }) | - RANGES::views::transform([](auto&& tuple) -> auto{ return std::get<0>(tuple); }) | - RANGES::to>(); - auto it = co_await storage.read(RANGES::views::transform( - keyIndexes, [&](auto& index) -> auto& { return std::get<0>(keyValues[index]); })); - - bool finished = true; - auto indexIt = RANGES::begin(keyIndexes); - while (co_await it.next()) + using StoreKeyType = std::conditional_t< + std::is_lvalue_reference_v>, + std::reference_wrapper, KeyType>; + + boost::container::small_vector< + std::pair>>, 1> + missingKeyValues; + for (auto&& [key, value] : RANGES::views::zip(keys, values)) { - if (co_await it.hasValue()) + if (!value) { - std::get<1>(keyValues[*indexIt]).emplace(co_await it.value()); + missingKeyValues.emplace_back( + std::forward(key), std::ref(value)); } - else + } + auto gotValues = + co_await storage2::readSome(storage, missingKeyValues | RANGES::views::keys); + + size_t count = 0; + for (auto&& [from, to] : + RANGES::views::zip(gotValues, missingKeyValues | RANGES::views::values)) + { + if (from) { - finished = false; + to.get() = std::move(from); + ++count; } - RANGES::advance(indexIt, 1); } - co_return finished; + + co_return count == RANGES::size(gotValues); } public: - View(const View&) = delete; - View& operator=(const View&) = delete; - View(View&&) noexcept = default; - View& operator=(View&&) noexcept = default; - ~View() noexcept = default; - - void release() { m_mutableLock.unlock(); } - - class ReadIterator + friend auto tag_invoke(storage2::tag_t /*unused*/, View& storage, + RANGES::input_range auto&& keys) + -> task::Task()), + std::forward(keys)))>> + requires RANGES::sized_range && + RANGES::sized_range(), keys))>> { - friend class View; - - private: - boost::container::small_vector>, 1> - m_keyValues; - int64_t m_index = -1; - - public: - using Key = std::remove_cvref_t const&; - using Value = std::remove_cvref_t const&; - - ReadIterator() = default; - - ReadIterator(const ReadIterator&) = delete; - ReadIterator(ReadIterator&& rhs) noexcept = default; - ReadIterator& operator=(const ReadIterator&) = delete; - ReadIterator& operator=(ReadIterator&&) noexcept = default; - ~ReadIterator() noexcept = default; - - task::AwaitableValue next() + task::AwaitableReturnType + values(RANGES::size(keys)); + if (storage.m_mutableStorage && + co_await fillMissingValues(*storage.m_mutableStorage, keys, values)) { - return {static_cast(++m_index) != m_keyValues.size()}; + co_return values; } - task::AwaitableValue key() const { return {std::get<0>(m_keyValues[m_index])}; } - task::AwaitableValue value() const + else { - return {*(std::get<1>(m_keyValues[m_index]))}; + values.resize(RANGES::size(keys)); } - task::AwaitableValue hasValue() const + + for (auto& immutableStorage : storage.m_immutableStorages) { - return {std::get<1>(m_keyValues[m_index]).has_value()}; + if (co_await fillMissingValues(*immutableStorage, keys, values)) + { + co_return values; + } } - }; - using Key = KeyType; - using Value = ValueType; + if constexpr (withCacheStorage) + { + if (co_await fillMissingValues(storage.m_cacheStorage, keys, values)) + { + co_return values; + } + } - template - void newTemporaryMutable(Args... args) + co_await fillMissingValues(storage.m_backendStorage, keys, values); + co_return values; + } + + friend auto tag_invoke(storage2::tag_t /*unused*/, View& storage, + RANGES::input_range auto&& keys, const storage2::READ_FRONT_TYPE& /*unused*/) + -> task::Task(), keys))>> { - if (m_mutableStorage) + if (storage.m_mutableStorage) { - BOOST_THROW_EXCEPTION(DuplicateMutableStorageError{}); + co_return co_await storage2::readSome( + *storage.m_mutableStorage, std::forward(keys)); } - m_mutableStorage = std::make_shared(args...); + for (auto& immutableStorage : storage.m_immutableStorages) + { + co_return co_await storage2::readSome( + *immutableStorage, std::forward(keys)); + } + + if constexpr (withCacheStorage) + { + co_return co_await storage2::readSome( + storage.m_cacheStorage, std::forward(keys)); + } + + co_return co_await storage2::readSome( + storage.m_backendStorage, std::forward(keys)); } - task::Task read(RANGES::input_range auto&& keys) + friend auto tag_invoke( + storage2::tag_t /*unused*/, View& storage, auto const& key) + -> task::Task(), key))>> { - ReadIterator iterator; - iterator.m_keyValues = RANGES::views::transform(keys, [](auto&& key) { - return std::tuple>( - std::forward(key), std::optional{}); - }) | RANGES::to(); - - if (m_mutableStorage) + if (storage.m_mutableStorage) { - if (co_await readStorage(*m_mutableStorage, iterator.m_keyValues)) + if (auto value = co_await storage2::readOne(*storage.m_mutableStorage, key)) { - co_return iterator; + co_return value; } } - if (!RANGES::empty(m_immutableStorages)) + for (auto& immutableStorage : storage.m_immutableStorages) { - for (auto& immutableStorage : m_immutableStorages) + if (auto value = co_await storage2::readOne(*immutableStorage, key)) { - if (co_await readStorage(*immutableStorage, iterator.m_keyValues)) - { - co_return iterator; - } + co_return value; } } if constexpr (withCacheStorage) { - if (co_await readStorage(m_cacheStorage, iterator.m_keyValues)) + if (auto value = co_await storage2::readOne(storage.m_cacheStorage, key)) { - co_return iterator; + co_return value; } } - auto missingKeyIndexes = - RANGES::views::enumerate(iterator.m_keyValues) | - RANGES::views::filter( - [](auto&& tuple) { return !std::get<1>(std::get<1>(tuple)); }) | - RANGES::views::transform([](auto&& tuple) -> auto{ return std::get<0>(tuple); }) | - RANGES::to>(); - co_await readStorage(m_backendStorage, iterator.m_keyValues); - // Write data into cache + co_return co_await storage2::readOne(storage.m_backendStorage, key); + } + + friend auto tag_invoke(storage2::tag_t /*unused*/, View& storage, + auto const& key, const storage2::READ_FRONT_TYPE& /*unused*/) + -> task::Task(), key))>> + { + if (storage.m_mutableStorage) + { + co_return co_await storage2::readOne(*storage.m_mutableStorage, key); + } + + for (auto& immutableStorage : storage.m_immutableStorages) + { + co_return co_await storage2::readOne(*immutableStorage, key); + } + if constexpr (withCacheStorage) { - for (auto index : missingKeyIndexes) - { - if (std::get<1>(iterator.m_keyValues[index])) - { - co_await storage2::writeOne(m_cacheStorage, - std::get<0>(iterator.m_keyValues[index]), - *std::get<1>(iterator.m_keyValues[index])); - } - } + co_return co_await storage2::readOne(storage.m_cacheStorage, key); } - co_return iterator; + co_return co_await storage2::readOne(storage.m_backendStorage, key); } - task::Task write(RANGES::input_range auto&& keys, RANGES::input_range auto&& values) + friend task::Task tag_invoke(storage2::tag_t /*unused*/, + View& storage, RANGES::input_range auto&& keys, RANGES::input_range auto&& values) { - if (!m_mutableStorage) [[unlikely]] + if (!storage.m_mutableStorage) [[unlikely]] { BOOST_THROW_EXCEPTION(NotExistsMutableStorageError{}); } - - co_await m_mutableStorage->write( + co_await storage2::writeSome(*storage.m_mutableStorage, std::forward(keys), std::forward(values)); - co_return; } - task::Task remove(RANGES::input_range auto const& keys) + friend task::Task tag_invoke(storage2::tag_t /*unused*/, + View& storage, RANGES::input_range auto&& keys) { - if (!m_mutableStorage) + if (!storage.m_mutableStorage) { BOOST_THROW_EXCEPTION(NotExistsMutableStorageError{}); } - co_await m_mutableStorage->remove(keys); - co_return; + co_await storage2::removeSome( + *storage.m_mutableStorage, std::forward(keys)); } MutableStorageType& mutableStorage() @@ -257,6 +270,30 @@ class MultiLayerStorage } return *m_mutableStorage; } + + public: + View(const View&) = delete; + View& operator=(const View&) = delete; + View(View&&) noexcept = default; + View& operator=(View&&) noexcept = default; + ~View() noexcept = default; + + using Key = KeyType; + using Value = ValueType; + + void release() { m_mutableLock.unlock(); } + + template + void newTemporaryMutable(Args... args) + { + if (m_mutableStorage) + { + BOOST_THROW_EXCEPTION(DuplicateMutableStorageError{}); + } + + m_mutableStorage = std::make_shared(args...); + } + BackendStorage& backendStorage() { return m_backendStorage; } }; @@ -398,4 +435,5 @@ class MultiLayerStorage } BackendStorage& backendStorage() { return m_backendStorage; } }; + } // namespace bcos::transaction_scheduler \ No newline at end of file diff --git a/transaction-scheduler/bcos-transaction-scheduler/ReadWriteSetStorage.h b/transaction-scheduler/bcos-transaction-scheduler/ReadWriteSetStorage.h index 9728371524..596932f9f2 100644 --- a/transaction-scheduler/bcos-transaction-scheduler/ReadWriteSetStorage.h +++ b/transaction-scheduler/bcos-transaction-scheduler/ReadWriteSetStorage.h @@ -1,5 +1,6 @@ #pragma once -#include +#include "bcos-framework/storage2/Storage.h" +#include "bcos-framework/transaction-executor/TransactionExecutor.h" #include #include #include @@ -33,6 +34,56 @@ class ReadWriteSetStorage } public: + friend auto tag_invoke(storage2::tag_t /*unused*/, + ReadWriteSetStorage& storage, RANGES::input_range auto&& keys) + -> task::Task(), std::forward(keys)))>> + { + for (auto&& key : keys) + { + storage.putSet(false, std::forward(key)); + } + co_return co_await storage2::readSome( + storage.m_storage, std::forward(keys)); + } + + friend auto tag_invoke(storage2::tag_t /*unused*/, + ReadWriteSetStorage& storage, RANGES::input_range auto&& keys, + const storage2::READ_FRONT_TYPE& /*unused*/) + -> task::Task(), std::forward(keys)))>> + { + co_return co_await storage2::readSome( + storage.m_storage, std::forward(keys)); + } + + friend auto tag_invoke(storage2::tag_t /*unused*/, + ReadWriteSetStorage& storage, RANGES::input_range auto&& keys, + RANGES::input_range auto&& values) + -> task::Task(), std::forward(keys), + std::forward(values)))>> + { + for (auto&& key : keys) + { + storage.putSet(true, std::forward(key)); + } + co_return co_await storage2::writeSome( + storage.m_storage, keys, std::forward(values)); + } + + friend auto tag_invoke(storage2::tag_t /*unused*/, + ReadWriteSetStorage& storage, RANGES::input_range auto const& keys) + -> task::Task(), keys))>> + { + for (auto&& key : keys) + { + storage.putSet(true, std::forward(key)); + } + co_return co_await storage2::removeSome(storage.m_storage, keys); + } + using Key = KeyType; using Value = typename task::AwaitableReturnType()))>::value_type; @@ -74,44 +125,6 @@ class ReadWriteSetStorage return false; } - - auto read(RANGES::input_range auto&& keys) - -> task::Task> - { - for (auto&& key : keys) - { - putSet(false, key); - } - co_return co_await readDirect(std::forward(keys)); - } - - auto readDirect(RANGES::input_range auto&& keys) - -> task::Task> - { - co_return co_await m_storage.read(std::forward(keys)); - } - - auto write(RANGES::input_range auto&& keys, RANGES::input_range auto&& values) - -> task::Task(keys), std::forward(values)))>> - { - for (auto&& key : keys) - { - putSet(true, std::forward(key)); - } - co_return co_await m_storage.write( - std::forward(keys), std::forward(values)); - } - - auto remove(RANGES::input_range auto const& keys) - -> task::Task> - { - for (auto&& key : keys) - { - putSet(true, std::forward(key)); - } - co_return co_await m_storage.remove(keys); - } }; } // namespace bcos::transaction_scheduler \ No newline at end of file diff --git a/transaction-scheduler/bcos-transaction-scheduler/SchedulerParallelImpl.h b/transaction-scheduler/bcos-transaction-scheduler/SchedulerParallelImpl.h index 7656303495..7d9e587c55 100644 --- a/transaction-scheduler/bcos-transaction-scheduler/SchedulerParallelImpl.h +++ b/transaction-scheduler/bcos-transaction-scheduler/SchedulerParallelImpl.h @@ -8,6 +8,7 @@ #include "bcos-framework/protocol/TransactionReceiptFactory.h" #include "bcos-framework/storage2/MemoryStorage.h" #include "bcos-framework/storage2/Storage.h" +#include "bcos-framework/storage2/StorageMethods.h" #include "bcos-framework/transaction-executor/TransactionExecutor.h" #include "bcos-framework/transaction-scheduler/TransactionScheduler.h" #include "bcos-tars-protocol/protocol/TransactionReceiptImpl.h" diff --git a/transaction-scheduler/benchmark/benchmarkMultiLayerStorage.cpp b/transaction-scheduler/benchmark/benchmarkMultiLayerStorage.cpp index 1f556b2f60..605d4f99dc 100644 --- a/transaction-scheduler/benchmark/benchmarkMultiLayerStorage.cpp +++ b/transaction-scheduler/benchmark/benchmarkMultiLayerStorage.cpp @@ -43,7 +43,7 @@ struct Fixture return entry; }); - co_await view.write(allKeys, allValues); + co_await storage2::writeSome(view, allKeys, allValues); }(count)); for (auto i = 0; i < layer; ++i) diff --git a/transaction-scheduler/tests/testMultiLayerStorage.cpp b/transaction-scheduler/tests/testMultiLayerStorage.cpp index 97dc29d6ee..752791f836 100644 --- a/transaction-scheduler/tests/testMultiLayerStorage.cpp +++ b/transaction-scheduler/tests/testMultiLayerStorage.cpp @@ -1,10 +1,10 @@ #include "bcos-framework/storage2/MemoryStorage.h" +#include "bcos-framework/storage2/Storage.h" #include "bcos-framework/transaction-executor/TransactionExecutor.h" +#include "bcos-transaction-scheduler/MultiLayerStorage.h" #include -#include #include #include -#include #include using namespace bcos; @@ -18,7 +18,8 @@ struct TableNameHash size_t operator()(const bcos::transaction_executor::StateKey& key) const { auto const& tableID = std::get<0>(key); - return std::hash{}(tableID); + std::string_view tableIDView(tableID.data(), tableID.size()); + return std::hash{}(tableIDView); } }; @@ -36,8 +37,7 @@ class TestMultiLayerStorageFixture BackendStorage backendStorage; MultiLayerStorage multiLayerStorage; - static_assert( - storage2::ReadableStorage, "No match storage!"); + // static_assert(HasReadOneDirect>); }; BOOST_FIXTURE_TEST_SUITE(TestMultiLayerStorage, TestMultiLayerStorageFixture) @@ -70,11 +70,9 @@ BOOST_AUTO_TEST_CASE(readWriteMutable) co_await storage2::writeOne(*view, key, entry); RANGES::single_view keyViews(key); - auto it = co_await view->read(keyViews); + auto values = co_await storage2::readSome(*view, keyViews); - co_await it.next(); - const auto& iteratorValue = co_await it.value(); - BOOST_CHECK_EQUAL(iteratorValue.get(), entry.get()); + BOOST_CHECK_EQUAL(values[0]->get(), entry.get()); BOOST_CHECK_NO_THROW(multiLayerStorage.pushMutableToImmutableFront()); view.reset(); @@ -104,7 +102,7 @@ BOOST_AUTO_TEST_CASE(merge) return entry; }); - co_await view->write(RANGES::iota_view(0, 100) | toKey, + co_await storage2::writeSome(*view, RANGES::iota_view(0, 100) | toKey, RANGES::iota_view(0, 100) | toValue); BOOST_CHECK_THROW( @@ -116,37 +114,32 @@ BOOST_AUTO_TEST_CASE(merge) auto view2 = multiLayerStorage.fork(false); auto keys = RANGES::iota_view(0, 100) | toKey; - auto it = co_await view2.read(keys); + auto values = co_await storage2::readSome(view2, keys); - int i = 0; - while (co_await it.next()) + for (auto&& [index, value] : RANGES::views::enumerate(values)) { - auto const& value = co_await it.value(); - BOOST_CHECK_EQUAL(value.get(), fmt::format("value: {}", i)); - ++i; + BOOST_CHECK_EQUAL(value->get(), fmt::format("value: {}", index)); } - BOOST_CHECK_EQUAL(i, 100); + BOOST_CHECK_EQUAL(RANGES::size(values), 100); multiLayerStorage.newMutable(); auto view3 = multiLayerStorage.fork(true); - co_await view3.remove(RANGES::iota_view(20, 30) | toKey); + co_await storage2::removeSome(view3, RANGES::iota_view(20, 30) | toKey); multiLayerStorage.pushMutableToImmutableFront(); co_await multiLayerStorage.mergeAndPopImmutableBack(); - auto removedIt = co_await view3.read(keys); - i = 0; - while (co_await removedIt.next()) + auto values2 = co_await storage2::readSome(view3, keys); + for (auto&& [index, value] : RANGES::views::enumerate(values2)) { - if (i >= 20 && i < 30) + if (index >= 20 && index < 30) { - BOOST_CHECK(!co_await removedIt.hasValue()); + BOOST_CHECK(!value); } else { - BOOST_CHECK(co_await removedIt.hasValue()); + BOOST_CHECK(value); } - ++i; } co_return; From 5d9163dda907206653e6f37a86b2e553e6bcce0f Mon Sep 17 00:00:00 2001 From: wenlinli <1574249665@qq.com> Date: Tue, 14 Nov 2023 15:21:32 +0800 Subject: [PATCH 2/2] turn off transactionBucket (#4044) --- .../txpool/storage/MemoryStorage.cpp | 19 ++++--------------- .../txpool/storage/MemoryStorage.h | 2 +- 2 files changed, 5 insertions(+), 16 deletions(-) diff --git a/bcos-txpool/bcos-txpool/txpool/storage/MemoryStorage.cpp b/bcos-txpool/bcos-txpool/txpool/storage/MemoryStorage.cpp index 6d1126d8ff..da286eea64 100644 --- a/bcos-txpool/bcos-txpool/txpool/storage/MemoryStorage.cpp +++ b/bcos-txpool/bcos-txpool/txpool/storage/MemoryStorage.cpp @@ -782,23 +782,12 @@ void MemoryStorage::batchFetchTxs(Block::Ptr _txsList, Block::Ptr _sysTxsList, s if (_avoidDuplicate) { - size_t eachBucketTxsLimit = 0; - // After performance testing, 0.25 had the best performance. - eachBucketTxsLimit = (_txsLimit / (0.25 * CPU_CORES)) + 1; - if (c_fileLogLevel == LogLevel::TRACE) [[unlikely]] - { - TXPOOL_LOG(TRACE) << LOG_DESC("batchFetchTxs") - << LOG_KV("pendingTxs", m_txsTable.size()) - << LOG_KV("limit", _txsLimit) - << LOG_KV("eachBucketTxsLimit", eachBucketTxsLimit); - } m_txsTable.forEach( - m_knownLatestSealedTxHash, eachBucketTxsLimit, [&](TxsMap::ReadAccessor::Ptr accessor) { + m_knownLatestSealedTxHash, [&](TxsMap::ReadAccessor::Ptr accessor) { const auto& tx = accessor->value(); - bool isTxValid = handleTx(tx); - bool needContinue = (_txsList->transactionsMetaDataSize() + - _sysTxsList->transactionsMetaDataSize()) < _txsLimit; - return std::pair(needContinue, isTxValid); + handleTx(tx); + return (_txsList->transactionsMetaDataSize() + + _sysTxsList->transactionsMetaDataSize()) < _txsLimit; }); } else diff --git a/bcos-txpool/bcos-txpool/txpool/storage/MemoryStorage.h b/bcos-txpool/bcos-txpool/txpool/storage/MemoryStorage.h index 5af8fae11e..edd1794807 100644 --- a/bcos-txpool/bcos-txpool/txpool/storage/MemoryStorage.h +++ b/bcos-txpool/bcos-txpool/txpool/storage/MemoryStorage.h @@ -169,7 +169,7 @@ class MemoryStorage : public TxPoolStorageInterface, TxPoolConfig::Ptr m_config; using TxsMap = BucketMap, TransactionBucket>; + std::hash>; TxsMap m_txsTable, m_invalidTxs; using HashSet = BucketSet>;