From d2d92b74a8327daf9652a9732454937f3529bd30 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Tue, 21 May 2024 19:36:49 +0800 Subject: [PATCH] unix: support SO_REUSEPORT with load balancing for TCP --- CMakeLists.txt | 1 + Makefile.am | 1 + docs/src/tcp.rst | 53 ++++++++-- include/uv.h | 13 ++- src/unix/tcp.c | 50 +++++++++ src/win/tcp.c | 6 ++ test/test-list.h | 3 + test/test-tcp-reuseport.c | 215 ++++++++++++++++++++++++++++++++++++++ 8 files changed, 334 insertions(+), 8 deletions(-) create mode 100644 test/test-tcp-reuseport.c diff --git a/CMakeLists.txt b/CMakeLists.txt index f5bb871b533..ce086f4ae7f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -644,6 +644,7 @@ if(LIBUV_BUILD_TESTS) test/test-tcp-oob.c test/test-tcp-open.c test/test-tcp-read-stop.c + test/test-tcp-reuseport.c test/test-tcp-read-stop-start.c test/test-tcp-rst.c test/test-tcp-shutdown-after-write.c diff --git a/Makefile.am b/Makefile.am index c344f7dc9f4..585934bd39c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -276,6 +276,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \ test/test-tcp-flags.c \ test/test-tcp-open.c \ test/test-tcp-read-stop.c \ + test/test-tcp-reuseport.c \ test/test-tcp-read-stop-start.c \ test/test-tcp-rst.c \ test/test-tcp-shutdown-after-write.c \ diff --git a/docs/src/tcp.rst b/docs/src/tcp.rst index f351214ff92..5b5453b0591 100644 --- a/docs/src/tcp.rst +++ b/docs/src/tcp.rst @@ -16,6 +16,28 @@ Data types TCP handle type. +.. c:type:: uv_tcp_flags + + Flags used in :c:func:`uv_tcp_bind`. + + :: + + enum uv_tcp_flags { + /* Used with uv_tcp_bind, when an IPv6 address is used. */ + UV_TCP_IPV6ONLY = 1, + + /* Enable SO_REUSEPORT socket option when binding the handle. + * This allows completely duplicate bindings by multiple processes + * or threads if they all set SO_REUSEPORT before binding the port. + * Incoming connections are distributed across the participating + * listener sockets. + * + * This flag is available only on Linux 3.9+, DragonFlyBSD 3.6+, + * FreeBSD 12.0+, Solaris 11.4, and AIX 7.2.5+ for now. + */ + UV_TCP_REUSEPORT = 2, + }; + Public members ^^^^^^^^^^^^^^ @@ -81,16 +103,33 @@ API .. c:function:: int uv_tcp_bind(uv_tcp_t* handle, const struct sockaddr* addr, unsigned int flags) - Bind the handle to an address and port. `addr` should point to an - initialized ``struct sockaddr_in`` or ``struct sockaddr_in6``. + Bind the handle to an address and port. When the port is already taken, you can expect to see an ``UV_EADDRINUSE`` - error from :c:func:`uv_listen` or :c:func:`uv_tcp_connect`. That is, - a successful call to this function does not guarantee that the call - to :c:func:`uv_listen` or :c:func:`uv_tcp_connect` will succeed as well. + error from :c:func:`uv_listen` or :c:func:`uv_tcp_connect` unless you specify + ``UV_TCP_REUSEPORT`` in `flags` for all the binding sockets. That is, a successful + call to this function does not guarantee that the call to :c:func:`uv_listen` or + :c:func:`uv_tcp_connect` will succeed as well. - `flags` can contain ``UV_TCP_IPV6ONLY``, in which case dual-stack support - is disabled and only IPv6 is used. + :param handle: TCP handle. It should have been initialized with :c:func:`uv_tcp_init`. + + :param addr: Address to bind to. It should point to an initialized ``struct sockaddr_in`` + or ``struct sockaddr_in6``. + + :param flags: Flags that control the behavior of binding the socket. + ``UV_TCP_IPV6ONLY`` can be contained in `flags` to disable dual-stack + support and only use IPv6. + ``UV_TCP_REUSEPORT`` can be contained in `flags` to enable the socket option + `SO_REUSEPORT` with the capability of load balancing that distribute incoming + connections across all listening sockets in multiple processes or threads. + + :returns: 0 on success, or an error code < 0 on failure. + + .. versionchanged:: 1.49.0 added the ``UV_TCP_REUSEPORT`` flag. + + .. note:: + ``UV_TCP_REUSEPORT`` flag is available only on Linux 3.9+, DragonFlyBSD 3.6+, + FreeBSD 12.0+, Solaris 11.4, and AIX 7.2.5+ at the moment. .. c:function:: int uv_tcp_getsockname(const uv_tcp_t* handle, struct sockaddr* name, int* namelen) diff --git a/include/uv.h b/include/uv.h index a62b3fa69b1..3b6b72246ac 100644 --- a/include/uv.h +++ b/include/uv.h @@ -604,7 +604,18 @@ UV_EXTERN int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable); enum uv_tcp_flags { /* Used with uv_tcp_bind, when an IPv6 address is used. */ - UV_TCP_IPV6ONLY = 1 + UV_TCP_IPV6ONLY = 1, + + /* Enable SO_REUSEPORT socket option when binding the handle. + * This allows completely duplicate bindings by multiple processes + * or threads if they all set SO_REUSEPORT before binding the port. + * Incoming connections are distributed across the participating + * listener sockets. + * + * This flag is available only on Linux 3.9+, DragonFlyBSD 3.6+, + * FreeBSD 12.0+, Solaris 11.4, and AIX 7.2.5+ for now. + */ + UV_TCP_REUSEPORT = 2, }; UV_EXTERN int uv_tcp_bind(uv_tcp_t* handle, diff --git a/src/unix/tcp.c b/src/unix/tcp.c index e4a65498eb5..5b8df37ff25 100644 --- a/src/unix/tcp.c +++ b/src/unix/tcp.c @@ -148,6 +148,50 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) { } +static int uv__tcp_reuseport(int fd) { + int on = 1; +#if defined(__FreeBSD__) && __FreeBSD__ >= 12 && defined(SO_REUSEPORT_LB) + /* FreeBSD 12 introduced a new socket option named SO_REUSEPORT_LB + * with the capability of load balancing, it's the substitution of + * the SO_REUSEPORTs on Linux and DragonFlyBSD. */ + if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT_LB, &on, sizeof(on))) + return UV__ERR(errno); +#elif (defined(__linux__) || \ + defined(_AIX73) || \ + (defined(__DragonFly__) && __DragonFly_version >= 300600) || \ + (defined(__sun) && defined(SO_FLOW_NAME))) && \ + defined(SO_REUSEPORT) + /* On Linux 3.9+, the SO_REUSEPORT implementation distributes connections + * evenly across all of the threads (or processes) that are blocked in + * accept() on the same port. + * + * DragonFlyBSD 3.6.0 extended SO_REUSEPORT to distribute workload to + * available sockets, which made it the equivalent of Linux's SO_REUSEPORT. + * + * AIX 7.2.5 added the feature that would add the capability to distribute + * incoming connections across all listening ports for SO_REUSEPORT. + * + * Solaris 11 supported SO_REUSEPORT, but it's implemented only for + * binding to the same address and port, without load balancing. + * Solaris 11.4 extended SO_REUSEPORT with the capability of load balancing. + * Since it's impossible to detect the Solaris 11.4 version via OS macros, + * so we check the presence of the socket option SO_FLOW_NAME that was first + * introduced to Solaris 11.4. */ + if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on))) + return UV__ERR(errno); +#else + (void) (fd); + (void) (on); + /* SO_REUSEPORTs do not have the capability of load balancing on platforms + * other than those mentioned above. The semantics are completely different, + * therefore we shouldn't enable it, but fail this operation to indicate that + * UV_TCP_REUSEPORT is not supported on these platforms. */ + return UV_ENOTSUP; +#endif + + return 0; +} + int uv__tcp_bind(uv_tcp_t* tcp, const struct sockaddr* addr, unsigned int addrlen, @@ -167,6 +211,12 @@ int uv__tcp_bind(uv_tcp_t* tcp, if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) return UV__ERR(errno); + if (flags & UV_TCP_REUSEPORT) { + err = uv__tcp_reuseport(tcp->io_watcher.fd); + if (err) + return err; + } + #ifndef __OpenBSD__ #ifdef IPV6_V6ONLY if (addr->sa_family == AF_INET6) { diff --git a/src/win/tcp.c b/src/win/tcp.c index 53225e3ddc2..4ccfaff2405 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -292,6 +292,12 @@ static int uv__tcp_try_bind(uv_tcp_t* handle, DWORD err; int r; + /* There is no SO_REUSEPORT on Windows, Windows only knows SO_REUSEADDR. + * so we just return an error directly when UV_TCP_REUSEPORT is requested + * for binding the socket. */ + if (flags & UV_TCP_REUSEPORT) + return ERROR_NOT_SUPPORTED; + if (handle->socket == INVALID_SOCKET) { SOCKET sock; diff --git a/test/test-list.h b/test/test-list.h index 84c885ccc78..ad4593d86d4 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -153,6 +153,7 @@ TEST_DECLARE (tcp_write_to_half_open_connection) TEST_DECLARE (tcp_unexpected_read) TEST_DECLARE (tcp_read_stop) TEST_DECLARE (tcp_read_stop_start) +TEST_DECLARE (tcp_reuseport) TEST_DECLARE (tcp_rst) TEST_DECLARE (tcp_bind6_error_addrinuse) TEST_DECLARE (tcp_bind6_error_addrnotavail) @@ -765,6 +766,8 @@ TASK_LIST_START TEST_ENTRY (tcp_read_stop_start) + TEST_ENTRY (tcp_reuseport) + TEST_ENTRY (tcp_rst) TEST_HELPER (tcp_rst, tcp4_echo_server) diff --git a/test/test-tcp-reuseport.c b/test/test-tcp-reuseport.c new file mode 100644 index 00000000000..296301b82de --- /dev/null +++ b/test/test-tcp-reuseport.c @@ -0,0 +1,215 @@ +/* Copyright libuv project contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#include +#include +#include + +#if !defined(__linux__) && !defined(__FreeBSD__) && \ + !defined(__DragonFly__) && !defined(__sun) && !defined(_AIX73) + +TEST_IMPL(tcp_reuseport) { + struct sockaddr_in addr; + uv_loop_t* loop; + uv_tcp_t handle; + int r; + + ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); + + loop = uv_default_loop(); + ASSERT_NOT_NULL(loop); + + r = uv_tcp_init(loop, &handle); + ASSERT_OK(r); + + r = uv_tcp_bind(&handle, (const struct sockaddr*) &addr, UV_TCP_REUSEPORT); + ASSERT_EQ(r, UV_ENOTSUP); + + MAKE_VALGRIND_HAPPY(loop); + + return 0; +} + +#else + +#define MAX_TCP_CLIENTS 10 + +static uv_tcp_t tcp_connect_handles[MAX_TCP_CLIENTS]; +static uv_connect_t tcp_connect_requests[MAX_TCP_CLIENTS]; + +static unsigned int main_loop_accepted; +static unsigned int thread_loop_accepted; +static unsigned int connected; + +static uv_mutex_t mutex; +static unsigned int accepted; + +static uv_loop_t* main_loop; +static uv_loop_t* thread_loop; +static uv_tcp_t main_handle; +static uv_tcp_t thread_handle; +static uv_timer_t main_timer_handle; +static uv_timer_t thread_timer_handle; + +static void on_close(uv_handle_t* handle) { + free(handle); +} + +static void ticktack(uv_timer_t* timer) { + ASSERT(timer == &main_timer_handle || timer == &thread_timer_handle); + + int done = 0; + uv_mutex_lock(&mutex); + if (accepted == MAX_TCP_CLIENTS) { + done = 1; + } + uv_mutex_unlock(&mutex); + + if (done) { + uv_close((uv_handle_t*) timer, NULL); + if (timer->loop == main_loop) + uv_close((uv_handle_t*) &main_handle, NULL); + if (timer->loop == thread_loop) + uv_close((uv_handle_t*) &thread_handle, NULL); + } +} + +static void on_connection(uv_stream_t* server, int status) +{ + ASSERT_OK(status); + ASSERT(server == (uv_stream_t*) &main_handle || \ + server == (uv_stream_t*) &thread_handle); + + uv_tcp_t *client = malloc(sizeof(uv_tcp_t)); + ASSERT_OK(uv_tcp_init(server->loop, client)); + ASSERT_OK(uv_accept(server, (uv_stream_t*) client)); + uv_close((uv_handle_t*) client, on_close); + + if (server->loop == main_loop) + main_loop_accepted++; + + if (server->loop == thread_loop) + thread_loop_accepted++; + + uv_mutex_lock(&mutex); + accepted++; + uv_mutex_unlock(&mutex); +} + +static void on_connect(uv_connect_t* req, int status) { + ASSERT_OK(status); + ASSERT_NOT_NULL(req->handle); + ASSERT_PTR_EQ(req->handle->loop, main_loop); + + connected++; + uv_close((uv_handle_t*) req->handle, NULL); +} + +static void run_event_loop(void* arg) { + int r; + uv_loop_t* loop = (uv_loop_t*) arg; + ASSERT_PTR_EQ(loop, thread_loop); + + r = uv_run(loop, UV_RUN_DEFAULT); + ASSERT_OK(r); +} + +static void create_listener(uv_loop_t* loop, uv_tcp_t* handle) { + struct sockaddr_in addr; + int r; + + ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); + + r = uv_tcp_init(loop, handle); + ASSERT_OK(r); + + r = uv_tcp_bind(handle, (const struct sockaddr*) &addr, UV_TCP_REUSEPORT); + ASSERT_OK(r); + + r = uv_listen((uv_stream_t*) handle, 128, on_connection); + ASSERT_OK(r); +} + +TEST_IMPL(tcp_reuseport) { + struct sockaddr_in addr; + int r; + + r = uv_mutex_init(&mutex); + + /* Create listener per event loop. */ + main_loop = uv_default_loop(); + ASSERT_NOT_NULL(main_loop); + create_listener(main_loop, &main_handle); + uv_timer_init(main_loop, &main_timer_handle); + uv_timer_start(&main_timer_handle, ticktack, 0, 10); + + thread_loop = uv_loop_new(); + ASSERT_NOT_NULL(thread_loop); + create_listener(thread_loop, &thread_handle); + uv_timer_init(thread_loop, &thread_timer_handle); + uv_timer_start(&thread_timer_handle, ticktack, 0, 10); + + /* Connect to the peers. */ + ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); + + int i; + for (i = 0; i < MAX_TCP_CLIENTS; i++) { + r = uv_tcp_init(main_loop, &tcp_connect_handles[i]); + ASSERT_OK(r); + r = uv_tcp_connect(&tcp_connect_requests[i], + &tcp_connect_handles[i], + (const struct sockaddr*) &addr, + on_connect); + ASSERT_OK(r); + } + + /* Run event loops and wait for them to exit. */ + uv_thread_t thread_loop_id; + uv_thread_create(&thread_loop_id, run_event_loop, thread_loop); + + r = uv_run(main_loop, UV_RUN_DEFAULT); + ASSERT_OK(r); + + uv_thread_join(&thread_loop_id); + + /* Verify if each listener per event loop accepted connections + * and the amount of accepted connections matches the one of + * connected connections. + */ + ASSERT_EQ(accepted, MAX_TCP_CLIENTS); + ASSERT_EQ(connected, MAX_TCP_CLIENTS); + ASSERT_GT(main_loop_accepted, 0); + ASSERT_GT(thread_loop_accepted, 0); + ASSERT_EQ(main_loop_accepted + thread_loop_accepted, connected); + + /* Clean up. */ + uv_mutex_destroy(&mutex); + + uv_loop_delete(thread_loop); + MAKE_VALGRIND_HAPPY(main_loop); + + return 0; +} + +#endif