-
Notifications
You must be signed in to change notification settings - Fork 19
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
70d7e49
commit d42362a
Showing
4 changed files
with
313 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
################################################################################ | ||
# Copyright (c) Continental. All rights reserved. | ||
# Licensed under the MIT license. See LICENSE file in the project root for details. | ||
# | ||
# SPDX-License-Identifier: MIT | ||
################################################################################ | ||
|
||
cmake_minimum_required(VERSION 3.13) | ||
|
||
project(tcp_pubsub_test) | ||
|
||
set(CMAKE_FIND_PACKAGE_PREFER_CONFIG TRUE) | ||
|
||
find_package(tcp_pubsub REQUIRED) | ||
find_package(GTest REQUIRED) | ||
|
||
set(sources | ||
src/atomic_signalable.h | ||
src/tcp_pubsub_test.cpp | ||
) | ||
|
||
add_executable (${PROJECT_NAME} | ||
${sources} | ||
) | ||
|
||
target_compile_features(${PROJECT_NAME} PUBLIC cxx_std_14) | ||
|
||
source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} FILES ${sources}) | ||
|
||
target_link_libraries (${PROJECT_NAME} | ||
tcp_pubsub::tcp_pubsub | ||
GTest::gtest_main | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
// Copyright (c) Continental. All rights reserved. | ||
// Licensed under the MIT license. See LICENSE file in the project root for details. | ||
// | ||
// SPDX-License-Identifier: MIT | ||
|
||
#include <atomic> | ||
#include <chrono> | ||
#include <condition_variable> | ||
#include <mutex> | ||
|
||
template <typename T> | ||
class atomic_signalable | ||
{ | ||
public: | ||
atomic_signalable(T initial_value) : value(initial_value) {} | ||
|
||
atomic_signalable<T>& operator=(const T new_value) | ||
{ | ||
const std::lock_guard<std::mutex> lock(mutex); | ||
value = new_value; | ||
cv.notify_all(); | ||
return *this; | ||
} | ||
|
||
T operator++() | ||
{ | ||
const std::lock_guard<std::mutex> lock(mutex); | ||
T newValue = ++value; | ||
cv.notify_all(); | ||
return newValue; | ||
} | ||
|
||
T operator++(T) | ||
{ | ||
const std::lock_guard<std::mutex> lock(mutex); | ||
T oldValue = value++; | ||
cv.notify_all(); | ||
return oldValue; | ||
} | ||
|
||
T operator--() | ||
{ | ||
const std::lock_guard<std::mutex> lock(mutex); | ||
T newValue = --value; | ||
cv.notify_all(); | ||
return newValue; | ||
} | ||
|
||
T operator--(T) | ||
{ | ||
const std::lock_guard<std::mutex> lock(mutex); | ||
T oldValue = value--; | ||
cv.notify_all(); | ||
return oldValue; | ||
} | ||
|
||
T operator+=(const T& other) | ||
{ | ||
const std::lock_guard<std::mutex> lock(mutex); | ||
value += other; | ||
cv.notify_all(); | ||
return value; | ||
} | ||
|
||
T operator-=(const T& other) | ||
{ | ||
const std::lock_guard<std::mutex> lock(mutex); | ||
value -= other; | ||
cv.notify_all(); | ||
return value; | ||
} | ||
|
||
T operator*=(const T& other) | ||
{ | ||
const std::lock_guard<std::mutex> lock(mutex); | ||
value *= other; | ||
cv.notify_all(); | ||
return value; | ||
} | ||
|
||
T operator/=(const T& other) | ||
{ | ||
const std::lock_guard<std::mutex> lock(mutex); | ||
value /= other; | ||
cv.notify_all(); | ||
return value; | ||
} | ||
|
||
T operator%=(const T& other) | ||
{ | ||
const std::lock_guard<std::mutex> lock(mutex); | ||
value %= other; | ||
cv.notify_all(); | ||
return value; | ||
} | ||
|
||
template <typename Predicate> | ||
bool wait_for(Predicate predicate, std::chrono::milliseconds timeout) | ||
{ | ||
std::unique_lock<std::mutex> lock(mutex); | ||
return cv.wait_for(lock, timeout, [&]() { return predicate(value); }); | ||
} | ||
|
||
T get() const | ||
{ | ||
const std::lock_guard<std::mutex> lock(mutex); | ||
return value; | ||
} | ||
|
||
bool operator==(T other) const | ||
{ | ||
const std::lock_guard<std::mutex> lock(mutex); | ||
return value == other; | ||
} | ||
|
||
bool operator==(const atomic_signalable<T>& other) const | ||
{ | ||
std::lock_guard<std::mutex> lock_this(mutex); | ||
std::lock_guard<std::mutex> lock_other(other.mutex); | ||
return value == other.value; | ||
} | ||
|
||
bool operator!=(T other) const | ||
{ | ||
const std::lock_guard<std::mutex> lock(mutex); | ||
return value != other; | ||
} | ||
|
||
bool operator<(T other) const | ||
{ | ||
const std::lock_guard<std::mutex> lock(mutex); | ||
return value < other; | ||
} | ||
|
||
bool operator<=(T other) const | ||
{ | ||
const std::lock_guard<std::mutex> lock(mutex); | ||
return value <= other; | ||
} | ||
|
||
bool operator>(T other) const | ||
{ | ||
const std::lock_guard<std::mutex> lock(mutex); | ||
return value > other; | ||
} | ||
|
||
bool operator>=(T other) const | ||
{ | ||
const std::lock_guard<std::mutex> lock(mutex); | ||
return value >= other; | ||
} | ||
|
||
private: | ||
T value; | ||
std::condition_variable cv; | ||
mutable std::mutex mutex; | ||
}; | ||
|
||
|
||
template <typename T> | ||
bool operator==(const T& other, const atomic_signalable<T>& atomic) | ||
{ | ||
return atomic == other; | ||
} | ||
|
||
template <typename T> | ||
bool operator!=(const T& other, const atomic_signalable<T>& atomic) | ||
{ | ||
return atomic != other; | ||
} | ||
|
||
template <typename T> | ||
bool operator<(const T& other, const atomic_signalable<T>& atomic) | ||
{ | ||
return atomic > other; | ||
} | ||
|
||
template <typename T> | ||
bool operator<=(const T& other, const atomic_signalable<T>& atomic) | ||
{ | ||
return atomic >= other; | ||
} | ||
|
||
template <typename T> | ||
bool operator>(const T& other, const atomic_signalable<T>& atomic) | ||
{ | ||
return atomic < other; | ||
} | ||
|
||
template <typename T> | ||
bool operator>=(const T& other, const atomic_signalable<T>& atomic) | ||
{ | ||
return atomic <= other; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
// Copyright (c) Continental. All rights reserved. | ||
// Licensed under the MIT license. See LICENSE file in the project root for details. | ||
// | ||
// SPDX-License-Identifier: MIT | ||
|
||
#include <chrono> | ||
#include <functional> | ||
#include <memory> | ||
|
||
#include <tcp_pubsub/executor.h> | ||
#include <tcp_pubsub/publisher.h> | ||
#include <tcp_pubsub/subscriber.h> | ||
|
||
#include "atomic_signalable.h" | ||
|
||
#include <gtest/gtest.h> | ||
|
||
TEST(tcp_pubsub, basic_test) | ||
{ | ||
atomic_signalable<int> num_messages_received(0); | ||
|
||
// Create executor | ||
std::shared_ptr<tcp_pubsub::Executor> executor = std::make_shared<tcp_pubsub::Executor>(1); | ||
|
||
// Create publisher | ||
tcp_pubsub::Publisher hello_world_publisher(executor, 1588); | ||
|
||
// Create subscriber | ||
tcp_pubsub::Subscriber hello_world_subscriber(executor); | ||
|
||
// Subscribe to localhost on port 1588 | ||
hello_world_subscriber.addSession("127.0.0.1", 1588); | ||
|
||
std::string received_message; | ||
|
||
// Create a callback that will be called when a message is received | ||
std::function<void(const tcp_pubsub::CallbackData& callback_data)> callback_function = | ||
[&received_message, &num_messages_received](const tcp_pubsub::CallbackData& callback_data) | ||
{ | ||
received_message = std::string(callback_data.buffer_->data(), callback_data.buffer_->size()); | ||
++num_messages_received; | ||
}; | ||
|
||
// Register the callback | ||
hello_world_subscriber.setCallback(callback_function); | ||
|
||
// Wait up to 1 second for the subscriber to connect | ||
for (int i = 0; i < 10; ++i) | ||
{ | ||
if (hello_world_subscriber.getSessions().at(0)->isConnected()) | ||
{ | ||
break; | ||
} | ||
std::this_thread::sleep_for(std::chrono::milliseconds(100)); | ||
} | ||
|
||
// Check that the subscriber is connected | ||
EXPECT_TRUE(hello_world_subscriber.getSessions().at(0)->isConnected()); | ||
|
||
// Publish "Hello World 1" | ||
{ | ||
const std::string message = "Hello World 1"; | ||
hello_world_publisher.send(message.data(), message.size()); | ||
} | ||
|
||
// wait for message to be received | ||
num_messages_received.wait_for([](int value) { return value > 0; }, std::chrono::seconds(1)); | ||
|
||
// Check that the message was received | ||
EXPECT_EQ(received_message, "Hello World 1"); | ||
EXPECT_EQ(num_messages_received.get(), 1); | ||
|
||
// Publish "Hello World 2" | ||
{ | ||
const std::string message = "Hello World 2"; | ||
hello_world_publisher.send(message.data(), message.size()); | ||
} | ||
|
||
// wait for message to be received | ||
num_messages_received.wait_for([](int value) { return value > 1; }, std::chrono::seconds(1)); | ||
|
||
// Check that the message was received | ||
EXPECT_EQ(received_message, "Hello World 2"); | ||
EXPECT_EQ(num_messages_received.get(), 2); | ||
} |