From f59c5e8cffc15550a0f265174a726ed17d6dc36e Mon Sep 17 00:00:00 2001 From: "lvyanqi.lyq" Date: Thu, 17 Oct 2024 17:26:08 +0800 Subject: [PATCH 01/14] add pseudo-replica mode Signed-off-by: lvyanqi.lyq --- src/cluster.c | 0 src/cluster_legacy.c | 5 +++ src/config.c | 14 ++++++++ src/db.c | 5 ++- src/evict.c | 4 +-- src/expire.c | 2 +- src/replication.c | 19 +++++++++++ src/server.c | 5 +-- src/server.h | 5 ++- tests/unit/expire.tcl | 74 ++++++++++++++++++++++++++++++++++++++++ tests/unit/maxmemory.tcl | 18 ++++++++++ valkey.conf | 7 ++++ 12 files changed, 151 insertions(+), 7 deletions(-) mode change 100644 => 100755 src/cluster.c mode change 100644 => 100755 src/cluster_legacy.c mode change 100644 => 100755 src/config.c mode change 100644 => 100755 src/db.c mode change 100644 => 100755 src/evict.c mode change 100644 => 100755 src/expire.c mode change 100644 => 100755 src/replication.c mode change 100644 => 100755 src/server.c mode change 100644 => 100755 src/server.h mode change 100644 => 100755 tests/unit/expire.tcl mode change 100644 => 100755 tests/unit/maxmemory.tcl mode change 100644 => 100755 valkey.conf diff --git a/src/cluster.c b/src/cluster.c old mode 100644 new mode 100755 diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c old mode 100644 new mode 100755 index 14f8a6bd1e..d21e30a649 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -6648,6 +6648,11 @@ int clusterCommandSpecial(client *c) { return 1; } + if (server.pseudo_replica) { + addReplyError(c, "CLUSTER REPLICATE not allowed in pseudo-replica mode."); + return 1; + } + /* If the instance is currently a primary, it should have no assigned * slots nor keys to accept to replicate some other node. * Replicas can switch to another primary without issues. */ diff --git a/src/config.c b/src/config.c old mode 100644 new mode 100755 index 663cf5da38..1f0184015f --- a/src/config.c +++ b/src/config.c @@ -2331,6 +2331,14 @@ static int isValidActiveDefrag(int val, const char **err) { return 1; } +static int isValidPseudoReplica(int val, const char **err) { + if (server.primary_host && val) { + *err = "Server is already a replica"; + return 0; + } + return 1; +} + static int isValidClusterConfigFile(char *val, const char **err) { if (!strcmp(val, "")) { *err = "cluster-config-file can't be empty"; @@ -2949,6 +2957,11 @@ static int setConfigReplicaOfOption(standardConfig *config, sds *argv, int argc, return 0; } + if (server.pseudo_replica) { + *err = "REPLICAOF not allowed in pseudo-replica mode"; + return 0; + } + sdsfree(server.primary_host); server.primary_host = NULL; if (!strcasecmp(argv[0], "no") && !strcasecmp(argv[1], "one")) { @@ -3136,6 +3149,7 @@ standardConfig static_configs[] = { createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL), createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL), createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL), + createBoolConfig("pseudo-replica", NULL, MODIFIABLE_CONFIG, server.pseudo_replica, 0, isValidPseudoReplica, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), diff --git a/src/db.c b/src/db.c old mode 100644 new mode 100755 index 00e6e7b2d6..612cf1e864 --- a/src/db.c +++ b/src/db.c @@ -385,7 +385,7 @@ robj *dbRandomKey(serverDb *db) { key = dictGetKey(de); keyobj = createStringObject(key, sdslen(key)); if (dbFindExpiresWithDictIndex(db, key, randomDictIndex)) { - if (allvolatile && server.primary_host && --maxtries == 0) { + if (allvolatile && (server.primary_host || server.pseudo_replica) && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, * it could happen that all the keys are already logically * expired in the repilca, so the function cannot stop because @@ -1826,6 +1826,9 @@ keyStatus expireIfNeededWithDictIndex(serverDb *db, robj *key, int flags, int di if (server.primary_host != NULL) { if (server.current_client && (server.current_client->flag.primary)) return KEY_VALID; if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED; + } else if (server.pseudo_replica) { + if (server.current_client && (server.current_client->flag.pseudo_master)) return KEY_VALID; + if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED; } /* In some cases we're explicitly instructed to return an indication of a diff --git a/src/evict.c b/src/evict.c old mode 100644 new mode 100755 index 5e4b6220eb..aa1057e36f --- a/src/evict.c +++ b/src/evict.c @@ -546,8 +546,8 @@ int performEvictions(void) { goto update_metrics; } - if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION) { - result = EVICT_FAIL; /* We need to free memory, but policy forbids. */ + if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION || server.pseudo_replica) { + result = EVICT_FAIL; /* We need to free memory, but policy forbids or we are in 'pseudo-replica' mode. */ goto update_metrics; } diff --git a/src/expire.c b/src/expire.c old mode 100644 new mode 100755 index 928bb58d86..9a6f8dc84c --- a/src/expire.c +++ b/src/expire.c @@ -521,7 +521,7 @@ int checkAlreadyExpired(long long when) { * * Instead we add the already expired key to the database with expire time * (possibly in the past) and wait for an explicit DEL from the primary. */ - return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host); + return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host && !server.pseudo_replica); } #define EXPIRE_NX (1 << 0) diff --git a/src/replication.c b/src/replication.c old mode 100644 new mode 100755 index 63433de865..f053e86da4 --- a/src/replication.c +++ b/src/replication.c @@ -1272,6 +1272,10 @@ void syncCommand(client *c) { * - rdb-channel <1|0> * Used to identify the client as a replica's rdb connection in an dual channel * sync session. + * + * - pseudo-master <0|1> + * Set this connection behaving like a master if server.pseudo_replica is true. + * Sync tools can set their connections into 'pseudo-master' state to visit expired keys. * */ void replconfCommand(client *c) { int j; @@ -1418,6 +1422,16 @@ void replconfCommand(client *c) { return; } c->associated_rdb_client_id = (uint64_t)client_id; + } else if (!strcasecmp(c->argv[j]->ptr, "pseudo-master")) { + long pseudo_master = 0; + if (getRangeLongFromObjectOrReply(c, c->argv[j + 1], + 0, 1, &pseudo_master, NULL) != C_OK) + return; + if (pseudo_master == 1) { + c->flag.pseudo_master = 1; + } else { + c->flag.pseudo_master = 0; + } } else { addReplyErrorFormat(c, "Unrecognized REPLCONF option: %s", (char *)c->argv[j]->ptr); return; @@ -3961,6 +3975,11 @@ void replicaofCommand(client *c) { return; } + if (server.pseudo_replica) { + addReplyError(c, "REPLICAOF not allowed in pseudo-replica mode."); + return; + } + if (server.failover_state != NO_FAILOVER) { addReplyError(c, "REPLICAOF not allowed while failing over."); return; diff --git a/src/server.c b/src/server.c old mode 100644 new mode 100755 index ab95f84346..e56860cbe8 --- a/src/server.c +++ b/src/server.c @@ -1054,7 +1054,7 @@ void clientsCron(void) { void databasesCron(void) { /* Expire keys by random sampling. Not required for replicas * as primary will synthesize DELs for us. */ - if (server.active_expire_enabled) { + if (server.active_expire_enabled && !server.pseudo_replica) { if (iAmPrimary()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } else { @@ -1651,7 +1651,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Run a fast expire cycle (the called function will return * ASAP if a fast cycle is not needed). */ - if (server.active_expire_enabled && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); + if (server.active_expire_enabled && !server.pseudo_replica && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); if (moduleCount()) { moduleFireServerEvent(VALKEYMODULE_EVENT_EVENTLOOP, VALKEYMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP, NULL); @@ -2057,6 +2057,7 @@ void initServerConfig(void) { server.extended_redis_compat = 0; server.pause_cron = 0; server.dict_resizing = 1; + server.pseudo_replica = 0; server.latency_tracking_info_percentiles_len = 3; server.latency_tracking_info_percentiles = zmalloc(sizeof(double) * (server.latency_tracking_info_percentiles_len)); diff --git a/src/server.h b/src/server.h old mode 100644 new mode 100755 index 4fad8d2508..590e352031 --- a/src/server.h +++ b/src/server.h @@ -1224,7 +1224,8 @@ typedef struct ClientFlags { * knows that it does not need the cache and required a full sync. With this * flag, we won't cache the primary in freeClient. */ uint64_t fake : 1; /* This is a fake client without a real connection. */ - uint64_t reserved : 5; /* Reserved for future use */ + uint64_t pseudo_master : 1; /* This client is a pseudo master */ + uint64_t reserved : 4; /* Reserved for future use */ } ClientFlags; typedef struct client { @@ -2070,6 +2071,8 @@ struct valkeyServer { char primary_replid[CONFIG_RUN_ID_SIZE + 1]; /* Primary PSYNC runid. */ long long primary_initial_offset; /* Primary PSYNC offset. */ int repl_replica_lazy_flush; /* Lazy FLUSHALL before loading DB? */ + /* Pseudo Replica */ + int pseudo_replica; /* If true, server is a pseudo replica. */ /* Synchronous replication. */ list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */ int get_ack_from_replicas; /* If true we send REPLCONF GETACK. */ diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl old mode 100644 new mode 100755 index d85ce7ee68..09acb350c4 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -832,6 +832,80 @@ start_server {tags {"expire"}} { close_replication_stream $repl assert_equal [r debug set-active-expire 1] {OK} } {} {needs:debug} + + test {Pseudo-replica mode should forbid active expiration} { + r flushall + + r config set pseudo-replica yes + assert_equal [r replconf pseudo-master 1] {OK} + + r set foo1 bar PX 1 + r set foo2 bar PX 1 + after 100 + + assert_equal [r dbsize] {2} + + assert_equal [r replconf pseudo-master 0] {OK} + r config set pseudo-replica no + + # Verify all keys have expired + wait_for_condition 40 100 { + [r dbsize] eq 0 + } else { + fail "Keys did not actively expire." + } + } + + test {Pseudo-replica mode should forbid lazy expiration} { + r flushall + r debug set-active-expire 0 + + r config set pseudo-replica yes + assert_equal [r replconf pseudo-master 1] {OK} + + r set foo1 1 PX 1 + after 10 + + r get foo1 + assert_equal [r dbsize] {1} + + assert_equal [r replconf pseudo-master 0] {OK} + r config set pseudo-replica no + + r get foo1 + + assert_equal [r dbsize] {0} + + assert_equal [r debug set-active-expire 1] {OK} + } {} {needs:debug} + + test {RANDOMKEY can return expired key in Pseudo-replica mode} { + r flushall + + r config set pseudo-replica yes + assert_equal [r replconf pseudo-master 1] {OK} + + r set foo1 bar PX 1 + after 10 + + set client [valkey [srv "host"] [srv "port"] 0 $::tls] + if {!$::singledb} { + $client select 9 + } + assert_equal [$client ttl foo1] {-2} + + assert_equal [r randomkey] {foo1} + + assert_equal [r replconf pseudo-master 0] {OK} + r config set pseudo-replica no + + # Verify all keys have expired + wait_for_condition 40 100 { + [r dbsize] eq 0 + } else { + fail "Keys did not actively expire." + } + } } start_cluster 1 0 {tags {"expire external:skip cluster"}} { diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl old mode 100644 new mode 100755 index d4e62246f1..bf99084ce7 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -611,3 +611,21 @@ start_server {tags {"maxmemory" "external:skip"}} { assert {[r object freq foo] == 5} } } + +start_server {tags {"maxmemory" "external:skip"}} { + test {Pseudo-replica mode should forbid eviction} { + r set key val + r config set pseudo-replica yes + assert_equal [r replconf pseudo-master 1] {OK} + r config set maxmemory-policy allkeys-lru + r config set maxmemory 1 + + assert_equal [r dbsize] {1} + assert_error {OOM command not allowed*} {r set key1 val1} + + assert_equal [r replconf pseudo-master 0] {OK} + r config set pseudo-replica no + + assert_equal [r dbsize] {0} + } +} \ No newline at end of file diff --git a/valkey.conf b/valkey.conf old mode 100644 new mode 100755 index f485b42b1a..9085473477 --- a/valkey.conf +++ b/valkey.conf @@ -801,6 +801,13 @@ replica-priority 100 # # replica-ignore-disk-write-errors no +# Make the master behave like a replica, which forbids expiration and evcition. +# This is useful for sync tools, because expiration and evcition may cause the data corruption. +# Sync tools can set their connections into 'pseudo-master' state by REPLCONF PSEUDO-MASTER to +# behave like a master(i.e. visit expired keys). +# +# pseudo-replica no + # ----------------------------------------------------------------------------- # By default, Sentinel includes all replicas in its reports. A replica # can be excluded from Sentinel's announcements. An unannounced replica From 7226e552ba6aba08341c51a0f512386e8edf3748 Mon Sep 17 00:00:00 2001 From: "lvyanqi.lyq" Date: Fri, 18 Oct 2024 15:00:06 +0800 Subject: [PATCH 02/14] rename pseudo-replica to import-mode and add client import-source command Signed-off-by: lvyanqi.lyq --- src/cluster_legacy.c | 4 +-- src/commands.def | 29 +++++++++++++++++++ src/commands/client-import-source.json | 40 ++++++++++++++++++++++++++ src/config.c | 10 +++---- src/db.c | 6 ++-- src/evict.c | 4 +-- src/expire.c | 2 +- src/networking.c | 19 ++++++++++++ src/replication.c | 18 ++---------- src/server.c | 6 ++-- src/server.h | 6 ++-- tests/unit/expire.tcl | 30 +++++++++---------- tests/unit/maxmemory.tcl | 10 +++---- valkey.conf | 8 +++--- 14 files changed, 133 insertions(+), 59 deletions(-) create mode 100644 src/commands/client-import-source.json diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index d21e30a649..6ce5967cbe 100755 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -6648,8 +6648,8 @@ int clusterCommandSpecial(client *c) { return 1; } - if (server.pseudo_replica) { - addReplyError(c, "CLUSTER REPLICATE not allowed in pseudo-replica mode."); + if (server.import_mode) { + addReplyError(c, "CLUSTER REPLICATE not allowed in import mode."); return 1; } diff --git a/src/commands.def b/src/commands.def index cd9f8e2984..8abd059b10 100644 --- a/src/commands.def +++ b/src/commands.def @@ -1230,6 +1230,34 @@ struct COMMAND_ARG CLIENT_CAPA_Args[] = { #define CLIENT_ID_Keyspecs NULL #endif +/********** CLIENT IMPORT_SOURCE ********************/ + +#ifndef SKIP_CMD_HISTORY_TABLE +/* CLIENT IMPORT_SOURCE history */ +#define CLIENT_IMPORT_SOURCE_History NULL +#endif + +#ifndef SKIP_CMD_TIPS_TABLE +/* CLIENT IMPORT_SOURCE tips */ +#define CLIENT_IMPORT_SOURCE_Tips NULL +#endif + +#ifndef SKIP_CMD_KEY_SPECS_TABLE +/* CLIENT IMPORT_SOURCE key specs */ +#define CLIENT_IMPORT_SOURCE_Keyspecs NULL +#endif + +/* CLIENT IMPORT_SOURCE enabled argument table */ +struct COMMAND_ARG CLIENT_IMPORT_SOURCE_enabled_Subargs[] = { +{MAKE_ARG("on",ARG_TYPE_PURE_TOKEN,-1,"ON",NULL,NULL,CMD_ARG_NONE,0,NULL)}, +{MAKE_ARG("off",ARG_TYPE_PURE_TOKEN,-1,"OFF",NULL,NULL,CMD_ARG_NONE,0,NULL)}, +}; + +/* CLIENT IMPORT_SOURCE argument table */ +struct COMMAND_ARG CLIENT_IMPORT_SOURCE_Args[] = { +{MAKE_ARG("enabled",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLIENT_IMPORT_SOURCE_enabled_Subargs}, +}; + /********** CLIENT INFO ********************/ #ifndef SKIP_CMD_HISTORY_TABLE @@ -1630,6 +1658,7 @@ struct COMMAND_STRUCT CLIENT_Subcommands[] = { {MAKE_CMD("getredir","Returns the client ID to which the connection's tracking notifications are redirected.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETREDIR_History,0,CLIENT_GETREDIR_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETREDIR_Keyspecs,0,NULL,0)}, {MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_HELP_History,0,CLIENT_HELP_Tips,0,clientCommand,2,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_HELP_Keyspecs,0,NULL,0)}, {MAKE_CMD("id","Returns the unique client ID of the connection.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_ID_History,0,CLIENT_ID_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_ID_Keyspecs,0,NULL,0)}, +{MAKE_CMD("import-source","Allow this client to import data.","O(1)","8.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_IMPORT_SOURCE_History,0,CLIENT_IMPORT_SOURCE_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_IMPORT_SOURCE_Keyspecs,0,NULL,1),.args=CLIENT_IMPORT_SOURCE_Args}, {MAKE_CMD("info","Returns information about the connection.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_INFO_History,0,CLIENT_INFO_Tips,1,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_INFO_Keyspecs,0,NULL,0)}, {MAKE_CMD("kill","Terminates open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_KILL_History,7,CLIENT_KILL_Tips,0,clientCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_KILL_Keyspecs,0,NULL,1),.args=CLIENT_KILL_Args}, {MAKE_CMD("list","Lists open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_LIST_History,7,CLIENT_LIST_Tips,1,clientCommand,-2,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_LIST_Keyspecs,0,NULL,2),.args=CLIENT_LIST_Args}, diff --git a/src/commands/client-import-source.json b/src/commands/client-import-source.json new file mode 100644 index 0000000000..dfebef5ca0 --- /dev/null +++ b/src/commands/client-import-source.json @@ -0,0 +1,40 @@ +{ + "IMPORT-SOURCE": { + "summary": "Allow this client to import data.", + "complexity": "O(1)", + "group": "connection", + "since": "8.2.0", + "arity": 3, + "container": "CLIENT", + "function": "clientCommand", + "command_flags": [ + "NOSCRIPT", + "LOADING", + "STALE" + ], + "acl_categories": [ + "CONNECTION" + ], + "reply_schema": { + "const": "OK" + }, + "arguments": [ + { + "name": "enabled", + "type": "oneof", + "arguments": [ + { + "name": "on", + "type": "pure-token", + "token": "ON" + }, + { + "name": "off", + "type": "pure-token", + "token": "OFF" + } + ] + } + ] + } +} \ No newline at end of file diff --git a/src/config.c b/src/config.c index 1f0184015f..bdbbbe5891 100755 --- a/src/config.c +++ b/src/config.c @@ -2331,9 +2331,9 @@ static int isValidActiveDefrag(int val, const char **err) { return 1; } -static int isValidPseudoReplica(int val, const char **err) { +static int isValidImportMode(int val, const char **err) { if (server.primary_host && val) { - *err = "Server is already a replica"; + *err = "Server is already in replica mode"; return 0; } return 1; @@ -2957,8 +2957,8 @@ static int setConfigReplicaOfOption(standardConfig *config, sds *argv, int argc, return 0; } - if (server.pseudo_replica) { - *err = "REPLICAOF not allowed in pseudo-replica mode"; + if (server.import_mode) { + *err = "REPLICAOF not allowed in import mode"; return 0; } @@ -3149,7 +3149,7 @@ standardConfig static_configs[] = { createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL), createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL), createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL), - createBoolConfig("pseudo-replica", NULL, MODIFIABLE_CONFIG, server.pseudo_replica, 0, isValidPseudoReplica, NULL), + createBoolConfig("import-mode", NULL, MODIFIABLE_CONFIG, server.import_mode, 0, isValidImportMode, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), diff --git a/src/db.c b/src/db.c index 612cf1e864..30200275a7 100755 --- a/src/db.c +++ b/src/db.c @@ -385,7 +385,7 @@ robj *dbRandomKey(serverDb *db) { key = dictGetKey(de); keyobj = createStringObject(key, sdslen(key)); if (dbFindExpiresWithDictIndex(db, key, randomDictIndex)) { - if (allvolatile && (server.primary_host || server.pseudo_replica) && --maxtries == 0) { + if (allvolatile && (server.primary_host || server.import_mode) && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, * it could happen that all the keys are already logically * expired in the repilca, so the function cannot stop because @@ -1826,8 +1826,8 @@ keyStatus expireIfNeededWithDictIndex(serverDb *db, robj *key, int flags, int di if (server.primary_host != NULL) { if (server.current_client && (server.current_client->flag.primary)) return KEY_VALID; if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED; - } else if (server.pseudo_replica) { - if (server.current_client && (server.current_client->flag.pseudo_master)) return KEY_VALID; + } else if (server.import_mode) { + if (server.current_client && (server.current_client->flag.import_source)) return KEY_VALID; if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED; } diff --git a/src/evict.c b/src/evict.c index aa1057e36f..0eaacfb62c 100755 --- a/src/evict.c +++ b/src/evict.c @@ -546,8 +546,8 @@ int performEvictions(void) { goto update_metrics; } - if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION || server.pseudo_replica) { - result = EVICT_FAIL; /* We need to free memory, but policy forbids or we are in 'pseudo-replica' mode. */ + if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION || server.import_mode) { + result = EVICT_FAIL; /* We need to free memory, but policy forbids or we are in import mode. */ goto update_metrics; } diff --git a/src/expire.c b/src/expire.c index 9a6f8dc84c..8c79abcde7 100755 --- a/src/expire.c +++ b/src/expire.c @@ -521,7 +521,7 @@ int checkAlreadyExpired(long long when) { * * Instead we add the already expired key to the database with expire time * (possibly in the past) and wait for an explicit DEL from the primary. */ - return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host && !server.pseudo_replica); + return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host && !server.import_mode); } #define EXPIRE_NX (1 << 0) diff --git a/src/networking.c b/src/networking.c index c24a95019b..35cd4cb7e5 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3567,6 +3567,9 @@ void clientCommand(client *c) { " Protect current client connection from eviction.", "NO-TOUCH (ON|OFF)", " Will not touch LRU/LFU stats when this mode is on.", + "PSEUDO-PRIMARY (ON|OFF)", + " Set this connection behaving like a primary if server.import_mode is true.", + " Sync tools can set their connections into 'pseudo-primary' state to visit expired keys.", NULL}; addReplyHelp(c, help); } else if (!strcasecmp(c->argv[1]->ptr, "id") && c->argc == 2) { @@ -4040,6 +4043,22 @@ void clientCommand(client *c) { } } addReply(c, shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr, "import-source")) { + /* CLIENT IMPORT-SOURCE ON|OFF */ + if (!server.import_mode) { + addReplyError(c, "Server is not in import mode"); + return; + } + if (!strcasecmp(c->argv[2]->ptr, "on")) { + c->flag.import_source = 1; + addReply(c, shared.ok); + } else if (!strcasecmp(c->argv[2]->ptr, "off")) { + c->flag.import_source = 0; + addReply(c, shared.ok); + } else { + addReplyErrorObject(c, shared.syntaxerr); + return; + } } else { addReplySubcommandSyntaxError(c); } diff --git a/src/replication.c b/src/replication.c index f053e86da4..0f582ba726 100755 --- a/src/replication.c +++ b/src/replication.c @@ -1272,10 +1272,6 @@ void syncCommand(client *c) { * - rdb-channel <1|0> * Used to identify the client as a replica's rdb connection in an dual channel * sync session. - * - * - pseudo-master <0|1> - * Set this connection behaving like a master if server.pseudo_replica is true. - * Sync tools can set their connections into 'pseudo-master' state to visit expired keys. * */ void replconfCommand(client *c) { int j; @@ -1422,16 +1418,6 @@ void replconfCommand(client *c) { return; } c->associated_rdb_client_id = (uint64_t)client_id; - } else if (!strcasecmp(c->argv[j]->ptr, "pseudo-master")) { - long pseudo_master = 0; - if (getRangeLongFromObjectOrReply(c, c->argv[j + 1], - 0, 1, &pseudo_master, NULL) != C_OK) - return; - if (pseudo_master == 1) { - c->flag.pseudo_master = 1; - } else { - c->flag.pseudo_master = 0; - } } else { addReplyErrorFormat(c, "Unrecognized REPLCONF option: %s", (char *)c->argv[j]->ptr); return; @@ -3975,8 +3961,8 @@ void replicaofCommand(client *c) { return; } - if (server.pseudo_replica) { - addReplyError(c, "REPLICAOF not allowed in pseudo-replica mode."); + if (server.import_mode) { + addReplyError(c, "REPLICAOF not allowed in import mode."); return; } diff --git a/src/server.c b/src/server.c index e56860cbe8..c34d95bd21 100755 --- a/src/server.c +++ b/src/server.c @@ -1054,7 +1054,7 @@ void clientsCron(void) { void databasesCron(void) { /* Expire keys by random sampling. Not required for replicas * as primary will synthesize DELs for us. */ - if (server.active_expire_enabled && !server.pseudo_replica) { + if (server.active_expire_enabled && !server.import_mode) { if (iAmPrimary()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } else { @@ -1651,7 +1651,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Run a fast expire cycle (the called function will return * ASAP if a fast cycle is not needed). */ - if (server.active_expire_enabled && !server.pseudo_replica && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); + if (server.active_expire_enabled && !server.import_mode && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); if (moduleCount()) { moduleFireServerEvent(VALKEYMODULE_EVENT_EVENTLOOP, VALKEYMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP, NULL); @@ -2057,7 +2057,7 @@ void initServerConfig(void) { server.extended_redis_compat = 0; server.pause_cron = 0; server.dict_resizing = 1; - server.pseudo_replica = 0; + server.import_mode = 0; server.latency_tracking_info_percentiles_len = 3; server.latency_tracking_info_percentiles = zmalloc(sizeof(double) * (server.latency_tracking_info_percentiles_len)); diff --git a/src/server.h b/src/server.h index 590e352031..5afb11edab 100755 --- a/src/server.h +++ b/src/server.h @@ -1224,7 +1224,7 @@ typedef struct ClientFlags { * knows that it does not need the cache and required a full sync. With this * flag, we won't cache the primary in freeClient. */ uint64_t fake : 1; /* This is a fake client without a real connection. */ - uint64_t pseudo_master : 1; /* This client is a pseudo master */ + uint64_t import_source : 1; /* This client is importing data to server and can visit expired key. */ uint64_t reserved : 4; /* Reserved for future use */ } ClientFlags; @@ -2071,8 +2071,8 @@ struct valkeyServer { char primary_replid[CONFIG_RUN_ID_SIZE + 1]; /* Primary PSYNC runid. */ long long primary_initial_offset; /* Primary PSYNC offset. */ int repl_replica_lazy_flush; /* Lazy FLUSHALL before loading DB? */ - /* Pseudo Replica */ - int pseudo_replica; /* If true, server is a pseudo replica. */ + /* Import Mode */ + int import_mode; /* If true, server is in import mode and forbid expiration and evcition. */ /* Synchronous replication. */ list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */ int get_ack_from_replicas; /* If true we send REPLCONF GETACK. */ diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl index 09acb350c4..fba425f62d 100755 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -833,11 +833,11 @@ start_server {tags {"expire"}} { assert_equal [r debug set-active-expire 1] {OK} } {} {needs:debug} - test {Pseudo-replica mode should forbid active expiration} { + test {Import mode should forbid active expiration} { r flushall - r config set pseudo-replica yes - assert_equal [r replconf pseudo-master 1] {OK} + r config set import-mode yes + assert_equal [r client import-source on] {OK} r set foo1 bar PX 1 r set foo2 bar PX 1 @@ -845,8 +845,8 @@ start_server {tags {"expire"}} { assert_equal [r dbsize] {2} - assert_equal [r replconf pseudo-master 0] {OK} - r config set pseudo-replica no + assert_equal [r client import-source off] {OK} + r config set import-mode no # Verify all keys have expired wait_for_condition 40 100 { @@ -856,12 +856,12 @@ start_server {tags {"expire"}} { } } - test {Pseudo-replica mode should forbid lazy expiration} { + test {Import mode should forbid lazy expiration} { r flushall r debug set-active-expire 0 - r config set pseudo-replica yes - assert_equal [r replconf pseudo-master 1] {OK} + r config set import-mode yes + assert_equal [r client import-source on] {OK} r set foo1 1 PX 1 after 10 @@ -869,8 +869,8 @@ start_server {tags {"expire"}} { r get foo1 assert_equal [r dbsize] {1} - assert_equal [r replconf pseudo-master 0] {OK} - r config set pseudo-replica no + assert_equal [r client import-source off] {OK} + r config set import-mode no r get foo1 @@ -879,11 +879,11 @@ start_server {tags {"expire"}} { assert_equal [r debug set-active-expire 1] {OK} } {} {needs:debug} - test {RANDOMKEY can return expired key in Pseudo-replica mode} { + test {RANDOMKEY can return expired key in import mode} { r flushall - r config set pseudo-replica yes - assert_equal [r replconf pseudo-master 1] {OK} + r config set import-mode yes + assert_equal [r client import-source on] {OK} r set foo1 bar PX 1 after 10 @@ -896,8 +896,8 @@ start_server {tags {"expire"}} { assert_equal [r randomkey] {foo1} - assert_equal [r replconf pseudo-master 0] {OK} - r config set pseudo-replica no + assert_equal [r client import-source off] {OK} + r config set import-mode no # Verify all keys have expired wait_for_condition 40 100 { diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index bf99084ce7..89e9699a3e 100755 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -613,18 +613,18 @@ start_server {tags {"maxmemory" "external:skip"}} { } start_server {tags {"maxmemory" "external:skip"}} { - test {Pseudo-replica mode should forbid eviction} { + test {Import mode should forbid eviction} { r set key val - r config set pseudo-replica yes - assert_equal [r replconf pseudo-master 1] {OK} + r config set import-mode yes + assert_equal [r client import-source on] {OK} r config set maxmemory-policy allkeys-lru r config set maxmemory 1 assert_equal [r dbsize] {1} assert_error {OOM command not allowed*} {r set key1 val1} - assert_equal [r replconf pseudo-master 0] {OK} - r config set pseudo-replica no + assert_equal [r client import-source off] {OK} + r config set import-mode no assert_equal [r dbsize] {0} } diff --git a/valkey.conf b/valkey.conf index 9085473477..d93a564c44 100755 --- a/valkey.conf +++ b/valkey.conf @@ -801,12 +801,12 @@ replica-priority 100 # # replica-ignore-disk-write-errors no -# Make the master behave like a replica, which forbids expiration and evcition. +# Make the primary forbid expiration and evcition. # This is useful for sync tools, because expiration and evcition may cause the data corruption. -# Sync tools can set their connections into 'pseudo-master' state by REPLCONF PSEUDO-MASTER to -# behave like a master(i.e. visit expired keys). +# Sync tools can set their connections into 'pseudo-primary' state by CLIENT PSEUDO-PRIMARY to +# behave like a primary(i.e. visit expired keys). # -# pseudo-replica no +# import-mode no # ----------------------------------------------------------------------------- # By default, Sentinel includes all replicas in its reports. A replica From c2e86bffe23492047eeb3166fafb37b425031b4b Mon Sep 17 00:00:00 2001 From: "lvyanqi.lyq" Date: Fri, 18 Oct 2024 15:05:53 +0800 Subject: [PATCH 03/14] revert the file permission change Signed-off-by: lvyanqi.lyq --- src/cluster.c | 0 src/cluster_legacy.c | 0 src/config.c | 0 src/db.c | 0 src/evict.c | 0 src/expire.c | 0 src/replication.c | 0 src/server.c | 0 src/server.h | 0 tests/unit/expire.tcl | 0 tests/unit/maxmemory.tcl | 0 valkey.conf | 0 12 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 src/cluster.c mode change 100755 => 100644 src/cluster_legacy.c mode change 100755 => 100644 src/config.c mode change 100755 => 100644 src/db.c mode change 100755 => 100644 src/evict.c mode change 100755 => 100644 src/expire.c mode change 100755 => 100644 src/replication.c mode change 100755 => 100644 src/server.c mode change 100755 => 100644 src/server.h mode change 100755 => 100644 tests/unit/expire.tcl mode change 100755 => 100644 tests/unit/maxmemory.tcl mode change 100755 => 100644 valkey.conf diff --git a/src/cluster.c b/src/cluster.c old mode 100755 new mode 100644 diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c old mode 100755 new mode 100644 diff --git a/src/config.c b/src/config.c old mode 100755 new mode 100644 diff --git a/src/db.c b/src/db.c old mode 100755 new mode 100644 diff --git a/src/evict.c b/src/evict.c old mode 100755 new mode 100644 diff --git a/src/expire.c b/src/expire.c old mode 100755 new mode 100644 diff --git a/src/replication.c b/src/replication.c old mode 100755 new mode 100644 diff --git a/src/server.c b/src/server.c old mode 100755 new mode 100644 diff --git a/src/server.h b/src/server.h old mode 100755 new mode 100644 diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl old mode 100755 new mode 100644 diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl old mode 100755 new mode 100644 diff --git a/valkey.conf b/valkey.conf old mode 100755 new mode 100644 From 937924c14f43fe91ee11e01663fda1734f5c92f1 Mon Sep 17 00:00:00 2001 From: "lvyanqi.lyq" Date: Fri, 18 Oct 2024 15:15:34 +0800 Subject: [PATCH 04/14] minor comments fix Signed-off-by: lvyanqi.lyq --- src/commands/client-import-source.json | 2 +- src/networking.c | 6 +++--- valkey.conf | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/commands/client-import-source.json b/src/commands/client-import-source.json index dfebef5ca0..6f196a1534 100644 --- a/src/commands/client-import-source.json +++ b/src/commands/client-import-source.json @@ -1,6 +1,6 @@ { "IMPORT-SOURCE": { - "summary": "Allow this client to import data.", + "summary": "Mark this client as importing source when server is in import mode.", "complexity": "O(1)", "group": "connection", "since": "8.2.0", diff --git a/src/networking.c b/src/networking.c index 35cd4cb7e5..8d6abc55f0 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3567,9 +3567,9 @@ void clientCommand(client *c) { " Protect current client connection from eviction.", "NO-TOUCH (ON|OFF)", " Will not touch LRU/LFU stats when this mode is on.", - "PSEUDO-PRIMARY (ON|OFF)", - " Set this connection behaving like a primary if server.import_mode is true.", - " Sync tools can set their connections into 'pseudo-primary' state to visit expired keys.", + "IMPORY-SOURCE (ON|OFF)", + " Set this connection as importing source when server.import_mode is true.", + " Sync tools can set their connections into this state to visit expired keys.", NULL}; addReplyHelp(c, help); } else if (!strcasecmp(c->argv[1]->ptr, "id") && c->argc == 2) { diff --git a/valkey.conf b/valkey.conf index d93a564c44..260b730bbd 100644 --- a/valkey.conf +++ b/valkey.conf @@ -803,8 +803,8 @@ replica-priority 100 # Make the primary forbid expiration and evcition. # This is useful for sync tools, because expiration and evcition may cause the data corruption. -# Sync tools can set their connections into 'pseudo-primary' state by CLIENT PSEUDO-PRIMARY to -# behave like a primary(i.e. visit expired keys). +# Sync tools can mark their connections as importing source by CLIENT IMPORT-SOURCE. +# NOTICE: Clients should avoid writing the same key on the source server and the destination server. # # import-mode no From b3b5c46f5fc6347c25c861c43755fb55b6d32ae5 Mon Sep 17 00:00:00 2001 From: "lvyanqi.lyq" Date: Fri, 18 Oct 2024 15:20:17 +0800 Subject: [PATCH 05/14] minor comments fix Signed-off-by: lvyanqi.lyq --- src/networking.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/networking.c b/src/networking.c index 8d6abc55f0..caf83dc589 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3567,8 +3567,8 @@ void clientCommand(client *c) { " Protect current client connection from eviction.", "NO-TOUCH (ON|OFF)", " Will not touch LRU/LFU stats when this mode is on.", - "IMPORY-SOURCE (ON|OFF)", - " Set this connection as importing source when server.import_mode is true.", + "IMPORT-SOURCE (ON|OFF)", + " Mark this connection as importing source when server.import_mode is true.", " Sync tools can set their connections into this state to visit expired keys.", NULL}; addReplyHelp(c, help); From a79a51aaa3da2d0aea3e148a24ab3ed70deb73cd Mon Sep 17 00:00:00 2001 From: "lvyanqi.lyq" Date: Mon, 21 Oct 2024 11:42:37 +0800 Subject: [PATCH 06/14] fix typo Signed-off-by: lvyanqi.lyq --- src/networking.c | 5 +++-- src/server.h | 2 +- valkey.conf | 4 ++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/networking.c b/src/networking.c index caf83dc589..925bef8372 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3568,8 +3568,9 @@ void clientCommand(client *c) { "NO-TOUCH (ON|OFF)", " Will not touch LRU/LFU stats when this mode is on.", "IMPORT-SOURCE (ON|OFF)", - " Mark this connection as importing source when server.import_mode is true.", - " Sync tools can set their connections into this state to visit expired keys.", + " Mark this connection as an import source if server.import_mode is true.", + " Sync tools can set their connections into 'import-source' state to visit", + " expired keys.", NULL}; addReplyHelp(c, help); } else if (!strcasecmp(c->argv[1]->ptr, "id") && c->argc == 2) { diff --git a/src/server.h b/src/server.h index 5afb11edab..8c07ed77e6 100644 --- a/src/server.h +++ b/src/server.h @@ -2072,7 +2072,7 @@ struct valkeyServer { long long primary_initial_offset; /* Primary PSYNC offset. */ int repl_replica_lazy_flush; /* Lazy FLUSHALL before loading DB? */ /* Import Mode */ - int import_mode; /* If true, server is in import mode and forbid expiration and evcition. */ + int import_mode; /* If true, server is in import mode and forbid expiration and eviction. */ /* Synchronous replication. */ list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */ int get_ack_from_replicas; /* If true we send REPLCONF GETACK. */ diff --git a/valkey.conf b/valkey.conf index 260b730bbd..a0c6cb6285 100644 --- a/valkey.conf +++ b/valkey.conf @@ -801,8 +801,8 @@ replica-priority 100 # # replica-ignore-disk-write-errors no -# Make the primary forbid expiration and evcition. -# This is useful for sync tools, because expiration and evcition may cause the data corruption. +# Make the primary forbid expiration and eviction. +# This is useful for sync tools, because expiration and eviction may cause the data corruption. # Sync tools can mark their connections as importing source by CLIENT IMPORT-SOURCE. # NOTICE: Clients should avoid writing the same key on the source server and the destination server. # From 3e365482c082e62c7ab34ef14f4efd8b4c6c432d Mon Sep 17 00:00:00 2001 From: "lvyanqi.lyq" Date: Mon, 21 Oct 2024 17:52:53 +0800 Subject: [PATCH 07/14] auto-generate commands.def Signed-off-by: lvyanqi.lyq --- src/commands.def | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/commands.def b/src/commands.def index 8abd059b10..4ffb8ae929 100644 --- a/src/commands.def +++ b/src/commands.def @@ -1658,7 +1658,7 @@ struct COMMAND_STRUCT CLIENT_Subcommands[] = { {MAKE_CMD("getredir","Returns the client ID to which the connection's tracking notifications are redirected.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETREDIR_History,0,CLIENT_GETREDIR_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETREDIR_Keyspecs,0,NULL,0)}, {MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_HELP_History,0,CLIENT_HELP_Tips,0,clientCommand,2,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_HELP_Keyspecs,0,NULL,0)}, {MAKE_CMD("id","Returns the unique client ID of the connection.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_ID_History,0,CLIENT_ID_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_ID_Keyspecs,0,NULL,0)}, -{MAKE_CMD("import-source","Allow this client to import data.","O(1)","8.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_IMPORT_SOURCE_History,0,CLIENT_IMPORT_SOURCE_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_IMPORT_SOURCE_Keyspecs,0,NULL,1),.args=CLIENT_IMPORT_SOURCE_Args}, +{MAKE_CMD("import-source","Mark this client as importing source when server is in import mode.","O(1)","8.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_IMPORT_SOURCE_History,0,CLIENT_IMPORT_SOURCE_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_IMPORT_SOURCE_Keyspecs,0,NULL,1),.args=CLIENT_IMPORT_SOURCE_Args}, {MAKE_CMD("info","Returns information about the connection.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_INFO_History,0,CLIENT_INFO_Tips,1,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_INFO_Keyspecs,0,NULL,0)}, {MAKE_CMD("kill","Terminates open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_KILL_History,7,CLIENT_KILL_Tips,0,clientCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_KILL_Keyspecs,0,NULL,1),.args=CLIENT_KILL_Args}, {MAKE_CMD("list","Lists open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_LIST_History,7,CLIENT_LIST_Tips,1,clientCommand,-2,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_LIST_Keyspecs,0,NULL,2),.args=CLIENT_LIST_Args}, From 7883e9f30ccfe50e9b5607214aa3d8aafbf816f6 Mon Sep 17 00:00:00 2001 From: "lvyanqi.lyq" Date: Mon, 21 Oct 2024 17:55:54 +0800 Subject: [PATCH 08/14] auto-generate commands.def Signed-off-by: lvyanqi.lyq --- src/commands.def | 2 +- src/commands/client-import-source.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/commands.def b/src/commands.def index 4ffb8ae929..425afeadac 100644 --- a/src/commands.def +++ b/src/commands.def @@ -1658,7 +1658,7 @@ struct COMMAND_STRUCT CLIENT_Subcommands[] = { {MAKE_CMD("getredir","Returns the client ID to which the connection's tracking notifications are redirected.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETREDIR_History,0,CLIENT_GETREDIR_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETREDIR_Keyspecs,0,NULL,0)}, {MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_HELP_History,0,CLIENT_HELP_Tips,0,clientCommand,2,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_HELP_Keyspecs,0,NULL,0)}, {MAKE_CMD("id","Returns the unique client ID of the connection.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_ID_History,0,CLIENT_ID_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_ID_Keyspecs,0,NULL,0)}, -{MAKE_CMD("import-source","Mark this client as importing source when server is in import mode.","O(1)","8.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_IMPORT_SOURCE_History,0,CLIENT_IMPORT_SOURCE_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_IMPORT_SOURCE_Keyspecs,0,NULL,1),.args=CLIENT_IMPORT_SOURCE_Args}, +{MAKE_CMD("import-source","Mark this client as an import source when server is in import mode.","O(1)","8.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_IMPORT_SOURCE_History,0,CLIENT_IMPORT_SOURCE_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_IMPORT_SOURCE_Keyspecs,0,NULL,1),.args=CLIENT_IMPORT_SOURCE_Args}, {MAKE_CMD("info","Returns information about the connection.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_INFO_History,0,CLIENT_INFO_Tips,1,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_INFO_Keyspecs,0,NULL,0)}, {MAKE_CMD("kill","Terminates open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_KILL_History,7,CLIENT_KILL_Tips,0,clientCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_KILL_Keyspecs,0,NULL,1),.args=CLIENT_KILL_Args}, {MAKE_CMD("list","Lists open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_LIST_History,7,CLIENT_LIST_Tips,1,clientCommand,-2,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_LIST_Keyspecs,0,NULL,2),.args=CLIENT_LIST_Args}, diff --git a/src/commands/client-import-source.json b/src/commands/client-import-source.json index 6f196a1534..57d08a6d0e 100644 --- a/src/commands/client-import-source.json +++ b/src/commands/client-import-source.json @@ -1,6 +1,6 @@ { "IMPORT-SOURCE": { - "summary": "Mark this client as importing source when server is in import mode.", + "summary": "Mark this client as an import source when server is in import mode.", "complexity": "O(1)", "group": "connection", "since": "8.2.0", From 523b4624cf1b1f6c71a496e1d1a33aa0dd321ded Mon Sep 17 00:00:00 2001 From: Yanqi Lv Date: Tue, 12 Nov 2024 09:10:37 +0800 Subject: [PATCH 09/14] Update src/commands/client-import-source.json Co-authored-by: Madelyn Olson Signed-off-by: Yanqi Lv --- src/commands/client-import-source.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/commands/client-import-source.json b/src/commands/client-import-source.json index 57d08a6d0e..113c07d70a 100644 --- a/src/commands/client-import-source.json +++ b/src/commands/client-import-source.json @@ -3,7 +3,7 @@ "summary": "Mark this client as an import source when server is in import mode.", "complexity": "O(1)", "group": "connection", - "since": "8.2.0", + "since": "8.1.0", "arity": 3, "container": "CLIENT", "function": "clientCommand", From be13edcc20136947cdd6e3f387a919e47a699a61 Mon Sep 17 00:00:00 2001 From: "lvyanqi.lyq" Date: Tue, 12 Nov 2024 09:30:16 +0800 Subject: [PATCH 10/14] update commands.def Signed-off-by: lvyanqi.lyq --- src/commands.def | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/commands.def b/src/commands.def index 425afeadac..820e468533 100644 --- a/src/commands.def +++ b/src/commands.def @@ -1658,7 +1658,7 @@ struct COMMAND_STRUCT CLIENT_Subcommands[] = { {MAKE_CMD("getredir","Returns the client ID to which the connection's tracking notifications are redirected.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETREDIR_History,0,CLIENT_GETREDIR_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETREDIR_Keyspecs,0,NULL,0)}, {MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_HELP_History,0,CLIENT_HELP_Tips,0,clientCommand,2,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_HELP_Keyspecs,0,NULL,0)}, {MAKE_CMD("id","Returns the unique client ID of the connection.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_ID_History,0,CLIENT_ID_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_ID_Keyspecs,0,NULL,0)}, -{MAKE_CMD("import-source","Mark this client as an import source when server is in import mode.","O(1)","8.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_IMPORT_SOURCE_History,0,CLIENT_IMPORT_SOURCE_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_IMPORT_SOURCE_Keyspecs,0,NULL,1),.args=CLIENT_IMPORT_SOURCE_Args}, +{MAKE_CMD("import-source","Mark this client as an import source when server is in import mode.","O(1)","8.1.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_IMPORT_SOURCE_History,0,CLIENT_IMPORT_SOURCE_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_IMPORT_SOURCE_Keyspecs,0,NULL,1),.args=CLIENT_IMPORT_SOURCE_Args}, {MAKE_CMD("info","Returns information about the connection.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_INFO_History,0,CLIENT_INFO_Tips,1,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_INFO_Keyspecs,0,NULL,0)}, {MAKE_CMD("kill","Terminates open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_KILL_History,7,CLIENT_KILL_Tips,0,clientCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_KILL_Keyspecs,0,NULL,1),.args=CLIENT_KILL_Args}, {MAKE_CMD("list","Lists open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_LIST_History,7,CLIENT_LIST_Tips,1,clientCommand,-2,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_LIST_Keyspecs,0,NULL,2),.args=CLIENT_LIST_Args}, From 7b1127ee56f2ab244f260107da1c121260f6e0de Mon Sep 17 00:00:00 2001 From: "lvyanqi.lyq" Date: Wed, 13 Nov 2024 11:13:20 +0800 Subject: [PATCH 11/14] import-mode only takes effect on primary Signed-off-by: lvyanqi.lyq --- src/cluster_legacy.c | 5 ----- src/config.c | 15 +-------------- src/evict.c | 2 +- src/replication.c | 5 ----- src/server.c | 6 ++++-- 5 files changed, 6 insertions(+), 27 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 6ce5967cbe..14f8a6bd1e 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -6648,11 +6648,6 @@ int clusterCommandSpecial(client *c) { return 1; } - if (server.import_mode) { - addReplyError(c, "CLUSTER REPLICATE not allowed in import mode."); - return 1; - } - /* If the instance is currently a primary, it should have no assigned * slots nor keys to accept to replicate some other node. * Replicas can switch to another primary without issues. */ diff --git a/src/config.c b/src/config.c index bdbbbe5891..01795258b1 100644 --- a/src/config.c +++ b/src/config.c @@ -2331,14 +2331,6 @@ static int isValidActiveDefrag(int val, const char **err) { return 1; } -static int isValidImportMode(int val, const char **err) { - if (server.primary_host && val) { - *err = "Server is already in replica mode"; - return 0; - } - return 1; -} - static int isValidClusterConfigFile(char *val, const char **err) { if (!strcmp(val, "")) { *err = "cluster-config-file can't be empty"; @@ -2957,11 +2949,6 @@ static int setConfigReplicaOfOption(standardConfig *config, sds *argv, int argc, return 0; } - if (server.import_mode) { - *err = "REPLICAOF not allowed in import mode"; - return 0; - } - sdsfree(server.primary_host); server.primary_host = NULL; if (!strcasecmp(argv[0], "no") && !strcasecmp(argv[1], "one")) { @@ -3149,7 +3136,7 @@ standardConfig static_configs[] = { createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL), createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL), createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL), - createBoolConfig("import-mode", NULL, MODIFIABLE_CONFIG, server.import_mode, 0, isValidImportMode, NULL), + createBoolConfig("import-mode", NULL, MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), diff --git a/src/evict.c b/src/evict.c index 0eaacfb62c..5208328b32 100644 --- a/src/evict.c +++ b/src/evict.c @@ -546,7 +546,7 @@ int performEvictions(void) { goto update_metrics; } - if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION || server.import_mode) { + if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION || (iAmPrimary() && server.import_mode)) { result = EVICT_FAIL; /* We need to free memory, but policy forbids or we are in import mode. */ goto update_metrics; } diff --git a/src/replication.c b/src/replication.c index 0f582ba726..63433de865 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3961,11 +3961,6 @@ void replicaofCommand(client *c) { return; } - if (server.import_mode) { - addReplyError(c, "REPLICAOF not allowed in import mode."); - return; - } - if (server.failover_state != NO_FAILOVER) { addReplyError(c, "REPLICAOF not allowed while failing over."); return; diff --git a/src/server.c b/src/server.c index c34d95bd21..3c23bde71b 100644 --- a/src/server.c +++ b/src/server.c @@ -1054,9 +1054,11 @@ void clientsCron(void) { void databasesCron(void) { /* Expire keys by random sampling. Not required for replicas * as primary will synthesize DELs for us. */ - if (server.active_expire_enabled && !server.import_mode) { + if (server.active_expire_enabled) { if (iAmPrimary()) { - activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); + if (!server.import_mode) { + activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); + } } else { expireReplicaKeys(); } From 11f75bc70a3ee395e567cb3355a5b0d1d993f0dd Mon Sep 17 00:00:00 2001 From: "lvyanqi.lyq" Date: Wed, 13 Nov 2024 17:53:09 +0800 Subject: [PATCH 12/14] update comment Signed-off-by: lvyanqi.lyq --- src/db.c | 16 ++++++++++++++++ src/expire.c | 5 ++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/db.c b/src/db.c index 30200275a7..dfc3e4a77f 100644 --- a/src/db.c +++ b/src/db.c @@ -1827,6 +1827,22 @@ keyStatus expireIfNeededWithDictIndex(serverDb *db, robj *key, int flags, int di if (server.current_client && (server.current_client->flag.primary)) return KEY_VALID; if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED; } else if (server.import_mode) { + /* If we are running in the import mode on a primary, instead of + * evicting the expired key from the database, we return ASAP: + * the key expiration is controlled by the import source that will + * send us synthesized DEL operations for expired keys. The + * exception is when write operations are performed on this server + * because it's a primary. + * + * Notice: other clients, apart from the import source, should not access + * the data imported by import source. + * + * Still we try to return the right information to the caller, + * that is, KEY_VALID if we think the key should still be valid, + * KEY_EXPIRED if we think the key is expired but don't want to delete it at this time. + * + * When receiving commands from the import source, keys are never considered + * expired. */ if (server.current_client && (server.current_client->flag.import_source)) return KEY_VALID; if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED; } diff --git a/src/expire.c b/src/expire.c index 8c79abcde7..2dd57dc26d 100644 --- a/src/expire.c +++ b/src/expire.c @@ -520,7 +520,10 @@ int checkAlreadyExpired(long long when) { * of a replica instance. * * Instead we add the already expired key to the database with expire time - * (possibly in the past) and wait for an explicit DEL from the primary. */ + * (possibly in the past) and wait for an explicit DEL from the primary. + * + * If the server is a primary and in the import mode, we also add the already + * expired key and wait for an explicit DEL from the import source. */ return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host && !server.import_mode); } From 20b005cd1e45878f07fc886002015c9a475d0c27 Mon Sep 17 00:00:00 2001 From: "lvyanqi.lyq" Date: Wed, 13 Nov 2024 18:41:18 +0800 Subject: [PATCH 13/14] clang-format Signed-off-by: lvyanqi.lyq --- src/db.c | 6 +++--- src/expire.c | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/db.c b/src/db.c index dfc3e4a77f..6c08db54e9 100644 --- a/src/db.c +++ b/src/db.c @@ -1832,9 +1832,9 @@ keyStatus expireIfNeededWithDictIndex(serverDb *db, robj *key, int flags, int di * the key expiration is controlled by the import source that will * send us synthesized DEL operations for expired keys. The * exception is when write operations are performed on this server - * because it's a primary. - * - * Notice: other clients, apart from the import source, should not access + * because it's a primary. + * + * Notice: other clients, apart from the import source, should not access * the data imported by import source. * * Still we try to return the right information to the caller, diff --git a/src/expire.c b/src/expire.c index 2dd57dc26d..c22df1ef86 100644 --- a/src/expire.c +++ b/src/expire.c @@ -521,8 +521,8 @@ int checkAlreadyExpired(long long when) { * * Instead we add the already expired key to the database with expire time * (possibly in the past) and wait for an explicit DEL from the primary. - * - * If the server is a primary and in the import mode, we also add the already + * + * If the server is a primary and in the import mode, we also add the already * expired key and wait for an explicit DEL from the import source. */ return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host && !server.import_mode); } From 7f04964d5b7017f8f71395ebd0fb30d00d52179a Mon Sep 17 00:00:00 2001 From: "lvyanqi.lyq" Date: Fri, 15 Nov 2024 14:27:52 +0800 Subject: [PATCH 14/14] minor refactor Signed-off-by: lvyanqi.lyq --- src/server.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/server.c b/src/server.c index 3c23bde71b..73fdf23538 100644 --- a/src/server.c +++ b/src/server.c @@ -1055,12 +1055,10 @@ void databasesCron(void) { /* Expire keys by random sampling. Not required for replicas * as primary will synthesize DELs for us. */ if (server.active_expire_enabled) { - if (iAmPrimary()) { - if (!server.import_mode) { - activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); - } - } else { + if (!iAmPrimary()) { expireReplicaKeys(); + } else if (!server.import_mode) { + activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } }