diff --git a/CMakeLists.txt b/CMakeLists.txt
index 27e14e507c8..41087fbb415 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -2108,6 +2108,7 @@ add_executable(mixxx-test
         src/test/enginemixertest.cpp
         src/test/enginemicrophonetest.cpp
         src/test/enginesynctest.cpp
+        src/test/fifotest.cpp
         src/test/fileinfo_test.cpp
         src/test/frametest.cpp
         src/test/globaltrackcache_test.cpp
diff --git a/src/test/fifotest.cpp b/src/test/fifotest.cpp
new file mode 100644
index 00000000000..d76a5a4a110
--- /dev/null
+++ b/src/test/fifotest.cpp
@@ -0,0 +1,389 @@
+#include <gtest/gtest.h>
+
+#include <limits>
+#include <random>
+#include <thread>
+
+#include "util/fifo.h"
+
+namespace {
+
+struct Param {
+    int requestedBufferSize;
+    int expectedBufferSize;
+    int offset;
+};
+
+class FifoTest : public testing::TestWithParam<Param> {
+};
+
+TEST_P(FifoTest, writeAvailableTest) {
+    const auto param = FifoTest::GetParam();
+    std::vector<uint32_t> data(param.expectedBufferSize);
+    FIFO<uint32_t> fifo(param.requestedBufferSize);
+    fifo.releaseReadRegions(param.offset >= 0
+                    ? param.offset
+                    : std::numeric_limits<std::size_t>::max() + param.offset +
+                            1);
+    fifo.releaseWriteRegions(param.offset >= 0
+                    ? param.offset
+                    : std::numeric_limits<std::size_t>::max() + param.offset +
+                            1);
+
+    ASSERT_EQ(param.expectedBufferSize, fifo.writeAvailable());
+    ASSERT_EQ(100, fifo.write(data.data(), 100));
+    ASSERT_EQ(param.expectedBufferSize - 100, fifo.writeAvailable());
+    ASSERT_EQ(50, fifo.read(data.data(), 50));
+    ASSERT_EQ(param.expectedBufferSize - 50, fifo.writeAvailable());
+    ASSERT_EQ(param.expectedBufferSize - 50, fifo.write(data.data(), 1000000));
+    ASSERT_EQ(0, fifo.writeAvailable());
+}
+
+TEST_P(FifoTest, readAvailableTest) {
+    const auto param = FifoTest::GetParam();
+    std::vector<uint32_t> data(param.expectedBufferSize);
+    FIFO<uint32_t> fifo(param.requestedBufferSize);
+    fifo.releaseReadRegions(param.offset >= 0
+                    ? param.offset
+                    : std::numeric_limits<std::size_t>::max() + param.offset +
+                            1);
+    fifo.releaseWriteRegions(param.offset >= 0
+                    ? param.offset
+                    : std::numeric_limits<std::size_t>::max() + param.offset +
+                            1);
+
+    ASSERT_EQ(0, fifo.readAvailable());
+    ASSERT_EQ(100, fifo.write(data.data(), 100));
+    ASSERT_EQ(100, fifo.readAvailable());
+    ASSERT_EQ(50, fifo.read(data.data(), 50));
+    ASSERT_EQ(50, fifo.readAvailable());
+    ASSERT_EQ(param.expectedBufferSize - 50, fifo.write(data.data(), 1000000));
+    ASSERT_EQ(param.expectedBufferSize, fifo.readAvailable());
+}
+
+TEST_P(FifoTest, flushReadTest) {
+    const auto param = FifoTest::GetParam();
+    std::vector<uint32_t> data(param.expectedBufferSize);
+    FIFO<uint32_t> fifo(param.requestedBufferSize);
+    fifo.releaseReadRegions(param.offset >= 0
+                    ? param.offset
+                    : std::numeric_limits<std::size_t>::max() + param.offset +
+                            1);
+    fifo.releaseWriteRegions(param.offset >= 0
+                    ? param.offset
+                    : std::numeric_limits<std::size_t>::max() + param.offset +
+                            1);
+
+    ASSERT_EQ(0, fifo.readAvailable());
+    ASSERT_EQ(100, fifo.write(data.data(), 100));
+
+    int expected;
+    expected = (param.offset + 50) % param.expectedBufferSize;
+    if (expected < 0) {
+        expected += param.expectedBufferSize;
+    }
+    ASSERT_EQ(expected, fifo.flushReadData(50));
+
+    expected = (param.offset + 100) % param.expectedBufferSize;
+    if (expected < 0) {
+        expected += param.expectedBufferSize;
+    }
+    ASSERT_EQ(expected, fifo.flushReadData(1000000));
+    ASSERT_EQ(param.expectedBufferSize, fifo.write(data.data(), 1000000));
+    ASSERT_EQ(param.expectedBufferSize, fifo.readAvailable());
+}
+
+TEST_P(FifoTest, readWriteStressTest) {
+    const auto param = FifoTest::GetParam();
+    std::vector<uint32_t> data(param.expectedBufferSize);
+    FIFO<uint32_t> fifo(param.requestedBufferSize);
+    fifo.releaseReadRegions(param.offset >= 0
+                    ? param.offset
+                    : std::numeric_limits<std::size_t>::max() + param.offset +
+                            1);
+    fifo.releaseWriteRegions(param.offset >= 0
+                    ? param.offset
+                    : std::numeric_limits<std::size_t>::max() + param.offset +
+                            1);
+
+    std::vector<uint32_t> wdata(param.expectedBufferSize + param.expectedBufferSize / 10);
+    std::vector<uint32_t> rdata(param.expectedBufferSize + param.expectedBufferSize / 10);
+    uint32_t k = 0;
+    uint32_t j = 0;
+
+    std::random_device rd;
+    std::mt19937 mt(rd());
+    std::uniform_int_distribution<int> dist(
+            0, param.expectedBufferSize + param.expectedBufferSize / 10);
+
+    while (k < 1000000) {
+        int n = dist(mt);
+        int m = std::min(n, fifo.writeAvailable());
+        for (int i = 0; i < m; i++) {
+            wdata[i] = k++;
+        }
+        ASSERT_EQ(m, fifo.write(wdata.data(), n));
+        n = dist(mt);
+        m = std::min(n, fifo.readAvailable());
+        ASSERT_EQ(m, fifo.read(rdata.data(), n));
+        for (int i = 0; i < m; i++) {
+            ASSERT_EQ(j++, rdata[i]);
+        }
+    }
+}
+
+TEST_P(FifoTest, readWriteStressTestRegions) {
+    const auto param = FifoTest::GetParam();
+    std::vector<uint32_t> data(param.expectedBufferSize);
+    FIFO<uint32_t> fifo(param.requestedBufferSize);
+    fifo.releaseReadRegions(param.offset >= 0
+                    ? param.offset
+                    : std::numeric_limits<std::size_t>::max() + param.offset +
+                            1);
+    fifo.releaseWriteRegions(param.offset >= 0
+                    ? param.offset
+                    : std::numeric_limits<std::size_t>::max() + param.offset +
+                            1);
+
+    std::vector<uint32_t> wdata(param.expectedBufferSize + param.expectedBufferSize / 10);
+    std::vector<uint32_t> rdata(param.expectedBufferSize + param.expectedBufferSize / 10);
+    uint32_t k = 0;
+    uint32_t j = 0;
+
+    std::random_device rd;
+    std::mt19937 mt(rd());
+    std::uniform_int_distribution<int> dist(
+            0, param.expectedBufferSize + param.expectedBufferSize / 10);
+
+    while (k < 1000000) {
+        int n = dist(mt);
+        int m = std::min(n, fifo.writeAvailable());
+        uint32_t* ptr1;
+        ring_buffer_size_t size1;
+        uint32_t* ptr2;
+        ring_buffer_size_t size2;
+        ASSERT_EQ(m, fifo.aquireWriteRegions(n, &ptr1, &size1, &ptr2, &size2));
+        ASSERT_EQ(m, size1 + size2);
+        for (int i = 0; i < size1; i++) {
+            ptr1[i] = k++;
+        }
+        for (int i = 0; i < size2; i++) {
+            ptr2[i] = k++;
+        }
+        fifo.releaseWriteRegions(m);
+        n = dist(mt);
+        m = std::min(n, fifo.readAvailable());
+        ASSERT_EQ(m, fifo.aquireReadRegions(n, &ptr1, &size1, &ptr2, &size2));
+        ASSERT_EQ(m, size1 + size2);
+        for (int i = 0; i < size1; i++) {
+            ASSERT_EQ(j++, ptr1[i]);
+        }
+        for (int i = 0; i < size2; i++) {
+            ASSERT_EQ(j++, ptr2[i]);
+        }
+        fifo.releaseReadRegions(m);
+    }
+}
+
+INSTANTIATE_TEST_SUITE_P(FifoTestSuite,
+        FifoTest,
+        testing::Values(
+                Param{1024, 1024, 0},
+                Param{1024, 1024, 1200},
+                Param{1024, 1024, -1200},
+                Param{65536, 65536, 0},
+                Param{65536, 65536, 1200},
+                Param{65536, 65536, -1200},
+                Param{1234, 2048, 0},
+                Param{1234, 2048, -1200},
+                Param{1234, 2048, 1200}));
+
+} // namespace
+
+constexpr int rwtotal = 500000000;
+
+template<class T_FIFO>
+class MultiThreadRW {
+    T_FIFO m_fifo;
+    bool m_ok;
+    const int m_bufferSize;
+    const int m_total;
+    const bool m_wait;
+
+  public:
+    MultiThreadRW(int ringBufferSize, int bufferSize, int total, bool wait)
+            : m_fifo(ringBufferSize),
+              m_ok{},
+              m_bufferSize(bufferSize),
+              m_total(total),
+              m_wait(wait) {
+    }
+
+    void write() {
+        int k = 0;
+        std::vector<int> buffer(m_bufferSize);
+        while (k != m_total) {
+            int n = std::min(m_bufferSize, m_total - k);
+            if (m_wait) {
+                while (m_fifo.writeAvailable() < n) {
+                }
+            }
+            n = std::min(n, m_fifo.writeAvailable());
+            for (int j = 0; j < n; j++) {
+                buffer[j] = k++;
+            }
+            m_fifo.write(buffer.data(), n);
+        }
+    }
+
+    void read() {
+        m_ok = true;
+        int k = 0;
+        std::vector<int> buffer(m_bufferSize);
+        while (k != m_total) {
+            int n = std::min(m_bufferSize, m_total - k);
+            if (m_wait) {
+                while (m_fifo.readAvailable() < n) {
+                }
+            }
+            n = m_fifo.read(buffer.data(), n);
+            for (int j = 0; j < n; j++) {
+                m_ok &= (buffer[j] == k++);
+            }
+        }
+    }
+
+    bool run() {
+        m_ok = true;
+        std::thread th1(&MultiThreadRW::write, this);
+        std::thread th2(&MultiThreadRW::read, this);
+        th1.join();
+        th2.join();
+        return m_ok;
+    }
+};
+
+TEST(FifoTest, MultiThreadRW) {
+    MultiThreadRW<FIFO<int>> io(65536, 1024, rwtotal, false);
+    bool ok = io.run();
+    ASSERT_TRUE(ok);
+}
+
+TEST(FifoTest, MultiThreadRW_PA) {
+    MultiThreadRW<PA::FIFO<int>> io(65536, 1024, rwtotal, false);
+    bool ok = io.run();
+    ASSERT_TRUE(ok);
+}
+
+TEST(FifoTest, MultiThreadRW_Wait) {
+    MultiThreadRW<FIFO<int>> io(65536, 1024, rwtotal, true);
+    bool ok = io.run();
+    ASSERT_TRUE(ok);
+}
+
+TEST(FifoTest, MultiThreadRW_PA_Wait) {
+    MultiThreadRW<PA::FIFO<int>> io(65536, 1024, rwtotal, true);
+    bool ok = io.run();
+    ASSERT_TRUE(ok);
+}
+
+template<class T_FIFO>
+class MultiThreadRegionRW {
+    T_FIFO m_fifo;
+    bool m_ok;
+    const int m_bufferSize;
+    const int m_total;
+    const bool m_wait;
+
+  public:
+    MultiThreadRegionRW(
+            int ringBufferSize, int bufferSize, int total, bool wait)
+            : m_fifo(ringBufferSize),
+              m_ok{},
+              m_bufferSize(bufferSize),
+              m_total(total),
+              m_wait(wait) {
+    }
+
+    void write() {
+        int k = 0;
+        std::vector<int> buffer(m_bufferSize);
+        while (k != m_total) {
+            int n = std::min(m_bufferSize, m_total - k);
+            int* ptr1;
+            int* ptr2;
+            ring_buffer_size_t size1;
+            ring_buffer_size_t size2;
+            if (m_wait) {
+                while (m_fifo.writeAvailable() < n) {
+                }
+            }
+            n = m_fifo.aquireWriteRegions(n, &ptr1, &size1, &ptr2, &size2);
+            for (int j = 0; j < size1; j++) {
+                ptr1[j] = k++;
+            }
+            for (int j = 0; j < size2; j++) {
+                ptr2[j] = k++;
+            }
+            m_fifo.releaseWriteRegions(n);
+        }
+    }
+
+    void read() {
+        m_ok = true;
+        int k = 0;
+        std::vector<int> buffer(m_bufferSize);
+        while (k != m_total) {
+            int n = std::min(m_bufferSize, m_total - k);
+            int* ptr1;
+            int* ptr2;
+            ring_buffer_size_t size1;
+            ring_buffer_size_t size2;
+            if (m_wait) {
+                while (m_fifo.readAvailable() < n) {
+                }
+            }
+            n = m_fifo.aquireReadRegions(n, &ptr1, &size1, &ptr2, &size2);
+            for (int j = 0; j < size1; j++) {
+                m_ok &= (ptr1[j] == k++);
+            }
+            for (int j = 0; j < size2; j++) {
+                m_ok &= (ptr2[j] == k++);
+            }
+            m_fifo.releaseReadRegions(n);
+        }
+    }
+
+    bool run() {
+        m_ok = true;
+        std::thread th1(&MultiThreadRegionRW::write, this);
+        std::thread th2(&MultiThreadRegionRW::read, this);
+        th1.join();
+        th2.join();
+        return m_ok;
+    }
+};
+
+TEST(FifoTest, MultiThreadRegionRW) {
+    MultiThreadRegionRW<FIFO<int>> io(65536, 256, rwtotal, false);
+    bool ok = io.run();
+    ASSERT_TRUE(ok);
+}
+
+TEST(FifoTest, MultiThreadRegionRW_PA) {
+    MultiThreadRegionRW<PA::FIFO<int>> io(65536, 256, rwtotal, false);
+    bool ok = io.run();
+    ASSERT_TRUE(ok);
+}
+
+TEST(FifoTest, MultiThreadRegionRW_Wait) {
+    MultiThreadRegionRW<FIFO<int>> io(65536, 256, rwtotal, true);
+    bool ok = io.run();
+    ASSERT_TRUE(ok);
+}
+
+TEST(FifoTest, MultiThreadRegionRW_PA_Wait) {
+    MultiThreadRegionRW<PA::FIFO<int>> io(65536, 256, rwtotal, true);
+    bool ok = io.run();
+    ASSERT_TRUE(ok);
+}
diff --git a/src/util/fifo.h b/src/util/fifo.h
index d41afbfa59a..7da69ceeb38 100644
--- a/src/util/fifo.h
+++ b/src/util/fifo.h
@@ -1,11 +1,14 @@
 #pragma once
 
-#include "pa_ringbuffer.h"
+#include <atomic>
+#include <vector>
+
+#include "pa_ringbuffer.h"
 #include "util/class.h"
 #include "util/math.h"
 
-template <class DataType>
+namespace PA {
+template<class DataType>
 class FIFO {
   public:
     explicit FIFO(int size)
@@ -40,19 +43,31 @@ class FIFO {
         }
     }
     int aquireWriteRegions(int count,
-            DataType** dataPtr1, ring_buffer_size_t* sizePtr1,
-            DataType** dataPtr2, ring_buffer_size_t* sizePtr2) {
-        return PaUtil_GetRingBufferWriteRegions(&m_ringBuffer, count,
-                (void**)dataPtr1, sizePtr1, (void**)dataPtr2, sizePtr2);
+            DataType** dataPtr1,
+            ring_buffer_size_t* sizePtr1,
+            DataType** dataPtr2,
+            ring_buffer_size_t* sizePtr2) {
+        return PaUtil_GetRingBufferWriteRegions(&m_ringBuffer,
+                count,
+                (void**)dataPtr1,
+                sizePtr1,
+                (void**)dataPtr2,
+                sizePtr2);
     }
     int releaseWriteRegions(int count) {
         return PaUtil_AdvanceRingBufferWriteIndex(&m_ringBuffer, count);
     }
     int aquireReadRegions(int count,
-            DataType** dataPtr1, ring_buffer_size_t* sizePtr1,
-            DataType** dataPtr2, ring_buffer_size_t* sizePtr2) {
-        return PaUtil_GetRingBufferReadRegions(&m_ringBuffer, count,
-                (void**)dataPtr1, sizePtr1, (void**)dataPtr2, sizePtr2);
+            DataType** dataPtr1,
+            ring_buffer_size_t* sizePtr1,
+            DataType** dataPtr2,
+            ring_buffer_size_t* sizePtr2) {
+        return PaUtil_GetRingBufferReadRegions(&m_ringBuffer,
+                count,
+                (void**)dataPtr1,
+                sizePtr1,
+                (void**)dataPtr2,
+                sizePtr2);
     }
     int releaseReadRegions(int count) {
         return PaUtil_AdvanceRingBufferReadIndex(&m_ringBuffer, count);
@@ -67,3 +82,162 @@ class FIFO {
     PaUtilRingBuffer m_ringBuffer;
     DISALLOW_COPY_AND_ASSIGN(FIFO);
 };
+} // namespace PA
+
+// Fast single-producer, single-consumer ring buffer for trivial types only;
+// lock- and wait-free. The internal buffer size is rounded up to a power of two.
+
+// Internally we use std::size_t, but the original API used int for the return values.
+// It would be better to use std::size_t for the return values as well, but this breaks
+// the Windows build and should be done in a follow-up PR.
+// using ring_buffer_size_t = int;
+
+template<class DataType>
+class FIFO {
+  public:
+    using size_type = std::size_t;
+
+    explicit FIFO(size_type size)
+            : m_size{roundUpToPowerOf2(static_cast<size_type>(size))},
+              m_mask{m_size - 1},
+              m_data(m_size),
+              m_writeIndex{0},
+              m_readIndex{0} {
+    }
+
+    // Returns the number of values in the ring buffer available for reading.
+    int readAvailable() const {
+        const size_type readIndex = m_readIndex.load(std::memory_order_relaxed);
+        const size_type writeIndex = m_writeIndex.load(std::memory_order_acquire);
+        return static_cast<int>(writeIndex - readIndex);
+    }
+    // Returns the space in the ring buffer available for writing.
+    int writeAvailable() const {
+        const size_type readIndex = m_readIndex.load(std::memory_order_acquire);
+        const size_type writeIndex = m_writeIndex.load(std::memory_order_relaxed);
+        return static_cast<int>(m_size - (writeIndex - readIndex));
+    }
+    // Reads count values from the ring buffer. If less than count values are
+    // available, only the available amount is read. The return value is the
+    // actual number of values read. If the read index reaches the end of the
+    // ring buffer, the remainder is read from the start.
+    int read(DataType* pData, size_type count) {
+        size_type readIndex = m_readIndex.load(std::memory_order_relaxed);
+        const size_type writeIndex = m_writeIndex.load(std::memory_order_acquire);
+        const size_type available = writeIndex - readIndex;
+        count = std::min(available, count);
+        readIndex = readIndex & m_mask;
+        const size_type n = std::min(m_size - readIndex, count);
+        std::copy(m_data.data() + readIndex, m_data.data() + readIndex + n, pData);
+        std::copy(m_data.data(), m_data.data() + count - n, pData + n);
+        m_readIndex.fetch_add(count, std::memory_order_release);
+        return static_cast<int>(count);
+    }
+    // Writes count values to the ring buffer. If the available space is less
+    // than count, only the available amount is written. The return value is the
+    // actual number of values written. If the write index reaches the end of the
+    // ring buffer, the remainder is written to the start.
+    int write(const DataType* pData, size_type count) {
+        const size_type readIndex = m_readIndex.load(std::memory_order_acquire);
+        size_type writeIndex = m_writeIndex.load(std::memory_order_relaxed);
+        const size_type available = m_size - (writeIndex - readIndex);
+        count = std::min(available, count);
+        writeIndex = writeIndex & m_mask;
+        const size_type n = std::min(m_size - writeIndex, count);
+        std::copy(pData, pData + n, m_data.data() + writeIndex);
+        std::copy(pData + n, pData + count, m_data.data());
+        m_writeIndex.fetch_add(count, std::memory_order_release);
+        return static_cast<int>(count);
+    }
+    void writeBlocking(const DataType* pData, size_type count) {
+        size_type written = 0;
+        while (written < count) {
+            written += write(pData + written, count - written);
+        }
+    }
+    // Gives direct access to the ring buffer at the read index
+    // for a region of count values. If less than count values
+    // are available, the region is limited to the available amount.
+    // If the region surpasses the end of the ring buffer,
+    // the first region runs until the end and the second
+    // region holds the remainder from the start; otherwise
+    // the second region is nullptr with size 0.
+    int aquireReadRegions(size_type count,
+            DataType** dataPtr1,
+            ring_buffer_size_t* sizePtr1,
+            DataType** dataPtr2,
+            ring_buffer_size_t* sizePtr2) {
+        size_type readIndex = m_readIndex.load(std::memory_order_relaxed);
+        const size_type writeIndex = m_writeIndex.load(std::memory_order_acquire);
+        const size_type available = writeIndex - readIndex;
+        count = std::min(available, count);
+        readIndex = readIndex & m_mask;
+        *sizePtr1 = static_cast<ring_buffer_size_t>(std::min(m_size - readIndex, count));
+        *sizePtr2 = static_cast<ring_buffer_size_t>(count) - *sizePtr1;
+        *dataPtr1 = m_data.data() + readIndex;
+        *dataPtr2 = *sizePtr2 == 0 ? nullptr : m_data.data();
+
+        return static_cast<int>(count);
+    }
+    // Advances the read index after aquireReadRegions. count should
+    // be the return value of aquireReadRegions, which is the total
+    // size of the acquired regions.
+    int releaseReadRegions(size_type count) {
+        const size_type readIndex = (m_readIndex.load(std::memory_order_relaxed) + count);
+        m_readIndex.store(readIndex, std::memory_order_release);
+        return static_cast<int>(readIndex & m_mask);
+    }
+    // Same as aquireReadRegions, for write operations.
+    int aquireWriteRegions(size_type count,
+            DataType** dataPtr1,
+            ring_buffer_size_t* sizePtr1,
+            DataType** dataPtr2,
+            ring_buffer_size_t* sizePtr2) {
+        const size_type readIndex = m_readIndex.load(std::memory_order_acquire);
+        size_type writeIndex = m_writeIndex.load(std::memory_order_relaxed);
+        const size_type available = m_size - (writeIndex - readIndex);
+        count = std::min(available, count);
+        writeIndex = writeIndex & m_mask;
+        *sizePtr1 = static_cast<ring_buffer_size_t>(std::min(m_size - writeIndex, count));
+        *sizePtr2 = static_cast<ring_buffer_size_t>(count) - *sizePtr1;
+        *dataPtr1 = m_data.data() + writeIndex;
+        *dataPtr2 = *sizePtr2 == 0 ? nullptr : m_data.data();
+
+        return static_cast<int>(count);
+    }
+    // Same as releaseReadRegions, for write operations.
+    int releaseWriteRegions(size_type count) {
+        const size_type writeIndex = (m_writeIndex.load(std::memory_order_relaxed) + count);
+        m_writeIndex.store(writeIndex, std::memory_order_release);
+        return static_cast<int>(writeIndex & m_mask);
+    }
+    // Advances the read index by count values, or at most up to the write index.
+    // Returns the new read index (wrapped to the buffer size).
+    int flushReadData(size_type count) {
+        size_type readIndex = m_readIndex.load(std::memory_order_relaxed);
+        const size_type writeIndex = m_writeIndex.load(std::memory_order_acquire);
+        const size_type available = writeIndex - readIndex;
+        count = std::min(available, count);
+        readIndex += count;
+        m_readIndex.store(readIndex, std::memory_order_release);
+        return static_cast<int>(readIndex & m_mask);
+    }
+
+  private:
+    const size_type m_size;
+    const size_type m_mask;
+    std::vector<DataType> m_data;
+    std::atomic<size_type> m_writeIndex;
+    std::atomic<size_type> m_readIndex;
+    // The memory ordering of the atomics has to be guaranteed:
+    // - The read functions have to use memory_order_acquire to access the write
+    //   index, while the write functions, which modify the write index, have to
+    //   use memory_order_release. This ensures that a reader never observes the
+    //   advanced write index before the corresponding data writes are visible.
+    // - Vice versa, the read index has to be accessed with memory_order_acquire
+    //   in the write functions and advanced with memory_order_release.
+    // - All other accesses can be memory_order_relaxed, as no other operations
+    //   on the atomics take place in between the operations described above.
+
+    DISALLOW_COPY_AND_ASSIGN(FIFO);
+};
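
Usage illustration (not part of the change itself): the new FIFO is safe for exactly one producer thread and one consumer thread at a time. Below is a minimal sketch, assuming only the util/fifo.h header added by this diff; the two-thread split mirrors the MultiThreadRW test above, and main() plus all constants are invented for illustration.

#include <thread>
#include <vector>

#include "util/fifo.h"

int main() {
    // The constructor rounds the size up to the next power of two,
    // so this FIFO actually holds 1024 values.
    FIFO<int> fifo(1000);

    // Single producer: writeBlocking() spins until every value has
    // been accepted by the ring buffer, so nothing is dropped.
    std::thread producer([&fifo] {
        std::vector<int> chunk(256);
        for (int k = 0; k < 4096; k += static_cast<int>(chunk.size())) {
            for (int j = 0; j < static_cast<int>(chunk.size()); j++) {
                chunk[j] = k + j;
            }
            fifo.writeBlocking(chunk.data(), chunk.size());
        }
    });

    // Single consumer: read() returns how many values were actually
    // copied, which may be fewer than requested; values arrive in order.
    std::thread consumer([&fifo] {
        std::vector<int> chunk(256);
        int received = 0;
        while (received < 4096) {
            received += fifo.read(chunk.data(), chunk.size());
        }
    });

    producer.join();
    consumer.join();
    return 0;
}

Note that writeBlocking(), like the m_wait loops in the tests, busy-spins instead of sleeping on a condition variable; that keeps the class lock- and wait-free for real-time audio callbacks, so non-real-time callers should size the buffer generously.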