Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Traffic shaping #4786

Merged
merged 30 commits into from
Dec 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
bce3995
Debug log stuck IO threads
pwojcikdev Oct 22, 2024
9bdc56c
Closing channels
pwojcikdev May 3, 2024
11d4a43
Rename event to `channel_connected`
pwojcikdev Oct 30, 2024
c994477
Keep connection type
pwojcikdev Nov 14, 2024
420d8d5
Modify `socket_connected` event
pwojcikdev Oct 30, 2024
187a3ce
Boost handler tracking
pwojcikdev Nov 13, 2024
cb70e36
Abort thread runner
pwojcikdev Nov 13, 2024
c494819
Asio error to stat detail
pwojcikdev Nov 11, 2024
ae4e484
Do not merge peer on keepalive
pwojcikdev Nov 14, 2024
3a458d8
Tests
pwojcikdev Nov 14, 2024
98fe46d
Channel traffic rework
pwojcikdev May 4, 2024
a41c317
Return merge peer result
pwojcikdev Nov 14, 2024
44e0417
Stats & logging
pwojcikdev Nov 14, 2024
2d43452
Tests
pwojcikdev Nov 15, 2024
0f8bbf8
Make send_buffer protected
pwojcikdev Nov 16, 2024
40f53a9
Node traffic prioritization
pwojcikdev Nov 17, 2024
8ac186c
Store shared ptr to socket
pwojcikdev Nov 17, 2024
c659d0c
Inline async
pwojcikdev Nov 17, 2024
a81d515
Track traffic type stats
pwojcikdev Nov 17, 2024
cd8d2b2
Tune traffic priorities
pwojcikdev Nov 17, 2024
fcd7415
Reduce socket queue size
pwojcikdev Nov 17, 2024
9d9cac9
Reduce channel queue size
pwojcikdev Nov 18, 2024
55530cf
Test fixing
pwojcikdev Nov 18, 2024
31ca903
Async factory
pwojcikdev Nov 18, 2024
483062f
Avoid coroutine lambda captures in tcp_channel
pwojcikdev Nov 18, 2024
7dc6041
Avoid coroutine lambda captures in tcp_listener
pwojcikdev Nov 18, 2024
ae6408e
Move socket_functions impl
pwojcikdev Nov 19, 2024
ca96ce3
Tests
pwojcikdev Dec 16, 2024
e3a2afd
Configurable socket queue size
pwojcikdev Dec 16, 2024
4146cf6
Missing stats
pwojcikdev Dec 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ set(NANO_FUZZER_TEST
OFF
CACHE BOOL "")
set(NANO_ASIO_HANDLER_TRACKING
0
CACHE STRING "")
OFF
CACHE BOOL "")
set(NANO_ROCKSDB_TOOLS
OFF
CACHE BOOL "")
Expand Down Expand Up @@ -153,9 +153,8 @@ if(${NANO_TIMED_LOCKS} GREATER 0)
endif()
endif()

if(${NANO_ASIO_HANDLER_TRACKING} GREATER 0)
add_definitions(-DNANO_ASIO_HANDLER_TRACKING=${NANO_ASIO_HANDLER_TRACKING}
-DBOOST_ASIO_ENABLE_HANDLER_TRACKING)
if(NANO_ASIO_HANDLER_TRACKING)
add_definitions(-DBOOST_ASIO_ENABLE_HANDLER_TRACKING)
endif()

option(NANO_SIMD_OPTIMIZATIONS
Expand Down
4 changes: 2 additions & 2 deletions nano/core_test/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,7 @@ TEST (active_elections, fork_replacement_tally)
node_config.peering_port = system.get_available_port ();
auto & node2 (*system.add_node (node_config));
node1.network.filter.clear ();
node2.network.flood_block (send_last);
node2.network.flood_block (send_last, nano::transport::traffic_type::test);
ASSERT_TIMELY (3s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 0);

// Correct block without votes is ignored
Expand All @@ -984,7 +984,7 @@ TEST (active_elections, fork_replacement_tally)
// ensure vote arrives before the block
ASSERT_TIMELY_EQ (5s, 1, node1.vote_cache.find (send_last->hash ()).size ());
node1.network.filter.clear ();
node2.network.flood_block (send_last);
node2.network.flood_block (send_last, nano::transport::traffic_type::test);
ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 1);

// the send_last block should replace one of the existing blocks of the election because it has higher vote weight
Expand Down
86 changes: 43 additions & 43 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ TEST (network, last_contacted)

{
// check that the endpoints are part of the same connection
std::shared_ptr<nano::transport::tcp_socket> sock0 = channel0->socket.lock ();
std::shared_ptr<nano::transport::tcp_socket> sock1 = channel1->socket.lock ();
std::shared_ptr<nano::transport::tcp_socket> sock0 = channel0->socket;
std::shared_ptr<nano::transport::tcp_socket> sock1 = channel1->socket;
ASSERT_EQ (sock0->local_endpoint (), sock1->remote_endpoint ());
ASSERT_EQ (sock1->local_endpoint (), sock0->remote_endpoint ());
}
Expand Down Expand Up @@ -195,7 +195,7 @@ TEST (network, send_discarded_publish)
.build ();
{
auto transaction = node1.ledger.tx_begin_read ();
node1.network.flood_block (block);
node1.network.flood_block (block, nano::transport::traffic_type::test);
ASSERT_EQ (nano::dev::genesis->hash (), node1.ledger.any.account_head (transaction, nano::dev::genesis_key.pub));
ASSERT_EQ (nano::dev::genesis->hash (), node2.latest (nano::dev::genesis_key.pub));
}
Expand All @@ -221,7 +221,7 @@ TEST (network, send_invalid_publish)
.build ();
{
auto transaction = node1.ledger.tx_begin_read ();
node1.network.flood_block (block);
node1.network.flood_block (block, nano::transport::traffic_type::test);
ASSERT_EQ (nano::dev::genesis->hash (), node1.ledger.any.account_head (transaction, nano::dev::genesis_key.pub));
ASSERT_EQ (nano::dev::genesis->hash (), node2.latest (nano::dev::genesis_key.pub));
}
Expand Down Expand Up @@ -306,7 +306,7 @@ TEST (network, send_insufficient_work)
nano::publish publish1{ nano::dev::network_params.network, block1 };
auto tcp_channel (node1.network.tcp_channels.find_node_id (node2.get_node_id ()));
ASSERT_NE (nullptr, tcp_channel);
tcp_channel->send (publish1, [] (boost::system::error_code const & ec, size_t size) {});
tcp_channel->send (publish1, nano::transport::traffic_type::test);
ASSERT_EQ (0, node1.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work) != 0);
ASSERT_EQ (1, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
Expand All @@ -320,7 +320,7 @@ TEST (network, send_insufficient_work)
.work (system.work_generate_limited (block1->hash (), node1.network_params.work.epoch_2_receive, node1.network_params.work.epoch_1 - 1))
.build ();
nano::publish publish2{ nano::dev::network_params.network, block2 };
tcp_channel->send (publish2, [] (boost::system::error_code const & ec, size_t size) {});
tcp_channel->send (publish2, nano::transport::traffic_type::test);
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work) != 1);
ASSERT_EQ (2, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
// Legacy block work epoch_1
Expand All @@ -333,7 +333,7 @@ TEST (network, send_insufficient_work)
.work (*system.work.generate (block2->hash (), node1.network_params.work.epoch_2))
.build ();
nano::publish publish3{ nano::dev::network_params.network, block3 };
tcp_channel->send (publish3, [] (boost::system::error_code const & ec, size_t size) {});
tcp_channel->send (publish3, nano::transport::traffic_type::test);
ASSERT_EQ (0, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in));
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) != 0);
ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in));
Expand All @@ -349,7 +349,7 @@ TEST (network, send_insufficient_work)
.work (system.work_generate_limited (block1->hash (), node1.network_params.work.epoch_2_receive, node1.network_params.work.epoch_1 - 1))
.build ();
nano::publish publish4{ nano::dev::network_params.network, block4 };
tcp_channel->send (publish4, [] (boost::system::error_code const & ec, size_t size) {});
tcp_channel->send (publish4, nano::transport::traffic_type::test);
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) != 0);
ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in));
ASSERT_EQ (2, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
Expand Down Expand Up @@ -562,14 +562,14 @@ TEST (network, peer_max_tcp_attempts)
node_config.network.max_peers_per_ip = 3;
auto node = system.add_node (node_config, node_flags);

for (auto i (0); i < node_config.network.max_peers_per_ip; ++i)
for (auto i = 0; i < node_config.network.max_peers_per_ip; ++i)
{
auto node2 (std::make_shared<nano::node> (system.io_ctx, system.get_available_port (), nano::unique_path (), system.work, node_flags));
node2->start ();
system.nodes.push_back (node2);

// Start TCP attempt
node->network.merge_peer (node2->network.endpoint ());
// Disable reachout from temporary nodes to avoid mixing outbound and inbound connections
nano::node_config temp_config = system.default_config ();
temp_config.network.peer_reachout = {};
temp_config.network.cached_peer_reachout = {};
auto temp_node = system.make_disconnected_node (temp_config, node_flags);
ASSERT_TRUE (node->network.merge_peer (temp_node->network.endpoint ()));
}

ASSERT_TIMELY_EQ (15s, node->network.size (), node_config.network.max_peers_per_ip);
Expand Down Expand Up @@ -632,9 +632,9 @@ TEST (network, duplicate_detection)
ASSERT_NE (nullptr, tcp_channel);

ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message));
tcp_channel->send (publish);
tcp_channel->send (publish, nano::transport::traffic_type::test);
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 0);
tcp_channel->send (publish);
tcp_channel->send (publish, nano::transport::traffic_type::test);
ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 1);
}

Expand Down Expand Up @@ -681,9 +681,9 @@ TEST (network, duplicate_vote_detection)
ASSERT_NE (nullptr, tcp_channel);

ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message));
tcp_channel->send (message);
tcp_channel->send (message, nano::transport::traffic_type::test);
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
tcp_channel->send (message);
tcp_channel->send (message, nano::transport::traffic_type::test);
ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 1);
}

Expand Down Expand Up @@ -711,12 +711,12 @@ TEST (network, duplicate_revert_vote)
ASSERT_NE (nullptr, tcp_channel);

// First vote should be processed
tcp_channel->send (message1);
tcp_channel->send (message1, nano::transport::traffic_type::test);
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
ASSERT_TIMELY (5s, node1.network.filter.check (bytes1.data (), bytes1.size ()));

// Second vote should get dropped from processor queue
tcp_channel->send (message2);
tcp_channel->send (message2, nano::transport::traffic_type::test);
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
// And the filter should not have it
WAIT (500ms); // Give the node time to process the vote
Expand All @@ -741,9 +741,9 @@ TEST (network, expire_duplicate_filter)

// Send a vote
ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message));
tcp_channel->send (message);
tcp_channel->send (message, nano::transport::traffic_type::test);
ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0);
tcp_channel->send (message);
tcp_channel->send (message, nano::transport::traffic_type::test);
ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 1);

// The filter should expire the vote after some time
Expand All @@ -752,7 +752,7 @@ TEST (network, expire_duplicate_filter)
}

// The test must be completed in less than 1 second
TEST (network, bandwidth_limiter_4_messages)
TEST (network, DISABLED_bandwidth_limiter_4_messages)
{
nano::test::system system;
nano::publish message{ nano::dev::network_params.network, nano::dev::genesis };
Expand All @@ -767,22 +767,22 @@ TEST (network, bandwidth_limiter_4_messages)
// Send droppable messages
for (auto i = 0; i < message_limit; i += 2) // number of channels
{
channel1.send (message);
channel2.send (message);
channel1.send (message, nano::transport::traffic_type::test);
channel2.send (message, nano::transport::traffic_type::test);
}
// Only sent messages below limit, so we don't expect any drops
ASSERT_TIMELY_EQ (1s, 0, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));

// Send droppable message; drop stats should increase by one now
channel1.send (message);
channel1.send (message, nano::transport::traffic_type::test);
ASSERT_TIMELY_EQ (1s, 1, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));

// Send non-droppable message, i.e. drop stats should not increase
channel2.send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop);
channel2.send (message, nano::transport::traffic_type::test);
ASSERT_TIMELY_EQ (1s, 1, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
}

TEST (network, bandwidth_limiter_2_messages)
TEST (network, DISABLED_bandwidth_limiter_2_messages)
{
nano::test::system system;
nano::publish message{ nano::dev::network_params.network, nano::dev::genesis };
Expand All @@ -795,10 +795,10 @@ TEST (network, bandwidth_limiter_2_messages)
nano::transport::inproc::channel channel1{ node, node };
nano::transport::inproc::channel channel2{ node, node };
// change the bandwidth settings, 2 packets will be dropped
channel1.send (message);
channel2.send (message);
channel1.send (message);
channel2.send (message);
channel1.send (message, nano::transport::traffic_type::test);
channel2.send (message, nano::transport::traffic_type::test);
channel1.send (message, nano::transport::traffic_type::test);
channel2.send (message, nano::transport::traffic_type::test);
ASSERT_TIMELY_EQ (1s, 2, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
}

Expand All @@ -815,10 +815,10 @@ TEST (network, bandwidth_limiter_with_burst)
nano::transport::inproc::channel channel1{ node, node };
nano::transport::inproc::channel channel2{ node, node };
// change the bandwidth settings, no packet will be dropped
channel1.send (message);
channel2.send (message);
channel1.send (message);
channel2.send (message);
channel1.send (message, nano::transport::traffic_type::test);
channel2.send (message, nano::transport::traffic_type::test);
channel1.send (message, nano::transport::traffic_type::test);
channel2.send (message, nano::transport::traffic_type::test);
ASSERT_TIMELY_EQ (1s, 0, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
}

Expand Down Expand Up @@ -962,7 +962,7 @@ TEST (network, filter_invalid_network_bytes)
// send a keepalive, from node2 to node1, with the wrong network bytes
nano::keepalive keepalive{ nano::dev::network_params.network };
const_cast<nano::networks &> (keepalive.header.network) = nano::networks::invalid;
channel->send (keepalive);
channel->send (keepalive, nano::transport::traffic_type::test);

ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::error, nano::stat::detail::invalid_network));
}
Expand All @@ -981,7 +981,7 @@ TEST (network, filter_invalid_version_using)
// send a keepalive, from node2 to node1, with the wrong version_using
nano::keepalive keepalive{ nano::dev::network_params.network };
const_cast<uint8_t &> (keepalive.header.version_using) = nano::dev::network_params.network.protocol_version_min - 1;
channel->send (keepalive);
channel->send (keepalive, nano::transport::traffic_type::test);

ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::error, nano::stat::detail::outdated_version));
}
Expand Down Expand Up @@ -1068,8 +1068,8 @@ TEST (network, purge_dead_channel)

auto & node1 = *system.add_node (flags);

node1.observers.socket_connected.add ([&] (nano::transport::tcp_socket & sock) {
system.logger.debug (nano::log::type::test, "Connected: {}", sock);
node1.observers.socket_connected.add ([&] (auto const & socket) {
system.logger.debug (nano::log::type::test, "Connected socket: {}", nano::streamed (socket));
});

auto & node2 = *system.add_node (flags);
Expand Down Expand Up @@ -1119,8 +1119,8 @@ TEST (network, purge_dead_channel_remote)
auto & node1 = *system.add_node (flags);
auto & node2 = *system.add_node (flags);

node2.observers.socket_connected.add ([&] (nano::transport::tcp_socket & sock) {
system.logger.debug (nano::log::type::test, "Connected: {}", sock);
node2.observers.socket_connected.add ([&] (auto const & socket) {
system.logger.debug (nano::log::type::test, "Connected socket: {}", nano::streamed (socket));
});

ASSERT_EQ (node1.network.size (), 1);
Expand Down
16 changes: 2 additions & 14 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ TEST (node, confirm_locked)
.sign (nano::keypair ().prv, 0)
.work (0)
.build ();
system.nodes[0]->network.flood_block (block);
system.nodes[0]->network.flood_block (block, nano::transport::traffic_type::test);
}

TEST (node_config, random_rep)
Expand Down Expand Up @@ -1005,14 +1005,9 @@ TEST (node, fork_no_vote_quorum)
ASSERT_FALSE (system.wallet (1)->store.fetch (transaction, key1, key3));
auto vote = std::make_shared<nano::vote> (key1, key3, 0, 0, std::vector<nano::block_hash>{ send2->hash () });
nano::confirm_ack confirm{ nano::dev::network_params.network, vote };
std::vector<uint8_t> buffer;
{
nano::vectorstream stream (buffer);
confirm.serialize (stream);
}
auto channel = node2.network.find_node_id (node3.node_id.pub);
ASSERT_NE (nullptr, channel);
channel->send_buffer (nano::shared_const_buffer (std::move (buffer)));
channel->send (confirm, nano::transport::traffic_type::test);
ASSERT_TIMELY (10s, node3.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::in) >= 3);
ASSERT_EQ (node1.latest (nano::dev::genesis_key.pub), send1->hash ());
ASSERT_EQ (node2.latest (nano::dev::genesis_key.pub), send1->hash ());
Expand Down Expand Up @@ -2662,13 +2657,6 @@ TEST (node, dont_write_lock_node)

TEST (node, bidirectional_tcp)
{
#ifdef _WIN32
if (nano::rocksdb_config::using_rocksdb_in_tests ())
{
// Don't test this in rocksdb mode
GTEST_SKIP ();
}
#endif
nano::test::system system;
nano::node_flags node_flags;
// Disable bootstrap to start elections for new blocks
Expand Down
5 changes: 3 additions & 2 deletions nano/core_test/peer_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ TEST (peer_container, reserved_ip_is_not_a_peer)

// Test the TCP channel cleanup function works properly. It is used to remove peers that are not
// exchanging messages after a while.
TEST (peer_container, tcp_channel_cleanup_works)
TEST (peer_container, DISABLED_tcp_channel_cleanup_works)
{
nano::test::system system;
nano::node_config node_config = system.default_config ();
Expand Down Expand Up @@ -90,6 +90,7 @@ TEST (peer_container, tcp_channel_cleanup_works)

for (auto it = 0; node1.network.tcp_channels.size () > 1 && it < 10; ++it)
{
// FIXME: This is racy and doesn't work reliably
// we can't control everything the nodes are doing in background, so using the middle time as
// the cutoff point.
auto const channel1_last_packet_sent = channel1->get_last_packet_sent ();
Expand Down Expand Up @@ -254,7 +255,7 @@ TEST (peer_container, depeer_on_outdated_version)
nano::keepalive keepalive{ nano::dev::network_params.network };
const_cast<uint8_t &> (keepalive.header.version_using) = nano::dev::network_params.network.protocol_version_min - 1;
ASSERT_TIMELY (5s, channel->alive ());
channel->send (keepalive);
channel->send (keepalive, nano::transport::traffic_type::test);

ASSERT_TIMELY (5s, !channel->alive ());
}
2 changes: 1 addition & 1 deletion nano/core_test/rep_crawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ TEST (rep_crawler, ignore_rebroadcasted)

auto tick = [&] () {
nano::confirm_ack msg{ nano::dev::network_params.network, vote, /* rebroadcasted */ true };
channel2to1->send (msg, nullptr, nano::transport::buffer_drop_policy::no_socket_drop);
channel2to1->send (msg, nano::transport::traffic_type::test);
return false;
};

Expand Down
Loading
Loading