Skip to content

Commit

Permalink
new CSubscriber method "IsPublished" to be aligned with CPublishers "…
Browse files Browse the repository at this point in the history
…IsSubscribed" (we may rename both functions to "IsConnected" in the future)

new additional check for IsPublished using the connection state of the matching subscriber (should ensure that IsPublished is flagged as true only if publisher is able to send data)
test added to test new behavior
  • Loading branch information
rex-schilasky committed Jun 19, 2024
1 parent 7174d2b commit e0a850a
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 11 deletions.
9 changes: 8 additions & 1 deletion ecal/core/include/ecal/ecal_subscriber.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -298,6 +298,13 @@ namespace eCAL
**/
ECAL_API bool IsCreated() const {return(m_created);}

/**
* @brief Query if the subscriber is published.
*
* @return true if published, false if not.
**/
ECAL_API bool IsPublished() const;

/**
* @brief Query the number of publishers.
*
Expand Down
18 changes: 13 additions & 5 deletions ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -295,8 +295,12 @@ namespace eCAL

iter->second->ApplyLocLayerParameter(process_id, topic_id, tlayer.type(), writer_par);
}
// inform for local publisher connection
iter->second->ApplyLocPublication(process_id, topic_id, topic_info);
// we only inform the subscriber when the publisher has already recognized at least on local subscriber
// this should avoid to set the "IsPublished" state before the publisher is able to send data
if (ecal_sample_.topic().connections_loc() > 0)
{
iter->second->ApplyLocPublication(process_id, topic_id, topic_info);
}
}
}

Expand Down Expand Up @@ -345,8 +349,12 @@ namespace eCAL
const std::string writer_par = tlayer.par_layer().SerializeAsString();
iter->second->ApplyExtLayerParameter(host_name, tlayer.type(), writer_par);
}
// inform for external publisher connection
iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info);
// we only inform the subscriber when the publisher has already recognized at least on external subscriber
// this should avoid to set the "IsPublished" state before the publisher is able to send data
if (ecal_sample_.topic().connections_ext() > 0)
{
iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info);
}
}
}

Expand Down
8 changes: 7 additions & 1 deletion ecal/core/src/pubsub/ecal_subscriber.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -250,6 +250,12 @@ namespace eCAL
return(m_datareader->RemEventCallback(type_));
}

bool CSubscriber::IsPublished() const
{
if (m_datareader == nullptr) return(false);
return(m_datareader->IsPublished());
}

size_t CSubscriber::GetPublisherCount() const
{
if(m_datareader == nullptr) return(0);
Expand Down
3 changes: 2 additions & 1 deletion ecal/core/src/readwrite/ecal_reader.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -96,6 +96,7 @@ namespace eCAL

bool IsCreated() const {return(m_created);}

bool IsPublished() const { return(m_loc_published || m_ext_published); }
size_t GetPublisherCount() const
{
const std::lock_guard<std::mutex> lock(m_pub_map_sync);
Expand Down
97 changes: 94 additions & 3 deletions testing/ecal/pubsub_test/src/pubsub_receive_test.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -161,8 +161,6 @@ TEST(PubSub, TimingSubscriberReceive)
EXPECT_EQ(0, eCAL::Finalize());
}



// This tests test for sporadically received empty messages which were a problem.
TEST(PubSub, SporadicEmptyReceives)
{
Expand Down Expand Up @@ -219,3 +217,96 @@ TEST(PubSub, SporadicEmptyReceives)
// finalize eCAL API
EXPECT_EQ(0, eCAL::Finalize());
}

TEST(PubSub, TestSubscriberSeen)
{
// initialize eCAL API
EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "subscriber_seen"));

// enable data loopback
eCAL::Util::EnableLoopback(true);

std::atomic<bool> subscriber_seen_at_publication_start = false;
std::atomic<bool> subscriber_seen_at_publication_end = false;

std::atomic<bool> do_start_publication = false;
std::atomic<bool> publication_finished = false;

// publishing thread
auto publisher_thread = [&]() {
eCAL::CPublisher pub("blob");
pub.ShmSetAcknowledgeTimeout(500);

int cnt(0);
const auto max_runs(1000);
while (eCAL::Ok())
{
if (do_start_publication && cnt < max_runs)
{
if (cnt == 0)
{
subscriber_seen_at_publication_start = pub.IsSubscribed();
}

pub.Send(std::to_string(cnt));
cnt++;

if (cnt == max_runs)
{
subscriber_seen_at_publication_end = pub.IsSubscribed();
publication_finished = true;
break;
}
}
}
};

// subscribing thread
auto subscriber_thread = [&]() {
eCAL::CSubscriber sub("blob");
bool received(false);
auto max_lines(10);
auto receive_lambda = [&received, &max_lines](const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_)
{
if (max_lines)
{
// the final log should look like this
// -----------------------------------
// Receiving 0
// Receiving 1
// Receiving 2
// Receiving 3
// Receiving 4
// Receiving 5
// Receiving 6
// Receiving 7
// Receiving 8
// Receiving 9
// -----------------------------------
std::cout << "Receiving " << std::string(static_cast<const char*>(data_->buf), data_->size) << std::endl;
max_lines--;
}
};
sub.AddReceiveCallback(receive_lambda);

while (eCAL::Ok() && !publication_finished)
{
if (sub.IsPublished()) do_start_publication = true;
}
};

// create threads for publisher and subscriber
std::thread pub_thread(publisher_thread);
std::thread sub_thread(subscriber_thread);

// join threads to the main thread
pub_thread.join();
sub_thread.join();

// finalize eCAL API
eCAL::Finalize();

// check if the publisher has seen the subscriber
EXPECT_TRUE(subscriber_seen_at_publication_start);
EXPECT_TRUE(subscriber_seen_at_publication_end);
}

0 comments on commit e0a850a

Please sign in to comment.