Skip to content

Commit

Permalink
Removed negotiation callback, restore registerSSLEvent, restore conn …
Browse files Browse the repository at this point in the history
…abstraction

Signed-off-by: Uri Yagelnik <uriy@amazon.com>
  • Loading branch information
uriyage committed Nov 27, 2024
1 parent dc5aa76 commit c0cf818
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 104 deletions.
6 changes: 3 additions & 3 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ typedef enum {
CONN_STATE_ERROR
} ConnectionState;

#define CONN_FLAG_CLOSE_SCHEDULED (1 << 0) /* Closed scheduled by a handler */
#define CONN_FLAG_WRITE_BARRIER (1 << 1) /* Write barrier requested */
#define CONN_FLAG_NO_OFFLOAD (1 << 2) /* Connection should not be offloaded to IO threads. */
#define CONN_FLAG_CLOSE_SCHEDULED (1 << 0) /* Closed scheduled by a handler */
#define CONN_FLAG_WRITE_BARRIER (1 << 1) /* Write barrier requested */
#define CONN_FLAG_ALLOW_ACCEPT_OFFLOAD (1 << 2) /* Connection accept can be offloaded to IO threads. */

#define CONN_TYPE_SOCKET "tcp"
#define CONN_TYPE_UNIX "unix"
Expand Down
31 changes: 14 additions & 17 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
static __thread int thread_id = 0; /* Thread local var */
static pthread_t io_threads[IO_THREADS_MAX_NUM] = {0};
static pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
typedef void (*tls_negotiation_callback)(void *);
tls_negotiation_callback tls_negotiation_cb;

/* IO jobs queue functions - Used to send jobs from the main-thread to the IO thread. */
typedef void (*job_handler)(void *);
Expand Down Expand Up @@ -557,29 +555,29 @@ void trySendPollJobToIOThreads(void) {
IOJobQueue_push(jq, IOThreadPoll, server.el);
}

void setTLSNegotiationCallback(tls_negotiation_callback cb) {
tls_negotiation_cb = cb;
}

static void ioThreadTLSNegotiation(void *data) {
static void ioThreadAccept(void *data) {
client *c = (client *)data;
tls_negotiation_cb(c->conn);
connAccept(c->conn, NULL);
c->io_read_state = CLIENT_COMPLETED_IO;
}

/*
* This function attempts to offload TLS negotiation for a client connection to an I/O thread.
* Returns C_OK if the TLS negotiation was successfully queued for processing by an I/O thread,
* or C_ERR if the client is not eligible for offloading.
* Attempts to offload an Accept operation (currently used for TLS accept) for a client
* connection to I/O threads.
*
* Returns:
* C_OK - If the accept operation was successfully queued for processing
* C_ERR - If the connection is not eligible for offloading
*
* Parameters:
* conn: The connection object for which TLS negotiation should be performed
* conn - The connection object to perform the accept operation on
*/
int trySendTLSNegotiationToIOThreads(connection *conn) {
int trySendAcceptToIOThreads(connection *conn) {
if (server.io_threads_num <= 1) {
return C_ERR;
}

if (!(conn->flags & CONN_FLAG_NO_OFFLOAD)) {
if (!(conn->flags & CONN_FLAG_ALLOW_ACCEPT_OFFLOAD)) {
return C_ERR;
}

Expand All @@ -599,13 +597,12 @@ int trySendTLSNegotiationToIOThreads(connection *conn) {
return C_ERR;
}

c->read_flags = READ_FLAGS_TLS_NEGOTIATION;
c->io_read_state = CLIENT_PENDING_IO;
c->flag.pending_read = 1;
listLinkNodeTail(server.clients_pending_io_read, &c->pending_read_list_node);
connSetPostponeUpdateState(c->conn, 1);
server.stat_io_tls_negotiation_offloaded++;
IOJobQueue_push(job_queue, ioThreadTLSNegotiation, c);
server.stat_io_accept_offloaded++;
IOJobQueue_push(job_queue, ioThreadAccept, c);

return C_OK;
}
4 changes: 2 additions & 2 deletions src/io_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ int tryOffloadFreeArgvToIOThreads(client *c);
void adjustIOThreadsByEventLoad(int numevents, int increase_only);
void drainIOThreadsQueue(void);
void trySendPollJobToIOThreads(void);
int trySendTLSNegotiationToIOThreads(connection *conn);
void setTLSNegotiationCallback(void (*cb)(void *));
int trySendAcceptToIOThreads(connection *conn);

#endif /* IO_THREADS_H */
8 changes: 5 additions & 3 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ client *createClient(connection *conn) {
if (server.tcpkeepalive) connKeepAlive(conn, server.tcpkeepalive);
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
conn->flags |= CONN_FLAG_NO_OFFLOAD;
conn->flags |= CONN_FLAG_ALLOW_ACCEPT_OFFLOAD;
}
c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, &c->buf_usable_size);
selectDb(c, 0);
Expand Down Expand Up @@ -4723,11 +4723,13 @@ int processIOThreadsReadDone(void) {
processed++;
server.stat_io_reads_processed++;

/* Save the current conn state, as connUpdateState may modify it */
int in_accept_state = (connGetState(c->conn) == CONN_STATE_ACCEPTING);
connSetPostponeUpdateState(c->conn, 0);
connUpdateState(c->conn);

/* No client's data was read only TLS handshake. */
if (c->read_flags & READ_FLAGS_TLS_NEGOTIATION) continue;
/* In accept state, no client's data was read - stop here. */
if (in_accept_state) continue;

/* On read error - stop here. */
if (handleReadResult(c) == C_ERR) {
Expand Down
4 changes: 2 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2604,7 +2604,7 @@ void resetServerStats(void) {
server.stat_total_reads_processed = 0;
server.stat_io_writes_processed = 0;
server.stat_io_freed_objects = 0;
server.stat_io_tls_negotiation_offloaded = 0;
server.stat_io_accept_offloaded = 0;
server.stat_poll_processed_by_io_threads = 0;
server.stat_total_writes_processed = 0;
server.stat_client_qbuf_limit_disconnections = 0;
Expand Down Expand Up @@ -5863,7 +5863,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"io_threaded_reads_processed:%lld\r\n", server.stat_io_reads_processed,
"io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed,
"io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects,
"io_threaded_tls_negotiations:%lld\r\n", server.stat_io_tls_negotiation_offloaded,
"io_threaded_accept:%lld\r\n", server.stat_io_accept_offloaded,
"io_threaded_poll_processed:%lld\r\n", server.stat_poll_processed_by_io_threads,
"io_threaded_total_prefetch_batches:%lld\r\n", server.stat_total_prefetch_batches,
"io_threaded_total_prefetch_entries:%lld\r\n", server.stat_total_prefetch_entries,
Expand Down
3 changes: 1 addition & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1841,7 +1841,7 @@ struct valkeyServer {
long long stat_io_reads_processed; /* Number of read events processed by IO threads */
long long stat_io_writes_processed; /* Number of write events processed by IO threads */
long long stat_io_freed_objects; /* Number of objects freed by IO threads */
long long stat_io_tls_negotiation_offloaded; /* Number of TLS negotiation offloads */
long long stat_io_accept_offloaded; /* Number of offloaded accepts */
long long stat_poll_processed_by_io_threads; /* Total number of poll jobs processed by IO */
long long stat_total_reads_processed; /* Total number of read events processed */
long long stat_total_writes_processed; /* Total number of write events processed */
Expand Down Expand Up @@ -2768,7 +2768,6 @@ void dictVanillaFree(void *val);
#define READ_FLAGS_PRIMARY (1 << 14)
#define READ_FLAGS_DONT_PARSE (1 << 15)
#define READ_FLAGS_AUTH_REQUIRED (1 << 16)
#define READ_FLAGS_TLS_NEGOTIATION (1 << 17)

/* Write flags for various write errors and states */
#define WRITE_FLAGS_WRITE_ERROR (1 << 0)
Expand Down
Loading

0 comments on commit c0cf818

Please sign in to comment.