Skip to content

Commit

Permalink
Fix TCP reconnection after open logical port failure (#4324)
Browse files Browse the repository at this point in the history
* Refs #20141: Relocate pending ports erasing

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20141: Add test

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20141: Uncrustify

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20141: Fix asan tests

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20141: Changes after rebase

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20141: Fix windows tests

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20141: Uncrustify

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20141: Apply suggestions

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

---------

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>
(cherry picked from commit 3db2db8)
  • Loading branch information
jepemi committed Mar 13, 2024
1 parent 4efd85c commit 1a3a8f7
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/TCPChannelResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ void TCPChannelResource::add_logical_port_response(
negotiating_logical_ports_.erase(it);
if (portIt != pending_logical_output_ports_.end())
{
pending_logical_output_ports_.erase(portIt);
if (success)
{
pending_logical_output_ports_.erase(portIt);
logical_output_ports_.push_back(port);
EPROSIMA_LOG_INFO(RTCP, "OpenedLogicalPort: " << port);
}
Expand Down
86 changes: 86 additions & 0 deletions test/unittest/transport/TCPv4Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1961,6 +1961,92 @@ TEST_F(TCPv4Tests, non_blocking_send)
}
#endif // ifndef _WIN32

// This test verifies that a server can reconnect to a client after the client has once failed in a
// openLogicalPort request
TEST_F(TCPv4Tests, reconnect_after_open_port_failure)
{
eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning);
uint16_t port = g_default_port;
// Create a TCP Server transport
TCPv4TransportDescriptor serverDescriptor;
serverDescriptor.add_listener_port(port);
std::unique_ptr<TCPv4Transport> serverTransportUnderTest(new TCPv4Transport(serverDescriptor));
serverTransportUnderTest->init();

// Create a TCP Client transport
TCPv4TransportDescriptor clientDescriptor;
std::unique_ptr<MockTCPv4Transport> clientTransportUnderTest(new MockTCPv4Transport(clientDescriptor));
clientTransportUnderTest->init();

// Add initial peer to the client
Locator_t initialPeerLocator;
IPLocator::createLocator(LOCATOR_KIND_TCPv4, "127.0.0.1", port, initialPeerLocator);
IPLocator::setLogicalPort(initialPeerLocator, 7410);

// Connect client to server
EXPECT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF));
SendResourceList client_resource_list;
ASSERT_TRUE(clientTransportUnderTest->OpenOutputChannel(client_resource_list, initialPeerLocator));
ASSERT_FALSE(client_resource_list.empty());
std::this_thread::sleep_for(std::chrono::milliseconds(300));
auto channel = clientTransportUnderTest->get_channel_resources().begin()->second;

// Logical port is opened
ASSERT_TRUE(channel->is_logical_port_opened(7410));

// Disconnect server
EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator));
serverTransportUnderTest.reset();
std::this_thread::sleep_for(std::chrono::milliseconds(300));
// Client should have passed logical port to pending list
ASSERT_FALSE(channel->is_logical_port_opened(7410));
ASSERT_TRUE(channel->is_logical_port_added(7410));

// Now try normal reconnection
serverTransportUnderTest.reset(new TCPv4Transport(serverDescriptor));
serverTransportUnderTest->init();
ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF));
clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect()

// Logical port is opened (moved from pending list)
std::this_thread::sleep_for(std::chrono::milliseconds(300));
ASSERT_TRUE(channel->is_logical_port_opened(7410));

// Disconnect server
EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator));
serverTransportUnderTest.reset();
std::this_thread::sleep_for(std::chrono::milliseconds(300));
// Client should have passed logical port to pending list
ASSERT_FALSE(channel->is_logical_port_opened(7410));
ASSERT_TRUE(channel->is_logical_port_added(7410));

// Now try reconnect the server and close server's input channel before client's open logical
// port request, and then delete server and reconnect
serverTransportUnderTest.reset(new TCPv4Transport(serverDescriptor));
serverTransportUnderTest->init();
ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF));
EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator));
clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect()
std::this_thread::sleep_for(std::chrono::milliseconds(300));
serverTransportUnderTest.reset();
ASSERT_FALSE(channel->is_logical_port_opened(7410));
ASSERT_TRUE(channel->is_logical_port_added(7410));

// Now try normal reconnection
serverTransportUnderTest.reset(new TCPv4Transport(serverDescriptor));
serverTransportUnderTest->init();
ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF));
clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect()

// Logical port is opened (moved from pending list)
std::this_thread::sleep_for(std::chrono::milliseconds(300));
ASSERT_TRUE(channel->is_logical_port_opened(7410));

// Clear test
EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator));
client_resource_list.clear();
}

void TCPv4Tests::HELPER_SetDescriptorDefaults()
{
descriptor.add_listener_port(g_default_port);
Expand Down
86 changes: 86 additions & 0 deletions test/unittest/transport/TCPv6Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,92 @@ TEST_F(TCPv6Tests, non_blocking_send)
}
#endif // ifndef _WIN32

// This test verifies that a server can reconnect to a client after the client has once failed in a
// openLogicalPort request
TEST_F(TCPv6Tests, reconnect_after_open_port_failure)
{
eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning);
uint16_t port = g_default_port;
// Create a TCP Server transport
TCPv6TransportDescriptor serverDescriptor;
serverDescriptor.add_listener_port(port);
std::unique_ptr<TCPv6Transport> serverTransportUnderTest(new TCPv6Transport(serverDescriptor));
serverTransportUnderTest->init();

// Create a TCP Client transport
TCPv6TransportDescriptor clientDescriptor;
std::unique_ptr<MockTCPv6Transport> clientTransportUnderTest(new MockTCPv6Transport(clientDescriptor));
clientTransportUnderTest->init();

// Add initial peer to the client
Locator_t initialPeerLocator;
IPLocator::createLocator(LOCATOR_KIND_TCPv6, "::1", port, initialPeerLocator);
IPLocator::setLogicalPort(initialPeerLocator, 7410);

// Connect client to server
EXPECT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF));
SendResourceList client_resource_list;
ASSERT_TRUE(clientTransportUnderTest->OpenOutputChannel(client_resource_list, initialPeerLocator));
ASSERT_FALSE(client_resource_list.empty());
std::this_thread::sleep_for(std::chrono::milliseconds(300));
auto channel = clientTransportUnderTest->get_channel_resources().begin()->second;

// Logical port is opened
ASSERT_TRUE(channel->is_logical_port_opened(7410));

// Disconnect server
EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator));
serverTransportUnderTest.reset();
std::this_thread::sleep_for(std::chrono::milliseconds(300));
// Client should have passed logical port to pending list
ASSERT_FALSE(channel->is_logical_port_opened(7410));
ASSERT_TRUE(channel->is_logical_port_added(7410));

// Now try normal reconnection
serverTransportUnderTest.reset(new TCPv6Transport(serverDescriptor));
serverTransportUnderTest->init();
ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF));
clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect()

// Logical port is opened (moved from pending list)
std::this_thread::sleep_for(std::chrono::milliseconds(300));
ASSERT_TRUE(channel->is_logical_port_opened(7410));

// Disconnect server
EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator));
serverTransportUnderTest.reset();
std::this_thread::sleep_for(std::chrono::milliseconds(300));
// Client should have passed logical port to pending list
ASSERT_FALSE(channel->is_logical_port_opened(7410));
ASSERT_TRUE(channel->is_logical_port_added(7410));

// Now try reconnect the server and close server's input channel before client's open logical
// port request, and then delete server and reconnect
serverTransportUnderTest.reset(new TCPv6Transport(serverDescriptor));
serverTransportUnderTest->init();
ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF));
EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator));
clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect()
std::this_thread::sleep_for(std::chrono::milliseconds(300));
serverTransportUnderTest.reset();
ASSERT_FALSE(channel->is_logical_port_opened(7410));
ASSERT_TRUE(channel->is_logical_port_added(7410));

// Now try normal reconnection
serverTransportUnderTest.reset(new TCPv6Transport(serverDescriptor));
serverTransportUnderTest->init();
ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF));
clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect()

// Logical port is opened (moved from pending list)
std::this_thread::sleep_for(std::chrono::milliseconds(300));
ASSERT_TRUE(channel->is_logical_port_opened(7410));

// Clear test
EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator));
client_resource_list.clear();
}

/*
TEST_F(TCPv6Tests, send_and_receive_between_both_secure_ports)
{
Expand Down
9 changes: 9 additions & 0 deletions test/unittest/transport/mock/MockTCPv4Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ class MockTCPv4Transport : public TCPv4Transport
return acceptors_;
}

bool send(
const fastrtps::rtps::octet* send_buffer,
uint32_t send_buffer_size,
const fastrtps::rtps::Locator_t& send_resource_locator,
const Locator_t& remote_locator)
{
return TCPv4Transport::send(send_buffer, send_buffer_size, send_resource_locator, remote_locator);
}

};

} // namespace rtps
Expand Down
9 changes: 9 additions & 0 deletions test/unittest/transport/mock/MockTCPv6Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ class MockTCPv6Transport : public TCPv6Transport
return acceptors_;
}

bool send(
const fastrtps::rtps::octet* send_buffer,
uint32_t send_buffer_size,
const fastrtps::rtps::Locator_t& send_resource_locator,
const Locator_t& remote_locator)
{
return TCPv6Transport::send(send_buffer, send_buffer_size, send_resource_locator, remote_locator);
}

};

} // namespace rtps
Expand Down

0 comments on commit 1a3a8f7

Please sign in to comment.