Skip to content

Commit

Permalink
Compress block candidate broadcasts (#1007)
Browse files Browse the repository at this point in the history
Co-authored-by: SpyCheese <mikle98@yandex.ru>
  • Loading branch information
EmelyanenkoK and SpyCheese authored May 29, 2024
1 parent ceefac7 commit 8a4d44d
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 29 deletions.
2 changes: 2 additions & 0 deletions tl/generate/scheme/ton_api.tl
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ tonNode.newShardBlockBroadcast block:tonNode.newShardBlock = tonNode.Broadcast;
// signature may be empty, at least for now
tonNode.newBlockCandidateBroadcast id:tonNode.blockIdExt catchain_seqno:int validator_set_hash:int
collator_signature:tonNode.blockSignature data:bytes = tonNode.Broadcast;
tonNode.newBlockCandidateBroadcastCompressed id:tonNode.blockIdExt catchain_seqno:int validator_set_hash:int
collator_signature:tonNode.blockSignature flags:# compressed:bytes = tonNode.Broadcast;

tonNode.shardPublicOverlayId workchain:int shard:long zero_state_file_hash:int256 = tonNode.ShardPublicOverlayId;

Expand Down
Binary file modified tl/generate/scheme/ton_api.tlo
Binary file not shown.
80 changes: 61 additions & 19 deletions validator/full-node-private-overlay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,37 @@ void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src, ton_api::

void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src,
ton_api::tonNode_newBlockCandidateBroadcast &query) {
if (query.data_.size() > FullNode::max_block_size()) {
process_block_candidate_broadcast(src, query);
}

void FullNodePrivateBlockOverlay::process_broadcast(PublicKeyHash src,
ton_api::tonNode_newBlockCandidateBroadcastCompressed &query) {
process_block_candidate_broadcast(src, query);
}

void FullNodePrivateBlockOverlay::process_block_candidate_broadcast(PublicKeyHash src,
ton_api::tonNode_Broadcast &query) {
BlockIdExt block_id;
CatchainSeqno cc_seqno;
td::uint32 validator_set_hash;
td::BufferSlice data;
auto S = deserialize_block_candidate_broadcast(query, block_id, cc_seqno, validator_set_hash, data,
overlay::Overlays::max_fec_broadcast_size());
if (S.is_error()) {
LOG(DEBUG) << "dropped broadcast: " << S;
return;
}
if (data.size() > FullNode::max_block_size()) {
VLOG(FULL_NODE_WARNING) << "received block candidate with too big size from " << src;
return;
}
BlockIdExt block_id = create_block_id(query.id_);
if (td::sha256_bits256(query.data_.as_slice()) != block_id.file_hash) {
if (td::sha256_bits256(data.as_slice()) != block_id.file_hash) {
VLOG(FULL_NODE_WARNING) << "received block candidate with incorrect file hash from " << src;
return;
}
VLOG(FULL_NODE_DEBUG) << "Received newBlockCandidate in private overlay from " << src << ": " << block_id.to_str();
td::actor::send_closure(full_node_, &FullNode::process_block_candidate_broadcast, block_id, query.catchain_seqno_,
query.validator_set_hash_, std::move(query.data_));
td::actor::send_closure(full_node_, &FullNode::process_block_candidate_broadcast, block_id, cc_seqno,
validator_set_hash, std::move(data));
}

void FullNodePrivateBlockOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
Expand Down Expand Up @@ -99,12 +118,15 @@ void FullNodePrivateBlockOverlay::send_block_candidate(BlockIdExt block_id, Catc
if (!inited_) {
return;
}
auto B =
serialize_block_candidate_broadcast(block_id, cc_seqno, validator_set_hash, data, true); // compression enabled
if (B.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to serialize block candidate broadcast: " << B.move_as_error();
return;
}
VLOG(FULL_NODE_DEBUG) << "Sending newBlockCandidate in private overlay: " << block_id.to_str();
auto B = create_serialize_tl_object<ton_api::tonNode_newBlockCandidateBroadcast>(
create_tl_block_id(block_id), cc_seqno, validator_set_hash,
create_tl_object<ton_api::tonNode_blockSignature>(Bits256::zero(), td::BufferSlice()), std::move(data));
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), std::move(B));
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}

void FullNodePrivateBlockOverlay::send_broadcast(BlockBroadcast broadcast) {
Expand Down Expand Up @@ -230,25 +252,42 @@ void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNod
}

void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query) {
process_block_candidate_broadcast(src, query);
}

void FullNodeCustomOverlay::process_broadcast(PublicKeyHash src,
ton_api::tonNode_newBlockCandidateBroadcastCompressed &query) {
process_block_candidate_broadcast(src, query);
}

void FullNodeCustomOverlay::process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) {
if (!block_senders_.count(adnl::AdnlNodeIdShort(src))) {
VLOG(FULL_NODE_DEBUG) << "Dropping block candidate broadcast in private overlay \"" << name_
<< "\" from unauthorized sender " << src;
return;
}
if (query.data_.size() > FullNode::max_block_size()) {
BlockIdExt block_id;
CatchainSeqno cc_seqno;
td::uint32 validator_set_hash;
td::BufferSlice data;
auto S = deserialize_block_candidate_broadcast(query, block_id, cc_seqno, validator_set_hash, data,
overlay::Overlays::max_fec_broadcast_size());
if (S.is_error()) {
LOG(DEBUG) << "dropped broadcast: " << S;
return;
}
if (data.size() > FullNode::max_block_size()) {
VLOG(FULL_NODE_WARNING) << "received block candidate with too big size from " << src;
return;
}
BlockIdExt block_id = create_block_id(query.id_);
if (td::sha256_bits256(query.data_.as_slice()) != block_id.file_hash) {
if (td::sha256_bits256(data.as_slice()) != block_id.file_hash) {
VLOG(FULL_NODE_WARNING) << "received block candidate with incorrect file hash from " << src;
return;
}
// ignore cc_seqno and validator_hash for now
VLOG(FULL_NODE_DEBUG) << "Received newBlockCandidate in custom overlay \"" << name_ << "\" from " << src << ": "
<< block_id.to_str();
td::actor::send_closure(full_node_, &FullNode::process_block_candidate_broadcast, block_id, query.catchain_seqno_,
query.validator_set_hash_, std::move(query.data_));
td::actor::send_closure(full_node_, &FullNode::process_block_candidate_broadcast, block_id, cc_seqno,
validator_set_hash, std::move(data));
}

void FullNodeCustomOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
Expand Down Expand Up @@ -298,12 +337,15 @@ void FullNodeCustomOverlay::send_block_candidate(BlockIdExt block_id, CatchainSe
if (!inited_) {
return;
}
auto B =
serialize_block_candidate_broadcast(block_id, cc_seqno, validator_set_hash, data, true); // compression enabled
if (B.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to serialize block candidate broadcast: " << B.move_as_error();
return;
}
VLOG(FULL_NODE_DEBUG) << "Sending newBlockCandidate in custom overlay \"" << name_ << "\": " << block_id.to_str();
auto B = create_serialize_tl_object<ton_api::tonNode_newBlockCandidateBroadcast>(
create_tl_block_id(block_id), cc_seqno, validator_set_hash,
create_tl_object<ton_api::tonNode_blockSignature>(Bits256::zero(), td::BufferSlice()), std::move(data));
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), std::move(B));
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}

void FullNodeCustomOverlay::start_up() {
Expand Down
8 changes: 8 additions & 0 deletions validator/full-node-private-overlay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {
void process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);

void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query);

void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query);
void process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);

template <class T>
void process_broadcast(PublicKeyHash, T &) {
VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast";
Expand Down Expand Up @@ -101,7 +105,11 @@ class FullNodeCustomOverlay : public td::actor::Actor {
void process_block_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);

void process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query);

void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query);
void process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);

template <class T>
void process_broadcast(PublicKeyHash, T &) {
VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast";
Expand Down
59 changes: 59 additions & 0 deletions validator/full-node-serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,63 @@ td::Status deserialize_block_full(ton_api::tonNode_DataFull& obj, BlockIdExt& id
return S;
}

td::Result<td::BufferSlice> serialize_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::Slice data,
bool compression_enabled) {
if (!compression_enabled) {
return create_serialize_tl_object<ton_api::tonNode_newBlockCandidateBroadcast>(
create_tl_block_id(block_id), cc_seqno, validator_set_hash,
create_tl_object<ton_api::tonNode_blockSignature>(Bits256::zero(), td::BufferSlice()), td::BufferSlice(data));
}
TRY_RESULT(root, vm::std_boc_deserialize(data));
TRY_RESULT(data_new, vm::std_boc_serialize(root, 2));
td::BufferSlice compressed = td::lz4_compress(data_new);
VLOG(FULL_NODE_DEBUG) << "Compressing block candidate broadcast: " << data.size() << " -> " << compressed.size();
return create_serialize_tl_object<ton_api::tonNode_newBlockCandidateBroadcastCompressed>(
create_tl_block_id(block_id), cc_seqno, validator_set_hash,
create_tl_object<ton_api::tonNode_blockSignature>(Bits256::zero(), td::BufferSlice()), 0, std::move(compressed));
}

static td::Status deserialize_block_candidate_broadcast(ton_api::tonNode_newBlockCandidateBroadcast& obj,
BlockIdExt& block_id, CatchainSeqno& cc_seqno,
td::uint32& validator_set_hash, td::BufferSlice& data) {
block_id = create_block_id(obj.id_);
cc_seqno = obj.catchain_seqno_;
validator_set_hash = obj.validator_set_hash_;
data = std::move(obj.data_);
return td::Status::OK();
}

static td::Status deserialize_block_candidate_broadcast(ton_api::tonNode_newBlockCandidateBroadcastCompressed& obj,
BlockIdExt& block_id, CatchainSeqno& cc_seqno,
td::uint32& validator_set_hash, td::BufferSlice& data,
int max_decompressed_data_size) {
block_id = create_block_id(obj.id_);
cc_seqno = obj.catchain_seqno_;
validator_set_hash = obj.validator_set_hash_;
TRY_RESULT(decompressed, td::lz4_decompress(obj.compressed_, max_decompressed_data_size));
TRY_RESULT(root, vm::std_boc_deserialize(decompressed));
TRY_RESULT_ASSIGN(data, vm::std_boc_serialize(root, 31));
VLOG(FULL_NODE_DEBUG) << "Decompressing block candidate broadcast: " << obj.compressed_.size() << " -> "
<< data.size();
return td::Status::OK();
}

td::Status deserialize_block_candidate_broadcast(ton_api::tonNode_Broadcast& obj, BlockIdExt& block_id,
CatchainSeqno& cc_seqno, td::uint32& validator_set_hash,
td::BufferSlice& data, int max_decompressed_data_size) {
td::Status S;
ton_api::downcast_call(obj, td::overloaded(
[&](ton_api::tonNode_newBlockCandidateBroadcast& f) {
S = deserialize_block_candidate_broadcast(f, block_id, cc_seqno, validator_set_hash,
data);
},
[&](ton_api::tonNode_newBlockCandidateBroadcastCompressed& f) {
S = deserialize_block_candidate_broadcast(f, block_id, cc_seqno, validator_set_hash,
data, max_decompressed_data_size);
},
[&](auto&) { S = td::Status::Error("unknown data type"); }));
return S;
}

} // namespace ton::validator::fullnode
7 changes: 7 additions & 0 deletions validator/full-node-serializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,11 @@ td::Result<td::BufferSlice> serialize_block_full(const BlockIdExt& id, td::Slice
td::Status deserialize_block_full(ton_api::tonNode_DataFull& obj, BlockIdExt& id, td::BufferSlice& proof,
td::BufferSlice& data, bool& is_proof_link, int max_decompressed_data_size);

td::Result<td::BufferSlice> serialize_block_candidate_broadcast(BlockIdExt block_id, CatchainSeqno cc_seqno,
td::uint32 validator_set_hash, td::Slice data,
bool compression_enabled);
td::Status deserialize_block_candidate_broadcast(ton_api::tonNode_Broadcast& obj, BlockIdExt& block_id,
CatchainSeqno& cc_seqno, td::uint32& validator_set_hash,
td::BufferSlice& data, int max_decompressed_data_size);

} // namespace ton::validator::fullnode
36 changes: 26 additions & 10 deletions validator/full-node-shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,19 +649,32 @@ void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_ne
}

void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query) {
if (query.data_.size() > FullNode::max_block_size()) {
process_block_candidate_broadcast(src, query);
}

void FullNodeShardImpl::process_broadcast(PublicKeyHash src,
ton_api::tonNode_newBlockCandidateBroadcastCompressed &query) {
process_block_candidate_broadcast(src, query);
}

void FullNodeShardImpl::process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query) {
BlockIdExt block_id;
CatchainSeqno cc_seqno;
td::uint32 validator_set_hash;
td::BufferSlice data;
auto S = deserialize_block_candidate_broadcast(query, block_id, cc_seqno, validator_set_hash, data,
overlay::Overlays::max_fec_broadcast_size());
if (data.size() > FullNode::max_block_size()) {
VLOG(FULL_NODE_WARNING) << "received block candidate with too big size from " << src;
return;
}
BlockIdExt block_id = create_block_id(query.id_);
if (td::sha256_bits256(query.data_.as_slice()) != block_id.file_hash) {
if (td::sha256_bits256(data.as_slice()) != block_id.file_hash) {
VLOG(FULL_NODE_WARNING) << "received block candidate with incorrect file hash from " << src;
return;
}
// ignore cc_seqno and validator_hash for now
VLOG(FULL_NODE_DEBUG) << "Received newBlockCandidate from " << src << ": " << block_id.to_str();
td::actor::send_closure(full_node_, &FullNode::process_block_candidate_broadcast, block_id, query.catchain_seqno_,
query.validator_set_hash_, std::move(query.data_));
td::actor::send_closure(full_node_, &FullNode::process_block_candidate_broadcast, block_id, cc_seqno,
validator_set_hash, std::move(data));
}

void FullNodeShardImpl::process_broadcast(PublicKeyHash src, ton_api::tonNode_blockBroadcast &query) {
Expand Down Expand Up @@ -762,12 +775,15 @@ void FullNodeShardImpl::send_block_candidate(BlockIdExt block_id, CatchainSeqno
UNREACHABLE();
return;
}
auto B =
serialize_block_candidate_broadcast(block_id, cc_seqno, validator_set_hash, data, true); // compression enabled
if (B.is_error()) {
VLOG(FULL_NODE_WARNING) << "failed to serialize block candidate broadcast: " << B.move_as_error();
return;
}
VLOG(FULL_NODE_DEBUG) << "Sending newBlockCandidate: " << block_id.to_str();
auto B = create_serialize_tl_object<ton_api::tonNode_newBlockCandidateBroadcast>(
create_tl_block_id(block_id), cc_seqno, validator_set_hash,
create_tl_object<ton_api::tonNode_blockSignature>(Bits256::zero(), td::BufferSlice()), std::move(data));
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, adnl_id_, overlay_id_, local_id_,
overlay::Overlays::BroadcastFlagAnySender(), std::move(B));
overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}

void FullNodeShardImpl::send_broadcast(BlockBroadcast broadcast) {
Expand Down
4 changes: 4 additions & 0 deletions validator/full-node-shard.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,11 @@ class FullNodeShardImpl : public FullNodeShard {
void process_broadcast(PublicKeyHash src, ton_api::tonNode_ihrMessageBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_externalMessageBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newShardBlockBroadcast &query);

void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcast &query);
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query);
void process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);

void receive_broadcast(PublicKeyHash src, td::BufferSlice query);
void check_broadcast(PublicKeyHash src, td::BufferSlice query, td::Promise<td::Unit> promise);

Expand Down

0 comments on commit 8a4d44d

Please sign in to comment.