Skip to content

Commit

Permalink
Limit checked external messages per address (#1005)
Browse files Browse the repository at this point in the history
* Limit checked external messages per address

* Change max_ext_msg_per_addr_time_window; cleanup mempool by timer

---------

Co-authored-by: SpyCheese <mikle98@yandex.ru>
  • Loading branch information
EmelyanenkoK and SpyCheese authored May 28, 2024
1 parent d80ce8d commit ceefac7
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 23 deletions.
4 changes: 2 additions & 2 deletions validator/fabric.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ td::Result<std::vector<td::Ref<ShardTopBlockDescription>>> create_new_shard_bloc

td::Ref<BlockSignatureSet> create_signature_set(std::vector<BlockSignature> sig_set);

void run_check_external_message(td::BufferSlice data, block::SizeLimitsConfig::ExtMsgLimits limits,
td::actor::ActorId<ValidatorManager> manager, td::Promise<td::Ref<ExtMessage>> promise);
void run_check_external_message(td::Ref<ExtMessage> message, td::actor::ActorId<ValidatorManager> manager,
td::Promise<td::Ref<ExtMessage>> promise);

void run_accept_block_query(BlockIdExt id, td::Ref<BlockData> data, std::vector<BlockIdExt> prev,
td::Ref<ValidatorSet> validator_set, td::Ref<BlockSignatureSet> signatures,
Expand Down
20 changes: 7 additions & 13 deletions validator/impl/external-message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,24 +86,18 @@ td::Result<Ref<ExtMessageQ>> ExtMessageQ::create_ext_message(td::BufferSlice dat
return Ref<ExtMessageQ>{true, std::move(data), std::move(ext_msg), dest_prefix, wc, addr};
}

void ExtMessageQ::run_message(td::BufferSlice data, block::SizeLimitsConfig::ExtMsgLimits limits,
td::actor::ActorId<ton::validator::ValidatorManager> manager,
void ExtMessageQ::run_message(td::Ref<ExtMessage> message, td::actor::ActorId<ton::validator::ValidatorManager> manager,
td::Promise<td::Ref<ExtMessage>> promise) {
auto R = create_ext_message(std::move(data), limits);
if (R.is_error()) {
return promise.set_error(R.move_as_error_prefix("failed to parse external message "));
}
auto M = R.move_as_ok();
auto root = M->root_cell();
auto root = message->root_cell();
block::gen::CommonMsgInfo::Record_ext_in_msg_info info;
tlb::unpack_cell_inexact(root, info); // checked in create message
ton::StdSmcAddress addr = M->addr();
ton::WorkchainId wc = M->wc();
ton::StdSmcAddress addr = message->addr();
ton::WorkchainId wc = message->wc();

run_fetch_account_state(
wc, addr, manager,
[promise = std::move(promise), msg_root = root, wc, addr,
M](td::Result<std::tuple<td::Ref<vm::CellSlice>, UnixTime, LogicalTime, std::unique_ptr<block::ConfigInfo>>>
[promise = std::move(promise), msg_root = root, wc, addr, message](
td::Result<std::tuple<td::Ref<vm::CellSlice>, UnixTime, LogicalTime, std::unique_ptr<block::ConfigInfo>>>
res) mutable {
if (res.is_error()) {
promise.set_error(td::Status::Error(PSLICE() << "Failed to get account state"));
Expand All @@ -120,7 +114,7 @@ void ExtMessageQ::run_message(td::BufferSlice data, block::SizeLimitsConfig::Ext
} else {
auto status = run_message_on_account(wc, &acc, utime, lt + 1, msg_root, std::move(config));
if (status.is_ok()) {
promise.set_value(std::move(M));
promise.set_value(std::move(message));
} else {
promise.set_error(td::Status::Error(PSLICE() << "External message was not accepted\n"
<< status.message()));
Expand Down
3 changes: 1 addition & 2 deletions validator/impl/external-message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ class ExtMessageQ : public ExtMessage {
ton::StdSmcAddress addr);
static td::Result<td::Ref<ExtMessageQ>> create_ext_message(td::BufferSlice data,
block::SizeLimitsConfig::ExtMsgLimits limits);
static void run_message(td::BufferSlice data, block::SizeLimitsConfig::ExtMsgLimits limits,
td::actor::ActorId<ton::validator::ValidatorManager> manager,
static void run_message(td::Ref<ExtMessage> message, td::actor::ActorId<ton::validator::ValidatorManager> manager,
td::Promise<td::Ref<ExtMessage>> promise);
static td::Status run_message_on_account(ton::WorkchainId wc,
block::Account* acc,
Expand Down
5 changes: 2 additions & 3 deletions validator/impl/fabric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,9 @@ td::Result<td::Ref<ExtMessage>> create_ext_message(td::BufferSlice data,
return std::move(res);
}

void run_check_external_message(td::BufferSlice data, block::SizeLimitsConfig::ExtMsgLimits limits,
td::actor::ActorId<ValidatorManager> manager,
void run_check_external_message(Ref<ExtMessage> message, td::actor::ActorId<ValidatorManager> manager,
td::Promise<td::Ref<ExtMessage>> promise) {
ExtMessageQ::run_message(std::move(data), limits, std::move(manager), std::move(promise));
ExtMessageQ::run_message(std::move(message), std::move(manager), std::move(promise));
}

td::Result<td::Ref<IhrMessage>> create_ihr_message(td::BufferSlice data) {
Expand Down
67 changes: 64 additions & 3 deletions validator/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,14 +412,42 @@ void ValidatorManagerImpl::add_external_message(td::Ref<ExtMessage> msg, int pri
ext_messages_hashes_[id.hash] = {priority, id};
}
void ValidatorManagerImpl::check_external_message(td::BufferSlice data, td::Promise<td::Ref<ExtMessage>> promise) {
++ls_stats_check_ext_messages_;
auto state = do_get_last_liteserver_state();
if (state.is_null()) {
promise.set_error(td::Status::Error(ErrorCode::notready, "not ready"));
return;
}
run_check_external_message(std::move(data), state->get_ext_msg_limits(), actor_id(this),
std::move(promise));
auto R = create_ext_message(std::move(data), state->get_ext_msg_limits());
if (R.is_error()) {
promise.set_error(R.move_as_error_prefix("failed to parse external message: "));
return;
}
auto message = R.move_as_ok();
WorkchainId wc = message->wc();
StdSmcAddress addr = message->addr();
if (checked_ext_msg_counter_.get_msg_count(wc, addr) >= max_ext_msg_per_addr()) {
promise.set_error(
td::Status::Error(PSTRING() << "too many external messages to address " << wc << ":" << addr.to_hex()));
return;
}

promise = [self = this, wc, addr, promise = std::move(promise),
SelfId = actor_id(this)](td::Result<td::Ref<ExtMessage>> R) mutable {
if (R.is_error()) {
promise.set_error(R.move_as_error());
return;
}
td::actor::send_lambda(SelfId, [=, promise = std::move(promise), message = R.move_as_ok()]() mutable {
if (self->checked_ext_msg_counter_.inc_msg_count(wc, addr) > max_ext_msg_per_addr()) {
promise.set_error(
td::Status::Error(PSTRING() << "too many external messages to address " << wc << ":" << addr.to_hex()));
return;
}
promise.set_result(std::move(message));
});
};
++ls_stats_check_ext_messages_;
run_check_external_message(std::move(message), actor_id(this), std::move(promise));
}

void ValidatorManagerImpl::new_ihr_message(td::BufferSlice data) {
Expand Down Expand Up @@ -2592,6 +2620,16 @@ void ValidatorManagerImpl::alarm() {
log_ls_stats_at_ = td::Timestamp::in(60.0);
}
alarm_timestamp().relax(log_ls_stats_at_);
if (cleanup_mempool_at_.is_in_past()) {
if (is_validator()) {
get_external_messages(ShardIdFull{masterchainId, shardIdAll},
[](td::Result<std::vector<std::pair<td::Ref<ExtMessage>, int>>>) {});
get_external_messages(ShardIdFull{basechainId, shardIdAll},
[](td::Result<std::vector<std::pair<td::Ref<ExtMessage>, int>>>) {});
}
cleanup_mempool_at_ = td::Timestamp::in(250.0);
}
alarm_timestamp().relax(cleanup_mempool_at_);
}

void ValidatorManagerImpl::update_shard_client_state(BlockIdExt masterchain_block_id, td::Promise<td::Unit> promise) {
Expand Down Expand Up @@ -3102,6 +3140,29 @@ td::actor::ActorOwn<ValidatorManagerInterface> ValidatorManagerFactory::create(
rldp, overlays);
}

size_t ValidatorManagerImpl::CheckedExtMsgCounter::get_msg_count(WorkchainId wc, StdSmcAddress addr) {
before_query();
auto it1 = counter_cur_.find({wc, addr});
auto it2 = counter_prev_.find({wc, addr});
return (it1 == counter_cur_.end() ? 0 : it1->second) + (it2 == counter_prev_.end() ? 0 : it2->second);
}
size_t ValidatorManagerImpl::CheckedExtMsgCounter::inc_msg_count(WorkchainId wc, StdSmcAddress addr) {
before_query();
auto it2 = counter_prev_.find({wc, addr});
return (it2 == counter_prev_.end() ? 0 : it2->second) + ++counter_cur_[{wc, addr}];
}
void ValidatorManagerImpl::CheckedExtMsgCounter::before_query() {
while (cleanup_at_.is_in_past()) {
counter_prev_ = std::move(counter_cur_);
counter_cur_.clear();
if (counter_prev_.empty()) {
cleanup_at_ = td::Timestamp::in(max_ext_msg_per_addr_time_window() / 2.0);
break;
}
cleanup_at_ += max_ext_msg_per_addr_time_window() / 2.0;
}
}

} // namespace validator

} // namespace ton
16 changes: 16 additions & 0 deletions validator/manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,20 @@ class ValidatorManagerImpl : public ValidatorManager {
};
std::map<int, ExtMessages> ext_msgs_; // priority -> messages
std::map<ExtMessage::Hash, std::pair<int, MessageId<ExtMessage>>> ext_messages_hashes_; // hash -> priority
td::Timestamp cleanup_mempool_at_;
// IHR ?
std::map<MessageId<IhrMessage>, std::unique_ptr<MessageExt<IhrMessage>>> ihr_messages_;
std::map<IhrMessage::Hash, MessageId<IhrMessage>> ihr_messages_hashes_;

struct CheckedExtMsgCounter {
std::map<std::pair<WorkchainId, StdSmcAddress>, size_t> counter_cur_, counter_prev_;
td::Timestamp cleanup_at_ = td::Timestamp::now();

size_t get_msg_count(WorkchainId wc, StdSmcAddress addr);
size_t inc_msg_count(WorkchainId wc, StdSmcAddress addr);
void before_query();
} checked_ext_msg_counter_;

private:
// VALIDATOR GROUPS
ValidatorSessionId get_validator_set_id(ShardIdFull shard, td::Ref<ValidatorSet> val_set, td::Bits256 opts_hash,
Expand Down Expand Up @@ -678,6 +688,12 @@ class ValidatorManagerImpl : public ValidatorManager {
size_t max_cached_candidates() const {
return 128;
}
static double max_ext_msg_per_addr_time_window() {
return 10.0;
}
static size_t max_ext_msg_per_addr() {
return 3 * 10;
}

private:
std::map<BlockSeqno, WaitList<td::actor::Actor, td::Unit>> shard_client_waiters_;
Expand Down

0 comments on commit ceefac7

Please sign in to comment.