diff --git a/CMakeLists.txt b/CMakeLists.txt index f35a182..92b25eb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,7 +31,7 @@ option(TCP_PUBSUB_USE_BUILTIN_RECYCLE option(TCP_PUBSUB_BUILD_TESTS "Build the tcp_pubsub tests. Requires Gtest::GTest to be findable by CMake." - ON) + OFF) cmake_dependent_option(TCP_PUBSUB_USE_BUILTIN_GTEST "Use the builtin GoogleTest submodule. Only needed if TCP_PUBSUB_BUILD_TESTS is ON. If set to OFF, GoogleTest must be available from somewhere else (e.g. system libs)." diff --git a/tests/tcp_pubsub_test/CMakeLists.txt b/tests/tcp_pubsub_test/CMakeLists.txt new file mode 100644 index 0000000..cf37286 --- /dev/null +++ b/tests/tcp_pubsub_test/CMakeLists.txt @@ -0,0 +1,33 @@ +################################################################################ +# Copyright (c) Continental. All rights reserved. +# Licensed under the MIT license. See LICENSE file in the project root for details. +# +# SPDX-License-Identifier: MIT +################################################################################ + +cmake_minimum_required(VERSION 3.13) + +project(tcp_pubsub_test) + +set(CMAKE_FIND_PACKAGE_PREFER_CONFIG TRUE) + +find_package(tcp_pubsub REQUIRED) +find_package(GTest REQUIRED) + +set(sources + src/atomic_signalable.h + src/tcp_pubsub_test.cpp +) + +add_executable (${PROJECT_NAME} + ${sources} +) + +target_compile_features(${PROJECT_NAME} PUBLIC cxx_std_14) + +source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} FILES ${sources}) + +target_link_libraries (${PROJECT_NAME} + tcp_pubsub::tcp_pubsub + GTest::gtest_main +) \ No newline at end of file diff --git a/tests/tcp_pubsub_test/src/atomic_signalable.h b/tests/tcp_pubsub_test/src/atomic_signalable.h new file mode 100644 index 0000000..ab95f5e --- /dev/null +++ b/tests/tcp_pubsub_test/src/atomic_signalable.h @@ -0,0 +1,194 @@ +// Copyright (c) Continental. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for details. +// +// SPDX-License-Identifier: MIT + +#include +#include +#include +#include + +template +class atomic_signalable +{ +public: + atomic_signalable(T initial_value) : value(initial_value) {} + + atomic_signalable& operator=(const T new_value) + { + const std::lock_guard lock(mutex); + value = new_value; + cv.notify_all(); + return *this; + } + + T operator++() + { + const std::lock_guard lock(mutex); + T newValue = ++value; + cv.notify_all(); + return newValue; + } + + T operator++(T) + { + const std::lock_guard lock(mutex); + T oldValue = value++; + cv.notify_all(); + return oldValue; + } + + T operator--() + { + const std::lock_guard lock(mutex); + T newValue = --value; + cv.notify_all(); + return newValue; + } + + T operator--(T) + { + const std::lock_guard lock(mutex); + T oldValue = value--; + cv.notify_all(); + return oldValue; + } + + T operator+=(const T& other) + { + const std::lock_guard lock(mutex); + value += other; + cv.notify_all(); + return value; + } + + T operator-=(const T& other) + { + const std::lock_guard lock(mutex); + value -= other; + cv.notify_all(); + return value; + } + + T operator*=(const T& other) + { + const std::lock_guard lock(mutex); + value *= other; + cv.notify_all(); + return value; + } + + T operator/=(const T& other) + { + const std::lock_guard lock(mutex); + value /= other; + cv.notify_all(); + return value; + } + + T operator%=(const T& other) + { + const std::lock_guard lock(mutex); + value %= other; + cv.notify_all(); + return value; + } + + template + bool wait_for(Predicate predicate, std::chrono::milliseconds timeout) + { + std::unique_lock lock(mutex); + return cv.wait_for(lock, timeout, [&]() { return predicate(value); }); + } + + T get() const + { + const std::lock_guard lock(mutex); + return value; + } + + bool operator==(T other) const + { + const std::lock_guard lock(mutex); + return value == other; + } + + bool operator==(const atomic_signalable& other) const + { + std::lock_guard lock_this(mutex); + std::lock_guard lock_other(other.mutex); + return value == other.value; + } + + bool operator!=(T other) const + { + const std::lock_guard lock(mutex); + return value != other; + } + + bool operator<(T other) const + { + const std::lock_guard lock(mutex); + return value < other; + } + + bool operator<=(T other) const + { + const std::lock_guard lock(mutex); + return value <= other; + } + + bool operator>(T other) const + { + const std::lock_guard lock(mutex); + return value > other; + } + + bool operator>=(T other) const + { + const std::lock_guard lock(mutex); + return value >= other; + } + +private: + T value; + std::condition_variable cv; + mutable std::mutex mutex; +}; + + +template +bool operator==(const T& other, const atomic_signalable& atomic) +{ + return atomic == other; +} + +template +bool operator!=(const T& other, const atomic_signalable& atomic) +{ + return atomic != other; +} + +template +bool operator<(const T& other, const atomic_signalable& atomic) +{ + return atomic > other; +} + +template +bool operator<=(const T& other, const atomic_signalable& atomic) +{ + return atomic >= other; +} + +template +bool operator>(const T& other, const atomic_signalable& atomic) +{ + return atomic < other; +} + +template +bool operator>=(const T& other, const atomic_signalable& atomic) +{ + return atomic <= other; +} diff --git a/tests/tcp_pubsub_test/src/tcp_pubsub_test.cpp b/tests/tcp_pubsub_test/src/tcp_pubsub_test.cpp new file mode 100644 index 0000000..dde65e0 --- /dev/null +++ b/tests/tcp_pubsub_test/src/tcp_pubsub_test.cpp @@ -0,0 +1,85 @@ +// Copyright (c) Continental. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for details. +// +// SPDX-License-Identifier: MIT + +#include +#include +#include + +#include +#include +#include + +#include "atomic_signalable.h" + +#include + +TEST(tcp_pubsub, basic_test) +{ + atomic_signalable num_messages_received(0); + + // Create executor + std::shared_ptr executor = std::make_shared(1); + + // Create publisher + tcp_pubsub::Publisher hello_world_publisher(executor, 1588); + + // Create subscriber + tcp_pubsub::Subscriber hello_world_subscriber(executor); + + // Subscribe to localhost on port 1588 + hello_world_subscriber.addSession("127.0.0.1", 1588); + + std::string received_message; + + // Create a callback that will be called when a message is received + std::function callback_function = + [&received_message, &num_messages_received](const tcp_pubsub::CallbackData& callback_data) + { + received_message = std::string(callback_data.buffer_->data(), callback_data.buffer_->size()); + ++num_messages_received; + }; + + // Register the callback + hello_world_subscriber.setCallback(callback_function); + + // Wait up to 1 second for the subscriber to connect + for (int i = 0; i < 10; ++i) + { + if (hello_world_subscriber.getSessions().at(0)->isConnected()) + { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + // Check that the subscriber is connected + EXPECT_TRUE(hello_world_subscriber.getSessions().at(0)->isConnected()); + + // Publish "Hello World 1" + { + const std::string message = "Hello World 1"; + hello_world_publisher.send(message.data(), message.size()); + } + + // wait for message to be received + num_messages_received.wait_for([](int value) { return value > 0; }, std::chrono::seconds(1)); + + // Check that the message was received + EXPECT_EQ(received_message, "Hello World 1"); + EXPECT_EQ(num_messages_received.get(), 1); + + // Publish "Hello World 2" + { + const std::string message = "Hello World 2"; + hello_world_publisher.send(message.data(), message.size()); + } + + // wait for message to be received + num_messages_received.wait_for([](int value) { return value > 1; }, std::chrono::seconds(1)); + + // Check that the message was received + EXPECT_EQ(received_message, "Hello World 2"); + EXPECT_EQ(num_messages_received.get(), 2); +}