diff --git a/src/cpp/rtps/transport/TCPChannelResource.cpp b/src/cpp/rtps/transport/TCPChannelResource.cpp index fcf816cbc6c..8581534c85f 100644 --- a/src/cpp/rtps/transport/TCPChannelResource.cpp +++ b/src/cpp/rtps/transport/TCPChannelResource.cpp @@ -176,9 +176,9 @@ void TCPChannelResource::add_logical_port_response( negotiating_logical_ports_.erase(it); if (portIt != pending_logical_output_ports_.end()) { - pending_logical_output_ports_.erase(portIt); if (success) { + pending_logical_output_ports_.erase(portIt); logical_output_ports_.push_back(port); logInfo(RTCP, "OpenedLogicalPort: " << port); } diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index 510932bddb5..3fbdfb2fd55 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -1852,6 +1852,92 @@ TEST_F(TCPv4Tests, non_blocking_send) } #endif // ifndef _WIN32 +// This test verifies that a server can reconnect to a client after the client has once failed in a +// openLogicalPort request +TEST_F(TCPv4Tests, reconnect_after_open_port_failure) +{ + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning); + uint16_t port = g_default_port; + // Create a TCP Server transport + TCPv4TransportDescriptor serverDescriptor; + serverDescriptor.add_listener_port(port); + std::unique_ptr serverTransportUnderTest(new TCPv4Transport(serverDescriptor)); + serverTransportUnderTest->init(); + + // Create a TCP Client transport + TCPv4TransportDescriptor clientDescriptor; + std::unique_ptr clientTransportUnderTest(new MockTCPv4Transport(clientDescriptor)); + clientTransportUnderTest->init(); + + // Add initial peer to the client + Locator_t initialPeerLocator; + IPLocator::createLocator(LOCATOR_KIND_TCPv4, "127.0.0.1", port, initialPeerLocator); + IPLocator::setLogicalPort(initialPeerLocator, 7410); + + // Connect client to server + EXPECT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF)); + SendResourceList client_resource_list; + ASSERT_TRUE(clientTransportUnderTest->OpenOutputChannel(client_resource_list, initialPeerLocator)); + ASSERT_FALSE(client_resource_list.empty()); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + auto channel = clientTransportUnderTest->get_channel_resources().begin()->second; + + // Logical port is opened + ASSERT_TRUE(channel->is_logical_port_opened(7410)); + + // Disconnect server + EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator)); + serverTransportUnderTest.reset(); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + // Client should have passed logical port to pending list + ASSERT_FALSE(channel->is_logical_port_opened(7410)); + ASSERT_TRUE(channel->is_logical_port_added(7410)); + + // Now try normal reconnection + serverTransportUnderTest.reset(new TCPv4Transport(serverDescriptor)); + serverTransportUnderTest->init(); + ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF)); + clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect() + + // Logical port is opened (moved from pending list) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + ASSERT_TRUE(channel->is_logical_port_opened(7410)); + + // Disconnect server + EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator)); + serverTransportUnderTest.reset(); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + // Client should have passed logical port to pending list + ASSERT_FALSE(channel->is_logical_port_opened(7410)); + ASSERT_TRUE(channel->is_logical_port_added(7410)); + + // Now try reconnect the server and close server's input channel before client's open logical + // port request, and then delete server and reconnect + serverTransportUnderTest.reset(new TCPv4Transport(serverDescriptor)); + serverTransportUnderTest->init(); + ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF)); + EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator)); + clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect() + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + serverTransportUnderTest.reset(); + ASSERT_FALSE(channel->is_logical_port_opened(7410)); + ASSERT_TRUE(channel->is_logical_port_added(7410)); + + // Now try normal reconnection + serverTransportUnderTest.reset(new TCPv4Transport(serverDescriptor)); + serverTransportUnderTest->init(); + ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF)); + clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect() + + // Logical port is opened (moved from pending list) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + ASSERT_TRUE(channel->is_logical_port_opened(7410)); + + // Clear test + EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator)); + client_resource_list.clear(); +} + void TCPv4Tests::HELPER_SetDescriptorDefaults() { descriptor.add_listener_port(g_default_port); diff --git a/test/unittest/transport/TCPv6Tests.cpp b/test/unittest/transport/TCPv6Tests.cpp index ee1cb45ebf8..746ead129a9 100644 --- a/test/unittest/transport/TCPv6Tests.cpp +++ b/test/unittest/transport/TCPv6Tests.cpp @@ -205,6 +205,70 @@ TEST_F(TCPv6Tests, autofill_port) EXPECT_TRUE(transportUnderTest_multiple_autofill.configuration()->listening_ports.size() == 3); } +static void GetIP6s( + std::vector& interfaces) +{ + IPFinder::getIPs(&interfaces, false); + auto new_end = remove_if(interfaces.begin(), + interfaces.end(), + [](IPFinder::info_IP ip) + { + return ip.type != IPFinder::IP6 && ip.type != IPFinder::IP6_LOCAL; + }); + interfaces.erase(new_end, interfaces.end()); + std::for_each(interfaces.begin(), interfaces.end(), [](IPFinder::info_IP& loc) + { + loc.locator.kind = LOCATOR_KIND_TCPv6; + }); +} + +TEST_F(TCPv6Tests, check_TCPv6_interface_whitelist_initialization) +{ + std::vector interfaces; + + GetIP6s(interfaces); + + // asio::ip::addres_v6 appends the interface name to the IP address, but the locator does not + // Create two different vectors to compare them + std::vector asio_interfaces; + std::vector locator_interfaces; + for (auto& ip : interfaces) + { + asio_interfaces.push_back(ip.name); + locator_interfaces.push_back(IPLocator::toIPv6string(ip.locator)); + } + // Add manually localhost to test adding multiple interfaces + asio_interfaces.push_back("::1"); + locator_interfaces.push_back("::1"); + + for (auto& ip : locator_interfaces) + { + descriptor.interfaceWhiteList.emplace_back(ip); + } + descriptor.add_listener_port(g_default_port); + MockTCPv6Transport transportUnderTest(descriptor); + transportUnderTest.init(); + + // Check that the transport whitelist and the acceptors map is the same size as the locator_interfaces + ASSERT_EQ(transportUnderTest.get_interface_whitelist().size(), descriptor.interfaceWhiteList.size()); + ASSERT_EQ(transportUnderTest.get_acceptors_map().size(), descriptor.interfaceWhiteList.size()); + + // Check that every interface is in the whitelist + auto check_whitelist = transportUnderTest.get_interface_whitelist(); + for (auto& ip : asio_interfaces) + { + ASSERT_NE(std::find(check_whitelist.begin(), check_whitelist.end(), asio::ip::address_v6::from_string( + ip)), check_whitelist.end()); + } + + // Check that every interface is in the acceptors map + for (const auto& test : transportUnderTest.get_acceptors_map()) + { + ASSERT_NE(std::find(locator_interfaces.begin(), locator_interfaces.end(), IPLocator::toIPv6string( + test.first)), locator_interfaces.end()); + } +} + // This test verifies server's channel resources mapping keys uniqueness, where keys are clients locators. // Clients typically communicated its PID as its locator port. When having several clients in the same // process this lead to overwriting server's channel resources map elements. @@ -326,6 +390,92 @@ TEST_F(TCPv6Tests, non_blocking_send) } #endif // ifndef _WIN32 +// This test verifies that a server can reconnect to a client after the client has once failed in a +// openLogicalPort request +TEST_F(TCPv6Tests, reconnect_after_open_port_failure) +{ + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning); + uint16_t port = g_default_port; + // Create a TCP Server transport + TCPv6TransportDescriptor serverDescriptor; + serverDescriptor.add_listener_port(port); + std::unique_ptr serverTransportUnderTest(new TCPv6Transport(serverDescriptor)); + serverTransportUnderTest->init(); + + // Create a TCP Client transport + TCPv6TransportDescriptor clientDescriptor; + std::unique_ptr clientTransportUnderTest(new MockTCPv6Transport(clientDescriptor)); + clientTransportUnderTest->init(); + + // Add initial peer to the client + Locator_t initialPeerLocator; + IPLocator::createLocator(LOCATOR_KIND_TCPv6, "::1", port, initialPeerLocator); + IPLocator::setLogicalPort(initialPeerLocator, 7410); + + // Connect client to server + EXPECT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF)); + SendResourceList client_resource_list; + ASSERT_TRUE(clientTransportUnderTest->OpenOutputChannel(client_resource_list, initialPeerLocator)); + ASSERT_FALSE(client_resource_list.empty()); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + auto channel = clientTransportUnderTest->get_channel_resources().begin()->second; + + // Logical port is opened + ASSERT_TRUE(channel->is_logical_port_opened(7410)); + + // Disconnect server + EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator)); + serverTransportUnderTest.reset(); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + // Client should have passed logical port to pending list + ASSERT_FALSE(channel->is_logical_port_opened(7410)); + ASSERT_TRUE(channel->is_logical_port_added(7410)); + + // Now try normal reconnection + serverTransportUnderTest.reset(new TCPv6Transport(serverDescriptor)); + serverTransportUnderTest->init(); + ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF)); + clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect() + + // Logical port is opened (moved from pending list) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + ASSERT_TRUE(channel->is_logical_port_opened(7410)); + + // Disconnect server + EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator)); + serverTransportUnderTest.reset(); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + // Client should have passed logical port to pending list + ASSERT_FALSE(channel->is_logical_port_opened(7410)); + ASSERT_TRUE(channel->is_logical_port_added(7410)); + + // Now try reconnect the server and close server's input channel before client's open logical + // port request, and then delete server and reconnect + serverTransportUnderTest.reset(new TCPv6Transport(serverDescriptor)); + serverTransportUnderTest->init(); + ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF)); + EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator)); + clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect() + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + serverTransportUnderTest.reset(); + ASSERT_FALSE(channel->is_logical_port_opened(7410)); + ASSERT_TRUE(channel->is_logical_port_added(7410)); + + // Now try normal reconnection + serverTransportUnderTest.reset(new TCPv6Transport(serverDescriptor)); + serverTransportUnderTest->init(); + ASSERT_TRUE(serverTransportUnderTest->OpenInputChannel(initialPeerLocator, nullptr, 0x00FF)); + clientTransportUnderTest->send(nullptr, 0, channel->locator(), initialPeerLocator); // connect() + + // Logical port is opened (moved from pending list) + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + ASSERT_TRUE(channel->is_logical_port_opened(7410)); + + // Clear test + EXPECT_TRUE(serverTransportUnderTest->CloseInputChannel(initialPeerLocator)); + client_resource_list.clear(); +} + /* TEST_F(TCPv6Tests, send_and_receive_between_both_secure_ports) { diff --git a/test/unittest/transport/mock/MockTCPv4Transport.h b/test/unittest/transport/mock/MockTCPv4Transport.h index f5aff8b452b..ffb7d404897 100644 --- a/test/unittest/transport/mock/MockTCPv4Transport.h +++ b/test/unittest/transport/mock/MockTCPv4Transport.h @@ -56,6 +56,14 @@ class MockTCPv4Transport : public TCPv4Transport return acceptors_; } + bool send( + const fastrtps::rtps::octet* send_buffer, + uint32_t send_buffer_size, + const fastrtps::rtps::Locator_t& send_resource_locator, + const Locator_t& remote_locator) + { + return TCPv4Transport::send(send_buffer, send_buffer_size, send_resource_locator, remote_locator); + } }; } // namespace rtps diff --git a/test/unittest/transport/mock/MockTCPv6Transport.h b/test/unittest/transport/mock/MockTCPv6Transport.h index 37b8d7c02d3..3ecbfaf3db9 100644 --- a/test/unittest/transport/mock/MockTCPv6Transport.h +++ b/test/unittest/transport/mock/MockTCPv6Transport.h @@ -46,6 +46,25 @@ class MockTCPv6Transport : public TCPv6Transport return unbound_channel_resources_; } + const std::vector& get_interface_whitelist() const + { + return interface_whitelist_; + } + + const std::map>& get_acceptors_map() const + { + return acceptors_; + } + + bool send( + const fastrtps::rtps::octet* send_buffer, + uint32_t send_buffer_size, + const fastrtps::rtps::Locator_t& send_resource_locator, + const Locator_t& remote_locator) + { + return TCPv6Transport::send(send_buffer, send_buffer_size, send_resource_locator, remote_locator); + } + }; } // namespace rtps