From fd58b73f0ae895bf9de3810d799da20bb75a2b4f Mon Sep 17 00:00:00 2001 From: uriyage <78144248+uriyage@users.noreply.github.com> Date: Tue, 28 May 2024 21:09:37 +0300 Subject: [PATCH] Introduce shared query buffer for client reads (#258) This PR optimizes client query buffer handling in Valkey by introducing a shared query buffer that is used by default for client reads. This reduces memory usage by ~20KB per client by avoiding allocations for most clients using short (<16KB) complete commands. For larger or partial commands, the client still gets its own private buffer. The primary changes are: * Adding a shared query buffer `shared_qb` that clients use by default * Modifying client querybuf initialization and reset logic * Copying any partial query from shared to private buffer before command execution * Freeing idle client query buffers when empty to allow reuse of shared buffer * Master client query buffers are kept private as their contents need to be preserved for replication stream In addition to the memory savings, this change shows a 3% improvement in latency and throughput when running with 1000 active clients. The memory reduction may also help reduce the need to evict clients when reaching max memory limit, as the query buffer is the main memory consumer per client. --------- Signed-off-by: Uri Yagelnik Signed-off-by: Madelyn Olson Co-authored-by: Madelyn Olson --- src/networking.c | 95 +++++++++++++++++++++++++++++++----- src/replication.c | 3 ++ src/server.c | 24 +++++++-- src/server.h | 1 + tests/unit/introspection.tcl | 4 +- tests/unit/querybuf.tcl | 23 ++++++++- 6 files changed, 130 insertions(+), 20 deletions(-) diff --git a/src/networking.c b/src/networking.c index 2ff9a7e366..e062bc3aba 100644 --- a/src/networking.c +++ b/src/networking.c @@ -43,6 +43,7 @@ static void pauseClientsByClient(mstime_t end, int isPauseClientAll); int postponeClientRead(client *c); char *getClientSockname(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ +__thread sds thread_shared_qb = NULL; /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute @@ -147,7 +148,7 @@ client *createClient(connection *conn) { c->ref_repl_buf_node = NULL; c->ref_block_pos = 0; c->qb_pos = 0; - c->querybuf = sdsempty(); + c->querybuf = NULL; c->querybuf_peak = 0; c->reqtype = 0; c->argc = 0; @@ -1608,7 +1609,11 @@ void freeClient(client *c) { } /* Free the query buffer */ - sdsfree(c->querybuf); + if (c->querybuf && c->querybuf == thread_shared_qb) { + sdsclear(c->querybuf); + } else { + sdsfree(c->querybuf); + } c->querybuf = NULL; /* Deallocate structures used to block on blocking ops. */ @@ -2093,6 +2098,48 @@ void resetClient(client *c) { } } +/* Initializes the shared query buffer to a new sds with the default capacity */ +void initSharedQueryBuf(void) { + thread_shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN); + sdsclear(thread_shared_qb); +} + +/* Resets the shared query buffer used by the given client. + * If any data remained in the buffer, the client will take ownership of the buffer + * and a new empty buffer will be allocated for the shared buffer. */ +void resetSharedQueryBuf(client *c) { + serverAssert(c->querybuf == thread_shared_qb); + size_t remaining = sdslen(c->querybuf) - c->qb_pos; + + if (remaining > 0) { + /* Let the client take ownership of the shared buffer. */ + initSharedQueryBuf(); + return; + } + + c->querybuf = NULL; + sdsclear(thread_shared_qb); + c->qb_pos = 0; +} + +/* Trims the client query buffer to the current position. */ +void trimClientQueryBuffer(client *c) { + if (c->querybuf == thread_shared_qb) { + resetSharedQueryBuf(c); + } + + if (c->querybuf == NULL) { + return; + } + + serverAssert(c->qb_pos <= sdslen(c->querybuf)); + + if (c->qb_pos > 0) { + sdsrange(c->querybuf, c->qb_pos, -1); + c->qb_pos = 0; + } +} + /* This function is used when we want to re-enter the event loop but there * is the risk that the client we are dealing with will be freed in some * way. This happens for instance in: @@ -2348,6 +2395,10 @@ int processMultibulkBuffer(client *c) { * ll+2, trimming querybuf is just a waste of time, because * at this time the querybuf contains not only our bulk. */ if (sdslen(c->querybuf) - c->qb_pos <= (size_t)ll + 2) { + if (c->querybuf == thread_shared_qb) { + /* Let the client take the ownership of the shared buffer. */ + initSharedQueryBuf(); + } sdsrange(c->querybuf, c->qb_pos, -1); c->qb_pos = 0; /* Hint the sds library about the amount of bytes this string is @@ -2508,7 +2559,7 @@ int processPendingCommandAndInputBuffer(client *c) { * return C_ERR in case the client was freed during the processing */ int processInputBuffer(client *c) { /* Keep processing while there is something in the input buffer */ - while (c->qb_pos < sdslen(c->querybuf)) { + while (c->querybuf && c->qb_pos < sdslen(c->querybuf)) { /* Immediately abort if the client is in the middle of something. */ if (c->flags & CLIENT_BLOCKED) break; @@ -2559,6 +2610,13 @@ int processInputBuffer(client *c) { break; } + if (c->querybuf == thread_shared_qb) { + /* Before processing the command, reset the shared query buffer to its default state. + * This avoids unintentionally modifying the shared qb during processCommand as we may use + * the shared qb for other clients during processEventsWhileBlocked */ + resetSharedQueryBuf(c); + } + /* We are finally ready to execute the command. */ if (processCommandAndResetClient(c) == C_ERR) { /* If the client is no longer valid, we avoid exiting this @@ -2587,10 +2645,8 @@ int processInputBuffer(client *c) { c->qb_pos -= c->repl_applied; c->repl_applied = 0; } - } else if (c->qb_pos) { - /* Trim to pos */ - sdsrange(c->querybuf, c->qb_pos, -1); - c->qb_pos = 0; + } else { + trimClientQueryBuffer(c); } /* Update client memory usage after processing the query buffer, this is @@ -2614,14 +2670,16 @@ void readQueryFromClient(connection *conn) { atomic_fetch_add_explicit(&server.stat_total_reads_processed, 1, memory_order_relaxed); readlen = PROTO_IOBUF_LEN; + qblen = c->querybuf ? sdslen(c->querybuf) : 0; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query * buffer contains exactly the SDS string representing the object, even * at the risk of requiring more read(2) calls. This way the function * processMultiBulkBuffer() can avoid copying buffers to create the * robj representing the argument. */ + if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { - ssize_t remaining = (size_t)(c->bulklen + 2) - (sdslen(c->querybuf) - c->qb_pos); + ssize_t remaining = (size_t)(c->bulklen + 2) - (qblen - c->qb_pos); big_arg = 1; /* Note that the 'remaining' variable may be zero in some edge case, @@ -2633,7 +2691,12 @@ void readQueryFromClient(connection *conn) { if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN) readlen = PROTO_IOBUF_LEN; } - qblen = sdslen(c->querybuf); + if (c->querybuf == NULL) { + serverAssert(sdslen(thread_shared_qb) == 0); + c->querybuf = big_arg ? sdsempty() : thread_shared_qb; + qblen = sdslen(c->querybuf); + } + if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy. (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) { /* When reading a BIG_ARG we won't be reading more than that one arg @@ -2654,7 +2717,7 @@ void readQueryFromClient(connection *conn) { nread = connRead(c->conn, c->querybuf + qblen, readlen); if (nread == -1) { if (connGetState(conn) == CONN_STATE_CONNECTED) { - return; + goto done; } else { serverLog(LL_VERBOSE, "Reading from client: %s", connGetLastError(c->conn)); freeClientAsync(c); @@ -2707,6 +2770,10 @@ void readQueryFromClient(connection *conn) { if (processInputBuffer(c) == C_ERR) c = NULL; done: + if (c && c->querybuf == thread_shared_qb) { + sdsclear(thread_shared_qb); + c->querybuf = NULL; + } beforeNextClient(c); } @@ -2824,8 +2891,8 @@ sds catClientInfoString(sds s, client *client) { " ssub=%i", (int) dictSize(client->pubsubshard_channels), " multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1, " watch=%i", (int) listLength(client->watched_keys), - " qbuf=%U", (unsigned long long) sdslen(client->querybuf), - " qbuf-free=%U", (unsigned long long) sdsavail(client->querybuf), + " qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0, + " qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0, " argv-mem=%U", (unsigned long long) client->argv_len_sum, " multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums, " rbs=%U", (unsigned long long) client->buf_usable_size, @@ -3780,8 +3847,9 @@ size_t getClientOutputBufferMemoryUsage(client *c) { * the client output buffer memory usage portion of the total. */ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { size_t mem = getClientOutputBufferMemoryUsage(c); + if (output_buffer_mem_usage != NULL) *output_buffer_mem_usage = mem; - mem += sdsZmallocSize(c->querybuf); + mem += c->querybuf ? sdsZmallocSize(c->querybuf) : 0; mem += zmalloc_size(c); mem += c->buf_usable_size; /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory @@ -4168,6 +4236,7 @@ void *IOThreadMain(void *myid) { valkey_set_thread_title(thdname); serverSetCpuAffinity(server.server_cpulist); makeThreadKillable(); + initSharedQueryBuf(); while (1) { /* Wait for start */ diff --git a/src/replication.c b/src/replication.c index 0c561f1204..1d5e0fe290 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1722,6 +1722,9 @@ void replicationCreateMasterClient(connection *conn, int dbid) { * connection. */ server.master->flags |= CLIENT_MASTER; + /* Allocate a private query buffer for the master client instead of using the shared query buffer. + * This is done because the master's query buffer data needs to be preserved for my sub-replicas to use. */ + server.master->querybuf = sdsempty(); server.master->authenticated = 1; server.master->reploff = server.master_initial_offset; server.master->read_reploff = server.master->reploff; diff --git a/src/server.c b/src/server.c index e0590706d3..f87193b74c 100644 --- a/src/server.c +++ b/src/server.c @@ -714,6 +714,8 @@ long long getInstantaneousMetric(int metric) { * * The function always returns 0 as it never terminates the client. */ int clientsCronResizeQueryBuffer(client *c) { + /* If the client query buffer is NULL, it is using the shared query buffer and there is nothing to do. */ + if (c->querybuf == NULL) return 0; size_t querybuf_size = sdsalloc(c->querybuf); time_t idletime = server.unixtime - c->lastinteraction; @@ -723,7 +725,18 @@ int clientsCronResizeQueryBuffer(client *c) { /* There are two conditions to resize the query buffer: */ if (idletime > 2) { /* 1) Query is idle for a long time. */ - c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1); + size_t remaining = sdslen(c->querybuf) - c->qb_pos; + if (!(c->flags & CLIENT_MASTER) && !remaining) { + /* If the client is not a master and no data is pending, + * The client can safely use the shared query buffer in the next read - free the client's querybuf. */ + sdsfree(c->querybuf); + /* By setting the querybuf to NULL, the client will use the shared query buffer in the next read. + * We don't move the client to the shared query buffer immediately, because if we allocated a private + * query buffer for the client, it's likely that the client will use it again soon. */ + c->querybuf = NULL; + } else { + c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1); + } } else if (querybuf_size > PROTO_RESIZE_THRESHOLD && querybuf_size / 2 > c->querybuf_peak) { /* 2) Query buffer is too big for latest peak and is larger than * resize threshold. Trim excess space but only up to a limit, @@ -739,7 +752,7 @@ int clientsCronResizeQueryBuffer(client *c) { /* Reset the peak again to capture the peak memory usage in the next * cycle. */ - c->querybuf_peak = sdslen(c->querybuf); + c->querybuf_peak = c->querybuf ? sdslen(c->querybuf) : 0; /* We reset to either the current used, or currently processed bulk size, * which ever is bigger. */ if (c->bulklen != -1 && (size_t)c->bulklen + 2 > c->querybuf_peak) c->querybuf_peak = c->bulklen + 2; @@ -807,7 +820,9 @@ size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0}; size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0}; int clientsCronTrackExpansiveClients(client *c, int time_idx) { - size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum + (c->argv ? zmalloc_size(c->argv) : 0); + size_t qb_size = c->querybuf ? sdsZmallocSize(c->querybuf) : 0; + size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0; + size_t in_usage = qb_size + c->argv_len_sum + argv_size; size_t out_usage = getClientOutputBufferMemoryUsage(c); /* Track the biggest values observed so far in this slot. */ @@ -2711,6 +2726,7 @@ void initServer(void) { } slowlogInit(); latencyMonitorInit(); + initSharedQueryBuf(); /* Initialize ACL default password if it exists */ ACLUpdateDefaultUserPassword(server.requirepass); @@ -6310,7 +6326,7 @@ void dismissMemory(void *ptr, size_t size_hint) { void dismissClientMemory(client *c) { /* Dismiss client query buffer and static reply buffer. */ dismissMemory(c->buf, c->buf_usable_size); - dismissSds(c->querybuf); + if (c->querybuf) dismissSds(c->querybuf); /* Dismiss argv array only if we estimate it contains a big buffer. */ if (c->argc && c->argv_len_sum / c->argc >= server.page_size) { for (int i = 0; i < c->argc; i++) { diff --git a/src/server.h b/src/server.h index bdf8b12574..249d896d35 100644 --- a/src/server.h +++ b/src/server.h @@ -2721,6 +2721,7 @@ void linkClient(client *c); void protectClient(client *c); void unprotectClient(client *c); void initThreadedIO(void); +void initSharedQueryBuf(void); client *lookupClientByID(uint64_t id); int authRequired(client *c); void putClientInPendingWriteQueue(client *c); diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 9a0f3d7b31..02aca9e97d 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -7,7 +7,7 @@ start_server {tags {"introspection"}} { test {CLIENT LIST} { r client list - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=* lib-name=* lib-ver=* tot-net-in=* tot-net-out=* tot-cmds=*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=* lib-name=* lib-ver=* tot-net-in=* tot-net-out=* tot-cmds=*} test {CLIENT LIST with IDs} { set myid [r client id] @@ -17,7 +17,7 @@ start_server {tags {"introspection"}} { test {CLIENT INFO} { r client info - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=* tot-net-in=* tot-net-out=* tot-cmds=*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=* tot-net-in=* tot-net-out=* tot-cmds=*} proc get_field_in_client_info {info field} { set info [string trim $info] diff --git a/tests/unit/querybuf.tcl b/tests/unit/querybuf.tcl index 0394b72c00..66942a5bd1 100644 --- a/tests/unit/querybuf.tcl +++ b/tests/unit/querybuf.tcl @@ -24,8 +24,24 @@ start_server {tags {"querybuf slow"}} { # The test will run at least 2s to check if client query # buffer will be resized when client idle 2s. test "query buffer resized correctly" { - set rd [valkey_client] + + set rd [valkey_deferring_client] + $rd client setname test_client + $rd read + + # Make sure query buff has size of 0 bytes at start as the client uses the shared qb. + assert {[client_query_buffer test_client] == 0} + + # Send partial command to client to make sure it doesn't use the shared qb. + $rd write "*3\r\n\$3\r\nset\r\n\$2\r\na" + $rd flush + after 100 + # send the rest of the command + $rd write "a\r\n\$1\r\nb\r\n" + $rd flush + assert_equal {OK} [$rd read] + set orig_test_client_qbuf [client_query_buffer test_client] # Make sure query buff has less than the peak resize threshold (PROTO_RESIZE_THRESHOLD) 32k # but at least the basic IO reading buffer size (PROTO_IOBUF_LEN) 16k @@ -78,6 +94,11 @@ start_server {tags {"querybuf slow"}} { $rd write "*3\r\n\$3\r\nset\r\n\$1\r\na\r\n\$1000000\r\n" $rd flush + after 200 + # Send the start of the arg and make sure the client is not using shared qb for it rather a private buf of > 1000000 size. + $rd write "a" + $rd flush + after 20 if {[client_query_buffer test_client] < 1000000} { fail "query buffer should not be resized when client idle time smaller than 2s"