Skip to content

Commit

Permalink
Optimize uring sending messages to other uring queues
Browse files Browse the repository at this point in the history
  • Loading branch information
Oipo committed Nov 10, 2024
1 parent d967dc5 commit 1aa3656
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 3 deletions.
1 change: 1 addition & 0 deletions build_common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ run_examples ()
../bin/ichor_etcd_example || exit 1
fi
if [[ URING -eq 1 ]]; then
../bin/ichor_multithreaded_example_uring || exit 1
../bin/ichor_tcp_example_uring || exit 1
../bin/ichor_timer_example_uring || exit 1
../bin/ichor_yielding_timer_example_uring || exit 1
Expand Down
8 changes: 8 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ if(ICHOR_USE_BOOST_BEAST)
target_link_libraries(ichor_pong_example ichor)
endif()

if(ICHOR_USE_LIBURING)
file(GLOB_RECURSE EXAMPLE_SOURCES ${ICHOR_TOP_DIR}/examples/multithreaded_example/*.cpp)
add_executable(ichor_multithreaded_example_uring ${EXAMPLE_SOURCES})
target_link_libraries(ichor_multithreaded_example_uring ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(ichor_multithreaded_example_uring ichor)
target_compile_definitions(ichor_multithreaded_example_uring PUBLIC URING_EXAMPLE)
endif()

if(ICHOR_USE_LIBURING AND NOT (ICHOR_SKIP_EXTERNAL_TESTS AND ICHOR_AARCH64))
file(GLOB_RECURSE EXAMPLE_SOURCES ${ICHOR_TOP_DIR}/examples/tcp_example/*.cpp)
add_executable(ichor_tcp_example_uring ${EXAMPLE_SOURCES})
Expand Down
38 changes: 35 additions & 3 deletions examples/multithreaded_example/main.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include "OneService.h"
#include "OtherService.h"
#include <ichor/event_queues/PriorityQueue.h>
#include <ichor/services/logging/LoggerFactory.h>
#include <ichor/services/logging/NullFrameworkLogger.h>
#include <ichor/ichor-mimalloc.h>
Expand All @@ -14,6 +13,23 @@
#define LOGGER_TYPE CoutLogger
#endif

#if defined(URING_EXAMPLE)
#include <ichor/event_queues/IOUringQueue.h>

#define QIMPL IOUringQueue
#elif defined(SDEVENT_EXAMPLE)
#include <ichor/event_queues/SdeventQueue.h>

#define QIMPL SdeventQueue
#else
#include <ichor/event_queues/PriorityQueue.h>
#ifdef ORDERED_EXAMPLE
#define QIMPL OrderedPriorityQueue
#else
#define QIMPL PriorityQueue
#endif
#endif

#include <ichor/CommunicationChannel.h>
#include <chrono>
#include <iostream>
Expand All @@ -29,31 +45,47 @@ int main(int argc, char *argv[]) {
auto start = std::chrono::steady_clock::now();

CommunicationChannel channel{};
auto queueOne = std::make_unique<PriorityQueue>();
auto queueOne = std::make_unique<QIMPL>();
auto &dmOne = queueOne->createManager();
auto queueTwo = std::make_unique<PriorityQueue>();
auto queueTwo = std::make_unique<QIMPL>();
auto &dmTwo = queueTwo->createManager();
channel.addManager(&dmOne);
channel.addManager(&dmTwo);

std::thread t1([&] {
#if defined(URING_EXAMPLE)
queueOne->createEventLoop();
#elif defined(SDEVENT_EXAMPLE)
auto *loop = queue->createEventLoop();
#endif
#ifdef ICHOR_USE_SPDLOG
dmOne.createServiceManager<SpdlogSharedService, ISpdlogSharedService>();
#endif
dmOne.createServiceManager<NullFrameworkLogger, IFrameworkLogger>();
dmOne.createServiceManager<LoggerFactory<LOGGER_TYPE>, ILoggerFactory>(Properties{{"DefaultLogLevel", Ichor::make_any<LogLevel>(LogLevel::LOG_INFO)}});
dmOne.createServiceManager<OneService>();
queueOne->start(CaptureSigInt);
#if defined(SDEVENT_EXAMPLE)
sd_event_loop(loop);
#endif
});

std::thread t2([&] {
#if defined(URING_EXAMPLE)
queueTwo->createEventLoop();
#elif defined(SDEVENT_EXAMPLE)
auto *loop = queue->createEventLoop();
#endif
#ifdef ICHOR_USE_SPDLOG
dmTwo.createServiceManager<SpdlogSharedService, ISpdlogSharedService>();
#endif
dmTwo.createServiceManager<NullFrameworkLogger, IFrameworkLogger>();
dmTwo.createServiceManager<LoggerFactory<LOGGER_TYPE>, ILoggerFactory>(Properties{{"DefaultLogLevel", Ichor::make_any<LogLevel>(LogLevel::LOG_INFO)}});
dmTwo.createServiceManager<OtherService>();
queueTwo->start(CaptureSigInt);
#if defined(SDEVENT_EXAMPLE)
sd_event_loop(loop);
#endif
});

t1.join();
Expand Down
1 change: 1 addition & 0 deletions include/ichor/DependencyManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@ namespace Ichor {
/// Returns thread-local manager. Can only be used after the manager's start() function has been called.
/// \return
[[nodiscard]] DependencyManager& GetThreadLocalManager() noexcept;
[[nodiscard]] bool HasThreadLocalManager() noexcept;
}

#include <ichor/coroutines/AsyncGeneratorDetail.h>
3 changes: 3 additions & 0 deletions include/ichor/event_queues/IEventQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ namespace Ichor {
/// Thread-safe
[[nodiscard]] virtual bool is_running() const noexcept = 0;

/// Thread-safe
[[nodiscard]] virtual NameHashType get_queue_name_hash() const noexcept = 0;

/// Starts the event loop, consumes the current thread until a QuitEvent occurs
/// \param captureSigInt If true, exit on CTRL+C/SigInt
/// \return true if started, false if there was a problem
Expand Down
1 change: 1 addition & 0 deletions include/ichor/event_queues/IOUringQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace Ichor {
[[nodiscard]] bool empty() const final;
[[nodiscard]] uint64_t size() const final;
[[nodiscard]] bool is_running() const noexcept final;
[[nodiscard]] NameHashType get_queue_name_hash() const noexcept final;

/// Creates an io_uring event loop with a standard flagset.
/// \param entriesCount
Expand Down
1 change: 1 addition & 0 deletions include/ichor/event_queues/PriorityQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ namespace Ichor {
[[nodiscard]] bool empty() const noexcept final;
[[nodiscard]] uint64_t size() const noexcept final;
[[nodiscard]] bool is_running() const noexcept final;
[[nodiscard]] NameHashType get_queue_name_hash() const noexcept final;

bool start(bool captureSigInt) final;
[[nodiscard]] bool shouldQuit() final;
Expand Down
1 change: 1 addition & 0 deletions include/ichor/event_queues/SdeventQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace Ichor {
[[nodiscard]] bool empty() const final;
[[nodiscard]] uint64_t size() const final;
[[nodiscard]] bool is_running() const noexcept final;
[[nodiscard]] NameHashType get_queue_name_hash() const noexcept final;

[[nodiscard]] sd_event* createEventLoop();
void useEventLoop(sd_event *loop);
Expand Down
4 changes: 4 additions & 0 deletions src/ichor/DependencyManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1587,6 +1587,10 @@ void Ichor::DependencyManager::clearCommunicationChannel() {
_communicationChannel = nullptr;
}

[[nodiscard]] bool Ichor::HasThreadLocalManager() noexcept {
return Detail::_local_dm != nullptr;
}

[[nodiscard]] Ichor::DependencyManager& Ichor::GetThreadLocalManager() noexcept {
return *Detail::_local_dm;
}
Expand Down
15 changes: 15 additions & 0 deletions src/ichor/event_queues/IOUringQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,17 @@ namespace Ichor {
}

TSAN_ANNOTATE_HAPPENS_BEFORE(procEvent);

if(HasThreadLocalManager()) {
auto &q = GetThreadLocalEventQueue();
if(q.get_queue_name_hash() == get_queue_name_hash()) {
auto &ioq = static_cast<IOUringQueue&>(q);
auto sqe = ioq.getSqe();
io_uring_prep_msg_ring(sqe, _eventQueuePtr->ring_fd, 0, reinterpret_cast<uintptr_t>(reinterpret_cast<void*>(procEvent)), 0);
return;
}
}

io_uring tempQueue{};
io_uring_params p{};
p.flags = IORING_SETUP_ATTACH_WQ;
Expand Down Expand Up @@ -537,6 +548,10 @@ namespace Ichor {
return !_quit.load(std::memory_order_acquire);
}

NameHashType IOUringQueue::get_queue_name_hash() const noexcept {
return typeNameHash<IOUringQueue>();
}

tl::optional<NeverNull<io_uring*>> IOUringQueue::createEventLoop(unsigned entriesCount) {
if(entriesCount == 0) {
fmt::println("Cannot create an event loop with 0 entries.");
Expand Down
5 changes: 5 additions & 0 deletions src/ichor/event_queues/PriorityQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ namespace Ichor {
return !_quit.load(std::memory_order_acquire);
}

template <typename COMPARE>
NameHashType TemplatePriorityQueue<COMPARE>::get_queue_name_hash() const noexcept {
return typeNameHash<TemplatePriorityQueue<COMPARE>>();
}

template <typename COMPARE>
bool TemplatePriorityQueue<COMPARE>::start(bool captureSigInt) {
if(!_dm) [[unlikely]] {
Expand Down
4 changes: 4 additions & 0 deletions src/ichor/event_queues/SdeventQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ namespace Ichor {
return !_quit.load(std::memory_order_acquire);
}

NameHashType SdeventQueue::get_queue_name_hash() const noexcept {
return typeNameHash<SdeventQueue>();
}

[[nodiscard]] sd_event* SdeventQueue::createEventLoop() {
auto ret = sd_event_default(&_eventQueue);

Expand Down

0 comments on commit 1aa3656

Please sign in to comment.