diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index c4dd9b8c697..3850e12fc29 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -75,9 +75,7 @@ Status Cluster::SetNodeId(const std::string &node_id) { } // Set replication relationship - if (myself_) return SetMasterSlaveRepl(); - - return Status::OK(); + return SetMasterSlaveRepl(); } // The reason why the new version MUST be +1 of current version is that, @@ -204,11 +202,8 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b } // Set replication relationship - if (myself_) { - s = SetMasterSlaveRepl(); - if (!s.IsOK()) { - return s.Prefixed("failed to set master-replica replication"); - } + if (auto s = SetMasterSlaveRepl(); !s.IsOK()) { + return s.Prefixed("failed to set master-replica replication"); } // Clear data of migrated slots @@ -234,7 +229,13 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b Status Cluster::SetMasterSlaveRepl() { if (!srv_) return Status::OK(); - if (!myself_) return Status::OK(); + // If the node is not in the cluster topology, remove the master replication if it's a replica. + if (!myself_) { + if (auto s = srv_->RemoveMaster(); !s.IsOK()) { + return s.Prefixed("failed to remove master"); + } + return Status::OK(); + } if (myself_->role == kClusterMaster) { // Master mode diff --git a/tests/gocase/integration/cluster/cluster_test.go b/tests/gocase/integration/cluster/cluster_test.go index 560479479b6..05ec27f028f 100644 --- a/tests/gocase/integration/cluster/cluster_test.go +++ b/tests/gocase/integration/cluster/cluster_test.go @@ -154,58 +154,79 @@ func TestClusterNodes(t *testing.T) { } func TestClusterReplicas(t *testing.T) { - srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) - defer srv.Close() - ctx := context.Background() - rdb := srv.NewClient() - defer func() { require.NoError(t, rdb.Close()) }() + srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + rdb1 := srv1.NewClient() + srv2 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + rdb2 := srv2.NewClient() + + defer func() { + srv1.Close() + srv2.Close() + require.NoError(t, rdb1.Close()) + require.NoError(t, rdb2.Close()) + }() nodes := "" master1ID := "bb2e5b3c5282086df51eff6b3e35519aede96fa6" - master1Node := fmt.Sprintf("%s %s %d master - 0-8191", master1ID, srv.Host(), srv.Port()) + master1Node := fmt.Sprintf("%s %s %d master - 0-8191", master1ID, srv1.Host(), srv1.Port()) nodes += master1Node + "\n" master2ID := "159dde1194ebf5bfc5a293dff839c3d1476f2a49" - master2Node := fmt.Sprintf("%s %s %d master - 8192-16383", master2ID, srv.Host(), srv.Port()) + master2Node := fmt.Sprintf("%s %s %d master - 8192-16383", master2ID, srv1.Host(), srv1.Port()) nodes += master2Node + "\n" replica2ID := "7dbee3d628f04cc5d763b36e92b10533e627a1d0" - replica2Node := fmt.Sprintf("%s %s %d slave %s", replica2ID, srv.Host(), srv.Port(), master2ID) + replica2Node := fmt.Sprintf("%s %s %d slave %s", replica2ID, srv2.Host(), srv2.Port(), master2ID) nodes += replica2Node - require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODES", nodes, "2").Err()) - require.EqualValues(t, "2", rdb.Do(ctx, "clusterx", "version").Val()) + require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", nodes, "2").Err()) + require.EqualValues(t, "2", rdb1.Do(ctx, "clusterx", "version").Val()) + require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODES", nodes, "2").Err()) t.Run("with replicas", func(t *testing.T) { - replicas, err := rdb.Do(ctx, "cluster", "replicas", "159dde1194ebf5bfc5a293dff839c3d1476f2a49").Text() + replicas, err := rdb1.Do(ctx, "cluster", "replicas", "159dde1194ebf5bfc5a293dff839c3d1476f2a49").Text() require.NoError(t, err) fields := strings.Split(replicas, " ") require.Len(t, fields, 8) - require.Equal(t, fmt.Sprintf("%s@%d", srv.HostPort(), srv.Port()+10000), fields[1]) + require.Equal(t, fmt.Sprintf("%s@%d", srv2.HostPort(), srv2.Port()+10000), fields[1]) require.Equal(t, "slave", fields[2]) require.Equal(t, master2ID, fields[3]) require.Equal(t, "connected\n", fields[7]) }) t.Run("without replicas", func(t *testing.T) { - replicas, err := rdb.Do(ctx, "cluster", "replicas", "bb2e5b3c5282086df51eff6b3e35519aede96fa6").Text() + replicas, err := rdb1.Do(ctx, "cluster", "replicas", "bb2e5b3c5282086df51eff6b3e35519aede96fa6").Text() require.NoError(t, err) require.Empty(t, replicas) }) t.Run("send command to replica", func(t *testing.T) { - err := rdb.Do(ctx, "cluster", "replicas", "7dbee3d628f04cc5d763b36e92b10533e627a1d0").Err() + err := rdb1.Do(ctx, "cluster", "replicas", "7dbee3d628f04cc5d763b36e92b10533e627a1d0").Err() require.Error(t, err) require.Contains(t, err.Error(), "The node isn't a master") }) t.Run("unknown node", func(t *testing.T) { - err := rdb.Do(ctx, "cluster", "replicas", "unknown").Err() + err := rdb1.Do(ctx, "cluster", "replicas", "unknown").Err() require.Error(t, err) require.Contains(t, err.Error(), "Invalid cluster node id") }) + + t.Run("remove the replication if the node is not in the cluster", func(t *testing.T) { + require.Equal(t, "slave", util.FindInfoEntry(rdb2, "role")) + // remove the cluster replica node + clusterNode := fmt.Sprintf("%s\n%s", master1Node, master2Node) + err := rdb1.Do(ctx, "clusterx", "SETNODES", clusterNode, "3").Err() + require.NoError(t, err) + err = rdb2.Do(ctx, "clusterx", "SETNODES", clusterNode, "3").Err() + require.NoError(t, err) + + require.Eventually(t, func() bool { + return util.FindInfoEntry(rdb2, "role") == "master" + }, 5*time.Second, 100*time.Millisecond) + }) } func TestClusterDumpAndLoadClusterNodesInfo(t *testing.T) {