Skip to content

Commit

Permalink
Adapt "get msg queue sizes" in lite-client and tonlib to non-full lit…
Browse files Browse the repository at this point in the history
…eservers
  • Loading branch information
SpyCheese committed Nov 28, 2024
1 parent 5d79855 commit 5fae8db
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 26 deletions.
90 changes: 78 additions & 12 deletions lite-client/lite-client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1627,27 +1627,93 @@ void TestNode::send_compute_complaint_price_query(ton::StdSmcAddress elector_add
}

bool TestNode::get_msg_queue_sizes() {
auto q = ton::serialize_tl_object(ton::create_tl_object<ton::lite_api::liteServer_getOutMsgQueueSizes>(0, 0, 0), true);
return envelope_send_query(std::move(q), [Self = actor_id(this)](td::Result<td::BufferSlice> res) -> void {
if (res.is_error()) {
LOG(ERROR) << "liteServer.getOutMsgQueueSizes error: " << res.move_as_error();
ton::BlockIdExt blkid = mc_last_id_;
if (!blkid.is_valid_full()) {
return set_error("must obtain last block information before making other queries");
}
if (!(ready_ && !client_.empty())) {
return set_error("server connection not ready");
}
auto b =
ton::create_serialize_tl_object<ton::lite_api::liteServer_getAllShardsInfo>(ton::create_tl_lite_block_id(blkid));
LOG(INFO) << "requesting recent shard configuration";
return envelope_send_query(std::move(b), [Self = actor_id(this), blkid](td::Result<td::BufferSlice> R) -> void {
if (R.is_error()) {
return;
}
auto F = ton::fetch_tl_object<ton::lite_api::liteServer_outMsgQueueSizes>(res.move_as_ok(), true);
auto F = ton::fetch_tl_object<ton::lite_api::liteServer_allShardsInfo>(R.move_as_ok(), true);
if (F.is_error()) {
LOG(ERROR) << "cannot parse answer to liteServer.getOutMsgQueueSizes";
return;
LOG(ERROR) << "cannot parse answer to liteServer.getAllShardsInfo";
} else {
auto f = F.move_as_ok();
td::actor::send_closure_later(Self, &TestNode::get_msg_queue_sizes_cont, blkid, std::move(f->data_));
}
td::actor::send_closure_later(Self, &TestNode::got_msg_queue_sizes, F.move_as_ok());
});
}

void TestNode::got_msg_queue_sizes(ton::tl_object_ptr<ton::lite_api::liteServer_outMsgQueueSizes> f) {
void TestNode::get_msg_queue_sizes_cont(ton::BlockIdExt mc_blkid, td::BufferSlice data) {
LOG(INFO) << "got shard configuration with respect to block " << mc_blkid.to_str();
std::vector<ton::BlockIdExt> blocks;
blocks.push_back(mc_blkid);
auto R = vm::std_boc_deserialize(data.clone());
if (R.is_error()) {
set_error(R.move_as_error_prefix("cannot deserialize shard configuration: "));
return;
}
auto root = R.move_as_ok();
block::ShardConfig sh_conf;
if (!sh_conf.unpack(vm::load_cell_slice_ref(root))) {
set_error("cannot extract shard block list from shard configuration");
return;
}
auto ids = sh_conf.get_shard_hash_ids(true);
for (auto id : ids) {
auto ref = sh_conf.get_shard_hash(ton::ShardIdFull(id));
if (ref.not_null()) {
blocks.push_back(ref->top_block_id());
}
}

struct QueryInfo {
std::vector<ton::BlockIdExt> blocks;
std::vector<td::uint64> sizes;
size_t pending;
};
auto info = std::make_shared<QueryInfo>();
info->blocks = std::move(blocks);
info->sizes.resize(info->blocks.size(), 0);
info->pending = info->blocks.size();

for (size_t i = 0; i < info->blocks.size(); ++i) {
ton::BlockIdExt block_id = info->blocks[i];
auto b = ton::create_serialize_tl_object<ton::lite_api::liteServer_getBlockOutMsgQueueSize>(
0, ton::create_tl_lite_block_id(block_id), false);
LOG(DEBUG) << "requesting queue size for block " << block_id.to_str();
envelope_send_query(std::move(b), [=, this](td::Result<td::BufferSlice> R) -> void {
if (R.is_error()) {
return;
}
auto F = ton::fetch_tl_object<ton::lite_api::liteServer_blockOutMsgQueueSize>(R.move_as_ok(), true);
if (F.is_error()) {
set_error(F.move_as_error_prefix("failed to get queue size: "));
return;
}
auto f = F.move_as_ok();
LOG(DEBUG) << "got queue size for block " << block_id.to_str() << " : " << f->size_;
info->sizes[i] = f->size_;
if (--info->pending == 0) {
get_msg_queue_sizes_finish(std::move(info->blocks), std::move(info->sizes));
}
});
}
}

void TestNode::get_msg_queue_sizes_finish(std::vector<ton::BlockIdExt> blocks, std::vector<td::uint64> sizes) {
CHECK(blocks.size() == sizes.size());
td::TerminalIO::out() << "Outbound message queue sizes:" << std::endl;
for (auto &x : f->shards_) {
td::TerminalIO::out() << ton::create_block_id(x->id_).id.to_str() << " " << x->size_ << std::endl;
for (size_t i = 0; i < blocks.size(); ++i) {
td::TerminalIO::out() << blocks[i].id.to_str() << " " << sizes[i] << std::endl;
}
td::TerminalIO::out() << "External message queue size limit: " << f->ext_msg_queue_size_limit_ << std::endl;
}

bool TestNode::get_dispatch_queue_info(ton::BlockIdExt block_id) {
Expand Down
3 changes: 2 additions & 1 deletion lite-client/lite-client.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ class TestNode : public td::actor::Actor {
void send_compute_complaint_price_query(ton::StdSmcAddress elector_addr, unsigned expires_in, unsigned bits,
unsigned refs, td::Bits256 chash, std::string filename);
bool get_msg_queue_sizes();
void got_msg_queue_sizes(ton::tl_object_ptr<ton::lite_api::liteServer_outMsgQueueSizes> f);
void get_msg_queue_sizes_cont(ton::BlockIdExt mc_blkid, td::BufferSlice data);
void get_msg_queue_sizes_finish(std::vector<ton::BlockIdExt> blocks, std::vector<td::uint64> sizes);
bool get_dispatch_queue_info(ton::BlockIdExt block_id);
bool get_dispatch_queue_info_cont(ton::BlockIdExt block_id, bool first, td::Bits256 after_addr);
void got_dispatch_queue_info(ton::BlockIdExt block_id,
Expand Down
167 changes: 154 additions & 13 deletions tonlib/tonlib/TonlibClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1800,6 +1800,132 @@ class GetShardBlockProof : public td::actor::Actor {
std::vector<std::pair<ton::BlockIdExt, td::BufferSlice>> links_;
};

class GetOutMsgQueueSizes : public td::actor::Actor {
public:
GetOutMsgQueueSizes(ExtClientRef ext_client_ref, std::vector<ton::BlockIdExt> blocks,
td::actor::ActorShared<> parent,
td::Promise<tonlib_api_ptr<tonlib_api::blocks_outMsgQueueSizes>>&& promise)
: blocks_(std::move(blocks)), parent_(std::move(parent)), promise_(std::move(promise)) {
client_.set_client(ext_client_ref);
}

void start_up() override {
sizes_.resize(blocks_.size());
pending_ = blocks_.size() + 1;

for (size_t i = 0; i < blocks_.size(); ++i) {
client_.send_query(
ton::lite_api::liteServer_getBlockOutMsgQueueSize(1, ton::create_tl_lite_block_id(blocks_[i]), true),
[SelfId = actor_id(this), i](td::Result<lite_api_ptr<ton::lite_api::liteServer_blockOutMsgQueueSize>> R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &GetOutMsgQueueSizes::abort, R.move_as_error());
} else {
td::actor::send_closure(SelfId, &GetOutMsgQueueSizes::got_block_queue_size, i, R.move_as_ok());
}
});
}

client_.send_query(
ton::lite_api::liteServer_getOutMsgQueueSizes(1, ton::masterchainId, ton::shardIdAll),
[SelfId = actor_id(this)](td::Result<lite_api_ptr<ton::lite_api::liteServer_outMsgQueueSizes>> R) {
if (R.is_error()) {
td::actor::send_closure(SelfId, &GetOutMsgQueueSizes::abort, R.move_as_error());
} else {
td::actor::send_closure(SelfId, &GetOutMsgQueueSizes::got_ext_msg_queue_size_limit,
R.ok()->ext_msg_queue_size_limit_);
}
});
}

void got_block_queue_size(size_t i, lite_api_ptr<ton::lite_api::liteServer_blockOutMsgQueueSize> f) {
try {
auto S = [&, this]() -> td::Status {
TRY_RESULT_PREFIX(roots, vm::std_boc_deserialize_multi(f->proof_), "cannot deserialize proof: ");
if (roots.size() != 2) {
return td::Status::Error("expected 2 roots in proof");
}
auto state_root = vm::MerkleProof::virtualize(std::move(roots[1]), 1);
if (state_root.is_null()) {
return td::Status::Error("state proof is invalid");
}
ton::Bits256 state_hash = state_root->get_hash().bits();
TRY_STATUS_PREFIX(block::check_block_header_proof(vm::MerkleProof::virtualize(std::move(roots[0]), 1),
blocks_[i], &state_hash, true, nullptr, nullptr),
"error in block header proof: ");

block::gen::ShardStateUnsplit::Record sstate;
block::gen::OutMsgQueueInfo::Record out_msg_queue_info;
if (!tlb::unpack_cell(state_root, sstate) || !tlb::unpack_cell(sstate.out_msg_queue_info, out_msg_queue_info)) {
return td::Status::Error("cannot unpack shard state");
}
vm::CellSlice& extra_slice = out_msg_queue_info.extra.write();
if (extra_slice.fetch_long(1) == 0) {
return td::Status::Error("no out_msg_queue_size in shard state");
}
block::gen::OutMsgQueueExtra::Record out_msg_queue_extra;
if (!tlb::unpack(extra_slice, out_msg_queue_extra)) {
return td::Status::Error("cannot unpack OutMsgQueueExtra");
}
vm::CellSlice& size_slice = out_msg_queue_extra.out_queue_size.write();
if (size_slice.fetch_long(1) == 0) {
return td::Status::Error("no out_msg_queue_size in shard state");
}
td::uint64 size = size_slice.prefetch_ulong(48);
if (size != f->size_) {
return td::Status::Error("queue size mismatch");
}
return td::Status::OK();
}();
if (S.is_error()) {
abort(std::move(S));
return;
}
} catch (vm::VmError& err) {
abort(err.as_status());
return;
} catch (vm::VmVirtError& err) {
abort(err.as_status());
return;
}

sizes_[i] = f->size_;
dec_pending();
}

void got_ext_msg_queue_size_limit(td::uint32 value) {
ext_msg_queue_size_limit_ = value;
dec_pending();
}

void dec_pending() {
if (--pending_ == 0) {
std::vector<tonlib_api::object_ptr<tonlib_api::blocks_outMsgQueueSize>> shards;
for (size_t i = 0; i < blocks_.size(); ++i) {
shards.push_back(
tonlib_api::make_object<tonlib_api::blocks_outMsgQueueSize>(to_tonlib_api(blocks_[i]), sizes_[i]));
}
promise_.set_result(
tonlib_api::make_object<tonlib_api::blocks_outMsgQueueSizes>(std::move(shards), ext_msg_queue_size_limit_));
stop();
}
}

void abort(td::Status error) {
promise_.set_error(std::move(error));
stop();
}

private:
std::vector<ton::BlockIdExt> blocks_;
td::actor::ActorShared<> parent_;
td::Promise<tonlib_api_ptr<tonlib_api::blocks_outMsgQueueSizes>> promise_;
ExtClient client_;

std::vector<td::uint64> sizes_;
td::uint32 ext_msg_queue_size_limit_ = 0;
size_t pending_ = 0;
};

auto to_lite_api(const tonlib_api::ton_blockIdExt& blk) -> td::Result<lite_api_ptr<ton::lite_api::tonNode_blockIdExt>>;
auto to_tonlib_api(const ton::lite_api::liteServer_transactionId& txid) -> tonlib_api_ptr<tonlib_api::blocks_shortTxId>;

Expand Down Expand Up @@ -6129,19 +6255,34 @@ td::Status TonlibClient::do_request(const tonlib_api::blocks_getShardBlockProof&

td::Status TonlibClient::do_request(const tonlib_api::blocks_getOutMsgQueueSizes& request,
td::Promise<object_ptr<tonlib_api::blocks_outMsgQueueSizes>>&& promise) {
client_.send_query(ton::lite_api::liteServer_getOutMsgQueueSizes(request.mode_, request.wc_, request.shard_),
promise.wrap([](lite_api_ptr<ton::lite_api::liteServer_outMsgQueueSizes>&& queue_sizes) {
tonlib_api::blocks_outMsgQueueSizes result;
result.ext_msg_queue_size_limit_ = queue_sizes->ext_msg_queue_size_limit_;
for (auto &x : queue_sizes->shards_) {
tonlib_api::blocks_outMsgQueueSize shard;
shard.id_ = to_tonlib_api(*x->id_);
shard.size_ = x->size_;
result.shards_.push_back(tonlib_api::make_object<tonlib_api::blocks_outMsgQueueSize>(std::move(shard)));
}
return tonlib_api::make_object<tonlib_api::blocks_outMsgQueueSizes>(std::move(result));
}));

auto req_mode = request.mode_;
auto req_shard = ton::ShardIdFull{request.wc_, (ton::ShardId)request.shard_};
if ((req_mode & 1) && !req_shard.is_valid_ext()) {
return td::Status::Error("invalid shard");
}
client_.with_last_block(
[=, self = this, promise = std::move(promise)](td::Result<LastBlockState> r_last_block) mutable {
TRY_RESULT_PROMISE_PREFIX(promise, last_block, std::move(r_last_block), "get last block failed: ");
do_request(tonlib_api::blocks_getShards(to_tonlib_api(last_block.last_block_id)),
[=, mc_blkid = last_block.last_block_id,
promise = std::move(promise)](td::Result<object_ptr<tonlib_api::blocks_shards>> R) mutable {
TRY_RESULT_PROMISE_PREFIX(promise, shards, std::move(R), "get shards failed: ");
std::vector<ton::BlockIdExt> blocks;
if (!(req_mode & 1) || ton::shard_intersects(mc_blkid.shard_full(), req_shard)) {
blocks.push_back(mc_blkid);
}
for (const auto& shard : shards->shards_) {
TRY_RESULT_PROMISE(promise, block_id, to_block_id(*shard));
if (!(req_mode & 1) || ton::shard_intersects(block_id.shard_full(), req_shard)) {
blocks.push_back(block_id);
}
}
auto actor_id = self->actor_id_++;
self->actors_[actor_id] = td::actor::create_actor<GetOutMsgQueueSizes>(
"GetOutMsgQueueSizes", self->client_.get_client(), std::move(blocks),
actor_shared(this, actor_id), std::move(promise));
});
});
return td::Status::OK();
}

Expand Down
19 changes: 19 additions & 0 deletions tonlib/tonlib/tonlib-cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ class TonlibCli : public td::actor::Actor {
<< "\t 'k' modifier - use fake key\n"
<< "\t 'c' modifier - just esmitate fees\n";
td::TerminalIO::out() << "getmasterchainsignatures <seqno> - get sigratures of masterchain block <seqno>\n";
td::TerminalIO::out() << "msgqueuesizes - get out msg queue sizes in the latest shard states\n";
} else if (cmd == "genkey") {
generate_key();
} else if (cmd == "exit" || cmd == "quit") {
Expand Down Expand Up @@ -517,6 +518,8 @@ class TonlibCli : public td::actor::Actor {
} else if (cmd == "getmasterchainsignatures") {
auto seqno = parser.read_word();
run_get_masterchain_block_signatures(seqno, std::move(cmd_promise));
} else if (cmd == "msgqueuesizes") {
run_get_out_msg_queue_sizes(std::move(cmd_promise));
} else if (cmd == "showtransactions") {
run_show_transactions(parser, std::move(cmd_promise));
} else {
Expand Down Expand Up @@ -2161,6 +2164,22 @@ class TonlibCli : public td::actor::Actor {
}));
}

void run_get_out_msg_queue_sizes(td::Promise<td::Unit> promise) {
send_query(make_object<tonlib_api::blocks_getOutMsgQueueSizes>(0, 0, 0),
promise.wrap([](tonlib_api::object_ptr<tonlib_api::blocks_outMsgQueueSizes>&& f) {
td::TerminalIO::out() << "Outbound message queue sizes:" << std::endl;
for (const auto& shard : f->shards_) {
td::TerminalIO::out() << ton::BlockId{shard->id_->workchain_, (ton::ShardId)shard->id_->shard_,
(ton::BlockSeqno)shard->id_->seqno_}
.to_str()
<< " " << shard->size_ << std::endl;
}
td::TerminalIO::out() << "External message queue size limit: " << f->ext_msg_queue_size_limit_
<< std::endl;
return td::Unit();
}));
}

void run_show_transactions(td::ConstParser& parser, td::Promise<td::Unit> promise) {
TRY_RESULT_PROMISE(promise, address, to_account_address(parser.read_word(), false));
TRY_RESULT_PROMISE(promise, lt, td::to_integer_safe<td::int64>(parser.read_word()));
Expand Down

0 comments on commit 5fae8db

Please sign in to comment.