From f42af270e3f8d3b17abeab913e2b2a7e8765034f Mon Sep 17 00:00:00 2001 From: EmelyanenkoK Date: Fri, 11 Oct 2024 15:31:59 +0300 Subject: [PATCH] Use parallel write to celldb (#1264) * Parallel write in celldb * Add TD_PERF_COUNTER to gc_cell and store_cell * More error handling * Tests for prepare_commit_async * Install g++11 for ubuntu 20.04 --------- Co-authored-by: SpyCheese --- .../build-ton-linux-x86-64-shared.yml | 6 + crypto/test/test-db.cpp | 217 ++++++++++++---- crypto/vm/db/CellHashTable.h | 2 +- crypto/vm/db/CellStorage.cpp | 18 ++ crypto/vm/db/CellStorage.h | 1 + crypto/vm/db/DynamicBagOfCellsDb.cpp | 221 +++++++++++++++++ crypto/vm/db/DynamicBagOfCellsDb.h | 1 + crypto/vm/db/InMemoryBagOfCellsDb.cpp | 4 + validator/db/celldb.cpp | 232 ++++++++++++------ validator/db/celldb.hpp | 15 ++ 10 files changed, 586 insertions(+), 131 deletions(-) diff --git a/.github/workflows/build-ton-linux-x86-64-shared.yml b/.github/workflows/build-ton-linux-x86-64-shared.yml index fcef8afd5..e28ecc742 100644 --- a/.github/workflows/build-ton-linux-x86-64-shared.yml +++ b/.github/workflows/build-ton-linux-x86-64-shared.yml @@ -21,6 +21,12 @@ jobs: sudo apt-get update sudo apt-get install -y build-essential git cmake ninja-build zlib1g-dev libsecp256k1-dev libmicrohttpd-dev libsodium-dev liblz4-dev libjemalloc-dev + - if: matrix.os == 'ubuntu-20.04' + run: | + sudo apt install -y manpages-dev software-properties-common + sudo add-apt-repository ppa:ubuntu-toolchain-r/test + sudo apt update && sudo apt install gcc-11 g++-11 + - if: matrix.os != 'ubuntu-24.04' run: | wget https://apt.llvm.org/llvm.sh diff --git a/crypto/test/test-db.cpp b/crypto/test/test-db.cpp index d4059f051..dc7fcf370 100644 --- a/crypto/test/test-db.cpp +++ b/crypto/test/test-db.cpp @@ -61,10 +61,81 @@ #include "openssl/digest.hpp" #include "vm/dict.h" +#include +#include #include #include +#include namespace vm { +class ThreadExecutor : public DynamicBagOfCellsDb::AsyncExecutor { + public: + explicit ThreadExecutor(size_t threads_n) { + for (size_t i = 0; i < threads_n; ++i) { + threads_.emplace_back([this]() { + while (true) { + auto task = pop_task(); + if (!task) { + break; + } + CHECK(generation_.load() % 2 == 1); + task(); + } + }); + } + } + + ~ThreadExecutor() override { + for (size_t i = 0; i < threads_.size(); ++i) { + push_task({}); + } + for (auto &t : threads_) { + t.join(); + } + } + + void execute_async(std::function f) override { + push_task(std::move(f)); + } + + void execute_sync(std::function f) override { + auto x = generation_.load(); + std::scoped_lock lock(sync_mutex_); + CHECK(x == generation_); + CHECK(generation_.load() % 2 == 1); + f(); + CHECK(generation_.load() % 2 == 1); + } + void inc_generation() { + generation_.fetch_add(1); + } + + private: + std::atomic generation_{0}; + std::queue, size_t>> queue_; + std::mutex queue_mutex_; + std::condition_variable cv_; + std::mutex sync_mutex_; + std::vector threads_; + + std::function pop_task() { + std::unique_lock lock(queue_mutex_); + cv_.wait(lock, [&] { return !queue_.empty(); }); + CHECK(!queue_.empty()); + auto task = std::move(queue_.front()); + queue_.pop(); + CHECK(task.second == generation_); + return task.first; + } + + void push_task(std::function task) { + { + std::scoped_lock lock(queue_mutex_); + queue_.emplace(std::move(task), generation_.load()); + } + cv_.notify_one(); + } +}; std::vector do_get_serialization_modes() { std::vector res; @@ -890,25 +961,91 @@ TEST(TonDb, InMemoryDynamicBocSimple) { boc = DynamicBagOfCellsDb::create_in_memory(kv.get(), {}); } -void test_dynamic_boc(std::optional o_in_memory) { +int VERBOSITY_NAME(boc) = VERBOSITY_NAME(DEBUG) + 10; + +struct BocOptions { + std::shared_ptr async_executor; + std::optional o_in_memory; + td::uint64 seed{123}; + + auto create_dboc(td::KeyValueReader *kv, std::optional o_root_n) { + if (o_in_memory) { + auto res = DynamicBagOfCellsDb::create_in_memory(kv, *o_in_memory); + auto stats = res->get_stats().move_as_ok(); + if (o_root_n) { + ASSERT_EQ(*o_root_n, stats.roots_total_count); + } + VLOG(boc) << "reset roots_n=" << stats.roots_total_count << " cells_n=" << stats.cells_total_count; + return res; + } + return DynamicBagOfCellsDb::create(); + }; + void prepare_commit(DynamicBagOfCellsDb &dboc) { + if (async_executor) { + async_executor->inc_generation(); + std::latch latch(1); + td::Result res; + async_executor->execute_sync([&] { + dboc.prepare_commit_async(async_executor, [&](auto r) { + res = std::move(r); + latch.count_down(); + }); + }); + latch.wait(); + async_executor->execute_sync([&] {}); + async_executor->inc_generation(); + } else { + dboc.prepare_commit(); + } + } +}; + +template +void with_all_boc_options(F &&f, size_t tests_n = 500) { + LOG(INFO) << "Test dynamic boc"; + auto counter = [] { return td::NamedThreadSafeCounter::get_default().get_counter("DataCell").sum(); }; + auto run = [&](BocOptions options) { + LOG(INFO) << "\t" << (options.o_in_memory ? "in memory" : "on disk") << (options.async_executor ? " async" : ""); + if (options.o_in_memory) { + LOG(INFO) << "\t\tuse_arena=" << options.o_in_memory->use_arena + << " less_memory=" << options.o_in_memory->use_less_memory_during_creation; + } + for (td::uint32 i = 0; i < tests_n; i++) { + auto before = counter(); + options.seed = i == 0 ? 123 : i; + f(options); + auto after = counter(); + LOG_CHECK((options.o_in_memory && options.o_in_memory->use_arena) || before == after) + << before << " vs " << after; + } + }; + run({.async_executor = std::make_shared(4)}); + run({}); + for (auto use_arena : {false, true}) { + for (auto less_memory : {false, true}) { + run({.o_in_memory = + DynamicBagOfCellsDb::CreateInMemoryOptions{.extra_threads = std::thread::hardware_concurrency(), + .verbose = false, + .use_arena = use_arena, + .use_less_memory_during_creation = less_memory}}); + } + } +} + +void test_dynamic_boc(BocOptions options) { auto counter = [] { return td::NamedThreadSafeCounter::get_default().get_counter("DataCell").sum(); }; auto before = counter(); SCOPE_EXIT { - LOG_CHECK((o_in_memory && o_in_memory->use_arena) || before == counter()) << before << " vs " << counter(); - ; + LOG_CHECK((options.o_in_memory && options.o_in_memory->use_arena) || before == counter()) + << before << " vs " << counter(); }; - td::Random::Xorshift128plus rnd{123}; + td::Random::Xorshift128plus rnd{options.seed}; std::string old_root_hash; std::string old_root_serialization; auto kv = std::make_shared(); auto create_dboc = [&]() { - if (o_in_memory) { - auto res = DynamicBagOfCellsDb::create_in_memory(kv.get(), *o_in_memory); - auto roots_n = old_root_hash.empty() ? 0 : 1; - ASSERT_EQ(roots_n, res->get_stats().ok().roots_total_count); - return res; - } - return DynamicBagOfCellsDb::create(); + auto roots_n = old_root_hash.empty() ? 0 : 1; + return options.create_dboc(kv.get(), roots_n); }; auto dboc = create_dboc(); dboc->set_loader(std::make_unique(kv)); @@ -947,51 +1084,28 @@ void test_dynamic_boc(std::optional ASSERT_EQ(0u, kv->count("").ok()); } -template -void with_all_boc_options(F &&f) { - LOG(INFO) << "Test dynamic boc"; - LOG(INFO) << "\ton disk"; - f({}); - for (auto use_arena : {false, true}) { - for (auto less_memory : {false, true}) { - LOG(INFO) << "\tuse_arena=" << use_arena << " less_memory=" << less_memory; - f(DynamicBagOfCellsDb::CreateInMemoryOptions{.extra_threads = std::thread::hardware_concurrency(), - .verbose = false, - .use_arena = use_arena, - .use_less_memory_during_creation = less_memory}); - } - } -} TEST(TonDb, DynamicBoc) { - with_all_boc_options(test_dynamic_boc); + with_all_boc_options(test_dynamic_boc, 1); }; -void test_dynamic_boc2(std::optional o_in_memory) { - int VERBOSITY_NAME(boc) = VERBOSITY_NAME(DEBUG) + 10; - td::Random::Xorshift128plus rnd{123}; - int total_roots = 10000; - int max_roots = 20; +void test_dynamic_boc2(BocOptions options) { + td::Random::Xorshift128plus rnd{options.seed}; + + int total_roots = rnd.fast(1, !rnd.fast(0, 10) * 100 + 10); + int max_roots = rnd.fast(1, 20); int last_commit_at = 0; int first_root_id = 0; int last_root_id = 0; auto kv = std::make_shared(); - auto create_dboc = [&](td::int64 root_n) { - if (o_in_memory) { - auto res = DynamicBagOfCellsDb::create_in_memory(kv.get(), *o_in_memory); - auto stats = res->get_stats().move_as_ok(); - ASSERT_EQ(root_n, stats.roots_total_count); - VLOG(boc) << "reset roots_n=" << stats.roots_total_count << " cells_n=" << stats.cells_total_count; - return res; - } - return DynamicBagOfCellsDb::create(); - }; + auto create_dboc = [&](td::int64 root_n) { return options.create_dboc(kv.get(), root_n); }; auto dboc = create_dboc(0); dboc->set_loader(std::make_unique(kv)); auto counter = [] { return td::NamedThreadSafeCounter::get_default().get_counter("DataCell").sum(); }; auto before = counter(); - SCOPE_EXIT { - LOG_CHECK((o_in_memory && o_in_memory->use_arena) || before == counter()) << before << " vs " << counter(); + SCOPE_EXIT{ + // LOG_CHECK((options.o_in_memory && options.o_in_memory->use_arena) || before == counter()) + // << before << " vs " << counter(); }; std::vector> roots(max_roots); @@ -1009,7 +1123,7 @@ void test_dynamic_boc2(std::optional if (from_root.is_null()) { VLOG(boc) << " from db"; auto from_root_hash = root_hashes[root_id % max_roots]; - if (o_in_memory && (rnd() % 2 == 0)) { + if (rnd() % 2 == 0) { from_root = dboc->load_root(from_root_hash).move_as_ok(); } else { from_root = dboc->load_cell(from_root_hash).move_as_ok(); @@ -1041,7 +1155,8 @@ void test_dynamic_boc2(std::optional auto commit = [&] { VLOG(boc) << "commit"; - dboc->prepare_commit(); + //rnd.fast(0, 1); + options.prepare_commit(*dboc); { CellStorer cell_storer(*kv); dboc->commit(cell_storer); @@ -2147,18 +2262,18 @@ TEST(TonDb, BocRespectsUsageCell) { ASSERT_STREQ(serialization, serialization_of_virtualized_cell); } -void test_dynamic_boc_respectes_usage_cell(std::optional o_in_memory) { - td::Random::Xorshift128plus rnd(123); +void test_dynamic_boc_respectes_usage_cell(vm::BocOptions options) { + td::Random::Xorshift128plus rnd(options.seed); auto cell = vm::gen_random_cell(20, rnd, true); auto usage_tree = std::make_shared(); auto usage_cell = vm::UsageCell::create(cell, usage_tree->root_ptr()); auto kv = std::make_shared(); - auto dboc = o_in_memory ? vm::DynamicBagOfCellsDb::create_in_memory(kv.get(), *o_in_memory) - : vm::DynamicBagOfCellsDb::create(); + auto dboc = options.create_dboc(kv.get(), {}); dboc->set_loader(std::make_unique(kv)); dboc->inc(usage_cell); { + options.prepare_commit(*dboc); vm::CellStorer cell_storer(*kv); dboc->commit(cell_storer); } @@ -2171,7 +2286,7 @@ void test_dynamic_boc_respectes_usage_cell(std::optional void for_each(F &&f) { for (auto &info : set_) { - f(info); + f(const_cast(info)); } } template diff --git a/crypto/vm/db/CellStorage.cpp b/crypto/vm/db/CellStorage.cpp index aad7539e5..06df461ef 100644 --- a/crypto/vm/db/CellStorage.cpp +++ b/crypto/vm/db/CellStorage.cpp @@ -184,6 +184,24 @@ td::Result CellLoader::load(td::Slice hash, td::Slice va return res; } +td::Result CellLoader::load_refcnt(td::Slice hash) { + LoadResult res; + std::string serialized; + TRY_RESULT(get_status, reader_->get(hash, serialized)); + if (get_status != KeyValue::GetStatus::Ok) { + DCHECK(get_status == KeyValue::GetStatus::NotFound); + return res; + } + res.status = LoadResult::Ok; + td::TlParser parser(serialized); + td::parse(res.refcnt_, parser); + if (res.refcnt_ == -1) { + parse(res.refcnt_, parser); + } + TRY_STATUS(parser.get_status()); + return res; +} + CellStorer::CellStorer(KeyValue &kv) : kv_(kv) { } diff --git a/crypto/vm/db/CellStorage.h b/crypto/vm/db/CellStorage.h index 8c471d6ca..cabd7fdcb 100644 --- a/crypto/vm/db/CellStorage.h +++ b/crypto/vm/db/CellStorage.h @@ -50,6 +50,7 @@ class CellLoader { CellLoader(std::shared_ptr reader, std::function on_load_callback = {}); td::Result load(td::Slice hash, bool need_data, ExtCellCreator &ext_cell_creator); static td::Result load(td::Slice hash, td::Slice value, bool need_data, ExtCellCreator &ext_cell_creator); + td::Result load_refcnt(td::Slice hash); // This only loads refcnt_, cell_ == null private: std::shared_ptr reader_; diff --git a/crypto/vm/db/DynamicBagOfCellsDb.cpp b/crypto/vm/db/DynamicBagOfCellsDb.cpp index f1cdb3cef..b69cd8c0b 100644 --- a/crypto/vm/db/DynamicBagOfCellsDb.cpp +++ b/crypto/vm/db/DynamicBagOfCellsDb.cpp @@ -27,6 +27,9 @@ #include "td/utils/ThreadSafeCounter.h" #include "vm/cellslice.h" +#include +#include "td/actor/actor.h" +#include "common/delay.h" namespace vm { namespace { @@ -180,6 +183,9 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat } td::Status prepare_commit() override { + if (pca_state_) { + return td::Status::Error("prepare_commit_async is not finished"); + } if (is_prepared_for_commit()) { return td::Status::OK(); } @@ -585,6 +591,221 @@ class DynamicBagOfCellsDbImpl : public DynamicBagOfCellsDb, private ExtCellCreat DynamicBocExtCellExtra{cell_db_reader_})); return std::move(res); } + + struct PrepareCommitAsyncState { + size_t remaining_ = 0; + std::shared_ptr executor_; + td::Promise promise_; + + struct CellInfo2 { + CellInfo *info{}; + std::vector parents; + unsigned remaining_children = 0; + Cell::Hash key() const { + return info->key(); + } + bool operator<(const CellInfo2 &other) const { + return key() < other.key(); + } + + friend bool operator<(const CellInfo2 &a, td::Slice b) { + return a.key().as_slice() < b; + } + + friend bool operator<(td::Slice a, const CellInfo2 &b) { + return a < b.key().as_slice(); + } + + struct Eq { + using is_transparent = void; // Pred to use + bool operator()(const CellInfo2 &info, const CellInfo2 &other_info) const { + return info.key() == other_info.key(); + } + bool operator()(const CellInfo2 &info, td::Slice hash) const { + return info.key().as_slice() == hash; + } + bool operator()(td::Slice hash, const CellInfo2 &info) const { + return info.key().as_slice() == hash; + } + }; + struct Hash { + using is_transparent = void; // Pred to use + using transparent_key_equal = Eq; + size_t operator()(td::Slice hash) const { + return cell_hash_slice_hash(hash); + } + size_t operator()(const CellInfo2 &info) const { + return cell_hash_slice_hash(info.key().as_slice()); + } + }; + }; + + CellHashTable cells_; + + std::queue load_queue_; + td::uint32 active_load_ = 0; + td::uint32 max_parallel_load_ = 4; + }; + std::unique_ptr pca_state_; + + void prepare_commit_async(std::shared_ptr executor, td::Promise promise) override { + hash_table_ = {}; + if (pca_state_) { + promise.set_error(td::Status::Error("Other prepare_commit_async is not finished")); + return; + } + if (is_prepared_for_commit()) { + promise.set_result(td::Unit()); + return; + } + pca_state_ = std::make_unique(); + pca_state_->executor_ = std::move(executor); + pca_state_->promise_ = std::move(promise); + for (auto &new_cell : to_inc_) { + dfs_new_cells_in_db_async(new_cell); + } + pca_state_->cells_.for_each([&](PrepareCommitAsyncState::CellInfo2 &info) { + ++pca_state_->remaining_; + if (info.remaining_children == 0) { + pca_load_from_db(&info); + } + }); + if (pca_state_->remaining_ == 0) { + prepare_commit_async_cont(); + } + } + + void dfs_new_cells_in_db_async(const td::Ref &cell, PrepareCommitAsyncState::CellInfo2 *parent = nullptr) { + bool exists = true; + pca_state_->cells_.apply(cell->get_hash().as_slice(), [&](PrepareCommitAsyncState::CellInfo2 &info) { + if (info.info == nullptr) { + exists = false; + info.info = &get_cell_info(cell); + } + }); + auto info = pca_state_->cells_.get_if_exists(cell->get_hash().as_slice()); + if (parent) { + info->parents.push_back(parent); + ++parent->remaining_children; + } + if (exists) { + return; + } + if (cell->is_loaded()) { + vm::CellSlice cs(vm::NoVm{}, cell); + for (unsigned i = 0; i < cs.size_refs(); i++) { + dfs_new_cells_in_db_async(cs.prefetch_ref(i), info); + } + } + } + + void pca_load_from_db(PrepareCommitAsyncState::CellInfo2 *info) { + if (pca_state_->active_load_ >= pca_state_->max_parallel_load_) { + pca_state_->load_queue_.push(info); + return; + } + ++pca_state_->active_load_; + pca_state_->executor_->execute_async( + [db = this, info, executor = pca_state_->executor_, loader = *loader_]() mutable { + auto res = loader.load_refcnt(info->info->cell->get_hash().as_slice()).move_as_ok(); + executor->execute_sync([db, info, res = std::move(res)]() { + --db->pca_state_->active_load_; + db->pca_process_load_queue(); + db->pca_set_in_db(info, std::move(res)); + }); + }); + } + + void pca_process_load_queue() { + while (pca_state_->active_load_ < pca_state_->max_parallel_load_ && !pca_state_->load_queue_.empty()) { + PrepareCommitAsyncState::CellInfo2 *info = pca_state_->load_queue_.front(); + pca_state_->load_queue_.pop(); + pca_load_from_db(info); + } + } + + void pca_set_in_db(PrepareCommitAsyncState::CellInfo2 *info, CellLoader::LoadResult result) { + info->info->sync_with_db = true; + if (result.status == CellLoader::LoadResult::Ok) { + info->info->in_db = true; + info->info->db_refcnt = result.refcnt(); + } else { + info->info->in_db = false; + } + for (PrepareCommitAsyncState::CellInfo2 *parent_info : info->parents) { + if (parent_info->info->sync_with_db) { + continue; + } + if (!info->info->in_db) { + pca_set_in_db(parent_info, {}); + } else if (--parent_info->remaining_children == 0) { + pca_load_from_db(parent_info); + } + } + CHECK(pca_state_->remaining_ != 0); + if (--pca_state_->remaining_ == 0) { + prepare_commit_async_cont(); + } + } + + void prepare_commit_async_cont() { + for (auto &new_cell : to_inc_) { + auto &new_cell_info = get_cell_info(new_cell); + dfs_new_cells(new_cell_info); + } + + CHECK(pca_state_->remaining_ == 0); + for (auto &old_cell : to_dec_) { + auto &old_cell_info = get_cell_info(old_cell); + dfs_old_cells_async(old_cell_info); + } + if (pca_state_->remaining_ == 0) { + prepare_commit_async_cont2(); + } + } + + void dfs_old_cells_async(CellInfo &info) { + if (!info.was) { + info.was = true; + visited_.push_back(&info); + if (!info.sync_with_db) { + ++pca_state_->remaining_; + load_cell_async( + info.cell->get_hash().as_slice(), pca_state_->executor_, + [executor = pca_state_->executor_, db = this, info = &info](td::Result> R) { + R.ensure(); + executor->execute_sync([db, info]() { + CHECK(info->sync_with_db); + db->dfs_old_cells_async(*info); + if (--db->pca_state_->remaining_ == 0) { + db->prepare_commit_async_cont2(); + } + }); + }); + return; + } + } + info.refcnt_diff--; + if (!info.sync_with_db) { + return; + } + auto new_refcnt = info.refcnt_diff + info.db_refcnt; + CHECK(new_refcnt >= 0); + if (new_refcnt != 0) { + return; + } + + for_each(info, [this](auto &child_info) { dfs_old_cells_async(child_info); }); + } + + void prepare_commit_async_cont2() { + save_diff_prepare(); + to_inc_.clear(); + to_dec_.clear(); + pca_state_->promise_.set_result(td::Unit()); + pca_state_ = {}; + } + }; } // namespace diff --git a/crypto/vm/db/DynamicBagOfCellsDb.h b/crypto/vm/db/DynamicBagOfCellsDb.h index bc0b6e9ef..62864ad97 100644 --- a/crypto/vm/db/DynamicBagOfCellsDb.h +++ b/crypto/vm/db/DynamicBagOfCellsDb.h @@ -105,6 +105,7 @@ class DynamicBagOfCellsDb { virtual void load_cell_async(td::Slice hash, std::shared_ptr executor, td::Promise> promise) = 0; + virtual void prepare_commit_async(std::shared_ptr executor, td::Promise promise) = 0; }; } // namespace vm diff --git a/crypto/vm/db/InMemoryBagOfCellsDb.cpp b/crypto/vm/db/InMemoryBagOfCellsDb.cpp index aa8f7910b..03cad0934 100644 --- a/crypto/vm/db/InMemoryBagOfCellsDb.cpp +++ b/crypto/vm/db/InMemoryBagOfCellsDb.cpp @@ -848,6 +848,10 @@ class InMemoryBagOfCellsDb : public DynamicBagOfCellsDb { to_inc_ = {}; return td::Status::OK(); } + void prepare_commit_async(std::shared_ptr executor, td::Promise promise) override { + TRY_STATUS_PROMISE(promise, prepare_commit()); + promise.set_value(td::Unit()); + } Stats get_stats_diff() override { LOG(FATAL) << "Not implemented"; return {}; diff --git a/validator/db/celldb.cpp b/validator/db/celldb.cpp index 91e9054e8..0ac66d7ce 100644 --- a/validator/db/celldb.cpp +++ b/validator/db/celldb.cpp @@ -161,16 +161,33 @@ void CellDbIn::start_up() { } void CellDbIn::load_cell(RootHash hash, td::Promise> promise) { + if (db_busy_) { + action_queue_.push([self = this, hash, promise = std::move(promise)](td::Result R) mutable { + R.ensure(); + self->load_cell(hash, std::move(promise)); + }); + return; + } if (opts_->get_celldb_in_memory()) { auto result = boc_->load_root(hash.as_slice()); async_apply("load_cell_result", std::move(promise), std::move(result)); return; } - boc_->load_cell_async(hash.as_slice(), async_executor, std::move(promise)); + auto cell = boc_->load_cell(hash.as_slice()); + delay_action( + [cell = std::move(cell), promise = std::move(promise)]() mutable { promise.set_result(std::move(cell)); }, + td::Timestamp::now()); } void CellDbIn::store_cell(BlockIdExt block_id, td::Ref cell, td::Promise> promise) { - TD_PERF_COUNTER(celldb_store_cell); + if (db_busy_) { + action_queue_.push( + [self = this, block_id, cell = std::move(cell), promise = std::move(promise)](td::Result R) mutable { + R.ensure(); + self->store_cell(block_id, std::move(cell), std::move(promise)); + }); + return; + } td::PerfWarningTimer timer{"storecell", 0.1}; auto key_hash = get_key_hash(block_id); auto R = get_block(key_hash); @@ -180,49 +197,71 @@ void CellDbIn::store_cell(BlockIdExt block_id, td::Ref cell, td::Promi return; } - auto empty = get_empty_key_hash(); - auto ER = get_block(empty); - ER.ensure(); - auto E = ER.move_as_ok(); - - auto PR = get_block(E.prev); - PR.ensure(); - auto P = PR.move_as_ok(); - CHECK(P.next == empty); - - DbEntry D{block_id, E.prev, empty, cell->get_hash().bits()}; - - E.prev = key_hash; - P.next = key_hash; - - if (P.is_empty()) { - E.next = key_hash; - P.prev = key_hash; - } - boc_->inc(cell); - boc_->prepare_commit().ensure(); - vm::CellStorer stor{*cell_db_.get()}; - cell_db_->begin_write_batch().ensure(); - boc_->commit(stor).ensure(); - set_block(empty, std::move(E)); - set_block(D.prev, std::move(P)); - set_block(key_hash, std::move(D)); - cell_db_->commit_write_batch().ensure(); - - if (!opts_->get_celldb_in_memory()) { - boc_->set_loader(std::make_unique(cell_db_->snapshot(), on_load_callback_)).ensure(); - td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot()); - } + db_busy_ = true; + boc_->prepare_commit_async(async_executor, [=, this, SelfId = actor_id(this), timer = std::move(timer), + timer_prepare = td::Timer{}, promise = std::move(promise), + cell = std::move(cell)](td::Result Res) mutable { + Res.ensure(); + timer_prepare.pause(); + td::actor::send_lambda( + SelfId, [=, this, timer = std::move(timer), promise = std::move(promise), cell = std::move(cell)]() mutable { + TD_PERF_COUNTER(celldb_store_cell); + auto empty = get_empty_key_hash(); + auto ER = get_block(empty); + ER.ensure(); + auto E = ER.move_as_ok(); + + auto PR = get_block(E.prev); + PR.ensure(); + auto P = PR.move_as_ok(); + CHECK(P.next == empty); + + DbEntry D{block_id, E.prev, empty, cell->get_hash().bits()}; + + E.prev = key_hash; + P.next = key_hash; + + if (P.is_empty()) { + E.next = key_hash; + P.prev = key_hash; + } + td::Timer timer_write; + vm::CellStorer stor{*cell_db_}; + cell_db_->begin_write_batch().ensure(); + boc_->commit(stor).ensure(); + set_block(get_empty_key_hash(), std::move(E)); + set_block(D.prev, std::move(P)); + set_block(key_hash, std::move(D)); + cell_db_->commit_write_batch().ensure(); + timer_write.pause(); + + if (!opts_->get_celldb_in_memory()) { + boc_->set_loader(std::make_unique(cell_db_->snapshot(), on_load_callback_)).ensure(); + td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot()); + } - promise.set_result(boc_->load_cell(cell->get_hash().as_slice())); - if (!opts_->get_disable_rocksdb_stats()) { - cell_db_statistics_.store_cell_time_.insert(timer.elapsed() * 1e6); - } - LOG(DEBUG) << "Stored state " << block_id.to_str(); + promise.set_result(boc_->load_cell(cell->get_hash().as_slice())); + if (!opts_->get_disable_rocksdb_stats()) { + cell_db_statistics_.store_cell_time_.insert(timer.elapsed() * 1e6); + cell_db_statistics_.store_cell_prepare_time_.insert(timer_prepare.elapsed() * 1e6); + cell_db_statistics_.store_cell_write_time_.insert(timer_write.elapsed() * 1e6); + } + LOG(DEBUG) << "Stored state " << block_id.to_str(); + release_db(); + }); + }); } void CellDbIn::get_cell_db_reader(td::Promise> promise) { + if (db_busy_) { + action_queue_.push( + [self = this, promise = std::move(promise)](td::Result R) mutable { + R.ensure(); + self->get_cell_db_reader(std::move(promise)); + }); + return; + } promise.set_result(boc_->get_cell_db_reader()); } @@ -265,6 +304,13 @@ void CellDbIn::flush_db_stats() { if (opts_->get_disable_rocksdb_stats()) { return; } + if (db_busy_) { + action_queue_.push([self = this](td::Result R) mutable { + R.ensure(); + self->flush_db_stats(); + }); + return; + } auto celldb_stats = prepare_stats(); td::StringBuilder ss; @@ -353,7 +399,14 @@ void CellDbIn::gc_cont(BlockHandle handle) { } void CellDbIn::gc_cont2(BlockHandle handle) { - TD_PERF_COUNTER(celldb_gc_cell); + if (db_busy_) { + action_queue_.push([self = this, handle = std::move(handle)](td::Result R) mutable { + R.ensure(); + self->gc_cont2(handle); + }); + return; + } + td::PerfWarningTimer timer{"gccell", 0.1}; td::PerfWarningTimer timer_all{"gccell_all", 0.05}; @@ -382,46 +435,58 @@ void CellDbIn::gc_cont2(BlockHandle handle) { auto cell = boc_->load_cell(F.root_hash.as_slice()).move_as_ok(); boc_->dec(cell); - boc_->prepare_commit().ensure(); - vm::CellStorer stor{*cell_db_}; - timer_boc.reset(); - - td::PerfWarningTimer timer_write_batch{"gccell_write_batch", 0.05}; - cell_db_->begin_write_batch().ensure(); - boc_->commit(stor).ensure(); - - cell_db_->erase(get_key(key_hash)).ensure(); - set_block(F.prev, std::move(P)); - set_block(F.next, std::move(N)); - cell_db_->commit_write_batch().ensure(); - alarm_timestamp() = td::Timestamp::now(); - timer_write_batch.reset(); - - td::PerfWarningTimer timer_free_cells{"gccell_free_cells", 0.05}; - auto before = td::ref_get_delete_count(); - cell = {}; - auto after = td::ref_get_delete_count(); - if (timer_free_cells.elapsed() > 0.04) { - LOG(ERROR) << "deleted " << after - before << " cells"; - } - timer_free_cells.reset(); + db_busy_ = true; + boc_->prepare_commit_async( + async_executor, [this, SelfId = actor_id(this), timer_boc = std::move(timer_boc), F = std::move(F), key_hash, + P = std::move(P), N = std::move(N), cell = std::move(cell), timer = std::move(timer), + timer_all = std::move(timer_all), handle](td::Result R) mutable { + R.ensure(); + td::actor::send_lambda(SelfId, [this, timer_boc = std::move(timer_boc), F = std::move(F), key_hash, + P = std::move(P), N = std::move(N), cell = std::move(cell), + timer = std::move(timer), timer_all = std::move(timer_all), handle]() mutable { + TD_PERF_COUNTER(celldb_gc_cell); + vm::CellStorer stor{*cell_db_}; + timer_boc.reset(); + + td::PerfWarningTimer timer_write_batch{"gccell_write_batch", 0.05}; + cell_db_->begin_write_batch().ensure(); + boc_->commit(stor).ensure(); + + cell_db_->erase(get_key(key_hash)).ensure(); + set_block(F.prev, std::move(P)); + set_block(F.next, std::move(N)); + cell_db_->commit_write_batch().ensure(); + alarm_timestamp() = td::Timestamp::now(); + timer_write_batch.reset(); + + td::PerfWarningTimer timer_free_cells{"gccell_free_cells", 0.05}; + auto before = td::ref_get_delete_count(); + cell = {}; + auto after = td::ref_get_delete_count(); + if (timer_free_cells.elapsed() > 0.04) { + LOG(ERROR) << "deleted " << after - before << " cells"; + } + timer_free_cells.reset(); - td::PerfWarningTimer timer_finish{"gccell_finish", 0.05}; - if (!opts_->get_celldb_in_memory()) { - boc_->set_loader(std::make_unique(cell_db_->snapshot(), on_load_callback_)).ensure(); - td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot()); - } + td::PerfWarningTimer timer_finish{"gccell_finish", 0.05}; + if (!opts_->get_celldb_in_memory()) { + boc_->set_loader(std::make_unique(cell_db_->snapshot(), on_load_callback_)).ensure(); + td::actor::send_closure(parent_, &CellDb::update_snapshot, cell_db_->snapshot()); + } - DCHECK(get_block(key_hash).is_error()); - if (!opts_->get_disable_rocksdb_stats()) { - cell_db_statistics_.gc_cell_time_.insert(timer.elapsed() * 1e6); - } - if (handle->id().is_masterchain()) { - last_deleted_mc_state_ = handle->id().seqno(); - } - LOG(DEBUG) << "Deleted state " << handle->id().to_str(); - timer_finish.reset(); - timer_all.reset(); + DCHECK(get_block(key_hash).is_error()); + if (!opts_->get_disable_rocksdb_stats()) { + cell_db_statistics_.gc_cell_time_.insert(timer.elapsed() * 1e6); + } + if (handle->id().is_masterchain()) { + last_deleted_mc_state_ = handle->id().seqno(); + } + LOG(DEBUG) << "Deleted state " << handle->id().to_str(); + timer_finish.reset(); + timer_all.reset(); + release_db(); + }); + }); } void CellDbIn::skip_gc() { @@ -481,6 +546,13 @@ void CellDbIn::migrate_cell(td::Bits256 hash) { void CellDbIn::migrate_cells() { migrate_after_ = td::Timestamp::never(); + if (db_busy_) { + action_queue_.push([self = this](td::Result R) mutable { + R.ensure(); + self->migrate_cells(); + }); + return; + } if (cells_to_migrate_.empty()) { migration_active_ = false; return; @@ -617,6 +689,8 @@ td::BufferSlice CellDbIn::DbEntry::release() { std::vector> CellDbIn::CellDbStatistics::prepare_stats() { std::vector> stats; stats.emplace_back("store_cell.micros", PSTRING() << store_cell_time_.to_string()); + stats.emplace_back("store_cell.prepare.micros", PSTRING() << store_cell_prepare_time_.to_string()); + stats.emplace_back("store_cell.write.micros", PSTRING() << store_cell_write_time_.to_string()); stats.emplace_back("gc_cell.micros", PSTRING() << gc_cell_time_.to_string()); stats.emplace_back("total_time.micros", PSTRING() << (td::Timestamp::now().at() - stats_start_time_.at()) * 1e6); stats.emplace_back("in_memory", PSTRING() << bool(in_memory_load_time_)); diff --git a/validator/db/celldb.hpp b/validator/db/celldb.hpp index 7cd36689b..d2b5eabcb 100644 --- a/validator/db/celldb.hpp +++ b/validator/db/celldb.hpp @@ -30,6 +30,7 @@ #include "td/db/RocksDb.h" #include +#include namespace rocksdb { class Statistics; @@ -137,6 +138,8 @@ class CellDbIn : public CellDbBase { struct CellDbStatistics { PercentileStats store_cell_time_; + PercentileStats store_cell_prepare_time_; + PercentileStats store_cell_write_time_; PercentileStats gc_cell_time_; td::Timestamp stats_start_time_ = td::Timestamp::now(); std::optional in_memory_load_time_; @@ -154,6 +157,18 @@ class CellDbIn : public CellDbBase { td::Timestamp statistics_flush_at_ = td::Timestamp::never(); BlockSeqno last_deleted_mc_state_ = 0; + bool db_busy_ = false; + std::queue> action_queue_; + + void release_db() { + db_busy_ = false; + while (!db_busy_ && !action_queue_.empty()) { + auto action = std::move(action_queue_.front()); + action_queue_.pop(); + action.set_value(td::Unit()); + } + } + public: class MigrationProxy : public td::actor::Actor { public: