Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20141] Fix TCP reconnection after open logical port failure (backport #4324) #4422

Merged
merged 3 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/TCPChannelResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ void TCPChannelResource::add_logical_port_response(
negotiating_logical_ports_.erase(it);
if (portIt != pending_logical_output_ports_.end())
{
pending_logical_output_ports_.erase(portIt);
if (success)
{
pending_logical_output_ports_.erase(portIt);
logical_output_ports_.push_back(port);
logInfo(RTCP, "OpenedLogicalPort: " << port);
}
Expand Down
86 changes: 86 additions & 0 deletions test/unittest/transport/TCPv4Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1852,6 +1852,92 @@ TEST_F(TCPv4Tests, non_blocking_send)
}
#endif // ifndef _WIN32

// This test verifies that a server can reconnect to a client after the client has once failed in a
// openLogicalPort request
TEST_F(TCPv4Tests, reconnect_after_open_port_failure)
{
eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning);
uint16_t port = g_default_port;
// Create a TCP Server transport
TCPv4TransportDescriptor serverDescriptor;
serverDescriptor.add_listener_port(port);
std::unique_ptr<TCPv4Transport> serverTransportUnderTest(new TCPv4Transport(serverDescriptor));
serverTransportUnderTest->init();

// Create a TCP Client transport
TCPv4TransportDescriptor clientDescriptor;
std::unique_ptr<MockTCPv4Transport> clientTransportUnderTest(new MockTCPv4Transport(clientDescriptor));
clientTransportUnderTest->init();

// Add initial peer to the client
Locator_t initialPeerLocator;
IPLocator::createLocator(LOCATOR_KIND_TCPv4, "127.0.0.1", port, initialPeerLocator);
IPLocator::setLogicalPort(initialPeerLocator, 7410);

// Connect client to server
EXPECT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF));
SendResourceList client_resource_list;
ASSERT_TRUE(clientTransportUnderTest->OpenOutputChannel(client_resource_list, initialPeerLocator));
ASSERT_FALSE(client_resource_list.empty());
std::this_thread::sleep_for(std::chrono::milliseconds(300));
auto channel = clientTransportUnderTest->get_channel_resources().begin()->second;

// Logical port is opened
ASSERT_TRUE(channel->is_logical_port_opened(7410));

// Disconnect server
EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator));
serverTransportUnderTest.reset();
std::this_thread::sleep_for(std::chrono::milliseconds(300));
// Client should have passed logical port to pending list
ASSERT_FALSE(channel->is_logical_port_opened(7410));
ASSERT_TRUE(channel->is_logical_port_added(7410));

// Now try normal reconnection
serverTransportUnderTest.reset(new TCPv4Transport(serverDescriptor));
serverTransportUnderTest->init();
ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF));
clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect()

// Logical port is opened (moved from pending list)
std::this_thread::sleep_for(std::chrono::milliseconds(300));
ASSERT_TRUE(channel->is_logical_port_opened(7410));

// Disconnect server
EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator));
serverTransportUnderTest.reset();
std::this_thread::sleep_for(std::chrono::milliseconds(300));
// Client should have passed logical port to pending list
ASSERT_FALSE(channel->is_logical_port_opened(7410));
ASSERT_TRUE(channel->is_logical_port_added(7410));

// Now try reconnect the server and close server's input channel before client's open logical
// port request, and then delete server and reconnect
serverTransportUnderTest.reset(new TCPv4Transport(serverDescriptor));
serverTransportUnderTest->init();
ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF));
EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator));
clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect()
std::this_thread::sleep_for(std::chrono::milliseconds(300));
serverTransportUnderTest.reset();
ASSERT_FALSE(channel->is_logical_port_opened(7410));
ASSERT_TRUE(channel->is_logical_port_added(7410));

// Now try normal reconnection
serverTransportUnderTest.reset(new TCPv4Transport(serverDescriptor));
serverTransportUnderTest->init();
ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF));
clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect()

// Logical port is opened (moved from pending list)
std::this_thread::sleep_for(std::chrono::milliseconds(300));
ASSERT_TRUE(channel->is_logical_port_opened(7410));

// Clear test
EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator));
client_resource_list.clear();
}

void TCPv4Tests::HELPER_SetDescriptorDefaults()
{
descriptor.add_listener_port(g_default_port);
Expand Down
86 changes: 86 additions & 0 deletions test/unittest/transport/TCPv6Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,92 @@ TEST_F(TCPv6Tests, non_blocking_send)
}
#endif // ifndef _WIN32

// This test verifies that a server can reconnect to a client after the client has once failed in a
// openLogicalPort request
TEST_F(TCPv6Tests, reconnect_after_open_port_failure)
{
eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning);
uint16_t port = g_default_port;
// Create a TCP Server transport
TCPv6TransportDescriptor serverDescriptor;
serverDescriptor.add_listener_port(port);
std::unique_ptr<TCPv6Transport> serverTransportUnderTest(new TCPv6Transport(serverDescriptor));
serverTransportUnderTest->init();

// Create a TCP Client transport
TCPv6TransportDescriptor clientDescriptor;
std::unique_ptr<MockTCPv6Transport> clientTransportUnderTest(new MockTCPv6Transport(clientDescriptor));
clientTransportUnderTest->init();

// Add initial peer to the client
Locator_t initialPeerLocator;
IPLocator::createLocator(LOCATOR_KIND_TCPv6, "::1", port, initialPeerLocator);
IPLocator::setLogicalPort(initialPeerLocator, 7410);

// Connect client to server
EXPECT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF));
SendResourceList client_resource_list;
ASSERT_TRUE(clientTransportUnderTest->OpenOutputChannel(client_resource_list, initialPeerLocator));
ASSERT_FALSE(client_resource_list.empty());
std::this_thread::sleep_for(std::chrono::milliseconds(300));
auto channel = clientTransportUnderTest->get_channel_resources().begin()->second;

// Logical port is opened
ASSERT_TRUE(channel->is_logical_port_opened(7410));

// Disconnect server
EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator));
serverTransportUnderTest.reset();
std::this_thread::sleep_for(std::chrono::milliseconds(300));
// Client should have passed logical port to pending list
ASSERT_FALSE(channel->is_logical_port_opened(7410));
ASSERT_TRUE(channel->is_logical_port_added(7410));

// Now try normal reconnection
serverTransportUnderTest.reset(new TCPv6Transport(serverDescriptor));
serverTransportUnderTest->init();
ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF));
clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect()

// Logical port is opened (moved from pending list)
std::this_thread::sleep_for(std::chrono::milliseconds(300));
ASSERT_TRUE(channel->is_logical_port_opened(7410));

// Disconnect server
EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator));
serverTransportUnderTest.reset();
std::this_thread::sleep_for(std::chrono::milliseconds(300));
// Client should have passed logical port to pending list
ASSERT_FALSE(channel->is_logical_port_opened(7410));
ASSERT_TRUE(channel->is_logical_port_added(7410));

// Now try reconnect the server and close server's input channel before client's open logical
// port request, and then delete server and reconnect
serverTransportUnderTest.reset(new TCPv6Transport(serverDescriptor));
serverTransportUnderTest->init();
ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF));
EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator));
clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect()
std::this_thread::sleep_for(std::chrono::milliseconds(300));
serverTransportUnderTest.reset();
ASSERT_FALSE(channel->is_logical_port_opened(7410));
ASSERT_TRUE(channel->is_logical_port_added(7410));

// Now try normal reconnection
serverTransportUnderTest.reset(new TCPv6Transport(serverDescriptor));
serverTransportUnderTest->init();
ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF));
clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect()

// Logical port is opened (moved from pending list)
std::this_thread::sleep_for(std::chrono::milliseconds(300));
ASSERT_TRUE(channel->is_logical_port_opened(7410));

// Clear test
EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator));
client_resource_list.clear();
}

/*
TEST_F(TCPv6Tests, send_and_receive_between_both_secure_ports)
{
Expand Down
9 changes: 9 additions & 0 deletions test/unittest/transport/mock/MockTCPv4Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ class MockTCPv4Transport : public TCPv4Transport
return acceptors_;
}

bool send(
const fastrtps::rtps::octet* send_buffer,
uint32_t send_buffer_size,
const fastrtps::rtps::Locator_t& send_resource_locator,
const Locator_t& remote_locator)
{
return TCPv4Transport::send(send_buffer, send_buffer_size, send_resource_locator, remote_locator);
}

};

} // namespace rtps
Expand Down
9 changes: 9 additions & 0 deletions test/unittest/transport/mock/MockTCPv6Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ class MockTCPv6Transport : public TCPv6Transport
return unbound_channel_resources_;
}

bool send(
const fastrtps::rtps::octet* send_buffer,
uint32_t send_buffer_size,
const fastrtps::rtps::Locator_t& send_resource_locator,
const Locator_t& remote_locator)
{
return TCPv6Transport::send(send_buffer, send_buffer_size, send_resource_locator, remote_locator);
}

};

} // namespace rtps
Expand Down
Loading