Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[libc] Remove more libc dependencies from the RPC header #116437

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 50 additions & 37 deletions libc/src/__support/RPC/rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
#define LLVM_LIBC_SRC___SUPPORT_RPC_RPC_H

#include "rpc_util.h"
#include "src/__support/CPP/algorithm.h" // max
#include "src/__support/CPP/atomic.h"
#include "src/__support/CPP/optional.h"
#include "src/__support/GPU/utils.h"
#include "src/__support/macros/config.h"
Expand All @@ -30,6 +28,17 @@
namespace LIBC_NAMESPACE_DECL {
namespace rpc {

/// Use scoped atomic variants if they are available for the target.
// Clang's __scoped_atomic_* builtins take one extra argument — a memory
// scope (e.g. __MEMORY_SCOPE_SYSTEM / __MEMORY_SCOPE_DEVICE, used below) —
// that bounds how far the synchronization must be visible. When the compiler
// does not provide these builtins, map each one to the corresponding
// standard __atomic_* builtin and drop the scope argument, which conservatively
// behaves as if every operation were system-scoped.
#if !__has_builtin(__scoped_atomic_load_n)
#define __scoped_atomic_load_n(src, ord, scp) __atomic_load_n(src, ord)
#define __scoped_atomic_store_n(dst, src, ord, scp) \
__atomic_store_n(dst, src, ord)
#define __scoped_atomic_fetch_or(src, val, ord, scp) \
__atomic_fetch_or(src, val, ord)
#define __scoped_atomic_fetch_and(src, val, ord, scp) \
__atomic_fetch_and(src, val, ord)
#endif

/// A fixed size channel used to communicate between the RPC client and server.
struct Buffer {
uint64_t data[8];
Expand Down Expand Up @@ -67,18 +76,18 @@ template <bool Invert> struct Process {
LIBC_INLINE ~Process() = default;

uint32_t port_count = 0;
cpp::Atomic<uint32_t> *inbox = nullptr;
cpp::Atomic<uint32_t> *outbox = nullptr;
uint32_t *inbox = nullptr;
uint32_t *outbox = nullptr;
Header *header = nullptr;
Buffer *packet = nullptr;

static constexpr uint64_t NUM_BITS_IN_WORD = sizeof(uint32_t) * 8;
cpp::Atomic<uint32_t> lock[MAX_PORT_COUNT / NUM_BITS_IN_WORD] = {0};
uint32_t lock[MAX_PORT_COUNT / NUM_BITS_IN_WORD] = {0};

LIBC_INLINE Process(uint32_t port_count, void *buffer)
: port_count(port_count), inbox(reinterpret_cast<cpp::Atomic<uint32_t> *>(
: port_count(port_count), inbox(reinterpret_cast<uint32_t *>(
advance(buffer, inbox_offset(port_count)))),
outbox(reinterpret_cast<cpp::Atomic<uint32_t> *>(
outbox(reinterpret_cast<uint32_t *>(
advance(buffer, outbox_offset(port_count)))),
header(reinterpret_cast<Header *>(
advance(buffer, header_offset(port_count)))),
Expand All @@ -102,15 +111,15 @@ template <bool Invert> struct Process {
/// Retrieve the inbox state from memory shared between processes.
LIBC_INLINE uint32_t load_inbox(uint64_t lane_mask, uint32_t index) const {
return gpu::broadcast_value(
lane_mask,
inbox[index].load(cpp::MemoryOrder::RELAXED, cpp::MemoryScope::SYSTEM));
lane_mask, __scoped_atomic_load_n(&inbox[index], __ATOMIC_RELAXED,
__MEMORY_SCOPE_SYSTEM));
}

/// Retrieve the outbox state from memory shared between processes.
LIBC_INLINE uint32_t load_outbox(uint64_t lane_mask, uint32_t index) const {
return gpu::broadcast_value(lane_mask,
outbox[index].load(cpp::MemoryOrder::RELAXED,
cpp::MemoryScope::SYSTEM));
return gpu::broadcast_value(
lane_mask, __scoped_atomic_load_n(&outbox[index], __ATOMIC_RELAXED,
__MEMORY_SCOPE_SYSTEM));
}

/// Signal to the other process that this one is finished with the buffer.
Expand All @@ -119,9 +128,9 @@ template <bool Invert> struct Process {
/// cheaper than calling load_outbox to get the value to store.
LIBC_INLINE uint32_t invert_outbox(uint32_t index, uint32_t current_outbox) {
uint32_t inverted_outbox = !current_outbox;
atomic_thread_fence(cpp::MemoryOrder::RELEASE);
outbox[index].store(inverted_outbox, cpp::MemoryOrder::RELAXED,
cpp::MemoryScope::SYSTEM);
__atomic_thread_fence(__ATOMIC_RELEASE);
__scoped_atomic_store_n(&outbox[index], inverted_outbox, __ATOMIC_RELAXED,
__MEMORY_SCOPE_SYSTEM);
return inverted_outbox;
}

Expand All @@ -133,7 +142,7 @@ template <bool Invert> struct Process {
sleep_briefly();
in = load_inbox(lane_mask, index);
}
atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
__atomic_thread_fence(__ATOMIC_ACQUIRE);
}

/// The packet is a linearly allocated array of buffers used to communicate
Expand All @@ -155,8 +164,7 @@ template <bool Invert> struct Process {
/// lane_mask is a bitmap of the threads in the warp that would hold the
/// single lock on success, e.g. the result of gpu::get_lane_mask()
/// The lock is held when the n-th bit of the lock bitfield is set.
[[clang::convergent]] LIBC_INLINE bool try_lock(uint64_t lane_mask,
uint32_t index) {
LIBC_INLINE bool try_lock(uint64_t lane_mask, uint32_t index) {
// On amdgpu, test and set to the nth lock bit and a sync_lane would suffice
// On volta, need to handle differences between the threads running and
// the threads that were detected in the previous call to get_lane_mask()
Expand Down Expand Up @@ -190,16 +198,15 @@ template <bool Invert> struct Process {
// inlining the current function.
bool holding_lock = lane_mask != packed;
if (holding_lock)
atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
__atomic_thread_fence(__ATOMIC_ACQUIRE);
return holding_lock;
}

/// Unlock the lock at index. We need a lane sync to keep this function
/// convergent, otherwise the compiler will sink the store and deadlock.
[[clang::convergent]] LIBC_INLINE void unlock(uint64_t lane_mask,
uint32_t index) {
LIBC_INLINE void unlock(uint64_t lane_mask, uint32_t index) {
// Do not move any writes past the unlock.
atomic_thread_fence(cpp::MemoryOrder::RELEASE);
__atomic_thread_fence(__ATOMIC_RELEASE);

// Use exactly one thread to clear the nth bit in the lock array Must
// restrict to a single thread to avoid one thread dropping the lock, then
Expand All @@ -211,7 +218,7 @@ template <bool Invert> struct Process {

/// Number of bytes to allocate for an inbox or outbox.
LIBC_INLINE static constexpr uint64_t mailbox_bytes(uint32_t port_count) {
return port_count * sizeof(cpp::Atomic<uint32_t>);
return port_count * sizeof(uint32_t);
}

/// Number of bytes to allocate for the buffer containing the packets.
Expand Down Expand Up @@ -242,24 +249,24 @@ template <bool Invert> struct Process {
}

/// Conditionally set the n-th bit in the atomic bitfield.
LIBC_INLINE static constexpr uint32_t set_nth(cpp::Atomic<uint32_t> *bits,
uint32_t index, bool cond) {
LIBC_INLINE static constexpr uint32_t set_nth(uint32_t *bits, uint32_t index,
bool cond) {
uint32_t slot = index / NUM_BITS_IN_WORD;
uint32_t bit = index % NUM_BITS_IN_WORD;
return bits[slot].fetch_or(static_cast<uint32_t>(cond) << bit,
cpp::MemoryOrder::RELAXED,
cpp::MemoryScope::DEVICE) &
return __scoped_atomic_fetch_or(&bits[slot],
static_cast<uint32_t>(cond) << bit,
__ATOMIC_RELAXED, __MEMORY_SCOPE_DEVICE) &
(1u << bit);
}

/// Conditionally clear the n-th bit in the atomic bitfield.
LIBC_INLINE static constexpr uint32_t clear_nth(cpp::Atomic<uint32_t> *bits,
LIBC_INLINE static constexpr uint32_t clear_nth(uint32_t *bits,
uint32_t index, bool cond) {
uint32_t slot = index / NUM_BITS_IN_WORD;
uint32_t bit = index % NUM_BITS_IN_WORD;
return bits[slot].fetch_and(~0u ^ (static_cast<uint32_t>(cond) << bit),
cpp::MemoryOrder::RELAXED,
cpp::MemoryScope::DEVICE) &
return __scoped_atomic_fetch_and(&bits[slot],
~0u ^ (static_cast<uint32_t>(cond) << bit),
__ATOMIC_RELAXED, __MEMORY_SCOPE_DEVICE) &
(1u << bit);
}
};
Expand Down Expand Up @@ -450,7 +457,7 @@ LIBC_INLINE void Port<T>::send_n(const void *const *src, uint64_t *size) {
send([&](Buffer *buffer, uint32_t id) {
reinterpret_cast<uint64_t *>(buffer->data)[0] = lane_value(size, id);
num_sends = is_process_gpu() ? lane_value(size, id)
: cpp::max(lane_value(size, id), num_sends);
: rpc::max(lane_value(size, id), num_sends);
uint64_t len =
lane_value(size, id) > sizeof(Buffer::data) - sizeof(uint64_t)
? sizeof(Buffer::data) - sizeof(uint64_t)
Expand Down Expand Up @@ -483,7 +490,7 @@ LIBC_INLINE void Port<T>::recv_n(void **dst, uint64_t *size, A &&alloc) {
lane_value(dst, id) =
reinterpret_cast<uint8_t *>(alloc(lane_value(size, id)));
num_recvs = is_process_gpu() ? lane_value(size, id)
: cpp::max(lane_value(size, id), num_recvs);
: rpc::max(lane_value(size, id), num_recvs);
uint64_t len =
lane_value(size, id) > sizeof(Buffer::data) - sizeof(uint64_t)
? sizeof(Buffer::data) - sizeof(uint64_t)
Expand All @@ -510,8 +517,7 @@ LIBC_INLINE void Port<T>::recv_n(void **dst, uint64_t *size, A &&alloc) {
/// port. Each port instance uses an associated \p opcode to tell the server
/// what to do. The Client interface provides the appropriate lane size to the
/// port using the platform's returned value.
template <uint16_t opcode>
[[clang::convergent]] LIBC_INLINE Client::Port Client::open() {
template <uint16_t opcode> LIBC_INLINE Client::Port Client::open() {
// Repeatedly perform a naive linear scan for a port that can be opened to
// send data.
for (uint32_t index = gpu::get_cluster_id();; ++index) {
Expand Down Expand Up @@ -545,7 +551,7 @@ template <uint16_t opcode>

/// Attempts to open a port to use as the server. The server can only open a
/// port if it has a pending receive operation
[[clang::convergent]] LIBC_INLINE cpp::optional<typename Server::Port>
LIBC_INLINE cpp::optional<typename Server::Port>
Server::try_open(uint32_t lane_size, uint32_t start) {
// Perform a naive linear scan for a port that has a pending request.
for (uint32_t index = start; index < process.port_count; ++index) {
Expand Down Expand Up @@ -583,6 +589,13 @@ LIBC_INLINE Server::Port Server::open(uint32_t lane_size) {
}
}

// If the fallback macros were defined (i.e. the compiler lacks the scoped
// atomic builtins), undefine them at the end of the header so that macros
// spelled like compiler builtins do not leak into including translation
// units. The guard condition mirrors the one used when defining them.
#if !__has_builtin(__scoped_atomic_load_n)
#undef __scoped_atomic_load_n
#undef __scoped_atomic_store_n
#undef __scoped_atomic_fetch_or
#undef __scoped_atomic_fetch_and
#endif

} // namespace rpc
} // namespace LIBC_NAMESPACE_DECL

Expand Down
19 changes: 6 additions & 13 deletions libc/src/__support/RPC/rpc_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,16 @@
#define LLVM_LIBC_SRC___SUPPORT_RPC_RPC_UTIL_H

#include "src/__support/CPP/type_traits.h"
#include "src/__support/GPU/utils.h"
#include "src/__support/macros/attributes.h"
#include "src/__support/macros/config.h"
#include "src/__support/macros/properties/architectures.h"
#include "src/__support/threads/sleep.h"
#include "src/string/memory_utils/generic/byte_per_byte.h"
#include "src/string/memory_utils/inline_memcpy.h"

namespace LIBC_NAMESPACE_DECL {
namespace rpc {

/// Conditional to indicate if this process is running on the GPU.
LIBC_INLINE constexpr bool is_process_gpu() {
#if defined(LIBC_TARGET_ARCH_IS_GPU)
#if defined(__NVPTX__) || defined(__AMDGPU__)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to have a gpu_common.h public header where you could define LIBC_TARGET_ARCH_IS_GPU?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'm going to cram all of this in rpc_util.h. Question is where I should put those two headers when I'm done such that they can be included by everyone.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was assuming they were going in include/gpu/rpc_util.h

return true;
#else
return false;
Expand Down Expand Up @@ -57,14 +53,11 @@ template <typename T, typename U> LIBC_INLINE T *advance(T *ptr, U bytes) {

/// Wrapper around the optimal memory copy implementation for the target.
LIBC_INLINE void rpc_memcpy(void *dst, const void *src, size_t count) {
// The built-in memcpy prefers to fully unroll loops. We want to minimize
// resource usage so we use a single nounroll loop implementation.
#if defined(LIBC_TARGET_ARCH_IS_AMDGPU)
inline_memcpy_byte_per_byte(reinterpret_cast<Ptr>(dst),
reinterpret_cast<CPtr>(src), count);
#else
inline_memcpy(dst, src, count);
#endif
__builtin_memcpy(dst, src, count);
}

/// Return the larger of \p a and \p b; when the two compare equal, \p a is
/// returned. Local replacement for cpp::max so this header need not depend
/// on the CPP algorithm utilities.
template <class T> LIBC_INLINE constexpr const T &max(const T &a, const T &b) {
  if (a < b)
    return b;
  return a;
}

} // namespace rpc
Expand Down
Loading