Skip to content

Commit

Permalink
Block broadcasts in custom overlays (#986)
Browse files Browse the repository at this point in the history
Co-authored-by: SpyCheese <mikle98@yandex.ru>
  • Loading branch information
EmelyanenkoK and SpyCheese authored May 10, 2024
1 parent 6fb2019 commit d5c0993
Show file tree
Hide file tree
Showing 21 changed files with 233 additions and 133 deletions.
2 changes: 1 addition & 1 deletion create-hardfork/create-hardfork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ class HardforkCreator : public td::actor::Actor {
}
void send_shard_block_info(ton::BlockIdExt block_id, ton::CatchainSeqno cc_seqno, td::BufferSlice data) override {
}
void send_broadcast(ton::BlockBroadcast broadcast) override {
void send_broadcast(ton::BlockBroadcast broadcast, bool custom_overlays_only) override {
}
void download_block(ton::BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
td::Promise<ton::ReceivedBlock> promise) override {
Expand Down
2 changes: 1 addition & 1 deletion test/test-ton-collator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ class TestNode : public td::actor::Actor {
}
}
}
void send_broadcast(ton::BlockBroadcast broadcast) override {
void send_broadcast(ton::BlockBroadcast broadcast, bool custom_overlays_only) override {
}
void download_block(ton::BlockIdExt block_id, td::uint32 priority, td::Timestamp timeout,
td::Promise<ton::ReceivedBlock> promise) override {
Expand Down
2 changes: 1 addition & 1 deletion tl/generate/scheme/ton_api.tl
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ engine.validator.config out_port:int addrs:(vector engine.Addr) adnl:(vector eng
liteservers:(vector engine.liteServer) control:(vector engine.controlInterface)
gc:engine.gc = engine.validator.Config;

engine.validator.customOverlayNode adnl_id:int256 msg_sender:Bool msg_sender_priority:int = engine.validator.CustomOverlayNode;
engine.validator.customOverlayNode adnl_id:int256 msg_sender:Bool msg_sender_priority:int block_sender:Bool = engine.validator.CustomOverlayNode;
engine.validator.customOverlay name:string nodes:(vector engine.validator.customOverlayNode) = engine.validator.CustomOverlay;
engine.validator.customOverlaysConfig overlays:(vector engine.validator.customOverlay) = engine.validator.CustomOverlaysConfig;

Expand Down
Binary file modified tl/generate/scheme/ton_api.tlo
Binary file not shown.
7 changes: 4 additions & 3 deletions validator-engine-console/validator-engine-console-query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1171,9 +1171,10 @@ td::Status ShowCustomOverlaysQuery::receive(td::BufferSlice data) {
td::TerminalIO::out() << "Overlay \"" << overlay->name_ << "\": " << overlay->nodes_.size() << " nodes\n";
for (const auto &node : overlay->nodes_) {
td::TerminalIO::out() << " " << node->adnl_id_
<< (node->msg_sender_ ? (PSTRING() << " (sender, p=" << node->msg_sender_priority_ << ")")
: "")
<< "\n";
<< (node->msg_sender_
? (PSTRING() << " (msg sender, p=" << node->msg_sender_priority_ << ")")
: "")
<< (node->block_sender_ ? " (block sender)" : "") << "\n";
}
td::TerminalIO::out() << "\n";
}
Expand Down
22 changes: 7 additions & 15 deletions validator-engine/validator-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2357,16 +2357,9 @@ void ValidatorEngine::load_custom_overlays_config() {
}

for (auto &overlay : custom_overlays_config_->overlays_) {
std::vector<ton::adnl::AdnlNodeIdShort> nodes;
std::map<ton::adnl::AdnlNodeIdShort, int> senders;
for (const auto &node : overlay->nodes_) {
nodes.emplace_back(node->adnl_id_);
if (node->msg_sender_) {
senders[ton::adnl::AdnlNodeIdShort{node->adnl_id_}] = node->msg_sender_priority_;
}
}
td::actor::send_closure(full_node_, &ton::validator::fullnode::FullNode::add_ext_msg_overlay, std::move(nodes),
std::move(senders), overlay->name_, [](td::Result<td::Unit> R) { R.ensure(); });
td::actor::send_closure(full_node_, &ton::validator::fullnode::FullNode::add_custom_overlay,
ton::validator::fullnode::CustomOverlayParams::fetch(*overlay),
[](td::Result<td::Unit> R) { R.ensure(); });
}
}

Expand Down Expand Up @@ -3571,11 +3564,10 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_addCustom
senders[ton::adnl::AdnlNodeIdShort{node->adnl_id_}] = node->msg_sender_priority_;
}
}
std::string name = overlay->name_;
auto params = ton::validator::fullnode::CustomOverlayParams::fetch(*query.overlay_);
td::actor::send_closure(
full_node_, &ton::validator::fullnode::FullNode::add_ext_msg_overlay, std::move(nodes), std::move(senders),
std::move(name),
[SelfId = actor_id(this), overlay = std::move(overlay),
full_node_, &ton::validator::fullnode::FullNode::add_custom_overlay, std::move(params),
[SelfId = actor_id(this), overlay = std::move(query.overlay_),
promise = std::move(promise)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
promise.set_value(create_control_query_error(R.move_as_error()));
Expand Down Expand Up @@ -3605,7 +3597,7 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_delCustom
return;
}
td::actor::send_closure(
full_node_, &ton::validator::fullnode::FullNode::del_ext_msg_overlay, query.name_,
full_node_, &ton::validator::fullnode::FullNode::del_custom_overlay, query.name_,
[SelfId = actor_id(this), name = query.name_, promise = std::move(promise)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
promise.set_value(create_control_query_error(R.move_as_error()));
Expand Down
77 changes: 59 additions & 18 deletions validator/full-node-private-overlay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,7 @@ void FullNodePrivateBlockOverlay::process_block_broadcast(PublicKeyHash src, ton
}
VLOG(FULL_NODE_DEBUG) << "Received block broadcast in private overlay from " << src << ": "
<< B.ok().block_id.to_str();
auto P = td::PromiseCreator::lambda([](td::Result<td::Unit> R) {
if (R.is_error()) {
if (R.error().code() == ErrorCode::notready) {
LOG(DEBUG) << "dropped broadcast: " << R.move_as_error();
} else {
LOG(INFO) << "dropped broadcast: " << R.move_as_error();
}
}
});
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::prevalidate_block, B.move_as_ok(),
std::move(P));
td::actor::send_closure(full_node_, &FullNode::process_block_broadcast, B.move_as_ok());
}

void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query) {
Expand All @@ -60,6 +50,9 @@ void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, ton_api::
}

void FullNodePrivateBlockOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
if (adnl::AdnlNodeIdShort{src} == local_id_) {
return;
}
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
if (B.is_error()) {
return;
Expand Down Expand Up @@ -169,18 +162,47 @@ void FullNodePrivateBlockOverlay::tear_down() {
}
}

void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) {
process_block_broadcast(src, query);
}

void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed &query) {
process_block_broadcast(src, query);
}

void FullNodeCustomOverlay::process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) {
if (!block_senders_.count(adnl::AdnlNodeIdShort(src))) {
VLOG(FULL_NODE_DEBUG) << "Dropping block broadcast in private overlay \"" << name_ << "\" from unauthorized sender "
<< src;
return;
}
auto B = deserialize_block_broadcast(query, overlay::Overlays::max_fec_broadcast_size());
if (B.is_error()) {
LOG(DEBUG) << "dropped broadcast: " << B.move_as_error();
return;
}
VLOG(FULL_NODE_DEBUG) << "Received block broadcast in custom overlay \"" << name_ << "\" from " << src << ": "
<< B.ok().block_id.to_str();
td::actor::send_closure(full_node_, &FullNode::process_block_broadcast, B.move_as_ok());
}

void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query) {
auto it = senders_.find(adnl::AdnlNodeIdShort{src});
if (it == senders_.end()) {
auto it = msg_senders_.find(adnl::AdnlNodeIdShort{src});
if (it == msg_senders_.end()) {
VLOG(FULL_NODE_DEBUG) << "Dropping external message broadcast in custom overlay \"" << name_
<< "\" from unauthorized sender " << src;
return;
}
LOG(FULL_NODE_DEBUG) << "Got external message in private overlay \"" << name_ << "\" from " << src
<< " (priority=" << it->second << ")";
VLOG(FULL_NODE_DEBUG) << "Got external message in custom overlay \"" << name_ << "\" from " << src
<< " (priority=" << it->second << ")";
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::new_external_message,
std::move(query.message_->data_), it->second);
}

void FullNodeCustomOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
if (adnl::AdnlNodeIdShort{src} == local_id_) {
return;
}
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
if (B.is_error()) {
return;
Expand All @@ -192,7 +214,7 @@ void FullNodeCustomOverlay::send_external_message(td::BufferSlice data) {
if (!inited_ || config_.ext_messages_broadcast_disabled_) {
return;
}
LOG(FULL_NODE_DEBUG) << "Sending external message to private overlay \"" << name_ << "\"";
VLOG(FULL_NODE_DEBUG) << "Sending external message to custom overlay \"" << name_ << "\"";
auto B = create_serialize_tl_object<ton_api::tonNode_externalMessageBroadcast>(
create_tl_object<ton_api::tonNode_externalMessage>(std::move(data)));
if (B.size() <= overlay::Overlays::max_simple_broadcast_size()) {
Expand All @@ -204,6 +226,21 @@ void FullNodeCustomOverlay::send_external_message(td::BufferSlice data) {
}
}

void FullNodeCustomOverlay::send_broadcast(BlockBroadcast broadcast) {
if (!inited_) {
return;
}
VLOG(FULL_NODE_DEBUG) << "Sending block broadcast to custom overlay \"" << name_
<< "\": " << broadcast.block_id.to_str();
auto B = serialize_block_broadcast(broadcast, true); // compression_enabled = true
if (B.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to serialize block broadcast: " << B.move_as_error();
return;
}
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}

void FullNodeCustomOverlay::start_up() {
std::sort(nodes_.begin(), nodes_.end());
nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end());
Expand Down Expand Up @@ -234,7 +271,8 @@ void FullNodeCustomOverlay::try_init() {

void FullNodeCustomOverlay::init() {
LOG(FULL_NODE_WARNING) << "Creating custom overlay \"" << name_ << "\" for adnl id " << local_id_ << " : "
<< nodes_.size() << " nodes, overlay_id=" << overlay_id_;
<< nodes_.size() << " nodes, " << msg_senders_.size() << " msg senders, "
<< block_senders_.size() << " block senders, overlay_id=" << overlay_id_;
class Callback : public overlay::Overlays::Callback {
public:
void receive_message(adnl::AdnlNodeIdShort src, overlay::OverlayIdShort overlay_id, td::BufferSlice data) override {
Expand All @@ -256,9 +294,12 @@ void FullNodeCustomOverlay::init() {
};

std::map<PublicKeyHash, td::uint32> authorized_keys;
for (const auto &sender : senders_) {
for (const auto &sender : msg_senders_) {
authorized_keys[sender.first.pubkey_hash()] = overlay::Overlays::max_fec_broadcast_size();
}
for (const auto &sender : block_senders_) {
authorized_keys[sender.pubkey_hash()] = overlay::Overlays::max_fec_broadcast_size();
}
overlay::OverlayPrivacyRules rules{overlay::Overlays::max_fec_broadcast_size(), 0, std::move(authorized_keys)};
td::actor::send_closure(
overlays_, &overlay::Overlays::create_private_overlay, local_id_, overlay_id_full_.clone(), nodes_,
Expand Down
34 changes: 23 additions & 11 deletions validator/full-node-private-overlay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {
td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager)
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<FullNode> full_node)
: local_id_(local_id)
, nodes_(std::move(nodes))
, zero_state_file_hash_(zero_state_file_hash)
Expand All @@ -63,7 +64,8 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {
, rldp_(rldp)
, rldp2_(rldp2)
, overlays_(overlays)
, validator_manager_(validator_manager) {
, validator_manager_(validator_manager)
, full_node_(full_node) {
}

private:
Expand All @@ -79,6 +81,7 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {
td::actor::ActorId<rldp2::Rldp> rldp2_;
td::actor::ActorId<overlay::Overlays> overlays_;
td::actor::ActorId<ValidatorManagerInterface> validator_manager_;
td::actor::ActorId<FullNode> full_node_;

bool inited_ = false;
overlay::OverlayIdFull overlay_id_full_;
Expand All @@ -90,6 +93,10 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {

class FullNodeCustomOverlay : public td::actor::Actor {
public:
void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcastCompressed &query);
void process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);

void process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query);
template <class T>
void process_broadcast(PublicKeyHash, T &) {
Expand All @@ -98,6 +105,7 @@ class FullNodeCustomOverlay : public td::actor::Actor {
void receive_broadcast(PublicKeyHash src, td::BufferSlice query);

void send_external_message(td::BufferSlice data);
void send_broadcast(BlockBroadcast broadcast);

void set_config(FullNodeConfig config) {
config_ = std::move(config);
Expand All @@ -106,31 +114,34 @@ class FullNodeCustomOverlay : public td::actor::Actor {
void start_up() override;
void tear_down() override;

FullNodeCustomOverlay(adnl::AdnlNodeIdShort local_id, std::vector<adnl::AdnlNodeIdShort> nodes,
std::map<adnl::AdnlNodeIdShort, int> senders, std::string name, FileHash zero_state_file_hash,
FullNodeCustomOverlay(adnl::AdnlNodeIdShort local_id, CustomOverlayParams params, FileHash zero_state_file_hash,
FullNodeConfig config, td::actor::ActorId<keyring::Keyring> keyring,
td::actor::ActorId<adnl::Adnl> adnl, td::actor::ActorId<rldp::Rldp> rldp,
td::actor::ActorId<rldp2::Rldp> rldp2, td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager)
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<FullNode> full_node)
: local_id_(local_id)
, nodes_(std::move(nodes))
, senders_(std::move(senders))
, name_(std::move(name))
, name_(std::move(params.name_))
, nodes_(std::move(params.nodes_))
, msg_senders_(std::move(params.msg_senders_))
, block_senders_(std::move(params.block_senders_))
, zero_state_file_hash_(zero_state_file_hash)
, config_(config)
, keyring_(keyring)
, adnl_(adnl)
, rldp_(rldp)
, rldp2_(rldp2)
, overlays_(overlays)
, validator_manager_(validator_manager) {
, validator_manager_(validator_manager)
, full_node_(full_node) {
}

private:
adnl::AdnlNodeIdShort local_id_;
std::vector<adnl::AdnlNodeIdShort> nodes_;
std::map<adnl::AdnlNodeIdShort, int> senders_;
std::string name_;
std::vector<adnl::AdnlNodeIdShort> nodes_;
std::map<adnl::AdnlNodeIdShort, int> msg_senders_;
std::set<adnl::AdnlNodeIdShort> block_senders_;
FileHash zero_state_file_hash_;
FullNodeConfig config_;

Expand All @@ -140,6 +151,7 @@ class FullNodeCustomOverlay : public td::actor::Actor {
td::actor::ActorId<rldp2::Rldp> rldp2_;
td::actor::ActorId<overlay::Overlays> overlays_;
td::actor::ActorId<ValidatorManagerInterface> validator_manager_;
td::actor::ActorId<FullNode> full_node_;

bool inited_ = false;
overlay::OverlayIdFull overlay_id_full_;
Expand Down
21 changes: 7 additions & 14 deletions validator/full-node-shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -661,17 +661,7 @@ void FullNodeShardImpl::process_block_broadcast(PublicKeyHash src, ton_api::tonN
return;
}
VLOG(FULL_NODE_DEBUG) << "Received block broadcast from " << src << ": " << B.ok().block_id.to_str();
auto P = td::PromiseCreator::lambda([](td::Result<td::Unit> R) {
if (R.is_error()) {
if (R.error().code() == ErrorCode::notready) {
LOG(DEBUG) << "dropped broadcast: " << R.move_as_error();
} else {
LOG(INFO) << "dropped broadcast: " << R.move_as_error();
}
}
});
td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::prevalidate_block, B.move_as_ok(),
std::move(P));
td::actor::send_closure(full_node_, &FullNode::process_block_broadcast, B.move_as_ok());
}

void FullNodeShardImpl::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
Expand Down Expand Up @@ -1137,7 +1127,8 @@ FullNodeShardImpl::FullNodeShardImpl(ShardIdFull shard, PublicKeyHash local_id,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays,
td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client)
td::actor::ActorId<adnl::AdnlExtClient> client,
td::actor::ActorId<FullNode> full_node)
: shard_(shard)
, local_id_(local_id)
, adnl_id_(adnl_id)
Expand All @@ -1149,6 +1140,7 @@ FullNodeShardImpl::FullNodeShardImpl(ShardIdFull shard, PublicKeyHash local_id,
, overlays_(overlays)
, validator_manager_(validator_manager)
, client_(client)
, full_node_(full_node)
, config_(config) {
}

Expand All @@ -1157,9 +1149,10 @@ td::actor::ActorOwn<FullNodeShard> FullNodeShard::create(
FullNodeConfig config, td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays, td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client) {
td::actor::ActorId<adnl::AdnlExtClient> client, td::actor::ActorId<FullNode> full_node) {
return td::actor::create_actor<FullNodeShardImpl>("tonnode", shard, local_id, adnl_id, zero_state_file_hash, config,
keyring, adnl, rldp, rldp2, overlays, validator_manager, client);
keyring, adnl, rldp, rldp2, overlays, validator_manager, client,
full_node);
}

} // namespace fullnode
Expand Down
2 changes: 1 addition & 1 deletion validator/full-node-shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class FullNodeShard : public td::actor::Actor {
FullNodeConfig config, td::actor::ActorId<keyring::Keyring> keyring, td::actor::ActorId<adnl::Adnl> adnl,
td::actor::ActorId<rldp::Rldp> rldp, td::actor::ActorId<rldp2::Rldp> rldp2,
td::actor::ActorId<overlay::Overlays> overlays, td::actor::ActorId<ValidatorManagerInterface> validator_manager,
td::actor::ActorId<adnl::AdnlExtClient> client);
td::actor::ActorId<adnl::AdnlExtClient> client, td::actor::ActorId<FullNode> full_node);
};

} // namespace fullnode
Expand Down
Loading

0 comments on commit d5c0993

Please sign in to comment.