From 759845bc422389a3a7bb41f92f9fceeec2afd7bd Mon Sep 17 00:00:00 2001 From: Masaaki HAMADA Date: Thu, 15 Aug 2024 01:20:32 +0900 Subject: [PATCH] unit test: queue --- .github/workflows/ci.yml | 38 +- .github/workflows/sanitizer.yml | 63 + meson.build | 2 +- src/meson.build | 42 +- src/pt/pf_base.h | 162 +++ src/pt/pf_bounded_spsc_zero_copy.h | 494 ++++++++ src/pt/pf_bounded_spsc_zero_copy_test.cpp | 1267 +++++++++++++++++++++ src/pt/pf_common_headers.h | 60 + src/pt/pf_mpsc_ringbuffer.h | 232 ++++ src/pt/pf_mpsc_ringbuffer_test.cpp | 207 ++++ src/zaplog_test.cpp | 4 + 11 files changed, 2549 insertions(+), 22 deletions(-) create mode 100644 .github/workflows/sanitizer.yml create mode 100644 src/pt/pf_base.h create mode 100644 src/pt/pf_bounded_spsc_zero_copy.h create mode 100644 src/pt/pf_bounded_spsc_zero_copy_test.cpp create mode 100644 src/pt/pf_common_headers.h create mode 100644 src/pt/pf_mpsc_ringbuffer.h create mode 100644 src/pt/pf_mpsc_ringbuffer_test.cpp diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d15c1d8..c0aec79 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,25 +14,27 @@ jobs: fail-fast: false matrix: - os: [ubuntu-latest, windows-latest] + # as of 2024-08-15 + os: [ubuntu-24.04, windows-2022] + #os: [ubuntu-latest, windows-latest] build_type: [debugoptimized] c_compiler: [gcc, clang, cl] include: - - os: windows-latest + - os: windows-2022 c_compiler: cl cpp_compiler: cl - - os: ubuntu-latest + - os: ubuntu-24.04 c_compiler: gcc cpp_compiler: g++ - - os: ubuntu-latest + - os: ubuntu-24.04 c_compiler: clang cpp_compiler: clang++ exclude: - - os: windows-latest + - os: windows-2022 c_compiler: gcc - - os: windows-latest + - os: windows-2022 c_compiler: clang - - os: ubuntu-latest + - os: ubuntu-24.04 c_compiler: cl steps: @@ -42,7 +44,7 @@ jobs: python-version: '3.x' - name: Prepare msbuild - if: contains(matrix.os, 'windows-latest') + if: contains(matrix.os, 'windows') uses: microsoft/setup-msbuild@v2 with: msbuild-architecture: x64 @@ -50,6 +52,14 @@ jobs: - name: Install Python Dependencies run: pip install meson ninja + - name: Compiler version + if: ${{ !contains(matrix.os, 'windows') }} + run: | + dpkg -l | grep gcc + gcc --version + dpkg -l | grep clang + clang --version + - name: Set reusable strings id: strings shell: bash @@ -61,15 +71,19 @@ jobs: echo "package_test=${{ github.workspace }}/package_test" >> "$GITHUB_OUTPUT" - name: Prepare Build for Windows - if: contains(matrix.os, 'windows-latest') + if: contains(matrix.os, 'windows') run: > meson setup --buildtype=${{ matrix.build_type }} --prefix='${{ steps.strings.outputs.installdir_abs }}' + --backend=vs ${{ steps.strings.outputs.builddir }} ${{ github.workspace }} + env: + CC: ${{ matrix.c_compiler }} + CXX: ${{ matrix.cpp_compiler }} - name: Prepare Build - if: ${{ !contains(matrix.os, 'windows-latest') }} + if: ${{ !contains(matrix.os, 'windows') }} run: > meson setup --buildtype=${{ matrix.build_type }} @@ -89,14 +103,14 @@ jobs: # run: meson install -C ${{ steps.strings.outputs.builddir }} # # - name: Package Test for Windows -# if: contains(matrix.os, 'windows-latest') +# if: contains(matrix.os, 'windows') # run: | # meson setup --buildtype=${{ matrix.build_type }} '-Dpackage_install_dir=${{ steps.strings.outputs.installdir }}' ${{ steps.strings.outputs.builddir_pkg }} ${{ steps.strings.outputs.package_test }} # meson compile -v -C ${{ steps.strings.outputs.builddir_pkg }} # meson test -v -C ${{ steps.strings.outputs.builddir_pkg }} # # - name: Package Test -# if: ${{ !contains(matrix.os, 'windows-latest') }} +# if: ${{ !contains(matrix.os, 'windows') }} # run: | # meson setup --buildtype ${{ matrix.build_type }} '-Dpackage_install_dir=${{ steps.strings.outputs.installdir }}' ${{ steps.strings.outputs.builddir_pkg }} ${{ steps.strings.outputs.package_test }} --pkg-config-path '${{ steps.strings.outputs.installdir }}/lib/x86_64-linux-gnu/pkgconfig' # meson compile -v -C ${{ steps.strings.outputs.builddir_pkg }} diff --git a/.github/workflows/sanitizer.yml b/.github/workflows/sanitizer.yml new file mode 100644 index 0000000..7bf9cfb --- /dev/null +++ b/.github/workflows/sanitizer.yml @@ -0,0 +1,63 @@ +name: Sanitize + +on: + push: + branches: [ "xxx" ] +# branches: [ "br_*" ] +# pull_request: +# branches: [ "main" ] + +jobs: + sanitize: + runs-on: ${{ matrix.os }} + + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest] + build_type: [debugoptimized] + c_compiler: [gcc] + sanitize: [address, undefined, thread, leak] + include: + - c_compiler: gcc + cpp_compiler: g++ + + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: '3.x' + + - name: Install Python Dependencies + run: pip install meson ninja + + - name: Set reusable strings + id: strings + shell: bash + run: | + echo "builddir=${{ github.workspace }}/builddir" >> "$GITHUB_OUTPUT" + echo "result_file=sanitize_${{ matrix.sanitize }}.txt" >> "$GITHUB_OUTPUT" + + - name: Prepare Build + if: ${{ !contains(matrix.os, 'windows-latest') }} + run: > + meson setup + --buildtype=${{ matrix.build_type }} + -Db_sanitize=${{ matrix.sanitize }} + ${{ steps.strings.outputs.builddir }} ${{ github.workspace }} + env: + CC: ${{ matrix.c_compiler }} + CXX: ${{ matrix.cpp_compiler }} + + - name: Run Build + run: meson compile -v -C ${{ steps.strings.outputs.builddir }} + + - name: Run Test Suite + run: meson test -v -C ${{ steps.strings.outputs.builddir }} 2>&1 | tee ${{ steps.strings.outputs.result_file }} + + #- name: Upload results + # uses: actions/upload-artifact@v4 + # with: + # name: sanitize-results-${{ matrix.sanitize }} + # path: ${{ steps.strings.outputs.result_file }} + # overwrite: true \ No newline at end of file diff --git a/meson.build b/meson.build index 710c08f..4114b32 100644 --- a/meson.build +++ b/meson.build @@ -1,7 +1,7 @@ project('zaplog', 'cpp', version : '0.1', - default_options : ['warning_level=3', 'cpp_std=c++20']) + default_options : ['warning_level=2', 'cpp_std=c++20']) zaplog_build_tests = get_option('zaplog_build_tests') diff --git a/src/meson.build b/src/meson.build index bf7b46c..2c9fd5f 100644 --- a/src/meson.build +++ b/src/meson.build @@ -24,13 +24,18 @@ git_date_h = vcs_tag( replace_string : '@git_date@' ) -lib_args = ['-DZAPLOG_BUILDING', '-DZAPLOG_BUILDING_SLIB'] +if meson.get_compiler('cpp').get_id() == 'msvc' + extra_args = ['/utf-8'] +else + extra_args = [] +endif +lib_args = ['-DZAPLOG_BUILDING', '-DZAPLOG_BUILDING_SLIB'] + extra_args slib_sources = [ config_h, git_hash_h, git_date_h, - 'zaplog.cpp' + 'zaplog.cpp', ] slib = static_library( @@ -43,14 +48,33 @@ slib = static_library( gtest_dep = dependency('gtest', required : true) gtest_main_dep = dependency('gtest_main', required : true) -test_args = ['-DZAPLOG_BUILDING', '-DZAPLOG_BUILDING_SLIB'] -test_exe = executable( - 'zaplog_test', + +unit_test_args = ['-DZAPLOG_BUILDING', '-DZAPLOG_BUILDING_SLIB', '-DLIBPT_ENABLE_BUILD_TESTS'] + extra_args +unit_test_sources = [ + 'pt/pf_bounded_spsc_zero_copy_test.cpp', + 'pt/pf_mpsc_ringbuffer_test.cpp' +] +unit_test_exe = executable( + 'unit_test', + sources : unit_test_sources, + dependencies : [ gtest_dep, gtest_main_dep, ], + cpp_args : unit_test_args +) +test('unittest', unit_test_exe) + + +api_test_args = ['-DZAPLOG_BUILDING', '-DZAPLOG_BUILDING_SLIB' ] + extra_args +api_test_sources = [ 'zaplog_test.cpp', +] +api_test_exe = executable( + 'api_test', + sources : api_test_sources, dependencies : [ gtest_dep, gtest_main_dep, ], link_with : slib, - cpp_args : test_args) -test('zaplog', test_exe) + cpp_args : api_test_args) +test('apitest', api_test_exe) + # Make this library usable as a Meson subproject. zaplog_dep = declare_dependency( @@ -65,8 +89,8 @@ pkg_mod = import('pkgconfig') pkg_mod.generate( name : 'zaplog', filebase : 'zaplog', - description : 'Meson sample project.', + description : 'Zaplog: A C++ logging library.', subdirs : 'zaplog', libraries : slib, - version : '0.1', + version : meson.project_version(), ) diff --git a/src/pt/pf_base.h b/src/pt/pf_base.h new file mode 100644 index 0000000..c6ab26e --- /dev/null +++ b/src/pt/pf_base.h @@ -0,0 +1,162 @@ +/* Copyright (c) 2024 Masaaki Hamada + * Distributed under the MIT License (http://opensource.org/licenses/MIT) + */ + +#pragma once + +#include "pf_common_headers.h" + +#ifdef NDEBUG +#define PF_BUILD_RELEASE +#else +#define PF_BUILD_DEBUG +#endif + +#if defined(_WIN32) +#define PF_COMPILER_MSVC +#elif defined(__GNUC__) +#define PF_COMPILER_GCC +#endif + +#if defined(__GNUC__) +#define PF_ALWAYS_INLINE inline __attribute__((__always_inline__)) +#elif defined(_WIN32) +#define PF_ALWAYS_INLINE __forceinline +#else +#define PF_ALWAYS_INLINE inline +#endif + +#if defined(__GNUC__) +#define PT_UNREACHABLE() __builtin_unreachable() +#elif defined(_WIN32) +#define PT_UNREACHABLE() __assume(0) +#else +#define PT_UNREACHABLE() abort(); +// #define PT_UNREACHABLE() std::unreachable() +#endif + +#define PF_CASE_TO_TSTRING(p, x) \ + case x: \ + p = _T(#x); \ + break; +#define PF_DEFAULT_TO_TSTRING(p, x) \ + default: \ + p = x; \ + break; +#define PF_CASE_TO_STRING(p, x) \ + case x: \ + p = #x; \ + break; +#define PF_DEFAULT_TO_STRING(p, x) \ + default: \ + p = x; \ + break; + +#define PF_CONCAT_NAME(prefix, line) prefix##_pf_unique_var_##line +#define PF_MAKE_VARNAME(prefix, line) PF_CONCAT_NAME(prefix, line) +#define PF_VAR(prefix) PF_MAKE_VARNAME(prefix, __LINE__) + +// see DEFINE_ENUM_FLAG_OPERATORS in winnt.h" +#define PF_DEFINE_BIT_OPERATORS(Type) \ + inline Type operator&(Type a, Type b) \ + { \ + using IntType = std::underlying_type::type; \ + return static_cast(static_cast(a) & static_cast(b)); \ + } \ + inline Type operator|(Type a, Type b) \ + { \ + using IntType = std::underlying_type::type; \ + return static_cast(static_cast(a) | static_cast(b)); \ + } \ + inline Type& operator|=(Type& a, Type b) \ + { \ + a = a | b; \ + return a; \ + }; \ + inline Type& operator&=(Type& a, Type b) \ + { \ + a = a & b; \ + return a; \ + }; \ + inline Type operator~(Type a) \ + { \ + using IntType = std::underlying_type::type; \ + return static_cast(~static_cast(a)); \ + } \ + inline bool is_set(Type val, Type flag) \ + { \ + return (val & flag) != static_cast(0); \ + } \ + inline void flip_bit(Type& val, Type flag) \ + { \ + val = is_set(val, flag) ? (val & (~flag)) : (val | flag); \ + } + +namespace pf { +namespace fw_ { + +class NonCopyable +{ + protected: + constexpr NonCopyable(void) = default; + ~NonCopyable(void) = default; + constexpr NonCopyable(NonCopyable&&) = default; + constexpr NonCopyable& operator=(NonCopyable&&) = default; + + private: + // note that the following functions are private + NonCopyable(const NonCopyable&) = delete; + NonCopyable& operator=(const NonCopyable&) = delete; +}; + +/* + * go like defer for c++ + * + * usege: + * auto a = openSomeghing(); + * pf_make_defer { closeSomething(a); }; // semicolon needed + */ +template +class Defer +{ + private: + Functor m_func; + + public: + explicit Defer(Functor&& func) noexcept + : m_func{ std::forward(func) } + { + } + ~Defer(void) noexcept { m_func(); } + Defer(const Defer&) = delete; + Defer& operator=(Defer&& rhs) = delete; + Defer& operator=(const Defer&) = delete; +}; + +class DeferMakeHelper final +{ + public: + template + Defer operator+(Functor&& func) + { + return Defer{ std::forward(func) }; + } +}; + +} // namespace fw_ { + +#if defined(_WIN32) +#define SWITCH_TO_THREAD() SwitchToThread() +#else +#define WCHAR wchar_t +#define SWITCH_TO_THREAD() std::this_thread::yield() +#endif + +using NonCopyable = fw_::NonCopyable; +using DeferMakeHelper = fw_::DeferMakeHelper; +using PathString = std::variant, std::basic_string>; + +} // namespace pf + +#define pf_make_defer \ + auto PF_MAKE_VARNAME(defer_, __LINE__) = pf::DeferMakeHelper{} + [&](void) noexcept -> void diff --git a/src/pt/pf_bounded_spsc_zero_copy.h b/src/pt/pf_bounded_spsc_zero_copy.h new file mode 100644 index 0000000..4f5cd7d --- /dev/null +++ b/src/pt/pf_bounded_spsc_zero_copy.h @@ -0,0 +1,494 @@ +/* Copyright (c) 2024 Masaaki Hamada + * Distributed under the MIT License (http://opensource.org/licenses/MIT) + */ + +#pragma once + +#include "pf_base.h" +#include + +namespace pf { + +// ElementTypeの型の配列を格納するQueue +// 一度のpushごとにElementAlignmentにアライメントをそろえる +template +class BoundedSpscZeroCopy : private pf::NonCopyable +{ + protected: + static_assert(std::hardware_destructive_interference_size == 64); + alignas(std::hardware_destructive_interference_size) std::atomic m_write_ctx = 0; + struct + { + uint64_t write_ctx; + int32_t write_index2; + int32_t read_end_index2; + int32_t read_index; + int32_t avail; + } m_write_im{}; + struct + { + int32_t wait_count; + int32_t insufficient; + int32_t max_read_end_index; + } m_write_stats{}; + + alignas(std::hardware_destructive_interference_size) std::atomic m_read_index = 0; + struct + { + int32_t read_index; + int32_t read_index2; + int32_t write_index; + int32_t read_end_index; + int32_t avail; + } m_read_im{}; + std::unique_ptr m_bufferPtr; + ElementType* m_buffer; + const int32_t kMaxSize; + +#ifdef LIBPT_ENABLE_BUILD_TESTS + std::atomic m_waiter_count = 0; +#endif + + private: + static constexpr size_t calcSafeElementArraySize(size_t queueSize) noexcept + { + // queueSizeは要素数(not byte数) + auto eleAlign = alignof(ElementType); + auto reqAlign = ElementAlignment; + if (reqAlign <= eleAlign) { + return queueSize; + } + // 例えばcharで書き込むが、書き込んだ跡はElementAlignmentにアライメントさせたい + // + return queueSize + (reqAlign / eleAlign - 1); + } + static constexpr bool isAligned(void* ptr, size_t alignment) noexcept + { + auto intptr = reinterpret_cast(ptr); + return (intptr % alignment) == 0; + } + static ElementType* toAlignedElement(ElementType* ptr) noexcept + { + auto eleAlign = alignof(ElementType); + auto reqAlign = ElementAlignment; + auto n = (reqAlign / eleAlign - 1); + for (auto i = 0; i < n; ++i) { + if (isAligned(ptr, ElementAlignment)) { + return ptr; + } + ++ptr; + } + PT_UNREACHABLE(); + return nullptr; + } + + public: + BoundedSpscZeroCopy(int32_t queueSize) noexcept + : m_bufferPtr{ std::make_unique(calcSafeElementArraySize(queueSize)) } + , kMaxSize(queueSize) + { + m_buffer = toAlignedElement(m_bufferPtr.get()); + } + ~BoundedSpscZeroCopy(void) noexcept { this->cancel(); } + BoundedSpscZeroCopy(BoundedSpscZeroCopy&&) = delete; + BoundedSpscZeroCopy& operator=(BoundedSpscZeroCopy&&) = delete; + + void cancel(void) noexcept + { + for (;;) { + auto ctx = m_write_ctx.load(std::memory_order_relaxed); + auto [index0, index1] = decode_ctx(ctx); + if (0 > index0) { + break; + } + auto newCtx = encode_ctx(-1, -1); + auto result = m_write_ctx.compare_exchange_strong(/*expected*/ ctx, + /*desired */ newCtx); + if (result) { + m_write_ctx.notify_one(); + break; + } + } + for (;;) { + auto index = m_read_index.load(std::memory_order_relaxed); + if (0 > index) { + break; + } + auto result = m_read_index.compare_exchange_strong(/*expected*/ index, + /*desired */ -1); + if (result) { + m_read_index.notify_one(); + break; + } + } + } + + std::pair getWritePtr(int32_t wantSize) noexcept + { + if (kMaxSize / 2 < wantSize) { + return { nullptr, -1 }; + } + auto write_ctx = + m_write_ctx.load(std::memory_order_relaxed); // only written from writer thread + auto [write_index, read_end_index] = decode_ctx(write_ctx); + if (0 > write_index) { + return { nullptr, -1 }; // canceled + } + for (;;) { + // synchronize-with & happens-before + // readerとwriterのm_read_indexとm_write_ctxで互いに + // synchronize-with & happens-beforeの関係を作って更新していく + // + // writer reader + // 1.m_read_index.load + // 2.write m_buffer -----+ + // 3.m_write_ctx.store => | + // | 1.m_write_ctx.load + // +->2.read m_buffer + // <= | 3.m_read_index.store + // 1.m_read_index.load | + // 2.write m_buffer <----+ + // 3.m_write_ctx.store | + // 4.m_read_index.load | #writerが連続する場合を例示 + // 5.write m_buffer <----+ + // 6.m_write_ctx.store => | + // | 1.m_write_ctx.load + // +->2.read m_buffer + // <= 3.m_read_index.store + // ..... + // =>: synchronize-with + // -->: happens-before + auto read_index = m_read_index.load(std::memory_order_acquire); + if (0 > read_index) { + break; // canceled + } + + auto [avail, updated_write_index] = + this->check_write_available(write_index, read_index, kMaxSize); + decltype(write_index) write_index2; + decltype(read_end_index) read_end_index2; + if (0 <= updated_write_index) { + // flip to back side + write_index2 = updated_write_index; + read_end_index2 = write_index; + } else { + write_index2 = write_index; + read_end_index2 = read_end_index; + } + if (0 < avail && wantSize <= avail) { + m_write_im.write_ctx = write_ctx; + m_write_im.write_index2 = write_index2; + m_write_im.read_end_index2 = read_end_index2; + m_write_im.read_index = read_index; + m_write_im.avail = avail; + auto* ptr = &m_buffer[write_index2]; + return { ptr, avail }; + } + if (0 <= updated_write_index) { + // waitする前に一旦ここで m_write_ctx を更新する + // read pointerが進む可能性がある + auto write_ctx2 = encode_ctx(write_index2, read_end_index2); + bool result = this->update_write_ctx(write_ctx, write_ctx2); + if (!result) { + break; // canceld + } + write_ctx = write_ctx2; + write_index = write_index2; + read_end_index = read_end_index2; + } + ++m_write_stats.insufficient; + if (0 >= wantSize) { // wantSize==0はnon-blocking呼び出しの意味になる + return { nullptr, 0 }; + } +#ifdef LIBPT_ENABLE_BUILD_TESTS + ++m_waiter_count; +#endif + ++m_write_stats.wait_count; + m_read_index.wait(read_index, std::memory_order_relaxed); +#ifdef LIBPT_ENABLE_BUILD_TESTS + --m_waiter_count; +#endif + } + return { nullptr, -1 }; // canceled; + } + + int32_t moveWritePtr(int32_t writtenSize) noexcept + { + if (m_write_im.avail < writtenSize) { + return -1; + } + auto read_end_index2 = m_write_im.read_end_index2; + auto new_write_index = m_write_im.write_index2 + writtenSize; // ここでwrap aroundはしない + auto new_read_end_index = + is_front_side(new_write_index, m_write_im.read_index) ? new_write_index : read_end_index2; + m_write_stats.max_read_end_index = + (std::max)(m_write_stats.max_read_end_index, read_end_index2); + auto new_write_ctx = encode_ctx(new_write_index, new_read_end_index); + // if (new_write_index == m_write_im.write_index2) { + if (new_write_ctx == m_write_im.write_ctx) { + __debugbreak(); + } + + auto result = this->update_write_ctx(m_write_im.write_ctx, new_write_ctx); + if (!result) { + return -1; // canceled + } + + m_write_im.avail = 0; + return writtenSize; + } + + std::pair getReadPtr(int32_t wantSize) noexcept + { + if (kMaxSize / 2 < wantSize) { + return { nullptr, -1 }; + } + auto read_index = m_read_index.load(std::memory_order_relaxed); + if (0 > read_index) { + return { nullptr, -1 }; // canceled + } + for (;;) { + auto write_ctx = m_write_ctx.load(std::memory_order_acquire); + auto [write_index, read_end_index] = decode_ctx(write_ctx); + if (0 > write_index) { + break; // canceled + } + + auto [avail, updated_read_index] = + check_read_available(write_index, read_end_index, read_index); + decltype(read_index) read_index2; + if (0 <= updated_read_index) { + // flip to front side + read_index2 = updated_read_index; + } else { + read_index2 = read_index; + } + if (0 < avail && wantSize <= avail) { + m_read_im.read_index = read_index; + m_read_im.read_index2 = read_index2; + m_read_im.write_index = write_index; + m_read_im.read_end_index = read_end_index; + m_read_im.avail = avail; + const auto* ptr = &m_buffer[read_index2]; + return { ptr, avail }; + } + if (0 <= updated_read_index) { + // waitする前に一旦ここで m_read_index を更新する + // write pointerが進む可能性がある + bool result = this->update_read_ctx(read_index, read_index2); + if (!result) { + break; // canceld + } + read_index = read_index2; + } + if (0 >= wantSize) { // wantSize==0はnon-blocking呼び出しの意味になる + return { nullptr, 0 }; + } +#ifdef LIBPT_ENABLE_BUILD_TESTS + ++m_waiter_count; +#endif + m_write_ctx.wait(write_ctx, std::memory_order_relaxed); +#ifdef LIBPT_ENABLE_BUILD_TESTS + --m_waiter_count; +#endif + } + return { nullptr, -1 }; // canceld + } + + int32_t moveReadPtr(int32_t readSize) noexcept + { + if (m_read_im.avail < readSize) { + return -1; + } + auto new_read_index = m_read_im.read_index2 + readSize; + if (!is_front_side(m_read_im.write_index, m_read_im.read_index2)) { + if (m_read_im.read_end_index <= new_read_index) { + // flip to front side + new_read_index = 0; + } + } + + auto result = this->update_read_ctx(m_read_im.read_index, new_read_index); + if (!result) { + return -1; // canceled + } + m_read_im.avail = 0; + return readSize; + } + + void waitUntilEmptyForWriter(void) noexcept + { + auto write_ctx = m_write_ctx.load(std::memory_order_relaxed); + auto [write_index, read_end_index] = decode_ctx(write_ctx); + if (0 > write_index) { + return; // canceled + } + for (;;) { + auto read_index = m_read_index.load(std::memory_order_acquire); + if (0 > read_index) { + break; // canceled + } + + auto empty = is_empty(write_index, read_end_index, read_index); + if (empty) { + return; + } + +#ifdef LIBPT_ENABLE_BUILD_TESTS + ++m_waiter_count; +#endif + m_read_index.wait(read_index, std::memory_order_relaxed); +#ifdef LIBPT_ENABLE_BUILD_TESTS + --m_waiter_count; +#endif + } + } + + protected: + static std::pair decode_ctx(uint64_t encoded) noexcept + { + int32_t decoded[2]; + memcpy(decoded, &encoded, sizeof(encoded)); + return { decoded[0], decoded[1] }; + } + + static uint64_t encode_ctx(int32_t x, int32_t y) noexcept + { + uint64_t encoded; + int32_t tmp[2] = { x, y }; + memcpy(&encoded, tmp, sizeof(tmp)); + return encoded; + } + + /* + * 状態としては大きく read <= write かどうかでわける + * read <= write をfront side、read > write をback sideと呼ぶことにする + * read == writeの場合にfullなのかemptyかの2通りがありえるが、2通りを許容すると + * lock-free実装が困難になるため read==writeはemptyの場合のみとする + * + * [FrontSide] (r<=w) + * -------------- -------------- -------------- + * + | <-w,r + +* <-r + * | | | |* + * | | |* <-r |* + * | | |* |* + * | | |* |* + * | | | | <-w |* + * | | | | |* + * | | | | |* + * | | | | |* + * + V + V +* + * <-w,end + * # rがtopの場合はwrap aroundしない + * [BackSide] (w check_write_available(int32_t write_index, + int32_t read_index, + int32_t maxSize) noexcept + { + if (is_front_side(write_index, read_index)) { + auto avail = maxSize - write_index; + auto top_area = read_index - 1; + if (avail < top_area) { + // flip to back + return { top_area, /*new_write_index*/ 0 }; + } + return { avail, /*new_write_index*/ -1 }; + } + auto avail = read_index - write_index - 1; + return { avail, /*new_write_index*/ -1 }; + } + + static std::pair check_read_available(int32_t write_index, + int32_t read_end_index, + int32_t read_index) noexcept + { + // writer: backだと思っている(endを更新しない) + // reader: frontに遷移 + // という場合があるのでfront/backで条件分岐して判定する必要がある + if (is_front_side(write_index, read_index)) { + auto avail = write_index - read_index; + return { avail, /*new_read_index*/ -1 }; + } + auto avail = read_end_index - read_index; + _ASSERTE(0 <= avail); + if (0 == avail) { + // flip to front + avail = write_index; + return { avail, /*new_read_index*/ 0 }; + } + return { avail, /*new_read_index*/ -1 }; + } + + bool update_write_ctx(uint64_t currentVal, uint64_t newValue) noexcept + { + for (;;) { + auto expected = currentVal; + auto success = m_write_ctx.compare_exchange_weak(/*expected*/ expected, + /*desired*/ newValue, + std::memory_order_release); + if (success) { + break; + } + if (0 > expected) { + // canceled + return false; + } + } + m_write_ctx.notify_one(); + return true; + } + + bool update_read_ctx(int32_t currentVal, int32_t newValue) noexcept + { + for (;;) { + auto expected = currentVal; + auto success = m_read_index.compare_exchange_weak(/*expected*/ expected, + /*desired*/ newValue, + std::memory_order_release); + if (success) { + break; + } + if (0 > expected) { + // canceled + return false; + } + } + m_read_index.notify_one(); + return true; + } + + static bool is_empty(int32_t write_index, int32_t read_end_index, int32_t read_index) noexcept + { + if (is_front_side(write_index, read_index)) { + auto is_empty = (write_index == read_index); + return is_empty; + } + auto is_empty = (read_end_index == read_index); + return is_empty; + } + +#ifdef LIBPT_ENABLE_BUILD_TESTS + int32_t waiterCount(void) noexcept { return m_waiter_count.load(); } + friend class BoundedSpscZeroCopyTest; +#endif +}; + +} // namespace pf diff --git a/src/pt/pf_bounded_spsc_zero_copy_test.cpp b/src/pt/pf_bounded_spsc_zero_copy_test.cpp new file mode 100644 index 0000000..a4f05cd --- /dev/null +++ b/src/pt/pf_bounded_spsc_zero_copy_test.cpp @@ -0,0 +1,1267 @@ +/* Copyright (c) 2024 Masaaki Hamada + * Distributed under the MIT License (http://opensource.org/licenses/MIT) + */ + +#define GTEST_DONT_DEFINE_TEST 1 +#include + +#include "pf_bounded_spsc_zero_copy.h" +#include + +namespace pf { + +class BoundedSpscZeroCopyTest : public ::testing::Test +{ + public: + static constexpr auto s_max_size = 64; + static constexpr auto s_want_size = 16; + using RingBuffer = pf::BoundedSpscZeroCopy<>; + + public: + static void setIndex(std::unique_ptr& rb, + int32_t write_index, + int32_t read_end_index, + int32_t read_index) + { + rb->m_write_ctx.store(RingBuffer::encode_ctx(write_index, read_end_index)); + rb->m_read_index.store(read_index); + } + enum + { + StateE0, + StateE1, + StateF0, + StateF1, + StateX0, + StateX1, + StateY0, + StateY1, + }; + static int getState(std::unique_ptr& rb) + { + auto write_ctx = rb->m_write_ctx.load(std::memory_order_relaxed); + auto [write_index, read_end_index] = RingBuffer::decode_ctx(write_ctx); + auto read_index = rb->m_read_index.load(std::memory_order_relaxed); + + if (read_index <= write_index) { + // front + if (write_index == read_index) { + if (read_index == 0) { + return StateE0; + } + return StateE1; + } + if (read_index == 0) { + if (write_index == s_max_size) { + return StateF0; + } + return StateX0; + } + return StateX1; + } + + // back + if (read_index - 1 == write_index) { + return StateF1; + } + if (write_index == 0) { + return StateY0; + } + return StateY1; + } + + static int32_t getWaiterCount(std::unique_ptr& rb) { return rb->waiterCount(); } + + // 状態 遷移先(w) 遷移先(r) + // E0:Empty0 r==0,w==r F0,X0, + // E1:Empty1 r!=0,w==r X1,Y1,F1 + // F0:Full0 r==0,w==end E1,X1 + // F1:Full1 r!=0,r-1==w X0,X1,E1 + // X0:Front0 r==0,r makeEmpty0(void) + { + auto rb = std::make_unique(s_max_size); + _ASSERTE(getState(rb) == StateE0); + return rb; + } + static std::unique_ptr makeEmpty1(int32_t wAvail = 10) + { + auto rb = std::make_unique(s_max_size); + auto w = s_max_size - wAvail; + auto r_end = 0; // read_end_indexはfront sideではdon't care + auto r = w; + setIndex(rb, w, r_end, r); + _ASSERTE(getState(rb) == StateE1); + return rb; + } + static std::unique_ptr makeFull0() + { + // Full0は r_end == s_max_size の場合に限定する + // r_end < s_max_size だとX0ともみなせてしまうため + auto rb = std::make_unique(s_max_size); + auto r_end = s_max_size; + setIndex(rb, r_end, r_end, 0); + _ASSERTE(getState(rb) == StateF0); + return rb; + } + static std::unique_ptr makeFull1(int32_t wAvail, + int32_t rAvail = 10, + int32_t tailRoom = 2) + { + _ASSERTE(wAvail == -1); + auto rb = std::make_unique(s_max_size); + auto r_end = s_max_size - tailRoom; + auto r = r_end - rAvail; + auto w = r - 1; + _ASSERTE(0 < r); + setIndex(rb, w, r_end, r); + _ASSERTE(getState(rb) == StateF1); + return rb; + } + static std::unique_ptr makeFront0(int32_t wAvail = 10) + { + auto rb = std::make_unique(s_max_size); + auto w = s_max_size - wAvail; + auto r_end = 0; + auto r = 0; + setIndex(rb, w, r_end, r); + _ASSERTE(getState(rb) == StateX0); + return rb; + } + static std::unique_ptr makeFront1(int32_t wAvail = 10, int32_t rAvail = 10) + { + auto rb = std::make_unique(s_max_size); + auto w = s_max_size - wAvail; + auto r_end = 0; + auto r = w - rAvail; + _ASSERTE(0 < r); + setIndex(rb, w, r_end, r); + _ASSERTE(getState(rb) == StateX1); + return rb; + } + static std::unique_ptr makeBack0(int32_t wAvail, int32_t rAvail = 10) + { + _ASSERTE(wAvail == -1); + auto rb = std::make_unique(s_max_size); + auto w = 0; + auto r_end = s_max_size - 2; + auto r = r_end - rAvail; + setIndex(rb, 0, r_end, r); + _ASSERTE(getState(rb) == StateY0); + return rb; + } + static std::unique_ptr makeBack1(int32_t wAvail = 10, int32_t rAvail = 10) + { + auto rb = std::make_unique(s_max_size); + auto r_end = s_max_size - 2; + auto r = r_end - rAvail; + auto w = r - 1 - wAvail; + _ASSERTE(0 <= w); + _ASSERTE(w < r - 1); + setIndex(rb, w, r_end, r); + _ASSERTE(getState(rb) == StateY1); + return rb; + } +}; + +GTEST_TEST_F(BoundedSpscZeroCopyTest, BasicOperation) +{ + struct Reader + { + Reader(int32_t rsize0 = 0) + : rsize(rsize0) + { + } + int32_t rsize; + const char* p = nullptr; + int32_t avail = 0; + bool called = false; + int32_t operator()(const char* p0, int32_t avail0) + { + p = p0; + avail = avail0; + called = true; + return rsize; + }; + }; + struct Writer + { + Writer(int32_t wsize0 = 0) + : wsize(wsize0) + { + } + int32_t wsize; + char* p = nullptr; + int32_t avail = 0; + bool called = false; + int32_t operator()(char* p0, int32_t avail0) + { + p = p0; + avail = avail0; + called = true; + return wsize; + }; + }; + + auto processReadBuf = [](std::unique_ptr& rb, Reader& reader, int32_t wantSize) { + auto [ptr, size] = rb->getReadPtr(wantSize); + if (ptr) { + auto readSize = reader(ptr, size); + return rb->moveReadPtr(readSize); + } + return size; + }; + auto processWriteBuf = [](std::unique_ptr& rb, Writer& writer, int32_t wantSize) { + auto [ptr, size] = rb->getWritePtr(wantSize); + if (ptr) { + auto writtenSize = writer(ptr, size); + return rb->moveWritePtr(writtenSize); + } + return size; + }; + + { + auto rb = makeEmpty0(); + Reader reader; + auto ret = processReadBuf(rb, reader, s_max_size / 2 + 1); + EXPECT_EQ(ret, -1); + EXPECT_FALSE(reader.called); + } + { + auto rb = makeEmpty0(); + Writer writer; + auto ret = processWriteBuf(rb, writer, s_max_size / 2 + 1); + EXPECT_EQ(ret, -1); + EXPECT_FALSE(writer.called); + } + { + int32_t size = s_max_size / 2; + auto rb = makeEmpty0(); + Writer writer(size); + auto ret = processWriteBuf(rb, writer, size); + EXPECT_EQ(ret, size); + EXPECT_EQ(writer.avail, s_max_size); + + Reader reader(size); + ret = processReadBuf(rb, reader, size); + EXPECT_EQ(ret, size); + EXPECT_EQ(reader.avail, size); + } +} + +GTEST_TEST_F(BoundedSpscZeroCopyTest, StateTransition) +{ + struct Reader + { + Reader(int32_t rsize0 = 0) + : rsize(rsize0) + { + } + int32_t rsize; + const char* p = nullptr; + int32_t num = 0; + bool called = false; + int32_t operator()(const char* p0, int32_t avail0) + { + p = p0; + num = avail0; + called = true; + return rsize; + }; + }; + struct Writer + { + Writer(int32_t wsize0 = 0) + : wsize(wsize0) + { + } + int32_t wsize; + char* p = nullptr; + int32_t num = 0; + bool called = false; + int32_t operator()(char* p0, int32_t avail0) + { + p = p0; + num = avail0; + called = true; + return wsize; + }; + }; + + auto processReadBuf = [](std::unique_ptr& rb, Reader& reader, int32_t wantSize) { + auto [ptr, size] = rb->getReadPtr(wantSize); + if (ptr) { + auto readSize = reader(ptr, size); + return rb->moveReadPtr(readSize); + } + return size; + }; + auto processWriteBuf = [](std::unique_ptr& rb, Writer& writer, int32_t wantSize) { + auto [ptr, size] = rb->getWritePtr(wantSize); + if (ptr) { + auto writtenSize = writer(ptr, size); + return rb->moveWritePtr(writtenSize); + } + return size; + }; + + // + // E0 + // + { + // E0 write-> F0 + auto rb = makeEmpty0(); + Writer writer(s_max_size); + auto ret = processWriteBuf(rb, writer, s_max_size / 2); + EXPECT_EQ(ret, s_max_size); + EXPECT_TRUE(writer.p != nullptr); + EXPECT_EQ(writer.num, s_max_size); + EXPECT_EQ(getState(rb), StateF0); + } + { + // E0 write-> F0 (fail) + Writer writer(s_max_size); + auto rb = makeEmpty0(); + auto ret = processWriteBuf(rb, writer, s_max_size / 2 + 1); + EXPECT_EQ(ret, -1); // この場合は0なくtoo largeエラーになる + EXPECT_FALSE(writer.called); + } + { + // E0 write-> X0 + int32_t wSize = s_max_size / 2; + Writer writer(wSize); + auto rb = makeEmpty0(); + auto ret = processWriteBuf(rb, writer, wSize); + EXPECT_EQ(ret, wSize); + EXPECT_TRUE(writer.p != nullptr); + EXPECT_EQ(writer.num, s_max_size); + EXPECT_EQ(getState(rb), StateX0); + } + { + // E0 read-> N/A + auto rb = makeEmpty0(); + Reader reader; + auto ret = processReadBuf(rb, reader, 0); + EXPECT_EQ(ret, 0); + EXPECT_FALSE(reader.called); + EXPECT_EQ(getState(rb), StateE0); + } + + // + // E1 + // + { + // E1 write-> X1 + int32_t wAvail = 50; // max_size 64 + int32_t wSize = 10; + auto rb = makeEmpty1(wAvail); + Writer writer(wSize); + auto ret = processWriteBuf(rb, writer, wSize); + EXPECT_EQ(ret, wSize); + EXPECT_TRUE(writer.p != nullptr); + EXPECT_EQ(writer.num, wAvail); + EXPECT_EQ(getState(rb), StateX1); + } + { + // E1 write-> X1 + // write_indexがmax_sizeになる (X1のテストでカバー) + int32_t wAvail = 50; // max_size 64 + int32_t wSize = wAvail; + auto rb = makeEmpty1(wAvail); + Writer writer(wSize); + auto ret = processWriteBuf(rb, writer, /*wSize*/ s_max_size / 2); + EXPECT_EQ(ret, wSize); + EXPECT_TRUE(writer.p != nullptr); + EXPECT_EQ(writer.num, wAvail); + EXPECT_EQ(getState(rb), StateX1); + } + { + // E1 write-> Y1 + // これが起こるのはr==w==max_sizeの場合のみ + int32_t wAvail = 0; + int32_t wSize = 10; + auto rb = makeEmpty1(wAvail); + Writer writer(wSize); + auto ret = processWriteBuf(rb, writer, wSize); + EXPECT_EQ(ret, wSize); + EXPECT_TRUE(writer.p != nullptr); + EXPECT_EQ(writer.num, s_max_size - 1); + EXPECT_EQ(getState(rb), StateY1); + } + { + // E1 write-> F1 + // これが起こるのはr==w==max_sizeの場合のみ + int32_t wAvail = 0; + int32_t wSize = s_max_size - 1; + auto rb = makeEmpty1(wAvail); + Writer writer(wSize); + auto ret = processWriteBuf(rb, writer, /*wSize*/ s_max_size / 2); + EXPECT_EQ(ret, wSize); + EXPECT_TRUE(writer.p != nullptr); + EXPECT_EQ(writer.num, s_max_size - 1); + EXPECT_EQ(getState(rb), StateF1); + } + { + // E1 write-> F1 (fail) + // これが起こるのはr==w==max_sizeの場合のみ + // wantSizeの上限をs_max_size/2にしたのでこのテストの意味がない + int32_t wAvail = 0; + int32_t wSize = s_max_size; + auto rb = makeEmpty1(wAvail); + Writer writer(1); + auto ret = processWriteBuf(rb, writer, /*wSize*/ s_max_size / 2 + 1); + EXPECT_EQ(ret, -1); + EXPECT_FALSE(writer.called); + EXPECT_EQ(getState(rb), StateE1); + } + { + // E1 read-> N/A + int32_t wAvail = 10; + auto rb = makeEmpty1(wAvail); + Reader reader(1); + auto ret = processReadBuf(rb, reader, 0); + EXPECT_EQ(ret, 0); + EXPECT_FALSE(reader.called); + EXPECT_EQ(getState(rb), StateE1); + } + + // + // F0 + // + { + // F0 write-> N/A + auto rb = makeFull0(); + Writer writer(1); + auto ret = processWriteBuf(rb, writer, 0); + EXPECT_EQ(ret, 0); + EXPECT_FALSE(writer.called); + EXPECT_EQ(getState(rb), StateF0); + } + { + // F0 read-> E1 + auto rb = makeFull0(); + int32_t rSize = s_max_size; + Reader reader(rSize); + auto ret = processReadBuf(rb, reader, /*rSize*/ s_max_size / 2); + EXPECT_EQ(ret, rSize); + EXPECT_TRUE(reader.p != nullptr); + EXPECT_EQ(reader.num, rSize); + EXPECT_EQ(getState(rb), StateE1); + + // w == r == r_end == max_sizeという特殊な状態 (E1テストでカバー) + } + { + // F0 read-> X1 + auto rb = makeFull0(); + int32_t rSize = 10; + Reader reader(rSize); + auto ret = processReadBuf(rb, reader, rSize); + EXPECT_EQ(ret, rSize); + EXPECT_TRUE(reader.p != nullptr); + EXPECT_EQ(reader.num, s_max_size); + EXPECT_EQ(getState(rb), StateX1); + + // r < w == max_sizeという特殊な状態 (X1テストでカバー) + } + { + // F0 read-> X1 (fail) + auto rb = makeFull0(); + int32_t rSize = s_max_size + 1; + Reader reader(1); + auto ret = processReadBuf(rb, reader, rSize); + EXPECT_EQ(ret, -1); // too large + EXPECT_FALSE(reader.called); + EXPECT_EQ(getState(rb), StateF0); + } + + // + // F1 + // + { + // F1 write-> N/A + int32_t rAvail = 10; + auto rb = makeFull1(/*wAvail*/ -1, rAvail); + Writer writer(1); + auto ret = processWriteBuf(rb, writer, 0); + EXPECT_EQ(ret, 0); + EXPECT_FALSE(writer.called); + EXPECT_EQ(getState(rb), StateF1); + } + { + // F1 read-> X0 + int32_t rAvail = 10; + auto rb = makeFull1(/*wAvail*/ -1, rAvail); + Reader reader(rAvail); + auto ret = processReadBuf(rb, reader, rAvail); + EXPECT_EQ(ret, rAvail); + EXPECT_TRUE(reader.p != nullptr); + EXPECT_EQ(reader.num, rAvail); + EXPECT_EQ(getState(rb), StateX0); + } + { + // F1 read-> X1 + // r=end,r-1=w + int32_t rAvail = 0; + int32_t rSize = 10; + auto rb = makeFull1(/*wAvail*/ -1, rAvail, /*tailRoom*/ 0); + Reader reader(rSize); + auto ret = processReadBuf(rb, reader, rSize); + EXPECT_EQ(ret, rSize); + EXPECT_TRUE(reader.p != nullptr); + EXPECT_EQ(reader.num, s_max_size - 1); + EXPECT_EQ(getState(rb), StateX1); + } + { + // F1 read-> E1 + // r=end,r-1=w + int32_t rAvail = 0; + int32_t rSize = s_max_size - 1; + auto rb = makeFull1(/*wAvail*/ -1, rAvail, /*tailRoom*/ 0); + Reader reader(rSize); + auto ret = processReadBuf(rb, reader, /*rSize*/ s_max_size / 2); + EXPECT_EQ(ret, rSize); + EXPECT_TRUE(reader.p != nullptr); + EXPECT_EQ(reader.num, s_max_size - 1); + EXPECT_EQ(getState(rb), StateE1); + } + { + // F1 read-> E1 (fail) + // r=end,r-1=w + int32_t rAvail = 0; + int32_t rSize = s_max_size; + auto rb = makeFull1(/*wAvail*/ -1, rAvail, /*tailRoom*/ 0); + Reader reader(1); + auto ret = processReadBuf(rb, reader, /*rSize*/ s_max_size / 2 + 1); + EXPECT_EQ(ret, -1); + EXPECT_FALSE(reader.called); + EXPECT_EQ(getState(rb), StateF1); + } + + // + // X0 + // + { + // X0 write-> F0 + int32_t wAvail = 10; + int32_t wSize = 10; + auto rb = makeFront0(wAvail); + Writer writer(wSize); + auto ret = processWriteBuf(rb, writer, wSize); + EXPECT_EQ(ret, wSize); + EXPECT_TRUE(writer.p != nullptr); + EXPECT_EQ(writer.num, wAvail); + EXPECT_EQ(getState(rb), StateF0); + } + if (0) { + // X0 write-> F0 (fail) + // wait指定不要にしたので意味のないテストになっている + int32_t wAvail = 10; + int32_t wSize = 11; + auto rb = makeFront0(wAvail); + Writer writer(1); + auto ret = processWriteBuf(rb, writer, /*wSize*/ s_max_size); + EXPECT_EQ(ret, -1); + EXPECT_FALSE(writer.called); + EXPECT_EQ(getState(rb), StateX0); + } + { + // X0 write-> X0 + int32_t wAvail = 10; + int32_t wSize = 5; + auto rb = makeFront0(wAvail); + Writer writer(wSize); + auto ret = processWriteBuf(rb, writer, wSize); + EXPECT_EQ(ret, wSize); + EXPECT_TRUE(writer.p != nullptr); + EXPECT_EQ(writer.num, wAvail); + EXPECT_EQ(getState(rb), StateX0); + } + { + // X0 read-> E1 + int32_t wAvail = 10; + int32_t rAvail = s_max_size - wAvail; + int32_t rSize = rAvail; + auto rb = makeFront0(wAvail); + Reader reader(rSize); + auto ret = processReadBuf(rb, reader, /*rSize*/ s_max_size / 2); + EXPECT_EQ(ret, rSize); + EXPECT_TRUE(reader.p != nullptr); + EXPECT_EQ(reader.num, rAvail); + EXPECT_EQ(getState(rb), StateE1); + } + if (0) { + // X0 read-> E1 (fail) + int32_t wAvail = 10; + int32_t rAvail = s_max_size - wAvail; + int32_t rSize = rAvail + 1; + auto rb = makeFront0(wAvail); + Reader reader(1); + auto ret = processReadBuf(rb, reader, 0); + EXPECT_EQ(ret, 0); + EXPECT_FALSE(reader.called); + EXPECT_EQ(getState(rb), StateX0); + } + { + // X0 read-> X1 + int32_t wAvail = 10; + int32_t rAvail = s_max_size - wAvail; + int32_t rSize = 1; + auto rb = makeFront0(wAvail); + Reader reader(rSize); + auto ret = processReadBuf(rb, reader, rSize); + EXPECT_EQ(ret, rSize); + EXPECT_TRUE(reader.p != nullptr); + EXPECT_EQ(reader.num, rAvail); + EXPECT_EQ(getState(rb), StateX1); + } + + // + // X1 + // + { + // X1 write-> X1 + int32_t wAvail = 40; + int32_t wSize = 5; + int32_t rAvail = 10; + auto rb = makeFront1(wAvail, rAvail); + Writer writer(wSize); + auto ret = processWriteBuf(rb, writer, wSize); + EXPECT_EQ(ret, wSize); + EXPECT_TRUE(writer.p != nullptr); + EXPECT_EQ(writer.num, wAvail); + EXPECT_EQ(getState(rb), StateX1); + } + { + // X1 write-> X1(write_indexがmax_sizeになる) + int32_t wAvail = 40; + int32_t wSize = wAvail; + int32_t rAvail = 10; + auto rb = makeFront1(wAvail, rAvail); + Writer writer(wSize); + auto ret = processWriteBuf(rb, writer, /*wSize*/ s_max_size / 2); + EXPECT_EQ(ret, wSize); + EXPECT_TRUE(writer.p != nullptr); + EXPECT_EQ(writer.num, wAvail); + EXPECT_EQ(getState(rb), StateX1); + + // r < w == max_sizeという特殊な状態 + // 次のテストでカバー + } + { + // X1 write-> Y1 + // w == max_size + int32_t wAvail = 0; + int32_t rAvail = 20; + // w_index = s_max_size - wAvail; //64 + // r_index = w_index - rAvail; //44 + + auto rb = makeFront1(wAvail, rAvail); + int32_t wSize = 10; + Writer writer(wSize); + auto ret = processWriteBuf(rb, writer, wSize); + EXPECT_EQ(ret, wSize); + // w_index = 0 + // r_index = 44 + int32_t wAvail2 = s_max_size - wAvail - rAvail - 1; + EXPECT_TRUE(writer.p != nullptr); + EXPECT_EQ(writer.num, wAvail2); + // w_index = 10 + // r_index = 44 + EXPECT_EQ(getState(rb), StateY1); + } + if (0) { + // X1 write-> Y1 (fail) + // w == max_size + int32_t wAvail = 0; + int32_t rAvail = 20; + int32_t wSize = s_max_size - wAvail - rAvail; + auto rb = makeFront1(wAvail, rAvail); + Writer writer(1); + auto ret = processWriteBuf(rb, writer, 0); + EXPECT_EQ(ret, 0); + EXPECT_FALSE(writer.called); + EXPECT_EQ(getState(rb), StateX1); + } + { + // X1 write-> Y1 + int32_t wAvail = 10; + int32_t rAvail = 10; + // w_index = s_max_size - wAvail; //54 + // r_index = w_index - rAvail; //44 + + auto rb = makeFront1(wAvail, rAvail); + + int32_t wSize = s_max_size - wAvail - rAvail - 2; + Writer writer(wSize); + auto ret = processWriteBuf(rb, writer, /*wSize*/ s_max_size / 2); + EXPECT_EQ(ret, wSize); + // w_index = 10 + // r_index = 44 + EXPECT_TRUE(writer.p != nullptr); + EXPECT_EQ(writer.num, wSize + 1); + EXPECT_EQ(getState(rb), StateY1); + } + { + // X1 write-> F1 + int32_t wAvail = 10; + int32_t wSize = 20; + int32_t rAvail = s_max_size - wAvail - (wSize + 1); + // w_index = s_max_size - wAvail; //54 + // r_index = wAvail; //21 + auto rb = makeFront1(wAvail, rAvail); + Writer writer(wSize); + auto ret = processWriteBuf(rb, writer, wSize); + EXPECT_EQ(ret, wSize); + EXPECT_TRUE(writer.p != nullptr); + EXPECT_EQ(writer.num, wSize); + EXPECT_EQ(getState(rb), StateF1); + } + if (0) { + // X1 write-> F1 (fail) + int32_t wAvail = 10; + int32_t wSize = 20; + int32_t rAvail = s_max_size - wAvail - (wSize + 1); + // w_index = s_max_size - wAvail; //54 + // r_index = wAvail; //21 + auto rb = makeFront1(wAvail, rAvail); + Writer writer(1); + auto ret = processWriteBuf(rb, writer, 0); + EXPECT_EQ(ret, 0); + EXPECT_FALSE(writer.called); + } + { + // X1 read-> E1 + int32_t wAvail = 10; + int32_t rAvail = 10; + int32_t rSize = rAvail; + auto rb = makeFront1(wAvail, rAvail); + Reader reader(rSize); + auto ret = processReadBuf(rb, reader, rSize); + EXPECT_EQ(ret, rSize); + EXPECT_TRUE(reader.p != nullptr); + EXPECT_EQ(reader.num, rAvail); + EXPECT_EQ(getState(rb), StateE1); + } + if (0) { + // X1 read-> E1 + int32_t wAvail = 10; + int32_t rAvail = 10; + int32_t rSize = rAvail + 1; + auto rb = makeFront1(wAvail, rAvail); + Reader reader(1); + auto ret = processReadBuf(rb, reader, 0); + EXPECT_EQ(ret, rSize); + EXPECT_TRUE(reader.p == nullptr); + EXPECT_EQ(reader.num, 0); + } + { + // X1 read-> X1 + int32_t wAvail = 10; + int32_t rAvail = 10; + int32_t rSize = 1; + auto rb = makeFront1(wAvail, rAvail); + Reader reader(rSize); + auto ret = processReadBuf(rb, reader, rSize); + EXPECT_EQ(ret, rSize); + EXPECT_TRUE(reader.p != nullptr); + EXPECT_EQ(reader.num, rAvail); + EXPECT_EQ(getState(rb), StateX1); + } + + // + // Y0 + // + { + // Y0 write-> F1 + int32_t rAvail = 20; + int32_t wAvail = s_max_size - 2 - rAvail - 1; // -2はmakeBack0()のマジックナンバーより + int32_t wSize = wAvail; + auto rb = makeBack0(/*wAvail*/ -1, rAvail); + Writer writer(wSize); + auto ret = processWriteBuf(rb, writer, /*wSize*/ s_max_size / 2); + EXPECT_EQ(ret, wSize); + EXPECT_TRUE(writer.p != nullptr); + EXPECT_EQ(writer.num, wSize); + EXPECT_EQ(getState(rb), StateF1); + } + if (0) { + // Y0 write-> F1 (faild) + int32_t rAvail = 20; + int32_t wAvail = s_max_size - 2 - rAvail - 1; + int32_t wSize = wAvail + 1; + auto rb = makeBack0(/*wAvail*/ -1, rAvail); + Writer writer(wSize); + auto ret = processWriteBuf(rb, writer, 0); + EXPECT_EQ(ret, 0); + EXPECT_FALSE(writer.called); + EXPECT_EQ(getState(rb), StateY0); + } + { + // Y0 write-> Y1 + int32_t rAvail = 20; + int32_t wAvail = s_max_size - 2 - rAvail - 1; + int32_t wSize = 10; + auto rb = makeBack0(/*wAvail*/ -1, rAvail); + Writer writer(wSize); + auto ret = processWriteBuf(rb, writer, wSize); + EXPECT_EQ(ret, wSize); + EXPECT_TRUE(writer.p != nullptr); + EXPECT_EQ(writer.num, wAvail); + EXPECT_EQ(getState(rb), StateY1); + } + { + // Y0 read-> E0 + int32_t rAvail = 10; + // int32_t wAvail=s_max_size - rAvail - 2; + int32_t rSize = rAvail; + auto rb = makeBack0(/*wAvail*/ -1, rAvail); + Reader reader(rSize); + auto ret = processReadBuf(rb, reader, rSize); + EXPECT_EQ(ret, rSize); + EXPECT_TRUE(reader.p != nullptr); + EXPECT_EQ(reader.num, rAvail); + EXPECT_EQ(getState(rb), StateE0); + } + { + // Y0 read-> Y0 + int32_t rAvail = 10; + // int32_t wAvail=s_max_size - rAvail - 2; + int32_t rSize = 1; + auto rb = makeBack0(/*wAvail*/ -1, rAvail); + Reader reader(rSize); + auto ret = processReadBuf(rb, reader, rSize); + EXPECT_EQ(ret, rSize); + EXPECT_TRUE(reader.p != nullptr); + EXPECT_EQ(reader.num, rAvail); + EXPECT_EQ(getState(rb), StateY0); + } + + // + // Y1 + // + { + // Y1 write-> Y1 + int32_t rAvail = 20; + int32_t wAvail = 20; + int32_t wSize = 10; + auto rb = makeBack1(wAvail, rAvail); + Writer writer(wSize); + auto ret = processWriteBuf(rb, writer, wSize); + EXPECT_EQ(ret, wSize); + EXPECT_TRUE(writer.p != nullptr); + EXPECT_EQ(writer.num, wAvail); + EXPECT_EQ(getState(rb), StateY1); + } + { + // Y1 write-> F1 + int32_t rAvail = 20; + int32_t wAvail = 20; + int32_t wSize = wAvail; + auto rb = makeBack1(wAvail, rAvail); + Writer writer(wSize); + auto ret = processWriteBuf(rb, writer, wSize); + EXPECT_EQ(ret, wSize); + EXPECT_TRUE(writer.p != nullptr); + EXPECT_EQ(writer.num, wAvail); + EXPECT_EQ(getState(rb), StateF1); + } + if (0) { + // Y1 write-> F1 (fail) + int32_t rAvail = 20; + int32_t wAvail = 20; + int32_t wSize = wAvail + 1; + auto rb = makeBack1(wAvail, rAvail); + Writer writer(1); + auto ret = processWriteBuf(rb, writer, 0); + EXPECT_EQ(ret, 0); + EXPECT_FALSE(writer.called); + EXPECT_EQ(getState(rb), StateY1); + } + { + // Y1 read-> X0 + int32_t rAvail = 20; + int32_t rSize = rAvail; + int32_t wAvail = 20; + auto rb = makeBack1(wAvail, rAvail); + Reader reader(rSize); + auto ret = processReadBuf(rb, reader, rSize); + EXPECT_EQ(ret, rSize); + EXPECT_TRUE(reader.p != nullptr); + EXPECT_EQ(reader.num, rAvail); + EXPECT_EQ(getState(rb), StateX0); + } + { + // Y1 read-> Y1 + int32_t rAvail = 20; + int32_t rSize = 10; + int32_t wAvail = 20; + auto rb = makeBack1(wAvail, rAvail); + Reader reader(rSize); + auto ret = processReadBuf(rb, reader, rSize); + EXPECT_EQ(ret, rSize); + EXPECT_TRUE(reader.p != nullptr); + EXPECT_EQ(reader.num, rAvail); + EXPECT_EQ(getState(rb), StateY1); + } +} + +GTEST_TEST_F(BoundedSpscZeroCopyTest, BlockingRead) +{ + auto rb = makeEmpty0(); + + struct Reader + { + Reader(int32_t rsize0 = 0) + : rsize(rsize0) + { + } + int32_t rsize; + int32_t operator()(const char* p0, int32_t avail0) + { + EXPECT_TRUE(0 < avail0); + EXPECT_EQ(p0[0], 123); + return rsize; + }; + }; + struct Writer + { + Writer(int32_t wsize0 = 0) + : wsize(wsize0) + { + } + int32_t wsize; + int32_t operator()(char* p0, int32_t avail0) + { + EXPECT_TRUE(0 < avail0); + p0[0] = 123; + return wsize; + }; + }; + + auto processReadBuf = [](std::unique_ptr& rb, Reader& reader, int32_t wantSize) { + auto [ptr, size] = rb->getReadPtr(wantSize); + if (ptr) { + auto readSize = reader(ptr, size); + return rb->moveReadPtr(readSize); + } + return size; + }; + auto processWriteBuf = [](std::unique_ptr& rb, Writer& writer, int32_t wantSize) { + auto [ptr, size] = rb->getWritePtr(wantSize); + if (ptr) { + auto writtenSize = writer(ptr, size); + return rb->moveWritePtr(writtenSize); + } + return size; + }; + + std::thread th([&]() { + while (0 >= getWaiterCount(rb)) { + SWITCH_TO_THREAD(); + } + int32_t wsize = 1; + Writer writer(wsize); + auto ret = processWriteBuf(rb, writer, wsize); + _ASSERTE(ret == 1); + }); + + int32_t rsize = 1; + Reader reader(rsize); + auto ret = processReadBuf(rb, reader, rsize); + EXPECT_EQ(ret, 1); + + th.join(); +} + +GTEST_TEST_F(BoundedSpscZeroCopyTest, BlockingWrite) +{ + auto rb = makeEmpty0(); + + struct Reader + { + Reader(int32_t rsize0 = 0) + : rsize(rsize0) + { + } + int32_t rsize; + int32_t operator()(const char* p0, int32_t avail0) + { + EXPECT_TRUE(0 < avail0); + for (int i = 0; i < rsize; ++i) { + _ASSERTE(p0[i] == i + 10); + } + return rsize; + }; + }; + struct Writer + { + Writer(int32_t wsize0 = 0) + : wsize(wsize0) + { + } + int32_t wsize; + int32_t operator()(char* p0, int32_t avail0) + { + EXPECT_TRUE(0 < avail0); + for (int i = 0; i < wsize; ++i) { + p0[i] = char(i + 10); + } + return wsize; + }; + }; + + auto processReadBuf = [](std::unique_ptr& rb, Reader& reader, int32_t wantSize) { + auto [ptr, size] = rb->getReadPtr(wantSize); + if (ptr) { + auto readSize = reader(ptr, size); + return rb->moveReadPtr(readSize); + } + return size; + }; + auto processWriteBuf = [](std::unique_ptr& rb, Writer& writer, int32_t wantSize) { + auto [ptr, size] = rb->getWritePtr(wantSize); + if (ptr) { + auto writtenSize = writer(ptr, size); + return rb->moveWritePtr(writtenSize); + } + return size; + }; + + const int32_t rsize = s_max_size / 2; + + std::thread reader([&]() { + std::this_thread::yield(); + while (0 >= getWaiterCount(rb)) { + std::this_thread::yield(); + } + // #1 + Reader reader(rsize); + auto ret = processReadBuf(rb, reader, rsize); + _ASSERTE(ret == rsize); + + // #2 + ret = processReadBuf(rb, reader, rsize); + _ASSERTE(ret == rsize); + + // #3 + ret = processReadBuf(rb, reader, rsize); + _ASSERTE(ret == rsize); + }); + + // #1 + Writer writer(rsize); + auto ret = processWriteBuf(rb, writer, rsize); + EXPECT_EQ(ret, rsize); + + // #2 + ret = processWriteBuf(rb, writer, rsize); + EXPECT_EQ(ret, rsize); + + // #3 + ret = processWriteBuf(rb, writer, rsize); + EXPECT_EQ(ret, rsize); + + reader.join(); +} + +GTEST_TEST_F(BoundedSpscZeroCopyTest, CancelRead) +{ + auto rb = makeEmpty0(); + + struct Reader + { + Reader(int32_t rsize0 = 0) + : rsize(rsize0) + { + } + int32_t rsize; + int32_t operator()(const char* p0, int32_t avail0) + { + EXPECT_TRUE(0 < avail0); + for (int i = 0; i < rsize; ++i) { + _ASSERTE(p0[i] == i + 10); + } + return rsize; + }; + }; + + auto processReadBuf = [](std::unique_ptr& rb, Reader& reader, int32_t wantSize) { + auto [ptr, size] = rb->getReadPtr(wantSize); + if (ptr) { + auto readSize = reader(ptr, size); + return rb->moveReadPtr(readSize); + } + return size; + }; + + std::thread th([&]() { + while (0 >= getWaiterCount(rb)) { + std::this_thread::yield(); + } + rb->cancel(); + }); + + char* p = nullptr; + int32_t rsize = 1; + Reader reader(rsize); + auto ret = processReadBuf(rb, reader, rsize); + EXPECT_EQ(ret, -1); + + th.join(); +} + +GTEST_TEST_F(BoundedSpscZeroCopyTest, CancelWrite) +{ + auto rb = makeEmpty0(); + + struct Writer + { + Writer(int32_t wsize0 = 0) + : wsize(wsize0) + { + } + int32_t wsize; + int32_t operator()(char* p0, int32_t avail0) + { + EXPECT_TRUE(0 < avail0); + for (int i = 0; i < wsize; ++i) { + p0[i] = char(i + 10); + } + return wsize; + }; + }; + + auto processWriteBuf = [](std::unique_ptr& rb, Writer& writer, int32_t wantSize) { + auto [ptr, size] = rb->getWritePtr(wantSize); + if (ptr) { + auto writtenSize = writer(ptr, size); + return rb->moveWritePtr(writtenSize); + } + return size; + }; + + std::thread th([&]() { + while (0 >= getWaiterCount(rb)) { + SwitchToThread(); + } + rb->cancel(); + }); + + char* p = nullptr; + const int32_t rsize = s_max_size / 3; + // #1 + Writer writer(rsize); + auto ret = processWriteBuf(rb, writer, rsize); + EXPECT_EQ(ret, rsize); + + // #2 + ret = processWriteBuf(rb, writer, rsize); + EXPECT_EQ(ret, rsize); + + // #3 + ret = processWriteBuf(rb, writer, rsize); + EXPECT_EQ(ret, rsize); + + // #4 + ret = processWriteBuf(rb, writer, rsize); + EXPECT_EQ(ret, -1); + + th.join(); +} + +GTEST_TEST_F(BoundedSpscZeroCopyTest, Random) +{ + struct Reader + { + Reader(int32_t rsize0, int n0) + : rsize(rsize0) + , n(n0) + { + } + int32_t rsize; + int32_t n; + int32_t operator()(const char* p0, int32_t avail0) + { + EXPECT_TRUE(0 < avail0); + for (int i = 0; i < rsize; ++i) { + _ASSERTE(p0[i] == (char)(i + n)); + } + return rsize; + }; + }; + struct Writer + { + Writer(int32_t wsize0, int n0) + : wsize(wsize0) + , n(n0) + { + } + int32_t wsize; + int32_t n; + int32_t operator()(char* p0, int32_t avail0) + { + EXPECT_TRUE(0 < avail0); + for (int i = 0; i < wsize; ++i) { + p0[i] = char(i + n); + } + return wsize; + }; + }; + + auto processReadBuf = [](std::unique_ptr& rb, Reader& reader, int32_t wantSize) { + auto [ptr, size] = rb->getReadPtr(wantSize); + if (ptr) { + auto readSize = reader(ptr, size); + return rb->moveReadPtr(readSize); + } + return size; + }; + auto processWriteBuf = [](std::unique_ptr& rb, Writer& writer, int32_t wantSize) { + auto [ptr, size] = rb->getWritePtr(wantSize); + if (ptr) { + auto writtenSize = writer(ptr, size); + return rb->moveWritePtr(writtenSize); + } + return size; + }; + + volatile uint32_t seed = 4646; + std::mt19937 mt(seed); + std::uniform_int_distribution dist(1, 32); + + const size_t s_sizeListLength = 10000; + std::vector sizeList(s_sizeListLength); + std::generate(sizeList.begin(), sizeList.end(), [&]() { return dist(mt); }); + + auto rb = makeEmpty0(); + + const int loopNum = 1000000; + auto producer = [&]() { + for (int i = 0; i < loopNum; ++i) { + auto sz = sizeList[i % sizeList.size()]; + Writer writer(sz, i); + auto ret = processWriteBuf(rb, writer, sz); + _ASSERTE(ret == sz); + } + }; + auto consumer = [&]() { + for (int i = 0; i < loopNum; ++i) { + auto sz = sizeList[i % sizeList.size()]; + Reader reader(sz, i); + auto ret = processReadBuf(rb, reader, sz); + _ASSERTE(ret == sz); + } + }; + + std::thread c([&]() { producer(); }); + std::thread p([&]() { consumer(); }); + + p.join(); + c.join(); +} + +} // namespace pf diff --git a/src/pt/pf_common_headers.h b/src/pt/pf_common_headers.h new file mode 100644 index 0000000..f312027 --- /dev/null +++ b/src/pt/pf_common_headers.h @@ -0,0 +1,60 @@ +/* Copyright (c) 2024 Masaaki Hamada + * Distributed under the MIT License (http://opensource.org/licenses/MIT) + */ + +#pragma once + +#if defined(_WIN32) +#pragma warning(disable : 4189) +#pragma warning(disable : 4702) +#pragma warning(disable : 4324) + +#define WIN32_LEAN_AND_MEAN +#define NOMINMAX + +#include + +#include +#else +#endif + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include diff --git a/src/pt/pf_mpsc_ringbuffer.h b/src/pt/pf_mpsc_ringbuffer.h new file mode 100644 index 0000000..7bf643c --- /dev/null +++ b/src/pt/pf_mpsc_ringbuffer.h @@ -0,0 +1,232 @@ +/* Copyright (c) 2024 Masaaki Hamada + * Distributed under the MIT License (http://opensource.org/licenses/MIT) + */ + +#pragma once + +#include "pf_base.h" + +namespace pf { + +template +class MpscRingBuffer : private pf::NonCopyable +{ + public: + enum + { + kMaxSize = max_size, + }; + struct WriteStats + { + int32_t max_queued; + int32_t wait_count; + }; + + protected: + alignas(std::hardware_destructive_interference_size) std::atomic m_write_index = 0; + std::atomic m_read_max_index = 0; + struct + { + std::atomic max_queued = 0; + std::atomic wait_count = 0; + } m_write_stats{}; + alignas(std::hardware_destructive_interference_size) std::atomic m_read_index = 0; + int64_t m_read_index_expected = 0; + int64_t m_read_index_desired = 0; + ElementType m_buffer[max_size]; + + public: + MpscRingBuffer(void) = default; + ~MpscRingBuffer(void) { this->cancel(); } + MpscRingBuffer(MpscRingBuffer&&) = delete; + MpscRingBuffer& operator=(MpscRingBuffer&&) = delete; + + void cancel(void) + { + for (;;) { + auto index = m_write_index.load(std::memory_order_relaxed); + if (0 > index) { + break; + } + auto result = m_write_index.compare_exchange_strong(/*expected*/ index, + /*desired */ -1); + if (result) { + // m_write_index.notify_all(); + break; + } + } + for (;;) { + auto index = m_read_max_index.load(std::memory_order_relaxed); + if (0 > index) { + break; + } + auto result = m_read_max_index.compare_exchange_strong(/*expected*/ index, + /*desired */ -1); + if (result) { + m_read_max_index.notify_all(); + break; + } + } + for (;;) { + auto index = m_read_index.load(std::memory_order_relaxed); + if (0 > index) { + break; + } + auto result = m_read_index.compare_exchange_strong(/*expected*/ index, + /*desired */ -1); + if (result) { + m_read_index.notify_all(); + break; + } + } + } + + bool push(const ElementType& data) { return this->pushCommon(data, /*wait*/ true); } + bool tryPush(const ElementType& data) { return this->pushCommon(data, /*wait*/ false); } + + int64_t tryPeek(ElementType** ppData, size_t num) + { + return this->peekCommon(ppData, num, /*wait*/ false); + } + int64_t peek(ElementType** ppData, size_t num) + { + return this->peekCommon(ppData, num, /*wait*/ true); + } + void commitPop(void) { return this->commitPopCommon(); } + WriteStats getWriteStats(void) const noexcept + { + return { m_write_stats.max_queued.load(std::memory_order_relaxed), + m_write_stats.wait_count.load(std::memory_order_relaxed) }; + } + + public: + bool pushCommon(const ElementType& data, bool wait) + { + int64_t write_index; + int64_t new_write_index; + int32_t queue_size; + for (;;) { + write_index = m_write_index.load(std::memory_order_relaxed); + if (0 > write_index) { + return false; // canceled + } + auto read_index = m_read_index.load(std::memory_order_acquire); + if (0 > read_index) { + return false; // canceled; + } + new_write_index = (write_index + 1) % max_size; + if (new_write_index == read_index) { + // the queue is full + if (!wait) { + return false; + } + m_write_stats.wait_count.fetch_add(1, std::memory_order_relaxed); + m_read_index.wait(read_index, std::memory_order_relaxed); + continue; + } + + // queue size + queue_size = int32_t(max_size + new_write_index - read_index) % max_size; + + // strongを使うと場合は、失敗した=他のwriterとコンフリクトしたということになるので + // m_write_index.load()からやり直しする必要があることが確定する + auto result = m_write_index.compare_exchange_strong(/*expected*/ write_index, + /*desired*/ new_write_index, + std::memory_order_relaxed); + if (result) { + break; + } + } + + m_buffer[write_index] = data; + + // update stat + { + auto max_queued = m_write_stats.max_queued.load(std::memory_order_relaxed); + while (max_queued < queue_size) { + // while (max_queued < num) でループさせるのでweakで十分 + auto result = + m_write_stats.max_queued.compare_exchange_weak(/*expected*/ max_queued, + /*desired*/ queue_size, + std::memory_order_relaxed); + if (result) { + break; + } + } + } + + auto new_read_max_index = new_write_index; + for (;;) { + auto read_max_index = write_index; + auto result = m_read_max_index.compare_exchange_weak(/*expected*/ read_max_index, + /*desired*/ new_read_max_index, + std::memory_order_release); + if (result) { + m_read_max_index.notify_one(); + break; + } + if (0 > read_max_index) { + return false; // canceled + } + } + + return true; + } + + static int64_t read_availe(int64_t read_max_index, int64_t read_index) + { + if (read_index <= read_max_index) { + return read_max_index - read_index; + } + return max_size - read_index; + } + + int64_t peekCommon(ElementType** ppData, int64_t num, bool wait) + { + const auto read_index = m_read_index.load(std::memory_order_relaxed); + int64_t read_max_index; + for (;;) { + read_max_index = m_read_max_index.load(std::memory_order_acquire); + if (0 > read_max_index) { + return -1; // canceled + } + bool empty = (read_index == read_max_index); + if (!empty) { + break; + } + // empty + if (!wait) { + return false; + } + m_read_max_index.wait(read_max_index, std::memory_order_relaxed); + } + + auto avail = read_availe(read_max_index, read_index); + num = (std::min)(avail, num); + *ppData = &m_buffer[read_index]; + + m_read_index_expected = read_index; + m_read_index_desired = (read_index + num) % max_size; + return num; + } + + void commitPopCommon(void) + { + auto new_read_index = m_read_index_desired; + for (;;) { + auto read_index_expected = m_read_index_expected; + auto result = m_read_index.compare_exchange_weak(/*expected*/ read_index_expected, + /*desired */ new_read_index, + std::memory_order_release); + if (result) { + m_read_index.notify_one(); + break; + } + if (0 > read_index_expected) { + return; // canceled + } + } + } +}; + +} // namespace pf diff --git a/src/pt/pf_mpsc_ringbuffer_test.cpp b/src/pt/pf_mpsc_ringbuffer_test.cpp new file mode 100644 index 0000000..8fdea82 --- /dev/null +++ b/src/pt/pf_mpsc_ringbuffer_test.cpp @@ -0,0 +1,207 @@ +/* Copyright (c) 2024 Masaaki Hamada + * Distributed under the MIT License (http://opensource.org/licenses/MIT) + */ + +#define GTEST_DONT_DEFINE_TEST 1 +#include + +#include "pf_mpsc_ringbuffer.h" +#include + +namespace pf { + +struct X +{ + int32_t id; + int32_t seq; + bool done; +}; + +class MpscRingBufferTest : public ::testing::Test +{ + static constexpr size_t s_max_size = 128; + static constexpr size_t s_sizeListLength = 10000; + std::vector m_sizeList = generateSizeList(); + std::vector m_sendData = generateSendData(); + using RingBuffer = pf::MpscRingBuffer; + + private: + static std::vector generateSizeList() + { + volatile unsigned seed = 4946; + std::mt19937 mt{ seed }; + std::uniform_int_distribution dist( + 1, s_max_size - 1); // s_max_size-1はエラーになるので注意 + std::vector ret(s_sizeListLength); + std::generate(ret.begin(), ret.end(), [&]() { return dist(mt); }); + return ret; + } + static std::vector generateSendData() + { + volatile unsigned seed = 8888; + std::mt19937 mt{ seed }; + std::vector ret(s_max_size); + std::generate(ret.begin(), ret.end(), [&]() { return char(mt()); }); + return ret; + } + + public: + void producer(RingBuffer& rb, int64_t n, int32_t id) + { + int32_t seq = 0; + for (auto i = 0ll; i < n;) { + bool done = (i == n - 1); + X x{ id, int32_t(i), done }; + if (seq & 0x100) { + SwitchToThread(); + } + if (seq & 8) { + auto result = rb.push(x); + _ASSERTE(result); + ++i; + } else { + auto result = rb.tryPush(x); + if (result) { + ++i; + } + } + } + }; + + void consumer(RingBuffer& rb, int producerNum, int& ret, int64_t& total) + { + struct ProducerInfo + { + int seq = 0; + bool done = false; + }; + std::vector producers(producerNum); + int doneCount = 0; + int wrong = 0; + int64_t popCount = 0; + int64_t write_index = 0; + for (;;) { + X* p = nullptr; + size_t num = 10; + auto n = rb.peek(&p, num); + if (0 > n) { + break; + } + for (auto i = 0; i < n; ++i) { + X& x = p[i]; + ++popCount; + if (!((unsigned)x.id < producers.size())) { + ++wrong; + } + auto& prod = producers[x.id]; + if (x.seq != prod.seq) { + ++wrong; + } + ++prod.seq; + if (prod.done) { + ++wrong; + } + if (x.done) { + prod.done = true; + ++doneCount; + } + } + if (0 < n) { + rb.commitPop(); + } + if (doneCount == producerNum) { + break; + } + } + ret = wrong; + total = popCount; + }; + + void execute(int64_t count, int producerNum) + { + auto rb0 = std::make_unique(); + RingBuffer& rb = *rb0.get(); + int wrongCount = 0; + int64_t total = 0; + std::thread c(&MpscRingBufferTest::consumer, + this, + std::ref(rb), + producerNum, + std::ref(wrongCount), + std::ref(total)); + std::vector producers(producerNum); + for (auto id = 0; id < producers.size(); ++id) { + producers[id] = + std::thread(&MpscRingBufferTest::producer, this, std::ref(rb), count, id); + } + for (auto id = 0; id < producers.size(); ++id) { + producers[id].join(); + } + c.join(); + ASSERT_EQ(wrongCount, 0); + auto totalExpected = count * producerNum; + ASSERT_EQ(total, totalExpected); + } + + void producer_cancel(RingBuffer& rb, int64_t n, int32_t id) + { + int32_t seq = 0; + for (auto i = 0ll; i < n;) { + bool done = (i == n - 1); + X x{ id, int32_t(i), done }; + if (seq & 0x100) { + SwitchToThread(); + } + auto result = rb.push(x); + if (!result) { + break; + } + ++i; + } + }; + + void execute_cancel(int64_t count, int producerNum) + { + int64_t sendCount = 100000; + for (auto i = 0; i < count; ++i) { + auto rb0 = std::make_unique(); + RingBuffer& rb = *rb0.get(); + int wrongCount = 0; + int64_t total = 0; + std::thread c(&MpscRingBufferTest::consumer, + this, + std::ref(rb), + producerNum, + std::ref(wrongCount), + std::ref(total)); + std::vector producers(producerNum); + for (auto id = 0; id < producers.size(); ++id) { + producers[id] = std::thread( + &MpscRingBufferTest::producer_cancel, this, std::ref(rb), sendCount, id); + } + + Sleep(1); + rb.cancel(); + + for (auto id = 0; id < producers.size(); ++id) { + producers[id].join(); + } + c.join(); + } + ASSERT_TRUE(true); + } +}; + +GTEST_TEST_F(MpscRingBufferTest, Normal) +{ + ASSERT_NO_THROW({ execute(/*count*/ 300000, /*producerNum*/ 1); }); + + ASSERT_NO_THROW({ execute(/*count*/ 400000, /*producerNum*/ 10); }); +} + +GTEST_TEST_F(MpscRingBufferTest, Cancel) +{ + ASSERT_NO_THROW({ execute_cancel(/*count*/ 100, /*producerNum*/ 10); }); +} + +} // namespace pf diff --git a/src/zaplog_test.cpp b/src/zaplog_test.cpp index bb7fdc6..d9f9858 100644 --- a/src/zaplog_test.cpp +++ b/src/zaplog_test.cpp @@ -1,3 +1,7 @@ +/* Copyright (c) 2024 Masaaki Hamada + * Distributed under the MIT License (http://opensource.org/licenses/MIT) + */ + #include #include