Skip to content

Commit

Permalink
Introduce shared query buffer for client reads (#258)
Browse files Browse the repository at this point in the history
This PR optimizes client query buffer handling in Valkey by introducing
a shared query buffer that is used by default for client reads. This
reduces memory usage by ~20KB per client by avoiding allocations for
most clients using short (<16KB) complete commands. For larger or
partial commands, the client still gets its own private buffer.

The primary changes are:

* Adding a shared query buffer `shared_qb` that clients use by default
* Modifying client querybuf initialization and reset logic
* Copying any partial query from shared to private buffer before command
execution
* Freeing idle client query buffers when empty to allow reuse of shared
buffer
* Master client query buffers are kept private as their contents need to
be preserved for the replication stream

In addition to the memory savings, this change shows a 3% improvement in
latency and throughput when running with 1000 active clients.

The memory reduction may also help reduce the need to evict clients when
reaching the max memory limit, as the query buffer is the main memory
consumer per client.

---------

Signed-off-by: Uri Yagelnik <uriy@amazon.com>
Signed-off-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
  • Loading branch information
uriyage and madolson authored May 28, 2024
1 parent 7ba7e4d commit fd58b73
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 20 deletions.
95 changes: 82 additions & 13 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ static void pauseClientsByClient(mstime_t end, int isPauseClientAll);
int postponeClientRead(client *c);
char *getClientSockname(client *c);
int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */
/* Thread-local shared query buffer. Starts out NULL and is assigned by initSharedQueryBuf(). */
__thread sds thread_shared_qb = NULL;

/* Return the size consumed from the allocator, for the specified SDS string,
* including internal fragmentation. This function is used in order to compute
Expand Down Expand Up @@ -147,7 +148,7 @@ client *createClient(connection *conn) {
c->ref_repl_buf_node = NULL;
c->ref_block_pos = 0;
c->qb_pos = 0;
c->querybuf = sdsempty();
c->querybuf = NULL;
c->querybuf_peak = 0;
c->reqtype = 0;
c->argc = 0;
Expand Down Expand Up @@ -1608,7 +1609,11 @@ void freeClient(client *c) {
}

/* Free the query buffer */
sdsfree(c->querybuf);
if (c->querybuf && c->querybuf == thread_shared_qb) {
sdsclear(c->querybuf);
} else {
sdsfree(c->querybuf);
}
c->querybuf = NULL;

/* Deallocate structures used to block on blocking ops. */
Expand Down Expand Up @@ -2093,6 +2098,48 @@ void resetClient(client *c) {
}
}

/* Initializes the shared query buffer to a new sds with the default capacity */
void initSharedQueryBuf(void) {
thread_shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN);
sdsclear(thread_shared_qb);
}

/* Detaches client 'c' from the shared query buffer.
 *
 * The client must currently be pointing at thread_shared_qb (asserted).
 *  - When bytes are still pending past c->qb_pos, the client keeps the buffer
 *    as its own querybuf and thread_shared_qb is re-seated on a newly
 *    allocated buffer.
 *  - When nothing is pending, the client is unlinked (querybuf = NULL,
 *    qb_pos = 0) and the shared buffer is cleared. */
void resetSharedQueryBuf(client *c) {
    serverAssert(c->querybuf == thread_shared_qb);

    size_t pending = sdslen(c->querybuf) - c->qb_pos;
    if (pending == 0) {
        /* Everything was consumed: empty the shared buffer and unlink the client. */
        sdsclear(thread_shared_qb);
        c->qb_pos = 0;
        c->querybuf = NULL;
    } else {
        /* Leftover data: the client keeps the old buffer, the shared pointer
         * gets a replacement. */
        initSharedQueryBuf();
    }
}

/* Discards the part of the client's query buffer that lies before c->qb_pos.
 *
 * A client that is still pointing at the shared buffer is first passed through
 * resetSharedQueryBuf(). If the client has no query buffer after that, there is
 * nothing to trim. Otherwise the buffer is cut down to the range starting at
 * qb_pos and qb_pos is rewound to 0. */
void trimClientQueryBuffer(client *c) {
    if (c->querybuf == thread_shared_qb) {
        resetSharedQueryBuf(c);
    }

    if (c->querybuf != NULL) {
        serverAssert(c->qb_pos <= sdslen(c->querybuf));

        if (c->qb_pos > 0) {
            /* Keep only the bytes from qb_pos to the end of the buffer. */
            sdsrange(c->querybuf, c->qb_pos, -1);
            c->qb_pos = 0;
        }
    }
}

/* This function is used when we want to re-enter the event loop but there
* is the risk that the client we are dealing with will be freed in some
* way. This happens for instance in:
Expand Down Expand Up @@ -2348,6 +2395,10 @@ int processMultibulkBuffer(client *c) {
* ll+2, trimming querybuf is just a waste of time, because
* at this time the querybuf contains not only our bulk. */
if (sdslen(c->querybuf) - c->qb_pos <= (size_t)ll + 2) {
if (c->querybuf == thread_shared_qb) {
/* Let the client take the ownership of the shared buffer. */
initSharedQueryBuf();
}
sdsrange(c->querybuf, c->qb_pos, -1);
c->qb_pos = 0;
/* Hint the sds library about the amount of bytes this string is
Expand Down Expand Up @@ -2508,7 +2559,7 @@ int processPendingCommandAndInputBuffer(client *c) {
* return C_ERR in case the client was freed during the processing */
int processInputBuffer(client *c) {
/* Keep processing while there is something in the input buffer */
while (c->qb_pos < sdslen(c->querybuf)) {
while (c->querybuf && c->qb_pos < sdslen(c->querybuf)) {
/* Immediately abort if the client is in the middle of something. */
if (c->flags & CLIENT_BLOCKED) break;

Expand Down Expand Up @@ -2559,6 +2610,13 @@ int processInputBuffer(client *c) {
break;
}

if (c->querybuf == thread_shared_qb) {
/* Before processing the command, reset the shared query buffer to its default state.
* This avoids unintentionally modifying the shared qb during processCommand as we may use
* the shared qb for other clients during processEventsWhileBlocked */
resetSharedQueryBuf(c);
}

/* We are finally ready to execute the command. */
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid exiting this
Expand Down Expand Up @@ -2587,10 +2645,8 @@ int processInputBuffer(client *c) {
c->qb_pos -= c->repl_applied;
c->repl_applied = 0;
}
} else if (c->qb_pos) {
/* Trim to pos */
sdsrange(c->querybuf, c->qb_pos, -1);
c->qb_pos = 0;
} else {
trimClientQueryBuffer(c);
}

/* Update client memory usage after processing the query buffer, this is
Expand All @@ -2614,14 +2670,16 @@ void readQueryFromClient(connection *conn) {
atomic_fetch_add_explicit(&server.stat_total_reads_processed, 1, memory_order_relaxed);

readlen = PROTO_IOBUF_LEN;
qblen = c->querybuf ? sdslen(c->querybuf) : 0;
/* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query
* buffer contains exactly the SDS string representing the object, even
* at the risk of requiring more read(2) calls. This way the function
* processMultiBulkBuffer() can avoid copying buffers to create the
* robj representing the argument. */

if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) {
ssize_t remaining = (size_t)(c->bulklen + 2) - (sdslen(c->querybuf) - c->qb_pos);
ssize_t remaining = (size_t)(c->bulklen + 2) - (qblen - c->qb_pos);
big_arg = 1;

/* Note that the 'remaining' variable may be zero in some edge case,
Expand All @@ -2633,7 +2691,12 @@ void readQueryFromClient(connection *conn) {
if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN) readlen = PROTO_IOBUF_LEN;
}

qblen = sdslen(c->querybuf);
if (c->querybuf == NULL) {
serverAssert(sdslen(thread_shared_qb) == 0);
c->querybuf = big_arg ? sdsempty() : thread_shared_qb;
qblen = sdslen(c->querybuf);
}

if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy.
(big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) {
/* When reading a BIG_ARG we won't be reading more than that one arg
Expand All @@ -2654,7 +2717,7 @@ void readQueryFromClient(connection *conn) {
nread = connRead(c->conn, c->querybuf + qblen, readlen);
if (nread == -1) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
return;
goto done;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s", connGetLastError(c->conn));
freeClientAsync(c);
Expand Down Expand Up @@ -2707,6 +2770,10 @@ void readQueryFromClient(connection *conn) {
if (processInputBuffer(c) == C_ERR) c = NULL;

done:
if (c && c->querybuf == thread_shared_qb) {
sdsclear(thread_shared_qb);
c->querybuf = NULL;
}
beforeNextClient(c);
}

Expand Down Expand Up @@ -2824,8 +2891,8 @@ sds catClientInfoString(sds s, client *client) {
" ssub=%i", (int) dictSize(client->pubsubshard_channels),
" multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
" watch=%i", (int) listLength(client->watched_keys),
" qbuf=%U", (unsigned long long) sdslen(client->querybuf),
" qbuf-free=%U", (unsigned long long) sdsavail(client->querybuf),
" qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0,
" qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0,
" argv-mem=%U", (unsigned long long) client->argv_len_sum,
" multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums,
" rbs=%U", (unsigned long long) client->buf_usable_size,
Expand Down Expand Up @@ -3780,8 +3847,9 @@ size_t getClientOutputBufferMemoryUsage(client *c) {
* the client output buffer memory usage portion of the total. */
size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) {
size_t mem = getClientOutputBufferMemoryUsage(c);

if (output_buffer_mem_usage != NULL) *output_buffer_mem_usage = mem;
mem += sdsZmallocSize(c->querybuf);
mem += c->querybuf ? sdsZmallocSize(c->querybuf) : 0;
mem += zmalloc_size(c);
mem += c->buf_usable_size;
/* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
Expand Down Expand Up @@ -4168,6 +4236,7 @@ void *IOThreadMain(void *myid) {
valkey_set_thread_title(thdname);
serverSetCpuAffinity(server.server_cpulist);
makeThreadKillable();
initSharedQueryBuf();

while (1) {
/* Wait for start */
Expand Down
3 changes: 3 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1722,6 +1722,9 @@ void replicationCreateMasterClient(connection *conn, int dbid) {
* connection. */
server.master->flags |= CLIENT_MASTER;

/* Allocate a private query buffer for the master client instead of using the shared query buffer.
* This is done because the master's query buffer data needs to be preserved for my sub-replicas to use. */
server.master->querybuf = sdsempty();
server.master->authenticated = 1;
server.master->reploff = server.master_initial_offset;
server.master->read_reploff = server.master->reploff;
Expand Down
24 changes: 20 additions & 4 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,8 @@ long long getInstantaneousMetric(int metric) {
*
* The function always returns 0 as it never terminates the client. */
int clientsCronResizeQueryBuffer(client *c) {
/* If the client query buffer is NULL, it is using the shared query buffer and there is nothing to do. */
if (c->querybuf == NULL) return 0;
size_t querybuf_size = sdsalloc(c->querybuf);
time_t idletime = server.unixtime - c->lastinteraction;

Expand All @@ -723,7 +725,18 @@ int clientsCronResizeQueryBuffer(client *c) {
/* There are two conditions to resize the query buffer: */
if (idletime > 2) {
/* 1) Query is idle for a long time. */
c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1);
size_t remaining = sdslen(c->querybuf) - c->qb_pos;
if (!(c->flags & CLIENT_MASTER) && !remaining) {
/* If the client is not a master and no data is pending,
* The client can safely use the shared query buffer in the next read - free the client's querybuf. */
sdsfree(c->querybuf);
/* By setting the querybuf to NULL, the client will use the shared query buffer in the next read.
* We don't move the client to the shared query buffer immediately, because if we allocated a private
* query buffer for the client, it's likely that the client will use it again soon. */
c->querybuf = NULL;
} else {
c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1);
}
} else if (querybuf_size > PROTO_RESIZE_THRESHOLD && querybuf_size / 2 > c->querybuf_peak) {
/* 2) Query buffer is too big for latest peak and is larger than
* resize threshold. Trim excess space but only up to a limit,
Expand All @@ -739,7 +752,7 @@ int clientsCronResizeQueryBuffer(client *c) {

/* Reset the peak again to capture the peak memory usage in the next
* cycle. */
c->querybuf_peak = sdslen(c->querybuf);
c->querybuf_peak = c->querybuf ? sdslen(c->querybuf) : 0;
/* We reset to either the current used, or currently processed bulk size,
* which ever is bigger. */
if (c->bulklen != -1 && (size_t)c->bulklen + 2 > c->querybuf_peak) c->querybuf_peak = c->bulklen + 2;
Expand Down Expand Up @@ -807,7 +820,9 @@ size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};
size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};

int clientsCronTrackExpansiveClients(client *c, int time_idx) {
size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum + (c->argv ? zmalloc_size(c->argv) : 0);
size_t qb_size = c->querybuf ? sdsZmallocSize(c->querybuf) : 0;
size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0;
size_t in_usage = qb_size + c->argv_len_sum + argv_size;
size_t out_usage = getClientOutputBufferMemoryUsage(c);

/* Track the biggest values observed so far in this slot. */
Expand Down Expand Up @@ -2711,6 +2726,7 @@ void initServer(void) {
}
slowlogInit();
latencyMonitorInit();
initSharedQueryBuf();

/* Initialize ACL default password if it exists */
ACLUpdateDefaultUserPassword(server.requirepass);
Expand Down Expand Up @@ -6310,7 +6326,7 @@ void dismissMemory(void *ptr, size_t size_hint) {
void dismissClientMemory(client *c) {
/* Dismiss client query buffer and static reply buffer. */
dismissMemory(c->buf, c->buf_usable_size);
dismissSds(c->querybuf);
if (c->querybuf) dismissSds(c->querybuf);
/* Dismiss argv array only if we estimate it contains a big buffer. */
if (c->argc && c->argv_len_sum / c->argc >= server.page_size) {
for (int i = 0; i < c->argc; i++) {
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2721,6 +2721,7 @@ void linkClient(client *c);
void protectClient(client *c);
void unprotectClient(client *c);
void initThreadedIO(void);
void initSharedQueryBuf(void);
client *lookupClientByID(uint64_t id);
int authRequired(client *c);
void putClientInPendingWriteQueue(client *c);
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ start_server {tags {"introspection"}} {

test {CLIENT LIST} {
r client list
} {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=* lib-name=* lib-ver=* tot-net-in=* tot-net-out=* tot-cmds=*}
} {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=* lib-name=* lib-ver=* tot-net-in=* tot-net-out=* tot-cmds=*}

test {CLIENT LIST with IDs} {
set myid [r client id]
Expand All @@ -17,7 +17,7 @@ start_server {tags {"introspection"}} {

test {CLIENT INFO} {
r client info
} {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=* tot-net-in=* tot-net-out=* tot-cmds=*}
} {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=* tot-net-in=* tot-net-out=* tot-cmds=*}

proc get_field_in_client_info {info field} {
set info [string trim $info]
Expand Down
23 changes: 22 additions & 1 deletion tests/unit/querybuf.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,24 @@ start_server {tags {"querybuf slow"}} {
# The test will run at least 2s to check if client query
# buffer will be resized when client idle 2s.
test "query buffer resized correctly" {
set rd [valkey_client]

set rd [valkey_deferring_client]

$rd client setname test_client
$rd read

# Make sure query buff has size of 0 bytes at start as the client uses the shared qb.
assert {[client_query_buffer test_client] == 0}

# Send partial command to client to make sure it doesn't use the shared qb.
$rd write "*3\r\n\$3\r\nset\r\n\$2\r\na"
$rd flush
after 100
# send the rest of the command
$rd write "a\r\n\$1\r\nb\r\n"
$rd flush
assert_equal {OK} [$rd read]

set orig_test_client_qbuf [client_query_buffer test_client]
# Make sure query buff has less than the peak resize threshold (PROTO_RESIZE_THRESHOLD) 32k
# but at least the basic IO reading buffer size (PROTO_IOBUF_LEN) 16k
Expand Down Expand Up @@ -78,6 +94,11 @@ start_server {tags {"querybuf slow"}} {
$rd write "*3\r\n\$3\r\nset\r\n\$1\r\na\r\n\$1000000\r\n"
$rd flush

after 200
# Send the start of the arg and make sure the client is not using the shared qb for it, but rather a private buf of size > 1000000.
$rd write "a"
$rd flush

after 20
if {[client_query_buffer test_client] < 1000000} {
fail "query buffer should not be resized when client idle time smaller than 2s"
Expand Down

0 comments on commit fd58b73

Please sign in to comment.