From 31624d95365f2a78693a45449d92e237bbba09f5 Mon Sep 17 00:00:00 2001 From: Michael de Lang Date: Sun, 5 Jan 2025 00:04:24 +0100 Subject: [PATCH] Refactor boost support to proper event loop, removing need for locks and threads --- .github/workflows/cmake-internal-debug.yml | 2 +- .github/workflows/cmake.yml | 14 +- .github/workflows/coverage.yml | 2 +- CMakeLists.txt | 18 +- Dockerfile | 2 +- Dockerfile-asan | 13 +- Dockerfile-asan-beast-deprecated | 98 +++++++++++ Dockerfile-asan-clang | 15 +- Dockerfile-clang | 2 +- Dockerfile-musl | 2 +- Dockerfile-musl-aarch64 | 2 +- Dockerfile-tsan | 42 +---- build.sh | 20 +-- build_common.sh | 4 +- examples/CMakeLists.txt | 12 +- examples/etcd_example/main.cpp | 6 +- examples/http_example/main.cpp | 47 ++++-- examples/http_ping_pong/ping.cpp | 6 +- examples/http_ping_pong/pong.cpp | 8 +- examples/websocket_example/main.cpp | 44 +++-- include/ichor/event_queues/BoostAsioQueue.h | 42 +++++ include/ichor/event_queues/IBoostAsioQueue.h | 25 +++ include/ichor/event_queues/IOUringQueue.h | 3 +- include/ichor/interfaces/IFrameworkLogger.h | 12 -- .../network/boost/AsioContextService.h | 61 ------- .../network/boost/HttpConnectionService.h | 27 ++- .../services/network/boost/HttpHostService.h | 26 ++- .../network/boost/WsConnectionService.h | 18 +- .../services/network/boost/WsHostService.h | 13 +- quickbuild.sh | 5 +- src/ichor/event_queues/BoostAsioQueue.cpp | 158 ++++++++++++++++++ src/ichor/event_queues/IOUringQueue.cpp | 6 +- src/ichor/event_queues/SdeventQueue.cpp | 6 +- .../network/boost/AsioContextService.cpp | 136 --------------- .../network/boost/HttpConnectionService.cpp | 145 ++++++++-------- .../network/boost/HttpHostService.cpp | 149 ++++++++--------- .../network/boost/WsConnectionService.cpp | 147 ++++++---------- src/services/network/boost/WsHostService.cpp | 53 +++--- test/EtcdTests.cpp | 9 +- test/HttpTests.cpp | 53 +----- test/QueueTests.cpp | 55 ++++++ 41 files changed, 767 insertions(+), 741 deletions(-) create mode 100644 Dockerfile-asan-beast-deprecated create mode 100644 include/ichor/event_queues/BoostAsioQueue.h create mode 100644 include/ichor/event_queues/IBoostAsioQueue.h delete mode 100644 include/ichor/services/network/boost/AsioContextService.h create mode 100644 src/ichor/event_queues/BoostAsioQueue.cpp delete mode 100644 src/services/network/boost/AsioContextService.cpp diff --git a/.github/workflows/cmake-internal-debug.yml b/.github/workflows/cmake-internal-debug.yml index 0ca337a4..42ad6226 100644 --- a/.github/workflows/cmake-internal-debug.yml +++ b/.github/workflows/cmake-internal-debug.yml @@ -37,5 +37,5 @@ jobs: - name: Examples working-directory: ${{github.workspace}}/bin - run: ../bin/ichor_etcd_example && ../bin/ichor_http_example && ../bin/ichor_multithreaded_example && ../bin/ichor_optional_dependency_example && ../bin/ichor_serializer_example && ../bin/ichor_tcp_example && ../bin/ichor_timer_example && ../bin/ichor_factory_example && ../bin/ichor_websocket_example && ../bin/ichor_websocket_example -t4 && ../bin/ichor_yielding_timer_example && ../bin/ichor_event_statistics_example && ../bin/ichor_introspection_example && ../bin/ichor_coroutine_benchmark && ../bin/ichor_event_benchmark && ../bin/ichor_serializer_benchmark && ../bin/ichor_start_benchmark && ../bin/ichor_start_stop_benchmark && ../bin/ichor_utils_benchmark -r + run: ../bin/ichor_etcd_example && ../bin/ichor_http_example_boost && ../bin/ichor_http_example_uring && ../bin/ichor_multithreaded_example && ../bin/ichor_multithreaded_example_uring && ../bin/ichor_optional_dependency_example && ../bin/ichor_serializer_example && ../bin/ichor_tcp_example && ../bin/ichor_tcp_example_uring && ../bin/ichor_timer_example && ../bin/ichor_yielding_timer_example && ../bin/ichor_timer_example_uring && ../bin/ichor_yielding_timer_example_uring && ../bin/ichor_factory_example && ../bin/ichor_websocket_example && ../bin/ichor_event_statistics_example && ../bin/ichor_introspection_example && ../bin/ichor_coroutine_benchmark && ../bin/ichor_event_benchmark && ../bin/ichor_serializer_benchmark && ../bin/ichor_start_benchmark && ../bin/ichor_start_stop_benchmark && ../bin/ichor_utils_benchmark -r diff --git a/.github/workflows/cmake.yml b/.github/workflows/cmake.yml index 36385cac..6302953c 100644 --- a/.github/workflows/cmake.yml +++ b/.github/workflows/cmake.yml @@ -41,7 +41,7 @@ jobs: if: ${{matrix.sanitizer == 'asan'}} # Configure CMake in a 'build' subdirectory. `CMAKE_BUILD_TYPE` is only required if you are using a single-configuration generator such as make. # See https://cmake.org/cmake/help/latest/variable/CMAKE_BUILD_TYPE.html?highlight=cmake_build_type - run: cmake -GNinja -B ${{github.workspace}}/build -DCMAKE_BUILD_TYPE=${{matrix.opts}} -DICHOR_USE_SPDLOG=${{matrix.spdlog}} -DICHOR_USE_BOOST_BEAST=ON -DICHOR_USE_SDEVENT=ON -DICHOR_USE_SANITIZERS=ON -DICHOR_USE_MOLD=ON -DICHOR_USE_LIBCPP=OFF -DICHOR_USE_LIBURING=ON + run: cmake -GNinja -B ${{github.workspace}}/build -DCMAKE_BUILD_TYPE=${{matrix.opts}} -DICHOR_USE_SPDLOG=${{matrix.spdlog}} -DICHOR_USE_BOOST_BEAST=OFF -DICHOR_USE_SDEVENT=ON -DICHOR_USE_SANITIZERS=ON -DICHOR_USE_MOLD=ON -DICHOR_USE_LIBCPP=OFF -DICHOR_USE_LIBURING=ON - name: "Configure CMake (spdlog:${{matrix.spdlog}}, no asan)" if: ${{matrix.sanitizer == 'none'}} @@ -53,7 +53,7 @@ jobs: if: ${{matrix.sanitizer == 'tsan'}} # Configure CMake in a 'build' subdirectory. `CMAKE_BUILD_TYPE` is only required if you are using a single-configuration generator such as make. # See https://cmake.org/cmake/help/latest/variable/CMAKE_BUILD_TYPE.html?highlight=cmake_build_type - run: cmake -GNinja -B ${{github.workspace}}/build -DCMAKE_BUILD_TYPE=${{matrix.opts}} -DICHOR_USE_SPDLOG=${{matrix.spdlog}} -DICHOR_USE_BOOST_BEAST=ON -DICHOR_USE_SDEVENT=ON -DICHOR_USE_SANITIZERS=OFF -DICHOR_USE_THREAD_SANITIZER=ON -DICHOR_USE_MOLD=ON -DICHOR_USE_LIBCPP=OFF -DICHOR_USE_LIBURING=ON + run: cmake -GNinja -B ${{github.workspace}}/build -DCMAKE_BUILD_TYPE=${{matrix.opts}} -DICHOR_USE_SPDLOG=${{matrix.spdlog}} -DICHOR_USE_BOOST_BEAST=OFF -DICHOR_USE_SDEVENT=ON -DICHOR_USE_SANITIZERS=OFF -DICHOR_USE_THREAD_SANITIZER=ON -DICHOR_USE_MOLD=ON -DICHOR_USE_LIBCPP=OFF -DICHOR_USE_LIBURING=ON - name: Build # Build your program with the given configuration @@ -65,7 +65,13 @@ jobs: # See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail run: ctest --output-on-failure -C ${{env.BUILD_TYPE}} - - name: Examples + - name: Examples no san + if: ${{matrix.sanitizer == 'none'}} + working-directory: ${{github.workspace}}/bin + run: ../bin/ichor_etcd_example && ../bin/ichor_http_example_boost && ../bin/ichor_http_example_uring && ../bin/ichor_multithreaded_example && ../bin/ichor_multithreaded_example_uring && ../bin/ichor_optional_dependency_example && ../bin/ichor_serializer_example && ../bin/ichor_tcp_example && ../bin/ichor_tcp_example_uring && ../bin/ichor_timer_example && ../bin/ichor_yielding_timer_example && ../bin/ichor_timer_example_uring && ../bin/ichor_yielding_timer_example_uring && ../bin/ichor_factory_example && ../bin/ichor_websocket_example && ../bin/ichor_event_statistics_example && ../bin/ichor_introspection_example + + - name: Examples san + if: ${{matrix.sanitizer != 'none'}} working-directory: ${{github.workspace}}/bin - run: ../bin/ichor_etcd_example && ../bin/ichor_http_example && ../bin/ichor_multithreaded_example && ../bin/ichor_optional_dependency_example && ../bin/ichor_serializer_example && ../bin/ichor_tcp_example && ../bin/ichor_timer_example && ../bin/ichor_factory_example && ../bin/ichor_websocket_example && ../bin/ichor_websocket_example -t4 && ../bin/ichor_yielding_timer_example && ../bin/ichor_event_statistics_example && ../bin/ichor_introspection_example + run: ../bin/ichor_http_example_uring && ../bin/ichor_multithreaded_example && ../bin/ichor_multithreaded_example_uring && ../bin/ichor_optional_dependency_example && ../bin/ichor_serializer_example && ../bin/ichor_tcp_example && ../bin/ichor_tcp_example_uring && ../bin/ichor_timer_example && ../bin/ichor_yielding_timer_example && ../bin/ichor_timer_example_uring && ../bin/ichor_yielding_timer_example_uring && ../bin/ichor_factory_example && ../bin/ichor_event_statistics_example && ../bin/ichor_introspection_example diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 9d77f7fa..2811c9df 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -35,7 +35,7 @@ jobs: - name: Examples working-directory: ${{github.workspace}}/bin - run: ../bin/ichor_etcd_example && ../bin/ichor_http_example && ../bin/ichor_multithreaded_example && ../bin/ichor_optional_dependency_example && ../bin/ichor_serializer_example && ../bin/ichor_tcp_example && ../bin/ichor_timer_example && ../bin/ichor_factory_example && ../bin/ichor_websocket_example && ../bin/ichor_yielding_timer_example && ../bin/ichor_event_statistics_example && ../bin/ichor_introspection_example && ../bin/ichor_coroutine_benchmark && ../bin/ichor_event_benchmark && ../bin/ichor_serializer_benchmark && ../bin/ichor_start_benchmark && ../bin/ichor_start_stop_benchmark && ../bin/ichor_utils_benchmark -r + run: ../bin/ichor_etcd_example && ../bin/ichor_http_example_boost && ../bin/ichor_http_example_uring && ../bin/ichor_multithreaded_example && ../bin/ichor_multithreaded_example_uring && ../bin/ichor_optional_dependency_example && ../bin/ichor_serializer_example && ../bin/ichor_tcp_example && ../bin/ichor_tcp_example_uring && ../bin/ichor_timer_example && ../bin/ichor_yielding_timer_example && ../bin/ichor_timer_example_uring && ../bin/ichor_yielding_timer_example_uring && ../bin/ichor_factory_example && ../bin/ichor_websocket_example && ../bin/ichor_event_statistics_example && ../bin/ichor_introspection_example && ../bin/ichor_coroutine_benchmark && ../bin/ichor_event_benchmark && ../bin/ichor_serializer_benchmark && ../bin/ichor_start_benchmark && ../bin/ichor_start_stop_benchmark && ../bin/ichor_utils_benchmark -r - name: coverage run: lcov --directory . --capture --output-file coverage.info && lcov --remove coverage.info '/usr/*' '/opt/*' '${{github.workspace}}/external/*' --output-file coverage.info && lcov --list coverage.info && codecov -f coverage.info diff --git a/CMakeLists.txt b/CMakeLists.txt index 73392b67..5f98699e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -75,9 +75,9 @@ option(ICHOR_ENABLE_INTERNAL_IO_DEBUGGING "Add verbose logging of Ichor I/O inte option(ICHOR_ENABLE_INTERNAL_STL_DEBUGGING "Add verbose logging of Ichor STL" OFF) option(ICHOR_BUILD_COVERAGE "Build ichor with coverage" OFF) option(ICHOR_USE_SPDLOG "Use spdlog as framework logging implementation" OFF) -option(ICHOR_USE_BOOST_BEAST "Add boost asio and boost BEAST as dependencies" OFF) cmake_dependent_option(ICHOR_USE_SANITIZERS "Enable sanitizers, catching potential errors but slowing down compilation and execution speed" $ICHOR_BUILDING_DEBUG "NOT ICHOR_MUSL" OFF) cmake_dependent_option(ICHOR_USE_THREAD_SANITIZER "Enable thread sanitizer, catching potential threading errors but slowing down compilation and execution speed. Cannot be combined with ICHOR_USE_SANITIZERS" OFF "NOT WIN32" OFF) +cmake_dependent_option(ICHOR_USE_BOOST_BEAST "Add boost asio and boost BEAST as dependencies" OFF "NOT ICHOR_USE_SANITIZERS AND NOT ICHOR_USE_THREAD_SANITIZER" OFF) option(ICHOR_USE_UGLY_HACK_EXCEPTION_CATCHING "Enable an ugly hack on gcc to enable debugging the point where exceptions are thrown. Useful for debugging boost asio/beast backtraces." OFF) option(ICHOR_REMOVE_SOURCE_NAMES "Remove compiling source file names and line numbers when logging." $ICHOR_BUILDING_DEBUG) cmake_dependent_option(ICHOR_USE_MOLD "Use mold when linking, recommended to use with gcc 12+ or clang" OFF "NOT WIN32" OFF) @@ -112,7 +112,8 @@ if(ICHOR_COMPILER_ID STREQUAL "clang" AND ICHOR_RUN_CLANG_TIDY) endif() set(FMT_SOURCES ${ICHOR_EXTERNAL_DIR}/fmt/src/format.cc ${ICHOR_EXTERNAL_DIR}/fmt/src/os.cc) -file(GLOB_RECURSE ICHOR_FRAMEWORK_SOURCES ${ICHOR_TOP_DIR}/src/ichor/*.cpp) +file(GLOB_RECURSE ICHOR_FRAMEWORK_SOURCES ${ICHOR_TOP_DIR}/src/ichor/coroutines/*.cpp ${ICHOR_TOP_DIR}/src/ichor/dependency_management/*.cpp ${ICHOR_TOP_DIR}/src/ichor/DependencyManager.cpp ${ICHOR_TOP_DIR}/src/ichor/LifecycleManager.cpp ${ICHOR_TOP_DIR}/src/ichor/Service.cpp) +set(ICHOR_FRAMEWORK_QUEUE_SOURCES ${ICHOR_TOP_DIR}/src/ichor/event_queues/PriorityQueue.cpp ${ICHOR_TOP_DIR}/src/ichor/event_queues/EventQueue.cpp) file(GLOB_RECURSE ICHOR_OPTIONAL_ETCD_SOURCES ${ICHOR_TOP_DIR}/src/services/etcd/*.cpp) file(GLOB_RECURSE ICHOR_LOGGING_SOURCES ${ICHOR_TOP_DIR}/src/services/logging/*.cpp) file(GLOB_RECURSE ICHOR_HTTP_SOURCES ${ICHOR_TOP_DIR}/src/services/network/http/*.cpp) @@ -121,15 +122,22 @@ file(GLOB_RECURSE ICHOR_METRICS_SOURCES ${ICHOR_TOP_DIR}/src/services/metrics/*. file(GLOB_RECURSE ICHOR_TIMER_SOURCES ${ICHOR_TOP_DIR}/src/services/timer/Timer.cpp ${ICHOR_TOP_DIR}/src/services/timer/TimerFactoryFactory.cpp) file(GLOB_RECURSE ICHOR_OPTIONAL_HIREDIS_SOURCES ${ICHOR_TOP_DIR}/src/services/redis/*.cpp) file(GLOB_RECURSE ICHOR_BASE64_SOURCES ${ICHOR_TOP_DIR}/src/base64/*.cpp) -file(GLOB_RECURSE ICHOR_STL_SOURCES ${ICHOR_TOP_DIR}/src/ichor/stl/Any.cpp ${ICHOR_TOP_DIR}/src/ichor/stl/LinuxUtils.cpp ${ICHOR_TOP_DIR}/src/ichor/stl/StringUtils.cpp) +file(GLOB_RECURSE ICHOR_STL_SOURCES ${ICHOR_TOP_DIR}/src/ichor/stl/Any.cpp ${ICHOR_TOP_DIR}/src/ichor/stl/AsyncSingleThreadedMutex.cpp ${ICHOR_TOP_DIR}/src/ichor/stl/LinuxUtils.cpp ${ICHOR_TOP_DIR}/src/ichor/stl/StringUtils.cpp) set(ICHOR_IO_SOURCES ${ICHOR_TOP_DIR}/src/services/io/SharedOverThreadsAsyncFileIO.cpp) set(ICHOR_TCP_SOURCES ${ICHOR_TOP_DIR}/src/services/network/tcp/TcpConnectionService.cpp ${ICHOR_TOP_DIR}/src/services/network/tcp/TcpHostService.cpp) if(ICHOR_USE_LIBURING) + set(ICHOR_FRAMEWORK_QUEUE_SOURCES ${ICHOR_FRAMEWORK_QUEUE_SOURCES} ${ICHOR_TOP_DIR}/src/ichor/event_queues/IOUringQueue.cpp) set(ICHOR_IO_SOURCES ${ICHOR_IO_SOURCES} ${ICHOR_TOP_DIR}/src/services/io/IOUringAsyncFileIO.cpp) set(ICHOR_TCP_SOURCES ${ICHOR_TCP_SOURCES} ${ICHOR_TOP_DIR}/src/services/network/tcp/IOUringTcpConnectionService.cpp ${ICHOR_TOP_DIR}/src/services/network/tcp/IOUringTcpHostService.cpp) set(ICHOR_TIMER_SOURCES ${ICHOR_TIMER_SOURCES} ${ICHOR_TOP_DIR}/src/services/timer/IOUringTimerFactoryFactory.cpp ${ICHOR_TOP_DIR}/src/services/timer/IOUringTimer.cpp) endif() +if(ICHOR_USE_SDEVENT) + set(ICHOR_FRAMEWORK_QUEUE_SOURCES ${ICHOR_FRAMEWORK_QUEUE_SOURCES} ${ICHOR_TOP_DIR}/src/ichor/event_queues/SdeventQueue.cpp) +endif() +if(ICHOR_USE_BOOST_BEAST) + set(ICHOR_FRAMEWORK_QUEUE_SOURCES ${ICHOR_FRAMEWORK_QUEUE_SOURCES} ${ICHOR_TOP_DIR}/src/ichor/event_queues/BoostAsioQueue.cpp) +endif() file(GLOB SPDLOG_SOURCES ${ICHOR_EXTERNAL_DIR}/spdlog/src/*.cpp) @@ -137,7 +145,7 @@ if(ICHOR_USE_MIMALLOC AND NOT ICHOR_USE_SYSTEM_MIMALLOC) set(ICHOR_FRAMEWORK_SOURCES ${ICHOR_FRAMEWORK_SOURCES} ${ICHOR_TOP_DIR}/external/mimalloc/src/static.c) endif() -add_library(ichor ${FMT_SOURCES} ${ICHOR_FRAMEWORK_SOURCES} ${ICHOR_LOGGING_SOURCES} ${ICHOR_TCP_SOURCES} ${ICHOR_HTTP_SOURCES} ${ICHOR_METRICS_SOURCES} ${ICHOR_TIMER_SOURCES} ${ICHOR_IO_SOURCES} ${ICHOR_BASE64_SOURCES} ${ICHOR_STL_SOURCES}) +add_library(ichor ${FMT_SOURCES} ${ICHOR_FRAMEWORK_SOURCES} ${ICHOR_FRAMEWORK_QUEUE_SOURCES} ${ICHOR_LOGGING_SOURCES} ${ICHOR_TCP_SOURCES} ${ICHOR_HTTP_SOURCES} ${ICHOR_METRICS_SOURCES} ${ICHOR_TIMER_SOURCES} ${ICHOR_IO_SOURCES} ${ICHOR_BASE64_SOURCES} ${ICHOR_STL_SOURCES}) if(ICHOR_ENABLE_INTERNAL_DEBUGGING) target_compile_definitions(ichor PUBLIC ICHOR_ENABLE_INTERNAL_DEBUGGING) @@ -450,7 +458,7 @@ endif() if(NOT WIN32 AND NOT ICHOR_BUILD_COVERAGE) # gcc uses gdwarf-4 by default, which messes up using the coz profiler, add "-gdwarf-3" if using coz set(CMAKE_CXX_FLAGS_DEBUG "-ggdb") - set(CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG") + set(CMAKE_CXX_FLAGS_RELEASE "-O3 -ggdb3 -DNDEBUG") set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-ggdb -O3 -DNDEBUG") set(CMAKE_CXX_FLAGS_MINSIZEREL "-Os -DNDEBUG") endif() diff --git a/Dockerfile b/Dockerfile index d53b7689..18c8c36c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ RUN update-alternatives --install /usr/bin/cpp cpp /usr/bin/cpp-12 60 WORKDIR /opt RUN wget https://github.com/redis/hiredis/archive/refs/tags/v1.2.0.tar.gz -RUN wget https://boostorg.jfrog.io/artifactory/main/release/1.83.0/source/boost_1_83_0.tar.bz2 +RUN wget https://archives.boost.io/release/1.83.0/source/boost_1_83_0.tar.bz2 RUN wget https://github.com/openssl/openssl/releases/download/openssl-3.0.15/openssl-3.0.15.tar.gz ENV CFLAGS="-O2 -fstack-protector-strong -fcf-protection -fstack-clash-protection -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=3" diff --git a/Dockerfile-asan b/Dockerfile-asan index cb28c490..8a83e6c8 100644 --- a/Dockerfile-asan +++ b/Dockerfile-asan @@ -14,7 +14,6 @@ RUN update-alternatives --install /usr/bin/cpp cpp /usr/bin/cpp-12 60 WORKDIR /opt RUN wget https://github.com/redis/hiredis/archive/refs/tags/v1.2.0.tar.gz -RUN wget https://boostorg.jfrog.io/artifactory/main/release/1.83.0/source/boost_1_83_0.tar.bz2 RUN wget https://github.com/openssl/openssl/releases/download/openssl-3.0.15/openssl-3.0.15.tar.gz ENV CFLAGS="-Og -fsanitize=address,undefined -fno-sanitize=vptr" @@ -27,16 +26,6 @@ RUN ./Configure --prefix=/usr --openssldir=/etc/ssl --libdir=lib no-shared RUN make -j RUN make -j install -WORKDIR /opt -#Build boost with support for asan -RUN tar xf boost_1_83_0.tar.bz2 - -WORKDIR /opt/boost_1_83_0 - -RUN ./bootstrap.sh --prefix=/usr -RUN ./b2 cxxflags="-fsanitize=address,undefined -fno-sanitize=vptr -Og -std=c++20 -DBOOST_USE_ASAN -DBOOST_USE_UCONTEXT" linkflags="-static-libasan -static-libgcc -static-libstdc++ -lubsan" variant=debug link=static threading=multi context-impl=ucontext -RUN ./b2 cxxflags="-fsanitize=address,undefined -fno-sanitize=vptr -Og -std=c++20 -DBOOST_USE_ASAN -DBOOST_USE_UCONTEXT" linkflags="-static-libasan -static-libgcc -static-libstdc++ -lubsan" variant=debug link=static threading=multi context-impl=ucontext install - WORKDIR /opt #Build latest hiredis containing sdevent support, not available yet in apt @@ -57,4 +46,4 @@ WORKDIR /opt/ichor/build ENTRYPOINT ["/bin/bash", "-c"] -CMD ["unset CFLAGS CXXFLAGS && cd /opt/ichor/build && cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DICHOR_USE_SANITIZERS=1 -DICHOR_USE_HIREDIS=1 -DICHOR_USE_BOOST_BEAST=1 -DICHOR_USE_SPDLOG=1 -DICHOR_USE_SDEVENT=1 -DICHOR_SKIP_EXTERNAL_TESTS=1 /opt/ichor/src && ninja && ninja test"] +CMD ["unset CFLAGS CXXFLAGS && cd /opt/ichor/build && cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DICHOR_USE_SANITIZERS=1 -DICHOR_USE_HIREDIS=1 -DICHOR_USE_SPDLOG=1 -DICHOR_USE_SDEVENT=1 -DICHOR_SKIP_EXTERNAL_TESTS=1 /opt/ichor/src && ninja && ninja test"] diff --git a/Dockerfile-asan-beast-deprecated b/Dockerfile-asan-beast-deprecated new file mode 100644 index 00000000..ce3c8234 --- /dev/null +++ b/Dockerfile-asan-beast-deprecated @@ -0,0 +1,98 @@ +# Boost is ridiculously time consuming to maintain. Every couple of releases, the API changes for common functions like spawn/post, the ucontext context-impl is supposed to work with address sanitizer but leads to compilation errrors because fcontext stuff is still being used and the various support channels to go to for support have resulted in 0 responses so far. Boost really is not a high quality library IMO and I personally recommend to stop using it. +# This file serves to show how to compile boost with sanitizer support, but is not being maintained and might not compile Ichor anymore. + +# linker issue is because net::spawn is used inside of a coroutine in HttpConnection. EtcdTests and other programs fail to link with the following message: +# /usr/bin/ld: /usr/lib/libboost_coroutine.a(coroutine_context.o): in function `boost::coroutines::detail::coroutine_context::coroutine_context(void (*)(boost::context::detail::transfer_t), boost::coroutines::detail::preallocated const&)': +#/opt/boost_1_85_0/libs/coroutine/src/detail/coroutine_context.cpp:41:(.text+0x1b3): undefined reference to `make_fcontext' +#/usr/bin/ld: /usr/lib/libboost_coroutine.a(coroutine_context.o): in function `boost::coroutines::detail::coroutine_context::jump(boost::coroutines::detail::coroutine_context&, void*)': +#/opt/boost_1_85_0/libs/coroutine/src/detail/coroutine_context.cpp:68:(.text+0x842): undefined reference to `jump_fcontext' + +FROM ubuntu:jammy + +ARG CONTAINER_OWNER_GID=1000 +ARG CONTAINER_OWNER_ID=1000 + +RUN apt update +RUN apt install -y g++-12 gcc-12 build-essential cmake pkg-config git wget ninja-build nano libzip-dev libsystemd-dev + +RUN update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-12 60 +RUN update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-12 60 +RUN update-alternatives --install /usr/bin/cpp cpp /usr/bin/cpp-12 60 + +# Run all downloads first to be able to use Docker's layers as cache and prevent excessive redownloads + +WORKDIR /opt +RUN wget https://github.com/redis/hiredis/archive/refs/tags/v1.2.0.tar.gz +RUN wget https://boostorg.jfrog.io/artifactory/main/release/1.85.0/source/boost_1_85_0.tar.bz2 +RUN wget https://github.com/openssl/openssl/releases/download/openssl-3.0.15/openssl-3.0.15.tar.gz + +ENV CFLAGS="-Og -fsanitize=address,undefined -fno-sanitize=vptr" +ENV CXXFLAGS="-Og -fsanitize=address,undefined -fno-sanitize=vptr" +ENV LDFLAGS="-fsanitize=address,undefined -static-libasan -static-libgcc -static-libstdc++" + +RUN tar xf openssl-3.0.15.tar.gz +WORKDIR /opt/openssl-3.0.15 +RUN ./Configure --prefix=/usr --openssldir=/etc/ssl --libdir=lib no-shared +RUN make -j +RUN make -j install + +WORKDIR /opt +#Build boost with support for asan +RUN tar xf boost_1_85_0.tar.bz2 + +WORKDIR /opt/boost_1_85_0 + +# Boost disappoints once again +RUN <> patch << EOR +diff -r -u boost-1.81.0/boost/asio/detail/std_fenced_block.hpp boost-1.81.0-fencecd/boost/asio/detail/std_fenced_block.hpp +--- ./boost/asio/detail/std_fenced_block.hpp 2022-12-14 16:15:35.000000000 +0100 ++++ ./boost/asio/detail/std_fenced_block.hpp 2023-10-03 20:17:11.701435674 +0200 +@@ -43,14 +43,16 @@ + // Constructor for a full fenced block. + explicit std_fenced_block(full_t) + { +- std::atomic_thread_fence(std::memory_order_acquire); ++ _fence.load(std::memory_order_acquire); + } + + // Destructor. + ~std_fenced_block() + { +- std::atomic_thread_fence(std::memory_order_release); ++ _fence.store(true, std::memory_order_release); + } ++ ++ std::atomic _fence; + }; + + } // namespace detail +EOR +EOF +RUN patch ./boost/asio/detail/std_fenced_block.hpp < patch + +RUN ./bootstrap.sh --prefix=/usr +RUN ./b2 cxxflags="-fsanitize=address,undefined -fno-sanitize=vptr -Og -std=c++20 -DBOOST_USE_ASAN -DBOOST_USE_UCONTEXT" linkflags="-static-libasan -static-libgcc -static-libstdc++ -lubsan" variant=debug link=static threading=multi context-impl=ucontext +RUN ./b2 cxxflags="-fsanitize=address,undefined -fno-sanitize=vptr -Og -std=c++20 -DBOOST_USE_ASAN -DBOOST_USE_UCONTEXT" linkflags="-static-libasan -static-libgcc -static-libstdc++ -lubsan" variant=debug link=static threading=multi context-impl=ucontext install + +WORKDIR /opt + +#Build latest hiredis containing sdevent support, not available yet in apt +RUN tar xf v1.2.0.tar.gz +RUN mkdir /opt/hiredis-1.2.0/build + +WORKDIR /opt/hiredis-1.2.0/build +RUN cmake -GNinja -DCMAKE_BUILD_TYPE=RelWithDebInfo -DDISABLE_TESTS=1 -DCMAKE_INSTALL_PREFIX=/usr -DBUILD_SHARED_LIBS=OFF -DENABLE_SSL=1 .. +RUN ninja && ninja install + +RUN mkdir -p /opt/ichor/build + +RUN groupadd -g ${CONTAINER_OWNER_GID} buildgroup && useradd -m -u ${CONTAINER_OWNER_ID} -g buildgroup builduser +RUN chown builduser:buildgroup /opt/ichor/build +USER builduser + +WORKDIR /opt/ichor/build + +ENTRYPOINT ["/bin/bash", "-c"] + +CMD ["unset CFLAGS CXXFLAGS && cd /opt/ichor/build && cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DICHOR_USE_SANITIZERS=1 -DICHOR_USE_HIREDIS=1 -DICHOR_USE_BOOST_BEAST=1 -DICHOR_USE_SPDLOG=1 -DICHOR_USE_SDEVENT=1 -DICHOR_SKIP_EXTERNAL_TESTS=1 /opt/ichor/src && ninja && ninja test"] diff --git a/Dockerfile-asan-clang b/Dockerfile-asan-clang index 29ba7dc3..d1b01e37 100644 --- a/Dockerfile-asan-clang +++ b/Dockerfile-asan-clang @@ -11,7 +11,6 @@ RUN apt install -y cmake pkg-config git ninja-build nano libzip-dev clang-18 lib WORKDIR /opt RUN wget https://github.com/redis/hiredis/archive/refs/tags/v1.2.0.tar.gz -RUN wget https://boostorg.jfrog.io/artifactory/main/release/1.83.0/source/boost_1_83_0.tar.bz2 RUN wget https://github.com/openssl/openssl/releases/download/openssl-3.0.15/openssl-3.0.15.tar.gz ENV CC="clang" @@ -29,18 +28,6 @@ RUN ./Configure --prefix=/usr --openssldir=/etc/ssl --libdir=lib no-shared RUN make -j RUN make -j install -WORKDIR /opt -#Build boost with support for asan -RUN tar xf boost_1_83_0.tar.bz2 - -WORKDIR /opt/boost_1_83_0 - -RUN ./bootstrap.sh --prefix=/usr --with-toolset=clang -RUN clang++ --version -RUN find /usr/lib -name "*-linux*" -RUN ./b2 cxxflags="-fsanitize=address,undefined -fno-sanitize=vptr -Og -std=c++17 -stdlib=libc++ -DBOOST_USE_ASAN -DBOOST_USE_UCONTEXT" linkflags="-stdlib=libc++ -static-libstdc++ -l:libc++abi.a -static-libsan -lubsan" variant=debug link=static threading=multi context-impl=ucontext -RUN ./b2 cxxflags="-fsanitize=address,undefined -fno-sanitize=vptr -Og -std=c++17 -stdlib=libc++ -DBOOST_USE_ASAN -DBOOST_USE_UCONTEXT" linkflags="-stdlib=libc++ -static-libstdc++ -l:libc++abi.a -static-libsan -lubsan" variant=debug link=static threading=multi context-impl=ucontext install - WORKDIR /opt #Build latest hiredis containing sdevent support, not available yet in apt @@ -61,4 +48,4 @@ WORKDIR /opt/ichor/build ENTRYPOINT ["/bin/bash", "-c"] -CMD ["unset CFLAGS CXXFLAGS && cd /opt/ichor/build && cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DICHOR_USE_SANITIZERS=1 -DICHOR_USE_HIREDIS=1 -DICHOR_USE_BOOST_BEAST=1 -DICHOR_USE_SPDLOG=1 -DICHOR_USE_SDEVENT=1 -DICHOR_SKIP_EXTERNAL_TESTS=1 /opt/ichor/src && ninja && ninja test"] +CMD ["unset CFLAGS CXXFLAGS && cd /opt/ichor/build && cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DICHOR_USE_SANITIZERS=1 -DICHOR_USE_HIREDIS=1 -DICHOR_USE_SPDLOG=1 -DICHOR_USE_SDEVENT=1 -DICHOR_SKIP_EXTERNAL_TESTS=1 /opt/ichor/src && ninja && ninja test"] diff --git a/Dockerfile-clang b/Dockerfile-clang index 6aa8f138..3b3738c0 100644 --- a/Dockerfile-clang +++ b/Dockerfile-clang @@ -10,7 +10,7 @@ RUN apt install -y cmake libssl-dev pkg-config git ninja-build nano libzip-dev c # Run all downloads first to be able to use Docker's layers as cache and prevent excessive redownloads WORKDIR /opt -RUN wget https://boostorg.jfrog.io/artifactory/main/release/1.83.0/source/boost_1_83_0.tar.bz2 +RUN wget https://archives.boost.io/release/1.83.0/source/boost_1_83_0.tar.bz2 RUN wget https://github.com/redis/hiredis/archive/refs/tags/v1.2.0.tar.gz ENV CC="clang" diff --git a/Dockerfile-musl b/Dockerfile-musl index b6e687d5..ea749f28 100644 --- a/Dockerfile-musl +++ b/Dockerfile-musl @@ -9,7 +9,7 @@ RUN apk add gcc g++ build-base cmake git wget ninja-build ninja-is-really-ninja # Run all downloads first to be able to use Docker's layers as cache and prevent excessive redownloads WORKDIR /opt -RUN wget https://boostorg.jfrog.io/artifactory/main/release/1.81.0/source/boost_1_81_0.tar.bz2 +RUN wget https://archives.boost.io/release/1.81.0/source/boost_1_81_0.tar.bz2 RUN wget https://github.com/redis/hiredis/archive/refs/tags/v1.2.0.tar.gz RUN wget https://github.com/openssl/openssl/releases/download/openssl-3.0.15/openssl-3.0.15.tar.gz diff --git a/Dockerfile-musl-aarch64 b/Dockerfile-musl-aarch64 index e1887838..2ae8b5b1 100644 --- a/Dockerfile-musl-aarch64 +++ b/Dockerfile-musl-aarch64 @@ -9,7 +9,7 @@ RUN apk add gcc g++ build-base cmake git wget ninja-build ninja-is-really-ninja # Run all downloads first to be able to use Docker's layers as cache and prevent excessive redownloads WORKDIR /opt -RUN wget https://boostorg.jfrog.io/artifactory/main/release/1.81.0/source/boost_1_81_0.tar.bz2 +RUN wget https://archives.boost.io/release/1.81.0/source/boost_1_81_0.tar.bz2 RUN wget https://github.com/redis/hiredis/archive/refs/tags/v1.2.0.tar.gz RUN wget https://github.com/openssl/openssl/releases/download/openssl-3.0.15/openssl-3.0.15.tar.gz diff --git a/Dockerfile-tsan b/Dockerfile-tsan index 2af0c8bf..7b8bb8f6 100644 --- a/Dockerfile-tsan +++ b/Dockerfile-tsan @@ -14,7 +14,6 @@ RUN update-alternatives --install /usr/bin/cpp cpp /usr/bin/cpp-12 60 WORKDIR /opt RUN wget https://github.com/redis/hiredis/archive/refs/tags/v1.2.0.tar.gz -RUN wget https://boostorg.jfrog.io/artifactory/main/release/1.81.0/source/boost_1_81_0.tar.bz2 RUN wget https://github.com/openssl/openssl/releases/download/openssl-3.0.15/openssl-3.0.15.tar.gz ENV CFLAGS="-Og -fsanitize=thread" @@ -29,45 +28,6 @@ RUN make -j install WORKDIR /opt -#Build boost with support for asan -RUN tar xf boost_1_81_0.tar.bz2 - -WORKDIR /opt/boost_1_81_0 - -RUN <> patch << EOR -diff -r -u boost-1.81.0/boost/asio/detail/std_fenced_block.hpp boost-1.81.0-fencecd/boost/asio/detail/std_fenced_block.hpp ---- ./boost/asio/detail/std_fenced_block.hpp 2022-12-14 16:15:35.000000000 +0100 -+++ ./boost/asio/detail/std_fenced_block.hpp 2023-10-03 20:17:11.701435674 +0200 -@@ -43,14 +43,16 @@ - // Constructor for a full fenced block. - explicit std_fenced_block(full_t) - { -- std::atomic_thread_fence(std::memory_order_acquire); -+ _fence.load(std::memory_order_acquire); - } - - // Destructor. - ~std_fenced_block() - { -- std::atomic_thread_fence(std::memory_order_release); -+ _fence.store(true, std::memory_order_release); - } -+ -+ std::atomic _fence; - }; - - } // namespace detail -EOR -EOF -RUN patch ./boost/asio/detail/std_fenced_block.hpp < patch - -RUN ./bootstrap.sh --prefix=/usr -RUN ./b2 cxxflags="-fsanitize=thread -Og -std=c++20 -DBOOST_USE_TSAN -DBOOST_USE_UCONTEXT" linkflags="-static-libtsan -static-libgcc -static-libstdc++" variant=debug link=static threading=multi context-impl=ucontext -RUN ./b2 cxxflags="-fsanitize=thread -Og -std=c++20 -DBOOST_USE_TSAN -DBOOST_USE_UCONTEXT" linkflags="-static-libtsan -static-libgcc -static-libstdc++" variant=debug link=static threading=multi context-impl=ucontext install - -WORKDIR /opt - #Build latest hiredis containing sdevent support, not available yet in apt RUN tar xf v1.2.0.tar.gz RUN mkdir /opt/hiredis-1.2.0/build @@ -86,4 +46,4 @@ WORKDIR /opt/ichor/build ENTRYPOINT ["/bin/bash", "-c"] -CMD ["unset CFLAGS CXXFLAGS && cd /opt/ichor/build && cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DICHOR_USE_SANITIZERS=0 -DICHOR_USE_MIMALLOC=0 -DICHOR_USE_THREAD_SANITIZER=1 -DICHOR_USE_HIREDIS=1 -DICHOR_USE_BOOST_BEAST=1 -DICHOR_USE_SPDLOG=1 -DICHOR_USE_BACKWARD=0 -DICHOR_USE_SDEVENT=1 -DICHOR_SKIP_EXTERNAL_TESTS=1 /opt/ichor/src && ninja && ninja test"] +CMD ["unset CFLAGS CXXFLAGS && cd /opt/ichor/build && cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DICHOR_USE_SANITIZERS=0 -DICHOR_USE_MIMALLOC=0 -DICHOR_USE_THREAD_SANITIZER=1 -DICHOR_USE_HIREDIS=1 -DICHOR_USE_SPDLOG=1 -DICHOR_USE_BACKWARD=0 -DICHOR_USE_SDEVENT=1 -DICHOR_SKIP_EXTERNAL_TESTS=1 /opt/ichor/src && ninja && ninja test"] diff --git a/build.sh b/build.sh index cf0ee50c..66ea5a5d 100755 --- a/build.sh +++ b/build.sh @@ -12,14 +12,14 @@ fi trap cleanup SIGINT SIGTERM POSITIONAL_ARGS=() -DOCKER=1 +DOCKER=0 DEV=0 GCC=0 while [[ $# -gt 0 ]]; do case $1 in - --no-docker) - DOCKER=0 + --docker) + DOCKER=1 shift # past value ;; --dev) @@ -74,21 +74,21 @@ if [[ $DOCKER -eq 1 ]]; then rm -rf ./* ../bin/* docker build -f ../Dockerfile-asan -t ichor-asan --build-arg CONTAINER_OWNER_GID=$(id -g) --build-arg CONTAINER_OWNER_ID=$(id -u) . || exit 1 docker run -v $(pwd)/../:/opt/ichor/src -v $(pwd)/../build:/opt/ichor/build --rm --privileged -it ichor-asan || exit 1 - run_examples 1 1 0 + run_examples 0 1 0 run_benchmarks rm -rf ./* ../bin/* docker build -f ../Dockerfile-asan-clang -t ichor-asan-clang --build-arg CONTAINER_OWNER_GID=$(id -g) --build-arg CONTAINER_OWNER_ID=$(id -u) . || exit 1 docker run -v $(pwd)/../:/opt/ichor/src -v $(pwd)/../build:/opt/ichor/build --rm --privileged -it ichor-asan-clang || exit 1 - docker run -v $(pwd)/../:/opt/ichor/src -v $(pwd)/../build:/opt/ichor/build --rm --privileged -it ichor-asan-clang "ninja" || exit 1 +# docker run -v $(pwd)/../:/opt/ichor/src -v $(pwd)/../build:/opt/ichor/build --rm --privileged -it ichor-asan-clang "ninja" || exit 1 ../bin/TcpTests - run_examples 1 1 0 + run_examples 0 1 0 run_benchmarks rm -rf ./* ../bin/* docker build -f ../Dockerfile-tsan -t ichor-tsan --build-arg CONTAINER_OWNER_GID=$(id -g) --build-arg CONTAINER_OWNER_ID=$(id -u) . || exit 1 docker run -v $(pwd)/../:/opt/ichor/src -v $(pwd)/../build:/opt/ichor/build --rm --privileged -it ichor-tsan || exit 1 - run_examples 1 1 0 + run_examples 0 1 0 run_benchmarks rm -rf ./* ../bin/* @@ -108,7 +108,7 @@ if [[ $DOCKER -eq 1 ]]; then # run_fast_benchmarks # this is still too slow, need to refactor benchmarks to specify iterations on command line rm -rf ./* ../bin/* - docker build -f ../Dockerfile-musl-aarch64-bench -t ichor-musl-aarch64-bench --build-arg CONTAINER_OWNER_GID=$(id -g) --build-arg CONTAINER_OWNER_ID=$(id -u) . || exit 1 + docker build -f ../Dockerfile-musl-aarch64-bench -t ichor-musl-aarch64-bench --build-arg CONTAINER_OWNER_GID=$(id -g) --build-arg CONTAINER_OWNER_ID=$(id -u) . --platform=linux/arm64 || exit 1 docker run -v $(pwd)/../:/opt/ichor/src -v $(pwd)/../build:/opt/ichor/build --rm -it ichor-musl-aarch64-bench || exit 1 rm -rf ../arm_bench mkdir -p ../arm_bench @@ -132,10 +132,10 @@ run_examples 0 1 1 for i in ${!ccompilers[@]}; do export LD_LIBRARY_PATH=${ldlibpath[i]} rm -rf ./* ../bin/* - CC=${ccompilers[i]} CXX=${cppcompilers[i]} cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DICHOR_USE_SANITIZERS=1 -DICHOR_ENABLE_INTERNAL_DEBUGGING=1 -DICHOR_ENABLE_INTERNAL_IO_DEBUGGING=1 -DICHOR_ENABLE_INTERNAL_STL_DEBUGGING=1 -DICHOR_ENABLE_INTERNAL_COROUTINE_DEBUGGING=1 -DICHOR_USE_MOLD=1 -DICHOR_USE_BOOST_BEAST=1 -DICHOR_USE_LIBCPP=0 -DICHOR_USE_HIREDIS=1 -DICHOR_USE_SDEVENT=1 .. || exit 1 + CC=${ccompilers[i]} CXX=${cppcompilers[i]} cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DICHOR_USE_SANITIZERS=1 -DICHOR_ENABLE_INTERNAL_DEBUGGING=1 -DICHOR_ENABLE_INTERNAL_IO_DEBUGGING=1 -DICHOR_ENABLE_INTERNAL_STL_DEBUGGING=1 -DICHOR_ENABLE_INTERNAL_COROUTINE_DEBUGGING=1 -DICHOR_USE_MOLD=1 -DICHOR_USE_BOOST_BEAST=0 -DICHOR_USE_LIBCPP=0 -DICHOR_USE_HIREDIS=1 -DICHOR_USE_SDEVENT=1 .. || exit 1 ninja || exit 1 ninja test || exit 1 - run_examples 1 1 1 + run_examples 0 1 1 run_benchmarks rm -rf ./* ../bin/* diff --git a/build_common.sh b/build_common.sh index 0167b91b..e5f349f3 100644 --- a/build_common.sh +++ b/build_common.sh @@ -21,9 +21,8 @@ run_examples () ../bin/ichor_yielding_timer_example || exit 1 if [[ $BOOST -eq 1 ]]; then ../bin/ichor_tcp_example || exit 1 - ../bin/ichor_http_example || exit 1 + ../bin/ichor_http_example_boost || exit 1 ../bin/ichor_websocket_example || exit 1 - ../bin/ichor_websocket_example -t4 || exit 1 ../bin/ichor_etcd_example || exit 1 fi if [[ URING -eq 1 ]]; then @@ -31,6 +30,7 @@ run_examples () ../bin/ichor_tcp_example_uring || exit 1 ../bin/ichor_timer_example_uring || exit 1 ../bin/ichor_yielding_timer_example_uring || exit 1 + ../bin/ichor_http_example_uring || exit 1 fi } run_benchmarks () diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index bb2e87c5..ddf8eba0 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -69,9 +69,9 @@ if(ICHOR_USE_BOOST_BEAST) target_link_libraries(ichor_websocket_example ichor) file(GLOB_RECURSE EXAMPLE_SOURCES ${ICHOR_TOP_DIR}/examples/http_example/*.cpp) - add_executable(ichor_http_example ${EXAMPLE_SOURCES}) - target_link_libraries(ichor_http_example ${CMAKE_THREAD_LIBS_INIT}) - target_link_libraries(ichor_http_example ichor) + add_executable(ichor_http_example_boost ${EXAMPLE_SOURCES}) + target_link_libraries(ichor_http_example_boost ${CMAKE_THREAD_LIBS_INIT}) + target_link_libraries(ichor_http_example_boost ichor) set(EXAMPLE_SOURCES ${ICHOR_TOP_DIR}/examples/http_ping_pong/ping.cpp) add_executable(ichor_ping_example ${EXAMPLE_SOURCES}) @@ -110,4 +110,10 @@ if(ICHOR_USE_LIBURING AND NOT (ICHOR_SKIP_EXTERNAL_TESTS AND ICHOR_AARCH64)) target_link_libraries(ichor_yielding_timer_example_uring ${CMAKE_THREAD_LIBS_INIT}) target_link_libraries(ichor_yielding_timer_example_uring ichor) target_compile_definitions(ichor_yielding_timer_example_uring PUBLIC URING_EXAMPLE) + + file(GLOB_RECURSE EXAMPLE_SOURCES ${ICHOR_TOP_DIR}/examples/http_example/*.cpp) + add_executable(ichor_http_example_uring ${EXAMPLE_SOURCES}) + target_link_libraries(ichor_http_example_uring ${CMAKE_THREAD_LIBS_INIT}) + target_link_libraries(ichor_http_example_uring ichor) + target_compile_definitions(ichor_http_example_uring PUBLIC URING_EXAMPLE) endif() diff --git a/examples/etcd_example/main.cpp b/examples/etcd_example/main.cpp index 309f2e18..4a9f765a 100644 --- a/examples/etcd_example/main.cpp +++ b/examples/etcd_example/main.cpp @@ -1,11 +1,10 @@ #include "UsingEtcdService.h" -#include +#include #include #include #include #include #include -#include #include #include @@ -31,7 +30,7 @@ int main(int argc, char *argv[]) { } auto start = std::chrono::steady_clock::now(); - auto queue = std::make_unique(); + auto queue = std::make_unique(); auto &dm = queue->createManager(); #ifdef ICHOR_USE_SPDLOG dm.createServiceManager(); @@ -40,7 +39,6 @@ int main(int argc, char *argv[]) { dm.createServiceManager, ILoggerFactory>(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_INFO)}}); dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1")}, {"Port", Ichor::make_any(static_cast(2379))}, {"TimeoutMs", Ichor::make_any(1'000ul)}}); dm.createServiceManager(); - dm.createServiceManager(); dm.createServiceManager, IClientFactory>(); dm.createServiceManager(); queue->start(CaptureSigInt); diff --git a/examples/http_example/main.cpp b/examples/http_example/main.cpp index e12ce502..69e97914 100644 --- a/examples/http_example/main.cpp +++ b/examples/http_example/main.cpp @@ -1,11 +1,8 @@ #include "UsingHttpService.h" #include "../common/TestMsgGlazeSerializer.h" #include "../common/lyra.hpp" -#include #include #include -#include -#include #include #include #include @@ -23,6 +20,28 @@ #define LOGGER_TYPE CoutLogger #endif +#if defined(URING_EXAMPLE) +#include +#include +#include +#include +#include + +#define QIMPL IOUringQueue +#define CONNIMPL IOUringTcpConnectionService +#define HOSTIMPL IOUringTcpHostService +#define HTTPHOSTIMPL HttpHostService +#define HTTPCONNIMPL HttpConnectionService +#else +#include +#include +#include + +#define QIMPL BoostAsioQueue +#define HTTPHOSTIMPL Boost::HttpHostService +#define HTTPCONNIMPL Boost::HttpConnectionService +#endif + #include #include @@ -37,17 +56,13 @@ int main(int argc, char *argv[]) { } uint64_t verbosity{}; - uint64_t threads{1}; bool silent{}; - bool spinlock{}; bool showHelp{}; std::string address{"127.0.0.1"}; auto cli = lyra::help(showHelp) | lyra::opt(address, "address")["-a"]["--address"]("Address to bind to, e.g. 127.0.0.1") | lyra::opt([&verbosity](bool) { verbosity++; })["-v"]["--verbose"]("Increase logging for each -v").cardinality(0, 4) - | lyra::opt(threads, "threads")["-t"]["--threads"]("Number of threads to use for I/O, default: 1") - | lyra::opt(spinlock)["-p"]["--spinlock"]("Spinlock 10ms before going to sleep, improves latency in high workload cases at the expense of CPU usage") | lyra::opt(silent)["-s"]["--silent"]("No output"); auto result = cli.parse( { argc, argv } ); @@ -73,8 +88,14 @@ int main(int argc, char *argv[]) { } auto start = std::chrono::steady_clock::now(); - auto queue = std::make_unique(spinlock); + auto queue = std::make_unique(500); auto &dm = queue->createManager(); +#ifdef URING_EXAMPLE + if(!queue->createEventLoop()) { + fmt::println("Couldn't create io_uring event loop"); + return -1; + } +#endif #ifdef ICHOR_USE_SPDLOG dm.createServiceManager(); #endif @@ -88,12 +109,14 @@ int main(int argc, char *argv[]) { } // Create the JSON serializer for the TestMsg class dm.createServiceManager>(); - // Create the Event Thread for Boost.ASIO - dm.createServiceManager(Properties{{"Threads", Ichor::make_any(threads)}}); +#ifdef URING_EXAMPLE + dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1"s)}, {"Port", Ichor::make_any(static_cast(8001))}}); + dm.createServiceManager, IClientConnectionService>, IClientFactory>(); +#endif // Create the HTTP server binding to the given address - dm.createServiceManager(Properties{{"Address", Ichor::make_any(address)}, {"Port", Ichor::make_any(static_cast(8001))}}); + dm.createServiceManager(Properties{{"Address", Ichor::make_any(address)}, {"Port", Ichor::make_any(static_cast(8001))}}); // Setup a factory which creates HTTP clients for every class requesting an IHttpConnectionService - dm.createServiceManager, IClientFactory>(); + dm.createServiceManager, IClientFactory>(); // Create the class that we defined in this example, to setup a /test endpoint in the server, to request (and therefore create) an HTTP client and send a message to the /test endpoint using the serializer for TestMsg dm.createServiceManager(Properties{{"Address", Ichor::make_any(address)}, {"Port", Ichor::make_any(static_cast(8001))}}); queue->start(CaptureSigInt); diff --git a/examples/http_ping_pong/ping.cpp b/examples/http_ping_pong/ping.cpp index 0a2e2bda..06b01311 100644 --- a/examples/http_ping_pong/ping.cpp +++ b/examples/http_ping_pong/ping.cpp @@ -1,10 +1,9 @@ #include "PingService.h" #include "PingMsgJsonSerializer.h" #include "../common/lyra.hpp" -#include +#include #include #include -#include #include #include #include @@ -70,7 +69,7 @@ int main(int argc, char *argv[]) { } auto start = std::chrono::steady_clock::now(); - auto queue = std::make_unique(); + auto queue = std::make_unique(); auto &dm = queue->createManager(); #ifdef ICHOR_USE_SPDLOG @@ -88,7 +87,6 @@ int main(int argc, char *argv[]) { } dm.createServiceManager>(); - dm.createServiceManager(); dm.createServiceManager, IClientFactory>(); dm.createServiceManager(Properties{{"Address", Ichor::make_any(address)}, {"Port", Ichor::make_any(static_cast(8001))}, {"NoDelay", Ichor::make_any(true)}}); dm.createServiceManager(); diff --git a/examples/http_ping_pong/pong.cpp b/examples/http_ping_pong/pong.cpp index 8f58d887..e23418f1 100644 --- a/examples/http_ping_pong/pong.cpp +++ b/examples/http_ping_pong/pong.cpp @@ -1,10 +1,9 @@ #include "PongService.h" #include "PingMsgJsonSerializer.h" #include "../common/lyra.hpp" -#include +#include #include #include -#include #include #include #include @@ -36,7 +35,6 @@ int main(int argc, char *argv[]) { } uint64_t verbosity{}; - uint64_t threads{1}; bool silent{}; bool spinlock{}; bool showHelp{}; @@ -45,7 +43,6 @@ int main(int argc, char *argv[]) { auto cli = lyra::help(showHelp) | lyra::opt(address, "address")["-a"]["--address"]("Address to bind to, e.g. 127.0.0.1") | lyra::opt([&verbosity](bool) { verbosity++; })["-v"]["--verbose"]("Increase logging for each -v").cardinality(0, 4) - | lyra::opt(threads, "threads")["-t"]["--threads"]("Number of threads to use for I/O, default: 1") | lyra::opt(spinlock)["-p"]["--spinlock"]("Spinlock 10ms before going to sleep, improves latency in high workload cases at the expense of CPU usage") | lyra::opt(silent)["-s"]["--silent"]("No output"); @@ -72,7 +69,7 @@ int main(int argc, char *argv[]) { } auto start = std::chrono::steady_clock::now(); - auto queue = std::make_unique(spinlock); + auto queue = std::make_unique(); auto &dm = queue->createManager(); #ifdef ICHOR_USE_SPDLOG @@ -90,7 +87,6 @@ int main(int argc, char *argv[]) { } dm.createServiceManager>(); - dm.createServiceManager(Properties{{"Threads", Ichor::make_any(threads)}}); dm.createServiceManager(Properties{{"Address", Ichor::make_any(address)}, {"Port", Ichor::make_any(static_cast(8001))}, {"NoDelay", Ichor::make_any(true)}}); dm.createServiceManager(); queue->start(CaptureSigInt); diff --git a/examples/websocket_example/main.cpp b/examples/websocket_example/main.cpp index acc08e4d..6ba5b4a3 100644 --- a/examples/websocket_example/main.cpp +++ b/examples/websocket_example/main.cpp @@ -2,10 +2,7 @@ #include "../common/TestMsgGlazeSerializer.h" #include "../common/lyra.hpp" #include -#include #include -#include -#include #include #include #include @@ -23,6 +20,26 @@ #define LOGGER_TYPE CoutLogger #endif +#if defined(URING_EXAMPLE) +#include +#include +#include + +#define QIMPL IOUringQueue +#define CONNIMPL IOUringTcpConnectionService +#define HOSTIMPL IOUringTcpHostService +#define WSHOSTIMPL +#define WSCONNIMPL +#else +#include +#include +#include + +#define QIMPL BoostAsioQueue +#define WSHOSTIMPL Boost::WsHostService +#define WSCONNIMPL Boost::WsConnectionService +#endif + #include #include @@ -37,17 +54,13 @@ int main(int argc, char *argv[]) { } uint64_t verbosity{}; - uint64_t threads{1}; bool silent{}; - bool spinlock{}; bool showHelp{}; std::string address{"127.0.0.1"}; auto cli = lyra::help(showHelp) | lyra::opt(address, "address")["-a"]["--address"]("Address to bind to, e.g. 127.0.0.1") | lyra::opt([&verbosity](bool) { verbosity++; })["-v"]["--verbose"]("Increase logging for each -v").cardinality(0, 4) - | lyra::opt(threads, "threads")["-t"]["--threads"]("Number of threads to use for I/O, default: 1") - | lyra::opt(spinlock)["-p"]["--spinlock"]("Spinlock 10ms before going to sleep, improves latency in high workload cases at the expense of CPU usage") | lyra::opt(silent)["-s"]["--silent"]("No output"); auto result = cli.parse( { argc, argv } ); @@ -73,8 +86,14 @@ int main(int argc, char *argv[]) { uint64_t priorityToEnsureHostStartingFirst = 51; auto start = std::chrono::steady_clock::now(); - auto queue = std::make_unique(spinlock); + auto queue = std::make_unique(500); auto &dm = queue->createManager(); +#ifdef URING_EXAMPLE + if(!queue->createEventLoop()) { + fmt::println("Couldn't create io_uring event loop"); + return -1; + } +#endif #ifdef ICHOR_USE_SPDLOG dm.createServiceManager(Properties{}, priorityToEnsureHostStartingFirst); #endif @@ -87,9 +106,12 @@ int main(int argc, char *argv[]) { dm.createServiceManager, ILoggerFactory>(Properties{{"DefaultLogLevel", Ichor::make_any(level)}}, priorityToEnsureHostStartingFirst); } dm.createServiceManager>(); - dm.createServiceManager(Properties{{"Threads", Ichor::make_any(threads)}}); - dm.createServiceManager(Properties{{"Address", Ichor::make_any(address)}, {"Port", Ichor::make_any(static_cast(8001))}}, priorityToEnsureHostStartingFirst); - dm.createServiceManager>, IClientFactory>(); +#ifdef URING_EXAMPLE + dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1"s)}, {"Port", Ichor::make_any(static_cast(8001))}}); + dm.createServiceManager, IClientConnectionService>, IClientFactory>(); +#endif + dm.createServiceManager(Properties{{"Address", Ichor::make_any(address)}, {"Port", Ichor::make_any(static_cast(8001))}}, priorityToEnsureHostStartingFirst); + dm.createServiceManager>, IClientFactory>(); dm.createServiceManager(Properties{{"Address", Ichor::make_any(address)}, {"Port", Ichor::make_any(static_cast(8001))}}); queue->start(CaptureSigInt); auto end = std::chrono::steady_clock::now(); diff --git a/include/ichor/event_queues/BoostAsioQueue.h b/include/ichor/event_queues/BoostAsioQueue.h new file mode 100644 index 00000000..c6def670 --- /dev/null +++ b/include/ichor/event_queues/BoostAsioQueue.h @@ -0,0 +1,42 @@ +#pragma once + +#ifndef ICHOR_USE_BOOST_BEAST +#error "Boost not enabled." +#endif + +#include +#include +#include + +namespace Ichor { + class BoostAsioQueue final : public IBoostAsioQueue { + public: + BoostAsioQueue(); + explicit BoostAsioQueue(uint64_t unused); + ~BoostAsioQueue() final; + + void pushEventInternal(uint64_t priority, std::unique_ptr &&event) final; + + [[nodiscard]] bool empty() const final; + [[nodiscard]] uint64_t size() const final; + [[nodiscard]] bool is_running() const noexcept final; + [[nodiscard]] NameHashType get_queue_name_hash() const noexcept final; + + bool start(bool captureSigInt) final; + [[nodiscard]] bool shouldQuit() final; + void quit() final; + + net::io_context& getContext() noexcept final; + [[nodiscard]] bool fibersShouldStop() const noexcept final; + + private: + void shouldAddQuitEvent(); + + net::io_context _context; + std::atomic _quit{}; + std::thread::id _threadId{}; + std::chrono::steady_clock::time_point _whenQuitEventWasSent{}; + std::atomic _quitEventSent{false}; + uint64_t _quitTimeoutMs{}; + }; +} diff --git a/include/ichor/event_queues/IBoostAsioQueue.h b/include/ichor/event_queues/IBoostAsioQueue.h new file mode 100644 index 00000000..04470419 --- /dev/null +++ b/include/ichor/event_queues/IBoostAsioQueue.h @@ -0,0 +1,25 @@ +#pragma once + +#ifndef ICHOR_USE_BOOST_BEAST +#error "Boost not enabled." +#endif + +#if BOOST_VERSION >= 108000 +#define ASIO_SPAWN_COMPLETION_TOKEN , [](std::exception_ptr e) { if (e) std::rethrow_exception(e); } +#else +#define ASIO_SPAWN_COMPLETION_TOKEN +#endif + +#include +#include + +namespace net = boost::asio; // from + +namespace Ichor { + class IBoostAsioQueue : public IEventQueue { + public: + ~IBoostAsioQueue() override = default; + virtual net::io_context& getContext() noexcept = 0; + [[nodiscard]] virtual bool fibersShouldStop() const noexcept = 0; + }; +} diff --git a/include/ichor/event_queues/IOUringQueue.h b/include/ichor/event_queues/IOUringQueue.h index 3aa7c499..b0600b70 100644 --- a/include/ichor/event_queues/IOUringQueue.h +++ b/include/ichor/event_queues/IOUringQueue.h @@ -4,7 +4,6 @@ #error "Ichor has not been compiled with io_uring support" #endif -#include #include #include #include @@ -12,7 +11,7 @@ #include namespace Ichor { - /// Provides an io_uring based queue, expects the running OS to have at least kernel 5.4 + /// Provides an io_uring based queue, expects the running OS to have at least kernel 5.4, but multithreading support is only available from 5.18 and later. class IOUringQueue final : public IIOUringQueue { public: IOUringQueue(uint64_t quitTimeoutMs = 5'000, long long pollTimeoutNs = 100'000'000, tl::optional emulateKernelVersion = {}); diff --git a/include/ichor/interfaces/IFrameworkLogger.h b/include/ichor/interfaces/IFrameworkLogger.h index 37517e36..658f1d50 100644 --- a/include/ichor/interfaces/IFrameworkLogger.h +++ b/include/ichor/interfaces/IFrameworkLogger.h @@ -36,12 +36,6 @@ namespace Ichor { #define ICHOR_LOG_WARN(logger, str, ...) { if(logger != nullptr) logger->warn(__FILE__, __LINE__, static_cast(__FUNCTION__), str, make_args(__VA_ARGS__)); }; static_assert(true, "") #define ICHOR_LOG_ERROR(logger, str, ...) { if(logger != nullptr) logger->error(__FILE__, __LINE__, static_cast(__FUNCTION__), str, make_args(__VA_ARGS__)); }; static_assert(true, "") -#define ICHOR_LOG_TRACE_ATOMIC(logger, str, ...) { auto *l = logger.load(std::memory_order_acquire); if(l != nullptr) l->trace(__FILE__, __LINE__, static_cast(__FUNCTION__), str, make_args(__VA_ARGS__)); }; static_assert(true, "") -#define ICHOR_LOG_DEBUG_ATOMIC(logger, str, ...) { auto *l = logger.load(std::memory_order_acquire); if(l != nullptr) l->debug(__FILE__, __LINE__, static_cast(__FUNCTION__), str, make_args(__VA_ARGS__)); }; static_assert(true, "") -#define ICHOR_LOG_INFO_ATOMIC(logger, str, ...) { auto *l = logger.load(std::memory_order_acquire); if(l != nullptr) l->info(__FILE__, __LINE__, static_cast(__FUNCTION__), str, make_args(__VA_ARGS__)); }; static_assert(true, "") -#define ICHOR_LOG_WARN_ATOMIC(logger, str, ...) { auto *l = logger.load(std::memory_order_acquire); if(l != nullptr) l->warn(__FILE__, __LINE__, static_cast(__FUNCTION__), str, make_args(__VA_ARGS__)); }; static_assert(true, "") -#define ICHOR_LOG_ERROR_ATOMIC(logger, str, ...) { auto *l = logger.load(std::memory_order_acquire); if(l != nullptr) l->error(__FILE__, __LINE__, static_cast(__FUNCTION__), str, make_args(__VA_ARGS__)); }; static_assert(true, "") - #define ICHOR_EMERGENCY_LOG1(logger, str) { if(logger != nullptr) { logger->error(__FILE__, __LINE__, static_cast(__FUNCTION__), str, make_args()); } const char *base = Ichor::basename(__FILE__); fmt::print("[{}:{}] ", base, __LINE__); fmt::println(str); }; static_assert(true, "") #define ICHOR_EMERGENCY_LOG2(logger, str, ...) { if(logger != nullptr) { logger->error(__FILE__, __LINE__, static_cast(__FUNCTION__), str, make_args(__VA_ARGS__)); } const char *base = Ichor::basename(__FILE__); fmt::print("[{}:{}] ", base, __LINE__); fmt::println(str, __VA_ARGS__); }; static_assert(true, "") #else @@ -51,12 +45,6 @@ namespace Ichor { #define ICHOR_LOG_WARN(logger, str, ...) { if(logger != nullptr) logger->warn(nullptr, 0, nullptr, str, make_args(__VA_ARGS__)); }; static_assert(true, "") #define ICHOR_LOG_ERROR(logger, str, ...) { if(logger != nullptr) logger->error(nullptr, 0, nullptr, str, make_args(__VA_ARGS__)); }; static_assert(true, "") -#define ICHOR_LOG_TRACE_ATOMIC(logger, str, ...) { auto *l = logger.load(std::memory_order_acquire); if(l != nullptr) l->trace(nullptr, 0, nullptr, str, make_args(__VA_ARGS__)); }; static_assert(true, "") -#define ICHOR_LOG_DEBUG_ATOMIC(logger, str, ...) { auto *l = logger.load(std::memory_order_acquire); if(l != nullptr) l->debug(nullptr, 0, nullptr, str, make_args(__VA_ARGS__)); }; static_assert(true, "") -#define ICHOR_LOG_INFO_ATOMIC(logger, str, ...) { auto *l = logger.load(std::memory_order_acquire); if(l != nullptr) l->info(nullptr, 0, nullptr, str, make_args(__VA_ARGS__)); }; static_assert(true, "") -#define ICHOR_LOG_WARN_ATOMIC(logger, str, ...) { auto *l = logger.load(std::memory_order_acquire); if(l != nullptr) l->warn(nullptr, 0, nullptr, str, make_args(__VA_ARGS__)); }; static_assert(true, "") -#define ICHOR_LOG_ERROR_ATOMIC(logger, str, ...) { auto *l = logger.load(std::memory_order_acquire); if(l != nullptr) l->error(nullptr, 0, nullptr, str, make_args(__VA_ARGS__)); }; static_assert(true, "") - #define ICHOR_EMERGENCY_LOG1(logger, str, ...) { if(logger != nullptr) { logger->error(nullptr, 0, nullptr, str, make_args()); } fmt::println(str); }; static_assert(true, "") #define ICHOR_EMERGENCY_LOG2(logger, str, ...) { if(logger != nullptr) { logger->error(nullptr, 0, nullptr, str, make_args(__VA_ARGS__)); } fmt::println(str, __VA_ARGS__); }; static_assert(true, "") #endif diff --git a/include/ichor/services/network/boost/AsioContextService.h b/include/ichor/services/network/boost/AsioContextService.h deleted file mode 100644 index f88052b9..00000000 --- a/include/ichor/services/network/boost/AsioContextService.h +++ /dev/null @@ -1,61 +0,0 @@ -#pragma once - -#ifdef ICHOR_USE_BOOST_BEAST - -#include -#include -#include -#include -#include -#include -#include - -#if BOOST_VERSION >= 108000 -#define ASIO_SPAWN_COMPLETION_TOKEN , [](std::exception_ptr e) { if (e) std::rethrow_exception(e); } -#else -#define ASIO_SPAWN_COMPLETION_TOKEN -#endif -namespace beast = boost::beast; // from -namespace http = beast::http; // from -namespace net = boost::asio; // from -using tcp = boost::asio::ip::tcp; // from - -namespace Ichor::Boost { - class IAsioContextService { - public: - virtual net::io_context* getContext() noexcept = 0; - virtual bool fibersShouldStop() const noexcept = 0; - virtual uint64_t threadCount() const noexcept = 0; - - protected: - ~IAsioContextService() = default; - }; - - class AsioContextService final : public IAsioContextService, public AdvancedService { - public: - AsioContextService(DependencyRegister ®, Properties props); - ~AsioContextService() final; - - net::io_context* getContext() noexcept final; - bool fibersShouldStop() const noexcept final; - uint64_t threadCount() const noexcept final; - - private: - Task> start() final; - Task stop() final; - - void addDependencyInstance(ILogger &logger, IService &isvc); - void removeDependencyInstance(ILogger &logger, IService &isvc); - - friend DependencyRegister; - - std::unique_ptr _context{}; - std::vector _asioThreads{}; - std::atomic _quit{}; - uint64_t _threads{1}; - ILogger *_logger{}; - AsyncManualResetEvent _startStopEvent{}; - }; -} - -#endif diff --git a/include/ichor/services/network/boost/HttpConnectionService.h b/include/ichor/services/network/boost/HttpConnectionService.h index 6669cbf7..c05a222a 100644 --- a/include/ichor/services/network/boost/HttpConnectionService.h +++ b/include/ichor/services/network/boost/HttpConnectionService.h @@ -1,10 +1,9 @@ #pragma once +#include #include -#include #include #include -#include #include #include #include @@ -28,7 +27,7 @@ namespace Ichor::Boost { } /** - * Service for connecting to an HTTP/1.1 server using boost. Requires an IAsioContextService and a logger. + * Service for connecting to an HTTP/1.1 server using boost. Requires an IBoostAsioQueue and a logger. * * Properties: * - "Address" std::string - What address to connect to (required) @@ -59,8 +58,8 @@ namespace Ichor::Boost { void addDependencyInstance(ILogger &logger, IService &); void removeDependencyInstance(ILogger &logger, IService&); - void addDependencyInstance(IAsioContextService &logger, IService&); - void removeDependencyInstance(IAsioContextService &logger, IService&); + void addDependencyInstance(IBoostAsioQueue &q, IService&); + void removeDependencyInstance(IBoostAsioQueue &q, IService&); void fail(beast::error_code, char const* what); void connect(tcp::endpoint endpoint, net::yield_context yield); @@ -70,19 +69,17 @@ namespace Ichor::Boost { std::unique_ptr _httpStream{}; std::unique_ptr> _sslStream{}; std::unique_ptr _sslContext{}; - std::atomic _priority{INTERNAL_EVENT_PRIORITY}; - std::atomic _quit{}; - std::atomic _connecting{}; - std::atomic _connected{}; - std::atomic _tcpNoDelay{}; - std::atomic _useSsl{}; + uint64_t _priority{INTERNAL_EVENT_PRIORITY}; + bool _quit{}; + bool _connecting{}; + bool _connected{}; + bool _tcpNoDelay{}; + bool _useSsl{}; std::atomic _finishedListenAndRead{}; - std::atomic _logger{}; - IAsioContextService *_asioContextService{}; + ILogger* _logger{}; boost::circular_buffer _outbox{10}; - RealtimeMutex _outboxMutex{}; AsyncManualResetEvent _startStopEvent{}; - IEventQueue *_queue; + IBoostAsioQueue *_queue; bool _debug{}; uint64_t _tryConnectIntervalMs{100}; uint64_t _timeoutMs{10'000}; diff --git a/include/ichor/services/network/boost/HttpHostService.h b/include/ichor/services/network/boost/HttpHostService.h index f1d1d440..a23db7cf 100644 --- a/include/ichor/services/network/boost/HttpHostService.h +++ b/include/ichor/services/network/boost/HttpHostService.h @@ -1,10 +1,9 @@ #pragma once +#include #include -#include #include #include -#include #include #include #include @@ -27,12 +26,11 @@ namespace Ichor::Boost { Connection(tcp::socket &&_socket, net::ssl::context &ctx) : socket(std::move(_socket), ctx) {} SocketT socket; boost::circular_buffer outbox{10}; - RealtimeMutex mutex{}; }; } /** - * Service for creating an HTTP/1.1 server using boost. Requires an IAsioContextService and a logger. + * Service for creating an HTTP/1.1 server using boost. Requires an IBoostAsioQueue and a logger. * * Properties: * - "Address" std::string - What address to bind to (required) @@ -63,8 +61,8 @@ namespace Ichor::Boost { void addDependencyInstance(ILogger &logger, IService &isvc); void removeDependencyInstance(ILogger &logger, IService &isvc); - void addDependencyInstance(IAsioContextService &logger, IService&); - void removeDependencyInstance(IAsioContextService &logger, IService&); + void addDependencyInstance(IBoostAsioQueue &q, IService&); + void removeDependencyInstance(IBoostAsioQueue &q, IService&); void fail(beast::error_code, char const* what, bool stopSelf); void listen(tcp::endpoint endpoint, net::yield_context yield); @@ -81,21 +79,19 @@ namespace Ichor::Boost { unordered_map>> _httpStreams{}; unordered_map>>> _sslStreams{}; std::unique_ptr _sslContext{}; - RealtimeMutex _streamsMutex{}; - std::atomic _priority{INTERNAL_EVENT_PRIORITY}; - std::atomic _quit{}; - std::atomic _goingToCleanupStream{}; + uint64_t _priority{INTERNAL_EVENT_PRIORITY}; + bool _quit{}; + bool _goingToCleanupStream{}; std::atomic _finishedListenAndRead{}; - std::atomic _tcpNoDelay{}; - std::atomic _useSsl{}; + bool _tcpNoDelay{}; + bool _useSsl{}; uint64_t _streamIdCounter{}; uint64_t _matchersIdCounter{}; bool _sendServerHeader{true}; bool _debug{}; - std::atomic _logger{}; - IAsioContextService *_asioContextService{}; + ILogger* _logger{}; unordered_map, std::function(HttpRequest&)>>> _handlers{}; AsyncManualResetEvent _startStopEvent{}; - IEventQueue *_queue; + IBoostAsioQueue *_queue{}; }; } diff --git a/include/ichor/services/network/boost/WsConnectionService.h b/include/ichor/services/network/boost/WsConnectionService.h index e5ed0001..f2f9856a 100644 --- a/include/ichor/services/network/boost/WsConnectionService.h +++ b/include/ichor/services/network/boost/WsConnectionService.h @@ -1,12 +1,10 @@ #pragma once +#include #include #include -#include #include #include -#include -#include #include #include #include @@ -50,8 +48,8 @@ namespace Ichor::Boost { void addDependencyInstance(IHostService&, IService &isvc); void removeDependencyInstance(IHostService&, IService &isvc); - void addDependencyInstance(IAsioContextService &logger, IService&); - void removeDependencyInstance(IAsioContextService &logger, IService&); + void addDependencyInstance(IBoostAsioQueue &q, IService&); + void removeDependencyInstance(IBoostAsioQueue &q, IService&); friend DependencyRegister; friend DependencyManager; @@ -62,17 +60,15 @@ namespace Ichor::Boost { void read(net::yield_context &yield); std::shared_ptr> _ws{}; - std::atomic _priority{}; - std::atomic _connected{}; - std::atomic _quit{}; + uint64_t _priority{}; + bool _connected{}; + bool _quit{}; ILogger *_logger{}; - IAsioContextService *_asioContextService{}; + IBoostAsioQueue *_queue{}; std::unique_ptr> _strand{}; std::atomic _finishedListenAndRead{}; AsyncManualResetEvent _startStopEvent{}; boost::circular_buffer _outbox{10}; - RealtimeMutex _outboxMutex{}; - IEventQueue *_queue; std::vector> _queuedMessages{}; std::function)> _recvHandler; }; diff --git a/include/ichor/services/network/boost/WsHostService.h b/include/ichor/services/network/boost/WsHostService.h index 9bbe45fd..dff08cb2 100644 --- a/include/ichor/services/network/boost/WsHostService.h +++ b/include/ichor/services/network/boost/WsHostService.h @@ -1,9 +1,9 @@ #pragma once +#include #include #include #include -#include #include #include #include @@ -30,8 +30,8 @@ namespace Ichor::Boost { void addDependencyInstance(ILogger &logger, IService &isvc); void removeDependencyInstance(ILogger &logger, IService &isvc); - void addDependencyInstance(IAsioContextService &logger, IService&); - void removeDependencyInstance(IAsioContextService &logger, IService&); + void addDependencyInstance(IBoostAsioQueue &q, IService&); + void removeDependencyInstance(IBoostAsioQueue &q, IService&); AsyncGenerator handleEvent(NewWsConnectionEvent const &evt); @@ -44,14 +44,13 @@ namespace Ichor::Boost { std::unique_ptr _wsAcceptor{}; std::unique_ptr> _strand{}; uint64_t _priority{INTERNAL_EVENT_PRIORITY}; - std::atomic _quit{}; - std::atomic _tcpNoDelay{}; + bool _quit{}; + bool _tcpNoDelay{}; std::atomic _finishedListenAndRead{}; ILogger *_logger{}; - IAsioContextService *_asioContextService{}; std::vector _connections{}; EventHandlerRegistration _eventRegistration{}; AsyncManualResetEvent _startStopEvent{}; - IEventQueue *_queue{}; + IBoostAsioQueue *_queue{}; }; } diff --git a/quickbuild.sh b/quickbuild.sh index 436f9fa6..f32f281c 100755 --- a/quickbuild.sh +++ b/quickbuild.sh @@ -23,12 +23,14 @@ CXXCOMP=clang++-19 BOOST=0 SPDLOG=0 HIREDIS=0 +LD_PATH= while [[ $# -gt 0 ]]; do case $1 in --gcc) CCOMP=gcc CXXCOMP=g++ + LD_PATH="/opt/gcc/14/lib64/" shift # past value ;; --oldgcc) @@ -65,7 +67,7 @@ while [[ $# -gt 0 ]]; do DEBUG=1 shift # past value ;; - --run-examples) + --examples) RUN_EXAMPLES=1 shift # past value ;; @@ -94,6 +96,7 @@ done set -- "${POSITIONAL_ARGS[@]}" # restore positional parameters +export LD_LIBRARY_PATH=$LD_PATH:$LD_LIBRARY_PATH rm -rf ./* rm -rf ../bin/* diff --git a/src/ichor/event_queues/BoostAsioQueue.cpp b/src/ichor/event_queues/BoostAsioQueue.cpp new file mode 100644 index 00000000..ec8bb1ec --- /dev/null +++ b/src/ichor/event_queues/BoostAsioQueue.cpp @@ -0,0 +1,158 @@ +#ifndef ICHOR_USE_BOOST_BEAST +#error "Boost not enabled." +#endif + +#include +#include +#include +#include + +namespace Ichor::Detail { + extern std::atomic sigintQuit; + extern std::atomic registeredSignalHandler; + void on_sigint([[maybe_unused]] int sig); +} + +Ichor::BoostAsioQueue::BoostAsioQueue() : _context(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO) { + _threadId = std::this_thread::get_id(); // re-set in functions below, because adding events when the queue isn't running yet cannot be done from another thread. +} + +Ichor::BoostAsioQueue::BoostAsioQueue(uint64_t unused) : _context(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO) { + _threadId = std::this_thread::get_id(); // re-set in functions below, because adding events when the queue isn't running yet cannot be done from another thread. +} + +Ichor::BoostAsioQueue::~BoostAsioQueue() { + if(_dm) { + stopDm(); + } + + if(Detail::registeredSignalHandler) { + if(::signal(SIGINT, SIG_DFL) == SIG_ERR) { + // fmt::println("Couldn't unset signal handler"); + } + } +} + +net::io_context& Ichor::BoostAsioQueue::getContext() noexcept { + return _context; +} + +bool Ichor::BoostAsioQueue::fibersShouldStop() const noexcept { + return _quit.load(std::memory_order_acquire); +} + +bool Ichor::BoostAsioQueue::empty() const { + return !_dm || !_dm->isRunning(); +} + +uint64_t Ichor::BoostAsioQueue::size() const { + return (_dm && _dm->isRunning()) ? 1 : 0; +} + +bool Ichor::BoostAsioQueue::is_running() const noexcept { + return !_quit.load(std::memory_order_acquire); +} + +Ichor::NameHashType Ichor::BoostAsioQueue::get_queue_name_hash() const noexcept { + return typeNameHash(); +} + + +void Ichor::BoostAsioQueue::pushEventInternal(uint64_t priority, std::unique_ptr &&event) { + if(!event) [[unlikely]] { + fmt::println("Pushing nullptr"); + std::terminate(); + } + + if(shouldQuit()) { + //fmt::println("pushEventInternal, should quit! not inserting {}", event->get_name()); + return; + } + + // TODO hardening + //#ifdef ICHOR_USE_HARDENING + // if(originatingServiceId != 0 && _services.find(originatingServiceId) == _services.end()) [[unlikely]] { + // std::terminate(); + // } + //#endif + + fmt::println("pushing {}:{}", event->id, event->get_name()); + net::post(_context, [this, event = std::move(event)]() mutable { + if constexpr (DO_INTERNAL_DEBUG) { + if(std::this_thread::get_id() != _threadId) { + fmt::println("running on wrong thread"); + std::terminate(); + } + } + // fmt::println("processing {}:{}", event->id, event->get_name()); + processEvent(event); + shouldAddQuitEvent(); + }); +} + +bool Ichor::BoostAsioQueue::shouldQuit() { + if(_quitEventSent.load(std::memory_order_acquire) && std::chrono::steady_clock::now() - _whenQuitEventWasSent >= std::chrono::milliseconds(_quitTimeoutMs)) [[unlikely]] { + _quit.store(true, std::memory_order_release); + } + + return _quit.load(std::memory_order_acquire); +} + +void Ichor::BoostAsioQueue::quit() { + _quit.store(true, std::memory_order_release); +} + +void Ichor::BoostAsioQueue::shouldAddQuitEvent() { + bool const shouldQuit = Detail::sigintQuit.load(std::memory_order_acquire); + + if(shouldQuit && !_quitEventSent.load(std::memory_order_acquire)) { + pushEventInternal(INTERNAL_EVENT_PRIORITY, std::make_unique(getNextEventId(), 0, INTERNAL_EVENT_PRIORITY)); + _quitEventSent.store(true, std::memory_order_release); + _whenQuitEventWasSent = std::chrono::steady_clock::now(); + } +} + +bool Ichor::BoostAsioQueue::start(bool captureSigInt) { + if(!_dm) [[unlikely]] { + fmt::println("Please create a manager first!"); + return false; + } + + if(captureSigInt && !Ichor::Detail::registeredSignalHandler.exchange(true)) { + if(::signal(SIGINT, Ichor::Detail::on_sigint) == SIG_ERR) { + fmt::println("Couldn't set signal"); + return false; + } + } + _threadId = std::this_thread::get_id(); + + addInternalServiceManager(std::make_unique>(this)); + + startDm(); + + net::spawn(_context, [this](net::yield_context yield) { + if constexpr (DO_INTERNAL_DEBUG) { + if(std::this_thread::get_id() != _threadId) { + fmt::println("running on wrong thread"); + std::terminate(); + } + } + net::steady_timer t{_context}; + while (!_quit) { + t.expires_after(std::chrono::milliseconds(10)); + t.async_wait(yield); + } + }ASIO_SPAWN_COMPLETION_TOKEN); + + while (!_context.stopped()) { + try { + _context.run(); + } catch (boost::system::system_error const &e) { + fmt::println("io context {} system error {}", _dm->getId(), e.what()); + } catch (std::exception const &e) { + fmt::println("io context {} std error {}", _dm->getId(), e.what()); + } + } + + return true; +} diff --git a/src/ichor/event_queues/IOUringQueue.cpp b/src/ichor/event_queues/IOUringQueue.cpp index 52527f70..19cefac6 100644 --- a/src/ichor/event_queues/IOUringQueue.cpp +++ b/src/ichor/event_queues/IOUringQueue.cpp @@ -1,4 +1,6 @@ -#ifdef ICHOR_USE_LIBURING +#ifndef ICHOR_USE_LIBURING +#error "Uring not enabled." +#endif #include #include @@ -1043,5 +1045,3 @@ namespace Ichor { fmt::println("The last submission probably contains an error."); } #endif - -#endif diff --git a/src/ichor/event_queues/SdeventQueue.cpp b/src/ichor/event_queues/SdeventQueue.cpp index a46b0333..61515085 100644 --- a/src/ichor/event_queues/SdeventQueue.cpp +++ b/src/ichor/event_queues/SdeventQueue.cpp @@ -1,4 +1,6 @@ -#ifdef ICHOR_USE_SDEVENT +#ifndef ICHOR_USE_SDEVENT +#error "Sdevent not enabled." +#endif #include #include @@ -294,5 +296,3 @@ namespace Ichor { return _eventQueue; } } - -#endif diff --git a/src/services/network/boost/AsioContextService.cpp b/src/services/network/boost/AsioContextService.cpp deleted file mode 100644 index a67c6bab..00000000 --- a/src/services/network/boost/AsioContextService.cpp +++ /dev/null @@ -1,136 +0,0 @@ -#include -#include -#include -#include -#if (defined(WIN32) || defined(_WIN32) || defined(__WIN32)) && !defined(__CYGWIN__) -#include -#include -#endif - -Ichor::Boost::AsioContextService::AsioContextService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)) { - if(auto propIt = getProperties().find("Threads"); propIt != getProperties().end()) { - _threads = Ichor::any_cast(propIt->second); - if(_threads == 0) { - _threads = std::thread::hardware_concurrency(); - } - } - - reg.registerDependency(this, DependencyFlags::REQUIRED); -} - -Ichor::Boost::AsioContextService::~AsioContextService() { - for(auto &thread : _asioThreads) { - if(thread.joinable()) { - // "We're in a bad situation, sir, and do not know how to recover." - std::terminate(); - } - } -} - -Ichor::Task> Ichor::Boost::AsioContextService::start() { - _quit = false; - if(_threads == 1) { - _context = std::make_unique(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO); - } else { - _context = std::make_unique(); - } - - ICHOR_LOG_INFO(_logger, "Using boost version {} beast version {}", BOOST_VERSION, BOOST_BEAST_VERSION); - - net::spawn(*_context, [this, &queue = GetThreadLocalEventQueue()](net::yield_context yield) { - // notify start() - queue.pushPrioritisedEvent(getServiceId(), INTERNAL_COROUTINE_EVENT_PRIORITY, [this]() { - _startStopEvent.set(); - }); - - net::steady_timer t{*_context}; - while (!_quit) { - t.expires_after(std::chrono::milliseconds(10)); - t.async_wait(yield); - } - INTERNAL_DEBUG("+++++++++++++++++++++++++++++++++++++++++++++++ NOTIFY STOP ++++++++++++++++++++++++++++++++"); - - // notify stop() - queue.pushPrioritisedEvent(getServiceId(), INTERNAL_COROUTINE_EVENT_PRIORITY, [this]() { - _startStopEvent.set(); - }); - }ASIO_SPAWN_COMPLETION_TOKEN); - - for(uint64_t i = 0; i < _threads; i++) { - [[maybe_unused]] auto &thread = _asioThreads.emplace_back([this, i]() { -#if defined(__APPLE__) - pthread_setname_np(fmt::format("Asio#{}-{}", getServiceId(), i).c_str()); -#endif -#if (defined(WIN32) || defined(_WIN32) || defined(__WIN32)) && !defined(__CYGWIN__) - SetThreadDescription(GetCurrentThread(), fmt::format(L"Asio#{}-{}", getServiceId(), i).c_str()); -#endif - INTERNAL_DEBUG("AsioContext started"); - while (!_context->stopped()) { - try { - _context->run(); - } catch (boost::system::system_error const &e) { - ICHOR_LOG_ERROR(_logger, "io context {}-{} system error {}", getServiceId(), i, e.what()); - } catch (std::exception const &e) { - ICHOR_LOG_ERROR(_logger, "io context {}-{} std error {}", getServiceId(), i, e.what()); - } - } - - INTERNAL_DEBUG("AsioContext stopped"); - }); - -#if defined(__linux__) || defined(__CYGWIN__) - pthread_setname_np(thread.native_handle(), fmt::format("Asio#{}-{}", getServiceId(), i).c_str()); -#endif - } - - co_await _startStopEvent; - - _startStopEvent.reset(); - - co_return {}; -} - -Ichor::Task Ichor::Boost::AsioContextService::stop() { - _quit.store(true, std::memory_order_release); - INTERNAL_DEBUG("+++++++++++++++++++++++++++++++++++++++++++++++ STOP ++++++++++++++++++++++++++++++++"); - - if(!_context->stopped()) { - INTERNAL_DEBUG("+++++++++++++++++++++++++++++++++++++++++++++++ pre wait\n"); - co_await _startStopEvent; - INTERNAL_DEBUG("+++++++++++++++++++++++++++++++++++++++++++++++ post wait\n"); - - _context->stop(); - } - - INTERNAL_DEBUG("+++++++++++++++++++++++++++++++++++++++++++++++ pre join\n"); - for(auto &thread : _asioThreads) { - thread.join(); - } - - INTERNAL_DEBUG("+++++++++++++++++++++++++++++++++++++++++++++++ post join\n"); - _context = nullptr; - _asioThreads.clear(); - - INTERNAL_DEBUG("+++++++++++++++++++++++++++++++++++++++++++++++ post clear\n"); - co_return; -} - -void Ichor::Boost::AsioContextService::addDependencyInstance(ILogger &logger, IService &) { - _logger = &logger; -} - -void Ichor::Boost::AsioContextService::removeDependencyInstance(ILogger &logger, IService&) { - _logger = nullptr; -} - -net::io_context* Ichor::Boost::AsioContextService::getContext() noexcept { - return _context.get(); -} - -bool Ichor::Boost::AsioContextService::fibersShouldStop() const noexcept { - return _quit.load(std::memory_order_acquire); -} - -uint64_t Ichor::Boost::AsioContextService::threadCount() const noexcept { - return _threads; -} diff --git a/src/services/network/boost/HttpConnectionService.cpp b/src/services/network/boost/HttpConnectionService.cpp index acd5d8cb..932897ef 100644 --- a/src/services/network/boost/HttpConnectionService.cpp +++ b/src/services/network/boost/HttpConnectionService.cpp @@ -7,23 +7,21 @@ Ichor::Boost::HttpConnectionService::HttpConnectionService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)) { reg.registerDependency(this, DependencyFlags::REQUIRED); - reg.registerDependency(this, DependencyFlags::REQUIRED); + reg.registerDependency(this, DependencyFlags::REQUIRED); } Ichor::Task> Ichor::Boost::HttpConnectionService::start() { - _queue = &GetThreadLocalEventQueue(); - - if (!_asioContextService->fibersShouldStop()) { - _quit.store(false, std::memory_order_release); + if (!_queue->fibersShouldStop()) { + _quit = false; auto addrIt = getProperties().find("Address"); auto portIt = getProperties().find("Port"); if(addrIt == getProperties().end()) { - ICHOR_LOG_ERROR_ATOMIC(_logger, "Missing address"); + ICHOR_LOG_ERROR(_logger, "Missing address"); co_return tl::unexpected(StartError::FAILED); } if(portIt == getProperties().end()) { - ICHOR_LOG_ERROR_ATOMIC(_logger, "Missing port"); + ICHOR_LOG_ERROR(_logger, "Missing port"); co_return tl::unexpected(StartError::FAILED); } @@ -34,19 +32,19 @@ Ichor::Task> Ichor::Boost::HttpConnectionS _tryConnectIntervalMs = Ichor::any_cast(propIt->second); } if(auto propIt = getProperties().find("Priority"); propIt != getProperties().end()) { - _priority.store(Ichor::any_cast(propIt->second), std::memory_order_release); + _priority = Ichor::any_cast(propIt->second); } if(auto propIt = getProperties().find("Debug"); propIt != getProperties().end()) { _debug = Ichor::any_cast(propIt->second); } if(auto propIt = getProperties().find("NoDelay"); propIt != getProperties().end()) { - _tcpNoDelay.store(Ichor::any_cast(propIt->second), std::memory_order_release); + _tcpNoDelay = Ichor::any_cast(propIt->second); } if(auto propIt = getProperties().find("ConnectOverSsl"); propIt != getProperties().end()) { - _useSsl.store(Ichor::any_cast(propIt->second), std::memory_order_release); + _useSsl = Ichor::any_cast(propIt->second); } if(_debug) { - ICHOR_LOG_DEBUG_ATOMIC(_logger, "connecting to {}:{}\n", Ichor::any_cast(addrIt->second), Ichor::any_cast(portIt->second)); + ICHOR_LOG_DEBUG(_logger, "connecting to {}:{}\n", Ichor::any_cast(addrIt->second), Ichor::any_cast(portIt->second)); } boost::system::error_code ec; @@ -54,17 +52,17 @@ Ichor::Task> Ichor::Boost::HttpConnectionS auto port = Ichor::any_cast(portIt->second); if(ec) { - ICHOR_LOG_ERROR_ATOMIC(_logger, "Couldn't parse address \"{}\": {} {}", Ichor::any_cast(addrIt->second), ec.value(), ec.message()); + ICHOR_LOG_ERROR(_logger, "Couldn't parse address \"{}\": {} {}", Ichor::any_cast(addrIt->second), ec.value(), ec.message()); co_return tl::unexpected(StartError::FAILED); } - _connecting.store(true, std::memory_order_release); - net::spawn(*_asioContextService->getContext(), [this, address = std::move(address), port](net::yield_context yield) { + _connecting = true; + net::spawn(_queue->getContext(), [this, address = std::move(address), port](net::yield_context yield) { connect(tcp::endpoint{address, port}, std::move(yield)); }ASIO_SPAWN_COMPLETION_TOKEN); } - if (_asioContextService->fibersShouldStop()) { + if (_queue->fibersShouldStop()) { co_return tl::unexpected(StartError::FAILED); } @@ -75,7 +73,7 @@ Ichor::Task> Ichor::Boost::HttpConnectionS } Ichor::Task Ichor::Boost::HttpConnectionService::stop() { - _quit.store(true, std::memory_order_release); + _quit = true; // INTERNAL_DEBUG("----------------------------------------------- STOP"); co_await close(); @@ -86,28 +84,27 @@ Ichor::Task Ichor::Boost::HttpConnectionService::stop() { } void Ichor::Boost::HttpConnectionService::addDependencyInstance(ILogger &logger, IService &) { - _logger.store(&logger, std::memory_order_release); + _logger = &logger; } void Ichor::Boost::HttpConnectionService::removeDependencyInstance(ILogger &logger, IService&) { - _logger.store(nullptr, std::memory_order_release); + _logger = nullptr; } -void Ichor::Boost::HttpConnectionService::addDependencyInstance(IAsioContextService &AsioContextService, IService&) { - _asioContextService = &AsioContextService; - ICHOR_LOG_TRACE_ATOMIC(_logger, "Inserted AsioContextService"); +void Ichor::Boost::HttpConnectionService::addDependencyInstance(IBoostAsioQueue &q, IService&) { + _queue = &q; } -void Ichor::Boost::HttpConnectionService::removeDependencyInstance(IAsioContextService&, IService&) { - _asioContextService = nullptr; +void Ichor::Boost::HttpConnectionService::removeDependencyInstance(IBoostAsioQueue&, IService&) { + _queue = nullptr; } void Ichor::Boost::HttpConnectionService::setPriority(uint64_t priority) { - _priority.store(priority, std::memory_order_release); + _priority = priority; } uint64_t Ichor::Boost::HttpConnectionService::getPriority() { - return _priority.load(std::memory_order_acquire); + return _priority; } Ichor::Task Ichor::Boost::HttpConnectionService::sendAsync(Ichor::HttpMethod method, std::string_view route, unordered_map &&headers, std::vector &&msg) { @@ -115,21 +112,20 @@ Ichor::Task Ichor::Boost::HttpConnectionService::sendAsync( throw std::runtime_error("GET requests cannot have a body."); } - ICHOR_LOG_DEBUG_ATOMIC(_logger, "sending to {}", route); + ICHOR_LOG_DEBUG(_logger, "sending to {}", route); HttpResponse response{}; - if(_quit.load(std::memory_order_acquire) || _asioContextService->fibersShouldStop()) { + if(_quit || _queue->fibersShouldStop()) { co_return response; } AsyncManualResetEvent event{}; - net::spawn(*_asioContextService->getContext(), [this, method, route, &event, &response, &headers, &msg](net::yield_context yield) mutable { + net::spawn(_queue->getContext(), [this, method, route, &event, &response, &headers, &msg](net::yield_context yield) mutable { static_assert(std::is_trivially_copyable_v, "ConnectionOutboxMessage should be trivially copyable"); ScopeGuardAtomicCount const guard{_finishedListenAndRead}; - std::unique_lock lg{_outboxMutex}; if(_outbox.full()) { _outbox.set_capacity(std::max(_outbox.capacity() * 2, 10ul)); } @@ -141,22 +137,18 @@ Ichor::Task Ichor::Boost::HttpConnectionService::sendAsync( while(!_outbox.empty()) { // Copy message, should be trivially copyable and prevents iterator invalidation auto next = _outbox.front(); - lg.unlock(); INTERNAL_DEBUG("Outbox {}", next.route); - ScopeGuard const coroutineGuard{[this, event = next.event, &lg]() { + ScopeGuard const coroutineGuard{[this, event = next.event]() { // use service id 0 to ensure event gets run, even if service is stopped. Otherwise, the coroutine will never complete. // Similarly, use priority 0 to ensure these events run before any dependency changes, otherwise the service might be destroyed // before we can finish all the coroutines. - _queue->pushPrioritisedEvent(0u, 0u, [event]() { - event->set(); - }); - lg.lock(); + event->set(); _outbox.pop_front(); }}; // if the service has to quit, we still have to spool through all the remaining messages, to complete coroutines - if(_quit.load(std::memory_order_acquire)) { + if(_quit) { continue; } @@ -175,7 +167,7 @@ Ichor::Task Ichor::Boost::HttpConnectionService::sendAsync( req.prepare_payload(); req.keep_alive(); - if(_useSsl.load(std::memory_order_acquire)) { + if(_useSsl) { // Set the timeout for this operation. // _sslStream should only be modified from the boost thread beast::get_lowest_layer(*_sslStream).expires_after(30s); @@ -195,7 +187,7 @@ Ichor::Task Ichor::Boost::HttpConnectionService::sendAsync( } // if the service has to quit, we still have to spool through all the remaining messages, to complete coroutines - if(_quit.load(std::memory_order_acquire)) { + if(_quit) { continue; } @@ -206,7 +198,7 @@ Ichor::Task Ichor::Boost::HttpConnectionService::sendAsync( http::response, http::basic_fields>> res; // Receive the HTTP response - if(_useSsl.load(std::memory_order_acquire)) { + if(_useSsl) { INTERNAL_DEBUG("https::read"); http::async_read(*_sslStream, b, res, yield[ec]); } else { @@ -225,7 +217,7 @@ Ichor::Task Ichor::Boost::HttpConnectionService::sendAsync( INTERNAL_DEBUG("received HTTP response {}", std::string_view(reinterpret_cast(res.body().data()), res.body().size())); // unset the timeout for the next operation. - if(_useSsl.load(std::memory_order_acquire)) { + if(_useSsl) { beast::get_lowest_layer(*_sslStream).expires_never(); } else { _httpStream->expires_never(); @@ -248,7 +240,7 @@ Ichor::Task Ichor::Boost::HttpConnectionService::sendAsync( } Ichor::Task Ichor::Boost::HttpConnectionService::close() { - if(_useSsl.load(std::memory_order_acquire)) { + if(_useSsl) { if(!_sslStream) { co_return; } @@ -258,30 +250,31 @@ Ichor::Task Ichor::Boost::HttpConnectionService::close() { } } - if(!_connecting.exchange(true, std::memory_order_acq_rel)) { - net::spawn(*_asioContextService->getContext(), [this](net::yield_context) { - // _httpStream and _sslStream should only be modified from the boost thread - if(_useSsl.load(std::memory_order_acquire)) { - beast::get_lowest_layer(*_sslStream).cancel(); - } else { - _httpStream->cancel(); - } + if(!_connecting) { + _connecting = true; + // _httpStream and _sslStream should only be modified from the boost thread + if(_useSsl) { + beast::get_lowest_layer(*_sslStream).cancel(); + } else { + _httpStream->cancel(); + } - _queue->pushEvent(getServiceId(), [this]() { - _startStopEvent.set(); - }); - }ASIO_SPAWN_COMPLETION_TOKEN); + _startStopEvent.set(); } co_await _startStopEvent; _startStopEvent.reset(); - while(_finishedListenAndRead.load(std::memory_order_acquire) != 0) { - std::this_thread::sleep_for(1ms); + while(_finishedListenAndRead != 0) { + _startStopEvent.reset(); + _queue->pushEvent(getServiceId(), [this]() { + _startStopEvent.set(); + }); + co_await _startStopEvent; } - _connected.store(false, std::memory_order_release); - _connecting.store(false, std::memory_order_release); + _connected = false; + _connecting = false; _httpStream = nullptr; _sslStream = nullptr; @@ -289,22 +282,22 @@ Ichor::Task Ichor::Boost::HttpConnectionService::close() { } void Ichor::Boost::HttpConnectionService::fail(beast::error_code ec, const char *what) { - ICHOR_LOG_ERROR_ATOMIC(_logger, "Boost.BEAST fail: {}, {}", what, ec.message()); - _queue->pushPrioritisedEvent(getServiceId(), _priority.load(std::memory_order_acquire), getServiceId()); + ICHOR_LOG_ERROR(_logger, "Boost.BEAST fail: {}, {}", what, ec.message()); + _queue->pushPrioritisedEvent(getServiceId(), _priority, getServiceId()); } void Ichor::Boost::HttpConnectionService::connect(tcp::endpoint endpoint, net::yield_context yield) { ScopeGuardAtomicCount guard{_finishedListenAndRead}; beast::error_code ec; - tcp::resolver resolver(*_asioContextService->getContext()); + tcp::resolver resolver(_queue->getContext()); auto const results = resolver.resolve(endpoint, ec); if(ec) { return fail(ec, "HttpConnectionService::connect resolve"); } - if (_useSsl.load(std::memory_order_acquire)) { + if (_useSsl) { // _sslContext and _sslStream should only be modified from the boost thread _sslContext = std::make_unique(net::ssl::context::tlsv12); _sslContext->set_verify_mode(net::ssl::verify_peer); @@ -314,7 +307,7 @@ void Ichor::Boost::HttpConnectionService::connect(tcp::endpoint endpoint, net::y _sslContext->add_certificate_authority(boost::asio::const_buffer(ca.c_str(), ca.size()), ec); } - _sslStream = std::make_unique>(*_asioContextService->getContext(), *_sslContext); + _sslStream = std::make_unique>(_queue->getContext(), *_sslContext); if(!SSL_set_tlsext_host_name(_sslStream->native_handle(), endpoint.address().to_string().c_str())) { ec.assign(static_cast(::ERR_get_error()), net::error::get_ssl_category()); @@ -327,10 +320,10 @@ void Ichor::Boost::HttpConnectionService::connect(tcp::endpoint endpoint, net::y // Make the connection on the IP address we get from a lookup bool connected{}; auto timeoutAt = std::chrono::steady_clock::now() + std::chrono::milliseconds(_timeoutMs); - while(!_quit.load(std::memory_order_acquire) && !_asioContextService->fibersShouldStop() && std::chrono::steady_clock::now() < timeoutAt) { + while(!_quit && !_queue->fibersShouldStop() && std::chrono::steady_clock::now() < timeoutAt) { beast::get_lowest_layer(*_sslStream).async_connect(results, yield[ec]); if (ec) { - net::steady_timer t{*_asioContextService->getContext()}; + net::steady_timer t{_queue->getContext()}; t.expires_after(std::chrono::milliseconds(_tryConnectIntervalMs)); t.async_wait(yield); } else { @@ -344,9 +337,9 @@ void Ichor::Boost::HttpConnectionService::connect(tcp::endpoint endpoint, net::y } if(_debug) { - fmt::print("--- ssl tcpNoDelay: {} ---\n", _tcpNoDelay.load(std::memory_order_acquire)); + fmt::print("--- ssl tcpNoDelay: {} ---\n", _tcpNoDelay); } - beast::get_lowest_layer(*_sslStream).socket().set_option(tcp::no_delay(_tcpNoDelay.load(std::memory_order_acquire))); + beast::get_lowest_layer(*_sslStream).socket().set_option(tcp::no_delay(_tcpNoDelay)); _sslStream->async_handshake(net::ssl::stream_base::client, yield[ec]); if(ec) { @@ -354,7 +347,7 @@ void Ichor::Boost::HttpConnectionService::connect(tcp::endpoint endpoint, net::y } } else { // _httpStream should only be modified from the boost thread - _httpStream = std::make_unique(*_asioContextService->getContext()); + _httpStream = std::make_unique(_queue->getContext()); // Never expire until we actually have an operation. _httpStream->expires_never(); @@ -362,10 +355,10 @@ void Ichor::Boost::HttpConnectionService::connect(tcp::endpoint endpoint, net::y // Make the connection on the IP address we get from a lookup bool connected{}; auto timeoutAt = std::chrono::steady_clock::now() + std::chrono::milliseconds(_timeoutMs); - while(!_quit.load(std::memory_order_acquire) && !_asioContextService->fibersShouldStop() && std::chrono::steady_clock::now() < timeoutAt) { + while(!_quit && !_queue->fibersShouldStop() && std::chrono::steady_clock::now() < timeoutAt) { _httpStream->async_connect(results, yield[ec]); if (ec) { - net::steady_timer t{*_asioContextService->getContext()}; + net::steady_timer t{_queue->getContext()}; t.expires_after(std::chrono::milliseconds(_tryConnectIntervalMs)); t.async_wait(yield); } else { @@ -378,20 +371,18 @@ void Ichor::Boost::HttpConnectionService::connect(tcp::endpoint endpoint, net::y return fail(ec, fmt::format("HttpConnectionService::connect ssl couldn't connect within {:L} ms", _timeoutMs).c_str()); } - _httpStream->socket().set_option(tcp::no_delay(_tcpNoDelay.load(std::memory_order_acquire))); + _httpStream->socket().set_option(tcp::no_delay(_tcpNoDelay)); } if(ec) { // see below for why _connecting has to be set last - _quit.store(true, std::memory_order_release); - _connecting.store(false, std::memory_order_release); + _quit = true; + _connecting = false; return fail(ec, "HttpConnectionService::connect connect"); } // set connected before connecting, or races with the start() function may occur. - _queue->pushEvent(getServiceId(), [this]() { - _startStopEvent.set(); - }); - _connected.store(true, std::memory_order_release); - _connecting.store(false, std::memory_order_release); + _startStopEvent.set(); + _connected = true; + _connecting = false; } diff --git a/src/services/network/boost/HttpHostService.cpp b/src/services/network/boost/HttpHostService.cpp index 740aacda..c5ff8c0b 100644 --- a/src/services/network/boost/HttpHostService.cpp +++ b/src/services/network/boost/HttpHostService.cpp @@ -5,7 +5,7 @@ Ichor::Boost::HttpHostService::HttpHostService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)) { reg.registerDependency(this, DependencyFlags::REQUIRED); - reg.registerDependency(this, DependencyFlags::REQUIRED); + reg.registerDependency(this, DependencyFlags::REQUIRED); } Ichor::Task> Ichor::Boost::HttpHostService::start() { @@ -13,16 +13,16 @@ Ichor::Task> Ichor::Boost::HttpHostService auto portIt = getProperties().find("Port"); if(addrIt == getProperties().end()) { - ICHOR_LOG_ERROR_ATOMIC(_logger, "Missing address"); + ICHOR_LOG_ERROR(_logger, "Missing address"); co_return tl::unexpected(StartError::FAILED); } if(portIt == getProperties().end()) { - ICHOR_LOG_ERROR_ATOMIC(_logger, "Missing port"); + ICHOR_LOG_ERROR(_logger, "Missing port"); co_return tl::unexpected(StartError::FAILED); } if(auto propIt = getProperties().find("Priority"); propIt != getProperties().end()) { - _priority.store(Ichor::any_cast(propIt->second), std::memory_order_release); + _priority = Ichor::any_cast(propIt->second); } if(auto propIt = getProperties().find("Debug"); propIt != getProperties().end()) { _debug = Ichor::any_cast(propIt->second); @@ -31,32 +31,30 @@ Ichor::Task> Ichor::Boost::HttpHostService _sendServerHeader = Ichor::any_cast(propIt->second); } if(auto propIt = getProperties().find("NoDelay"); propIt != getProperties().end()) { - _tcpNoDelay.store(Ichor::any_cast(propIt->second), std::memory_order_release); + _tcpNoDelay = Ichor::any_cast(propIt->second); } if(auto propIt = getProperties().find("SslCert"); propIt != getProperties().end()) { - _useSsl.store(true, std::memory_order_release); + _useSsl = true; } auto sslKeyIt = getProperties().find("SslKey"); - if((_useSsl.load(std::memory_order_acquire) && sslKeyIt == getProperties().end()) || - (!_useSsl.load(std::memory_order_acquire) && sslKeyIt != getProperties().end())) { - ICHOR_LOG_ERROR_ATOMIC(_logger, "Both SslCert and SslKey properties are required when using ssl"); + if((_useSsl && sslKeyIt == getProperties().end()) || + (!_useSsl && sslKeyIt != getProperties().end())) { + ICHOR_LOG_ERROR(_logger, "Both SslCert and SslKey properties are required when using ssl"); co_return tl::unexpected(StartError::FAILED); } - _queue = &GetThreadLocalEventQueue(); - boost::system::error_code ec; auto address = net::ip::make_address(Ichor::any_cast(addrIt->second), ec); auto port = Ichor::any_cast(portIt->second); if(ec) { - ICHOR_LOG_ERROR_ATOMIC(_logger, "Couldn't parse address \"{}\": {} {}", Ichor::any_cast(addrIt->second), ec.value(), ec.message()); + ICHOR_LOG_ERROR(_logger, "Couldn't parse address \"{}\": {} {}", Ichor::any_cast(addrIt->second), ec.value(), ec.message()); co_return tl::unexpected(StartError::FAILED); } - net::spawn(*_asioContextService->getContext(), [this, address = std::move(address), port](net::yield_context yield) { + net::spawn(_queue->getContext(), [this, address = std::move(address), port](net::yield_context yield) { listen(tcp::endpoint{address, port}, std::move(yield)); }ASIO_SPAWN_COMPLETION_TOKEN); @@ -65,37 +63,35 @@ Ichor::Task> Ichor::Boost::HttpHostService Ichor::Task Ichor::Boost::HttpHostService::stop() { INTERNAL_DEBUG("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! HttpHostService::stop()"); - _quit.store(true, std::memory_order_release); + _quit = true; - if(!_goingToCleanupStream.exchange(true, std::memory_order_acq_rel) && _httpAcceptor) { + if(!_goingToCleanupStream && _httpAcceptor) { + _goingToCleanupStream = true; if(_httpAcceptor->is_open()) { _httpAcceptor->close(); } - net::spawn(*_asioContextService->getContext(), [this](net::yield_context _yield) { - ScopeGuardAtomicCount const guard{_finishedListenAndRead}; - { - std::unique_lock const lg{_streamsMutex}; - for (auto &[id, stream]: _httpStreams) { - stream->socket.cancel(); - } - for (auto &[id, stream]: _sslStreams) { - beast::get_lowest_layer(stream->socket).cancel(); - } - } - _httpAcceptor = nullptr; + for (auto &[id, stream]: _httpStreams) { + stream->socket.cancel(); + } + for (auto &[id, stream]: _sslStreams) { + beast::get_lowest_layer(stream->socket).cancel(); + } + + _httpAcceptor = nullptr; - _queue->pushEvent(getServiceId(), [this]() { - _startStopEvent.set(); - }); - }ASIO_SPAWN_COMPLETION_TOKEN); + _startStopEvent.set(); } co_await _startStopEvent; INTERNAL_DEBUG("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! post await"); while(_finishedListenAndRead.load(std::memory_order_acquire) != 0) { - std::this_thread::sleep_for(1ms); + _startStopEvent.reset(); + _queue->pushEvent(getServiceId(), [this]() { + _startStopEvent.set(); + }); + co_await _startStopEvent; } _httpStreams.clear(); @@ -105,29 +101,27 @@ Ichor::Task Ichor::Boost::HttpHostService::stop() { } void Ichor::Boost::HttpHostService::addDependencyInstance(ILogger &logger, IService &) { - _logger.store(&logger, std::memory_order_release); + _logger = &logger; } void Ichor::Boost::HttpHostService::removeDependencyInstance(ILogger &, IService&) { - _logger.store(nullptr, std::memory_order_release); + _logger = nullptr; } -void Ichor::Boost::HttpHostService::addDependencyInstance(IAsioContextService &AsioContextService, IService&) { - _asioContextService = &AsioContextService; - ICHOR_LOG_TRACE_ATOMIC(_logger, "Inserted AsioContextService"); +void Ichor::Boost::HttpHostService::addDependencyInstance(IBoostAsioQueue &q, IService&) { + _queue = &q; } -void Ichor::Boost::HttpHostService::removeDependencyInstance(IAsioContextService&, IService&) { - ICHOR_LOG_TRACE_ATOMIC(_logger, "Removing AsioContextService"); - _asioContextService = nullptr; +void Ichor::Boost::HttpHostService::removeDependencyInstance(IBoostAsioQueue&, IService&) { + _queue = nullptr; } void Ichor::Boost::HttpHostService::setPriority(uint64_t priority) { - _priority.store(priority, std::memory_order_release); + _priority = priority; } uint64_t Ichor::Boost::HttpHostService::getPriority() { - return _priority.load(std::memory_order_acquire); + return _priority; } Ichor::HttpRouteRegistration Ichor::Boost::HttpHostService::addRoute(HttpMethod method, std::string_view route, std::function(HttpRequest&)> handler) { @@ -163,11 +157,9 @@ void Ichor::Boost::HttpHostService::removeRoute(HttpMethod method, RouteIdType i } void Ichor::Boost::HttpHostService::fail(beast::error_code ec, const char *what, bool stopSelf) { - _queue->pushPrioritisedEvent(getServiceId(), _priority.load(std::memory_order_acquire), [this, what, ec]() { - ICHOR_LOG_ERROR_ATOMIC(_logger, "Boost.BEAST fail: {}, {}", what, ec.message()); - }); + ICHOR_LOG_ERROR(_logger, "Boost.BEAST fail: {}, {}", what, ec.message()); if(stopSelf) { - _queue->pushPrioritisedEvent(getServiceId(), _priority.load(std::memory_order_acquire), getServiceId()); + _queue->pushPrioritisedEvent(getServiceId(), _priority, getServiceId()); } } @@ -175,7 +167,7 @@ void Ichor::Boost::HttpHostService::listen(tcp::endpoint endpoint, net::yield_co ScopeGuardAtomicCount guard{_finishedListenAndRead}; beast::error_code ec; - if(_useSsl.load(std::memory_order_acquire)) { + if(_useSsl) { _sslContext = std::make_unique(net::ssl::context::tlsv12); if(auto propIt = getProperties().find("SslPassword"); propIt != getProperties().end()) { @@ -198,7 +190,7 @@ void Ichor::Boost::HttpHostService::listen(tcp::endpoint endpoint, net::yield_co _sslContext->use_private_key(boost::asio::buffer(key.data(), key.size()), boost::asio::ssl::context::file_format::pem); } - _httpAcceptor = std::make_unique(*_asioContextService->getContext()); + _httpAcceptor = std::make_unique(_queue->getContext()); _httpAcceptor->open(endpoint.protocol(), ec); if(ec) { return fail(ec, "HttpHostService::listen open", true); @@ -219,9 +211,9 @@ void Ichor::Boost::HttpHostService::listen(tcp::endpoint endpoint, net::yield_co return fail(ec, "HttpHostService::listen listen", true); } - while(!_quit.load(std::memory_order_acquire) && !_asioContextService->fibersShouldStop()) + while(!_quit && !_queue->fibersShouldStop()) { - auto socket = tcp::socket(*_asioContextService->getContext()); + auto socket = tcp::socket(_queue->getContext()); // tcp accept new connections _httpAcceptor->async_accept(socket, yield[ec]); @@ -231,10 +223,10 @@ void Ichor::Boost::HttpHostService::listen(tcp::endpoint endpoint, net::yield_co continue; } - socket.set_option(tcp::no_delay(_tcpNoDelay.load(std::memory_order_acquire))); + socket.set_option(tcp::no_delay(_tcpNoDelay)); - net::spawn(*_asioContextService->getContext(), [this, socket = std::move(socket)](net::yield_context _yield) mutable { - if(_quit.load(std::memory_order_acquire)) { + net::spawn(_queue->getContext(), [this, socket = std::move(socket)](net::yield_context _yield) mutable { + if(_quit) { return; } @@ -246,8 +238,9 @@ void Ichor::Boost::HttpHostService::listen(tcp::endpoint endpoint, net::yield_co }ASIO_SPAWN_COMPLETION_TOKEN); } - ICHOR_LOG_WARN_ATOMIC(_logger, "finished listen() {} {}", _quit.load(std::memory_order_acquire), _asioContextService->fibersShouldStop()); - if(!_goingToCleanupStream.exchange(true, std::memory_order_acq_rel) && _httpAcceptor) { + ICHOR_LOG_WARN(_logger, "finished listen() {} {}", _quit, _queue->fibersShouldStop()); + if(!_goingToCleanupStream && _httpAcceptor) { + _goingToCleanupStream = true; _queue->pushPrioritisedEvent(getServiceId(), getPriority(), getServiceId()); } } @@ -258,15 +251,11 @@ void Ichor::Boost::HttpHostService::read(tcp::socket socket, net::yield_context beast::error_code ec; auto addr = socket.remote_endpoint().address().to_string(); std::shared_ptr> connection; - uint64_t streamId; - { - std::lock_guard const lg(_streamsMutex); - streamId = _streamIdCounter++; - if constexpr (std::is_same_v) { - connection = _httpStreams.emplace(streamId, std::make_shared>(std::move(socket))).first->second; - } else { - connection = _sslStreams.emplace(streamId, std::make_shared>(std::move(socket), *_sslContext)).first->second; - } + uint64_t streamId = _streamIdCounter++; + if constexpr (std::is_same_v) { + connection = _httpStreams.emplace(streamId, std::make_shared>(std::move(socket))).first->second; + } else { + connection = _sslStreams.emplace(streamId, std::make_shared>(std::move(socket), *_sslContext)).first->second; } if constexpr (!std::is_same_v) { @@ -281,7 +270,7 @@ void Ichor::Boost::HttpHostService::read(tcp::socket socket, net::yield_context // This buffer is required to persist across reads beast::basic_flat_buffer buffer{ std::allocator{} }; - while (!_quit.load(std::memory_order_acquire) && !_asioContextService->fibersShouldStop()) + while (!_quit && !_queue->fibersShouldStop()) { // Set the timeout. if constexpr (std::is_same_v) { @@ -315,7 +304,7 @@ void Ichor::Boost::HttpHostService::read(tcp::socket socket, net::yield_context } auto target = std::string{req.target()}; - ICHOR_LOG_TRACE_ATOMIC(_logger, "New request for {} {}", (int)req.method(), target); + ICHOR_LOG_TRACE(_logger, "New request for {} {}", (int)req.method(), target); unordered_map headers{}; headers.reserve(static_cast(std::distance(std::begin(req), std::end(req)))); @@ -337,7 +326,7 @@ void Ichor::Boost::HttpHostService::read(tcp::socket socket, net::yield_context #endif auto routes = _handlers.find(httpReq.method); - if (_quit.load(std::memory_order_acquire) || _asioContextService->fibersShouldStop()) { + if (_quit || _queue->fibersShouldStop()) { co_return{}; } @@ -366,7 +355,7 @@ void Ichor::Boost::HttpHostService::read(tcp::socket socket, net::yield_context res.set(header.first, header.second); } res.keep_alive(keep_alive); - ICHOR_LOG_TRACE_ATOMIC(_logger, "sending http response {} - {}", (int)httpRes.status, + ICHOR_LOG_TRACE(_logger, "sending http response {} - {}", (int)httpRes.status, std::string_view(reinterpret_cast(httpRes.body.data()), httpRes.body.size())); res.body() = std::move(httpRes.body); @@ -390,33 +379,29 @@ void Ichor::Boost::HttpHostService::read(tcp::socket socket, net::yield_context }); } - { - std::lock_guard lg(_streamsMutex); - if constexpr (std::is_same_v) { - _httpStreams.erase(streamId); - } else { - _sslStreams.erase(streamId); - } + if constexpr (std::is_same_v) { + _httpStreams.erase(streamId); + } else { + _sslStreams.erase(streamId); } // At this point the connection is closed gracefully - ICHOR_LOG_WARN_ATOMIC(_logger, "finished read() {} {}", _quit.load(std::memory_order_acquire), _asioContextService->fibersShouldStop()); + ICHOR_LOG_WARN(_logger, "finished read() {} {}", _quit, _queue->fibersShouldStop()); } template void Ichor::Boost::HttpHostService::sendInternal(std::shared_ptr> &connection, http::response, http::basic_fields>> &&res) { static_assert(std::is_move_assignable_v, "HostOutboxMessage should be move assignable"); - if(_quit.load(std::memory_order_acquire) || _asioContextService->fibersShouldStop()) { + if(_quit || _queue->fibersShouldStop()) { return; } - net::spawn(*_asioContextService->getContext(), [this, res = std::move(res), connection = std::move(connection)](net::yield_context yield) mutable { - if(_quit.load(std::memory_order_acquire)) { + net::spawn(_queue->getContext(), [this, res = std::move(res), connection = std::move(connection)](net::yield_context yield) mutable { + if(_quit) { return; } - std::unique_lock lg(connection->mutex); if(connection->outbox.full()) { connection->outbox.set_capacity(std::max(connection->outbox.capacity() * 2, 10ul)); } @@ -426,10 +411,9 @@ void Ichor::Boost::HttpHostService::sendInternal(std::shared_ptroutbox.empty()) { + while(!_quit && !connection->outbox.empty()) { // Move message, should be trivially copyable and prevents iterator invalidation auto next = std::move(connection->outbox.front()); - lg.unlock(); if constexpr (std::is_same_v) { connection->socket.expires_after(30s); } else { @@ -446,7 +430,6 @@ void Ichor::Boost::HttpHostService::sendInternal(std::shared_ptroutbox.pop_front(); } }ASIO_SPAWN_COMPLETION_TOKEN); diff --git a/src/services/network/boost/WsConnectionService.cpp b/src/services/network/boost/WsConnectionService.cpp index 9a07f845..0c945087 100644 --- a/src/services/network/boost/WsConnectionService.cpp +++ b/src/services/network/boost/WsConnectionService.cpp @@ -2,7 +2,6 @@ #include #include #include -#include #include #include #include @@ -27,7 +26,7 @@ void setup_stream(std::shared_ptr>& ws) template requires Ichor::DerivedAny Ichor::Boost::WsConnectionService::WsConnectionService(DependencyRegister ®, Properties props) : AdvancedService>(std::move(props)) { reg.registerDependency(this, DependencyFlags::REQUIRED); - reg.registerDependency(this, DependencyFlags::REQUIRED); + reg.registerDependency(this, DependencyFlags::REQUIRED); if(auto propIt = AdvancedService>::getProperties().find("WsHostServiceId"); propIt != AdvancedService>::getProperties().end()) { reg.registerDependency(this, DependencyFlags::REQUIRED, Properties{{"Filter", Ichor::make_any(ServiceIdFilterEntry{Ichor::any_cast(propIt->second)})}}); @@ -36,19 +35,17 @@ Ichor::Boost::WsConnectionService::WsConnectionService(DependencyReg template requires Ichor::DerivedAny Ichor::Task> Ichor::Boost::WsConnectionService::start() { - if(_connected.load(std::memory_order_acquire)) { + if(_connected) { co_return {}; } - _queue = &GetThreadLocalEventQueue(); - - _quit.store(false, std::memory_order_release); + _quit = false; if(auto propIt = AdvancedService>::getProperties().find("Priority"); propIt != AdvancedService>::getProperties().end()) { _priority = Ichor::any_cast(propIt->second); } - _strand = std::make_unique>(_asioContextService->getContext()->get_executor()); + _strand = std::make_unique>(_queue->getContext().get_executor()); if (AdvancedService>::getProperties().contains("Socket")) { net::spawn(*_strand, [this](net::yield_context yield) { ScopeGuardAtomicCount const guard{_finishedListenAndRead}; @@ -64,52 +61,44 @@ Ichor::Task> Ichor::Boost::WsConnectionSer co_await _startStopEvent; _startStopEvent.reset(); - if(!_connected.load(std::memory_order_acquire)) { + if(!_connected) { auto const& address = Ichor::any_cast(AdvancedService>::getProperties()["Address"]); auto const port = Ichor::any_cast(AdvancedService>::getProperties()["Port"]); ICHOR_LOG_ERROR(_logger, "Could not connect to {}:{}", address, port); co_return tl::unexpected(StartError::FAILED); } -// fmt::print("----------------------------------------------- {}:{} start done\n", AdvancedService>::getServiceId(), getServiceName()); co_return {}; } template requires Ichor::DerivedAny Ichor::Task Ichor::Boost::WsConnectionService::stop() { -// INTERNAL_DEBUG("----------------------------------------------- trying to stop WsConnectionService {}", AdvancedService>::getServiceId()); -// fmt::print("----------------------------------------------- {}:{} stop\n", AdvancedService>::getServiceId(), getServiceName()); - _quit.store(true, std::memory_order_release); + INTERNAL_DEBUG("----------------------------------------------- trying to stop WsConnectionService {}", AdvancedService>::getServiceId()); + _quit = true; if(_ws != nullptr) { net::spawn(*_strand, [this](net::yield_context yield) { ScopeGuardAtomicCount const guard{_finishedListenAndRead}; boost::system::error_code ec; -// fmt::print("----------------------------------------------- {}:{} async_close\n", AdvancedService>::getServiceId(), getServiceName()); _ws->async_close(beast::websocket::close_code::normal, yield[ec]); -// fmt::print("----------------------------------------------- {}:{} async_close done\n", AdvancedService>::getServiceId(), getServiceName()); if (ec) { - _queue->pushEvent(AdvancedService>::getServiceId(), [this, ec]() { - ICHOR_LOG_ERROR(_logger, "Boost.BEAST fail: {}", ec.message()); - }); + ICHOR_LOG_ERROR(_logger, "Boost.BEAST fail: {}", ec.message()); } - _queue->pushPrioritisedEvent(AdvancedService>::getServiceId(), _priority.load(std::memory_order_acquire), [this]() { -// fmt::print("{}:{} rfe2\n", AdvancedService>::getServiceId(), getServiceName()); - _startStopEvent.set(); - }); + _startStopEvent.set(); }ASIO_SPAWN_COMPLETION_TOKEN); -// fmt::print("----------------------------------------------- {}:{} wait {}\n", AdvancedService>::getServiceId(), getServiceName(), _startStopEvent.is_set()); co_await _startStopEvent; -// fmt::print("----------------------------------------------- {}:{} wait done\n", AdvancedService>::getServiceId(), getServiceName()); while(_finishedListenAndRead.load(std::memory_order_acquire) != 0) { - std::this_thread::sleep_for(1ms); + _startStopEvent.reset(); + _queue->pushEvent(AdvancedService>::getServiceId(), [this]() { + _startStopEvent.set(); + }); + co_await _startStopEvent; } _ws = nullptr; _strand = nullptr; } -// fmt::print("----------------------------------------------- {}:{} stop done\n", AdvancedService>::getServiceId(), getServiceName()); co_return; } @@ -134,34 +123,32 @@ void Ichor::Boost::WsConnectionService::removeDependencyInstance(IHo } template requires Ichor::DerivedAny -void Ichor::Boost::WsConnectionService::addDependencyInstance(IAsioContextService &AsioContextService, IService&) { - _asioContextService = &AsioContextService; +void Ichor::Boost::WsConnectionService::addDependencyInstance(IBoostAsioQueue &q, IService&) { + _queue = &q; } template requires Ichor::DerivedAny -void Ichor::Boost::WsConnectionService::removeDependencyInstance(IAsioContextService&, IService&) { -// fmt::print("{}:{} {} set nullptr\n", AdvancedService>::getServiceId(), getServiceName(), getServiceState()); +void Ichor::Boost::WsConnectionService::removeDependencyInstance(IBoostAsioQueue&, IService&) { if(AdvancedService>::getServiceState() != ServiceState::INSTALLED) { std::terminate(); } - _asioContextService = nullptr; + _queue = nullptr; } static_assert(std::is_move_assignable_v, "ConnectionOutboxMessage should be move assignable"); template requires Ichor::DerivedAny Ichor::Task> Ichor::Boost::WsConnectionService::sendAsync(std::vector &&msg) { - if(_quit.load(std::memory_order_acquire) || !_connected.load(std::memory_order_acquire) || _asioContextService->fibersShouldStop()) { + if(_quit || !_connected || _queue->fibersShouldStop()) { co_return tl::unexpected(IOError::SERVICE_QUITTING); } AsyncManualResetEvent evt; bool success{}; net::spawn(*_strand, [this, &evt, &success, msg = std::move(msg)](net::yield_context yield) mutable { ScopeGuardAtomicCount const guard{_finishedListenAndRead}; - if(_quit.load(std::memory_order_acquire) || !_connected.load(std::memory_order_acquire) || _asioContextService->fibersShouldStop()) { + if(_quit || !_connected || _queue->fibersShouldStop()) { return; } - std::unique_lock lg{_outboxMutex}; if(_outbox.full()) { _outbox.set_capacity(std::max(_outbox.capacity() * 2, 10ul)); } @@ -173,20 +160,16 @@ Ichor::Task> Ichor::Boost::WsConnectionServic auto ws = _ws; while(!_outbox.empty()) { auto next = std::move(_outbox.front()); - lg.unlock(); - ScopeGuard const coroutineGuard{[this, event = next.evt, &lg]() { + ScopeGuard const coroutineGuard{[this, event = next.evt]() { // use service id 0 to ensure event gets run, even if service is stopped. Otherwise, the coroutine will never complete. // Similarly, use priority 0 to ensure these events run before any dependency changes, otherwise the service might be destroyed // before we can finish all the coroutines. - _queue->pushPrioritisedEvent(0u, 0u, [event]() { - event->set(); - }); - lg.lock(); + event->set(); _outbox.pop_front(); }}; - if(_quit.load(std::memory_order_acquire)) { + if(_quit) { continue; } @@ -195,11 +178,10 @@ Ichor::Task> Ichor::Boost::WsConnectionServic *(next.success) = !ec; if(ec) { - _queue->pushEvent(AdvancedService>::getServiceId(), [this, ec]() { - ICHOR_LOG_ERROR(_logger, "couldn't send msg for service {}: {}", AdvancedService>::getServiceId(), ec.message()); - }); + ICHOR_LOG_ERROR(_logger, "couldn't send msg for service {}: {}", AdvancedService>::getServiceId(), ec.message()); } } + }ASIO_SPAWN_COMPLETION_TOKEN); co_await evt; @@ -214,18 +196,17 @@ Ichor::Task> Ichor::Boost::WsConnectionServic template requires Ichor::DerivedAny Ichor::Task> Ichor::Boost::WsConnectionService::sendAsync(std::vector>&& msgs) { - if(_quit.load(std::memory_order_acquire) || !_connected.load(std::memory_order_acquire) || _asioContextService->fibersShouldStop()) { + if(_quit || !_connected || _queue->fibersShouldStop()) { co_return tl::unexpected(IOError::SERVICE_QUITTING); } AsyncManualResetEvent evt; bool success{}; net::spawn(*_strand, [this, &evt, &success, msgs = std::move(msgs)](net::yield_context yield) mutable { ScopeGuardAtomicCount const guard{_finishedListenAndRead}; - if(_quit.load(std::memory_order_acquire) || !_connected.load(std::memory_order_acquire) || _asioContextService->fibersShouldStop()) { + if(_quit || !_connected || _queue->fibersShouldStop()) { return; } - std::unique_lock lg{_outboxMutex}; for(auto &msg : msgs) { if (_outbox.full()) { _outbox.set_capacity(std::max(_outbox.capacity() * 2, 10ul)); @@ -239,20 +220,16 @@ Ichor::Task> Ichor::Boost::WsConnectionServic auto ws = _ws; while(!_outbox.empty()) { auto next = std::move(_outbox.front()); - lg.unlock(); - ScopeGuard const coroutineGuard{[this, event = next.evt, &lg]() { + ScopeGuard const coroutineGuard{[this, event = next.evt]() { // use service id 0 to ensure event gets run, even if service is stopped. Otherwise, the coroutine will never complete. // Similarly, use priority 0 to ensure these events run before any dependency changes, otherwise the service might be destroyed // before we can finish all the coroutines. - _queue->pushPrioritisedEvent(0u, 0u, [event]() { - event->set(); - }); - lg.lock(); + event->set(); _outbox.pop_front(); }}; - if(_quit.load(std::memory_order_acquire)) { + if(_quit) { continue; } @@ -261,9 +238,7 @@ Ichor::Task> Ichor::Boost::WsConnectionServic *(next.success) = !ec; if(ec) { - _queue->pushEvent(AdvancedService>::getServiceId(), [this, ec]() { - ICHOR_LOG_ERROR(_logger, "couldn't send msg for service {}: {}", AdvancedService>::getServiceId(), ec.message()); - }); + ICHOR_LOG_ERROR(_logger, "couldn't send msg for service {}: {}", AdvancedService>::getServiceId(), ec.message()); } } }ASIO_SPAWN_COMPLETION_TOKEN); @@ -279,12 +254,12 @@ Ichor::Task> Ichor::Boost::WsConnectionServic template requires Ichor::DerivedAny void Ichor::Boost::WsConnectionService::setPriority(uint64_t priority) { - _priority.store(priority, std::memory_order_release); + _priority = priority; } template requires Ichor::DerivedAny uint64_t Ichor::Boost::WsConnectionService::getPriority() { - return _priority.load(std::memory_order_acquire); + return _priority; } template requires Ichor::DerivedAny @@ -304,14 +279,9 @@ void Ichor::Boost::WsConnectionService::setReceiveHandler(std::funct template requires Ichor::DerivedAny void Ichor::Boost::WsConnectionService::fail(beast::error_code ec, const char *what) { -// fmt::print("{}:{} fail {}\n", AdvancedService>::getServiceId(), getServiceName(), ec.message()); - - _queue->pushPrioritisedEvent(AdvancedService>::getServiceId(), _priority.load(std::memory_order_acquire), [this, ec, what]() { - ICHOR_LOG_ERROR(_logger, "Boost.BEAST fail: {}, {}", what, ec.message()); -// fmt::print("{}:{} rfe\n", AdvancedService>::getServiceId(), getServiceName()); - _startStopEvent.set(); - }); _queue->pushEvent(AdvancedService>::getServiceId(), AdvancedService>::getServiceId()); + ICHOR_LOG_ERROR(_logger, "Boost.BEAST fail: {}, {}", what, ec.message()); + _startStopEvent.set(); } template requires Ichor::DerivedAny @@ -320,10 +290,7 @@ void Ichor::Boost::WsConnectionService::accept(net::yield_context yi { ScopeGuard const coroutineGuard{[this]() { - _queue->pushPrioritisedEvent(AdvancedService>::getServiceId(), _priority.load(std::memory_order_acquire), [this]() { -// fmt::print("{}:{} rfe accept\n", AdvancedService>::getServiceId(), getServiceName()); - _startStopEvent.set(); - }); + _startStopEvent.set(); }}; if (!_ws) { @@ -353,12 +320,12 @@ void Ichor::Boost::WsConnectionService::accept(net::yield_context yi // If it fails (due to connecting earlier than the host is available), wait 250 ms and make another attempt // After 5 attempts, fail. int attempts{}; - while (!_quit.load(std::memory_order_acquire) && !_asioContextService->fibersShouldStop() && attempts < 5) { + while (!_quit && !_queue->fibersShouldStop() && attempts < 5) { // initiate websocket handshake _ws->async_accept(yield[ec]); if (ec) { attempts++; - net::steady_timer t{*_asioContextService->getContext()}; + net::steady_timer t{_queue->getContext()}; t.expires_after(250ms); t.async_wait(yield); } else { @@ -370,7 +337,7 @@ void Ichor::Boost::WsConnectionService::accept(net::yield_context yi return fail(ec, "accept"); } - _connected.store(true, std::memory_order_release); + _connected = true; } read(yield); @@ -383,15 +350,12 @@ void Ichor::Boost::WsConnectionService::connect(net::yield_context y auto const port = Ichor::any_cast(AdvancedService>::getProperties()["Port"]); // These objects perform our I/O - tcp::resolver resolver(*_asioContextService->getContext()); - _ws = std::make_shared>(*_asioContextService->getContext()); + tcp::resolver resolver(_queue->getContext()); + _ws = std::make_shared>(_queue->getContext()); { ScopeGuard const coroutineGuard{[this]() { - _queue->pushPrioritisedEvent(AdvancedService>::getServiceId(), _priority.load(std::memory_order_acquire), [this]() { -// fmt::print("{}:{} rfe connect\n", AdvancedService>::getServiceId(), getServiceName()); - _startStopEvent.set(); - }); + _startStopEvent.set(); }}; // Look up the domain name @@ -407,11 +371,11 @@ void Ichor::Boost::WsConnectionService::connect(net::yield_context y // If it fails (due to connecting earlier than the host is available), wait 250 ms and make another attempt // After 5 attempts, fail. int attempts{}; - while (!_quit.load(std::memory_order_acquire) && !_asioContextService->fibersShouldStop() && attempts < 5) { + while (!_quit && !_queue->fibersShouldStop() && attempts < 5) { beast::get_lowest_layer(*_ws).async_connect(results, yield[ec]); if (ec) { attempts++; - net::steady_timer t{*_asioContextService->getContext()}; + net::steady_timer t{_queue->getContext()}; t.expires_after(std::chrono::milliseconds(250)); t.async_wait(yield); } else { @@ -424,7 +388,7 @@ void Ichor::Boost::WsConnectionService::connect(net::yield_context y return fail(ec, "connect"); } - _connected.store(true, std::memory_order_release); + _connected = true; // Turn off the timeout on the tcp_stream, because // the websocket stream has its own timeout system. @@ -443,10 +407,10 @@ void Ichor::Boost::WsConnectionService::connect(net::yield_context y " ichor"); })); - // Perform the websocket handshake + // Perform the websocket handshake7 _ws->async_handshake(address, "/", yield[ec]); if (ec) { - _connected.store(false, std::memory_order_release); + _connected = false; return fail(ec, "handshake"); } } @@ -458,14 +422,12 @@ template requires Ichor::DerivedAny::read(net::yield_context &yield) { beast::error_code ec; - while(!_quit.load(std::memory_order_acquire) && !_asioContextService->fibersShouldStop()) { -// fmt::print("{}:{} read\n", AdvancedService>::getServiceId(), getServiceName()); + while(!_quit && !_queue->fibersShouldStop()) { beast::basic_flat_buffer buffer{std::allocator{}}; _ws->async_read(buffer, yield[ec]); -// fmt::print("{}:{} read post async_read\n", AdvancedService>::getServiceId(), getServiceName()); - if(_quit.load(std::memory_order_acquire) || _asioContextService->fibersShouldStop()) { + if(_quit || _queue->fibersShouldStop()) { break; } @@ -473,25 +435,22 @@ void Ichor::Boost::WsConnectionService::read(net::yield_context &yie break; } if(ec) { - _connected.store(false, std::memory_order_release); + _connected = false; return fail(ec, "read"); } if(_ws->got_text()) { auto data = buffer.data(); - _queue->pushPrioritisedEvent(AdvancedService>::getServiceId(), _priority.load(std::memory_order_acquire), [this, data = std::vector{static_cast(data.data()), static_cast(data.data()) + data.size()}]() mutable { if(_recvHandler) { - _recvHandler(data); + _recvHandler(std::span(static_cast(data.data()), data.size())); } else { - _queuedMessages.emplace_back(std::move(data)); + _queuedMessages.emplace_back(static_cast(data.data()), static_cast(data.data()) + data.size()); } - }); } } - _connected.store(false, std::memory_order_release); + _connected = false; INTERNAL_DEBUG("read stopped WsConnectionService {}", AdvancedService>::getServiceId()); -// fmt::print("{}:{} read done\n", AdvancedService>::getServiceId(), getServiceName()); } template class Ichor::Boost::WsConnectionService; diff --git a/src/services/network/boost/WsHostService.cpp b/src/services/network/boost/WsHostService.cpp index 191e25b2..54511b17 100644 --- a/src/services/network/boost/WsHostService.cpp +++ b/src/services/network/boost/WsHostService.cpp @@ -8,7 +8,7 @@ Ichor::Boost::WsHostService::WsHostService(DependencyRegister ®, Properties props) : AdvancedService(std::move(props)) { reg.registerDependency(this, DependencyFlags::REQUIRED); - reg.registerDependency(this, DependencyFlags::REQUIRED); + reg.registerDependency(this, DependencyFlags::REQUIRED); } Ichor::Task> Ichor::Boost::WsHostService::start() { @@ -28,16 +28,14 @@ Ichor::Task> Ichor::Boost::WsHostService:: _priority = Ichor::any_cast(propIt->second); } if(auto propIt = getProperties().find("NoDelay"); propIt != getProperties().end()) { - _tcpNoDelay.store(Ichor::any_cast(propIt->second), std::memory_order_release); + _tcpNoDelay = Ichor::any_cast(propIt->second); } - _queue = &GetThreadLocalEventQueue(); - _eventRegistration = GetThreadLocalManager().registerEventHandler(this, this, getServiceId()); auto address = net::ip::make_address(Ichor::any_cast(addrIt->second)); auto port = Ichor::any_cast(portIt->second); - _strand = std::make_unique>(_asioContextService->getContext()->get_executor()); + _strand = std::make_unique>(_queue->getContext().get_executor()); net::spawn(*_strand, [this, address = std::move(address), port](net::yield_context yield) { ScopeGuardAtomicCount const guard{_finishedListenAndRead}; @@ -63,19 +61,17 @@ Ichor::Task Ichor::Boost::WsHostService::stop() { _queue->pushEvent(getServiceId(), conn, true); } - INTERNAL_DEBUG("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! acceptor {}", getServiceId()); - net::spawn(*_strand, [this](net::yield_context yield) { - ScopeGuardAtomicCount const guard{_finishedListenAndRead}; -// fmt::print("----------------------------------------------- {}:{} ws acceptor close\n", getServiceId(), getServiceName()); - _wsAcceptor->close(); -// fmt::print("----------------------------------------------- {}:{} ws acceptor done\n", getServiceId(), getServiceName()); - }ASIO_SPAWN_COMPLETION_TOKEN); + _wsAcceptor->close(); INTERNAL_DEBUG("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! pre-await {} {}", getServiceId(), _startStopEvent.is_set()); co_await _startStopEvent; while(_finishedListenAndRead.load(std::memory_order_acquire) != 0) { - std::this_thread::sleep_for(1ms); + _startStopEvent.reset(); + _queue->pushEvent(getServiceId(), [this]() { + _startStopEvent.set(); + }); + co_await _startStopEvent; } INTERNAL_DEBUG("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! stopped WsHostService {}", getServiceId()); @@ -93,16 +89,16 @@ void Ichor::Boost::WsHostService::removeDependencyInstance(ILogger &logger, ISer _logger = nullptr; } -void Ichor::Boost::WsHostService::addDependencyInstance(IAsioContextService &AsioContextService, IService&) { - _asioContextService = &AsioContextService; +void Ichor::Boost::WsHostService::addDependencyInstance(IBoostAsioQueue &q, IService&) { + _queue = &q; } -void Ichor::Boost::WsHostService::removeDependencyInstance(IAsioContextService&, IService&) { - _asioContextService = nullptr; +void Ichor::Boost::WsHostService::removeDependencyInstance(IBoostAsioQueue&, IService&) { + _queue = nullptr; } Ichor::AsyncGenerator Ichor::Boost::WsHostService::handleEvent(Ichor::NewWsConnectionEvent const &evt) { - if(_quit.load(std::memory_order_acquire)) { + if(_quit) { co_return {}; } @@ -125,19 +121,16 @@ uint64_t Ichor::Boost::WsHostService::getPriority() { void Ichor::Boost::WsHostService::fail(beast::error_code ec, const char *what) { ICHOR_LOG_ERROR(_logger, "Boost.BEAST fail: {}, {}", what, ec.message()); - INTERNAL_DEBUG("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! push {}", getServiceId()); - _queue->pushPrioritisedEvent(getServiceId(), INTERNAL_EVENT_PRIORITY, [this]() { - INTERNAL_DEBUG("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! _startStopEvent set {}", getServiceId()); - _startStopEvent.set(); - }); _queue->pushPrioritisedEvent(getServiceId(), _priority, getServiceId()); + INTERNAL_DEBUG("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! _startStopEvent set {}", getServiceId()); + _startStopEvent.set(); } void Ichor::Boost::WsHostService::listen(tcp::endpoint endpoint, net::yield_context yield) { beast::error_code ec; - _wsAcceptor = std::make_unique(*_asioContextService->getContext()); + _wsAcceptor = std::make_unique(_queue->getContext()); _wsAcceptor->open(endpoint.protocol(), ec); if(ec) { return fail(ec, "open"); @@ -158,16 +151,15 @@ void Ichor::Boost::WsHostService::listen(tcp::endpoint endpoint, net::yield_cont return fail(ec, "listen"); } - while(!_quit && !_asioContextService->fibersShouldStop()) - { - tcp::socket socket(*_asioContextService->getContext()); + while(!_quit && !_queue->fibersShouldStop()) { + tcp::socket socket(_queue->getContext()); // tcp accept new connections _wsAcceptor->async_accept(socket, yield[ec]); if(ec) { return fail(ec, "accept"); } - if(_quit.load(std::memory_order_acquire)) { + if(_quit) { break; } @@ -177,8 +169,5 @@ void Ichor::Boost::WsHostService::listen(tcp::endpoint endpoint, net::yield_cont } INTERNAL_DEBUG("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! push2 {}", getServiceId()); - _queue->pushPrioritisedEvent(getServiceId(), 1000, [this]() { - INTERNAL_DEBUG("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! _startStopEvent set2 {}", getServiceId()); - _startStopEvent.set(); - }); + _startStopEvent.set(); } diff --git a/test/EtcdTests.cpp b/test/EtcdTests.cpp index a7d90f68..7549f19a 100644 --- a/test/EtcdTests.cpp +++ b/test/EtcdTests.cpp @@ -1,12 +1,11 @@ #ifdef ICHOR_USE_ETCD -#include +#include #include #include #include #include #include -#include #include #include "TestServices/Etcdv2UsingService.h" #include "TestServices/Etcdv3UsingService.h" @@ -24,7 +23,7 @@ using namespace Ichor; TEST_CASE("EtcdTests") { SECTION("v2") { - auto queue = std::make_unique(); + auto queue = std::make_unique(); auto &dm = queue->createManager(); std::thread t([&]() { @@ -33,7 +32,6 @@ TEST_CASE("EtcdTests") { dm.createServiceManager(); #endif dm.createServiceManager, ILoggerFactory>(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}); - dm.createServiceManager(); dm.createServiceManager, IClientFactory>(); dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1")}, {"Port", Ichor::make_any(static_cast(2379))}, {"TimeoutMs", Ichor::make_any(1'000ul)}, {"Debug", Ichor::make_any(true)}}); dm.createServiceManager(Properties{{"LogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}); @@ -46,7 +44,7 @@ TEST_CASE("EtcdTests") { } SECTION("v3") { - auto queue = std::make_unique(); + auto queue = std::make_unique(); auto &dm = queue->createManager(); std::thread t([&]() { @@ -55,7 +53,6 @@ TEST_CASE("EtcdTests") { dm.createServiceManager(); #endif dm.createServiceManager, ILoggerFactory>(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}); - dm.createServiceManager(); dm.createServiceManager, IClientFactory>(); dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1")}, {"Port", Ichor::make_any(static_cast(2379))}, {"TimeoutMs", Ichor::make_any(1'000ul)}, {"Debug", Ichor::make_any(true)}}); dm.createServiceManager(Properties{{"LogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}); diff --git a/test/HttpTests.cpp b/test/HttpTests.cpp index f3035e00..7826ab33 100644 --- a/test/HttpTests.cpp +++ b/test/HttpTests.cpp @@ -1,4 +1,3 @@ -#include #include #include #include @@ -14,11 +13,10 @@ #ifdef TEST_BOOST #include #include -#include -#include +#include -#define QIMPL PriorityQueue +#define QIMPL BoostAsioQueue #define HTTPHOSTIMPL Boost::HttpHostService #define HTTPCONNIMPL Boost::HttpConnectionService #elif defined(TEST_URING) @@ -77,7 +75,7 @@ TEST_CASE("HttpTests_boost") { #if defined(TEST_URING) auto queue = std::make_unique(500, 100'000'000, emulateKernelVersion); #else - auto queue = std::make_unique(500, true); + auto queue = std::make_unique(500); #endif auto &dm = queue->createManager(); evtGate = false; @@ -92,11 +90,6 @@ TEST_CASE("HttpTests_boost") { dm.createServiceManager, ILoggerFactory>(Properties{{"DefaultLogLevel", Ichor::make_any(LogLevel::LOG_TRACE)}}); dm.createServiceManager>(); dm.createServiceManager>(); -#ifdef TEST_BOOST - dm.createServiceManager(); -#else - -#endif dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1")}, {"Port", Ichor::make_any(static_cast(8001))}}); dm.createServiceManager, IClientFactory>(); #ifdef TEST_URING @@ -127,7 +120,7 @@ TEST_CASE("HttpTests_boost") { SECTION("Https events on same thread") { testThreadId = std::this_thread::get_id(); _evt = std::make_unique(); - auto queue = std::make_unique(true); + auto queue = std::make_unique(true); auto &dm = queue->createManager(); evtGate = false; @@ -224,7 +217,6 @@ TEST_CASE("HttpTests_boost") { dm.createServiceManager, ILoggerFactory>(); dm.createServiceManager>(); dm.createServiceManager>(); - dm.createServiceManager(); dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1")}, {"Port", Ichor::make_any(static_cast(8001))}, {"SslKey", Ichor::make_any(key)}, {"SslCert", Ichor::make_any(cert)}}); dm.createServiceManager, IClientFactory>(); dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1")}, {"Port", Ichor::make_any(static_cast(8001))}, {"ConnectOverSsl", Ichor::make_any(true)}, {"RootCA", Ichor::make_any(cert)}}); @@ -246,42 +238,5 @@ TEST_CASE("HttpTests_boost") { t.join(); } - - SECTION("Http events on 4 threads") { - testThreadId = std::this_thread::get_id(); - _evt = std::make_unique(); - auto queue = std::make_unique(true); - auto &dm = queue->createManager(); - evtGate = false; - - std::thread t([&]() { - dmThreadId = std::this_thread::get_id(); - - dm.createServiceManager({}, 10); - dm.createServiceManager, ILoggerFactory>(); - dm.createServiceManager>(); - dm.createServiceManager>(); - dm.createServiceManager(Properties{{"Threads", Ichor::make_any(4ul)}}); - dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1")}, {"Port", Ichor::make_any(static_cast(8001))}}); - dm.createServiceManager, IClientFactory>(); - dm.createServiceManager(Properties{{"Address", Ichor::make_any("127.0.0.1")}, {"Port", Ichor::make_any(static_cast(8001))}}); - - queue->start(CaptureSigInt); - }); - - while(!evtGate) { - std::this_thread::sleep_for(500us); - } - - queue->pushEvent(0, [&]() { - REQUIRE(Ichor::Detail::_local_dm == &dm); - REQUIRE(testThreadId != std::this_thread::get_id()); - REQUIRE(dmThreadId == std::this_thread::get_id()); - fmt::print("3 _evt set\n"); - _evt->set(); - }); - - t.join(); - } #endif } diff --git a/test/QueueTests.cpp b/test/QueueTests.cpp index 68b28049..5cd381b4 100644 --- a/test/QueueTests.cpp +++ b/test/QueueTests.cpp @@ -13,6 +13,9 @@ #ifdef __linux__ #include #endif +#ifdef ICHOR_USE_BOOST_BEAST +#include +#endif TEST_CASE("QueueTests") { @@ -125,6 +128,58 @@ TEST_CASE("QueueTests") { } #endif +#ifdef ICHOR_USE_BOOST_BEAST + SECTION("BoostAsioQueue") { + auto queue = std::make_unique(); + auto &dm = queue->createManager(); + + Detail::_local_dm = &dm; + + // REQUIRE_THROWS(queue->pushEventInternal(0, nullptr)); + // REQUIRE_THROWS(queue->empty()); + // REQUIRE_THROWS(queue->size()); + + REQUIRE(queue->empty()); + REQUIRE(queue->size() == 0); + REQUIRE(!queue->shouldQuit()); + + queue->quit(); + + REQUIRE(queue->shouldQuit()); + } + + SECTION("BoostAsioQueue Live") { + std::atomic _dm{}; + std::thread t([&] { + auto queue = std::make_unique(); + auto &dm = queue->createManager(); + _dm.store(&dm, std::memory_order_release); + + dm.createServiceManager(); + dm.createServiceManager(); + queue->start(DoNotCaptureSigInt); + }); + + while(_dm.load(std::memory_order_acquire) == nullptr) { + std::this_thread::sleep_for(1ms); + } + auto *dm = _dm.load(std::memory_order_acquire); + + dm->runForOrQueueEmpty(); + + REQUIRE(dm->isRunning()); + + try { + dm->getEventQueue().pushEvent(0); + } catch(const std::exception &e) { + fmt::print("exception: {}\n", e.what()); + throw; + } + + t.join(); + } +#endif + // running this inside docker for aarch64 may cause problems, but we still want to test compilation on those setups #if defined(ICHOR_USE_LIBURING) && !(defined(ICHOR_SKIP_EXTERNAL_TESTS) && defined(ICHOR_AARCH64)) SECTION("IOUringQueue") {