Skip to content

Commit

Permalink
Send validator telemetry to the private overlay (#1325)
Browse files Browse the repository at this point in the history
* Send validator telemetry to the private overlay

* Improve rotating neighbours in overlays
  • Loading branch information
SpyCheese authored Nov 25, 2024
1 parent 52b010f commit 061c82f
Show file tree
Hide file tree
Showing 30 changed files with 426 additions and 13 deletions.
3 changes: 3 additions & 0 deletions create-hardfork/create-hardfork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ class HardforkCreator : public td::actor::Actor {

void new_key_block(ton::validator::BlockHandle handle) override {
}
void send_validator_telemetry(ton::PublicKeyHash key,
ton::tl_object_ptr<ton::ton_api::validator_telemetry> telemetry) override {
}
};

td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManagerInterface::install_callback,
Expand Down
15 changes: 12 additions & 3 deletions overlay/overlay-manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ void OverlayManager::register_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdS
}
overlays_[local_id][overlay_id] = OverlayDescription{std::move(overlay), std::move(cert)};

if (!with_db_) {
return;
}
auto P =
td::PromiseCreator::lambda([id = overlays_[local_id][overlay_id].overlay.get()](td::Result<DbType::GetResult> R) {
R.ensure();
Expand Down Expand Up @@ -417,13 +420,19 @@ OverlayManager::OverlayManager(std::string db_root, td::actor::ActorId<keyring::
}

void OverlayManager::start_up() {
std::shared_ptr<td::KeyValue> kv =
std::make_shared<td::RocksDb>(td::RocksDb::open(PSTRING() << db_root_ << "/overlays").move_as_ok());
db_ = DbType{std::move(kv)};
if (!db_root_.empty()) {
with_db_ = true;
std::shared_ptr<td::KeyValue> kv =
std::make_shared<td::RocksDb>(td::RocksDb::open(PSTRING() << db_root_ << "/overlays").move_as_ok());
db_ = DbType{std::move(kv)};
}
}

void OverlayManager::save_to_db(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
std::vector<OverlayNode> nodes) {
if (!with_db_) {
return;
}
std::vector<tl_object_ptr<ton_api::overlay_node>> nodes_vec;
for (auto &n : nodes) {
nodes_vec.push_back(n.tl());
Expand Down
1 change: 1 addition & 0 deletions overlay/overlay-manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class OverlayManager : public Overlays {
td::actor::ActorId<dht::Dht> dht_node_;

using DbType = td::KeyValueAsync<td::Bits256, td::BufferSlice>;
bool with_db_ = false;
DbType db_;

class AdnlCallback : public adnl::Adnl::Callback {
Expand Down
4 changes: 2 additions & 2 deletions overlay/overlay-peers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ void OverlayImpl::add_peer(OverlayNode node) {
peer_list_.peers_.insert(id, OverlayPeer(std::move(node)));
del_some_peers();
auto X = peer_list_.peers_.get(id);
if (X != nullptr && peer_list_.neighbours_.size() < max_neighbours() &&
if (X != nullptr && !X->is_neighbour() && peer_list_.neighbours_.size() < max_neighbours() &&
!(X->get_node()->flags() & OverlayMemberFlags::DoNotReceiveBroadcasts) && X->get_id() != local_id_) {
peer_list_.neighbours_.push_back(X->get_id());
X->set_neighbour(true);
Expand Down Expand Up @@ -440,7 +440,7 @@ void OverlayImpl::update_neighbours(td::uint32 nodes_to_change) {
VLOG(OVERLAY_INFO) << this << ": adding new neighbour " << X->get_id();
peer_list_.neighbours_.push_back(X->get_id());
X->set_neighbour(true);
} else {
} else if (X->is_alive()) {
CHECK(nodes_to_change > 0);
auto i = td::Random::fast(0, static_cast<td::uint32>(peer_list_.neighbours_.size()) - 1);
auto Y = peer_list_.peers_.get(peer_list_.neighbours_[i]);
Expand Down
7 changes: 6 additions & 1 deletion overlay/overlay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,12 @@ void OverlayImpl::alarm() {
update_db_at_ = td::Timestamp::in(60.0);
}

update_neighbours(0);
if (update_neighbours_at_.is_in_past()) {
update_neighbours(2);
update_neighbours_at_ = td::Timestamp::in(td::Random::fast(30.0, 120.0));
} else {
update_neighbours(0);
}
alarm_timestamp() = td::Timestamp::in(1.0);
} else {
update_neighbours(0);
Expand Down
1 change: 1 addition & 0 deletions overlay/overlay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ class OverlayImpl : public Overlay {
td::Timestamp next_dht_store_query_ = td::Timestamp::in(1.0);
td::Timestamp update_db_at_;
td::Timestamp update_throughput_at_;
td::Timestamp update_neighbours_at_;
td::Timestamp last_throughput_update_;

std::unique_ptr<Overlays::Callback> callback_;
Expand Down
41 changes: 41 additions & 0 deletions tdutils/td/utils/port/Stat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,4 +472,45 @@ Result<TotalMemStat> get_total_mem_stat() {
#endif
}

Result<uint32> get_cpu_cores() {
#if TD_LINUX
uint32 result = 0;
TRY_RESULT(fd, FileFd::open("/proc/cpuinfo", FileFd::Read));
SCOPE_EXIT {
fd.close();
};
std::string data;
char buf[10000];
while (true) {
TRY_RESULT(size, fd.read(MutableSlice{buf, sizeof(buf) - 1}));
if (size == 0) {
break;
}
buf[size] = '\0';
data += buf;
}
size_t i = 0;
while (i < data.size()) {
const char *line_begin = data.data() + i;
while (i < data.size() && data[i] != '\n') {
++i;
}
auto line_end = data.data() + i;
++i;
Slice line{line_begin, line_end};
size_t j = 0;
while (j < line.size() && line[j] != ' ' && line[j] != '\t' && line[j] != ':') {
++j;
}
Slice name = line.substr(0, j);
if (name == "processor") {
++result;
}
}
return result;
#else
return Status::Error("Not supported");
#endif
}

} // namespace td
2 changes: 2 additions & 0 deletions tdutils/td/utils/port/Stat.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,6 @@ struct TotalMemStat {
};
Result<TotalMemStat> get_total_mem_stat() TD_WARN_UNUSED_RESULT;

Result<uint32> get_cpu_cores() TD_WARN_UNUSED_RESULT;

} // namespace td
3 changes: 3 additions & 0 deletions test/test-ton-collator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,9 @@ class TestNode : public td::actor::Actor {

void new_key_block(ton::validator::BlockHandle handle) override {
}
void send_validator_telemetry(ton::PublicKeyHash key,
ton::tl_object_ptr<ton::ton_api::validator_telemetry> telemetry) override {
}
};

td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManagerInterface::install_callback,
Expand Down
4 changes: 4 additions & 0 deletions tl/generate/scheme/ton_api.tl
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,10 @@ validator.group workchain:int shard:long catchain_seqno:int config_hash:int256 m
validator.groupEx workchain:int shard:long vertical_seqno:int catchain_seqno:int config_hash:int256 members:(vector validator.groupMember) = validator.Group;
validator.groupNew workchain:int shard:long vertical_seqno:int last_key_block_seqno:int catchain_seqno:int config_hash:int256 members:(vector validator.groupMember) = validator.Group;

validator.telemetry flags:# timestamp:double adnl_id:int256
node_version:string os_version:string node_started_at:int
ram_size:long cpu_cores:int node_threads:int = validator.Telemetry;

---functions---


Expand Down
Binary file modified tl/generate/scheme/ton_api.tlo
Binary file not shown.
13 changes: 13 additions & 0 deletions validator-engine/validator-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1945,6 +1945,10 @@ void ValidatorEngine::start_full_node() {
default_dht_node_.is_zero() ? td::actor::ActorId<ton::dht::Dht>{} : dht_nodes_[default_dht_node_].get(),
overlay_manager_.get(), validator_manager_.get(), full_node_client_.get(), db_root_);
load_custom_overlays_config();
if (!validator_telemetry_filename_.empty()) {
td::actor::send_closure(full_node_, &ton::validator::fullnode::FullNode::set_validator_telemetry_filename,
validator_telemetry_filename_);
}
}

for (auto &v : config_.validators) {
Expand Down Expand Up @@ -4331,6 +4335,15 @@ int main(int argc, char *argv[]) {
acts.push_back(
[&x]() { td::actor::send_closure(x, &ValidatorEngine::set_fast_state_serializer_enabled, true); });
});
p.add_option(
'\0', "collect-validator-telemetry",
"store validator telemetry from private block overlay to a given file (json format)",
[&](td::Slice s) {
acts.push_back(
[&x, s = s.str()]() {
td::actor::send_closure(x, &ValidatorEngine::set_validator_telemetry_filename, s);
});
});
auto S = p.run(argc, argv);
if (S.is_error()) {
LOG(ERROR) << "failed to parse options: " << S.move_as_error();
Expand Down
4 changes: 4 additions & 0 deletions validator-engine/validator-engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ class ValidatorEngine : public td::actor::Actor {
ton::BlockSeqno truncate_seqno_{0};
std::string session_logs_file_;
bool fast_state_serializer_enabled_ = false;
std::string validator_telemetry_filename_;

std::set<ton::CatchainSeqno> unsafe_catchains_;
std::map<ton::BlockSeqno, std::pair<ton::CatchainSeqno, td::uint32>> unsafe_catchain_rotations_;
Expand Down Expand Up @@ -310,6 +311,9 @@ class ValidatorEngine : public td::actor::Actor {
void set_fast_state_serializer_enabled(bool value) {
fast_state_serializer_enabled_ = value;
}
void set_validator_telemetry_filename(std::string value) {
validator_telemetry_filename_ = std::move(value);
}
void start_up() override;
ValidatorEngine() {
}
Expand Down
2 changes: 2 additions & 0 deletions validator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ set(VALIDATOR_HEADERS

import-db-slice.hpp
queue-size-counter.hpp
validator-telemetry.hpp

manager-disk.h
manager-disk.hpp
Expand All @@ -82,6 +83,7 @@ set(VALIDATOR_SOURCE
validator-group.cpp
validator-options.cpp
queue-size-counter.cpp
validator-telemetry.cpp

downloaders/wait-block-data.cpp
downloaders/wait-block-state.cpp
Expand Down
66 changes: 65 additions & 1 deletion validator/full-node-private-overlay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#include "common/delay.h"
#include "common/checksum.h"
#include "full-node-serializer.hpp"
#include "auto/tl/ton_api_json.h"
#include "td/utils/JsonBuilder.h"
#include "tl/tl_json.h"

namespace ton::validator::fullnode {

Expand Down Expand Up @@ -85,15 +88,52 @@ void FullNodePrivateBlockOverlay::process_block_candidate_broadcast(PublicKeyHas
validator_set_hash, std::move(data));
}

void FullNodePrivateBlockOverlay::process_telemetry_broadcast(
PublicKeyHash src, const tl_object_ptr<ton_api::validator_telemetry>& telemetry) {
if (telemetry->adnl_id_ != src.bits256_value()) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": adnl_id mismatch";
return;
}
auto now = (td::int32)td::Clocks::system();
if (telemetry->timestamp_ < now - 60) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": too old ("
<< now - telemetry->timestamp_ << "s ago)";
return;
}
if (telemetry->timestamp_ > now + 60) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": too new ("
<< telemetry->timestamp_ - now << "s in the future)";
return;
}
VLOG(FULL_NODE_DEBUG) << "Got telemetry broadcast from " << src;
auto s = td::json_encode<std::string>(td::ToJson(*telemetry), false);
std::erase_if(s, [](char c) {
return c == '\n' || c == '\r';
});
telemetry_file_ << s << "\n";
telemetry_file_.flush();
if (telemetry_file_.fail()) {
VLOG(FULL_NODE_WARNING) << "Failed to write telemetry to file";
}
}

void FullNodePrivateBlockOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
if (adnl::AdnlNodeIdShort{src} == local_id_) {
return;
}
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
if (B.is_error()) {
if (collect_telemetry_ && src != local_id_.pubkey_hash()) {
auto R = fetch_tl_prefix<ton_api::validator_telemetry>(broadcast, true);
if (R.is_ok()) {
process_telemetry_broadcast(src, R.ok());
}
}
return;
}
ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto &obj) { Self->process_broadcast(src, obj); });
ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto& obj) {
Self->process_broadcast(src, obj);
});
}

void FullNodePrivateBlockOverlay::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno,
Expand Down Expand Up @@ -144,6 +184,30 @@ void FullNodePrivateBlockOverlay::send_broadcast(BlockBroadcast broadcast) {
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}

void FullNodePrivateBlockOverlay::send_validator_telemetry(tl_object_ptr<ton_api::validator_telemetry> telemetry) {
process_telemetry_broadcast(local_id_.pubkey_hash(), telemetry);
auto data = serialize_tl_object(telemetry, true);
if (data.size() <= overlay::Overlays::max_simple_broadcast_size()) {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(data));
} else {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(data));
}
}

void FullNodePrivateBlockOverlay::collect_validator_telemetry(std::string filename) {
if (collect_telemetry_) {
telemetry_file_.close();
}
collect_telemetry_ = true;
LOG(FULL_NODE_WARNING) << "Collecting validator telemetry to " << filename << " (local id: " << local_id_ << ")";
telemetry_file_.open(filename, std::ios_base::app);
if (!telemetry_file_.is_open()) {
LOG(WARNING) << "Cannot open file " << filename << " for validator telemetry";
}
}

void FullNodePrivateBlockOverlay::start_up() {
std::sort(nodes_.begin(), nodes_.end());
nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end());
Expand Down
9 changes: 9 additions & 0 deletions validator/full-node-private-overlay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include "full-node.h"
#include <fstream>

namespace ton::validator::fullnode {

Expand All @@ -32,6 +33,8 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query);
void process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);

void process_telemetry_broadcast(PublicKeyHash src, const tl_object_ptr<ton_api::validator_telemetry>& telemetry);

template <class T>
void process_broadcast(PublicKeyHash, T &) {
VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast";
Expand All @@ -42,6 +45,9 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {
void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data);
void send_broadcast(BlockBroadcast broadcast);
void send_validator_telemetry(tl_object_ptr<ton_api::validator_telemetry> telemetry);

void collect_validator_telemetry(std::string filename);

void set_config(FullNodeConfig config) {
config_ = std::move(config);
Expand Down Expand Up @@ -91,6 +97,9 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {

void try_init();
void init();

bool collect_telemetry_ = false;
std::ofstream telemetry_file_;
};

class FullNodeCustomOverlay : public td::actor::Actor {
Expand Down
Loading

0 comments on commit 061c82f

Please sign in to comment.