Skip to content

Commit

Permalink
Improve CollatorNode
Browse files Browse the repository at this point in the history
* Keep track of validator groups
* Pre-generate shard blocks
  • Loading branch information
SpyCheese committed Jun 25, 2024
1 parent 3695bf0 commit 90d2edf
Show file tree
Hide file tree
Showing 12 changed files with 402 additions and 166 deletions.
2 changes: 1 addition & 1 deletion tl/generate/scheme/ton_api.tl
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ validatorSession.newValidatorGroupStats session_id:int256 workchain:int shard:lo
self_idx:int nodes:(vector validatorSession.newValidatorGroupStats.node) = validatorSession.NewValidatorGroupStats;

---functions---
collatorNode.generateBlock workchain:int shard:long min_mc_id:tonNode.blockIdExt prev_blocks:(vector tonNode.blockIdExt)
collatorNode.generateBlock shard:tonNode.shardId cc_seqno:int prev_blocks:(vector tonNode.blockIdExt)
creator:int256 = collatorNode.GenerateBlockResult;

---types---
Expand Down
Binary file modified tl/generate/scheme/ton_api.tlo
Binary file not shown.
4 changes: 2 additions & 2 deletions validator-engine/validator-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,8 @@ td::Result<bool> Config::config_add_control_process(ton::PublicKeyHash key, td::
}

td::Result<bool> Config::config_add_shard(ton::ShardIdFull shard) {
if (!shard.is_valid_ext()) {
return td::Status::Error(PSTRING() << "invalid shard " << shard.to_str());
if (!shard.is_valid_ext() || shard.is_masterchain()) {
return td::Status::Error(PSTRING() << "invalid shard to collate " << shard.to_str());
}
if (std::find(shards_to_monitor.begin(), shards_to_monitor.end(), shard) != shards_to_monitor.end()) {
return false;
Expand Down
362 changes: 257 additions & 105 deletions validator/collator-node.cpp

Large diffs are not rendered by default.

50 changes: 27 additions & 23 deletions validator/collator-node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,46 +34,50 @@ class CollatorNode : public td::actor::Actor {
void del_shard(ShardIdFull shard);

void new_masterchain_block_notification(td::Ref<MasterchainState> state);
void update_validator_group_info(ShardIdFull shard, std::vector<BlockIdExt> prev, CatchainSeqno cc_seqno);

private:
void receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice data, td::Promise<td::BufferSlice> promise);
void receive_query_cont(ShardIdFull shard, td::Ref<MasterchainState> min_mc_state,
std::vector<BlockIdExt> prev_blocks, Ed25519_PublicKey creator,
td::Promise<BlockCandidate> promise);

bool can_collate_shard(ShardIdFull shard) const;

adnl::AdnlNodeIdShort local_id_;
td::actor::ActorId<ValidatorManager> manager_;
td::actor::ActorId<adnl::Adnl> adnl_;
td::actor::ActorId<rldp::Rldp> rldp_;
std::vector<ShardIdFull> shards_;
std::set<adnl::AdnlNodeIdShort> validators_;

BlockIdExt last_masterchain_block_{};
std::map<ShardIdFull, BlockIdExt> last_top_blocks_;
std::vector<ShardIdFull> collating_shards_;
std::set<adnl::AdnlNodeIdShort> validator_adnl_ids_;

// State kept for one block-generation request; instances are referenced through
// std::shared_ptr<CacheEntry> by the cache maps of this class.
struct CacheEntry {
// NOTE(review): presumably set once collation for this entry has been launched — confirm in collator-node.cpp.
bool started = false;
// NOTE(review): presumably the seqno of the block being generated — confirm against the code that fills it.
BlockSeqno block_seqno = 0;
// Generated candidate, if any has been stored (the optional may be empty).
td::optional<BlockCandidate> result;
// Source of cancellation tokens for this entry.
// NOTE(review): presumably handed to the collator so that cancel() can abort it — confirm.
td::CancellationTokenSource cancellation_token_source;
// Promises of callers waiting for a BlockCandidate from this entry.
std::vector<td::Promise<BlockCandidate>> promises;

// Defined in the .cpp file (not visible here).
// NOTE(review): presumably fails pending promises with `reason` and triggers the cancellation token — confirm.
void cancel(td::Status reason);
};
// Per-shard record stored in validator_groups_ (keyed by ShardIdFull).
struct ValidatorGroupInfo {
// Catchain seqno associated with this group; zero-initialized by default.
CatchainSeqno cc_seqno{0};
// NOTE(review): presumably the latest known previous block(s) of the shard — confirm in collator-node.cpp.
std::vector<BlockIdExt> prev;
// NOTE(review): presumably the seqno of the next block to be generated for this shard — confirm.
BlockSeqno next_block_seqno{0};
// Cache entries keyed by a list of previous block ids.
std::map<std::vector<BlockIdExt>, std::shared_ptr<CacheEntry>> cache;

// Defined in the .cpp file (not visible here).
// NOTE(review): presumably drops or cancels outdated cache entries — confirm.
void cleanup();
};
std::map<std::tuple<BlockSeqno, ShardIdFull, std::vector<BlockIdExt>>, std::shared_ptr<CacheEntry>> cache_;

// Looks up a top block in last_top_blocks_ whose shard intersects `shard`.
// Returns the stored BlockIdExt of the first matching entry, or an empty optional if neither
// candidate entry intersects `shard`.
td::optional<BlockIdExt> get_shard_top_block(ShardIdFull shard) const {
// First candidate: the first entry whose key is not less than `shard`.
auto it = last_top_blocks_.lower_bound(shard);
if (it != last_top_blocks_.end() && shard_intersects(it->first, shard)) {
return it->second;
}
// Second candidate: the entry immediately preceding that position, if there is one.
if (it != last_top_blocks_.begin()) {
--it;
if (shard_intersects(it->first, shard)) {
return it->second;
}
}
// No intersecting entry found among the two candidates.
return {};
}
// Record stored in future_validator_groups_, keyed by (ShardIdFull, CatchainSeqno).
// NOTE(review): presumably describes a validator group that is not active yet — confirm in collator-node.cpp.
struct FutureValidatorGroup {
// Lists of block ids queued for this group.
// NOTE(review): presumably `prev` block lists received before the group became known — confirm.
std::vector<std::vector<BlockIdExt>> pending_blocks;
// Promises to be completed with td::Unit.
// NOTE(review): presumably fulfilled once the group becomes active — confirm.
std::vector<td::Promise<td::Unit>> promises;
};
std::map<ShardIdFull, ValidatorGroupInfo> validator_groups_;
std::map<std::pair<ShardIdFull, CatchainSeqno>, FutureValidatorGroup> future_validator_groups_;

td::Ref<MasterchainState> last_masterchain_state_;

td::Result<FutureValidatorGroup*> get_future_validator_group(ShardIdFull shard, CatchainSeqno cc_seqno);

void generate_block(ShardIdFull shard, CatchainSeqno cc_seqno, std::vector<BlockIdExt> prev_blocks,
td::Timestamp timeout, td::Promise<BlockCandidate> promise);
void process_result(std::shared_ptr<CacheEntry> cache_entry, td::Result<BlockCandidate> R);

public:
Expand Down
3 changes: 2 additions & 1 deletion validator/fabric.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ void run_validate_query(ShardIdFull shard, BlockIdExt min_masterchain_block_id,
void run_collate_query(ShardIdFull shard, const BlockIdExt& min_masterchain_block_id, std::vector<BlockIdExt> prev,
Ed25519_PublicKey creator, td::Ref<ValidatorSet> validator_set,
td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout,
td::Promise<BlockCandidate> promise, unsigned mode = 0);
td::Promise<BlockCandidate> promise, td::CancellationToken cancellation_token = {},
unsigned mode = 0);
void run_collate_hardfork(ShardIdFull shard, const BlockIdExt& min_masterchain_block_id, std::vector<BlockIdExt> prev,
td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout,
td::Promise<BlockCandidate> promise);
Expand Down
9 changes: 6 additions & 3 deletions validator/impl/collator-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ class Collator final : public td::actor::Actor {
static constexpr bool shard_splitting_enabled = true;

public:
Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_masterchain_block_id,
std::vector<BlockIdExt> prev, Ref<ValidatorSet> validator_set, Ed25519_PublicKey collator_id,
td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout, td::Promise<BlockCandidate> promise,
Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_masterchain_block_id, std::vector<BlockIdExt> prev,
Ref<ValidatorSet> validator_set, Ed25519_PublicKey collator_id, td::actor::ActorId<ValidatorManager> manager,
td::Timestamp timeout, td::Promise<BlockCandidate> promise, td::CancellationToken cancellation_token,
unsigned mode);
~Collator() override = default;
bool is_busy() const {
Expand Down Expand Up @@ -343,6 +343,9 @@ class Collator final : public td::actor::Actor {
void return_block_candidate(td::Result<td::Unit> saved);
bool update_last_proc_int_msg(const std::pair<ton::LogicalTime, ton::Bits256>& new_lt_hash);

td::CancellationToken cancellation_token_;
bool check_cancelled();

public:
static td::uint32 get_skip_externals_queue_size();
};
Expand Down
37 changes: 33 additions & 4 deletions validator/impl/collator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ static inline bool dbg(int c) {
Collator::Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_masterchain_block_id,
std::vector<BlockIdExt> prev, td::Ref<ValidatorSet> validator_set, Ed25519_PublicKey collator_id,
td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout,
td::Promise<BlockCandidate> promise, unsigned mode)
td::Promise<BlockCandidate> promise, td::CancellationToken cancellation_token, unsigned mode)
: shard_(shard)
, is_hardfork_(is_hardfork)
, min_mc_block_id{min_masterchain_block_id}
Expand All @@ -92,9 +92,11 @@ Collator::Collator(ShardIdFull shard, bool is_hardfork, BlockIdExt min_mastercha
, medium_timeout_(td::Timestamp::at(timeout.at() - 1.5))
, main_promise(std::move(promise))
, mode_(mode)
, perf_timer_("collate", 0.1, [manager](double duration) {
send_closure(manager, &ValidatorManager::add_perf_timer_stat, "collate", duration);
}) {
, perf_timer_("collate", 0.1,
[manager](double duration) {
send_closure(manager, &ValidatorManager::add_perf_timer_stat, "collate", duration);
})
, cancellation_token_(std::move(cancellation_token)) {
}

/**
Expand Down Expand Up @@ -382,6 +384,9 @@ bool Collator::fatal_error(std::string err_msg, int err_code) {
*/
void Collator::check_pending() {
// LOG(DEBUG) << "pending = " << pending;
if (!check_cancelled()) {
return;
}
if (!pending) {
step = 2;
try {
Expand Down Expand Up @@ -2423,6 +2428,9 @@ bool Collator::out_msg_queue_cleanup() {
LOG(WARNING) << "cleaning up outbound queue takes too long, ending";
break;
}
if (!check_cancelled()) {
return false;
}
if (i == queue_parts.size()) {
i = 0;
}
Expand Down Expand Up @@ -3516,6 +3524,9 @@ bool Collator::process_inbound_internal_messages() {
LOG(WARNING) << "soft timeout reached, stop processing inbound internal messages";
break;
}
if (!check_cancelled()) {
return false;
}
LOG(DEBUG) << "processing inbound message with (lt,hash)=(" << kv->lt << "," << kv->key.to_hex()
<< ") from neighbor #" << kv->source;
if (verbosity > 2) {
Expand Down Expand Up @@ -3565,6 +3576,9 @@ bool Collator::process_inbound_external_messages() {
LOG(WARNING) << "medium timeout reached, stop processing inbound external messages";
break;
}
if (!check_cancelled()) {
return false;
}
auto ext_msg = ext_msg_struct.cell;
ton::Bits256 hash{ext_msg->get_hash().bits()};
int r = process_external_message(std::move(ext_msg));
Expand Down Expand Up @@ -3819,6 +3833,9 @@ bool Collator::process_new_messages(bool enqueue_only) {
LOG(INFO) << "BLOCK FULL, enqueue all remaining new messages";
enqueue_only = true;
}
if (!check_cancelled()) {
return false;
}
LOG(DEBUG) << "have message with lt=" << msg.lt;
int res = process_one_new_message(std::move(msg), enqueue_only);
if (res < 0) {
Expand Down Expand Up @@ -5360,6 +5377,18 @@ void Collator::after_get_external_messages(td::Result<std::vector<std::pair<Ref<
check_pending();
}

/**
 * Stops the collation if its cancellation token has been triggered.
 *
 * When the token reports cancellation, a fatal error with code
 * ErrorCode::cancelled is raised through fatal_error().
 *
 * @returns true if the collation may continue, false if it was cancelled.
 */
bool Collator::check_cancelled() {
  // The token converts to true once cancellation has been requested.
  if (!cancellation_token_) {
    return true;
  }
  return fatal_error(td::Status::Error(ErrorCode::cancelled, "cancelled"));
}

/**
 * Exposes the SKIP_EXTERNALS_QUEUE_SIZE constant to callers outside the collator.
 *
 * @returns The value of SKIP_EXTERNALS_QUEUE_SIZE.
 */
td::uint32 Collator::get_skip_externals_queue_size() {
return SKIP_EXTERNALS_QUEUE_SIZE;
}
Expand Down
7 changes: 4 additions & 3 deletions validator/impl/fabric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ void run_validate_query(ShardIdFull shard, BlockIdExt min_masterchain_block_id,
void run_collate_query(ShardIdFull shard, const BlockIdExt& min_masterchain_block_id, std::vector<BlockIdExt> prev,
Ed25519_PublicKey creator, td::Ref<ValidatorSet> validator_set,
td::actor::ActorId<ValidatorManager> manager, td::Timestamp timeout,
td::Promise<BlockCandidate> promise, unsigned mode) {
td::Promise<BlockCandidate> promise, td::CancellationToken cancellation_token, unsigned mode) {
BlockSeqno seqno = 0;
for (auto& p : prev) {
if (p.seqno() > seqno) {
Expand All @@ -224,7 +224,8 @@ void run_collate_query(ShardIdFull shard, const BlockIdExt& min_masterchain_bloc
}
td::actor::create_actor<Collator>(PSTRING() << "collate" << shard.to_str() << ":" << (seqno + 1), shard, false,
min_masterchain_block_id, std::move(prev), std::move(validator_set), creator,
std::move(manager), timeout, std::move(promise), mode)
std::move(manager), timeout, std::move(promise), std::move(cancellation_token),
mode)
.release();
}

Expand All @@ -240,7 +241,7 @@ void run_collate_hardfork(ShardIdFull shard, const BlockIdExt& min_masterchain_b
td::actor::create_actor<Collator>(PSTRING() << "collate" << shard.to_str() << ":" << (seqno + 1), shard, true,
min_masterchain_block_id, std::move(prev), td::Ref<ValidatorSet>{},
Ed25519_PublicKey{Bits256::zero()}, std::move(manager), timeout, std::move(promise),
0)
td::CancellationToken{}, 0)
.release();
}

Expand Down
88 changes: 65 additions & 23 deletions validator/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,28 @@ void ValidatorManagerImpl::prevalidate_block(BlockBroadcast broadcast, td::Promi
promise.set_error(td::Status::Error("not monitoring shard"));
return;
}
promise = [SelfId = actor_id(this), promise = std::move(promise), block_id = broadcast.block_id,
cc_seqno = broadcast.catchain_seqno](td::Result<td::Unit> R) mutable {
if (R.is_ok()) {
td::actor::send_closure(SelfId, &ValidatorManagerImpl::validated_block_broadcast, block_id, cc_seqno);
}
promise.set_result(std::move(R));
};
td::actor::create_actor<ValidateBroadcast>("broadcast", std::move(broadcast), last_masterchain_block_handle_,
last_masterchain_state_, last_known_key_block_handle_, actor_id(this),
td::Timestamp::in(2.0), std::move(promise))
.release();
}

// Called after a block broadcast has been validated: forwards the block id and catchain seqno
// to every collator node that can collate the block's shard.
void ValidatorManagerImpl::validated_block_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno) {
  const auto shard = block_id.shard_full();
  for (auto &[_, node] : collator_nodes_) {
    if (!node.can_collate_shard(shard)) {
      continue;
    }
    td::actor::send_closure(node.actor, &CollatorNode::update_validator_group_info, shard,
                            std::vector{block_id}, cc_seqno);
  }
}

void ValidatorManagerImpl::sync_complete(td::Promise<td::Unit> promise) {
started_ = true;

Expand Down Expand Up @@ -471,7 +487,7 @@ void ValidatorManagerImpl::new_ihr_message(td::BufferSlice data) {
}

void ValidatorManagerImpl::new_shard_block(BlockIdExt block_id, CatchainSeqno cc_seqno, td::BufferSlice data) {
if (!is_validator()) {
if (!is_validator() && !is_shard_collator(block_id.shard_full())) {
return;
}
if (!last_masterchain_block_handle_) {
Expand Down Expand Up @@ -513,29 +529,36 @@ void ValidatorManagerImpl::new_block_candidate(BlockIdExt block_id, td::BufferSl
}

void ValidatorManagerImpl::add_shard_block_description(td::Ref<ShardTopBlockDescription> desc) {
if (desc->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) {
auto it = shard_blocks_.find(ShardTopBlockDescriptionId{desc->shard(), desc->catchain_seqno()});
if (it != shard_blocks_.end() && desc->block_id().id.seqno <= it->second.latest_desc->block_id().id.seqno) {
VLOG(VALIDATOR_DEBUG) << "dropping duplicate shard block broadcast";
return;
}
shard_blocks_[ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()}].latest_desc = desc;
VLOG(VALIDATOR_DEBUG) << "new shard block descr for " << desc->block_id();
if (need_monitor(desc->block_id().shard_full())) {
auto P = td::PromiseCreator::lambda([](td::Result<td::Ref<ShardState>> R) {
if (R.is_error()) {
auto S = R.move_as_error();
if (S.code() != ErrorCode::timeout && S.code() != ErrorCode::notready) {
VLOG(VALIDATOR_NOTICE) << "failed to get shard state: " << S;
} else {
VLOG(VALIDATOR_DEBUG) << "failed to get shard state: " << S;
}
if (!desc->may_be_valid(last_masterchain_block_handle_, last_masterchain_state_)) {
return;
}
auto it = shard_blocks_.find(ShardTopBlockDescriptionId{desc->shard(), desc->catchain_seqno()});
if (it != shard_blocks_.end() && desc->block_id().id.seqno <= it->second.latest_desc->block_id().id.seqno) {
VLOG(VALIDATOR_DEBUG) << "dropping duplicate shard block broadcast";
return;
}
shard_blocks_[ShardTopBlockDescriptionId{desc->block_id().shard_full(), desc->catchain_seqno()}].latest_desc = desc;
VLOG(VALIDATOR_DEBUG) << "new shard block descr for " << desc->block_id();
if (need_monitor(desc->block_id().shard_full())) {
auto P = td::PromiseCreator::lambda([](td::Result<td::Ref<ShardState>> R) {
if (R.is_error()) {
auto S = R.move_as_error();
if (S.code() != ErrorCode::timeout && S.code() != ErrorCode::notready) {
VLOG(VALIDATOR_NOTICE) << "failed to get shard state: " << S;
} else {
VLOG(VALIDATOR_DEBUG) << "failed to get shard state: " << S;
}
});
wait_block_state_short(desc->block_id(), 0, td::Timestamp::in(60.0), std::move(P));
}
if (validating_masterchain()) {
preload_msg_queue_to_masterchain(desc);
}
});
wait_block_state_short(desc->block_id(), 0, td::Timestamp::in(60.0), std::move(P));
}
if (validating_masterchain()) {
preload_msg_queue_to_masterchain(desc);
}
for (auto& [_, collator_node] : collator_nodes_) {
if (collator_node.can_collate_shard(desc->shard())) {
td::actor::send_closure(collator_node.actor, &CollatorNode::update_validator_group_info, desc->shard(),
std::vector{desc->block_id()}, desc->catchain_seqno());
}
}
}
Expand Down Expand Up @@ -2863,6 +2886,21 @@ PublicKeyHash ValidatorManagerImpl::get_validator(ShardIdFull shard, td::Ref<Val
return PublicKeyHash::zero();
}

// Returns true if at least one registered collator node can collate the given shard.
bool ValidatorManagerImpl::is_shard_collator(ShardIdFull shard) {
  return std::any_of(collator_nodes_.begin(), collator_nodes_.end(),
                     [&shard](const auto &entry) { return entry.second.can_collate_shard(shard); });
}

// Returns true if `shard` intersects any of the shards assigned to this collator.
bool ValidatorManagerImpl::Collator::can_collate_shard(ShardIdFull shard) const {
  for (const ShardIdFull &our_shard : shards) {
    if (shard_intersects(shard, our_shard)) {
      return true;
    }
  }
  return false;
}


void ValidatorManagerImpl::got_next_key_blocks(std::vector<BlockIdExt> r) {
if (r.size() == 0) {
delay_action([SelfId = actor_id(
Expand Down Expand Up @@ -3364,6 +3402,10 @@ void ValidatorManagerImpl::get_validator_sessions_info(
}

void ValidatorManagerImpl::add_collator(adnl::AdnlNodeIdShort id, ShardIdFull shard) {
if (shard.is_masterchain() || !shard.is_valid_ext()) {
LOG(WARNING) << "cannot collate shard " << shard.to_str();
return;
}
auto it = collator_nodes_.find(id);
if (it == collator_nodes_.end()) {
it = collator_nodes_.emplace(id, Collator()).first;
Expand Down
Loading

0 comments on commit 90d2edf

Please sign in to comment.