diff --git a/.gitignore b/.gitignore index 920e32eca7..0fd81349cf 100644 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,4 @@ Makefile.dep compile_commands.json redis.code-workspace .cache +.cscope.* diff --git a/src/cluster.h b/src/cluster.h index 463b4940d9..ece4b33b2a 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -103,6 +103,7 @@ char *clusterNodeHostname(clusterNode *node); const char *clusterNodePreferredEndpoint(clusterNode *n); long long clusterNodeReplOffset(clusterNode *node); clusterNode *clusterLookupNode(const char *name, int length); +void clusterReplicateOpenSlots(void); /* functions with shared implementations */ clusterNode *getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int *hashslot, int *ask); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 5874433d7d..db42d0356b 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -68,7 +68,7 @@ int clusterDelSlot(int slot); int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node); int clusterDelNodeSlots(clusterNode *node); int clusterNodeSetSlotBit(clusterNode *n, int slot); -void clusterSetMaster(clusterNode *n); +void clusterSetMaster(clusterNode *n, int closeSlots); void clusterHandleSlaveFailover(void); void clusterHandleSlaveMigration(int max_slaves); int bitmapTestBit(unsigned char *bitmap, int pos); @@ -942,6 +942,14 @@ static void updateShardId(clusterNode *node, const char *shard_id) { } } +static int areInSameShard(clusterNode *node1, clusterNode *node2) { + return memcmp(node1->shard_id, node2->shard_id, CLUSTER_NAMELEN) == 0; +} + +static uint64_t nodeEpoch(clusterNode *n) { + return n->slaveof ? n->slaveof->configEpoch : n->configEpoch; +} + /* Update my hostname based on server configuration values */ void clusterUpdateMyselfHostname(void) { if (!myself) return; @@ -1603,8 +1611,10 @@ void clusterRenameNode(clusterNode *node, char *newname) { int retval; sds s = sdsnewlen(node->name, CLUSTER_NAMELEN); - serverLog(LL_DEBUG,"Renaming node %.40s into %.40s", - node->name, newname); + serverLog(LL_DEBUG,"Renaming node %.40s (%s) into %.40s", + node->name, + node->human_nodename, + newname); retval = dictDelete(server.cluster->nodes, s); sdsfree(s); serverAssert(retval == DICT_OK); @@ -2355,19 +2365,27 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc continue; } - /* The slot is in importing state, it should be modified only - * manually via redis-cli (example: a resharding is in progress - * and the migrating side slot was already closed and is advertising - * a new config. We still want the slot to be closed manually). */ - if (server.cluster->importing_slots_from[j]) continue; - - /* We rebind the slot to the new node claiming it if: - * 1) The slot was unassigned or the previous owner no longer owns the slot or - * the new node claims it with a greater configEpoch. - * 2) We are not currently importing the slot. */ + /* We rebind the slot to the new node claiming it if + * the slot was unassigned or the new node claims it with a + * greater configEpoch. 
*/ if (isSlotUnclaimed(j) || server.cluster->slots[j]->configEpoch < senderConfigEpoch) { + if (!isSlotUnclaimed(j) && + !areInSameShard(server.cluster->slots[j], sender)) + { + serverLog(LL_NOTICE, + "Slot %d is migrated from node %.40s (%s) in shard %.40s" + " to node %.40s (%s) in shard %.40s.", + j, + server.cluster->slots[j]->name, + server.cluster->slots[j]->human_nodename, + server.cluster->slots[j]->shard_id, + sender->name, + sender->human_nodename, + sender->shard_id); + } + /* Was this slot mine, and still contains keys? Mark it as * a dirty slot. */ if (server.cluster->slots[j] == myself && @@ -2382,21 +2400,144 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc newmaster = sender; migrated_our_slots++; } + + /* If the sender who claims this slot is not in the same shard, + * it must be a result of deliberate operator actions. Therefore, + * we should honor it and clear the outstanding migrating_slots_to + * state for the slot. Otherwise, we are looking at a failover within + * the same shard and we should retain the migrating_slots_to state + * for the slot in question */ + if (server.cluster->migrating_slots_to[j] != NULL) { + if (!areInSameShard(sender, myself)) { + serverLog(LL_NOTICE, + "Slot %d is no longer being migrated to node %.40s (%s) in shard %.40s.", + j, + server.cluster->migrating_slots_to[j]->name, + server.cluster->migrating_slots_to[j]->human_nodename, + server.cluster->migrating_slots_to[j]->shard_id); + server.cluster->migrating_slots_to[j] = NULL; + } + } + + /* Handle the case where we are importing this slot and the ownership changes */ + if (server.cluster->importing_slots_from[j] != NULL && + server.cluster->importing_slots_from[j] != sender) + { + /* Update importing_slots_from to point to the sender, if it is in the + * same shard as the previous slot owner */ + if (areInSameShard(sender, server.cluster->importing_slots_from[j])) { + serverLog(LL_NOTICE, + "Failover occurred in migration source. Update importing " + "source for slot %d to node %.40s (%s) in shard %.40s.", + j, + sender->name, + sender->human_nodename, + sender->shard_id); + server.cluster->importing_slots_from[j] = sender; + } else { + /* If the sender is from a different shard, it must be a result + * of deliberate operator actions. We should clear the importing + * state to conform to the operator's will. */ + serverLog(LL_NOTICE, + "Slot %d is not longer being imported from node %.40s (%s) in shard %.40s.", + j, + server.cluster->importing_slots_from[j]->name, + server.cluster->importing_slots_from[j]->human_nodename, + server.cluster->importing_slots_from[j]->shard_id); + server.cluster->importing_slots_from[j] = NULL; + } + } + clusterDelSlot(j); clusterAddSlot(sender,j); + bitmapClearBit(server.cluster->owner_not_claiming_slot, j); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE| CLUSTER_TODO_FSYNC_CONFIG); } - } else if (server.cluster->slots[j] == sender) { - /* The slot is currently bound to the sender but the sender is no longer - * claiming it. We don't want to unbind the slot yet as it can cause the cluster - * to move to FAIL state and also throw client error. Keeping the slot bound to - * the previous owner will cause a few client side redirects, but won't throw - * any errors. We will keep track of the uncertainty in ownership to avoid - * propagating misinformation about this slot's ownership using UPDATE - * messages. 
*/ - bitmapSetBit(server.cluster->owner_not_claiming_slot, j); + } else { + if (server.cluster->slots[j] == sender) { + /* The slot is currently bound to the sender but the sender is no longer + * claiming it. We don't want to unbind the slot yet as it can cause the cluster + * to move to FAIL state and also throw client error. Keeping the slot bound to + * the previous owner will cause a few client side redirects, but won't throw + * any errors. We will keep track of the uncertainty in ownership to avoid + * propagating misinformation about this slot's ownership using UPDATE + * messages. */ + bitmapSetBit(server.cluster->owner_not_claiming_slot, j); + } + + /* If the sender doesn't claim the slot, check if we are migrating + * any slot to its shard and if there is a primaryship change in + * the shard. Update the migrating_slots_to state to point to the + * sender if it has just taken over the primary role. */ + if (server.cluster->migrating_slots_to[j] != NULL && + server.cluster->migrating_slots_to[j] != sender && + (server.cluster->migrating_slots_to[j]->configEpoch < senderConfigEpoch || + nodeIsSlave(server.cluster->migrating_slots_to[j])) && + areInSameShard(server.cluster->migrating_slots_to[j], sender)) + { + serverLog(LL_NOTICE, + "Failover occurred in migration target." + " Slot %d is now being migrated to node %.40s (%s) in shard %.40s.", + j, + sender->name, + sender->human_nodename, + sender->shard_id); + server.cluster->migrating_slots_to[j] = sender; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); + } + + /* If the sender is no longer the owner of the slot, and I am a primary + * and I am still in the process of importing the slot from the sender, + * there are two possibilities: + * + * 1. I could be a replica of the target primary and missed the slot + * finalization step on my primary due to my primary crashing during + * the slot migration process. + * 2. I could be the original primary and missed the slot finalization + * step entirely. + * + * To ensure complete slot coverage in either case, the following steps + * will be taken: + * + * 1. Remove the importing state for the specific slot. + * 2. Finalize the slot's ownership, if I am not already the owner of + * the slot. */ + if (nodeIsMaster(myself) && + server.cluster->importing_slots_from[j] == sender) + { + serverLog(LL_NOTICE, + "Slot %d is no longer being imported from node %.40s (%s) in shard %.40s;" + " Clear my importing source for the slot.", + j, + sender->name, + sender->human_nodename, + sender->shard_id); + server.cluster->importing_slots_from[j] = NULL; + /* Take over the slot ownership if I am not the owner yet*/ + if (server.cluster->slots[j] != myself) { + /* We intentionally avoid updating myself's configEpoch when + * taking ownership of this slot. This approach is effective + * in scenarios where my primary crashed during the slot + * finalization process. I became the new primary without + * inheriting the slot ownership, while the source shard + * continued and relinquished the slot. + * + * By not increasing myself's configEpoch, we ensure that + * if the slot is correctly migrated to another primary, I + * will not mistakenly claim ownership. Instead, any ownership + * conflicts will be resolved accurately based on configEpoch + * values. 
*/ + clusterDelSlot(j); + clusterAddSlot(myself,j); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); + } + } } } @@ -2406,40 +2547,66 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION) return; - /* If at least one slot was reassigned from a node to another node - * with a greater configEpoch, it is possible that: - * 1) We are a master left without slots. This means that we were - * failed over and we should turn into a replica of the new - * master. - * 2) We are a slave and our master is left without slots. We need - * to replicate to the new slots owner. */ - if (newmaster && curmaster->numslots == 0 && - (server.cluster_allow_replica_migration || - sender_slots == migrated_our_slots)) { - serverLog(LL_NOTICE, - "Configuration change detected. Reconfiguring myself " - "as a replica of %.40s (%s)", sender->name, sender->human_nodename); - clusterSetMaster(sender); - clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| - CLUSTER_TODO_UPDATE_STATE| - CLUSTER_TODO_FSYNC_CONFIG); - } else if (myself->slaveof && myself->slaveof->slaveof && - /* In some rare case when CLUSTER FAILOVER TAKEOVER is used, it - * can happen that myself is a replica of a replica of myself. If - * this happens, we do nothing to avoid a crash and wait for the - * admin to repair the cluster. */ - myself->slaveof->slaveof != myself) + /* Handle a special case where newmaster is not set but both sender + * and myself own no slots and in the same shard. Set the sender as + * the new primary if my current config epoch is lower than the + * sender's. */ + if (!newmaster && + myself->slaveof != sender && + sender_slots == 0 && + myself->numslots == 0 && + nodeEpoch(myself) < senderConfigEpoch && + areInSameShard(sender, myself)) { - /* Safeguard against sub-replicas. A replica's master can turn itself - * into a replica if its last slot is removed. If no other node takes - * over the slot, there is nothing else to trigger replica migration. */ - serverLog(LL_NOTICE, - "I'm a sub-replica! Reconfiguring myself as a replica of grandmaster %.40s (%s)", - myself->slaveof->slaveof->name, myself->slaveof->slaveof->human_nodename); - clusterSetMaster(myself->slaveof->slaveof); - clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| - CLUSTER_TODO_UPDATE_STATE| - CLUSTER_TODO_FSYNC_CONFIG); + newmaster = sender; + } + + /* If the shard to which this node (myself) belongs loses all of + * its slots, this node should become a replica of the sender if + * one of the following conditions is true: + * + * 1. cluster-allow-replication-migration is enabled + * 2. all the lost slots go to the sender and the sender belongs + * to this node's shard + * + * Note that case 2 can happen in one of the following scenarios: + * 1) we were a primary and the sender was a replica in the same + * shard but just became the primary after a failover + * 2) we were a replica and our primary lost all of its slots to + * the sender who was another replica in the same shard and has + * just become the primary after a failover + * + * It is also possible that the sender is a primary in a different + * shard and our primary just had its last slot migrated to the + * sender. In this case we don't reconfigure ourselves as a replica + * of the sender. 
*/ + if (newmaster && curmaster->numslots == 0) { + if (server.cluster_allow_replica_migration || areInSameShard(sender, myself)) { + serverLog(LL_NOTICE, + "Configuration change detected. Reconfiguring myself " + "as a replica of node %.40s (%s) in shard %.40s", + sender->name, + sender->human_nodename, + sender->shard_id); + /* Don't clear the migrating/importing states if this is a replica that + * just gets promoted to the new primary in the shard. */ + clusterSetMaster(sender, !areInSameShard(sender, myself)); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); + } else if ((sender_slots >= migrated_our_slots) && + !areInSameShard(sender, myself)) + { + /* When all our slots are lost to the sender and the sender belongs to + * a different shard, this is likely due to a client triggered slot + * migration. Don't reconfigure this node to migrate to the new shard + * in this case. */ + serverLog(LL_NOTICE, + "My last slot was migrated to node %.40s (%s) in shard %.40s. I am now an empty master.", + sender->name, + sender->human_nodename, + sender->shard_id); + } } else if (dirty_slots_count) { /* If we are here, we received an update message which removed * ownership for certain slots we still have keys about, but still @@ -2448,8 +2615,16 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc * * In order to maintain a consistent state between keys and slots * we need to remove all the keys from the slots we lost. */ - for (j = 0; j < dirty_slots_count; j++) + for (int j = 0; j < dirty_slots_count; j++) { + serverLog(LL_WARNING, + "Deleting keys in dirty slot %d on node %.40s (%s) in shard %.40s", + dirty_slots[j], + myself->name, + myself->human_nodename, + myself->shard_id + ); delKeysInSlot(dirty_slots[j]); + } } } @@ -2922,8 +3097,11 @@ int clusterProcessPacket(clusterLink *link) { /* If the reply has a non matching node ID we * disconnect this node and set it as not having an associated * address. */ - serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d", + serverLog(LL_DEBUG, + "PONG contains mismatching sender ID. About node %.40s (%s) in shard %.40s added %d ms ago, having flags %d", link->node->name, + link->node->human_nodename, + link->node->shard_id, (int)(now-(link->node->ctime)), link->node->flags); link->node->flags |= CLUSTER_NODE_NOADDR; @@ -3078,6 +3256,11 @@ int clusterProcessPacket(clusterLink *link) { if (sender_master) { dirty_slots = memcmp(sender_master->slots, hdr->myslots,sizeof(hdr->myslots)) != 0; + + /* Force dirty when sender is primary and owns no slots so that + * we have a chance to examine and repair slot migrating/importing + * states that involve empty shards. */ + dirty_slots |= nodeIsMaster(sender) && sender_master->numslots == 0; } } @@ -3087,6 +3270,56 @@ int clusterProcessPacket(clusterLink *link) { if (sender && clusterNodeIsMaster(sender) && dirty_slots) clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots); + /* Explicitly check for a replication loop before attempting the replication + * chain folding logic. + * + * In some rare case, slot config updates (via either PING/PONG or UPDATE) + * can be delivered out of order as illustrated below. + * + * 1. To keep the discussion simple, let's assume we have 2 shards, shard a + * and shard b. Let's also assume there are two slots in total with shard + * a owning slot 1 and shard b owning slot 2. + * 2. 
Shard a has two nodes: primary A and replica A*; shard b has primary + * B and replica B*. + * 3. A manual failover was initiated on A* and A* just wins the election. + * 4. A* announces to the world that it now owns slot 1 using PING messages. + * These PING messages are queued in the outgoing buffer to every other + * node in the cluster, namely, A, B, and B*. + * 5. Keep in mind that there is no ordering in the delivery of these PING + * messages. For the stale PING message to appear, we need the following + * events in the exact order as they are laid out. + * a. An old PING message before A* becomes the new primary is still queued + * in A*'s outgoing buffer to A. This later becomes the stale message, + * which says A* is a replica of A. It is followed by A*'s election + * winning announcement PING message. + * b. B or B* processes A's election winning announcement PING message + * and sets slots[1]=A*. + * c. A sends a PING message to B (or B*). Since A hasn't learnt that A* + * wins the election, it claims that it owns slot 1 but with a lower + * epoch than B has on slot 1. This leads to B sending an UPDATE to + * A directly saying A* is the new owner of slot 1 with a higher epoch. + * d. A receives the UPDATE from B and executes clusterUpdateSlotsConfigWith. + * A now realizes that it is a replica of A* hence setting myself->slaveof + * to A*. + * e. Finally, the pre-failover PING message queued up in A*'s outgoing + * buffer to A is delivered and processed, out of order though, to A. + * f. This stale PING message creates the replication loop */ + if (myself->slaveof && + myself->slaveof->slaveof && + myself->slaveof->slaveof != myself) { + /* Safeguard against sub-replicas. A replica's master can turn itself + * into a replica if its last slot is removed. If no other node takes + * over the slot, there is nothing else to trigger replica migration. */ + serverLog(LL_NOTICE, + "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s", + myself->slaveof->slaveof->name, + myself->slaveof->name); + clusterSetMaster(myself->slaveof->slaveof, 1); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); + } + /* 2) We also check for the reverse condition, that is, the sender * claims to serve slots we know are served by a master with a * greater configEpoch. If this happens we inform the sender. @@ -4220,13 +4453,11 @@ void clusterHandleSlaveFailover(void) { * 1) We are a slave. * 2) Our master is flagged as FAIL, or this is a manual failover. * 3) We don't have the no failover configuration set, and this is - * not a manual failover. - * 4) It is serving slots. */ + * not a manual failover. */ if (clusterNodeIsMaster(myself) || myself->slaveof == NULL || (!nodeFailed(myself->slaveof) && !manual_failover) || - (server.cluster_slave_no_failover && !manual_failover) || - myself->slaveof->numslots == 0) + (server.cluster_slave_no_failover && !manual_failover)) { /* There are no reasons to failover, so we set the reason why we * are returning without failing over to NONE. 
*/ @@ -4471,9 +4702,11 @@ void clusterHandleSlaveMigration(int max_slaves) { (mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY && !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)) { - serverLog(LL_NOTICE,"Migrating to orphaned master %.40s", - target->name); - clusterSetMaster(target); + serverLog(LL_NOTICE,"Migrating to orphaned master %.40s (%s) in shard %.40s", + target->name, + target->human_nodename, + target->shard_id); + clusterSetMaster(target, 1); } } @@ -4844,7 +5077,7 @@ void clusterBeforeSleep(void) { if (flags & CLUSTER_TODO_HANDLE_MANUALFAILOVER) { /* Handle manual failover as soon as possible so that won't have a 100ms * as it was handled only in clusterCron */ - if(nodeIsSlave(myself)) { + if (nodeIsSlave(myself)) { clusterHandleManualFailover(); if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)) clusterHandleSlaveFailover(); @@ -5133,14 +5366,9 @@ void clusterUpdateState(void) { /* This function is called after the node startup in order to verify that data * loaded from disk is in agreement with the cluster configuration: * - * 1) If we find keys about hash slots we have no responsibility for, the - * following happens: - * A) If no other node is in charge according to the current cluster - * configuration, we add these slots to our node. - * B) If according to our config other nodes are already in charge for - * this slots, we set the slots as IMPORTING from our point of view - * in order to justify we have those slots, and in order to make - * redis-cli aware of the issue, so that it can try to fix it. + * 1) If we find keys about hash slots we have no responsibility for and + * no other node is in charge according to the current cluster + * configuration, we add these slots to our node. * 2) If we find data in a DB different than DB0 we return C_ERR to * signal the caller it should quit the server with an error message * or take other actions. @@ -5185,16 +5413,27 @@ int verifyClusterConfigWithData(void) { * assigned to this slot. Fix this condition. */ update_config++; - /* Case A: slot is unassigned. Take responsibility for it. */ + /* slot is unassigned. Take responsibility for it. */ if (server.cluster->slots[j] == NULL) { serverLog(LL_NOTICE, "I have keys for unassigned slot %d. " "Taking responsibility for it.",j); clusterAddSlot(myself,j); - } else { - serverLog(LL_NOTICE, "I have keys for slot %d, but the slot is " - "assigned to another node. " - "Setting it to importing state.",j); - server.cluster->importing_slots_from[j] = server.cluster->slots[j]; + } else if (server.cluster->importing_slots_from[j] != server.cluster->slots[j]) { + if (server.cluster->importing_slots_from[j] == NULL) { + serverLog(LL_NOTICE, "I have keys for slot %d, but the slot is " + "assigned to another node. Deleting keys in the slot.", j); + } else { + serverLog(LL_NOTICE, "I am importing keys from node %.40s (%s) in shard %.40s to slot %d, " + "but the slot is now owned by node %.40s (%s) in shard %.40s. Deleting keys in the slot", + server.cluster->importing_slots_from[j]->name, + server.cluster->importing_slots_from[j]->human_nodename, + server.cluster->importing_slots_from[j]->shard_id, + j, + server.cluster->slots[j]->name, + server.cluster->slots[j]->human_nodename, + server.cluster->slots[j]->shard_id); + } + delKeysInSlot(j); } } if (update_config) clusterSaveConfigOrDie(1); @@ -5218,18 +5457,18 @@ static inline void removeAllNotOwnedShardChannelSubscriptions(void) { /* Set the specified node 'n' as master for this node. 
* If this node is currently a master, it is turned into a slave. */ -void clusterSetMaster(clusterNode *n) { +void clusterSetMaster(clusterNode *n, int closeSlots) { serverAssert(n != myself); serverAssert(myself->numslots == 0); if (clusterNodeIsMaster(myself)) { myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO); myself->flags |= CLUSTER_NODE_SLAVE; - clusterCloseAllSlots(); } else { if (myself->slaveof) clusterNodeRemoveSlave(myself->slaveof,myself); } + if (closeSlots) clusterCloseAllSlots(); myself->slaveof = n; updateShardId(myself, n->shard_id); clusterNodeAddSlave(n,myself); @@ -5335,15 +5574,11 @@ sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary) { else ci = sdscatlen(ci,"-",1); - unsigned long long nodeEpoch = node->configEpoch; - if (nodeIsSlave(node) && node->slaveof) { - nodeEpoch = node->slaveof->configEpoch; - } /* Latency from the POV of this node, config epoch, link status */ ci = sdscatfmt(ci," %I %I %U %s", (long long) node->ping_sent, (long long) node->pong_received, - nodeEpoch, + nodeEpoch(node), (node->link || node->flags & CLUSTER_NODE_MYSELF) ? "connected" : "disconnected"); @@ -5716,7 +5951,6 @@ sds genClusterInfoString(void) { sds info = sdsempty(); char *statestr[] = {"ok","fail"}; int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0; - uint64_t myepoch; int j; for (j = 0; j < CLUSTER_SLOTS; j++) { @@ -5733,9 +5967,6 @@ sds genClusterInfoString(void) { } } - myepoch = (nodeIsSlave(myself) && myself->slaveof) ? - myself->slaveof->configEpoch : myself->configEpoch; - info = sdscatprintf(info, "cluster_state:%s\r\n" "cluster_slots_assigned:%d\r\n" @@ -5754,7 +5985,7 @@ sds genClusterInfoString(void) { dictSize(server.cluster->nodes), server.cluster->size, (unsigned long long) server.cluster->currentEpoch, - (unsigned long long) myepoch + (unsigned long long) nodeEpoch(myself) ); /* Show stats about messages sent and received. */ @@ -6084,7 +6315,8 @@ int clusterCommandSpecial(client *c) { int slot; clusterNode *n; - if (nodeIsSlave(myself)) { + /* Allow primaries to replicate "CLUSTER SETSLOT" */ + if (!(c->flags & CLIENT_MASTER) && nodeIsSlave(myself)) { addReplyError(c,"Please use SETSLOT only with masters."); return 1; } @@ -6092,7 +6324,8 @@ int clusterCommandSpecial(client *c) { if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return 1; if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) { - if (server.cluster->slots[slot] != myself) { + /* Scope the check to primaries only */ + if (nodeIsMaster(myself) && server.cluster->slots[slot] != myself) { addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot); return 1; } @@ -6150,6 +6383,13 @@ int clusterCommandSpecial(client *c) { return 1; } } + + serverLog(LL_NOTICE, "Assigning slot %d to node %.40s (%s) in shard %.40s", + slot, + n->name, + n->human_nodename, + n->shard_id); + /* If this slot is in migrating status but we have no keys * for it assigning the slot to another node will clear * the migrating status. */ @@ -6168,41 +6408,128 @@ int clusterCommandSpecial(client *c) { myself->numslots == 0 && server.cluster_allow_replica_migration) { serverLog(LL_NOTICE, - "Configuration change detected. Reconfiguring myself " - "as a replica of %.40s (%s)", n->name, n->human_nodename); - clusterSetMaster(n); + "Lost my last slot during slot migration. 
Reconfiguring myself " + "as a replica of %.40s (%s) in shard %.40s", + n->name, + n->human_nodename, + n->shard_id); + clusterSetMaster(n, 1); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG); } - /* If this node was importing this slot, assigning the slot to - * itself also clears the importing status. */ - if (n == myself && - server.cluster->importing_slots_from[slot]) { - /* This slot was manually migrated, set this node configEpoch - * to a new epoch so that the new version can be propagated - * by the cluster. - * - * Note that if this ever results in a collision with another - * node getting the same configEpoch, for example because a - * failover happens at the same time we close the slot, the - * configEpoch collision resolution will fix it assigning - * a different epoch to each node. */ - if (clusterBumpConfigEpochWithoutConsensus() == C_OK) { - serverLog(LL_NOTICE, - "configEpoch updated after importing slot %d", slot); - } + /* If this node or this node's primary was importing this slot, + * assigning the slot to itself also clears the importing status. */ + if ((n == myself || n == myself->slaveof) && + server.cluster->importing_slots_from[slot]) + { server.cluster->importing_slots_from[slot] = NULL; - /* After importing this slot, let the other nodes know as - * soon as possible. */ - clusterBroadcastPong(CLUSTER_BROADCAST_ALL); + + /* Only primary broadcasts the updates */ + if (n == myself) { + /* This slot was manually migrated, set this node configEpoch + * to a new epoch so that the new version can be propagated + * by the cluster. + * + * Note that if this ever results in a collision with another + * node getting the same configEpoch, for example because a + * failover happens at the same time we close the slot, the + * configEpoch collision resolution will fix it assigning + * a different epoch to each node. */ + if (clusterBumpConfigEpochWithoutConsensus() == C_OK) { + serverLog(LL_NOTICE, + "ConfigEpoch updated after importing slot %d", + slot); + } + /* After importing this slot, let the other nodes know as + * soon as possible. */ + clusterBroadcastPong(CLUSTER_BROADCAST_ALL); + } } + } else if (!strcasecmp(c->argv[3]->ptr,"node") && + c->argc == 6 && + !strcasecmp(c->argv[5]->ptr,"replicaonly")) + { + /* CLUSTER SETSLOT NODE REPLICAONLY */ + + /* When finalizing the slot, there is a possibility that the + * target node B sends a cluster PONG to the source node A + * before SETSLOT has been replicated to B'. If B crashes here, + * B' will be in an importing state and the slot will have no + * owner. To help mitigate this issue, we added a new SETSLOT + * command variant that takes a special marker token called + * "REPLICAONLY". This command is a no-op on the primary. It + * simply replicates asynchronously the command without the + * "REPLICAONLY" marker to the replicas, if there exist any. + * The caller is expected to wait for this asynchronous + * replication to complete using the "WAIT" command. + * + * With the help of this command, we finalize the slots + * on the replicas before the primary in the following + * sequence, where A is the source primary and B is the target + * primary: + * + * 1. Client C issues SETSLOT n NODE B REPLICAONLY against + * node B + * 2. Node B replicates SETSLOT n NODE B to all of its replicas, + * such as B', B'', etc + * 3. Client C then issues WAIT for + * a number of B's replicas of C's choice to complete the + * finalization + * 4. 
On successful WAIT completion, Client C executes SETSLOT + * n NODE B against node B but without the "REPLICAONLY" + * marker this time, which completes the slot finalization + * on node B + * + * The following steps can happen in parallel: + * a. Client C issues SETSLOT n NODE B against node A + * b. Node B gossips its new slot ownership to the cluster, + * including A, A', etc */ + + n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr)); + + if (!n) { + addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[4]->ptr); + return 1; + } + if (nodeIsSlave(n)) { + addReplyError(c,"Target node is not a master"); + return 1; + } + /* If this hash slot was served by 'myself' before to switch + * make sure there are no longer local keys for this hash slot. */ + if (server.cluster->slots[slot] == myself && n != myself) { + if (countKeysInSlot(slot) != 0) { + addReplyErrorFormat(c, + "Can't assign hashslot %d to a different node " + "while I still hold keys for this hash slot.", slot); + return 1; + } + } + if (server.cluster->importing_slots_from[slot] == NULL) { + addReplyError(c,"Slot is not open for importing"); + return 1; + } + if (myself->numslaves == 0) { + addReplyError(c,"Target node has no replicas"); + return 1; + } + + /* Remove the last "REPLICAONLY" token so the command + * can be applied as the real "SETSLOT" command on the + * replicas. */ + serverAssert(c->argc == 6); + rewriteClientCommandVector(c, 5, c->argv[0], c->argv[1], c->argv[2], c->argv[3], c->argv[4]); } else { addReplyError(c, "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP"); return 1; } + + /* Force-replicate "CLUSTER SETSLOT" */ + if (nodeIsMaster(myself)) forceCommandPropagation(c, PROPAGATE_REPL); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) { @@ -6276,7 +6603,7 @@ int clusterCommandSpecial(client *c) { } /* Set the master. */ - clusterSetMaster(n); + clusterSetMaster(n, 1); clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") && @@ -6517,3 +6844,55 @@ int clusterAllowFailoverCmd(client *c) { void clusterPromoteSelfToMaster(void) { replicationUnsetMaster(); } + +/* Replicate all migrating and importing slot states to a + * a given replica client. 
*/ +void clusterReplicateOpenSlots(void) +{ + if (!server.cluster_enabled) return; + + int argc = 5; + robj **argv = zmalloc(sizeof(robj*)*argc); + + sds cmd = sdsnew("CLUSTER"); + sds subcmd = sdsnew("SETSLOT"); + sds importing = sdsnew("IMPORTING"); + sds migrating = sdsnew("MIGRATING"); + robj *cmd_obj = createObject(OBJ_STRING, cmd); + robj *subcmd_obj = createObject(OBJ_STRING, subcmd); + robj *imp_obj = createObject(OBJ_STRING, importing); + robj *mig_obj = createObject(OBJ_STRING, migrating); + + argv[0] = cmd_obj; + argv[1] = subcmd_obj; + + for (int i = 0; i < 2; i++) { + clusterNode **nodes_ptr = NULL; + if (i == 0) { + nodes_ptr = server.cluster->importing_slots_from; + argv[3] = imp_obj; + } else { + nodes_ptr = server.cluster->migrating_slots_to; + argv[3] = mig_obj; + } + + for (int j = 0; j < CLUSTER_SLOTS; j++) { + if (nodes_ptr[j] == NULL) continue; + + argv[2] = createStringObjectFromLongLongForValue(j); + sds name = sdsnewlen(nodes_ptr[j]->name, sizeof(nodes_ptr[j]->name)); + argv[4] = createObject(OBJ_STRING, name); + + replicationFeedSlaves(0, argv, argc); + + decrRefCount(argv[2]); + decrRefCount(argv[4]); + } + } + + decrRefCount(mig_obj); + decrRefCount(imp_obj); + decrRefCount(subcmd_obj); + decrRefCount(cmd_obj); + zfree(argv); +} diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index a857184ab3..f29a324b7f 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -53,6 +53,7 @@ typedef struct clusterLink { #define CLUSTER_NODE_NOFAILOVER 512 /* Slave will not try to failover. */ #define CLUSTER_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" +#define nodeIsMaster(n) ((n)->flags & CLUSTER_NODE_MASTER) #define nodeIsSlave(n) ((n)->flags & CLUSTER_NODE_SLAVE) #define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE) #define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR)) diff --git a/src/commands/cluster-setslot.json b/src/commands/cluster-setslot.json index d0d48193d3..7a60d23468 100644 --- a/src/commands/cluster-setslot.json +++ b/src/commands/cluster-setslot.json @@ -10,7 +10,8 @@ "command_flags": [ "NO_ASYNC_LOADING", "ADMIN", - "STALE" + "STALE", + "MAY_REPLICATE" ], "arguments": [ { diff --git a/src/debug.c b/src/debug.c index 02133c5c5d..4274c2c9d4 100644 --- a/src/debug.c +++ b/src/debug.c @@ -873,8 +873,7 @@ NULL server.aof_flush_sleep = atoi(c->argv[2]->ptr); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc >= 3) { - replicationFeedSlaves(server.slaves, -1, - c->argv + 2, c->argc - 2); + replicationFeedSlaves(-1, c->argv + 2, c->argc - 2); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"error") && c->argc == 3) { sds errstr = sdsnewlen("-",1); diff --git a/src/rdb.c b/src/rdb.c index f35a41d8ee..bd4427bd54 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3310,7 +3310,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin robj *argv[2]; argv[0] = server.lazyfree_lazy_expire ? shared.unlink : shared.del; argv[1] = &keyobj; - replicationFeedSlaves(server.slaves,dbid,argv,2); + replicationFeedSlaves(dbid,argv,2); } sdsfree(key); decrRefCount(val); diff --git a/src/replication.c b/src/replication.c index 5fb477cd8b..62e40b433e 100644 --- a/src/replication.c +++ b/src/replication.c @@ -434,7 +434,7 @@ void feedReplicationBuffer(char *s, size_t len) { * received by our clients in order to create the replication stream. 
* Instead if the instance is a replica and has sub-replicas attached, we use * replicationFeedStreamFromMasterStream() */ -void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { +void replicationFeedSlaves(int dictid, robj **argv, int argc) { int j, len; char llstr[LONG_STR_SIZE]; @@ -451,7 +451,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* If there aren't slaves, and there is no backlog buffer to populate, * we can return ASAP. */ - if (server.repl_backlog == NULL && listLength(slaves) == 0) { + if (server.repl_backlog == NULL && listLength(server.slaves) == 0) { /* We increment the repl_offset anyway, since we use that for tracking AOF fsyncs * even when there's no replication active. This code will not be reached if AOF * is also disabled. */ @@ -1313,6 +1313,9 @@ int replicaPutOnline(client *slave) { NULL); serverLog(LL_NOTICE,"Synchronization with replica %s succeeded", replicationGetSlaveName(slave)); + + /* Replicate slot being migrated/imported to the new replica */ + clusterReplicateOpenSlots(); return 1; } @@ -3788,8 +3791,7 @@ void replicationCron(void) { if (!manual_failover_in_progress) { ping_argv[0] = shared.ping; - replicationFeedSlaves(server.slaves, -1, - ping_argv, 1); + replicationFeedSlaves(-1, ping_argv, 1); } } diff --git a/src/server.c b/src/server.c index aa459994bb..dfab83dab9 100644 --- a/src/server.c +++ b/src/server.c @@ -1612,7 +1612,7 @@ static void sendGetackToReplicas(void) { argv[0] = shared.replconf; argv[1] = shared.getack; argv[2] = shared.special_asterick; /* Not used argument. */ - replicationFeedSlaves(server.slaves, -1, argv, 3); + replicationFeedSlaves(-1, argv, 3); } extern int ProcessingEventsWhileBlocked; @@ -3296,7 +3296,7 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) { if (server.aof_state != AOF_OFF && target & PROPAGATE_AOF) feedAppendOnlyFile(dbid,argv,argc); if (target & PROPAGATE_REPL) - replicationFeedSlaves(server.slaves,dbid,argv,argc); + replicationFeedSlaves(dbid,argv,argc); } /* Used inside commands to schedule the propagation of additional commands diff --git a/src/server.h b/src/server.h index 0ca24e3c92..86b1036f9f 100644 --- a/src/server.h +++ b/src/server.h @@ -2816,7 +2816,7 @@ ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout); ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout); /* Replication */ -void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); +void replicationFeedSlaves(int dictid, robj **argv, int argc); void replicationFeedStreamFromMasterStream(char *buf, size_t buflen); void resetReplicationBuffer(void); void feedReplicationBuffer(char *buf, size_t len); diff --git a/src/valkey-cli.c b/src/valkey-cli.c index ba16d03fa7..ac163c8e5b 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -4762,6 +4762,39 @@ static clusterManagerNode *clusterManagerGetSlotOwner(clusterManagerNode *n, return owner; } +static void clusterManagerSetSlotNodeReplicaOnly(clusterManagerNode *node1, + clusterManagerNode *node2, + int slot) { + /* A new CLUSTER SETSLOT variant that finalizes slot ownership on replicas + * only (CLUSTER SETSLOT s NODE n REPLICAONLY) is introduced in Redis 8.0+ + * to help mitigate the single-point-of-failure issue related to the slot + * ownership finalization on HA clusters. We make a best-effort attempt below + * to utilize this enhanced reliability. Regardless of the result, we continue + * with finalizing slot ownership on the primary nodes. 
Note that this command + * is not essential. Redis 8.0+ will attempt to recover from failed slot + * ownership finalizations if they occur, although there may be a brief period + * where slots caught in this transition stage are unavailable. Including this + * additional step ensures no downtime for these slots if any failures arise. */ + redisReply *reply = CLUSTER_MANAGER_COMMAND(node1, "CLUSTER SETSLOT %d NODE %s REPLICAONLY", + slot, (char *) node2->name); + if (reply->type == REDIS_REPLY_ERROR) { + /* Either target Redis server doesn't support this command or the slot is + * not in the right state*/ + clusterManagerLogWarn("*** Failed to finalize slot on replicas: %s\n", reply->str); + freeReplyObject(reply); + return; + } + freeReplyObject(reply); + clusterManagerLogInfo(">>> Waiting for %d replicas to complete slot finalization\n", node1->replicas_count); + reply = CLUSTER_MANAGER_COMMAND(node1, "WAIT %d 1000", node1->replicas_count); + if (reply->type == REDIS_REPLY_ERROR) { + clusterManagerLogWarn("*** Failed to wait for slot finalization on replicas: %s\n", reply->str); + } else { + clusterManagerLogInfo(">>> %d replicas completed slot finalization in time\n", reply->integer); + } + freeReplyObject(reply); +} + /* Set slot status to "importing" or "migrating" */ static int clusterManagerSetSlot(clusterManagerNode *node1, clusterManagerNode *node2, @@ -5233,6 +5266,7 @@ static int clusterManagerMoveSlot(clusterManagerNode *source, * If we inform any other node first, it can happen that the target node * crashes before it is set as the new owner and then the slot is left * without an owner which results in redirect loops. See issue #7116. */ + clusterManagerSetSlotNodeReplicaOnly(target, target, slot); success = clusterManagerSetSlot(target, target, slot, "node", err); if (!success) return 0; diff --git a/tests/cluster/tests/20-half-migrated-slot.tcl b/tests/cluster/tests/20-half-migrated-slot.tcl index 229b3a86df..481b588696 100644 --- a/tests/cluster/tests/20-half-migrated-slot.tcl +++ b/tests/cluster/tests/20-half-migrated-slot.tcl @@ -5,10 +5,6 @@ # 4. migration is half finished on "migrating" node # 5. migration is half finished on "importing" node -# TODO: Test is currently disabled until it is stabilized (fixing the test -# itself or real issues in Redis). - -if {false} { source "../tests/includes/init-tests.tcl" source "../tests/includes/utils.tcl" @@ -95,4 +91,3 @@ test "Half-finish importing" { } config_set_all_nodes cluster-allow-replica-migration yes -} diff --git a/tests/cluster/tests/21-many-slot-migration.tcl b/tests/cluster/tests/21-many-slot-migration.tcl index 1ac73dc997..589ab00e69 100644 --- a/tests/cluster/tests/21-many-slot-migration.tcl +++ b/tests/cluster/tests/21-many-slot-migration.tcl @@ -1,10 +1,5 @@ # Tests for many simultaneous migrations. -# TODO: Test is currently disabled until it is stabilized (fixing the test -# itself or real issues in Redis). - -if {false} { - source "../tests/includes/init-tests.tcl" source "../tests/includes/utils.tcl" @@ -61,4 +56,3 @@ test "Keys are accessible" { } config_set_all_nodes cluster-allow-replica-migration yes -} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 6d9ca6299a..bab4af0cdd 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -105,6 +105,7 @@ set ::all_tests { unit/cluster/links unit/cluster/cluster-response-tls unit/cluster/failure-marking + unit/cluster/slot-migration } # Index to the next test to run in the ::all_tests list. 
set ::next_test 0 diff --git a/tests/unit/cluster/cli.tcl b/tests/unit/cluster/cli.tcl index 734dd19c9f..019c51853f 100644 --- a/tests/unit/cluster/cli.tcl +++ b/tests/unit/cluster/cli.tcl @@ -317,13 +317,10 @@ test {Migrate the last slot away from a node using valkey-cli} { catch { $newnode_r get foo } e assert_equal "MOVED $slot $owner_host:$owner_port" $e - # Check that the empty node has turned itself into a replica of the new - # owner and that the new owner knows that. - wait_for_condition 1000 50 { - [string match "*slave*" [$owner_r CLUSTER REPLICAS $owner_id]] - } else { - fail "Empty node didn't turn itself into a replica." - } + # Check that the now empty primary node doesn't turn itself into + # a replica of any other nodes + wait_for_cluster_propagation + assert_match *master* [$owner_r role] } } diff --git a/tests/unit/cluster/slot-migration.tcl b/tests/unit/cluster/slot-migration.tcl new file mode 100644 index 0000000000..67d0731d7b --- /dev/null +++ b/tests/unit/cluster/slot-migration.tcl @@ -0,0 +1,352 @@ +proc get_open_slots {srv_idx} { + foreach line [split [R $srv_idx cluster nodes] "\n"] { + set line [string trim $line] + if {$line eq {}} continue + if {[regexp {myself} $line] == 0} continue + set slots {} + regexp {\[.*} $line slots + return $slots + } +} + +proc get_cluster_role {srv_idx} { + foreach line [split [R $srv_idx cluster nodes] "\n"] { + set line [string trim $line] + if {$line eq {}} continue + if {[regexp {myself} $line] == 0} continue + set role {} + regexp {myself,(\w+)} $line -> role + return $role + } +} + +proc wait_for_role {srv_idx role} { + set node_timeout [lindex [R 0 config get cluster-node-timeout] 1] + # wait for a gossip cycle for states to be propagated throughout the cluster + after $node_timeout + wait_for_condition 100 100 { + [lindex [split [R $srv_idx role] " "] 0] eq $role + } else { + fail "R $srv_idx didn't assume the replication $role in time" + } + wait_for_condition 100 100 { + [get_cluster_role $srv_idx] eq $role + } else { + fail "R $srv_idx didn't assume the cluster $role in time" + } +} + +proc wait_for_slot_state {srv_idx pattern} { + wait_for_condition 100 100 { + [get_open_slots $srv_idx] eq $pattern + } else { + fail "incorrect slot state on R $srv_idx" + } +} + +start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica-migration no cluster-node-timeout 1000} } { + + set node_timeout [lindex [R 0 config get cluster-node-timeout] 1] + set R0_id [R 0 cluster myid] + set R1_id [R 1 cluster myid] + set R2_id [R 2 cluster myid] + set R3_id [R 3 cluster myid] + set R4_id [R 4 cluster myid] + set R5_id [R 5 cluster myid] + + test "Slot migration states are replicated" { + # Validate initial states + assert_not_equal [get_open_slots 0] "\[609->-$R1_id\]" + assert_not_equal [get_open_slots 1] "\[609-<-$R0_id\]" + assert_not_equal [get_open_slots 3] "\[609->-$R1_id\]" + assert_not_equal [get_open_slots 4] "\[609-<-$R0_id\]" + # Kick off the migration of slot 609 from R0 to R1 + assert_equal {OK} [R 0 cluster setslot 609 migrating $R1_id] + assert_equal {OK} [R 1 cluster setslot 609 importing $R0_id] + # Validate that R0 is migrating slot 609 to R1 + assert_equal [get_open_slots 0] "\[609->-$R1_id\]" + # Validate that R1 is importing slot 609 from R0 + assert_equal [get_open_slots 1] "\[609-<-$R0_id\]" + # Validate final states + wait_for_slot_state 0 "\[609->-$R1_id\]" + wait_for_slot_state 1 "\[609-<-$R0_id\]" + wait_for_slot_state 3 "\[609->-$R1_id\]" + wait_for_slot_state 4 "\[609-<-$R0_id\]" + } + + test 
"Migration target is auto-updated after failover in target shard" { + # Restart R1 to trigger an auto-failover to R4 + # Make sure wait for twice the node timeout time + # to ensure the failover does occur + catch {R 1 debug restart [expr 2*$node_timeout]} e + catch {I/O error reading reply} $e + # Wait for R1 to come back + after [expr 3*$node_timeout] + # Wait for R1 to become a replica + wait_for_role 1 slave + # Validate final states + wait_for_slot_state 0 "\[609->-$R4_id\]" + wait_for_slot_state 1 "\[609-<-$R0_id\]" + wait_for_slot_state 3 "\[609->-$R4_id\]" + wait_for_slot_state 4 "\[609-<-$R0_id\]" + # Restore R1's primaryship + assert_equal {OK} [R 1 cluster failover] + wait_for_role 1 master + # Validate initial states + wait_for_slot_state 0 "\[609->-$R1_id\]" + wait_for_slot_state 1 "\[609-<-$R0_id\]" + wait_for_slot_state 3 "\[609->-$R1_id\]" + wait_for_slot_state 4 "\[609-<-$R0_id\]" + } + + test "Migration source is auto-updated after failover in source shard" { + # Restart R0 to trigger an auto-failover to R3 + # Make sure wait for twice the node timeout time + # to ensure the failover does occur + catch {R 0 debug restart [expr 2*$node_timeout]} e + catch {I/O error reading reply} $e + # Wait for R0 to come back + after [expr 3*$node_timeout] + # Wait for R0 to become a replica + wait_for_role 0 slave + # Validate final states + wait_for_slot_state 0 "\[609->-$R1_id\]" + wait_for_slot_state 1 "\[609-<-$R3_id\]" + wait_for_slot_state 3 "\[609->-$R1_id\]" + wait_for_slot_state 4 "\[609-<-$R3_id\]" + # Restore R0's primaryship + assert_equal {OK} [R 0 cluster failover] + wait_for_role 0 master + # Validate final states + wait_for_slot_state 0 "\[609->-$R1_id\]" + wait_for_slot_state 1 "\[609-<-$R0_id\]" + wait_for_slot_state 3 "\[609->-$R1_id\]" + wait_for_slot_state 4 "\[609-<-$R0_id\]" + } + + test "Replica redirects key access in migrating slots" { + # Validate initial states + assert_equal [get_open_slots 0] "\[609->-$R1_id\]" + assert_equal [get_open_slots 1] "\[609-<-$R0_id\]" + assert_equal [get_open_slots 3] "\[609->-$R1_id\]" + assert_equal [get_open_slots 4] "\[609-<-$R0_id\]" + catch {[R 3 get aga]} e + assert_equal {MOVED} [lindex [split $e] 0] + assert_equal {609} [lindex [split $e] 1] + } + + test "New replica inherits migrating slot" { + # Reset R3 to turn it into an empty node + assert_equal [get_open_slots 3] "\[609->-$R1_id\]" + assert_equal {OK} [R 3 cluster reset] + assert_not_equal [get_open_slots 3] "\[609->-$R1_id\]" + # Add R3 back as a replica of R0 + assert_equal {OK} [R 3 cluster meet [srv 0 "host"] [srv 0 "port"]] + after $node_timeout + assert_equal {OK} [R 3 cluster replicate $R0_id] + wait_for_role 3 slave + # Validate that R3 now sees slot 609 open + assert_equal [get_open_slots 3] "\[609->-$R1_id\]" + } + + test "New replica inherits importing slot" { + # Reset R4 to turn it into an empty node + assert_equal [get_open_slots 4] "\[609-<-$R0_id\]" + assert_equal {OK} [R 4 cluster reset] + assert_not_equal [get_open_slots 4] "\[609-<-$R0_id\]" + # Add R4 back as a replica of R1 + assert_equal {OK} [R 4 cluster meet [srv -1 "host"] [srv -1 "port"]] + after $node_timeout + assert_equal {OK} [R 4 cluster replicate $R1_id] + wait_for_role 4 slave + # Validate that R4 now sees slot 609 open + assert_equal [get_open_slots 4] "\[609-<-$R0_id\]" + } +} + +proc create_empty_shard {p r} { + set node_timeout [lindex [R 0 config get cluster-node-timeout] 1] + assert_equal {OK} [R $p cluster reset] + assert_equal {OK} [R $r cluster reset] + assert_equal {OK} 
[R $p cluster meet [srv 0 "host"] [srv 0 "port"]] + assert_equal {OK} [R $r cluster meet [srv 0 "host"] [srv 0 "port"]] + after $node_timeout + assert_equal {OK} [R $r cluster replicate [R $p cluster myid]] + wait_for_role $r slave + wait_for_role $p master +} + +start_cluster 3 5 {tags {external:skip cluster} overrides {cluster-allow-replica-migration no cluster-node-timeout 1000} } { + + set node_timeout [lindex [R 0 config get cluster-node-timeout] 1] + set R0_id [R 0 cluster myid] + set R1_id [R 1 cluster myid] + set R2_id [R 2 cluster myid] + set R3_id [R 3 cluster myid] + set R4_id [R 4 cluster myid] + set R5_id [R 5 cluster myid] + + create_empty_shard 6 7 + set R6_id [R 6 cluster myid] + set R7_id [R 7 cluster myid] + + test "Empty-shard migration replicates slot importing states" { + # Validate initial states + assert_not_equal [get_open_slots 0] "\[609->-$R6_id\]" + assert_not_equal [get_open_slots 6] "\[609-<-$R0_id\]" + assert_not_equal [get_open_slots 3] "\[609->-$R6_id\]" + assert_not_equal [get_open_slots 7] "\[609-<-$R0_id\]" + # Kick off the migration of slot 609 from R0 to R6 + assert_equal {OK} [R 0 cluster setslot 609 migrating $R6_id] + assert_equal {OK} [R 6 cluster setslot 609 importing $R0_id] + # Validate that R0 is migrating slot 609 to R6 + assert_equal [get_open_slots 0] "\[609->-$R6_id\]" + # Validate that R6 is importing slot 609 from R0 + assert_equal [get_open_slots 6] "\[609-<-$R0_id\]" + # Validate final states + wait_for_slot_state 0 "\[609->-$R6_id\]" + wait_for_slot_state 6 "\[609-<-$R0_id\]" + wait_for_slot_state 3 "\[609->-$R6_id\]" + wait_for_slot_state 7 "\[609-<-$R0_id\]" + } + + test "Empty-shard migration target is auto-updated after faiover in target shard" { + wait_for_role 6 master + # Restart R6 to trigger an auto-failover to R7 + catch {R 6 debug restart [expr 3*$node_timeout]} e + catch {I/O error reading reply} $e + # Wait for R6 to come back + after [expr 3*$node_timeout] + # Wait for R6 to become a replica + wait_for_role 6 slave + # Validate final states + wait_for_slot_state 0 "\[609->-$R7_id\]" + wait_for_slot_state 6 "\[609-<-$R0_id\]" + wait_for_slot_state 3 "\[609->-$R7_id\]" + wait_for_slot_state 7 "\[609-<-$R0_id\]" + # Restore R6's primaryship + assert_equal {OK} [R 6 cluster failover] + wait_for_role 6 master + # Validate final states + wait_for_slot_state 0 "\[609->-$R6_id\]" + wait_for_slot_state 6 "\[609-<-$R0_id\]" + wait_for_slot_state 3 "\[609->-$R6_id\]" + wait_for_slot_state 7 "\[609-<-$R0_id\]" + } + + test "Empty-shard migration source is auto-updated after source faiover in source shard" { + wait_for_role 0 master + # Restart R0 to trigger an auto-failover to R3 + catch {R 0 debug restart [expr 2*$node_timeout]} e + catch {I/O error reading reply} $e + # Wait for R0 to come back + after [expr 3*$node_timeout] + # Wait for R7 to become a replica + wait_for_role 0 slave + # Validate final states + wait_for_slot_state 0 "\[609->-$R6_id\]" + wait_for_slot_state 6 "\[609-<-$R3_id\]" + wait_for_slot_state 3 "\[609->-$R6_id\]" + wait_for_slot_state 7 "\[609-<-$R3_id\]" + # Restore R0's primaryship + assert_equal {OK} [R 0 cluster failover] + wait_for_role 0 master + # Validate final states + wait_for_slot_state 0 "\[609->-$R6_id\]" + wait_for_slot_state 6 "\[609-<-$R0_id\]" + wait_for_slot_state 3 "\[609->-$R6_id\]" + wait_for_slot_state 7 "\[609-<-$R0_id\]" + } +} + +proc migrate_slot {from to slot} { + set from_id [R $from cluster myid] + set to_id [R $to cluster myid] + assert_equal {OK} [R $from cluster setslot $slot 
migrating $to_id] + assert_equal {OK} [R $to cluster setslot $slot importing $from_id] +} + +start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica-migration no cluster-node-timeout 1000} } { + + set node_timeout [lindex [R 0 config get cluster-node-timeout] 1] + set R0_id [R 0 cluster myid] + set R1_id [R 1 cluster myid] + set R2_id [R 2 cluster myid] + set R3_id [R 3 cluster myid] + set R4_id [R 4 cluster myid] + set R5_id [R 5 cluster myid] + + test "Multiple slot migration states are replicated" { + migrate_slot 0 1 13 + migrate_slot 0 1 7 + migrate_slot 0 1 17 + # Validate final states + wait_for_slot_state 0 "\[7->-$R1_id\] \[13->-$R1_id\] \[17->-$R1_id\]" + wait_for_slot_state 1 "\[7-<-$R0_id\] \[13-<-$R0_id\] \[17-<-$R0_id\]" + wait_for_slot_state 3 "\[7->-$R1_id\] \[13->-$R1_id\] \[17->-$R1_id\]" + wait_for_slot_state 4 "\[7-<-$R0_id\] \[13-<-$R0_id\] \[17-<-$R0_id\]" + } + + test "New replica inherits multiple migrating slots" { + # Reset R3 to turn it into an empty node + assert_equal {OK} [R 3 cluster reset] + # Add R3 back as a replica of R0 + assert_equal {OK} [R 3 cluster meet [srv 0 "host"] [srv 0 "port"]] + after $node_timeout + assert_equal {OK} [R 3 cluster replicate $R0_id] + wait_for_role 3 slave + # Validate final states + wait_for_slot_state 3 "\[7->-$R1_id\] \[13->-$R1_id\] \[17->-$R1_id\]" + } + + test "Slot finalization succeeds on replicas" { + # Trigger slot finalization on replicas + assert_equal {OK} [R 1 cluster setslot 7 node $R1_id replicaonly] + assert_equal {1} [R 1 wait 1 1000] + wait_for_slot_state 1 "\[7-<-$R0_id\] \[13-<-$R0_id\] \[17-<-$R0_id\]" + wait_for_slot_state 4 "\[13-<-$R0_id\] \[17-<-$R0_id\]" + assert_equal {OK} [R 1 cluster setslot 13 node $R1_id replicaonly] + assert_equal {1} [R 1 wait 1 1000] + wait_for_slot_state 1 "\[7-<-$R0_id\] \[13-<-$R0_id\] \[17-<-$R0_id\]" + wait_for_slot_state 4 "\[17-<-$R0_id\]" + assert_equal {OK} [R 1 cluster setslot 17 node $R1_id replicaonly] + assert_equal {1} [R 1 wait 1 1000] + wait_for_slot_state 1 "\[7-<-$R0_id\] \[13-<-$R0_id\] \[17-<-$R0_id\]" + wait_for_slot_state 4 "" + } + + test "Finalizing incorrect slot fails" { + catch {R 1 cluster setslot 123 node $R1_id replicaonly} e + assert_equal {ERR Slot is not open for importing} $e + } + + test "Slot migration without expected target replicas fails" { + migrate_slot 0 1 100 + # Move the target replica away + assert_equal {OK} [R 4 cluster replicate $R0_id] + after $node_timeout + # Slot finalization should fail + catch {R 1 cluster setslot 100 node $R1_id replicaonly} e + assert_equal {ERR Target node has no replicas} $e + } +} + +start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica-migration no cluster-node-timeout 1000} } { + + set node_timeout [lindex [R 0 config get cluster-node-timeout] 1] + set R0_id [R 0 cluster myid] + set R1_id [R 1 cluster myid] + + test "Slot is auto-claimed by target after source relinquishes ownership" { + migrate_slot 0 1 609 + #Validate that R1 doesn't own slot 609 + catch {[R 1 get aga]} e + assert_equal {MOVED} [lindex [split $e] 0] + #Finalize the slot on the source first + assert_equal {OK} [R 0 cluster setslot 609 node $R1_id] + after $node_timeout + #R1 should claim slot 609 since it is still importing slot 609 + #from R0 but R0 no longer owns this slot + assert_equal {OK} [R 1 set aga foo] + } +} diff --git a/valkey.conf b/valkey.conf index 33442b340d..5081995e17 100644 --- a/valkey.conf +++ b/valkey.conf @@ -1663,8 +1663,10 @@ aof-timestamp-enabled no # 
cluster-migration-barrier 1 # Turning off this option allows to use less automatic cluster configuration. -# It both disables migration to orphaned masters and migration from masters -# that became empty. +# It disables migration of replicas to orphaned masters. Masters that become +# empty due to losing their last slots to another master will not automatically +# replicate from the master that took over their last slots. Instead, they will +# remain as empty masters without any slots. # # Default is 'yes' (allow automatic migrations). #
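
A minimal sketch of the finalization order this patch describes (replicas of the target shard first, via the new REPLICAONLY variant, then the target primary, then the source), written in the style of the Tcl tests added above. The helper name finalize_slot_via_replicas, the node indices, and the slot value are illustrative assumptions, not part of the patch:

proc finalize_slot_via_replicas {src dst slot} {
    # Sketch only: assumes R $src is the source primary, R $dst is the target
    # primary, and $slot is already open (migrating on $src, importing on $dst),
    # e.g. set up with the migrate_slot helper above.
    set dst_id [R $dst cluster myid]
    # 1. Finalize on the target's replicas only; the target primary replicates
    #    the plain "SETSLOT <slot> NODE <id>" form to them and stays unchanged.
    assert_equal {OK} [R $dst cluster setslot $slot node $dst_id replicaonly]
    # 2. Wait for the replicas to acknowledge the replicated finalization.
    set acked [R $dst wait 1 1000]
    # 3. Finalize on the target primary; this clears its importing state and
    #    bumps its config epoch so the new ownership can be gossiped.
    assert_equal {OK} [R $dst cluster setslot $slot node $dst_id]
    # 4. Finally close the slot on the source primary.
    assert_equal {OK} [R $src cluster setslot $slot node $dst_id]
}

Finalizing on the replicas before the target primary narrows the window in which a crash of the target primary would leave the slot with no owner, which is the failure mode the REPLICAONLY variant and the intermediate WAIT step are intended to mitigate.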