Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
zjg555543 committed Oct 23, 2024
1 parent bc5324a commit 2d15ab1
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 28 deletions.
37 changes: 18 additions & 19 deletions validator/db/celldb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,20 @@ void CellDbIn::start_up() {
};

CellDbBase::start_up();
td::RocksDbOptions db_options;
if (!opts_->get_disable_rocksdb_stats()) {
statistics_ = td::RocksDb::create_statistics();
statistics_flush_at_ = td::Timestamp::in(60.0);
snapshot_statistics_ = std::make_shared<td::RocksDbSnapshotStatistics>();
db_options.snapshot_statistics = snapshot_statistics_;
}
db_options.statistics = statistics_;
if (opts_->get_celldb_cache_size()) {
db_options.block_cache = td::RocksDb::create_cache(opts_->get_celldb_cache_size().value());
LOG(WARNING) << "Set CellDb block cache size to " << td::format::as_size(opts_->get_celldb_cache_size().value());
}
db_options.use_direct_reads = opts_->get_celldb_direct_io();
if (cell_db_ == NULL){
td::RocksDbOptions db_options;
if (!opts_->get_disable_rocksdb_stats()) {
statistics_ = td::RocksDb::create_statistics();
statistics_flush_at_ = td::Timestamp::in(60.0);
snapshot_statistics_ = std::make_shared<td::RocksDbSnapshotStatistics>();
db_options.snapshot_statistics = snapshot_statistics_;
}
db_options.statistics = statistics_;
if (opts_->get_celldb_cache_size()) {
db_options.block_cache = td::RocksDb::create_cache(opts_->get_celldb_cache_size().value());
LOG(WARNING) << "Set CellDb block cache size to " << td::format::as_size(opts_->get_celldb_cache_size().value());
}
db_options.use_direct_reads = opts_->get_celldb_direct_io();
LOG(INFO) << "CellDbIn using a new rocksdb instance";
cell_db_ = std::make_shared<td::RocksDb>(td::RocksDb::open(path_, std::move(db_options)).move_as_ok());
}
Expand Down Expand Up @@ -453,7 +453,7 @@ void CellDb::load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promise
ranCount = 0;
}
if (!started_) {
td::actor::send_closure(cell_db_read_[ranNum], &CellDbIn::load_cell, hash, std::move(promise));
td::actor::send_closure(cell_db_read_[ranNum], &CellDbIn::load_cell, hash, std::move(promise));
} else {
auto P = td::PromiseCreator::lambda(
[cell_db_in = cell_db_read_[ranNum].get(), hash, promise = std::move(promise)](td::Result<td::Ref<vm::DataCell>> R) mutable {
Expand All @@ -468,28 +468,27 @@ void CellDb::load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promise
}

void CellDb::store_cell(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promise<td::Ref<vm::DataCell>> promise) {
td::actor::send_closure(cell_db_, &CellDbIn::store_cell, block_id, std::move(cell), std::move(promise));
td::actor::send_closure(cell_db_read_[0], &CellDbIn::store_cell, block_id, std::move(cell), std::move(promise));
}

void CellDb::get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise) {
td::actor::send_closure(cell_db_, &CellDbIn::get_cell_db_reader, std::move(promise));
td::actor::send_closure(cell_db_read_[0], &CellDbIn::get_cell_db_reader, std::move(promise));
}

void CellDb::get_last_deleted_mc_state(td::Promise<BlockSeqno> promise) {
td::actor::send_closure(cell_db_, &CellDbIn::get_last_deleted_mc_state, std::move(promise));
td::actor::send_closure(cell_db_read_[0], &CellDbIn::get_last_deleted_mc_state, std::move(promise));
}

void CellDb::start_up() {
CellDbBase::start_up();
boc_ = vm::DynamicBagOfCellsDb::create();
boc_->set_celldb_compress_depth(opts_->get_celldb_compress_depth());
cell_db_ = td::actor::create_actor<CellDbIn>("celldbin", root_db_, actor_id(this), path_, opts_, rocks_db_);

for (int i = 0; i < THREAD_COUNTS; i++) {
cell_db_read_[i] = td::actor::create_actor<CellDbIn>("celldbin", root_db_, actor_id(this), path_, opts_, rocks_db_);
}
on_load_callback_ = [actor = std::make_shared<td::actor::ActorOwn<CellDbIn::MigrationProxy>>(
td::actor::create_actor<CellDbIn::MigrationProxy>("celldbmigration", cell_db_.get())),
td::actor::create_actor<CellDbIn::MigrationProxy>("celldbmigration", cell_db_read_[0].get())),
compress_depth = opts_->get_celldb_compress_depth()](const vm::CellLoader::LoadResult& res) {
if (res.cell_.is_null()) {
return;
Expand Down
1 change: 0 additions & 1 deletion validator/db/celldb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ class CellDb : public CellDbBase {
std::string path_;
td::Ref<ValidatorManagerOptions> opts_;

td::actor::ActorOwn<CellDbIn> cell_db_;
td::actor::ActorOwn<CellDbIn> cell_db_read_[THREAD_COUNTS];
std::shared_ptr<vm::KeyValue> rocks_db_;

Expand Down
3 changes: 3 additions & 0 deletions validator/db/db-utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ void PercentileStats::clear() {
}

int GetDBRandomNum(){
if (THREAD_COUNTS <= 1) {
return 0;
}
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> distr(0, THREAD_COUNTS -1);
Expand Down
2 changes: 1 addition & 1 deletion validator/db/db-utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class PercentileStats {
std::multiset<double> values_;
};

const int THREAD_COUNTS = 10;
const int THREAD_COUNTS = 1;
int GetDBRandomNum();


Expand Down
10 changes: 4 additions & 6 deletions validator/db/rootdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ void RootDb::store_block_state(BlockHandle handle, td::Ref<ShardState> state,
td::actor::send_closure(b, &ArchiveManager::update_handle, std::move(handle), std::move(P));
}
});
td::actor::send_closure(cell_db_, &CellDb::store_cell, handle->id(), state->root_cell(), std::move(P));
td::actor::send_closure(cell_db_read_[0], &CellDb::store_cell, handle->id(), state->root_cell(), std::move(P));
} else {
get_block_state(handle, std::move(promise));
}
Expand Down Expand Up @@ -272,19 +272,19 @@ void RootDb::get_block_state(ConstBlockHandle handle, td::Promise<td::Ref<ShardS
LOG(ERROR) << "RootDb mailbox: " << this->get_name() << " " << this->get_actor_info_ptr()->mailbox().reader().calc_size() << ", ranNum: " << ranNum;
ranCount = 0;
}

td::actor::send_closure(cell_db_read_[ranNum], &CellDb::load_cell, handle->state(), std::move(P));
} else {
promise.set_error(td::Status::Error(ErrorCode::notready, "state not in db"));
}
}

void RootDb::get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise) {
td::actor::send_closure(cell_db_, &CellDb::get_cell_db_reader, std::move(promise));
td::actor::send_closure(cell_db_read_[0], &CellDb::get_cell_db_reader, std::move(promise));
}

void RootDb::get_last_deleted_mc_state(td::Promise<BlockSeqno> promise) {
td::actor::send_closure(cell_db_, &CellDb::get_last_deleted_mc_state, std::move(promise));
td::actor::send_closure(cell_db_read_[0], &CellDb::get_last_deleted_mc_state, std::move(promise));
}

void RootDb::store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state,
Expand Down Expand Up @@ -429,8 +429,6 @@ void RootDb::start_up() {
db_options.use_direct_reads = opts_->get_celldb_direct_io();
auto path = root_path_ + "/celldb/";
auto rock_db = std::make_shared<td::RocksDb>(td::RocksDb::open(path, std::move(db_options)).move_as_ok());

cell_db_ = td::actor::create_actor<CellDb>("celldb", actor_id(this), path, opts_, rock_db);
for (int i = 0; i < THREAD_COUNTS; i++){
cell_db_read_[i] = td::actor::create_actor<CellDb>("celldb", actor_id(this), path, opts_, rock_db);
}
Expand Down
2 changes: 1 addition & 1 deletion validator/db/rootdb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class RootDb : public Db {
std::string root_path_;
td::Ref<ValidatorManagerOptions> opts_;

td::actor::ActorOwn<CellDb> cell_db_;
// td::actor::ActorOwn<CellDb> cell_db_;
td::actor::ActorOwn<CellDb> cell_db_read_[THREAD_COUNTS];
td::actor::ActorOwn<StateDb> state_db_;
td::actor::ActorOwn<StaticFilesDb> static_files_db_;
Expand Down

0 comments on commit 2d15ab1

Please sign in to comment.