Skip to content

Commit

Permalink
add pseudo-replica mode
Browse files Browse the repository at this point in the history
  • Loading branch information
lyq2333 committed Oct 17, 2024
1 parent 06cfe2c commit 52be593
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 7 deletions.
Empty file modified src/cluster.c
100644 → 100755
Empty file.
5 changes: 5 additions & 0 deletions src/cluster_legacy.c
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -6648,6 +6648,11 @@ int clusterCommandSpecial(client *c) {
return 1;
}

if (server.pseudo_replica) {
addReplyError(c, "CLUSTER REPLICATE not allowed in pseudo-replica mode.");
return 1;
}

/* If the instance is currently a primary, it should have no assigned
* slots nor keys to accept to replicate some other node.
* Replicas can switch to another primary without issues. */
Expand Down
14 changes: 14 additions & 0 deletions src/config.c
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -2331,6 +2331,14 @@ static int isValidActiveDefrag(int val, const char **err) {
return 1;
}

static int isValidPseudoReplica(int val, const char **err) {
if (server.primary_host && val) {
*err = "Server is already a replica";
return 0;
}
return 1;
}

static int isValidClusterConfigFile(char *val, const char **err) {
if (!strcmp(val, "")) {
*err = "cluster-config-file can't be empty";
Expand Down Expand Up @@ -2949,6 +2957,11 @@ static int setConfigReplicaOfOption(standardConfig *config, sds *argv, int argc,
return 0;
}

if (server.pseudo_replica) {
*err = "REPLICAOF not allowed in pseudo-replica mode";
return 0;
}

sdsfree(server.primary_host);
server.primary_host = NULL;
if (!strcasecmp(argv[0], "no") && !strcasecmp(argv[1], "one")) {
Expand Down Expand Up @@ -3136,6 +3149,7 @@ standardConfig static_configs[] = {
createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL),
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("pseudo-replica", NULL, MODIFIABLE_CONFIG, server.pseudo_replica, 0, isValidPseudoReplica, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
5 changes: 4 additions & 1 deletion src/db.c
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ robj *dbRandomKey(serverDb *db) {
key = dictGetKey(de);
keyobj = createStringObject(key, sdslen(key));
if (dbFindExpires(db, key)) {
if (allvolatile && server.primary_host && --maxtries == 0) {
if (allvolatile && (server.primary_host || server.pseudo_replica) && --maxtries == 0) {
/* If the DB is composed only of keys with an expire set,
* it could happen that all the keys are already logically
* expired in the repilca, so the function cannot stop because
Expand Down Expand Up @@ -1832,6 +1832,9 @@ keyStatus expireIfNeeded(serverDb *db, robj *key, int flags) {
if (server.primary_host != NULL) {
if (server.current_client && (server.current_client->flag.primary)) return KEY_VALID;
if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED;
} else if (server.pseudo_replica) {
if (server.current_client && (server.current_client->flag.pseudo_master)) return KEY_VALID;
if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED;
}

/* In some cases we're explicitly instructed to return an indication of a
Expand Down
4 changes: 2 additions & 2 deletions src/evict.c
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,8 @@ int performEvictions(void) {
goto update_metrics;
}

if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION) {
result = EVICT_FAIL; /* We need to free memory, but policy forbids. */
if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION || server.pseudo_replica) {
result = EVICT_FAIL; /* We need to free memory, but policy forbids or we are in 'pseudo-replica' mode. */
goto update_metrics;
}

Expand Down
2 changes: 1 addition & 1 deletion src/expire.c
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ int checkAlreadyExpired(long long when) {
*
* Instead we add the already expired key to the database with expire time
* (possibly in the past) and wait for an explicit DEL from the primary. */
return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host);
return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host && !server.pseudo_replica);
}

#define EXPIRE_NX (1 << 0)
Expand Down
19 changes: 19 additions & 0 deletions src/replication.c
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,10 @@ void syncCommand(client *c) {
* - rdb-channel <1|0>
* Used to identify the client as a replica's rdb connection in an dual channel
* sync session.
*
* - pseudo-master <0|1>
* Set this connection behaving like a master if server.pseudo_replica is true.
* Sync tools can set their connections into 'pseudo-master' state to visit expired keys.
* */
void replconfCommand(client *c) {
int j;
Expand Down Expand Up @@ -1418,6 +1422,16 @@ void replconfCommand(client *c) {
return;
}
c->associated_rdb_client_id = (uint64_t)client_id;
} else if (!strcasecmp(c->argv[j]->ptr, "pseudo-master")) {
long pseudo_master = 0;
if (getRangeLongFromObjectOrReply(c, c->argv[j + 1],
0, 1, &pseudo_master, NULL) != C_OK)
return;
if (pseudo_master == 1) {
c->flag.pseudo_master = 1;
} else {
c->flag.pseudo_master = 0;
}
} else {
addReplyErrorFormat(c, "Unrecognized REPLCONF option: %s", (char *)c->argv[j]->ptr);
return;
Expand Down Expand Up @@ -3961,6 +3975,11 @@ void replicaofCommand(client *c) {
return;
}

if (server.pseudo_replica) {
addReplyError(c, "REPLICAOF not allowed in pseudo-replica mode.");
return;
}

if (server.failover_state != NO_FAILOVER) {
addReplyError(c, "REPLICAOF not allowed while failing over.");
return;
Expand Down
5 changes: 3 additions & 2 deletions src/server.c
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,7 @@ void clientsCron(void) {
void databasesCron(void) {
/* Expire keys by random sampling. Not required for replicas
* as primary will synthesize DELs for us. */
if (server.active_expire_enabled) {
if (server.active_expire_enabled && !server.pseudo_replica) {
if (iAmPrimary()) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
Expand Down Expand Up @@ -1651,7 +1651,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {

/* Run a fast expire cycle (the called function will return
* ASAP if a fast cycle is not needed). */
if (server.active_expire_enabled && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
if (server.active_expire_enabled && !server.pseudo_replica && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

if (moduleCount()) {
moduleFireServerEvent(VALKEYMODULE_EVENT_EVENTLOOP, VALKEYMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP, NULL);
Expand Down Expand Up @@ -2057,6 +2057,7 @@ void initServerConfig(void) {
server.extended_redis_compat = 0;
server.pause_cron = 0;
server.dict_resizing = 1;
server.pseudo_replica = 0;

server.latency_tracking_info_percentiles_len = 3;
server.latency_tracking_info_percentiles = zmalloc(sizeof(double) * (server.latency_tracking_info_percentiles_len));
Expand Down
5 changes: 4 additions & 1 deletion src/server.h
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -1224,7 +1224,8 @@ typedef struct ClientFlags {
* knows that it does not need the cache and required a full sync. With this
* flag, we won't cache the primary in freeClient. */
uint64_t fake : 1; /* This is a fake client without a real connection. */
uint64_t reserved : 5; /* Reserved for future use */
uint64_t pseudo_master : 1; /* This client is a pseudo master */
uint64_t reserved : 4; /* Reserved for future use */
} ClientFlags;

typedef struct client {
Expand Down Expand Up @@ -2070,6 +2071,8 @@ struct valkeyServer {
char primary_replid[CONFIG_RUN_ID_SIZE + 1]; /* Primary PSYNC runid. */
long long primary_initial_offset; /* Primary PSYNC offset. */
int repl_replica_lazy_flush; /* Lazy FLUSHALL before loading DB? */
/* Pseudo Replica */
int pseudo_replica; /* If true, server is a pseudo replica. */
/* Synchronous replication. */
list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */
int get_ack_from_replicas; /* If true we send REPLCONF GETACK. */
Expand Down
74 changes: 74 additions & 0 deletions tests/unit/expire.tcl
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,80 @@ start_server {tags {"expire"}} {
close_replication_stream $repl
assert_equal [r debug set-active-expire 1] {OK}
} {} {needs:debug}

test {Pseudo-replica mode should forbid active expiration} {
r flushall

r config set pseudo-replica yes
assert_equal [r replconf pseudo-master 1] {OK}

r set foo1 bar PX 1
r set foo2 bar PX 1
after 100

assert_equal [r dbsize] {2}

assert_equal [r replconf pseudo-master 0] {OK}
r config set pseudo-replica no

# Verify all keys have expired
wait_for_condition 40 100 {
[r dbsize] eq 0
} else {
fail "Keys did not actively expire."
}
}

test {Pseudo-replica mode should forbid lazy expiration} {
r flushall
r debug set-active-expire 0

r config set pseudo-replica yes
assert_equal [r replconf pseudo-master 1] {OK}

r set foo1 1 PX 1
after 10

r get foo1
assert_equal [r dbsize] {1}

assert_equal [r replconf pseudo-master 0] {OK}
r config set pseudo-replica no

r get foo1

assert_equal [r dbsize] {0}

assert_equal [r debug set-active-expire 1] {OK}
} {} {needs:debug}

test {RANDOMKEY can return expired key in Pseudo-replica mode} {
r flushall

r config set pseudo-replica yes
assert_equal [r replconf pseudo-master 1] {OK}

r set foo1 bar PX 1
after 10

set client [valkey [srv "host"] [srv "port"] 0 $::tls]
if {!$::singledb} {
$client select 9
}
assert_equal [$client ttl foo1] {-2}

assert_equal [r randomkey] {foo1}

assert_equal [r replconf pseudo-master 0] {OK}
r config set pseudo-replica no

# Verify all keys have expired
wait_for_condition 40 100 {
[r dbsize] eq 0
} else {
fail "Keys did not actively expire."
}
}
}

start_cluster 1 0 {tags {"expire external:skip cluster"}} {
Expand Down
18 changes: 18 additions & 0 deletions tests/unit/maxmemory.tcl
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -611,3 +611,21 @@ start_server {tags {"maxmemory" "external:skip"}} {
assert {[r object freq foo] == 5}
}
}

start_server {tags {"maxmemory" "external:skip"}} {
test {Pseudo-replica mode should forbid eviction} {
r set key val
r config set pseudo-replica yes
assert_equal [r replconf pseudo-master 1] {OK}
r config set maxmemory-policy allkeys-lru
r config set maxmemory 1

assert_equal [r dbsize] {1}
assert_error {OOM command not allowed*} {r set key1 val1}

assert_equal [r replconf pseudo-master 0] {OK}
r config set pseudo-replica no

assert_equal [r dbsize] {0}
}
}
7 changes: 7 additions & 0 deletions valkey.conf
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,13 @@ replica-priority 100
#
# replica-ignore-disk-write-errors no

# Make the master behave like a replica, which forbids expiration and evcition.
# This is useful for sync tools, because expiration and evcition may cause the data corruption.
# Sync tools can set their connections into 'pseudo-master' state by REPLCONF PSEUDO-MASTER to
# behave like a master(i.e. visit expired keys).
#
# pseudo-replica no

# -----------------------------------------------------------------------------
# By default, Sentinel includes all replicas in its reports. A replica
# can be excluded from Sentinel's announcements. An unannounced replica
Expand Down

0 comments on commit 52be593

Please sign in to comment.