diff --git a/src/cluster.c b/src/cluster.c old mode 100644 new mode 100755 diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c old mode 100644 new mode 100755 index 14f8a6bd1e..d21e30a649 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -6648,6 +6648,11 @@ int clusterCommandSpecial(client *c) { return 1; } + if (server.pseudo_replica) { + addReplyError(c, "CLUSTER REPLICATE not allowed in pseudo-replica mode."); + return 1; + } + /* If the instance is currently a primary, it should have no assigned * slots nor keys to accept to replicate some other node. * Replicas can switch to another primary without issues. */ diff --git a/src/config.c b/src/config.c old mode 100644 new mode 100755 index 663cf5da38..1f0184015f --- a/src/config.c +++ b/src/config.c @@ -2331,6 +2331,14 @@ static int isValidActiveDefrag(int val, const char **err) { return 1; } +static int isValidPseudoReplica(int val, const char **err) { + if (server.primary_host && val) { + *err = "Server is already a replica"; + return 0; + } + return 1; +} + static int isValidClusterConfigFile(char *val, const char **err) { if (!strcmp(val, "")) { *err = "cluster-config-file can't be empty"; @@ -2949,6 +2957,11 @@ static int setConfigReplicaOfOption(standardConfig *config, sds *argv, int argc, return 0; } + if (server.pseudo_replica) { + *err = "REPLICAOF not allowed in pseudo-replica mode"; + return 0; + } + sdsfree(server.primary_host); server.primary_host = NULL; if (!strcasecmp(argv[0], "no") && !strcasecmp(argv[1], "one")) { @@ -3136,6 +3149,7 @@ standardConfig static_configs[] = { createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL), createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL), createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL), + createBoolConfig("pseudo-replica", NULL, MODIFIABLE_CONFIG, server.pseudo_replica, 0, isValidPseudoReplica, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), diff --git a/src/db.c b/src/db.c old mode 100644 new mode 100755 index 3493e2d863..13f5948405 --- a/src/db.c +++ b/src/db.c @@ -382,7 +382,7 @@ robj *dbRandomKey(serverDb *db) { key = dictGetKey(de); keyobj = createStringObject(key, sdslen(key)); if (dbFindExpires(db, key)) { - if (allvolatile && server.primary_host && --maxtries == 0) { + if (allvolatile && (server.primary_host || server.pseudo_replica) && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, * it could happen that all the keys are already logically * expired in the repilca, so the function cannot stop because @@ -1832,6 +1832,9 @@ keyStatus expireIfNeeded(serverDb *db, robj *key, int flags) { if (server.primary_host != NULL) { if (server.current_client && (server.current_client->flag.primary)) return KEY_VALID; if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED; + } else if (server.pseudo_replica) { + if (server.current_client && (server.current_client->flag.pseudo_master)) return KEY_VALID; + if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED; } /* In some cases we're explicitly instructed to return an indication of a diff --git a/src/evict.c b/src/evict.c old mode 100644 new mode 100755 index 5e4b6220eb..aa1057e36f --- a/src/evict.c +++ b/src/evict.c @@ -546,8 +546,8 @@ int performEvictions(void) { goto update_metrics; } - if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION) { - result = EVICT_FAIL; /* We need to free memory, but policy forbids. */ + if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION || server.pseudo_replica) { + result = EVICT_FAIL; /* We need to free memory, but policy forbids or we are in 'pseudo-replica' mode. */ goto update_metrics; } diff --git a/src/expire.c b/src/expire.c old mode 100644 new mode 100755 index 928bb58d86..9a6f8dc84c --- a/src/expire.c +++ b/src/expire.c @@ -521,7 +521,7 @@ int checkAlreadyExpired(long long when) { * * Instead we add the already expired key to the database with expire time * (possibly in the past) and wait for an explicit DEL from the primary. */ - return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host); + return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host && !server.pseudo_replica); } #define EXPIRE_NX (1 << 0) diff --git a/src/replication.c b/src/replication.c old mode 100644 new mode 100755 index 63433de865..f053e86da4 --- a/src/replication.c +++ b/src/replication.c @@ -1272,6 +1272,10 @@ void syncCommand(client *c) { * - rdb-channel <1|0> * Used to identify the client as a replica's rdb connection in an dual channel * sync session. + * + * - pseudo-master <0|1> + * Set this connection behaving like a master if server.pseudo_replica is true. + * Sync tools can set their connections into 'pseudo-master' state to visit expired keys. * */ void replconfCommand(client *c) { int j; @@ -1418,6 +1422,16 @@ void replconfCommand(client *c) { return; } c->associated_rdb_client_id = (uint64_t)client_id; + } else if (!strcasecmp(c->argv[j]->ptr, "pseudo-master")) { + long pseudo_master = 0; + if (getRangeLongFromObjectOrReply(c, c->argv[j + 1], + 0, 1, &pseudo_master, NULL) != C_OK) + return; + if (pseudo_master == 1) { + c->flag.pseudo_master = 1; + } else { + c->flag.pseudo_master = 0; + } } else { addReplyErrorFormat(c, "Unrecognized REPLCONF option: %s", (char *)c->argv[j]->ptr); return; @@ -3961,6 +3975,11 @@ void replicaofCommand(client *c) { return; } + if (server.pseudo_replica) { + addReplyError(c, "REPLICAOF not allowed in pseudo-replica mode."); + return; + } + if (server.failover_state != NO_FAILOVER) { addReplyError(c, "REPLICAOF not allowed while failing over."); return; diff --git a/src/server.c b/src/server.c old mode 100644 new mode 100755 index ab95f84346..e56860cbe8 --- a/src/server.c +++ b/src/server.c @@ -1054,7 +1054,7 @@ void clientsCron(void) { void databasesCron(void) { /* Expire keys by random sampling. Not required for replicas * as primary will synthesize DELs for us. */ - if (server.active_expire_enabled) { + if (server.active_expire_enabled && !server.pseudo_replica) { if (iAmPrimary()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } else { @@ -1651,7 +1651,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Run a fast expire cycle (the called function will return * ASAP if a fast cycle is not needed). */ - if (server.active_expire_enabled && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); + if (server.active_expire_enabled && !server.pseudo_replica && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); if (moduleCount()) { moduleFireServerEvent(VALKEYMODULE_EVENT_EVENTLOOP, VALKEYMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP, NULL); @@ -2057,6 +2057,7 @@ void initServerConfig(void) { server.extended_redis_compat = 0; server.pause_cron = 0; server.dict_resizing = 1; + server.pseudo_replica = 0; server.latency_tracking_info_percentiles_len = 3; server.latency_tracking_info_percentiles = zmalloc(sizeof(double) * (server.latency_tracking_info_percentiles_len)); diff --git a/src/server.h b/src/server.h old mode 100644 new mode 100755 index 4fad8d2508..590e352031 --- a/src/server.h +++ b/src/server.h @@ -1224,7 +1224,8 @@ typedef struct ClientFlags { * knows that it does not need the cache and required a full sync. With this * flag, we won't cache the primary in freeClient. */ uint64_t fake : 1; /* This is a fake client without a real connection. */ - uint64_t reserved : 5; /* Reserved for future use */ + uint64_t pseudo_master : 1; /* This client is a pseudo master */ + uint64_t reserved : 4; /* Reserved for future use */ } ClientFlags; typedef struct client { @@ -2070,6 +2071,8 @@ struct valkeyServer { char primary_replid[CONFIG_RUN_ID_SIZE + 1]; /* Primary PSYNC runid. */ long long primary_initial_offset; /* Primary PSYNC offset. */ int repl_replica_lazy_flush; /* Lazy FLUSHALL before loading DB? */ + /* Pseudo Replica */ + int pseudo_replica; /* If true, server is a pseudo replica. */ /* Synchronous replication. */ list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */ int get_ack_from_replicas; /* If true we send REPLCONF GETACK. */ diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl old mode 100644 new mode 100755 index d85ce7ee68..09acb350c4 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -832,6 +832,80 @@ start_server {tags {"expire"}} { close_replication_stream $repl assert_equal [r debug set-active-expire 1] {OK} } {} {needs:debug} + + test {Pseudo-replica mode should forbid active expiration} { + r flushall + + r config set pseudo-replica yes + assert_equal [r replconf pseudo-master 1] {OK} + + r set foo1 bar PX 1 + r set foo2 bar PX 1 + after 100 + + assert_equal [r dbsize] {2} + + assert_equal [r replconf pseudo-master 0] {OK} + r config set pseudo-replica no + + # Verify all keys have expired + wait_for_condition 40 100 { + [r dbsize] eq 0 + } else { + fail "Keys did not actively expire." + } + } + + test {Pseudo-replica mode should forbid lazy expiration} { + r flushall + r debug set-active-expire 0 + + r config set pseudo-replica yes + assert_equal [r replconf pseudo-master 1] {OK} + + r set foo1 1 PX 1 + after 10 + + r get foo1 + assert_equal [r dbsize] {1} + + assert_equal [r replconf pseudo-master 0] {OK} + r config set pseudo-replica no + + r get foo1 + + assert_equal [r dbsize] {0} + + assert_equal [r debug set-active-expire 1] {OK} + } {} {needs:debug} + + test {RANDOMKEY can return expired key in Pseudo-replica mode} { + r flushall + + r config set pseudo-replica yes + assert_equal [r replconf pseudo-master 1] {OK} + + r set foo1 bar PX 1 + after 10 + + set client [valkey [srv "host"] [srv "port"] 0 $::tls] + if {!$::singledb} { + $client select 9 + } + assert_equal [$client ttl foo1] {-2} + + assert_equal [r randomkey] {foo1} + + assert_equal [r replconf pseudo-master 0] {OK} + r config set pseudo-replica no + + # Verify all keys have expired + wait_for_condition 40 100 { + [r dbsize] eq 0 + } else { + fail "Keys did not actively expire." + } + } } start_cluster 1 0 {tags {"expire external:skip cluster"}} { diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl old mode 100644 new mode 100755 index d4e62246f1..bf99084ce7 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -611,3 +611,21 @@ start_server {tags {"maxmemory" "external:skip"}} { assert {[r object freq foo] == 5} } } + +start_server {tags {"maxmemory" "external:skip"}} { + test {Pseudo-replica mode should forbid eviction} { + r set key val + r config set pseudo-replica yes + assert_equal [r replconf pseudo-master 1] {OK} + r config set maxmemory-policy allkeys-lru + r config set maxmemory 1 + + assert_equal [r dbsize] {1} + assert_error {OOM command not allowed*} {r set key1 val1} + + assert_equal [r replconf pseudo-master 0] {OK} + r config set pseudo-replica no + + assert_equal [r dbsize] {0} + } +} \ No newline at end of file diff --git a/valkey.conf b/valkey.conf old mode 100644 new mode 100755 index f485b42b1a..9085473477 --- a/valkey.conf +++ b/valkey.conf @@ -801,6 +801,13 @@ replica-priority 100 # # replica-ignore-disk-write-errors no +# Make the master behave like a replica, which forbids expiration and evcition. +# This is useful for sync tools, because expiration and evcition may cause the data corruption. +# Sync tools can set their connections into 'pseudo-master' state by REPLCONF PSEUDO-MASTER to +# behave like a master(i.e. visit expired keys). +# +# pseudo-replica no + # ----------------------------------------------------------------------------- # By default, Sentinel includes all replicas in its reports. A replica # can be excluded from Sentinel's announcements. An unannounced replica