Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20180] TCP Client&Server Participant Decision-Making (backport #4277) #4386

Merged
merged 2 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,7 @@ const DataWriterQos& DataWriterImpl::get_qos() const
ReturnCode_t DataWriterImpl::set_listener(
DataWriterListener* listener)
{
std::lock_guard<std::mutex> scoped_lock(listener_mutex_);
listener_ = listener;
return ReturnCode_t::RETCODE_OK;
}
Expand Down Expand Up @@ -1779,6 +1780,7 @@ bool DataWriterImpl::can_qos_be_updated(
DataWriterListener* DataWriterImpl::get_listener_for(
const StatusMask& status)
{
std::lock_guard<std::mutex> scoped_lock(listener_mutex_);
if (listener_ != nullptr &&
user_datawriter_->get_status_mask().is_active(status))
{
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/fastdds/publisher/DataWriterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,9 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
//! DataWriterListener
DataWriterListener* listener_ = nullptr;

//! Mutex to protect listener_
std::mutex listener_mutex_;

//!Listener to capture the events of the Writer
class InnerDataWriterListener : public fastrtps::rtps::WriterListener
{
Expand Down
3 changes: 1 addition & 2 deletions src/cpp/rtps/attributes/RTPSParticipantAttributes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ static void setup_transports_large_data(

auto tcp_transport = create_tcpv4_transport(att);
att.userTransports.push_back(tcp_transport);
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");

Locator_t tcp_loc;
tcp_loc.kind = LOCATOR_KIND_TCPv4;
Expand Down Expand Up @@ -235,7 +234,6 @@ static void setup_transports_large_datav6(

auto tcp_transport = create_tcpv6_transport(att);
att.userTransports.push_back(tcp_transport);
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");

Locator_t tcp_loc;
tcp_loc.kind = LOCATOR_KIND_TCPv6;
Expand All @@ -253,6 +251,7 @@ static void setup_transports_large_datav6(
{
Locator_t pdp_locator;
pdp_locator.kind = LOCATOR_KIND_UDPv6;
IPLocator::setIPv6(pdp_locator, "ff1e::ffff:efff:1");
att.builtin.metatrafficMulticastLocatorList.push_back(pdp_locator);
}
}
Expand Down
4 changes: 1 addition & 3 deletions src/cpp/rtps/transport/TCPChannelResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ ResponseCode TCPChannelResource::process_bind_request(
if (connection_status_.compare_exchange_strong(expected, eConnectionStatus::eEstablished))
{
locator_ = IPLocator::toPhysicalLocator(locator);
logInfo(RTCP_MSG, "Connection Stablished");
logInfo(RTCP_MSG, "Connection Established");
return RETCODE_OK;
}
else if (expected == eConnectionStatus::eEstablished)
Expand Down Expand Up @@ -139,9 +139,7 @@ void TCPChannelResource::add_logical_port(
pending_logical_output_ports_.emplace_back(port);
if (connection_established())
{
scopedLock.unlock();
TCPTransactionId id = rtcp_manager->sendOpenLogicalPortRequest(this, port);
scopedLock.lock();
negotiating_logical_ports_[id] = port;
}
}
Expand Down
1 change: 1 addition & 0 deletions src/cpp/rtps/transport/TCPChannelResourceBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ void TCPChannelResourceBasic::disconnect()
{
if (eConnecting < change_status(eConnectionStatus::eDisconnected) && alive())
{
std::lock_guard<std::mutex> read_lock(read_mutex_);
auto socket = socket_;

std::error_code ec;
Expand Down
14 changes: 7 additions & 7 deletions src/cpp/rtps/transport/TCPSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource

TCPSenderResource(
TCPTransportInterface& transport,
std::shared_ptr<TCPChannelResource>& channel)
eprosima::fastrtps::rtps::Locator_t& locator)
: fastrtps::rtps::SenderResource(transport.kind())
, channel_(channel)
, locator_(locator)
{
// Implementation functions are bound to the right transport parameters
clean_up = [this, &transport]()
{
transport.CloseOutputChannel(channel_);
transport.CloseOutputChannel(locator_);
};

send_lambda_ = [this, &transport](
Expand All @@ -49,7 +49,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource
fastrtps::rtps::LocatorsIterator* destination_locators_end,
const std::chrono::steady_clock::time_point&) -> bool
{
return transport.send(data, dataSize, channel_, destination_locators_begin,
return transport.send(data, dataSize, locator_, destination_locators_begin,
destination_locators_end);
};
}
Expand All @@ -62,9 +62,9 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource
}
}

std::shared_ptr<TCPChannelResource>& channel()
fastrtps::rtps::Locator_t& locator()
{
return channel_;
return locator_;
}

static TCPSenderResource* cast(
Expand Down Expand Up @@ -102,7 +102,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource
TCPSenderResource& operator =(
const SenderResource&) = delete;

std::shared_ptr<TCPChannelResource> channel_;
fastrtps::rtps::Locator_t locator_;
};

} // namespace rtps
Expand Down
Loading
Loading