From 882d9dca1dce7ecb1d11d65cb3b217e46f2224f2 Mon Sep 17 00:00:00 2001 From: Amit Nagler <58042354+naglera@users.noreply.github.com> Date: Tue, 26 Nov 2024 16:51:52 +0200 Subject: [PATCH] Add tag for dual-channel logs (#999) This PR introduces a consistent tagging system for dual-channel logs. The goal is to improve log readability and filterability, making it easier for operators to manage and analyze log entries. Resolves https://github.com/valkey-io/valkey/issues/986 --------- Signed-off-by: naglera Signed-off-by: vudiep411 --- src/networking.c | 21 ++-- src/replication.c | 102 +++++++++--------- src/server.h | 5 + .../integration/dual-channel-replication.tcl | 2 +- 4 files changed, 70 insertions(+), 60 deletions(-) diff --git a/src/networking.c b/src/networking.c index 93aa9d00ae..9c51efc537 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1713,10 +1713,10 @@ void freeClient(client *c) { /* Log link disconnection with replica */ if (getClientType(c) == CLIENT_TYPE_REPLICA) { - serverLog(LL_NOTICE, - c->flag.repl_rdb_channel ? "Replica %s rdb channel disconnected." - : "Connection with replica %s lost.", - replicationGetReplicaName(c)); + if (c->flag.repl_rdb_channel) + dualChannelServerLog(LL_NOTICE, "Replica %s rdb channel disconnected.", replicationGetReplicaName(c)); + else + serverLog(LL_NOTICE, "Connection with replica %s lost.", replicationGetReplicaName(c)); } /* Free the query buffer */ @@ -1963,14 +1963,15 @@ int freeClientsInAsyncFreeQueue(void) { if (!c->rdb_client_disconnect_time) { if (c->conn) connSetReadHandler(c->conn, NULL); c->rdb_client_disconnect_time = server.unixtime; - serverLog(LL_VERBOSE, "Postpone RDB client id=%llu (%s) free for %d seconds", (unsigned long long)c->id, - replicationGetReplicaName(c), server.wait_before_rdb_client_free); + dualChannelServerLog(LL_VERBOSE, "Postpone RDB client id=%llu (%s) free for %d seconds", + (unsigned long long)c->id, replicationGetReplicaName(c), server.wait_before_rdb_client_free); } if (server.unixtime - c->rdb_client_disconnect_time <= server.wait_before_rdb_client_free) continue; - serverLog(LL_NOTICE, - "Replica main channel failed to establish PSYNC within the grace period (%ld seconds). " - "Freeing RDB client %llu.", - (long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id); + dualChannelServerLog( + LL_NOTICE, + "Replica main channel failed to establish PSYNC within the grace period (%ld seconds). " + "Freeing RDB client %llu.", + (long int)(server.unixtime - c->rdb_client_disconnect_time), (unsigned long long)c->id); c->flag.protected_rdb_channel = 0; } diff --git a/src/replication.c b/src/replication.c index 97aa10dfab..260da1cd6e 100644 --- a/src/replication.c +++ b/src/replication.c @@ -227,9 +227,9 @@ void addRdbReplicaToPsyncWait(client *replica_rdb_client) { tail->refcount++; } } - serverLog(LL_DEBUG, "Add rdb replica %s to waiting psync, with cid %llu, %s ", - replicationGetReplicaName(replica_rdb_client), (unsigned long long)replica_rdb_client->id, - tail ? "tracking repl-backlog tail" : "no repl-backlog to track"); + dualChannelServerLog(LL_DEBUG, "Add rdb replica %s to waiting psync, with cid %llu, %s ", + replicationGetReplicaName(replica_rdb_client), (unsigned long long)replica_rdb_client->id, + tail ? "tracking repl-backlog tail" : "no repl-backlog to track"); replica_rdb_client->ref_repl_buf_node = tail ? ln : NULL; /* Prevent rdb client from being freed before psync is established. */ replica_rdb_client->flag.protected_rdb_channel = 1; @@ -252,8 +252,8 @@ void backfillRdbReplicasToPsyncWait(void) { if (replica_rdb_client->ref_repl_buf_node) continue; replica_rdb_client->ref_repl_buf_node = ln; head->refcount++; - serverLog(LL_DEBUG, "Attach replica rdb client %llu to repl buf block", - (long long unsigned int)replica_rdb_client->id); + dualChannelServerLog(LL_DEBUG, "Attach replica rdb client %llu to repl buf block", + (long long unsigned int)replica_rdb_client->id); } raxStop(&iter); } @@ -271,10 +271,10 @@ void removeReplicaFromPsyncWait(client *replica_main_client) { } replica_rdb_client->ref_repl_buf_node = NULL; replica_rdb_client->flag.protected_rdb_channel = 0; - serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu, repl buffer block %s", - replicationGetReplicaName(replica_main_client), - (long long unsigned int)replica_main_client->associated_rdb_client_id, - o ? "ref count decreased" : "doesn't exist"); + dualChannelServerLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu, repl buffer block %s", + replicationGetReplicaName(replica_main_client), + (long long unsigned int)replica_main_client->associated_rdb_client_id, + o ? "ref count decreased" : "doesn't exist"); uint64_t id = htonu64(replica_rdb_client->id); raxRemove(server.replicas_waiting_psync, (unsigned char *)&id, sizeof(id), NULL); } @@ -391,8 +391,8 @@ void freeReplicaReferencedReplBuffer(client *replica) { if (replica->flag.repl_rdb_channel) { uint64_t rdb_cid = htonu64(replica->id); if (raxRemove(server.replicas_waiting_psync, (unsigned char *)&rdb_cid, sizeof(rdb_cid), NULL)) { - serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu from replicas rax.", - replicationGetReplicaName(replica), (long long unsigned int)replica->id); + dualChannelServerLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu from replicas rax.", + replicationGetReplicaName(replica), (long long unsigned int)replica->id); } } if (replica->ref_repl_buf_node != NULL) { @@ -1121,10 +1121,11 @@ void syncCommand(client *c) { * resync. */ if (primary_replid[0] != '?') server.stat_sync_partial_err++; if (c->replica_capa & REPLICA_CAPA_DUAL_CHANNEL) { - serverLog(LL_NOTICE, - "Replica %s is capable of dual channel synchronization, and partial sync isn't possible. " - "Full sync will continue with dedicated RDB channel.", - replicationGetReplicaName(c)); + dualChannelServerLog(LL_NOTICE, + "Replica %s is capable of dual channel synchronization, and partial sync " + "isn't possible. " + "Full sync will continue with dedicated RDB channel.", + replicationGetReplicaName(c)); const char *buf = "+DUALCHANNELSYNC\r\n"; if (connWrite(c->conn, buf, strlen(buf)) != (int)strlen(buf)) { freeClientAsync(c); @@ -2565,7 +2566,7 @@ void freePendingReplDataBuf(void) { * provisional primary struct, and free local replication buffer. */ void replicationAbortDualChannelSyncTransfer(void) { serverAssert(server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE); - serverLog(LL_NOTICE, "Aborting dual channel sync"); + dualChannelServerLog(LL_NOTICE, "Aborting dual channel sync"); if (server.repl_rdb_transfer_s) { connClose(server.repl_rdb_transfer_s); server.repl_rdb_transfer_s = NULL; @@ -2594,8 +2595,9 @@ int sendCurrentOffsetToReplica(client *replica) { int buflen; buflen = snprintf(buf, sizeof(buf), "$ENDOFF:%lld %s %d %llu\r\n", server.primary_repl_offset, server.replid, server.db->id, (long long unsigned int)replica->id); - serverLog(LL_NOTICE, "Sending to replica %s RDB end offset %lld and client-id %llu", - replicationGetReplicaName(replica), server.primary_repl_offset, (long long unsigned int)replica->id); + dualChannelServerLog(LL_NOTICE, "Sending to replica %s RDB end offset %lld and client-id %llu", + replicationGetReplicaName(replica), server.primary_repl_offset, + (long long unsigned int)replica->id); if (connSyncWrite(replica->conn, buf, buflen, server.repl_syncio_timeout * 1000) != buflen) { freeClientAsync(replica); return C_ERR; @@ -2604,7 +2606,7 @@ int sendCurrentOffsetToReplica(client *replica) { } static int dualChannelReplHandleHandshake(connection *conn, sds *err) { - serverLog(LL_DEBUG, "Received first reply from primary using rdb connection."); + dualChannelServerLog(LL_DEBUG, "Received first reply from primary using rdb connection."); /* AUTH with the primary if required. */ if (server.primary_auth) { char *args[] = {"AUTH", NULL, NULL}; @@ -2620,7 +2622,7 @@ static int dualChannelReplHandleHandshake(connection *conn, sds *err) { argc++; *err = sendCommandArgv(conn, argc, args, lens); if (*err) { - serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", *err); + dualChannelServerLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", *err); return C_ERR; } } @@ -2630,14 +2632,14 @@ static int dualChannelReplHandleHandshake(connection *conn, sds *err) { NULL); sdsfree(portstr); if (*err) { - serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", *err); + dualChannelServerLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", *err); return C_ERR; } if (connSetReadHandler(conn, dualChannelFullSyncWithPrimary) == C_ERR) { char conninfo[CONN_INFO_LEN]; - serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno), - connGetInfo(conn, conninfo, sizeof(conninfo))); + dualChannelServerLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno), + connGetInfo(conn, conninfo, sizeof(conninfo))); return C_ERR; } return C_OK; @@ -2646,11 +2648,11 @@ static int dualChannelReplHandleHandshake(connection *conn, sds *err) { static int dualChannelReplHandleAuthReply(connection *conn, sds *err) { *err = receiveSynchronousResponse(conn); if (*err == NULL) { - serverLog(LL_WARNING, "Primary did not respond to auth command during SYNC handshake"); + dualChannelServerLog(LL_WARNING, "Primary did not respond to auth command during SYNC handshake"); return C_ERR; } if ((*err)[0] == '-') { - serverLog(LL_WARNING, "Unable to AUTH to Primary: %s", *err); + dualChannelServerLog(LL_WARNING, "Unable to AUTH to Primary: %s", *err); return C_ERR; } server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY; @@ -2660,17 +2662,17 @@ static int dualChannelReplHandleAuthReply(connection *conn, sds *err) { static int dualChannelReplHandleReplconfReply(connection *conn, sds *err) { *err = receiveSynchronousResponse(conn); if (*err == NULL) { - serverLog(LL_WARNING, "Primary did not respond to replconf command during SYNC handshake"); + dualChannelServerLog(LL_WARNING, "Primary did not respond to replconf command during SYNC handshake"); return C_ERR; } if (*err[0] == '-') { - serverLog(LL_NOTICE, "Server does not support sync with offset, dual channel sync approach cannot be used: %s", - *err); + dualChannelServerLog(LL_NOTICE, "Server does not support sync with offset, dual channel sync approach cannot be used: %s", + *err); return C_ERR; } if (connSyncWrite(conn, "SYNC\r\n", 6, server.repl_syncio_timeout * 1000) == -1) { - serverLog(LL_WARNING, "I/O error writing to Primary: %s", connGetLastError(conn)); + dualChannelServerLog(LL_WARNING, "I/O error writing to Primary: %s", connGetLastError(conn)); return C_ERR; } return C_OK; @@ -2684,7 +2686,7 @@ static int dualChannelReplHandleEndOffsetResponse(connection *conn, sds *err) { } if (*err[0] == '\0') { /* Retry again later */ - serverLog(LL_DEBUG, "Received empty $ENDOFF response"); + dualChannelServerLog(LL_DEBUG, "Received empty $ENDOFF response"); return C_RETRY; } long long reploffset; @@ -2693,7 +2695,7 @@ static int dualChannelReplHandleEndOffsetResponse(connection *conn, sds *err) { /* Parse end offset response */ char *endoff_format = "$ENDOFF:%lld %40s %d %llu"; if (sscanf(*err, endoff_format, &reploffset, primary_replid, &dbid, &rdb_client_id) != 4) { - serverLog(LL_WARNING, "Received unexpected $ENDOFF response: %s", *err); + dualChannelServerLog(LL_WARNING, "Received unexpected $ENDOFF response: %s", *err); return C_ERR; } server.rdb_client_id = rdb_client_id; @@ -2741,7 +2743,8 @@ static void dualChannelFullSyncWithPrimary(connection *conn) { /* Check for errors in the socket: after a non blocking connect() we * may find that the socket is in error state. */ if (connGetState(conn) != CONN_STATE_CONNECTED) { - serverLog(LL_WARNING, "Error condition on socket for dual channel replication: %s", connGetLastError(conn)); + dualChannelServerLog(LL_WARNING, "Error condition on socket for dual channel replication: %s", + connGetLastError(conn)); goto error; } switch (server.repl_rdb_channel_state) { @@ -2830,13 +2833,13 @@ int readIntoReplDataBlock(connection *conn, replDataBufBlock *data_block, size_t int nread = connRead(conn, data_block->buf + data_block->used, read); if (nread == -1) { if (connGetState(conn) != CONN_STATE_CONNECTED) { - serverLog(LL_NOTICE, "Error reading from primary: %s", connGetLastError(conn)); + dualChannelServerLog(LL_NOTICE, "Error reading from primary: %s", connGetLastError(conn)); cancelReplicationHandshake(1); } return C_ERR; } if (nread == 0) { - serverLog(LL_VERBOSE, "Provisional primary closed connection"); + dualChannelServerLog(LL_VERBOSE, "Provisional primary closed connection"); cancelReplicationHandshake(1); return C_ERR; } @@ -2865,7 +2868,7 @@ void bufferReplData(connection *conn) { if (readlen && remaining_bytes == 0) { if (server.client_obuf_limits[CLIENT_TYPE_REPLICA].hard_limit_bytes && server.pending_repl_data.len > server.client_obuf_limits[CLIENT_TYPE_REPLICA].hard_limit_bytes) { - serverLog(LL_NOTICE, "Replication buffer limit reached, stopping buffering."); + dualChannelServerLog(LL_NOTICE, "Replication buffer limit reached, stopping buffering."); /* Stop accumulating primary commands. */ connSetReadHandler(conn, NULL); break; @@ -2938,7 +2941,7 @@ void dualChannelSyncSuccess(void) { /* Wait for the accumulated buffer to be processed before reading any more replication updates */ if (server.pending_repl_data.blocks && streamReplDataBufToDb(server.primary) == C_ERR) { /* Sync session aborted during repl data streaming. */ - serverLog(LL_WARNING, "Failed to stream local replication buffer into memory"); + dualChannelServerLog(LL_WARNING, "Failed to stream local replication buffer into memory"); /* Verify sync is still in progress */ if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) { replicationAbortDualChannelSyncTransfer(); @@ -2947,7 +2950,7 @@ void dualChannelSyncSuccess(void) { return; } freePendingReplDataBuf(); - serverLog(LL_NOTICE, "Successfully streamed replication data into memory"); + dualChannelServerLog(LL_NOTICE, "Successfully streamed replication data into memory"); /* We can resume reading from the primary connection once the local replication buffer has been loaded. */ replicationSteadyStateInit(); replicationSendAck(); /* Send ACK to notify primary that replica is synced */ @@ -2963,7 +2966,7 @@ int dualChannelSyncHandlePsync(void) { if (server.repl_rdb_channel_state < REPL_DUAL_CHANNEL_RDB_LOADED) { /* RDB is still loading */ if (connSetReadHandler(server.repl_provisional_primary.conn, bufferReplData) == C_ERR) { - serverLog(LL_WARNING, "Error while setting readable handler: %s", strerror(errno)); + dualChannelServerLog(LL_WARNING, "Error while setting readable handler: %s", strerror(errno)); cancelReplicationHandshake(1); return C_ERR; } @@ -2972,7 +2975,7 @@ int dualChannelSyncHandlePsync(void) { } serverAssert(server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_RDB_LOADED); /* RDB is loaded */ - serverLog(LL_DEBUG, "Dual channel sync - psync established after rdb load"); + dualChannelServerLog(LL_DEBUG, "Psync established after rdb load"); dualChannelSyncSuccess(); return C_OK; } @@ -3066,8 +3069,9 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) { /* While in dual channel replication, we should use our prepared repl id and offset. */ psync_replid = server.repl_provisional_primary.replid; snprintf(psync_offset, sizeof(psync_offset), "%lld", server.repl_provisional_primary.reploff + 1); - serverLog(LL_NOTICE, "Trying a partial resynchronization using main channel (request %s:%s).", psync_replid, - psync_offset); + dualChannelServerLog(LL_NOTICE, + "Trying a partial resynchronization using main channel (request %s:%s).", + psync_replid, psync_offset); } else if (server.cached_primary) { psync_replid = server.cached_primary->replid; snprintf(psync_offset, sizeof(psync_offset), "%lld", server.cached_primary->reploff + 1); @@ -3214,7 +3218,7 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) { /* A response of +DUALCHANNELSYNC from the primary implies that partial * synchronization is not possible and that the primary supports full * sync using dedicated RDB channel. Full sync will continue that way. */ - serverLog(LL_NOTICE, "PSYNC is not possible, initialize RDB channel."); + dualChannelServerLog(LL_NOTICE, "PSYNC is not possible, initialize RDB channel."); sdsfree(reply); return PSYNC_FULLRESYNC_DUAL_CHANNEL; } @@ -3258,7 +3262,7 @@ int dualChannelReplMainConnRecvCapaReply(connection *conn, sds *err) { *err = receiveSynchronousResponse(conn); if (*err == NULL) return C_ERR; if ((*err)[0] == '-') { - serverLog(LL_NOTICE, "Primary does not understand REPLCONF identify: %s", *err); + dualChannelServerLog(LL_NOTICE, "Primary does not understand REPLCONF identify: %s", *err); return C_ERR; } return C_OK; @@ -3267,7 +3271,7 @@ int dualChannelReplMainConnRecvCapaReply(connection *conn, sds *err) { int dualChannelReplMainConnSendPsync(connection *conn, sds *err) { if (server.debug_pause_after_fork) debugPauseProcess(); if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) { - serverLog(LL_WARNING, "Aborting dual channel sync. Write error."); + dualChannelServerLog(LL_WARNING, "Aborting dual channel sync. Write error."); *err = sdsnew(connGetLastError(conn)); return C_ERR; } @@ -3279,8 +3283,8 @@ int dualChannelReplMainConnRecvPsyncReply(connection *conn, sds *err) { if (psync_result == PSYNC_WAIT_REPLY) return C_OK; /* Try again later... */ if (psync_result == PSYNC_CONTINUE) { - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Primary accepted a Partial Resynchronization%s", - server.repl_rdb_transfer_s != NULL ? ", RDB load in background." : "."); + dualChannelServerLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Primary accepted a Partial Resynchronization%s", + server.repl_rdb_transfer_s != NULL ? ", RDB load in background." : "."); if (server.supervised_mode == SUPERVISED_SYSTEMD) { serverCommunicateSystemd("STATUS=PRIMARY <-> REPLICA sync: Partial Resynchronization accepted. Ready to " "accept connections in read-write mode.\n"); @@ -3328,7 +3332,7 @@ void dualChannelSetupMainConnForPsync(connection *conn) { } if (ret == C_ERR) { - serverLog(LL_WARNING, "Aborting dual channel sync. Main channel psync result %d %s", ret, err ? err : ""); + dualChannelServerLog(LL_WARNING, "Aborting dual channel sync. Main channel psync result %d %s", ret, err ? err : ""); cancelReplicationHandshake(1); } sdsfree(err); @@ -3717,8 +3721,8 @@ void syncWithPrimary(connection *conn) { } if (connSetReadHandler(conn, NULL) == C_ERR) { char conninfo[CONN_INFO_LEN]; - serverLog(LL_WARNING, "Can't clear main connection handler: %s (%s)", strerror(errno), - connGetInfo(conn, conninfo, sizeof(conninfo))); + dualChannelServerLog(LL_WARNING, "Can't clear main connection handler: %s (%s)", strerror(errno), + connGetInfo(conn, conninfo, sizeof(conninfo))); goto error; } server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_SEND_HANDSHAKE; diff --git a/src/server.h b/src/server.h index 09b67b2670..0ec105a7ba 100644 --- a/src/server.h +++ b/src/server.h @@ -4044,6 +4044,11 @@ void debugPauseProcess(void); _serverLog(level, __VA_ARGS__); \ } while (0) +/* dualChannelServerLog - Log messages related to dual-channel operations + * This macro wraps the serverLog function, prepending "" + * to the log message. */ +#define dualChannelServerLog(level, ...) serverLog(level, " " __VA_ARGS__) + #define serverDebug(fmt, ...) printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__) #define serverDebugMark() printf("-- MARK %s:%d --\n", __FILE__, __LINE__) diff --git a/tests/integration/dual-channel-replication.tcl b/tests/integration/dual-channel-replication.tcl index 05bdc130c1..055ed670ab 100644 --- a/tests/integration/dual-channel-replication.tcl +++ b/tests/integration/dual-channel-replication.tcl @@ -485,7 +485,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { } wait_for_value_to_propegate_to_replica $primary $replica "key1" # Confirm the occurrence of a race condition. - wait_for_log_messages -1 {"*Dual channel sync - psync established after rdb load*"} 0 2000 1 + wait_for_log_messages -1 {"* Psync established after rdb load*"} 0 2000 1 } } }