From 247315d2c48a4597099d3a8e6082372b3a86f5ef Mon Sep 17 00:00:00 2001 From: Dominic Reber Date: Wed, 1 May 2024 08:46:02 +0200 Subject: [PATCH 1/5] feat: add communication interfaces --- .../communication_interfaces_bindings.hpp | 8 ++ .../communication_interfaces/bind_tcp.cpp | 27 ++++++ .../communication_interfaces/bind_udp.cpp | 24 +++++ .../communication_interfaces/bind_zmq.cpp | 87 +++++++++++++++++ .../communication_interfaces_bindings.cpp | 46 +++++++++ .../test/communication_interfaces/test_tcp.py | 56 +++++++++++ .../test/communication_interfaces/test_udp.py | 94 +++++++++++++++++++ .../test/communication_interfaces/test_zmq.py | 89 ++++++++++++++++++ .../communication_interfaces/CMakeLists.txt | 73 ++++++++++++++ .../SocketConfigurationException.hpp | 16 ++++ .../sockets/ISocket.hpp | 80 ++++++++++++++++ .../sockets/TCPClient.hpp | 34 +++++++ .../sockets/TCPServer.hpp | 47 ++++++++++ .../sockets/TCPSocket.hpp | 43 +++++++++ .../sockets/UDPClient.hpp | 34 +++++++ .../sockets/UDPServer.hpp | 36 +++++++ .../sockets/UDPSocket.hpp | 73 ++++++++++++++ .../sockets/ZMQPublisher.hpp | 29 ++++++ .../sockets/ZMQPublisherSubscriber.hpp | 68 ++++++++++++++ .../sockets/ZMQSocket.hpp | 63 +++++++++++++ .../sockets/ZMQSubscriber.hpp | 28 ++++++ .../src/sockets/ISocket.cpp | 30 ++++++ .../src/sockets/TCPClient.cpp | 32 +++++++ .../src/sockets/TCPServer.cpp | 58 ++++++++++++ .../src/sockets/TCPSocket.cpp | 38 ++++++++ .../src/sockets/UDPClient.cpp | 18 ++++ .../src/sockets/UDPServer.cpp | 18 ++++ .../src/sockets/UDPSocket.cpp | 84 +++++++++++++++++ .../src/sockets/ZMQPublisher.cpp | 15 +++ .../src/sockets/ZMQPublisherSubscriber.cpp | 41 ++++++++ .../src/sockets/ZMQSocket.cpp | 60 ++++++++++++ .../src/sockets/ZMQSubscriber.cpp | 17 ++++ .../test/test_communication_interfaces.cpp | 6 ++ .../test/tests/test_tcp_communication.cpp | 72 ++++++++++++++ .../test/tests/test_udp_communication.cpp | 88 +++++++++++++++++ .../test/tests/test_zmq_communication.cpp | 82 ++++++++++++++++ 36 files changed, 1714 insertions(+) create mode 100644 python/include/communication_interfaces_bindings.hpp create mode 100644 python/source/communication_interfaces/bind_tcp.cpp create mode 100644 python/source/communication_interfaces/bind_udp.cpp create mode 100644 python/source/communication_interfaces/bind_zmq.cpp create mode 100644 python/source/communication_interfaces/communication_interfaces_bindings.cpp create mode 100644 python/test/communication_interfaces/test_tcp.py create mode 100644 python/test/communication_interfaces/test_udp.py create mode 100644 python/test/communication_interfaces/test_zmq.py create mode 100644 source/communication_interfaces/CMakeLists.txt create mode 100644 source/communication_interfaces/include/communication_interfaces/exceptions/SocketConfigurationException.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/TCPClient.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/TCPServer.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/TCPSocket.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisherSubscriber.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp create mode 100644 source/communication_interfaces/src/sockets/ISocket.cpp create mode 100644 source/communication_interfaces/src/sockets/TCPClient.cpp create mode 100644 source/communication_interfaces/src/sockets/TCPServer.cpp create mode 100644 source/communication_interfaces/src/sockets/TCPSocket.cpp create mode 100644 source/communication_interfaces/src/sockets/UDPClient.cpp create mode 100644 source/communication_interfaces/src/sockets/UDPServer.cpp create mode 100644 source/communication_interfaces/src/sockets/UDPSocket.cpp create mode 100644 source/communication_interfaces/src/sockets/ZMQPublisher.cpp create mode 100644 source/communication_interfaces/src/sockets/ZMQPublisherSubscriber.cpp create mode 100644 source/communication_interfaces/src/sockets/ZMQSocket.cpp create mode 100644 source/communication_interfaces/src/sockets/ZMQSubscriber.cpp create mode 100644 source/communication_interfaces/test/test_communication_interfaces.cpp create mode 100644 source/communication_interfaces/test/tests/test_tcp_communication.cpp create mode 100644 source/communication_interfaces/test/tests/test_udp_communication.cpp create mode 100644 source/communication_interfaces/test/tests/test_zmq_communication.cpp diff --git a/python/include/communication_interfaces_bindings.hpp b/python/include/communication_interfaces_bindings.hpp new file mode 100644 index 000000000..477ae404c --- /dev/null +++ b/python/include/communication_interfaces_bindings.hpp @@ -0,0 +1,8 @@ +#include + +namespace py = pybind11; +using namespace pybind11::literals; + +void bind_tcp(py::module_& m); +void bind_udp(py::module_& m); +void bind_zmq(py::module_& m); diff --git a/python/source/communication_interfaces/bind_tcp.cpp b/python/source/communication_interfaces/bind_tcp.cpp new file mode 100644 index 000000000..9d07bccc6 --- /dev/null +++ b/python/source/communication_interfaces/bind_tcp.cpp @@ -0,0 +1,27 @@ +#include "communication_interfaces_bindings.hpp" + +#include +#include + +using namespace communication_interfaces::sockets; + +void bind_tcp(py::module_& m) { + py::class_(m, "TCPClientConfiguration") + .def( + py::init(), "TCPClientConfiguration struct", "ip_address"_a, "port"_a, "buffer_size"_a) + .def_readwrite("ip_address", &TCPClientConfiguration::ip_address) + .def_readwrite("port", &TCPClientConfiguration::port) + .def_readwrite("buffer_size", &TCPClientConfiguration::buffer_size); + + py::class_, ISocket>(m, "TCPClient") + .def(py::init(), "Constructor taking the configuration struct", "configuration"_a); + + py::class_(m, "TCPServerConfiguration") + .def(py::init(), "TCPServerConfiguration struct", "port"_a, "buffer_size"_a, "enable_reuse"_a) + .def_readwrite("port", &TCPServerConfiguration::port) + .def_readwrite("buffer_size", &TCPServerConfiguration::buffer_size) + .def_readwrite("enable_reuse", &TCPServerConfiguration::enable_reuse); + + py::class_, ISocket>(m, "TCPServer") + .def(py::init(), "Constructor taking the configuration struct", "configuration"_a); +} diff --git a/python/source/communication_interfaces/bind_udp.cpp b/python/source/communication_interfaces/bind_udp.cpp new file mode 100644 index 000000000..20f043f07 --- /dev/null +++ b/python/source/communication_interfaces/bind_udp.cpp @@ -0,0 +1,24 @@ +#include "communication_interfaces_bindings.hpp" + +#include +#include + +using namespace communication_interfaces::sockets; + +void bind_udp(py::module_& m) { + py::class_(m, "UDPSocketConfiguration") + .def( + py::init(), "UDPSocketConfiguration struct", "ip_address"_a, "port"_a, + "buffer_size"_a, "enable_reuse"_a = false, "timeout_duration_sec"_a = 0.0) + .def_readwrite("ip_address", &UDPSocketConfiguration::ip_address) + .def_readwrite("port", &UDPSocketConfiguration::port) + .def_readwrite("buffer_size", &UDPSocketConfiguration::buffer_size) + .def_readwrite("enable_reuse", &UDPSocketConfiguration::enable_reuse) + .def_readwrite("timeout_duration_sec", &UDPSocketConfiguration::timeout_duration_sec); + + py::class_, ISocket>(m, "UDPClient") + .def(py::init(), "Constructor taking the configuration struct", "configuration"_a); + + py::class_, ISocket>(m, "UDPServer") + .def(py::init(), "Constructor taking the configuration struct", "configuration"_a); +} diff --git a/python/source/communication_interfaces/bind_zmq.cpp b/python/source/communication_interfaces/bind_zmq.cpp new file mode 100644 index 000000000..7d3d77b0f --- /dev/null +++ b/python/source/communication_interfaces/bind_zmq.cpp @@ -0,0 +1,87 @@ +#include "communication_interfaces_bindings.hpp" + +#include + +using namespace communication_interfaces::sockets; + +class PyZMQContext { +public: + PyZMQContext(int io_threads = 1) { context = std::make_shared(io_threads); } + + std::shared_ptr context; +}; + +struct PyZMQSocketConfiguration { + PyZMQContext context; + std::string ip_address; + std::string port; + bool bind = true; + bool wait = false; +}; + +struct PyZMQCombinedSocketsConfiguration { + PyZMQContext context; + std::string ip_address; + std::string publisher_port; + std::string subscriber_port; + bool bind_publisher = true; + bool bind_subscriber = true; + bool wait = false; +}; + +class PyZMQPublisher : public ZMQPublisher, public std::enable_shared_from_this { +public: + PyZMQPublisher(PyZMQSocketConfiguration config) + : ZMQPublisher({config.context.context, config.ip_address, config.port, config.bind, config.wait}) {} +}; + +class PyZMQSubscriber : public ZMQSubscriber, public std::enable_shared_from_this { +public: + PyZMQSubscriber(PyZMQSocketConfiguration config) + : ZMQSubscriber({config.context.context, config.ip_address, config.port, config.bind, config.wait}) {} +}; + +class PyZMQPublisherSubscriber : public ZMQPublisherSubscriber, + public std::enable_shared_from_this { +public: + PyZMQPublisherSubscriber(PyZMQCombinedSocketsConfiguration config) + : ZMQPublisherSubscriber( + {config.context.context, config.ip_address, config.publisher_port, config.subscriber_port, + config.bind_publisher, config.bind_subscriber, config.wait}) {} +}; + +void bind_zmq(py::module_& m) { + py::class_(m, "ZMQContext").def(py::init(), "Create a ZMQ context", "io_threads"_a = 1); + + py::class_(m, "ZMQSocketConfiguration") + .def( + py::init(), "ZMQSocketConfiguration struct", "context"_a, + "ip_address"_a, "port"_a, "bind"_a = true, "wait"_a = false) + .def_readwrite("ip_address", &PyZMQSocketConfiguration::ip_address) + .def_readwrite("port", &PyZMQSocketConfiguration::port) + .def_readwrite("bind", &PyZMQSocketConfiguration::bind) + .def_readwrite("wait", &PyZMQSocketConfiguration::wait); + + py::class_(m, "ZMQCombinedSocketsConfiguration") + .def( + py::init(), + "ZMQCombinedSocketsConfiguration struct", "context"_a, "ip_address"_a, "publisher_port"_a, + "subscriber_port"_a, "bind_publisher"_a = true, "bind_subscriber"_a = true, "wait"_a = false) + .def_readwrite("ip_address", &PyZMQCombinedSocketsConfiguration::ip_address) + .def_readwrite("publisher_port", &PyZMQCombinedSocketsConfiguration::publisher_port) + .def_readwrite("subscriber_port", &PyZMQCombinedSocketsConfiguration::subscriber_port) + .def_readwrite("bind_publisher", &PyZMQCombinedSocketsConfiguration::bind_publisher) + .def_readwrite("bind_subscriber", &PyZMQCombinedSocketsConfiguration::bind_subscriber) + .def_readwrite("wait", &PyZMQCombinedSocketsConfiguration::wait); + + py::class_, ISocket>(m, "ZMQPublisher") + .def(py::init(), "Constructor taking the configuration struct", "configuration"_a); + + py::class_, ISocket>(m, "ZMQSubscriber") + .def(py::init(), "Constructor taking the configuration struct", "configuration"_a); + + py::class_, ISocket>(m, "ZMQPublisherSubscriber") + .def( + py::init(), "Constructor taking the configuration struct", + "configuration"_a); +} diff --git a/python/source/communication_interfaces/communication_interfaces_bindings.cpp b/python/source/communication_interfaces/communication_interfaces_bindings.cpp new file mode 100644 index 000000000..910f2e923 --- /dev/null +++ b/python/source/communication_interfaces/communication_interfaces_bindings.cpp @@ -0,0 +1,46 @@ +#include "communication_interfaces_bindings.hpp" + +#include +#include + +#define STRINGIFY(x) #x +#define MACRO_STRINGIFY(x) STRINGIFY(x) + +using namespace communication_interfaces; + +PYBIND11_MODULE(communication_interfaces, m) { + m.doc() = "Python bindings for communication interfaces"; + +#ifdef MODULE_VERSION_INFO + m.attr("__version__") = MACRO_STRINGIFY(MODULE_VERSION_INFO); +#else + m.attr("__version__") = "dev"; +#endif + + auto m_sub_ex = m.def_submodule("exceptions", "Submodule for custom communication interfaces exceptions"); + py::register_exception( + m_sub_ex, "SocketConfigurationError", PyExc_RuntimeError); + + auto m_sub_sock = m.def_submodule("sockets", "Submodule for communication interfaces sockets"); + + py::class_>(m_sub_sock, "ISocket") + .def("open", &sockets::ISocket::open, "Perform configuration steps to open the socket for communication") + .def( + "receive_bytes", + [](sockets::ISocket& socket) -> py::object { + std::string buffer; + auto res = socket.receive_bytes(buffer); + if (res) { + return py::bytes(buffer); + } else { + return py::none(); + } + }, + "Receive bytes from the socket") + .def("send_bytes", &sockets::ISocket::send_bytes, "Receive bytes from the socket", "buffer"_a) + .def("close", &sockets::ISocket::close, "Perform steps to disconnect and close the socket communication"); + + bind_tcp(m_sub_sock); + bind_udp(m_sub_sock); + bind_zmq(m_sub_sock); +} diff --git a/python/test/communication_interfaces/test_tcp.py b/python/test/communication_interfaces/test_tcp.py new file mode 100644 index 000000000..8a9ef27b8 --- /dev/null +++ b/python/test/communication_interfaces/test_tcp.py @@ -0,0 +1,56 @@ +import multiprocessing +import pytest +import time + +from communication_interfaces.exceptions import SocketConfigurationError +from communication_interfaces.sockets import TCPClientConfiguration, TCPClient, TCPServerConfiguration, TCPServer + + +class TestTCPCommunication: + server = TCPServer(TCPServerConfiguration(6000, 50, True)) + client = TCPClient(TCPClientConfiguration("127.0.0.1", 6000, 50)) + server_message = "Hello client" + client_message = "Hello server" + + def serve(self): + self.server.open() + response = self.server.receive_bytes() + if response is None: + pytest.fail("No response from client") + assert response.decode("utf-8").rstrip("\x00") == self.client_message + assert self.server.send_bytes(self.server_message) + + def test_comm(self): + t = multiprocessing.Process(target=self.serve) + t.start() + time.sleep(1.0) + + self.client.open() + assert self.client.send_bytes(self.client_message) + response = self.client.receive_bytes() + assert response + assert response.decode("utf-8").rstrip("\x00") == self.server_message + + buffer = "" + self.server.close() + with pytest.raises(SocketConfigurationError): + self.server.receive_bytes() + with pytest.raises(SocketConfigurationError): + self.server.send_bytes(buffer) + self.client.close() + with pytest.raises(SocketConfigurationError): + self.client.receive_bytes() + with pytest.raises(SocketConfigurationError): + self.client.send_bytes(buffer) + + def test_not_open(self): + buffer = "" + with pytest.raises(SocketConfigurationError): + self.server.receive_bytes() + with pytest.raises(SocketConfigurationError): + self.server.send_bytes(buffer) + + with pytest.raises(SocketConfigurationError): + self.client.receive_bytes() + with pytest.raises(SocketConfigurationError): + self.client.send_bytes(buffer) diff --git a/python/test/communication_interfaces/test_udp.py b/python/test/communication_interfaces/test_udp.py new file mode 100644 index 000000000..8cbeeb5ac --- /dev/null +++ b/python/test/communication_interfaces/test_udp.py @@ -0,0 +1,94 @@ +import pytest + +from communication_interfaces.sockets import UDPSocketConfiguration, UDPClient, UDPServer +from communication_interfaces.exceptions import SocketConfigurationError + + +@pytest.fixture +def udp_config(): + return UDPSocketConfiguration("127.0.0.1", 5000, 100) + + +@pytest.fixture +def server(udp_config): + socket = UDPServer(udp_config) + yield socket + socket.close() + + +@pytest.fixture +def client(udp_config): + socket = UDPClient(udp_config) + yield socket + socket.close() + + +def test_send_receive(server, client): + send_string = "Hello world" + + try: + server.open() + except SocketConfigurationError as e: + pytest.fail(e) + + try: + client.open() + except SocketConfigurationError as e: + pytest.fail(e) + + assert client.send_bytes(send_string) + response = server.receive_bytes() + assert response + assert response.decode("utf-8").rstrip("\x00") == send_string + + +def test_timeout(udp_config): + udp_config.timeout_duration_sec = 2.0 + + server = UDPServer(udp_config) + server.open() + + assert server.receive_bytes() is None + server.close() + + +def test_port_reuse(udp_config, server): + server.open() + + server2 = UDPServer(udp_config) + with pytest.raises(Exception): + server2.open() + + +def test_open_close(udp_config): + buffer = "" + udp_config.timeout_duration_sec = 0.5 + server = UDPServer(udp_config) + with pytest.raises(SocketConfigurationError): + server.send_bytes(buffer) + with pytest.raises(SocketConfigurationError): + server.receive_bytes() + server.open() + assert server.send_bytes("test") + assert not server.receive_bytes() + + server.close() + with pytest.raises(SocketConfigurationError): + server.send_bytes(buffer) + with pytest.raises(SocketConfigurationError): + server.receive_bytes() + + client = UDPClient(udp_config) + with pytest.raises(SocketConfigurationError): + client.send_bytes(buffer) + with pytest.raises(SocketConfigurationError): + client.receive_bytes() + client.open() + assert client.send_bytes("test") + assert not client.receive_bytes() + + client.close() + with pytest.raises(SocketConfigurationError): + client.send_bytes(buffer) + with pytest.raises(SocketConfigurationError): + client.receive_bytes() diff --git a/python/test/communication_interfaces/test_zmq.py b/python/test/communication_interfaces/test_zmq.py new file mode 100644 index 000000000..7911ff5d7 --- /dev/null +++ b/python/test/communication_interfaces/test_zmq.py @@ -0,0 +1,89 @@ +import pytest +import time + +from communication_interfaces.exceptions import SocketConfigurationError +from communication_interfaces.sockets import ZMQContext, ZMQSocketConfiguration, ZMQPublisher, ZMQSubscriber, \ + ZMQCombinedSocketsConfiguration, ZMQPublisherSubscriber + + +@pytest.fixture +def zmq_context(): + return ZMQContext() + + +@pytest.fixture +def zmq_config(zmq_context): + return ZMQSocketConfiguration(zmq_context, "127.0.0.1", "4000") + + +def test_send_receive(zmq_config): + send_string = "Hello world" + + publisher = ZMQPublisher(zmq_config) + zmq_config.bind = False + subscriber = ZMQSubscriber(zmq_config) + + publisher.open() + subscriber.open() + + response = subscriber.receive_bytes() + while response is None: + assert publisher.send_bytes(send_string) + response = subscriber.receive_bytes() + time.sleep(0.01) + + assert response + assert response.decode("utf-8") == send_string + + publisher.close() + subscriber.close() + + +def test_send_receive_combined(zmq_context): + server_send_string = "Hello client" + client_send_string = "Hello server" + + server_config = ZMQCombinedSocketsConfiguration( + zmq_context, "127.0.0.1", "5001", "5002") + server = ZMQPublisherSubscriber(server_config) + + client_config = ZMQCombinedSocketsConfiguration( + zmq_context, "127.0.0.1", "5002", "5001", False, False) + client = ZMQPublisherSubscriber(client_config) + + server.open() + client.open() + + response = client.receive_bytes() + while response is None: + assert server.send_bytes(server_send_string) + response = client.receive_bytes() + time.sleep(0.01) + + assert response.decode("utf-8") == server_send_string + + response = server.receive_bytes() + while response is None: + assert client.send_bytes(client_send_string) + response = server.receive_bytes() + time.sleep(0.01) + + assert response.decode("utf-8") == client_send_string + + server.close() + client.close() + + +def test_open_close(zmq_config): + buffer = "" + socket = ZMQPublisher(zmq_config) + with pytest.raises(SocketConfigurationError): + socket.send_bytes(buffer) + with pytest.raises(SocketConfigurationError): + socket.receive_bytes() + + socket.open() + assert socket.send_bytes("test") + socket.close() + with pytest.raises(SocketConfigurationError): + socket.send_bytes(buffer) diff --git a/source/communication_interfaces/CMakeLists.txt b/source/communication_interfaces/CMakeLists.txt new file mode 100644 index 000000000..7a35bca7d --- /dev/null +++ b/source/communication_interfaces/CMakeLists.txt @@ -0,0 +1,73 @@ +set(LIBRARY_NAME communication_interfaces) + +set(CPPZMQ_VERSION 4.7.1) +find_package(cppzmq ${CPPZMQ_VERSION} REQUIRED) + +set(CORE_SOURCES + src/sockets/ISocket.cpp + src/sockets/UDPSocket.cpp + src/sockets/UDPClient.cpp + src/sockets/UDPServer.cpp + src/sockets/ZMQSocket.cpp + src/sockets/ZMQPublisher.cpp + src/sockets/ZMQSubscriber.cpp + src/sockets/ZMQPublisherSubscriber.cpp + src/sockets/TCPSocket.cpp + src/sockets/TCPClient.cpp + src/sockets/TCPServer.cpp +) + +add_library(${LIBRARY_NAME} SHARED ${CORE_SOURCES}) +add_library(${PROJECT_NAME}::${LIBRARY_NAME} ALIAS ${LIBRARY_NAME}) + +# add include directories +target_include_directories(${LIBRARY_NAME} + PUBLIC + $ + $ +) +target_link_libraries(${LIBRARY_NAME} + cppzmq +) + +# install the target and create export-set +install(TARGETS ${LIBRARY_NAME} + EXPORT ${LIBRARY_NAME}_targets + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + INCLUDES DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} +) + +install(DIRECTORY include/ + DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} +) + +# generate and install export file +install(EXPORT ${LIBRARY_NAME}_targets + FILE ${PROJECT_NAME}_${LIBRARY_NAME}_targets.cmake + NAMESPACE ${PROJECT_NAME}:: + DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME} +) + +if (BUILD_TESTING) + add_executable(test_${LIBRARY_NAME} test/test_communication_interfaces.cpp) + file(GLOB_RECURSE MODULE_TEST_SOURCES test/tests test_*.cpp) + target_sources(test_${LIBRARY_NAME} PRIVATE ${MODULE_TEST_SOURCES}) + target_link_libraries(test_${LIBRARY_NAME} + ${LIBRARY_NAME} + ${GTEST_LIBRARIES} + pthread + ) + add_test(NAME test_${LIBRARY_NAME} COMMAND test_${LIBRARY_NAME}) +endif () + +if(${PKG_CONFIG_FOUND}) + set(PKG_NAME ${LIBRARY_NAME}) + set(PKG_DESC "Communication interfaces for socket communication.") + set(pkg_conf_file "communicatoin_interfaces.pc") + set(PKG_CFLAGS "-lcppzmq") + configure_file("${CMAKE_CURRENT_SOURCE_DIR}/../library_template.pc.in" "${CMAKE_CURRENT_BINARY_DIR}/${pkg_conf_file}" @ONLY) + install(FILES "${CMAKE_CURRENT_BINARY_DIR}/${pkg_conf_file}" + DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig/ COMPONENT pkgconfig) +endif() diff --git a/source/communication_interfaces/include/communication_interfaces/exceptions/SocketConfigurationException.hpp b/source/communication_interfaces/include/communication_interfaces/exceptions/SocketConfigurationException.hpp new file mode 100644 index 000000000..28f5f930a --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/exceptions/SocketConfigurationException.hpp @@ -0,0 +1,16 @@ +#pragma once + +#include +#include + +namespace communication_interfaces::exceptions { + +/* + * @class SocketConfigurationException + * @brief Exception that is thrown when a socket configuration failed + */ +class SocketConfigurationException : public std::runtime_error { +public: + explicit SocketConfigurationException(const std::string& msg) : runtime_error(msg) {}; +}; +} // namespace communication_interfaces::exceptions diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp new file mode 100644 index 000000000..1a2db3e39 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp @@ -0,0 +1,80 @@ +#pragma once + +#include + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" + +namespace communication_interfaces::sockets { + +/** + * @brief Interface class to define functions required for simple socket communication + */ +class ISocket { +public: + /** + * @brief Default constructor + */ + ISocket() = default; + + /** + * @brief Default destructor + */ + virtual ~ISocket() = default; + + /** + * @brief Perform configuration steps to open the socket for communication + * @throws SocketConfigurationException if opening fails + */ + void open(); + + /** + * @brief Receive bytes from the socket + * @param buffer The buffer to fill with the received bytes + * @return True if bytes were received, false otherwise + * @throws SocketConfigurationException if socket has not been opened yet + */ + bool receive_bytes(std::string& buffer); + + /** + * @brief Send bytes to the socket + * @param buffer The buffer with the bytes to send + * @return True if bytes were sent, false otherwise + * @throws SocketConfigurationException if socket has not been opened yet + */ + bool send_bytes(const std::string& buffer); + + /** + * @brief Perform steps to disconnect and close the socket communication + */ + void close(); + +protected: + /** + * @brief Perform configuration steps to open the socket for communication + * @throws SocketConfigurationException if opening fails + */ + virtual void on_open() = 0; + + /** + * @brief Receive bytes from the socket + * @param buffer The buffer to fill with the received bytes + * @return True if bytes were received, false otherwise + */ + virtual bool on_receive_bytes(std::string& buffer) = 0; + + /** + * @brief Send bytes to the socket + * @param buffer The buffer with the bytes to send + * @return True if bytes were sent, false otherwise + */ + virtual bool on_send_bytes(const std::string& buffer) = 0; + + /** + * @brief Perform steps to disconnect and close the socket communication + */ + virtual void on_close() {}; + +private: + bool opened_ = false; +}; +}// namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/TCPClient.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/TCPClient.hpp new file mode 100644 index 000000000..640480775 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/TCPClient.hpp @@ -0,0 +1,34 @@ +#pragma once + +#include "communication_interfaces/sockets/TCPSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @struct TCPClientConfiguration + * @brief Configuration parameters for a TCP client + */ +struct TCPClientConfiguration { + std::string ip_address; + int port; + int buffer_size; +}; + +class TCPClient : public TCPSocket { +public: + /** + * @brief Constructor taking the configuration struct + * @param The configuration struct + */ + explicit TCPClient(TCPClientConfiguration configuration); + +private: + /** + * @copydoc ISocket::on_open() + * @details Connect the client socket to the server + */ + void on_open() override; + + TCPClientConfiguration config_; ///< Socket configuration struct +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/TCPServer.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/TCPServer.hpp new file mode 100644 index 000000000..b3ebd3a58 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/TCPServer.hpp @@ -0,0 +1,47 @@ +#pragma once + +#include "communication_interfaces/sockets/TCPSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @struct TCPServerConfiguration + * @brief Configuration parameters for a TCP server + */ +struct TCPServerConfiguration { + int port; + int buffer_size; + bool enable_reuse; +}; + +class TCPServer : public TCPSocket { +public: + /** + * @brief Constructor taking the configuration struct + * @param The configuration struct + */ + explicit TCPServer(TCPServerConfiguration configuration); + + /** + * @brief Close the sockets by calling TCPServer::close() + */ + ~TCPServer() override; + +private: + /** + * @copydoc ISocket::on_open() + * @details Wait for connection requests from clients and accept new connections. This method blocks until a + * connection is established + */ + void on_open() override; + + /** + * @copydoc ISocket::on_close() + */ + void on_close() override; + + + TCPServerConfiguration config_; ///< Socket configuration struct + int server_fd_; ///< File descriptor of the connected server socket +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/TCPSocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/TCPSocket.hpp new file mode 100644 index 000000000..9eea88662 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/TCPSocket.hpp @@ -0,0 +1,43 @@ +#pragma once + +#include + +#include "communication_interfaces/sockets/ISocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @brief Abstract class to define a generic TCP socket + * @details TCP is a connection-based communication protocol. Hence, TCP sockets need to implement an additional + * interface method `connect()`. + */ +class TCPSocket : public ISocket { +public: + /** + * @brief Close the socket by calling TCPSocket::close() + */ + ~TCPSocket() override; + +protected: + explicit TCPSocket(int buffer_size); + + /** + * @copydoc ISocket::on_receive_bytes(std::string&) + */ + bool on_receive_bytes(std::string& buffer) override; + + /** + * @copydoc ISocket::on_receive_bytes(std::string&) + */ + bool on_send_bytes(const std::string& buffer) override; + + /** + * @copydoc ISocket::on_close(std::string&) + */ + void on_close() override; + + sockaddr_in server_address_; ///< Address of the TCP server + int socket_fd_; ///< File descriptor of the socket + int buffer_size_; ///< Buffer size +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp new file mode 100644 index 000000000..0513bcba6 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp @@ -0,0 +1,34 @@ +#pragma once + +#include "communication_interfaces/sockets/UDPSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @class UDPClient + * @brief Class to define a UDP client + */ +class UDPClient : public UDPSocket { +public: + /** + * @copydoc UDPSocket::UDPSocket(UDPSocketConfiguration) + */ + UDPClient(UDPSocketConfiguration configuration); + +private: + /** + * @copydoc ISocket::on_open() + */ + void on_open() override; + + /** + * @copydoc ISocket::on_receive_bytes(std::string&) + */ + bool on_receive_bytes(std::string& buffer) override; + + /** + * @copydoc ISocket::on_send_bytes(const std::string&) + */ + bool on_send_bytes(const std::string& buffer) override; +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp new file mode 100644 index 000000000..465fbba34 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp @@ -0,0 +1,36 @@ +#pragma once + +#include "communication_interfaces/sockets/UDPSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @class UDPServer + * @brief Class to define a UDP server + */ +class UDPServer : public UDPSocket { +public: + /** + * @copydoc UDPSocket::UDPSocket(UDPSocketConfiguration) + */ + UDPServer(UDPSocketConfiguration configuration); + +private: + /** + * @copydoc ISocket::on_open() + */ + void on_open() override; + + /** + * @copydoc ISocket::on_receive_bytes(std::string&) + */ + bool on_receive_bytes(std::string& buffer) override; + + /** + * @copydoc ISocket::on_send_bytes(const std::string&) + */ + bool on_send_bytes(const std::string& buffer) override; + + sockaddr_in client_address_; +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp new file mode 100644 index 000000000..10368664f --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp @@ -0,0 +1,73 @@ +#pragma once + +#include + +#include "communication_interfaces/sockets/ISocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @struct UDPSocketConfiguration + * @brief Configuration parameters for a UDP sockets + */ +struct UDPSocketConfiguration { + std::string ip_address; + int port; + int buffer_size; + bool enable_reuse = false; + double timeout_duration_sec = 0.0; +}; + +/** + * @class UDPSocket + * @brief Abstract class to define a generic UDP socket + */ +class UDPSocket : public ISocket { +public: + /** + * @brief Close the socket by calling UDPSocket::close() + */ + ~UDPSocket() override; + +protected: + /** + * @brief Constructor taking the configuration struct + * @param The configuration struct + */ + explicit UDPSocket(UDPSocketConfiguration configuration); + + /** + * @brief Perform steps to open the socket on the desired IP/port, set reuse and timeout options and bind if desired. + * @param bind_socket If true, bind the socket (for a UDP server), no binding otherwise (for a UDP client) + */ + void open_socket(bool bind_socket); + + /** + * @brief Receive bytes from the socket + * @param address Reference to a sockaddr_in structure in which the sending address is to be stored + * @param buffer The buffer to fill with the received bytes + * @return True if bytes were received, false otherwise + */ + [[nodiscard]] bool recvfrom(sockaddr_in& address, std::string& buffer); + + /** + * @brief Send bytes to the socket + * @param address Reference to a sockaddr_in structure containing the destination address + * @param buffer The buffer with the bytes to send + * @return True if bytes were sent, false otherwise + */ + [[nodiscard]] bool sendto(const sockaddr_in& address, const std::string& buffer) const; + + /** + * @copydoc ISocket::on_close() + */ + void on_close() override; + + sockaddr_in server_address_; ///< Address of the UDP server + +private: + UDPSocketConfiguration config_; ///< Socket configuration struct + int server_fd_; ///< File descriptor of the socket + socklen_t addr_len_; ///< Length of the socket address +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp new file mode 100644 index 000000000..9bba2cde8 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp @@ -0,0 +1,29 @@ +#pragma once + +#include "communication_interfaces/sockets/ZMQSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @class ZMQPublisher + * @brief Class to define a ZMQ publisher + */ +class ZMQPublisher : public ZMQSocket { +public: + /** + * @copydoc ZMQSocket::ZMQSocket(ZMQSocketConfiguration) + */ + explicit ZMQPublisher(ZMQSocketConfiguration configuration); + +private: + /** + * @copydoc ISocket::on_open() + */ + void on_open() override; + + /** + * @brief This method throws a runtime error as receiving is not available for a ZMQ publisher + */ + bool on_receive_bytes(std::string& buffer) override; +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisherSubscriber.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisherSubscriber.hpp new file mode 100644 index 000000000..dc24ebf2e --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisherSubscriber.hpp @@ -0,0 +1,68 @@ +#pragma once + +#include "communication_interfaces/sockets/ISocket.hpp" +#include "communication_interfaces/sockets/ZMQPublisher.hpp" +#include "communication_interfaces/sockets/ZMQSubscriber.hpp" + +namespace communication_interfaces::sockets { + +/** + * @struct ZMQCombinedSocketsConfiguration + * @brief Configuration parameters for a the combination of a ZMQ Publisher and Pubscriber socket + */ +struct ZMQCombinedSocketsConfiguration { + std::shared_ptr context; + std::string ip_address; + std::string publisher_port; + std::string subscriber_port; + bool bind_publisher = true; + bool bind_subscriber = true; + bool wait = false; +}; + +/** + * @brief A class that combines both a ZMQ Publisher and Subscriber socket into one single object + */ +class ZMQPublisherSubscriber : public ISocket { +public: + /** + * @brief Constructor taking the configuration struct + * @param The configuration struct + */ + explicit ZMQPublisherSubscriber(ZMQCombinedSocketsConfiguration configuration); + + /** + * @brief Close the sockets by calling ZMQPublisherSubscriber::close() + */ + ~ZMQPublisherSubscriber() override; + +private: + /** + * @brief Open the internal ZMQ Publisher and Subscriber sockets for communication + * @throws SocketConfigurationException if opening fails + */ + void on_open() override; + + /** + * @brief Receive bytes from the internal ZMQ Subscriber socket + * @param buffer The buffer to fill with the received bytes + * @return True if bytes were received, false otherwise + */ + bool on_receive_bytes(std::string& buffer) override; + + /** + * @brief Send bytes with the internal ZMQ Publisher socket + * @param buffer The buffer with the bytes to send + * @return True if bytes were sent, false otherwise + */ + bool on_send_bytes(const std::string& buffer) override; + + /** + * @copydoc ISocket::on_close() + */ + void on_close() override; + + std::shared_ptr pub_; ///< ZMQ publisher + std::shared_ptr sub_; ///< ZMQ subscriber +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp new file mode 100644 index 000000000..213eb5929 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp @@ -0,0 +1,63 @@ +#pragma once + +#include + +#include "communication_interfaces/sockets/ISocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @struct ZMQSocketConfiguration + * @brief Configuration parameters for a ZMQ socket + */ +struct ZMQSocketConfiguration { + std::shared_ptr context; + std::string ip_address; + std::string port; + bool bind = true; + bool wait = false; +}; + +/** + * @class ZMQSocket + * @brief Abstract class to define a generic ZMQ socket + */ +class ZMQSocket : public ISocket { +public: + /** + * @brief Close the socket by calling ZMQSocket::close() + */ + ~ZMQSocket() override; + +protected: + /** + * @brief Constructor taking the configuration struct + * @param The configuration struct + */ + ZMQSocket(ZMQSocketConfiguration configuration); + + /** + * @brief Bind or connect the socket on the desired IP/port + */ + void open_socket(); + + ZMQSocketConfiguration config_; ///< Socket configuration struct + std::shared_ptr socket_; ///< ZMQ socket + +private: + /** + * @copydoc ISocket::on_receive_bytes(std::string&) + */ + bool on_receive_bytes(std::string& buffer) override; + + /** + * @copydoc ISocket::on_send_bytes(const std::string&) + */ + bool on_send_bytes(const std::string& buffer) override; + + /** + * @copydoc ISocket::on_close() + */ + void on_close() override; +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp new file mode 100644 index 000000000..9de008cfd --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include "communication_interfaces/sockets/ZMQSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @class ZMQSubscriber + * @brief Class to define a ZMQ subscriber + */ +class ZMQSubscriber : public ZMQSocket { +public: + /** + * @copydoc ZMQSocket::ZMQSocket(ZMQSocketConfiguration) + */ + explicit ZMQSubscriber(ZMQSocketConfiguration configuration); + + /** + * @copydoc ISocket::on_open() + */ + void on_open() override; + + /** + * @brief This method throws a runtime error as sending is not available for a ZMQ publisher + */ + bool on_send_bytes(const std::string& buffer) override; +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/ISocket.cpp b/source/communication_interfaces/src/sockets/ISocket.cpp new file mode 100644 index 000000000..3af22c800 --- /dev/null +++ b/source/communication_interfaces/src/sockets/ISocket.cpp @@ -0,0 +1,30 @@ +#include "communication_interfaces/sockets/ISocket.hpp" + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" + +namespace communication_interfaces::sockets { + +void ISocket::open() { + this->on_open(); + this->opened_ = true; +} + +bool ISocket::receive_bytes(std::string& buffer) { + if (!this->opened_) { + throw exceptions::SocketConfigurationException("Failed to received bytes: socket has not been opened yet"); + } + return this->on_receive_bytes(buffer); +} + +bool ISocket::send_bytes(const std::string& buffer) { + if (!this->opened_) { + throw exceptions::SocketConfigurationException("Failed to send bytes: socket has not been opened yet"); + } + return this->on_send_bytes(buffer); +} + +void ISocket::close() { + this->opened_ = false; + this->on_close(); +} +}// namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/TCPClient.cpp b/source/communication_interfaces/src/sockets/TCPClient.cpp new file mode 100644 index 000000000..7954b64d1 --- /dev/null +++ b/source/communication_interfaces/src/sockets/TCPClient.cpp @@ -0,0 +1,32 @@ +#include "communication_interfaces/sockets/TCPClient.hpp" + +#include + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" + +namespace communication_interfaces::sockets { + +TCPClient::TCPClient(TCPClientConfiguration configuration) : TCPSocket(configuration.buffer_size), config_(configuration) {} + +void TCPClient::on_open() { + try { + bzero((char*) &this->server_address_, sizeof(this->server_address_)); + this->server_address_.sin_family = AF_INET; + this->server_address_.sin_port = htons(this->config_.port); + if (inet_pton(AF_INET, this->config_.ip_address.c_str(), &this->server_address_.sin_addr) <= 0) { + throw std::invalid_argument("IP Address not supported"); + } + } catch (const std::exception& ex) { + throw exceptions::SocketConfigurationException("Socket configuration failed: " + std::string(ex.what())); + } + + this->socket_fd_ = socket(AF_INET, SOCK_STREAM, 0); + if (this->socket_fd_ < 0) { + throw exceptions::SocketConfigurationException("Opening socket failed"); + } + + if (::connect(this->socket_fd_, (sockaddr*) &this->server_address_, sizeof(this->server_address_)) != 0) { + throw exceptions::SocketConfigurationException("Connecting client failed"); + } +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/TCPServer.cpp b/source/communication_interfaces/src/sockets/TCPServer.cpp new file mode 100644 index 000000000..53455b703 --- /dev/null +++ b/source/communication_interfaces/src/sockets/TCPServer.cpp @@ -0,0 +1,58 @@ +#include "communication_interfaces/sockets/TCPServer.hpp" + +#include +#include + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" + +namespace communication_interfaces::sockets { + +TCPServer::TCPServer(TCPServerConfiguration configuration) : + TCPSocket(configuration.buffer_size), config_(configuration), server_fd_() { +} + +TCPServer::~TCPServer() { + TCPServer::on_close(); +} + +void TCPServer::on_open() { + try { + bzero((char*) &this->server_address_, sizeof(this->server_address_)); + this->server_address_.sin_family = AF_INET; + this->server_address_.sin_addr.s_addr = htonl(INADDR_ANY); + this->server_address_.sin_port = htons(this->config_.port); + } catch (const std::exception& ex) { + throw exceptions::SocketConfigurationException("Socket configuration failed: " + std::string(ex.what())); + } + + // open stream oriented socket with internet address + this->server_fd_ = socket(AF_INET, SOCK_STREAM, 0); + if (this->server_fd_ < 0) { + throw exceptions::SocketConfigurationException("Opening socket failed"); + } + if (this->config_.enable_reuse) { + const int opt_reuse = 1; + if (setsockopt(this->server_fd_, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt_reuse, sizeof(opt_reuse)) != 0) { + throw exceptions::SocketConfigurationException("Setting socket option (enable reuse) failed"); + } + } + if (bind(this->server_fd_, (sockaddr*) &(this->server_address_), sizeof(this->server_address_)) != 0) { + throw exceptions::SocketConfigurationException("Binding socket failed"); + } + // listen for only 1 request at a time + listen(this->server_fd_, 1); + // receive a request from client using accept + sockaddr_in new_socket_address{}; + socklen_t new_addr_len = sizeof(new_socket_address); + // accept, create a new socket descriptor to handle the new connection with client + this->socket_fd_ = accept(this->server_fd_, (sockaddr*) &new_socket_address, &new_addr_len); + if (this->socket_fd_ < 0) { + throw exceptions::SocketConfigurationException("Connecting server failed"); + } +} + +void TCPServer::on_close() { + ::close(this->server_fd_); + TCPSocket::on_close(); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/TCPSocket.cpp b/source/communication_interfaces/src/sockets/TCPSocket.cpp new file mode 100644 index 000000000..52a904999 --- /dev/null +++ b/source/communication_interfaces/src/sockets/TCPSocket.cpp @@ -0,0 +1,38 @@ +#include "communication_interfaces/sockets/TCPSocket.hpp" + +#include +#include + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" + +namespace communication_interfaces::sockets { + +TCPSocket::TCPSocket(int buffer_size) : server_address_(), socket_fd_(), buffer_size_(buffer_size) { + if (buffer_size <= 0) { + throw exceptions::SocketConfigurationException("Configuration parameter 'buffer_size' has to be greater than 0."); + } +} + +TCPSocket::~TCPSocket() { + TCPSocket::on_close(); +} + +bool TCPSocket::on_receive_bytes(std::string& buffer) { + std::vector local_buffer(this->buffer_size_); + auto receive_length = recv(this->socket_fd_, local_buffer.data(), this->buffer_size_, 0); + if (receive_length < 0) { + return false; + } + buffer.assign(local_buffer.data(), local_buffer.size()); + return true; +} + +bool TCPSocket::on_send_bytes(const std::string& buffer) { + int send_length = send(this->socket_fd_, buffer.data(), buffer.size(), 0); + return send_length == static_cast(buffer.size()); +} + +void TCPSocket::on_close() { + ::close(this->socket_fd_); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/UDPClient.cpp b/source/communication_interfaces/src/sockets/UDPClient.cpp new file mode 100644 index 000000000..408b8c2e0 --- /dev/null +++ b/source/communication_interfaces/src/sockets/UDPClient.cpp @@ -0,0 +1,18 @@ +#include "communication_interfaces/sockets/UDPClient.hpp" + +namespace communication_interfaces::sockets { + +UDPClient::UDPClient(UDPSocketConfiguration configuration) : UDPSocket(std::move(configuration)) {} + +void UDPClient::on_open() { + this->open_socket(false); +} + +bool UDPClient::on_receive_bytes(std::string& buffer) { + return this->recvfrom(this->server_address_, buffer); +} + +bool UDPClient::on_send_bytes(const std::string& buffer) { + return this->sendto(this->server_address_, buffer); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/UDPServer.cpp b/source/communication_interfaces/src/sockets/UDPServer.cpp new file mode 100644 index 000000000..d2b97003c --- /dev/null +++ b/source/communication_interfaces/src/sockets/UDPServer.cpp @@ -0,0 +1,18 @@ +#include "communication_interfaces/sockets/UDPServer.hpp" + +namespace communication_interfaces::sockets { + +UDPServer::UDPServer(UDPSocketConfiguration configuration) : UDPSocket(std::move(configuration)) {} + +void UDPServer::on_open() { + this->open_socket(true); +} + +bool UDPServer::on_receive_bytes(std::string& buffer) { + return this->recvfrom(this->client_address_, buffer); +} + +bool UDPServer::on_send_bytes(const std::string& buffer) { + return this->sendto(this->client_address_, buffer); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/UDPSocket.cpp b/source/communication_interfaces/src/sockets/UDPSocket.cpp new file mode 100644 index 000000000..eea0f903d --- /dev/null +++ b/source/communication_interfaces/src/sockets/UDPSocket.cpp @@ -0,0 +1,84 @@ +#include "communication_interfaces/sockets/UDPSocket.hpp" + +#include +#include +#include +#include + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" + +namespace communication_interfaces::sockets { + +UDPSocket::UDPSocket(UDPSocketConfiguration configuration) : + server_address_(), config_(std::move(configuration)), server_fd_(), addr_len_() { + if (this->config_.buffer_size <= 0) { + throw exceptions::SocketConfigurationException("Configuration parameter 'buffer_size' has to be greater than 0."); + } +} + +UDPSocket::~UDPSocket() { + UDPSocket::on_close(); +} + +void UDPSocket::open_socket(bool bind_socket) { + try { + this->addr_len_ = sizeof(this->server_address_); + this->server_address_.sin_family = AF_INET; + this->server_address_.sin_addr.s_addr = inet_addr(this->config_.ip_address.c_str()); + this->server_address_.sin_port = htons(this->config_.port); + } catch (const std::exception& ex) { + throw exceptions::SocketConfigurationException("Socket configuration failed: " + std::string(ex.what())); + } + + this->server_fd_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (this->server_fd_ < 0) { + throw exceptions::SocketConfigurationException("Opening socket failed"); + } + if (this->config_.enable_reuse) { + const int opt_reuse = 1; + if (setsockopt(this->server_fd_, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt_reuse, sizeof(opt_reuse)) != 0) { + throw exceptions::SocketConfigurationException("Setting socket option (enable reuse) failed"); + } + } + if (bind_socket) { + if (bind(this->server_fd_, (sockaddr*) &(this->server_address_), sizeof(server_address_)) != 0) { + throw exceptions::SocketConfigurationException("Binding socket failed."); + } + } + + if (this->config_.timeout_duration_sec > 0.0 + && this->config_.timeout_duration_sec < std::numeric_limits::max()) { + timeval timeout{}; + auto secs = std::floor(this->config_.timeout_duration_sec); + timeout.tv_sec = static_cast(secs); + timeout.tv_usec = static_cast((this->config_.timeout_duration_sec - secs) * 1e6); + if (setsockopt(this->server_fd_, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) != 0) { + throw exceptions::SocketConfigurationException("Setting socket timeout failed."); + } + } +} + +bool UDPSocket::recvfrom(sockaddr_in& address, std::string& buffer) { + std::vector local_buffer(this->config_.buffer_size); + auto receive_length = ::recvfrom( + this->server_fd_, local_buffer.data(), this->config_.buffer_size, 0, (sockaddr*) &(address), &(this->addr_len_)); + if (receive_length < 0) { + return false; + } + buffer.assign(local_buffer.data(), local_buffer.size()); + return true; +} + +bool UDPSocket::sendto(const sockaddr_in& address, const std::string& buffer) const { + int send_length = ::sendto( + this->server_fd_, buffer.data(), buffer.size(), 0, (sockaddr*) &(address), this->addr_len_); + return send_length == static_cast(buffer.size()); +} + +void UDPSocket::on_close() { + if (this->server_fd_ >= 0) { + ::close(this->server_fd_); + this->server_fd_ = -1; + } +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/ZMQPublisher.cpp b/source/communication_interfaces/src/sockets/ZMQPublisher.cpp new file mode 100644 index 000000000..15a37285e --- /dev/null +++ b/source/communication_interfaces/src/sockets/ZMQPublisher.cpp @@ -0,0 +1,15 @@ +#include "communication_interfaces/sockets/ZMQPublisher.hpp" + +namespace communication_interfaces::sockets { + +ZMQPublisher::ZMQPublisher(ZMQSocketConfiguration configuration) : ZMQSocket(std::move(configuration)) {} + +void ZMQPublisher::on_open() { + this->socket_ = std::make_shared(*this->config_.context, ZMQ_PUB); + this->open_socket(); +} + +bool ZMQPublisher::on_receive_bytes(std::string&) { + throw std::runtime_error("Receive not available for socket of type ZMQPublisher"); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/ZMQPublisherSubscriber.cpp b/source/communication_interfaces/src/sockets/ZMQPublisherSubscriber.cpp new file mode 100644 index 000000000..eff31b846 --- /dev/null +++ b/source/communication_interfaces/src/sockets/ZMQPublisherSubscriber.cpp @@ -0,0 +1,41 @@ +#include "communication_interfaces/sockets/ZMQPublisherSubscriber.hpp" + +namespace communication_interfaces::sockets { + +ZMQPublisherSubscriber::ZMQPublisherSubscriber(ZMQCombinedSocketsConfiguration configuration) { + this->pub_ = std::make_shared( + ZMQSocketConfiguration( + { + configuration.context, configuration.ip_address, configuration.publisher_port, + configuration.bind_publisher, configuration.wait + })); + this->sub_ = std::make_shared( + ZMQSocketConfiguration( + { + configuration.context, configuration.ip_address, configuration.subscriber_port, + configuration.bind_subscriber, configuration.wait + })); +} + +ZMQPublisherSubscriber::~ZMQPublisherSubscriber() { + ZMQPublisherSubscriber::close(); +} + +void ZMQPublisherSubscriber::on_open() { + this->pub_->open(); + this->sub_->open(); +} + +bool ZMQPublisherSubscriber::on_receive_bytes(std::string& buffer) { + return this->sub_->receive_bytes(buffer); +} + +bool ZMQPublisherSubscriber::on_send_bytes(const std::string& buffer) { + return this->pub_->send_bytes(buffer); +} + +void ZMQPublisherSubscriber::on_close() { + this->pub_->close(); + this->sub_->close(); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/ZMQSocket.cpp b/source/communication_interfaces/src/sockets/ZMQSocket.cpp new file mode 100644 index 000000000..cc704093b --- /dev/null +++ b/source/communication_interfaces/src/sockets/ZMQSocket.cpp @@ -0,0 +1,60 @@ +#include "communication_interfaces/sockets/ZMQSocket.hpp" + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" + +namespace communication_interfaces::sockets { + +ZMQSocket::ZMQSocket(ZMQSocketConfiguration configuration) : config_(std::move(configuration)) {} + +ZMQSocket::~ZMQSocket() { + ZMQSocket::on_close(); +} + +void ZMQSocket::open_socket() { + try { + auto address = "tcp://" + this->config_.ip_address + ":" + this->config_.port; + if (this->config_.bind) { + this->socket_->bind(address); + } else { + this->socket_->connect(address); + } + } catch (const std::exception& ex) { + throw exceptions::SocketConfigurationException("Socket configuration failed: " + std::string(ex.what())); + } +} + +bool ZMQSocket::on_receive_bytes(std::string& buffer) { + zmq::recv_flags recv_flag = this->config_.wait ? zmq::recv_flags::none : zmq::recv_flags::dontwait; + zmq::message_t message; + try { + auto received = this->socket_->recv(message, recv_flag); + if (received.has_value()) { + buffer = std::string(static_cast(message.data()), message.size()); + } + return received.has_value(); + } catch (const zmq::error_t&) { + return false; + } +} + +bool ZMQSocket::on_send_bytes(const std::string& buffer) { + zmq::send_flags send_flags = this->config_.wait ? zmq::send_flags::none : zmq::send_flags::dontwait; + zmq::message_t msg(buffer.size()); + memcpy(msg.data(), buffer.data(), buffer.size()); + try { + auto sent = this->socket_->send(msg, send_flags); + if (!sent.has_value()) { + return false; + } + return *sent == buffer.size(); + } catch (const zmq::error_t&) { + return false; + } +} + +void ZMQSocket::on_close() { + if (this->socket_ != nullptr && this->socket_->connected()) { + this->socket_->close(); + } +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/ZMQSubscriber.cpp b/source/communication_interfaces/src/sockets/ZMQSubscriber.cpp new file mode 100644 index 000000000..540452524 --- /dev/null +++ b/source/communication_interfaces/src/sockets/ZMQSubscriber.cpp @@ -0,0 +1,17 @@ +#include "communication_interfaces/sockets/ZMQSubscriber.hpp" + +namespace communication_interfaces::sockets { + +ZMQSubscriber::ZMQSubscriber(ZMQSocketConfiguration configuration) : ZMQSocket(std::move(configuration)) {} + +void ZMQSubscriber::on_open() { + this->socket_ = std::make_shared(*this->config_.context, ZMQ_SUB); + this->socket_->set(zmq::sockopt::conflate, 1); + this->socket_->set(zmq::sockopt::subscribe, ""); + this->open_socket(); +} + +bool ZMQSubscriber::on_send_bytes(const std::string&) { + throw std::runtime_error("Send not available for socket of type ZMQSubscriber"); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/test/test_communication_interfaces.cpp b/source/communication_interfaces/test/test_communication_interfaces.cpp new file mode 100644 index 000000000..697a9d70c --- /dev/null +++ b/source/communication_interfaces/test/test_communication_interfaces.cpp @@ -0,0 +1,6 @@ +#include + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/source/communication_interfaces/test/tests/test_tcp_communication.cpp b/source/communication_interfaces/test/tests/test_tcp_communication.cpp new file mode 100644 index 000000000..876e199a5 --- /dev/null +++ b/source/communication_interfaces/test/tests/test_tcp_communication.cpp @@ -0,0 +1,72 @@ +#include +#include + +#include "communication_interfaces/sockets/TCPClient.hpp" +#include "communication_interfaces/sockets/TCPServer.hpp" + +using namespace communication_interfaces; + +class TestTCPSockets : public ::testing::Test { +public: + TestTCPSockets() { + server_ = std::make_shared(sockets::TCPServerConfiguration{6000, 50, true}); + client_ = std::make_shared(sockets::TCPClientConfiguration{"127.0.0.1", 6000, 50}); + } + + std::thread start_server() { + return std::thread([this] { this->serve(); }); + } + + void serve() const { + server_->open(); + std::string recv_message; + EXPECT_TRUE(server_->receive_bytes(recv_message)); + EXPECT_STREQ(recv_message.c_str(), client_message_.c_str()); + EXPECT_TRUE(server_->send_bytes(server_message_)); + } + + std::thread start_client() { + return std::thread([this] { this->client(); }); + } + + void client() const { + client_->open(); + EXPECT_TRUE(client_->send_bytes(client_message_)); + std::string recv_message; + EXPECT_TRUE(client_->receive_bytes(recv_message)); + EXPECT_STREQ(recv_message.c_str(), server_message_.c_str()); + } + + std::string client_message_ = "Hello server"; + std::string server_message_ = "Hello client"; + + std::shared_ptr client_; + std::shared_ptr server_; +}; + +TEST_F(TestTCPSockets, TestCommunication) { + std::thread server = start_server(); + usleep(100000); + std::thread client = start_client(); + + client.join(); + server.join(); + + std::string buffer; + this->server_->close(); + EXPECT_THROW(this->server_->receive_bytes(buffer), exceptions::SocketConfigurationException); + EXPECT_THROW(this->server_->send_bytes(buffer), exceptions::SocketConfigurationException); + this->client_->close(); + EXPECT_THROW(this->client_->receive_bytes(buffer), exceptions::SocketConfigurationException); + EXPECT_THROW(this->client_->send_bytes(buffer), exceptions::SocketConfigurationException); +} + +TEST_F(TestTCPSockets, TestNotOpen) { + std::string buffer; + + EXPECT_THROW(this->server_->receive_bytes(buffer), exceptions::SocketConfigurationException); + EXPECT_THROW(this->server_->send_bytes(buffer), exceptions::SocketConfigurationException); + + EXPECT_THROW(this->client_->receive_bytes(buffer), exceptions::SocketConfigurationException); + EXPECT_THROW(this->client_->send_bytes(buffer), exceptions::SocketConfigurationException); +} diff --git a/source/communication_interfaces/test/tests/test_udp_communication.cpp b/source/communication_interfaces/test/tests/test_udp_communication.cpp new file mode 100644 index 000000000..ef332ee9d --- /dev/null +++ b/source/communication_interfaces/test/tests/test_udp_communication.cpp @@ -0,0 +1,88 @@ +#include + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" +#include "communication_interfaces/sockets/UDPClient.hpp" +#include "communication_interfaces/sockets/UDPServer.hpp" + +using namespace communication_interfaces; + +class TestUDPSockets : public ::testing::Test { +public: + TestUDPSockets() { + config_ = {"127.0.0.1", 5000, 100}; + } + + sockets::UDPSocketConfiguration config_; +}; + +TEST_F(TestUDPSockets, SendReceive) { + const std::string send_string = "Hello world!"; + std::string receive_string; + + // Create server socket and bind it to a port + sockets::UDPServer server(this->config_); + ASSERT_NO_THROW(server.open()); + + // Create client socket + sockets::UDPClient client(this->config_); + ASSERT_NO_THROW(client.open()); + + // Send test message from client to server + ASSERT_TRUE(client.send_bytes(send_string)); + + // Receive message on server + ASSERT_TRUE(server.receive_bytes(receive_string)); + + // Convert received message to string and compare with sent message + EXPECT_STREQ(receive_string.c_str(), send_string.c_str()); +} + +TEST_F(TestUDPSockets, Timeout) { + // Create server socket and bind it to a port + this->config_.timeout_duration_sec = 3.0; + sockets::UDPServer server(this->config_); + server.open(); + + // Try to receive a message from client, but expect timeout + std::string received_bytes; + EXPECT_FALSE(server.receive_bytes(received_bytes)); +} + +TEST_F(TestUDPSockets, PortReuse) { + // Create server socket and bind it to a port + sockets::UDPServer server1(this->config_); + server1.open(); + + // Try to create a second server socket and bind it to the same port (expect failure) + sockets::UDPServer server2(this->config_); + EXPECT_THROW(server2.open(), exceptions::SocketConfigurationException); +} + +TEST_F(TestUDPSockets, OpenClose) { + std::string buffer; + // Create and open server socket + this->config_.timeout_duration_sec = 0.5; + sockets::UDPServer server(this->config_); + EXPECT_THROW(server.send_bytes(buffer), exceptions::SocketConfigurationException); + EXPECT_THROW(server.receive_bytes(buffer), exceptions::SocketConfigurationException); + server.open(); + + EXPECT_FALSE(server.send_bytes(std::string("test"))); + EXPECT_FALSE(server.receive_bytes(buffer)); + // Close server socket + server.close(); + EXPECT_THROW(server.send_bytes(buffer), exceptions::SocketConfigurationException); + EXPECT_THROW(server.receive_bytes(buffer), exceptions::SocketConfigurationException); + + sockets::UDPClient client(this->config_); + EXPECT_THROW(client.send_bytes(buffer), exceptions::SocketConfigurationException); + EXPECT_THROW(client.receive_bytes(buffer), exceptions::SocketConfigurationException); + client.open(); + + EXPECT_TRUE(client.send_bytes(std::string("test"))); + EXPECT_FALSE(client.receive_bytes(buffer)); + // Close client socket + client.close(); + EXPECT_THROW(client.send_bytes(buffer), exceptions::SocketConfigurationException); + EXPECT_THROW(client.receive_bytes(buffer), exceptions::SocketConfigurationException); +} diff --git a/source/communication_interfaces/test/tests/test_zmq_communication.cpp b/source/communication_interfaces/test/tests/test_zmq_communication.cpp new file mode 100644 index 000000000..5c938f606 --- /dev/null +++ b/source/communication_interfaces/test/tests/test_zmq_communication.cpp @@ -0,0 +1,82 @@ +#include +#include + +#include "communication_interfaces/sockets/ZMQPublisher.hpp" +#include "communication_interfaces/sockets/ZMQSubscriber.hpp" +#include "communication_interfaces/sockets/ZMQPublisherSubscriber.hpp" + +using namespace communication_interfaces; +using namespace std::chrono_literals; + +class TestZMQSockets : public ::testing::Test { +public: + TestZMQSockets() { + auto context = std::make_shared(1); + config_ = {context, "127.0.0.1", "4000"}; + } + + sockets::ZMQSocketConfiguration config_; +}; + +TEST_F(TestZMQSockets, SendReceive) { + const std::string send_string = "Hello world!"; + std::string receive_string; + + sockets::ZMQPublisher publisher(this->config_); + this->config_.bind = false; + sockets::ZMQSubscriber subscriber(this->config_); + + publisher.open(); + subscriber.open(); + + while (!subscriber.receive_bytes(receive_string)) { + EXPECT_TRUE(publisher.send_bytes(send_string)); + usleep(10000); + } + EXPECT_STREQ(receive_string.c_str(), send_string.c_str()); + publisher.close(); + subscriber.close(); +} + +TEST_F(TestZMQSockets, SendReceiveCombined) { + const std::string server_send_string = "Hello client!"; + const std::string client_send_string = "Hello server!"; + std::string server_receive_string, client_receive_string; + + sockets::ZMQCombinedSocketsConfiguration server_config = {config_.context, config_.ip_address, "5001", "5002"}; + sockets::ZMQPublisherSubscriber server(server_config); + + sockets::ZMQCombinedSocketsConfiguration + client_config = {config_.context, config_.ip_address, "5002", "5001", false, false}; + sockets::ZMQPublisherSubscriber client(client_config); + + server.open(); + client.open(); + + while (!client.receive_bytes(client_receive_string)) { + EXPECT_TRUE(server.send_bytes(server_send_string)); + usleep(10000); + } + EXPECT_STREQ(client_receive_string.c_str(), server_send_string.c_str()); + + while (!server.receive_bytes(server_receive_string)) { + EXPECT_TRUE(client.send_bytes(client_send_string)); + usleep(10000); + } + EXPECT_STREQ(server_receive_string.c_str(), client_send_string.c_str()); + + server.close(); + client.close(); +} + +TEST_F(TestZMQSockets, TestOpenClose) { + std::string buffer; + sockets::ZMQPublisher socket(this->config_); + EXPECT_THROW(socket.send_bytes(buffer), exceptions::SocketConfigurationException); + EXPECT_THROW(socket.receive_bytes(buffer), std::runtime_error); + + socket.open(); + EXPECT_TRUE(socket.send_bytes(std::string("test"))); + socket.close(); + EXPECT_THROW(socket.send_bytes(buffer), exceptions::SocketConfigurationException); +} From 33ad390640c5b712154dc1bfa4f819a2356dbc60 Mon Sep 17 00:00:00 2001 From: Dominic Reber Date: Wed, 1 May 2024 08:46:11 +0200 Subject: [PATCH 2/5] chore: update build system --- Dockerfile | 3 ++- apt-packages.txt | 1 + dependencies/dependencies.cmake | 10 +++++++++- python/setup.py | 14 +++++++++++++- source/CMakeLists.txt | 7 +++++++ source/control_librariesConfig.cmake.in | 6 ++++++ 6 files changed, 38 insertions(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index fbc09a471..cdf29561a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -129,7 +129,8 @@ COPY --from=apt-dependencies /tmp/apt / COPY --from=base-dependencies /tmp/deps /usr COPY dependencies/dependencies.cmake CMakeLists.txt RUN --mount=type=cache,target=/build,id=cmake-deps-${TARGETPLATFORM}-${CACHEID},uid=1000 \ - cmake -B build -Dprotobuf_BUILD_TESTS=OFF -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} && cmake --build build && cmake --install build --prefix /tmp/deps + cmake -B build -Dprotobuf_BUILD_TESTS=OFF -DCPPZMQ_BUILD_TESTS=OFF -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} \ + && cmake --build build && cmake --install build --prefix /tmp/deps COPY --from=base-dependencies /tmp/deps /tmp/deps COPY --from=pinocchio-dependencies /tmp/deps /tmp/deps diff --git a/apt-packages.txt b/apt-packages.txt index 2988803ab..d080e8fe6 100644 --- a/apt-packages.txt +++ b/apt-packages.txt @@ -2,3 +2,4 @@ libboost-all-dev libeigen3-dev liburdfdom-dev libassimp-dev +libzmq3-dev diff --git a/dependencies/dependencies.cmake b/dependencies/dependencies.cmake index 38830257c..99093b885 100644 --- a/dependencies/dependencies.cmake +++ b/dependencies/dependencies.cmake @@ -18,4 +18,12 @@ FetchContent_Declare( SOURCE_SUBDIR cmake ) -FetchContent_MakeAvailable(OsqpEigen protobuf) +FetchContent_Declare( + cppzmq + GIT_REPOSITORY https://github.com/zeromq/cppzmq/ + GIT_TAG v4.7.1 +) +FetchContent_MakeAvailable(cppzmq) + + +FetchContent_MakeAvailable(OsqpEigen protobuf cppzmq) diff --git a/python/setup.py b/python/setup.py index bed40e169..1476e8f06 100644 --- a/python/setup.py +++ b/python/setup.py @@ -12,13 +12,14 @@ osqp_path_var = 'OSQP_INCLUDE_DIR' __version__ = "8.0.0" -__libraries__ = ['state_representation', 'clproto', 'controllers', 'dynamical_systems', 'robot_model'] +__libraries__ = ['state_representation', 'clproto', 'controllers', 'dynamical_systems', 'robot_model', 'communicatoin_interfaces'] __include_dirs__ = ['include'] __install_clproto_module__ = True __install_controllers_module__ = True __install_dynamical_systems_module__ = True __install_robot_model_module__ = True +__install_communication_interfaces_module__ = True # check that necessary libraries can be found try: @@ -122,6 +123,17 @@ ) ) +if __install_communication_interfaces_module__: + ext_modules.append( + Pybind11Extension('communication_interfaces', + sorted(glob('source/communication_interfaces/*.cpp')), + cxx_std=17, + include_dirs=__include_dirs__, + libraries=['communication_interfaces'], + define_macros=[('MODULE_VERSION_INFO', __version__)], + ) + ) + setup( name='control-libraries', version=__version__, diff --git a/source/CMakeLists.txt b/source/CMakeLists.txt index c77afad44..7601102f2 100644 --- a/source/CMakeLists.txt +++ b/source/CMakeLists.txt @@ -7,6 +7,7 @@ option(BUILD_TESTING "Build all tests." OFF) option(BUILD_CONTROLLERS "Build and install controllers library" ON) option(BUILD_DYNAMICAL_SYSTEMS "Build and install dynamical systems library" ON) option(BUILD_ROBOT_MODEL "Build and install robot model library" ON) +option(BUILD_COMMUNICATION_INTERFACES "Build and install communication interfaces library" ON) option(EXPERIMENTAL_FEATURES "Include experimental features" OFF) # Default to C99 @@ -78,6 +79,12 @@ if(BUILD_CONTROLLERS) add_pkgconfig_library(controllers ${PROJECT_VERSION}) endif() +if(BUILD_COMMUNICATION_INTERFACES) + add_subdirectory(communication_interfaces) + list(APPEND INSTALL_SUPPORTED_COMPONENTS communication_interfaces) + add_pkgconfig_library(communication_interfaces ${PROJECT_VERSION}) +endif() + # generate the version file for the config file write_basic_package_version_file( "${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}ConfigVersion.cmake" diff --git a/source/control_librariesConfig.cmake.in b/source/control_librariesConfig.cmake.in index f244c2f63..4c580a77a 100644 --- a/source/control_librariesConfig.cmake.in +++ b/source/control_librariesConfig.cmake.in @@ -22,6 +22,12 @@ while(_control_libraries_to_find) set_and_check(control_libraries_NOT_FOUND_MESSAGE "Unsupported component: ${_comp}") endif() + # Find communication interfaces dependencies if the corresponding component is needed + # FIXME: this should be done automatically + if (${_comp} STREQUAL "communication_interfaces") + find_dependency(cppzmq) + endif() + # Find robot model dependencies if the corresponding component is needed # FIXME: this should be done automatically if (${_comp} STREQUAL "robot_model") From a0b5ae4462c3c8c41f97ed705df924e05a45fe20 Mon Sep 17 00:00:00 2001 From: Dominic Reber <71256590+domire8@users.noreply.github.com> Date: Wed, 1 May 2024 08:50:15 +0200 Subject: [PATCH 3/5] fix: typo --- python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/setup.py b/python/setup.py index 1476e8f06..42a7e18e2 100644 --- a/python/setup.py +++ b/python/setup.py @@ -12,7 +12,7 @@ osqp_path_var = 'OSQP_INCLUDE_DIR' __version__ = "8.0.0" -__libraries__ = ['state_representation', 'clproto', 'controllers', 'dynamical_systems', 'robot_model', 'communicatoin_interfaces'] +__libraries__ = ['state_representation', 'clproto', 'controllers', 'dynamical_systems', 'robot_model', 'communication_interfaces'] __include_dirs__ = ['include'] __install_clproto_module__ = True From bd147af4123e8b931fd4648ba6d398574fec77eb Mon Sep 17 00:00:00 2001 From: Dominic Reber Date: Wed, 1 May 2024 17:51:02 +0200 Subject: [PATCH 4/5] release: version 8.1.0 --- CHANGELOG.md | 11 +++ VERSION | 2 +- demos/CMakeLists.txt | 2 +- doxygen/doxygen.conf | 2 +- protocol/clproto_cpp/CMakeLists.txt | 2 +- python/README.md | 31 ++++++ python/setup.py | 2 +- source/CMakeLists.txt | 2 +- source/communication_interfaces/README.md | 115 ++++++++++++++++++++++ 9 files changed, 163 insertions(+), 6 deletions(-) create mode 100644 source/communication_interfaces/README.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 61cf44874..448dccc36 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ Release Versions +- [8.1.0](#810) - [8.0.0](#800) - [7.4.0](#740) - [7.3.0](#730) @@ -13,6 +14,16 @@ Release Versions - [6.3.0](#630) - [6.2.0](#620) +## 8.1.0 + +Version 8.1.0 adds a new module called `communication_interfaces` to the control libraries. This is a library for +simple socket communication and was previously developed in a different place. It currently implements sockets for UPD, +TCP, and ZMQ communication. + +### Full changelog + +- feat: migrate communication interfaces (#190) + ## 8.0.0 Version 8.0.0 is a major update that adds new state types and collision detection features to control-libraries. diff --git a/VERSION b/VERSION index ae9a76b92..8104cabd3 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -8.0.0 +8.1.0 diff --git a/demos/CMakeLists.txt b/demos/CMakeLists.txt index ddb9d359e..12fb305e9 100644 --- a/demos/CMakeLists.txt +++ b/demos/CMakeLists.txt @@ -15,7 +15,7 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic) endif() -find_package(control_libraries 8.0.0 CONFIG REQUIRED) +find_package(control_libraries 8.1.0 CONFIG REQUIRED) set(DEMOS_SCRIPTS task_space_control_loop diff --git a/doxygen/doxygen.conf b/doxygen/doxygen.conf index 23040f939..1d8e4774c 100644 --- a/doxygen/doxygen.conf +++ b/doxygen/doxygen.conf @@ -38,7 +38,7 @@ PROJECT_NAME = "Control Libraries" # could be handy for archiving the generated documentation or if some version # control system is used. -PROJECT_NUMBER = 8.0.0 +PROJECT_NUMBER = 8.1.0 # Using the PROJECT_BRIEF tag one can provide an optional one line description # for a project that appears at the top of each page and should give viewer a diff --git a/protocol/clproto_cpp/CMakeLists.txt b/protocol/clproto_cpp/CMakeLists.txt index 625a44619..27decb009 100644 --- a/protocol/clproto_cpp/CMakeLists.txt +++ b/protocol/clproto_cpp/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.15) -project(clproto VERSION 8.0.0) +project(clproto VERSION 8.1.0) # Default to C99 if(NOT CMAKE_C_STANDARD) diff --git a/python/README.md b/python/README.md index fe5bbdc16..2835c8c6b 100644 --- a/python/README.md +++ b/python/README.md @@ -62,3 +62,34 @@ encoded_msg = clproto.encode(B, clproto.MessageType.JOINT_STATE_MESSAGE) decoded_object = clproto.decode(encoded_msg) ``` + +### Note on the communication interfaces + +The Python bindings require an additional step of sanitizing the data when sending and receiving bytes. To illustrate +this, an example is provided here. + +```python +# First a server and a client is connected +context = ZMQContext() +server = ZMQPublisher(ZMQSocketConfiguration(context, "127.0.0.1", "5001", True)) +client = ZMQSubscriber(ZMQSocketConfiguration(context, "127.0.0.1", "5001", False)) +server.open() +client.open() +``` + +```python +# Then a string is sent through +str_msg = "Hello!" +server.send_bytes(str_msg) +received_str_msg = client.receive_bytes() +if received_str_msg is not None: + print(received_str_msg) +``` + +Here we expect the printed value to be `Hello!`, however due to the way strings and bytes are processed, the string +message is left as a byte literal and `b'Hello!'` is printed instead. We can correct this as follows: + +```python +# Instead, decode the value +print(received_str_msg.decode("utf-8")) # will print Hello! as expected +``` diff --git a/python/setup.py b/python/setup.py index 42a7e18e2..bd6acdc9d 100644 --- a/python/setup.py +++ b/python/setup.py @@ -11,7 +11,7 @@ # names of the environment variables that define osqp and openrobots include directories osqp_path_var = 'OSQP_INCLUDE_DIR' -__version__ = "8.0.0" +__version__ = "8.1.0" __libraries__ = ['state_representation', 'clproto', 'controllers', 'dynamical_systems', 'robot_model', 'communication_interfaces'] __include_dirs__ = ['include'] diff --git a/source/CMakeLists.txt b/source/CMakeLists.txt index 7601102f2..baee09fe2 100644 --- a/source/CMakeLists.txt +++ b/source/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.15) -project(control_libraries VERSION 8.0.0) +project(control_libraries VERSION 8.1.0) # Build options option(BUILD_TESTING "Build all tests." OFF) diff --git a/source/communication_interfaces/README.md b/source/communication_interfaces/README.md new file mode 100644 index 000000000..ef9a1c246 --- /dev/null +++ b/source/communication_interfaces/README.md @@ -0,0 +1,115 @@ +# Communication interfaces + +A library for simple socket communication. It currently implements sockets for UPD, TCP, and ZMQ communication. + +## Socket interface + +The `ISocket` class is an interface for simple socket communication, defining functions for opening a socket, +sending and receiving bytes, and closing the socket connection. + +The `ISocket` class defines an `open()` method to perform configuration steps to open the socket for communication. +If opening the socket fails, an exception is thrown. The `close()` method is also provided to perform steps to disconnect +and close the socket communication. + +The functions `receive_bytes(std::string&)` and `send_bytes(const std::string&)` perform the read and write logic of the socket +respectively. + +### Implementing a derived socket class + +To use this class, create a subclass that inherits from it and implement its pure virtual functions. The pure virtual +functions are `on_open()`, `on_receive_bytes(std::string&)`, and `on_send_bytes(const std::string&)`. + +Configuration parameters should be passed with a configuration struct, resulting in a single argument constructor. + +The `on_close()` function can optionally be overridden to perform steps to disconnect and close the socket communication. +If a derived class defines any cleanup behavior in `on_close()`, it should also be invoked statically and explicitly +in the destructor of the derived class. + +An example is given below. + +```c++ +// DerivedSocket.hpp + +struct DerivedSocketConfig { + int param1; + double param2; +}; + +class DerivedSocket : ISocket { +public: + DerivedSocket(DerivedSocketConfig configuration); + + ~DerivedSocket() override; + +private: + void on_open() override; + + bool on_receive_bytes(std::string& buffer) override; + + bool on_send_bytes(const std::string& buffer) override; + + void on_close() override; +} +``` + +```c++ +// DerivedSocket.cpp +DerivedSocket::DerivedSocket(DerivedSocketConfig configuraiton) { + // save configuration parameters for later use +} + +DerivedSocket::~DerivedSocket() { + DerivedSocket::on_close(); +} + +void DerivedSocket::on_open() { + // Configure and open the socket +} + +bool DerivedSocket::on_receive_bytes(std::string& buffer) { + // Read the contents of the socket into the buffer and return true on success. Otherwise, return false. + return true; +} + +bool DerivedSocket::on_send_bytes(const std::string& buffer) { + // Write the contents of the buffer onto the socket and return true on success. Otherwise, return false. + return true; +} + +void DerivedSocket::on_close() { + // Perform clean-up steps here +} +``` + +## Notes on ZMQ communication + +It can be difficult to set up a working configuration for ZMQ communication. The examples below assume that there are +two ZMQ sockets, one that has the robot state is on port 1601 and the command on 1602: + +#### Everything runs in one container / on the same host (network independent) + +If all applications run in the same container, or on the same host, the situation is: + +- The robot publishes its state on `0.0.0.0:1601` and listens to commands on `0.0.0.0:1602` with both sockets + non-binding. +- The controller sends the command on `*:1602` and receives the state on `*:1601` with both sockets binding. + +#### One or more containers and host, all on host network and with no hostname + +Same as above. + +#### Several containers, user-defined bridge network with hostnames + +If the containers all run on a user-defined bridge network, the connecting sockets need to be provided with the hostname +of the binding sockets. For example, if the containers are running on network *aicanet* and have hostnames *robot* and +*controller*, respectively. + +- The robot publishes its state on `controller.aicanet:1601` and listens to commands on `controller.aicanet:1602` with + both sockets non-binding. +- The controller sends the command on `*:1602` and receives the state on `*:1601` with both sockets binding. + +#### Note + +- This list of combinations is not exhaustive. +- The binding sockets always have a URI like `*:port` whilst the connecting sockets need to provide the complete address + version (`0.0.0.0:port` if on localhost or `hostname.network:port` if on bridge network). From 41a59f600c8ba2c2f63e0660be92366db31bd2ca Mon Sep 17 00:00:00 2001 From: Dominic Reber Date: Thu, 2 May 2024 08:25:47 +0200 Subject: [PATCH 5/5] fix: typo --- source/communication_interfaces/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/communication_interfaces/CMakeLists.txt b/source/communication_interfaces/CMakeLists.txt index 7a35bca7d..2b3bc274e 100644 --- a/source/communication_interfaces/CMakeLists.txt +++ b/source/communication_interfaces/CMakeLists.txt @@ -65,7 +65,7 @@ endif () if(${PKG_CONFIG_FOUND}) set(PKG_NAME ${LIBRARY_NAME}) set(PKG_DESC "Communication interfaces for socket communication.") - set(pkg_conf_file "communicatoin_interfaces.pc") + set(pkg_conf_file "communication_interfaces.pc") set(PKG_CFLAGS "-lcppzmq") configure_file("${CMAKE_CURRENT_SOURCE_DIR}/../library_template.pc.in" "${CMAKE_CURRENT_BINARY_DIR}/${pkg_conf_file}" @ONLY) install(FILES "${CMAKE_CURRENT_BINARY_DIR}/${pkg_conf_file}"