Skip to content

Commit

Permalink
fix(cluster): should remove the master if it's not node in the cluster (
Browse files Browse the repository at this point in the history
#2643)

Currently, the replica won't remove the master replication
while it's not a node in the cluster, which is an unexpected behavior
for the living master node in the cluster.

This fixes #2618.
  • Loading branch information
git-hulk authored Nov 4, 2024
1 parent 55e99f2 commit c701b98
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 24 deletions.
19 changes: 10 additions & 9 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ Status Cluster::SetNodeId(const std::string &node_id) {
}

// Set replication relationship
if (myself_) return SetMasterSlaveRepl();

return Status::OK();
return SetMasterSlaveRepl();
}

// The reason why the new version MUST be +1 of current version is that,
Expand Down Expand Up @@ -204,11 +202,8 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
}

// Set replication relationship
if (myself_) {
s = SetMasterSlaveRepl();
if (!s.IsOK()) {
return s.Prefixed("failed to set master-replica replication");
}
if (auto s = SetMasterSlaveRepl(); !s.IsOK()) {
return s.Prefixed("failed to set master-replica replication");
}

// Clear data of migrated slots
Expand All @@ -234,7 +229,13 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
Status Cluster::SetMasterSlaveRepl() {
if (!srv_) return Status::OK();

if (!myself_) return Status::OK();
// If the node is not in the cluster topology, remove the master replication if it's a replica.
if (!myself_) {
if (auto s = srv_->RemoveMaster(); !s.IsOK()) {
return s.Prefixed("failed to remove master");
}
return Status::OK();
}

if (myself_->role == kClusterMaster) {
// Master mode
Expand Down
51 changes: 36 additions & 15 deletions tests/gocase/integration/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,58 +154,79 @@ func TestClusterNodes(t *testing.T) {
}

func TestClusterReplicas(t *testing.T) {
srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
defer srv.Close()

ctx := context.Background()
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()
srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
rdb1 := srv1.NewClient()
srv2 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
rdb2 := srv2.NewClient()

defer func() {
srv1.Close()
srv2.Close()
require.NoError(t, rdb1.Close())
require.NoError(t, rdb2.Close())
}()

nodes := ""

master1ID := "bb2e5b3c5282086df51eff6b3e35519aede96fa6"
master1Node := fmt.Sprintf("%s %s %d master - 0-8191", master1ID, srv.Host(), srv.Port())
master1Node := fmt.Sprintf("%s %s %d master - 0-8191", master1ID, srv1.Host(), srv1.Port())
nodes += master1Node + "\n"

master2ID := "159dde1194ebf5bfc5a293dff839c3d1476f2a49"
master2Node := fmt.Sprintf("%s %s %d master - 8192-16383", master2ID, srv.Host(), srv.Port())
master2Node := fmt.Sprintf("%s %s %d master - 8192-16383", master2ID, srv1.Host(), srv1.Port())
nodes += master2Node + "\n"

replica2ID := "7dbee3d628f04cc5d763b36e92b10533e627a1d0"
replica2Node := fmt.Sprintf("%s %s %d slave %s", replica2ID, srv.Host(), srv.Port(), master2ID)
replica2Node := fmt.Sprintf("%s %s %d slave %s", replica2ID, srv2.Host(), srv2.Port(), master2ID)
nodes += replica2Node

require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODES", nodes, "2").Err())
require.EqualValues(t, "2", rdb.Do(ctx, "clusterx", "version").Val())
require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", nodes, "2").Err())
require.EqualValues(t, "2", rdb1.Do(ctx, "clusterx", "version").Val())
require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODES", nodes, "2").Err())

t.Run("with replicas", func(t *testing.T) {
replicas, err := rdb.Do(ctx, "cluster", "replicas", "159dde1194ebf5bfc5a293dff839c3d1476f2a49").Text()
replicas, err := rdb1.Do(ctx, "cluster", "replicas", "159dde1194ebf5bfc5a293dff839c3d1476f2a49").Text()
require.NoError(t, err)
fields := strings.Split(replicas, " ")
require.Len(t, fields, 8)
require.Equal(t, fmt.Sprintf("%s@%d", srv.HostPort(), srv.Port()+10000), fields[1])
require.Equal(t, fmt.Sprintf("%s@%d", srv2.HostPort(), srv2.Port()+10000), fields[1])
require.Equal(t, "slave", fields[2])
require.Equal(t, master2ID, fields[3])
require.Equal(t, "connected\n", fields[7])
})

t.Run("without replicas", func(t *testing.T) {
replicas, err := rdb.Do(ctx, "cluster", "replicas", "bb2e5b3c5282086df51eff6b3e35519aede96fa6").Text()
replicas, err := rdb1.Do(ctx, "cluster", "replicas", "bb2e5b3c5282086df51eff6b3e35519aede96fa6").Text()
require.NoError(t, err)
require.Empty(t, replicas)
})

t.Run("send command to replica", func(t *testing.T) {
err := rdb.Do(ctx, "cluster", "replicas", "7dbee3d628f04cc5d763b36e92b10533e627a1d0").Err()
err := rdb1.Do(ctx, "cluster", "replicas", "7dbee3d628f04cc5d763b36e92b10533e627a1d0").Err()
require.Error(t, err)
require.Contains(t, err.Error(), "The node isn't a master")
})

t.Run("unknown node", func(t *testing.T) {
err := rdb.Do(ctx, "cluster", "replicas", "unknown").Err()
err := rdb1.Do(ctx, "cluster", "replicas", "unknown").Err()
require.Error(t, err)
require.Contains(t, err.Error(), "Invalid cluster node id")
})

t.Run("remove the replication if the node is not in the cluster", func(t *testing.T) {
require.Equal(t, "slave", util.FindInfoEntry(rdb2, "role"))
// remove the cluster replica node
clusterNode := fmt.Sprintf("%s\n%s", master1Node, master2Node)
err := rdb1.Do(ctx, "clusterx", "SETNODES", clusterNode, "3").Err()
require.NoError(t, err)
err = rdb2.Do(ctx, "clusterx", "SETNODES", clusterNode, "3").Err()
require.NoError(t, err)

require.Eventually(t, func() bool {
return util.FindInfoEntry(rdb2, "role") == "master"
}, 5*time.Second, 100*time.Millisecond)
})
}

func TestClusterDumpAndLoadClusterNodesInfo(t *testing.T) {
Expand Down

0 comments on commit c701b98

Please sign in to comment.