Out msg queues: improve logs, various small changes
SpyCheese committed Nov 27, 2024
1 parent 3dce9d1 commit 5d79855
Showing 9 changed files with 106 additions and 79 deletions.
ton/ton-types.h (15 changes: 7 additions & 8 deletions)
@@ -428,15 +428,14 @@ struct Ed25519_PublicKey {

struct OutMsgQueueProofBroadcast : public td::CntObject {
OutMsgQueueProofBroadcast(ShardIdFull dst_shard, BlockIdExt block_id, td::int32 max_bytes, td::int32 max_msgs,
td::BufferSlice queue_proofs, td::BufferSlice block_state_proofs,
std::vector<std::int32_t> msg_counts)
td::BufferSlice queue_proof, td::BufferSlice block_state_proof, int msg_count)
: dst_shard(std::move(dst_shard))
, block_id(block_id)
, max_bytes(max_bytes)
, max_msgs(max_msgs)
, queue_proofs(std::move(queue_proofs))
, block_state_proofs(std::move(block_state_proofs))
, msg_counts(std::move(msg_counts)) {
, queue_proofs(std::move(queue_proof))
, block_state_proofs(std::move(block_state_proof))
, msg_count(std::move(msg_count)) {
}
ShardIdFull dst_shard;
BlockIdExt block_id;
@@ -448,11 +447,11 @@ struct OutMsgQueueProofBroadcast : public td::CntObject {
// outMsgQueueProof
td::BufferSlice queue_proofs;
td::BufferSlice block_state_proofs;
std::vector<std::int32_t> msg_counts;
int msg_count;

virtual OutMsgQueueProofBroadcast* make_copy() const {
OutMsgQueueProofBroadcast* make_copy() const override {
return new OutMsgQueueProofBroadcast(dst_shard, block_id, max_bytes, max_msgs, queue_proofs.clone(),
block_state_proofs.clone(), msg_counts);
block_state_proofs.clone(), msg_count);
}
};

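Not part of the commit, but for orientation: a minimal sketch of how the simplified struct is filled in, with one queue proof, one block state proof, and a single message count per neighbor shard. The helper name and the limit values are illustrative assumptions; the constructor signature and the td::Ref construction pattern follow the header above and the collator.cpp hunk further down.

// Sketch only; assumes the declarations from ton/ton-types.h are in scope.
// make_queue_broadcast is a hypothetical helper, not a function in the TON sources.
td::Ref<OutMsgQueueProofBroadcast> make_queue_broadcast(ShardIdFull dst_shard, BlockIdExt block_id,
                                                        td::BufferSlice queue_proof,
                                                        td::BufferSlice block_state_proof, int msg_count) {
  // After this change the broadcast carries exactly one proof pair and one message
  // count instead of the per-block vectors it held before.
  return td::Ref<OutMsgQueueProofBroadcast>(
      true, OutMsgQueueProofBroadcast(dst_shard, block_id, /* max_bytes = */ 1 << 20, /* max_msgs = */ 256,
                                      std::move(queue_proof), std::move(block_state_proof), msg_count));
}
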
3 changes: 1 addition & 2 deletions validator-session/validator-session-types.h
@@ -179,9 +179,8 @@ struct EndValidatorGroupStats {
};

struct BlockSourceInfo {
td::uint32 round, first_block_round;
PublicKey source;
td::int32 source_priority;
BlockCandidatePriority priority;
};

} // namespace validatorsession
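For reference, call sites now bundle the round numbers and the priority into BlockCandidatePriority instead of passing loose fields, as the validator-session.cpp hunks below show. A minimal sketch of the new shape; make_source_info_example is a hypothetical helper, and the parameter types are inferred from those call sites.

// Sketch only (not from the TON sources): real call sites take these values
// from the session description, e.g. description().get_source_public_key(...)
// and description().get_node_priority(...).
BlockSourceInfo make_source_info_example(PublicKey source, td::uint32 round, td::uint32 first_block_round,
                                         td::int32 node_priority) {
  return BlockSourceInfo{std::move(source), BlockCandidatePriority{round, first_block_round, node_priority}};
}
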
validator-session/validator-session.cpp (18 changes: 10 additions & 8 deletions)
@@ -553,9 +553,9 @@ void ValidatorSessionImpl::check_generate_slot() {
LOG(WARNING) << print_id << ": failed to generate block candidate: " << R.move_as_error();
}
});
callback_->on_generate_slot(
BlockSourceInfo{cur_round_, first_block_round_, description().get_source_public_key(local_idx()), priority},
std::move(P));
callback_->on_generate_slot(BlockSourceInfo{description().get_source_public_key(local_idx()),
BlockCandidatePriority{cur_round_, first_block_round_, priority}},
std::move(P));
} else {
alarm_timestamp().relax(t);
}
@@ -634,8 +634,9 @@ void ValidatorSessionImpl::try_approve_block(const SentBlock *block) {
pending_approve_.insert(block_id);

callback_->on_candidate(
BlockSourceInfo{cur_round_, first_block_round_, description().get_source_public_key(block->get_src_idx()),
description().get_node_priority(block->get_src_idx(), cur_round_)},
BlockSourceInfo{description().get_source_public_key(block->get_src_idx()),
BlockCandidatePriority{cur_round_, first_block_round_,
description().get_node_priority(block->get_src_idx(), cur_round_)}},
B->root_hash_, B->data_.clone(), B->collated_data_.clone(), std::move(P));
} else if (T.is_in_past()) {
if (!active_requests_.count(block_id)) {
@@ -909,9 +910,10 @@ void ValidatorSessionImpl::on_new_round(td::uint32 round) {
stats.rounds.pop_back();
}

BlockSourceInfo source_info{cur_round_, first_block_round_,
description().get_source_public_key(block->get_src_idx()),
description().get_node_priority(block->get_src_idx(), cur_round_)};
BlockSourceInfo source_info{
description().get_source_public_key(block->get_src_idx()),
BlockCandidatePriority{cur_round_, first_block_round_,
description().get_node_priority(block->get_src_idx(), cur_round_)}};
if (it == blocks_.end()) {
callback_->on_block_committed(std::move(source_info), block->get_root_hash(), block->get_file_hash(),
td::BufferSlice(), std::move(export_sigs), std::move(export_approve_sigs),
validator/full-node-fast-sync-overlays.cpp (13 changes: 9 additions & 4 deletions)
@@ -47,6 +47,9 @@ void FullNodeFastSyncOverlay::process_block_broadcast(PublicKeyHash src, ton_api
}

void FullNodeFastSyncOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_outMsgQueueProofBroadcast &query) {
if (src == local_id_.pubkey_hash()) {
return; // dropping broadcast from self
}
BlockIdExt block_id = create_block_id(query.block_);
ShardIdFull shard_id = create_shard_id(query.dst_shard_);
if (query.proof_->get_id() != ton_api::tonNode_outMsgQueueProof::ID) {
@@ -68,7 +71,8 @@
}
auto proof = std::move(R.move_as_ok()[0]);

LOG(INFO) << "got tonNode.outMsgQueueProofBroadcast " << shard_id.to_str() << " " << block_id.to_str();
LOG(INFO) << "got tonNode.outMsgQueueProofBroadcast to " << shard_id.to_str() << " from " << block_id.to_str()
<< ", msgs=" << proof->msg_count_ << ", size=" << tl_proof->queue_proofs_.size();
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::add_out_msg_queue_proof, shard_id,
std::move(proof));
}
@@ -236,9 +240,10 @@ void FullNodeFastSyncOverlay::send_out_msg_queue_proof_broadcast(td::Ref<OutMsgQ
create_tl_object<ton_api::tonNode_importedMsgQueueLimits>(broadcast->max_bytes, broadcast->max_msgs),
create_tl_object<ton_api::tonNode_outMsgQueueProof>(broadcast->queue_proofs.clone(),
broadcast->block_state_proofs.clone(),
std::vector<std::int32_t>(broadcast->msg_counts)));
VLOG(FULL_NODE_DEBUG) << "Sending outMsgQueueProof in fast sync overlay: " << broadcast->dst_shard.to_str() << " "
<< broadcast->block_id.to_str();
std::vector<td::int32>(1, broadcast->msg_count)));
VLOG(FULL_NODE_DEBUG) << "Sending outMsgQueueProof in fast sync overlay to " << broadcast->dst_shard.to_str()
<< " from " << broadcast->block_id.to_str() << ", msgs=" << broadcast->msg_count
<< " bytes=" << broadcast->queue_proofs.size();
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), std::move(B));
}
validator/full-node-shard.cpp (9 changes: 7 additions & 2 deletions)
@@ -703,8 +703,13 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod
promise.set_result(serialize_tl_object(R.move_as_ok(), true));
}
});
VLOG(FULL_NODE_DEBUG) << "Got query getOutMsgQueueProof (" << blocks.size() << " blocks) to shard "
<< dst_shard.to_str() << " from " << src;
FLOG(DEBUG) {
sb << "Got query getOutMsgQueueProof to shard " << dst_shard.to_str() << " from blocks";
for (const BlockIdExt &id : blocks) {
sb << " " << id.id.to_str();
}
sb << " from " << src;
};
td::actor::create_actor<BuildOutMsgQueueProof>("buildqueueproof", dst_shard, std::move(blocks), limits,
validator_manager_, std::move(P))
.release();
validator/impl/collator.cpp (18 changes: 12 additions & 6 deletions)
@@ -5854,33 +5854,39 @@ bool Collator::create_block_candidate() {
block_candidate =
std::make_unique<BlockCandidate>(created_by_, new_block_id_ext, block::compute_file_hash(cdata_slice.as_slice()),
blk_slice.clone(), cdata_slice.clone());
const bool need_out_msg_queue_broadcasts = true;
const bool need_out_msg_queue_broadcasts = !is_masterchain();
if (need_out_msg_queue_broadcasts) {
// we can't generate two proofs at the same time for the same root (it is not currently supported by cells)
// so we can't reuse the new state and have to regenerate it with a merkle update
auto new_state = vm::MerkleUpdate::apply(prev_state_root_pure_, state_update);
CHECK(new_state.not_null());
CHECK(new_state->get_hash() == state_root->get_hash());
assert(config_ && shard_conf_);
CHECK(shard_conf_);
auto neighbor_list = shard_conf_->get_neighbor_shard_hash_ids(shard_);
LOG(INFO) << "Build OutMsgQueueProofs for " << neighbor_list.size() << " neighbours";
for (ton::BlockId blk_id : neighbor_list) {
for (BlockId blk_id : neighbor_list) {
auto prefix = blk_id.shard_full();
if (shard_intersects(prefix, shard_)) {
continue;
}
auto limits = mc_state_->get_imported_msg_queue_limits(blk_id.workchain);

// one could use monitor_min_split_depth here to decrease the number of broadcasts,
// but the current OutMsgQueueImporter implementation doesn't support it

auto r_proof = OutMsgQueueProof::build(
prefix, {OutMsgQueueProof::OneBlock{.id = new_block_id_ext, .state_root = new_state, .block_root = new_block}}, limits);
prefix,
{OutMsgQueueProof::OneBlock{.id = new_block_id_ext, .state_root = new_state, .block_root = new_block}},
limits);
if (r_proof.is_ok()) {
auto proof = r_proof.move_as_ok();
CHECK(proof->msg_counts_.size() == 1);
block_candidate->out_msg_queue_proof_broadcasts.push_back(td::Ref<OutMsgQueueProofBroadcast>(
true, OutMsgQueueProofBroadcast(prefix, new_block_id_ext, limits.max_bytes, limits.max_msgs,
std::move(proof->queue_proofs_), std::move(proof->block_state_proofs_),
std::move(proof->msg_counts_))));
proof->msg_counts_[0])));
} else {
LOG(ERROR) << "Failed to build OutMsgQueueProof: " << r_proof.error();
LOG(ERROR) << "Failed to build OutMsgQueueProof to " << prefix.to_str() << ": " << r_proof.error();
}
}
}
validator/impl/out-msg-queue-proof.cpp (75 changes: 41 additions & 34 deletions)
@@ -92,29 +92,6 @@ static td::Result<std::vector<td::int32>> process_queue(
++msg_count[kv->source];
++msg_count_total;

// TODO: Get processed_upto from destination shard (in request?)
/*
// Parse message to check if it was processed (as in Collator::process_inbound_message)
ton::LogicalTime enqueued_lt = kv->msg->prefetch_ulong(64);
auto msg_env = kv->msg->prefetch_ref();
block::tlb::MsgEnvelope::Record_std env;
if (!tlb::unpack_cell(msg_env, env)) {
return td::Status::Error("cannot unpack MsgEnvelope of an internal message");
}
vm::CellSlice cs{vm::NoVmOrd{}, env.msg};
block::gen::CommonMsgInfo::Record_int_msg_info info;
if (!tlb::unpack(cs, info)) {
return td::Status::Error("cannot unpack CommonMsgInfo of an internal message");
}
auto src_prefix = block::tlb::MsgAddressInt::get_prefix(info.src);
auto dest_prefix = block::tlb::MsgAddressInt::get_prefix(info.dest);
auto cur_prefix = block::interpolate_addr(src_prefix, dest_prefix, env.cur_addr);
auto next_prefix = block::interpolate_addr(src_prefix, dest_prefix, env.next_addr);
block::EnqueuedMsgDescr descr{cur_prefix, next_prefix, kv->lt, enqueued_lt, env.msg->get_hash().bits()};
if (dst_processed_upto->already_processed(descr)) {
} else {
}*/

dfs_cs(*kv->msg);
TRY_STATUS_PREFIX(check_no_prunned(*kv->msg), "invalid message proof: ")
if (estimated_proof_size >= limits.max_bytes || msg_count_total >= (long long)limits.max_msgs) {
@@ -301,7 +278,12 @@ void OutMsgQueueImporter::get_neighbor_msg_queue_proofs(
return;
}

LOG(DEBUG) << "Importing neighbor msg queues for shard " << dst_shard.to_str() << ", " << blocks.size() << " blocks";
FLOG(DEBUG) {
sb << "Importing neighbor msg queues for shard " << dst_shard.to_str() << ", " << blocks.size() << " blocks:";
for (const BlockIdExt& block : blocks) {
sb << " " << block.id.to_str();
}
};

cache_[{dst_shard, blocks}] = entry = std::make_shared<CacheEntry>();
entry->dst_shard = dst_shard;
@@ -321,7 +303,7 @@
prefix = shard_prefix(prefix, min_split);
}

LOG(INFO) << "search for out msg queue proof " << prefix.to_str() << block.to_str();
LOG(DEBUG) << "search for out msg queue proof " << prefix.to_str() << " " << block.to_str();
auto& small_entry = small_cache_[std::make_pair(dst_shard, block)];
if (!small_entry.result.is_null()) {
entry->result[block] = small_entry.result;
@@ -397,7 +379,13 @@ void OutMsgQueueImporter::get_proof_import(std::shared_ptr<CacheEntry> entry, st
[=, SelfId = actor_id(this), retry_after = td::Timestamp::in(0.1),
dst_shard = entry->dst_shard](td::Result<std::vector<td::Ref<OutMsgQueueProof>>> R) {
if (R.is_error()) {
LOG(DEBUG) << "Failed to get out msg queue for " << dst_shard.to_str() << ": " << R.move_as_error();
FLOG(DEBUG) {
sb << "Failed to get out msg queue for " << dst_shard.to_str() << " from";
for (const BlockIdExt &block : blocks) {
sb << " " << block.id.to_str();
}
sb << ": " << R.move_as_error();
};
delay_action(
[=]() {
td::actor::send_closure(SelfId, &OutMsgQueueImporter::get_proof_import, entry, std::move(blocks),
@@ -443,8 +431,11 @@ void OutMsgQueueImporter::got_proof(std::shared_ptr<CacheEntry> entry, std::vect

void OutMsgQueueImporter::finish_query(std::shared_ptr<CacheEntry> entry) {
FLOG(INFO) {
sb << "Done importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", " << entry->blocks.size()
<< " blocks in " << entry->timer.elapsed() << "s";
sb << "Done importing neighbor msg queues for shard " << entry->dst_shard.to_str() << " from";
for (const BlockIdExt &block : entry->blocks) {
sb << " " << block.id.to_str();
}
sb << " in " << entry->timer.elapsed() << "s";
sb << " sources{";
if (entry->from_broadcast) {
sb << " broadcast=" << entry->from_broadcast;
@@ -479,8 +470,13 @@ void OutMsgQueueImporter::finish_query(std::shared_ptr<CacheEntry> entry) {

bool OutMsgQueueImporter::check_timeout(std::shared_ptr<CacheEntry> entry) {
if (entry->timeout.is_in_past()) {
LOG(DEBUG) << "Aborting importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", "
<< entry->blocks.size() << " blocks: timeout";
FLOG(DEBUG) {
sb << "Aborting importing neighbor msg queues for shard " << entry->dst_shard.to_str() << " from";
for (const BlockIdExt &block : entry->blocks) {
sb << " " << block.id.to_str();
}
sb << ": timeout";
};
for (auto& p : entry->promises) {
p.first.set_error(td::Status::Error(ErrorCode::timeout, "timeout"));
}
@@ -499,8 +495,13 @@ void OutMsgQueueImporter::alarm() {
auto& promises = it->second->promises;
if (it->second->timeout.is_in_past()) {
if (!it->second->done) {
LOG(DEBUG) << "Aborting importing neighbor msg queues for shard " << it->second->dst_shard.to_str() << ", "
<< it->second->blocks.size() << " blocks: timeout";
FLOG(DEBUG) {
sb << "Aborting importing neighbor msg queues for shard " << it->second->dst_shard.to_str() << " from";
for (const BlockIdExt &block : it->second->blocks) {
sb << " " << block.id.to_str();
}
sb << ": timeout";
};
for (auto& p : promises) {
p.first.set_error(td::Status::Error(ErrorCode::timeout, "timeout"));
}
@@ -540,7 +541,7 @@ void OutMsgQueueImporter::alarm() {
}

void OutMsgQueueImporter::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref<OutMsgQueueProof> proof) {
LOG(INFO) << "add out msg queue proof " << dst_shard.to_str() << proof->block_id_.to_str();
LOG(INFO) << "add out msg queue proof " << dst_shard.to_str() << " " << proof->block_id_.to_str();
auto& small_entry = small_cache_[std::make_pair(dst_shard, proof->block_id_)];
if (!small_entry.result.is_null()) {
return;
@@ -556,7 +557,13 @@ void OutMsgQueueImporter::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref

void BuildOutMsgQueueProof::abort_query(td::Status reason) {
if (promise_) {
LOG(DEBUG) << "failed to build msg queue proof to " << dst_shard_.to_str() << ": " << reason;
FLOG(DEBUG) {
sb << "failed to build msg queue proof to " << dst_shard_.to_str() << " from";
for (const auto& block : blocks_) {
sb << " " << block.id.id.to_str();
}
sb << ": " << reason;
};
promise_.set_error(
reason.move_as_error_prefix(PSTRING() << "failed to build msg queue proof to " << dst_shard_.to_str() << ": "));
}
validator/manager.cpp (16 changes: 12 additions & 4 deletions)
@@ -798,7 +798,9 @@ void ValidatorManagerImpl::wait_neighbor_msg_queue_proofs(
if (dst_shard.is_masterchain()) {
// We split queries for masterchain {dst_shard, {block_1, ..., block_n}} into separate queries
// {dst_shard, {block_1}}, ..., {dst_shard, {block_n}}
// Also, use cache
// Also, use cache.
// This is performed here and not in OutMsgQueueImporter because it's important to use
// cached_msg_queue_to_masterchain_, which is related to the current list of shard block descriptions
class Worker : public td::actor::Actor {
public:
Worker(size_t pending, td::Promise<std::map<BlockIdExt, td::Ref<OutMsgQueueProof>>> promise)
@@ -2958,12 +2960,15 @@ PublicKeyHash ValidatorManagerImpl::get_validator(ShardIdFull shard, td::Ref<Val
}

bool ValidatorManagerImpl::is_shard_collator(ShardIdFull shard) {
if (shard.is_masterchain()) {
return validating_masterchain();
}
for (auto &[_, collator_node] : collator_nodes_) {
if (collator_node.can_collate_shard(shard)) {
return true;
}
}
return false;
return is_validator() && opts_->get_collators_list()->self_collate;
}

bool ValidatorManagerImpl::Collator::can_collate_shard(ShardIdFull shard) const {
@@ -3524,7 +3529,7 @@ void ValidatorManagerImpl::del_collator(adnl::AdnlNodeIdShort id, ShardIdFull sh
} else {
td::actor::send_closure(it->second.actor, &CollatorNode::del_shard, shard);
}
};
}

void ValidatorManagerImpl::get_collation_manager_stats(
td::Promise<tl_object_ptr<ton_api::engine_validator_collationManagerStats>> promise) {
@@ -3575,15 +3580,18 @@ void ValidatorManagerImpl::get_collation_manager_stats(
}

void ValidatorManagerImpl::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref<OutMsgQueueProof> proof) {
if (!collator_nodes_.empty()) {
if (is_shard_collator(dst_shard)) {
if (out_msg_queue_importer_.empty()) {
out_msg_queue_importer_ = td::actor::create_actor<OutMsgQueueImporter>("outmsgqueueimporter", actor_id(this),
opts_, last_masterchain_state_);
}
td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::add_out_msg_queue_proof, dst_shard,
std::move(proof));
} else {
VLOG(VALIDATOR_DEBUG) << "Dropping unneeded out msg queue proof to shard " << dst_shard.to_str();
}
}

void ValidatorManagerImpl::add_persistent_state_description(td::Ref<PersistentStateDescription> desc) {
auto now = (UnixTime)td::Clocks::system();
if (desc->end_time <= now) {
